Thursday, May 20, 2010

Scalable concurrent deterministic event processing via lazy topological sorting

Quite often in designing software you come across the case where you have a stream of events or messages and you have to correlate them to update a set of entities (distinct state objects). In a financial system, you might have a stream of transaction events (not to be confused with transactions in the consistent/concurrent data sense) that are used to update multiple accounts (e.g. transferring money from one account to another), and in a video game you might have a set of things that happened (collisions, triggered events, timers, events from entities) that cause entities to be updated (rocket hits player, rocket explodes, player dies). Of course, in an MMORPG with a full economy system, you might have all of the above!

In general, processing events in a deterministic order per entity is also an important feature of these systems. In the case of a bank, it's usually illegal to process transactions out of order: if I transferred money into an account and then paid a bill, I wouldn't want to end up overdrawn because the bill was processed first. For a game, deterministic order matters for reproducibility, which is invaluable for debugging and for keeping state in sync across the network in multi-player.

Doing this as a completely serial process is straightforward and easy. Doing it concurrently, however, can be quite a tricky problem, one that is very hard to solve with fine-grained locking. Interestingly enough, a traditional transactional system (as implemented in an RDBMS or Software Transactional Memory) alone isn't quite enough to handle the problem either: to guarantee deterministic ordering, the transactions themselves would have to be committed in a deterministic order, which brings us right back to a fully serial problem!

What we need is a lazily processed topological sort of the events, where events are processed in dependency order and the dependencies are implicit, determined by which entities must be consistent at the time each event is processed and by the order the events arrive in. That sounds like quite a big problem to solve, especially with concurrency in the mix!

It turns out, however, that there is a solution, as long as you know which entities an event must touch ahead of it being processed. Firstly, the system involves two processes running concurrently: the inherently serial event ordering (which may itself be split into two concurrent sub-processes) and the event processing, where multiple events can potentially be processed concurrently. I say "potentially" because in the worst case, every event involves the same entity, forcing serial processing.
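To make the shape of the thing concrete, here's a minimal structural sketch in C++ (the names, types and thread count are mine, purely for illustration; the design itself doesn't prescribe any of them):

```cpp
#include <thread>
#include <vector>

struct Entity { /* one per distinct state object */ };

struct Event {
    // Entities this event must touch, known *before* processing.
    std::vector<Entity*> entities;
};

// Role 1: the inherently serial event ordering.
void ordering_thread() {
    // Pull events in arrival order, record each event's position
    // relative to every entity it touches, and hand ready events
    // off to the processors.
}

// Role 2: concurrent event processing, one instance per worker.
void processing_thread() {
    // Take a ready event; it is guaranteed exclusive, consistent
    // access to all of its entities, so no transactions are needed.
}

int main() {
    std::thread orderer(ordering_thread);
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) workers.emplace_back(processing_thread);
    orderer.join();
    for (auto& w : workers) w.join();
}
```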

Secondly, it turns out that once we have this system in place, we don't need a transaction system for consistency, since each event is guaranteed a consistent view of its entities while it is being processed. This is good, because it means less overhead overall.

In the simplest and most naive terms, this ordering and consistency are accomplished by giving each entity a FIFO queue. The event ordering process places each event at the back of the queue of every entity that the event must touch. This is done in the order the events arrive, which enforces the deterministic ordering of event processing.

The events for which this has been done are placed in a list (let's call it the "ordered" list). The ordering process then polls through this list, checking which events have reached the front of all of the queues they have been placed in. These events may now be passed off for processing (on a work queue, or some such). This polling can be done concurrently with the event queuing done in the previous step.

The only thing the processing must do (apart from the actual entity updates and event logic) is remove the event from each per-entity queue after the event has been processed. As such, the queues must either be lock protected (although only a single queue needs to be locked at a time, meaning no possibility of deadlock) or use a lock-free algorithm (note that this is a special-case single producer/single consumer FIFO, as each entity may only be touched by one event processor at a time).
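Here's a minimal sketch of that naive scheme (my own illustration, with a plain mutex guarding each entity's queue):

```cpp
#include <deque>
#include <mutex>
#include <vector>

struct Event;

struct Entity {
    std::mutex lock;                 // only one queue locked at a time,
    std::deque<const Event*> queue;  // so no deadlock is possible
};

struct Event {
    std::vector<Entity*> entities;   // known before processing
};

// Ordering step: append the event to every touched entity's queue,
// in arrival order. This fixes the deterministic processing order.
void enqueue(const Event& e) {
    for (Entity* ent : e.entities) {
        std::lock_guard<std::mutex> g(ent->lock);
        ent->queue.push_back(&e);
    }
}

// Polling step: an event is ready when it sits at the front of
// *all* of its entities' queues.
bool is_ready(const Event& e) {
    for (Entity* ent : e.entities) {
        std::lock_guard<std::mutex> g(ent->lock);
        if (ent->queue.empty() || ent->queue.front() != &e) return false;
    }
    return true;
}

// After processing: pop the event from each queue, unblocking successors.
void retire(const Event& e) {
    for (Entity* ent : e.entities) {
        std::lock_guard<std::mutex> g(ent->lock);
        ent->queue.pop_front();
    }
}
```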

All these queues, however, impose an unnecessary overhead on processing. Luckily, we can virtualize the queues using a technique similar to the ticket spin-locks implemented in the Linux kernel. In place of a queue, each entity has two ticket counters: one number indicating the ticket currently being served (the "processing" counter), the other the back of the line. Instead of putting the event at the back of a queue, the event orderer takes a ticket for the event, incrementing the back-of-line counter. The event processor increments the "processing" counter on each entity after processing is completed (just like your number flashing up at the DMV), and during polling, events whose tickets all equal the "processing" counters of their entities are dispatched for processing.
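A sketch of the virtualized version, assuming C++11 atomics for the counters (again, the names are mine for illustration):

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

struct Entity {
    // Back of the line: only the (single) orderer ever writes this.
    std::atomic<uint64_t> next_ticket{0};
    // Now serving: only the one processor currently working on this
    // entity ever writes this, so plain loads and stores suffice.
    std::atomic<uint64_t> serving{0};
};

struct Event {
    std::vector<Entity*> entities;
    std::vector<uint64_t> tickets;  // one ticket per entity, same order
};

// Ordering step: take a ticket on every touched entity, in arrival order.
void take_tickets(Event& e) {
    for (Entity* ent : e.entities) {
        uint64_t t = ent->next_ticket.load(std::memory_order_relaxed);
        e.tickets.push_back(t);
        ent->next_ticket.store(t + 1, std::memory_order_relaxed);
    }
}

// Polling step: the event is ready when every entity is now serving
// the ticket this event took on it.
bool is_ready(const Event& e) {
    for (size_t i = 0; i < e.entities.size(); ++i)
        if (e.entities[i]->serving.load(std::memory_order_acquire)
                != e.tickets[i])
            return false;
    return true;
}

// After processing: bump each serving counter (the number flips over
// at the DMV), unblocking whichever events hold the next tickets.
void retire(const Event& e) {
    for (Entity* ent : e.entities) {
        uint64_t s = ent->serving.load(std::memory_order_relaxed);
        ent->serving.store(s + 1, std::memory_order_release);
    }
}
```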

In fact, it turns out that if the counters are large enough that they will never overflow in practice, only the *sum* of the tickets needs to be stored in the event: an entity's "processing" counter can never pass an unprocessed event's ticket, so the sum of the "processing" counters equals the stored sum exactly when every individual ticket matches. It also turns out that there is only ever a single process that may increment each counter at any one time, so only consistent atomic writes and reads are required, not even a full atomic increment. No locks, no composite atomic operations required (although your processing system may use them for dispatch, and you may queue events that way)!
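Continuing the sketch above (reusing its Entity definition), the readiness check then collapses to a single sum comparison:

```cpp
// Variant of Event storing only the ticket sum. The counters must be
// wide enough that neither a ticket nor the sum ever overflows.
struct CompactEvent {
    std::vector<Entity*> entities;
    uint64_t ticket_sum = 0;   // accumulated while taking tickets
};

// Each serving counter is <= this event's ticket on that entity (no
// later event can be served first), so the sums are equal only when
// every counter has reached its ticket.
bool is_ready(const CompactEvent& e) {
    uint64_t sum = 0;
    for (Entity* ent : e.entities)
        sum += ent->serving.load(std::memory_order_acquire);
    return sum == e.ticket_sum;
}
```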

Another interesting thing about this algorithm is that it can be implemented so that event ordering is done on demand, when more events need to be processed or when new events are added. Given a queue of events to process for each processing thread, the ordering process can be invoked when the size of a processing queue falls below a certain threshold (with some sort of locking mechanism preventing two ordering passes from running at a time, although finer-grained locking can be used if only the polling part of the ordering is done this way), and it can likewise be invoked when enough events have been added that need ordering. Triggering it before a processing queue is completely empty means other threads need not block waiting to get more events to process. In this way, all your event processing/queuing need only use limited-size single producer/single consumer queues to work (for which there are some very fast lock-free implementations), as in the sketch below.
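Such a refill trigger might look something like this (a sketch only: the SpscQueue interface, the helper functions, the threshold and the try-lock policy are all my own assumptions, not something the design mandates):

```cpp
#include <cstddef>
#include <mutex>

// Stand-ins for pieces sketched earlier; all names are hypothetical.
struct CompactEvent;
template <typename T> struct SpscQueue {
    size_t size() const;      // bounded single producer/single consumer queue
    bool pop(T& out);
};
void run_ordering_pass();     // poll the "ordered" list, dispatch ready events
void process(CompactEvent&);  // the actual event logic and entity updates
void retire(CompactEvent&);   // bump each entity's serving counter

constexpr size_t kRefillThreshold = 16;
std::mutex ordering_lock;     // at most one ordering pass at a time

void worker_loop(SpscQueue<CompactEvent*>& my_queue) {
    for (;;) {
        // Invoke the orderer *before* the queue runs dry, so workers
        // need not block waiting for more events.
        if (my_queue.size() < kRefillThreshold) {
            std::unique_lock<std::mutex> guard(ordering_lock, std::try_to_lock);
            if (guard.owns_lock()) run_ordering_pass();
        }
        CompactEvent* e = nullptr;
        if (my_queue.pop(e)) {
            process(*e);   // exclusive, consistent access to its entities
            retire(*e);    // unblock successor events
        }
    }
}
```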

The main disadvantage of this method is obvious: it's only useful if you know ahead of time which entities must be consistent for each event. Also, readers can block other readers, although in practice this isn't a performance problem if you have lots of events that touch unrelated entities, because those other events can be processed as long as they're non-dependent. Finally, there is the potential for the event ordering itself to become a bottleneck, although at that stage a finer-grained implementation of the ordering system and some micro-optimization can probably provide a reasonable boost.