Qupid - A Golang Queue
In my experience working with Golang thus far, I regularly encounter the requirement to fetch some sort of data, process it accordingly, and then persist or send the output. Naturally Golang's channels really shine here, but you still need a little orchestration to tie them together: publish to one, subscribe to another, with the ability to stop processing as and when required too!
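A minimal sketch of that orchestration (the names here are my own, not anything from Qupid): a worker subscribes to one channel, publishes results to another, and a `stop` channel halts processing on demand.

```go
package main

import "fmt"

// process reads records from in, transforms them, and publishes
// results to out, until in is drained or stop is closed.
func process(in <-chan string, out chan<- string, stop <-chan struct{}) {
	defer close(out)
	for {
		select {
		case <-stop:
			return // stop processing as and when required
		case rec, ok := <-in:
			if !ok {
				return // input channel drained
			}
			out <- "processed:" + rec
		}
	}
}

func main() {
	in := make(chan string, 3)
	out := make(chan string, 3)
	stop := make(chan struct{})

	for _, r := range []string{"a", "b", "c"} {
		in <- r
	}
	close(in)

	go process(in, out, stop)
	for res := range out {
		fmt.Println(res)
	}
}
```

Closing `out` when the worker exits lets the consumer simply range over it, which is the usual idiom for signalling "no more results".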
But how do you stop processing duplicate records? Take the following simple example.
    +-----------+
    |           |
    | Datastore |
    |           |
    +-----------+
        |    ^
   Poll |    | Save
        v    |
    +-----------+
    |           |
    |    App    |
    |           |
    +-----------+
In this example the App polls a datastore to find records that need to be processed and then persists them back. Note that there is no pushing or event dispatching here =] When it comes to ensuring consistency, one would often leave that up to a third-party vendor, e.g. database transactions.
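The loop at the heart of that diagram might look something like this. The `fetchPending` and `save` helpers are hypothetical stand-ins for real datastore calls, not anything from Qupid:

```go
package main

import "fmt"

// Record is a stand-in for whatever the datastore returns.
type Record struct {
	ID   int
	Done bool
}

// pollAndSave sketches the App's cycle: fetch pending records,
// process each one, and persist the result back.
func pollAndSave(fetchPending func() []Record, save func(Record)) {
	for _, rec := range fetchPending() {
		rec.Done = true // "process" the record
		save(rec)       // persist it back to the datastore
	}
}

func main() {
	// An in-memory slice plays the part of the datastore here.
	store := []Record{{ID: 1}, {ID: 2}}
	fetch := func() []Record { return store }
	save := func(r Record) { store[r.ID-1] = r }

	pollAndSave(fetch, save)
	fmt.Println(store[0].Done, store[1].Done)
}
```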
But rather than force that logic down the chain into a black box, I find it's much better to bring it into plain view at the code level. That way everyone involved has the same viewpoint, can identify issues, and can contribute patches and tests. Besides, what do you do when you can't rely on a database transaction at all, such as when retrieving data from multiple external sources? Something more distributed, like this:
    +-----------+          +---------+          +-----------+
    |           |          |         |          |           |
    |    API    |   Poll   |   App   |   Save   |    NOT    |
    |  Resource |<-------->|         |--------->|   S3!!!   |
    |           |          |         |          |           |
    +-----------+          +---------+          +-----------+
Clearly you can’t rely on a database transaction here! Because I found myself repeatedly dealing with the generic structure above, I’ve created a Golang package to take the hassle out of queues and channels so you can concentrate on the important stuff!
The package is called Qupid, and you can head over to GitLab and view the source code.
To handle state locking and keep things mutually exclusive, Golang offers the sync package, which Qupid utilises so that completed queue entries do not (by default) get reprocessed. You can of course explicitly remove entries as required, or auto-purge completed results! If you want to know more, you can see all of that and more in the source code and the example file.
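Qupid's internals aside, the general technique that the sync package enables here can be sketched as a mutex-guarded set of completed entry IDs (the names below are mine, not Qupid's):

```go
package main

import (
	"fmt"
	"sync"
)

// Dedup remembers completed entry IDs so they are not reprocessed.
type Dedup struct {
	mu   sync.Mutex
	done map[string]bool
}

func NewDedup() *Dedup {
	return &Dedup{done: make(map[string]bool)}
}

// Claim reports whether the caller should process id: the first
// caller wins, and later calls see the entry as already completed.
func (d *Dedup) Claim(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.done[id] {
		return false
	}
	d.done[id] = true
	return true
}

// Remove explicitly forgets an entry so it can be processed again.
func (d *Dedup) Remove(id string) {
	d.mu.Lock()
	defer d.mu.Unlock()
	delete(d.done, id)
}

func main() {
	d := NewDedup()
	fmt.Println(d.Claim("rec-1")) // first time: process it
	fmt.Println(d.Claim("rec-1")) // duplicate: skipped
	d.Remove("rec-1")
	fmt.Println(d.Claim("rec-1")) // after removal: processed again
}
```

Holding the mutex across the check-and-set in `Claim` is what makes the claim atomic: two goroutines racing on the same ID can never both win.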
As always I'm happy to take comments and feedback, and of course with ANY distributed system, YOU need to ensure that your work is idempotent!
Happy coding =]