Part2: SpawnEnumerator, dealing with billions of independent items

[ Part 1, this is Part 2, Part 3 ]

In Part 1 I’ve talked about the general notions behind SpawnEnumerator and played with the API. If you’re not familiar with Microsoft’s CCR, this post might require a 2nd & 3rd read to understand completely. CCR changed the way you should think or address async code. It’s a game worth playing and studying the rules is only for your advantage. Alright, enough chit chat, let’s make it happen (complete code attached at the end).

Class definition:

  public sealed class SpawnEnumerator<T> : IDisposable
       private SpawnEnumerator(int threadsCount, string threadsPoolName, IEnumerator<T> filler) { /* initialize fields by parameters, nothing more */ }

Now, let’s have a look at some of the fields:

  private const double DefaultLowThresholdPrcentage = 0.1; // When “items to process” queue reach 10%, we want to re-fill. Should be exposed of course.

  private event Action<DateTime> _enumeratorDepleted = delegate { }; // trigger when the enumerator is empty
  private event Action<DateTime> _allTasksAreCompleted = delegate { }; // trigger when all tasks are completed

  private readonly Dispatcher _dispatcher; // our “threadpool”
  private readonly DispatcherQueue _dispatcherQueue; // hold actual ITask, waiting for the dispatcher (aka worker threads) to handle them. more about it soon.

  private readonly Port<T> _itemsToProcessPort;  // hold wannabe tasks, currently a queue of items of T we want to process.
  private readonly Port<EmptyValue> _itemCompleteNotificationPort; // soon…
  private readonly Port<EmptyValue> _initializePort;  // soon…

  private readonly IEnumerator<T> _enumerator; // the enumerator we’ll use to fill the _itemsToProcessPort

Deeper look on what we have so far:

  • _itemsToProcessPort: will act as the queue of items we want to process. In CCR’s world, Port<T> is actually a “smart queue” (more on it later).
  • _itemCompleteNotificationPort: will be used to notify on every completed item. Assuming that we need to fill 100 items to the port, and our lower limit is 10%, we want to re-fill the _itemsToProcessPort every 90 completed items. Notice we’re using EmptyValue as T. EmptyValue is a CCR type that holds EmptyValue.SharedInstance to avoid memory allocation.
  • _initializePort: will be used to initialize the _itemsToProcessPort with the 1st bulk of items.

Step back, what’s going on?!!? why so many Port ?
Well, the idea behind CCR is all about messaging. You can pass messages to ports (Port<T> is thread-safe of course) and by doing so, you can take advantage of the “smart queue” implementation behind Port<T>. When posting a message to a port, the CCR will try to apply some “predicates” on the port and if a “predicate” returns true, it will dequeue the item(s) from the Port matched that “predicate”, create an ITask of it and push it into the DispatcherQueue as an “actual task”.

Using Ports makes it easier for us to define complex async code. instead of putting locks all over the place, I can simply post a message to _itemCompleteNotificationPort and after X messages posted to this port, ask to re-fill the queue when a thread is available. This is much easier then counting each completed item and if ((counter % X) == 0), lock some object and re-fill. Both will work, but using the CCR world you don’t have to think about technical async problems/solutions but rather on the logical async operations you want to perform. You’ll write much less code, zero locks of your own and mostly think about “this could run concurrently”, “this must run exclusively” and let the CCR schedule everything for you.

Start method: (as discussed in Part 1)
public static SpawnEnumerator<T> Start(int threadsCount, string threadsPoolName, int upperQueueSize, IEnumerable<T> filler, Action<T> handler)
  // .. validate parameters, nothing interesting…
  SpawnEnumerator<T> sw = new SpawnEnumerator<T>(threadsCount, threadsPoolName, filler.GetEnumerator());
  sw.Initialize(handler, upperQueueSize);
  return sw;

API Design: Why Start method with private constructor instead of public constructor alone:
The client of this method should understand that once she supply the arguments, things will start happening – we’ll immediately start to process items from the enumerator. You’ll soon find out that Start is non-blocking method. This “Start” method, so I feel, make it’s it more explicit as it should be.

Initialize method:
private void Initialize(Action<T> handler, int upperQueueSize)
  RegisterRecievers(handler, upperQueueSize); // where CCR *magic* happens. soon…
  _initializePort.Post(EmptyValue.SharedInstance); // post a message to let “someone” know we want to fill the 1st bulk of items

RegisterRecievers method:
Before we look at the code, here is a remainder of the main things we want to accomplish:

  1. We should fill the _itemsToProcessPort for the 1st bulk or once we reach the lower limit of the queue, by counting how many items were completed. Keeping in mind that _enumerator is not thread-safe and we don’t want to start locking access to it on our own, we should make sure that re-filling is done exclusively from 1 thread only.
  2. We want to handle each one of the items posted to _itemsToProcessPort with the supplied “handler” (given in Start method). Each item is independent so obviously we want to process each item concurrently, according to the amount of threads in the Dispatcher.

private void RegisterRecievers(Action<T> handler, int upperQueueSize)
  int numberOfItemsToDepleteBeforePushingNewBulk = (int)Math.Ceiling((1 – DefaultLowThresholdPrcentage) * upperQueueSize);


         new TeardownReceiverGroup(),// nothing here
         new ExclusiveReceiverGroup(
                // 1st bulk:
                Arbiter.Receive(false, _initializePort, delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); }), // only once, that’s why the “false” is here.

                // enough items were completed which means “items to process” queue reached lower limit:
                Arbiter.MultipleItemReceive(true, _itemCompleteNotificationPort, numberOfItemsToDepleteBeforePushingNewBulk,
                                          delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); })
         new ConcurrentReceiverGroup(
                // process items concurrently
                Arbiter.Receive(true, _itemsToProcessPort, // listen to every post, that’s why the “true”
                     delegate(T item)
                         catch (Exception err) { /* log error */ }

Alright, this is a bit harder to read but let’s try to simplify it by reading it from inside-out.

  • Under ExclusiveReceiverGroup you’ll see 2 receivers, one that listen to _initalizePort and once a message is posted, it will create an ITask that internally (when a thread is available) call FillItemsToProcessQueueWithNextBulk method. The second receiver will listen to _itemCompleteNotificationPort and do the same as the first one for every numberOfItemsToDepleteBeforePushingNewBulk items posted to the Port.
  • Under ConcurrentReceiverGroup you’ll see a receiver listening to _itemsToProcessPort and for every message it will create an ITask that internally (again, one a thread is available) will run the given “item handler” with the item dequeued from the Port.
  • We use Arbiter.Activate(_dispatcherQueue, …) to register each created ITask from the receivers to the internal queue. Queue of actual tasks.
  • This method is non-blocking, we only register receivers that know how to create and enqueue ITask from messages posted to some Port. That’s it.

We post messages to different Port<T> and write receivers that “listen” to the messages based on some rules (one receiver listen to every message, one listen to only one message, one listen to X messages etc). When the receiver’s rules apply, the CCR will dequeue all the relevant (applied the rule) messages from the Port<T> and wrap them with the supplied delegated as ITask. This ITask instance will be enqueued to the DispatcherQueue until the Dispatcher have a free thread to handle it. The Dispatcher, according to the requested scheduling (some ITask must run exclusively, some can run concurrently, as we’ve seen), will execute those tasks.

You can download the complete code (with some extra features) here: SpawnEnumerator.txt (12KB) – you’ll need CCR & log4net dll in order to compile it.


Oren Ellenbogen