Part1: SpawnEnumerator, dealing with billions of independent items

[ this is Part 1, Part 2, Part 3 ]

I promised to write more about CCR and how to use it in practice.
There is no better way to explain how infrastructure work then writing some code and play with it, so lets toy with a simple utility, based on CCR under the hood.

Scenario: you’ve got billions of *independent* tasks you need to execute as fast as possible.

  1. Amount of “worker threads” should be easy to define. obviously.
  2. Memory consumption – don’t eat more than you can chew. We don’t want to hold everything in memory, this should be easy to configure.
  3. The process should *not* shutdown due to un-handled exception in one of the worker threads.
  4. We want to know when the enumerator is depleted and/or all items were completed. This is crucial for testing and adjusting parameters.


The general notion behind SpawnEnumerator is something like this:



We’re pulling a bulk of items from the enumerator, each one of them is actually a “wannabe” item. Combining the value of each T with a given “item handler” could be transformed into an actual task, waiting for its turn in the “threads pool”. Once the port/queue of “items to execute” reach the lower limit, we’ll pull another bulk of items from the enumerator until the enumerator is depleted.

API playground:

SpawnEnumerator<T>.Start(int numberOfThreads, string threadPoolName, int numberOfItemsToHoldInMemory, IEnumerator<T> enumerator, Action<T> singleItemHandler)

T = type of items we would like to run. if we have 10,000,000 items of type int, we’ll supply “int” as T.
numberOfThreads = how many worker threads do we want to run?
threadPoolName = name the worker threads for easier debugging.
numberOfItemsToHoldInMemory = rough number of items we want to hold in memory, to make sure memory consumption won’t blow up in our face.
enumerator = the enumerator of “items” to execute, assuming that it will hold a *huge* amount of tasks.
singleItemHandler = delegate that receives 1 item and handle it.

event Action<DateTime> EnumeratorDepleted

event Action<DateTime> AllTasksAreCompleted

Test playground:

(written in notepad, sorry for stupid mistakes)

public void Execute_LargeEnumeratorOfNumbers_ExecuteAllItems

// arrange:
int numberOfThreads = 10;
string threadPoolName = “mypool”;
int numberOfItemsToHoldInMemory = 100;

IEnumerator<int> items = Yield.For(1, 10000); // yield return numbers from 1-10000
Action<int> singleItemHandler = delegate(int item) {  /* sum the given number, via Interlocked.Add */ };

ManualResetEvent trigger = new ManualResetEvent(false);

// act:
_executor = SpawnEnumerator<int>.Start(threadPoolSize, threadPoolName, numberOfItemsToHoldInMemory, items, singleItemHandler))
_executor.AllTasksAreCompleted += delegate { trigger.Set(); };   

// assert:
bool signaled = trigger.WaitOne(TimeSpan.FromSeconds(1), false);
Assert.IsTrue(signaled, “timedout reached, that shouldn’t happen!”);
// assert that all of the items were called by matching sum of 1-N sequential numbers (simple formula) to what we collected in the handler


* In the “test teardown” we can check if _executor is not null and if so Dispose it to close the “worker threads”.


Next post – Implementing SpawnEnumerator via CCR.


Oren Ellenbogen