Several months ago I came across the very intriguing ZeroMQ project. I enjoy writing scalable services, and 0MQ offered a set of paradigms and a library intended to ease much of the complexity of building them. After reading the excellent guide, I wrote up a couple of trivial experiments and demonstrated them to the team, highlighting the performance and the simplicity of implementation. The team was duly impressed (by the library, not my experiments), and it didn’t take much effort to keep 0MQ in everyone’s mind until we had an opportunity to try it out in a real project.
In late May we got the go-ahead to re-implement our product data feeding service using 0MQ. The service was originally implemented as a highly multithreaded (hundreds of concurrent threads) single-process Windows service running our own proprietary XML messaging platform over MSMQ. That messaging platform was not designed for the data volumes we found ourselves handling, and over time we ran into several significant shortcomings that we hoped 0MQ and the paradigms it brings with it would address. I’ve listed those shortcomings below, along with how we found 0MQ addressed each one:
Our original implementation carried a few legacy concerns that required a significant amount of state sharing between threads. While this allowed our code to take a more traditional object-oriented shape, tracking and safely sharing that state introduced a great deal of noise into the code and proved to be an endless wellspring of bugs and unanticipated complications.
0MQ’s solution for concurrent state is simple: there is none. With 0MQ, the philosophy is to achieve concurrency with multiple single-threaded instances rather than one multithreaded instance. 0MQ then ensures that work is divided among your many instances and handed to each one on a single thread. We went all in with this philosophy and wrote our instances in a completely state-free (and hence functional) style. While this style is a significant change from OOP, the resulting code simplicity more than makes up for it. In my opinion, this is the real silver bullet of 0MQ: full concurrency without ever having to think about it in your code. For the small amount of state we did have to share between processes, we wrote an extremely simple in-memory key/value store on top of 0MQ. Again, since 0MQ ensured we only received messages on a single thread, we didn’t have to clutter the store with locks or similar constructs.
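To make that concrete, here’s a minimal sketch of that kind of single-threaded key/value store. Our service is .NET, but the idea is easiest to show in Python with pyzmq; the endpoint and wire format here are invented for illustration, not our actual protocol.

```python
# kv_store.py -- illustrative sketch of a single-threaded 0MQ key/value store.
# 0MQ delivers every request to this one thread, so a plain dict needs no
# locks or other synchronization.
import zmq

def main():
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REP)
    sock.bind("tcp://*:5555")  # example endpoint, not our real configuration

    store = {}
    while True:
        # Requests arrive as multipart frames: [b"SET", key, value] or [b"GET", key]
        frames = sock.recv_multipart()
        op, key = frames[0], frames[1]
        if op == b"SET":
            store[key] = frames[2]
            sock.send(b"OK")
        elif op == b"GET":
            sock.send(store.get(key, b""))
        else:
            sock.send(b"ERR")

if __name__ == "__main__":
    main()
```

Workers would talk to such a store through REQ sockets; the point is that the entire “thread safety” story is the while loop.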
Having the entire application hosted in a single process with many threads meant there was a fair amount of boilerplate in any new code. Additionally, using MSMQ required that we maintain our own envelope, configuration, and other data alongside the message payload in a single large XML document. Together, these two facts meant hundreds of lines of boilerplate and an understanding of a very complex hosting system just to implement the most basic functionality in any new module.
When we reimplemented with 0MQ, reducing this complexity was a high-priority feature. We packaged each step of our workflow (which we call a worker) as a DLL, then wrote a single worker host that we run many instances of, each loading one worker DLL. Since each worker host is independent, no code is required to integrate a new worker into the existing system.
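The rough shape of such a host translates to any language. Here’s a hedged sketch in Python, with module loading standing in for DLL loading; the `process` entry point and the endpoints are hypothetical, not our actual interfaces.

```python
# worker_host.py -- rough analog of our worker host. Ours loads a .NET worker
# DLL; this sketch loads a Python module named on the command line instead.
import importlib
import sys

import zmq

def main():
    # e.g. "python worker_host.py my_worker"; the module name and its
    # process() entry point are hypothetical.
    worker = importlib.import_module(sys.argv[1])

    ctx = zmq.Context()
    source = ctx.socket(zmq.PULL)           # upstream stage pushes work to us
    source.connect("tcp://localhost:5557")  # example endpoints only
    sink = ctx.socket(zmq.PUSH)             # we push results downstream
    sink.connect("tcp://localhost:5558")

    while True:
        frames = source.recv_multipart()
        # The worker needs exactly one stateless entry point; it knows nothing
        # about hosting, threading, or the rest of the pipeline.
        sink.send_multipart(worker.process(frames))

if __name__ == "__main__":
    main()
```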
0MQ also has the concept of message parts: each message can be sent as a series of frames and read back the same way. This let us partition each message into independent areas (e.g. configuration, routing, payload) and use the most efficient encoding for each part, rather than converting everything to XML as in the original implementation. Additionally, we were able to easily expose these parts as native objects rather than XPath expressions, resulting in significantly cleaner code.
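For illustration (the part layout and encodings here are invented, not our actual wire format), sending such a message with pyzmq looks like this:

```python
import json
import zmq

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)
push.connect("tcp://localhost:5557")  # example endpoint

# Each concern is its own frame, encoded however suits it best -- no single
# XML envelope wrapping everything.
routing = b"worker-2"                           # plain bytes
config  = json.dumps({"retries": 3}).encode()   # JSON where convenient
payload = b"\x00\x01\x02..."                    # raw binary, untouched

push.send_multipart([routing, config, payload])

# The receiver gets the frames back exactly as sent:
#   routing, config, payload = pull.recv_multipart()
```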
These changes, along with the elimination of the multithreaded state code mentioned above, gave us an average 80% reduction in line count in the affected areas, with most modules going from 300-400 LOC to 60-80 LOC.
MSMQ tended to get in our way as much as it helped, most obviously with regard to messaging performance. Every MSMQ message is persisted to disk, a guarantee our service didn’t need.
0MQ has no built-in persistence, and in some naive benchmarks it provided higher throughput than MSMQ (unfortunately I’ve lost the numbers). While we didn’t have an opportunity to swap 0MQ in for MSMQ on the original implementation for an apples-to-apples comparison, our new implementation is currently running 30% faster on the same data, much of which we attribute to not needing to touch the disk for every message.
The only way to scale our single-process service was to run one instance per server and partition the input set modulo the number of instances. Unfortunately, the amount of work generated by each item in the input set was unknown ahead of time, so this partitioning was very inefficient and, without a large number of high-performance servers, ultimately ineffective.
0MQ’s many-instance model works across cores on a single machine as well as across multiple machines. We’ve written each stage of the workflow as an independent process, so we can vary the number of processes to match the relative amount of CPU time each stage needs. For example, if worker 2 needs twice as much CPU time as worker 1, we fire up one instance of worker 1 and two instances of worker 2 to keep throughput constant through the workflow. Since these workers can live on many machines, the relative computing power of each server is irrelevant: whether we fire up one worker instance each on two small servers or two worker instances on one larger server, the system gains capacity equally.
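The mechanism behind this is 0MQ’s pipeline (PUSH/PULL) pattern: a PUSH socket round-robins outgoing messages across every connected PULL socket. A sketch of a feeding stage, with the endpoint and message contents invented:

```python
# stage_feed.py -- sketch of how a stage fans work out to its worker instances.
# The PUSH socket load-balances round-robin across every connected PULL peer,
# so doubling a stage's instance count roughly doubles its throughput.
import zmq

ctx = zmq.Context()
feed = ctx.socket(zmq.PUSH)
feed.bind("tcp://*:5557")  # stable, well-known endpoint (example address)

for n in range(1000):                 # stand-in for our real input feed
    feed.send(f"item {n}".encode())   # blocks until at least one worker connects,
                                      # then 0MQ picks the next available worker
```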
Additionally, 0MQ lets instances connect and disconnect freely, allowing us to scale resources on demand. If we find that we’re maxing out our current capacity, we simply bring a new machine online and its instances join the existing workflow. Once the spike is over, we shut down the machine and everything continues on as normal.
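This on-demand behavior falls out of the sketch above: the feeding stage binds a stable endpoint while each worker merely connects to it, and the PUSH socket round-robins among whichever peers are connected at that moment. A worker started mid-run joins the rotation immediately; one that disconnects simply drops out.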
An instance of the original implementation would often consume all available CPU and RAM when under load, and could eat up dozens of gigabytes of disk space. Each thread consumed its own share of resources, and threads came and went rapidly. Our only control was to throttle the number of messages each stage of the workflow handled concurrently, which resulted in either underutilization of the server for small input items or resource over-consumption (and sometimes crashes) for large ones.
0MQ uses a single thread per instance, so resource consumption per instance is stable. Consumption stays essentially constant whether an instance is under load or idle, and we can adjust the number of instances to make the most efficient use of the available resources.
An unfortunate side effect of shared state is that the state can become invalid when an error occurs. We did our best to mitigate the issue, but new exceptions would appear and cascade through the entire system, bringing the service (and sometimes the server) to its knees.
Under 0MQ, each worker process is independent and state-free. If a worker crashes, we simply replace it with another instance and carry on.
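Because there’s no state to repair, “replace it with another instance” can be as dumb as a restart loop. A hypothetical supervisor sketch (not from our actual system):

```python
# supervise.py -- hypothetical supervisor: recovery for a stateless worker is
# just "start another one".
# Usage: python supervise.py python worker_host.py my_worker
import subprocess
import sys

def main():
    cmd = sys.argv[1:]
    while True:
        proc = subprocess.run(cmd)
        if proc.returncode == 0:
            break  # clean shutdown; stop supervising
        # With plain PUSH/PULL, messages queued inside the dead instance are
        # lost, but the replacement starts pulling new work immediately.
        print(f"worker exited with {proc.returncode}; restarting", file=sys.stderr)

if __name__ == "__main__":
    main()
```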
Working with 0MQ has been a huge pleasure, bringing massive improvements in stability, understandability, and code quality. I also highly recommend that anyone working on concurrent applications of any sort, whether or not they plan to use 0MQ, read the 0MQ Guide to learn how to mitigate most of the usual headaches of multithreaded applications. I know for a fact that multithreaded code is a path I will never voluntarily tread again.