Thursday, May 20, 2010

Scalable concurrent deterministic event processing via lazy topological sorting

Quite often in designing software you come across the case where you have a stream of events or messages and you have to correlate them to update a set of entities (distinct state objects). In a financial system, you might have a stream of transaction events (not to be confused with transactions in the consistent/concurrent data sense) that are used to update multiple accounts (e.g. transferring money from one account to another), and in a video game you might have a set of things that happened (collisions, triggered events, timers, events from entities) that cause entities to be updated (rocket hits player, rocket explodes, player dies). Of course, in an MMORPG with a full economy system, you might have all of the above!

In general, processing events in a deterministic order per entity is also an important feature of these systems. In the case of a bank, it's usually illegal to process transactions out of order: if I transferred money into an account and then paid a bill, I wouldn't want to end up overdrawn. For a game, determinism may matter for reproducibility, which is valuable both for debugging and for keeping state in sync across a network in multiplayer games.

Doing this as a completely serial process is straightforward. Doing it concurrently, however, is a tricky problem, and one that can be very hard to solve with fine-grained locking. Interestingly enough, a traditional transactional system (as implemented in an RDBMS or Software Transactional Memory) isn't enough on its own to guarantee deterministic ordering, because the transactions themselves would need to be processed in a deterministic order, which brings us back to a fully serial problem!

What we need is a lazily processed topological sort of events, where events are processed in dependency order and the dependencies are implicit: they are determined by which entities must be consistent at the time each event is processed and by the order in which the events arrive. That sounds like quite a big problem to solve, especially with concurrency in the mix!

It turns out, however, that there is a solution, as long as you know which entities an event will touch before it is processed. Firstly, the system involves two processes running concurrently: the inherently serial event ordering (which may itself be split into two concurrent sub-processes) and the event processing, where multiple events can potentially be processed concurrently. I say "potentially" because in the worst case every event involves the same entity, forcing serial processing.

Secondly, once the system is in place, we don't need a transaction system for consistency, because this scheme already guarantees that each event is processed consistently. That is good news, because it means less overhead overall.

In the simplest and most naive terms, this ordering and consistency is achieved by giving each entity a FIFO queue. The event ordering process places each event at the back of the queue of every entity that processing the event must touch. This is done in the order the events arrive, which enforces a deterministic order of event processing.
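A minimal sketch of the naive queuing step, in Python purely for illustration. The names `Event`, `NaiveOrderer` and `order_event` are hypothetical, and the sketch assumes each event carries the tuple of entities it will touch, which the scheme requires to be known up front:

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Event:
    # Hypothetical event record: an id plus the entities it must touch,
    # known before the event is processed.
    event_id: int
    entities: tuple


class NaiveOrderer:
    """Per-entity FIFO queues, filled strictly in event-arrival order."""

    def __init__(self):
        self.queues = {}   # entity -> deque of events waiting on that entity
        self.ordered = []  # events that have been queued, oldest first

    def order_event(self, event):
        # Put the event at the back of the queue of every entity it touches.
        for entity in event.entities:
            self.queues.setdefault(entity, deque()).append(event)
        self.ordered.append(event)
```

Because `order_event` is called once per event in arrival order, two events sharing an entity always appear in that entity's queue in the order they arrived.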

The events that have been queued this way are placed in a list (let's call it the "ordered" list). The ordering process then polls through this list, checking which events have reached the front of all of the queues they were placed in. These events may now be handed off for processing (on a work queue, or some such). This polling can be done concurrently with the queuing done in the previous step.
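The polling pass can be sketched as follows. This is a minimal Python illustration, assuming queues hold event ids and the "ordered" list holds hypothetical `(event_id, entities)` pairs. A dispatched event stays at the front of its queues until it has been processed, so this sketch removes it from the ordered list to avoid dispatching it twice:

```python
from collections import deque


def poll_ready(queues, ordered):
    """Return ids of events at the front of every queue they were placed in.

    queues:  entity -> deque of event ids (front = next allowed event)
    ordered: list of (event_id, entities) pairs, oldest first; ready events
             are removed from it so each one is dispatched only once.
    """
    ready, still_waiting = [], []
    for event_id, entities in ordered:
        if all(queues[e][0] == event_id for e in entities):
            ready.append(event_id)
        else:
            still_waiting.append((event_id, entities))
    ordered[:] = still_waiting
    return ready


if __name__ == "__main__":
    queues = {"alice": deque([1]), "bob": deque([1, 2]),
              "carol": deque([2, 3]), "dave": deque([4])}
    ordered = [(1, ("alice", "bob")), (2, ("bob", "carol")),
               (3, ("carol",)), (4, ("dave",))]
    print(poll_ready(queues, ordered))  # events 1 and 4 are ready
```

Event 4 is dispatched ahead of events 2 and 3 because it shares no entity with them. This is the "flow around" behaviour that gives the scheme its concurrency.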

The only thing the processing must do (apart from the actual entity updates and event logic) is remove the event from its per-entity queues after the event has been processed. As such, the queues must either be lock-protected (although only a single queue needs to be locked at a time, so there is no possibility of deadlock) or use a lock-free algorithm (note that this is the special case of a single-producer/single-consumer FIFO, as each entity may only be processed by one event processor at a time).
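Here is a minimal sketch of the lock-protected variant in Python. `EntityQueue`, `process` and the `handler` callback are hypothetical names. Each queue has its own lock, and the processor takes those locks one at a time, so no thread ever holds two locks at once:

```python
import threading
from collections import deque


class EntityQueue:
    """A per-entity FIFO of event ids guarded by its own lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.items = deque()

    def push(self, event_id):
        with self.lock:
            self.items.append(event_id)

    def front(self):
        with self.lock:
            return self.items[0] if self.items else None

    def pop_front(self, event_id):
        with self.lock:
            # The finished event must be at the front: it was dispatched only
            # after reaching the front of every queue it was placed in.
            if not self.items or self.items[0] != event_id:
                raise RuntimeError("event finished out of order")
            self.items.popleft()


def process(event_id, entities, queues, handler):
    handler(event_id)  # the actual entity updates / event logic
    # Release the event from each entity queue, holding one lock at a time,
    # which rules out lock-ordering deadlocks.
    for entity in entities:
        queues[entity].pop_front(event_id)
```

A lock-free single-producer/single-consumer FIFO could replace `EntityQueue` without changing `process`.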

All these queues, however, impose extra and unnecessary overhead on processing. Luckily, we can virtualize the queues using a technique similar to the ticket spinlocks implemented in the Linux kernel. In place of a queue, each entity has a pair of ticket counters: one indicating the ticket currently being processed, the other marking the back of the line. Instead of putting the event at the back of the queue, the event orderer takes a ticket from the back of the line, incrementing that counter. The event processor increments the "processing" counter of each entity once processing is complete (just like your number flashing up at the DMV), and during polling, events whose tickets all equal the "processing" counters of their entities are dispatched for processing.
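The ticket-counter version can be sketched in a single-threaded Python form. All names here are hypothetical. In a real concurrent system the counters would be shared between threads and read and written atomically; Python is used only to show the bookkeeping:

```python
from dataclasses import dataclass, field


@dataclass
class Entity:
    next_ticket: int = 0  # back of the line: the next ticket to hand out
    serving: int = 0      # the ticket currently allowed to be processed


@dataclass
class Event:
    event_id: int
    entities: tuple
    tickets: dict = field(default_factory=dict)  # entity name -> ticket


def take_tickets(event, entities):
    # Ordering step: take a ticket from each entity instead of enqueuing.
    for name in event.entities:
        ent = entities[name]
        event.tickets[name] = ent.next_ticket
        ent.next_ticket += 1


def is_ready(event, entities):
    # Polling step: ready when each ticket is the one now being served.
    return all(entities[n].serving == t for n, t in event.tickets.items())


def finish(event, entities):
    # Processing step: after the event logic runs, "call the next number".
    for name in event.entities:
        entities[name].serving += 1
```

If an event takes tickets `a0, b0` and a later event takes `b1`, the later event becomes ready only after `finish` has advanced `b`'s serving counter to 1.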

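In fact, if the counters are large enough that they never overflow in practice, only the *sum* of an event's tickets needs to be stored in the event. This works because no ticket can ever be behind its entity's "processing" counter, so the sum of the counters equals the sum of the tickets only when every individual ticket matches. It also turns out that only a single process may increment each counter at any one time (the orderer for the back of the line, and the processor handling the event currently being served for the "processing" counter), so only consistent atomic reads and writes are required, not even a full atomic increment. No locks and no composite atomic operations are required (although your processing system may use them for dispatch, and you may queue events that way)!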
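A minimal Python sketch of the sum-of-tickets variant, under two assumptions. First, an event lists each entity at most once. Second, counters never overflow, which Python's unbounded integers guarantee here. The class and function names are hypothetical. Each counter has exactly one writer at a time, which is why a plain read followed by a write stands in for an atomic increment:

```python
class Entity:
    def __init__(self):
        self.next_ticket = 0  # written only by the (single) ordering process
        self.serving = 0      # written only by the processor of the served event


class Event:
    def __init__(self, event_id, entity_names):
        if len(set(entity_names)) != len(entity_names):
            raise ValueError("an event must list each entity at most once")
        self.event_id = event_id
        self.entity_names = tuple(entity_names)
        self.ticket_sum = 0  # the only per-event ticket state that is kept


def take_tickets(event, entities):
    total = 0
    for name in event.entity_names:
        ent = entities[name]
        total += ent.next_ticket
        ent.next_ticket = ent.next_ticket + 1  # plain read then write: one writer
    event.ticket_sum = total


def is_ready(event, entities):
    # Until the event runs, each ticket is >= its entity's serving counter,
    # so the sums can only be equal when every ticket is being served.
    return sum(entities[n].serving for n in event.entity_names) == event.ticket_sum


def finish(event, entities):
    for name in event.entity_names:
        entities[name].serving = entities[name].serving + 1
```

For example, an event holding tickets `a1, b2` (sum 3) is not ready while the counters are `a1, b1` (sum 2). It becomes ready only when `b` reaches 2.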

Another interesting property of this algorithm is that event ordering can be done on demand, whenever more events need to be processed or new events have been added. Given a queue of events to process for each processing thread, the ordering process can be invoked when the size of a processing queue falls below a certain threshold, and also when enough new events have been added that need ordering. Some sort of locking mechanism must prevent two ordering processes from running at once, although a finer-grained mechanism can be used if only the polling part of the ordering is done on demand. Refilling before a processing queue is completely empty means other threads need not block waiting for more events to process. In this way, all your event processing and queuing need only use limited-size single-producer/single-consumer queues (for which there are some very fast lock-free implementations).
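A minimal Python sketch of the threshold trigger, with several assumptions. `order_and_poll` is a hypothetical callback that runs one ordering pass and returns newly ready events. `low_water` is a hypothetical threshold. A `collections.deque` per worker stands in for a bounded lock-free SP/SC queue. A non-blocking `threading.Lock().acquire(blocking=False)` ensures at most one ordering pass runs at a time:

```python
import threading
from collections import deque


class OnDemandScheduler:
    def __init__(self, order_and_poll, num_workers, low_water=2):
        self.order_and_poll = order_and_poll  # hypothetical ordering pass
        self.work = [deque() for _ in range(num_workers)]  # one queue per worker
        self.low_water = low_water
        self.ordering_lock = threading.Lock()

    def maybe_refill(self):
        # Try-lock: if another thread is already ordering, don't wait for it.
        if not self.ordering_lock.acquire(blocking=False):
            return False
        try:
            for event in self.order_and_poll():
                min(self.work, key=len).append(event)  # feed the shortest queue
        finally:
            self.ordering_lock.release()
        return True

    def next_event(self, worker):
        queue = self.work[worker]
        # Refill before the queue runs dry so workers rarely have to block.
        if len(queue) < self.low_water:
            self.maybe_refill()
        return queue.popleft() if queue else None
```

Only the thread holding the lock pushes into the worker queues, and each queue is popped only by its own worker, so every queue keeps a single producer and a single consumer at any moment.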

The main disadvantage of this method, however, is obvious: it's only useful if you know ahead of time which entities must be consistent for each event. Also, readers might block other readers, although in practice this isn't a performance problem if you have lots of events that work on unrelated entities, because those events can still be processed as long as they aren't dependent. Finally, event ordering itself could become a bottleneck, although a finer-grained implementation of the ordering system and some micro-optimization would likely provide a reasonable boost.


  1. Nice!
    I guess the system may have special support for events that touch only a single entity (if that's expected to be a common case). Each entity needs an SP/SC FIFO queue for such events. When processing of an event completes, the thread polls the queue while tickets are consecutive.
    I am only a bit concerned about readiness determination. If I have zillions of pending events and only a few ready to run, I need to periodically scan all pending events in the hope of picking up a few new ready-to-run events.

  2. Events that touch a single entity are indeed a special case that you can optimize for, but I don't know how much of a need there will be in practice, as I imagine they would be the first to be processed. It really comes down to the storage vs the cost of polling the extra events.

    As to the amount of polling you have to do, you only really need to have a fairly limited number of events in that ordered list to be polled, enough to feed your consumers really; you could even do the first phase of the event ordering on demand to fill the list to a particular size.

    The other big thing is that the oldest events are the most likely to be ready, so you can keep them in age order and poll from the start of the list and then stop when you have dispatched enough.

    In the end, you still pay a cost for contention, although it's lower than what would be paid with transactions or locking. There is also the advantage that if you have lots of events that are not dependent on the entities being contended for, they can "flow around" them and still be processed.

    It should also be noted that when you take the initial tickets for an event, if they already match the "now processing" tickets, the event can be dispatched without polling. So in a low-contention system, you hardly pay any price at all!

  3. > It really comes down to the storage vs the cost of polling the extra events.

    I am not sure I get you. What extra storage are you talking about? Note that "single-entity" events do not need to be stored in a global queue; they can be stored only in the entity-specific queue.

    > As to the amount of polling you have to do, you only really need to have a fairly limited number of events in that ordered list to be polled, enough to feed your consumers really

    I suspect there may be some "worst cases" where one needs to poll a lot of events to find a few ready to run.
    It's like GC: in some situations GC has to scan a whole huge heap only to find there are a few KB to reclaim, and that happens every time.

    > if they match the "now processing" ticket already, the event can be dispatched without polling

    Cool! I guess in some systems (bank account processing) 99% of events can be dispatched that way.
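The two refinements raised in the comments can be sketched together in Python. The first is the fast path: dispatch immediately when freshly taken tickets already match the "now processing" counters. The second is polling oldest-first and stopping once enough events have been dispatched. `Orderer`, `submit`, `poll`, `limit` and the `dispatch` callback are hypothetical names, and the sketch is single-threaded for clarity:

```python
from collections import deque


class Entity:
    def __init__(self):
        self.next_ticket = 0  # back of the line
        self.serving = 0      # "now processing" ticket


class Orderer:
    def __init__(self, entities):
        self.entities = entities
        self.waiting = deque()  # (event_id, {entity: ticket}), oldest first

    def _ready(self, tickets):
        return all(self.entities[n].serving == t for n, t in tickets.items())

    def submit(self, event_id, names, dispatch):
        tickets = {}
        for n in names:
            tickets[n] = self.entities[n].next_ticket
            self.entities[n].next_ticket += 1
        if self._ready(tickets):
            dispatch(event_id)  # fast path: no polling needed
        else:
            self.waiting.append((event_id, tickets))

    def poll(self, dispatch, limit):
        # Oldest events are the most likely to be ready, so scan from the
        # front and stop once `limit` events have been dispatched.
        sent, kept = 0, deque()
        while self.waiting and sent < limit:
            event_id, tickets = self.waiting.popleft()
            if self._ready(tickets):
                dispatch(event_id)
                sent += 1
            else:
                kept.append((event_id, tickets))
        kept.extend(self.waiting)  # unscanned events keep their age order
        self.waiting = kept
        return sent
```

In a low-contention system most events take the `submit` fast path and never enter `waiting`, so the polling cost the commenters worried about only arises under contention.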