Part 3: SpawnEnumerator, dealing with billions of independent items
[ Part 1, Part 2, this is Part 3 ]
In Part 1 I talked about the general idea behind SpawnEnumerator. In Part 2 I implemented it and talked a bit about CCR and how it works under the hood.
In this third and last part I’ll try to show how we can actually execute billions of tasks very easily.
Iterating over billions of tasks:
In my previous post I tried to show one of the greatest yet underused features of C# 2.0: the yield statement. I closed the post with:
“Prefer using the yield statement as long as calculating values doesn’t require holding an expensive resource, like a DB connection or a FileHandler, for a long period of time.”
Now, let’s assume that we have 100,000,000 Tasks to pull from a repository (DB, files, doesn’t really matter) and execute them all via SpawnEnumerator.
Should we define a List<Task>, fill it with 100,000,000 items and then iterate over it? Of course not! We couldn’t even if we wanted to, since a list of that size would most likely exhaust the available memory.
Assuming that reading 1000 (for example) Tasks is much faster than executing them (it usually is), and it’s safe to hold them in memory, we can “bulk read & yield” the entire thing:
public delegate T Func<T>();

public static class Yield
{
    // Repeatedly asks bulkYielder for the next bulk and yields its items one by one.
    // The stream ends the first time bulkYielder returns an empty bulk.
    public static IEnumerable<T> Bulked<T>(Func<IEnumerable<T>> bulkYielder)
    {
        while (true)
        {
            IEnumerable<T> yielder = bulkYielder();
            if (yielder == null)
                throw new ArgumentException("bulkYielder cannot return a null enumerable", "bulkYielder");

            int itemsGiven = 0;
            foreach (T t in yielder)
            {
                itemsGiven++;
                yield return t;
            }

            // An empty bulk means the source is exhausted.
            if (itemsGiven == 0)
                break;
        }
    }
}
This can be used to return a stream of task bulks:
// Returns the next 1000 items from the 100,000,000-item queue, until no more tasks exist in the repository.
Yield.Bulked<Task>(delegate { return tasksRepository.Dequeue(1000); })
Important note:
In this scenario, the Dequeue method should return its 1000 items as a fully materialized collection, without using yield, to avoid holding expensive resources for a long period of time (as discussed).
Once we have those 1000 items in memory, we can yield each one of them to the caller (very cheap, no resources are held).
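To make the note above concrete, here is a minimal sketch of a Dequeue that materializes each bulk before returning it. InMemoryRepository is a hypothetical stand-in for the real repository (DB, files), not something from the earlier parts, and the sketch uses System.Func<T> (available from .NET 3.5) instead of redeclaring the delegate:

```csharp
using System;
using System.Collections.Generic;

public static class Yield
{
    // Same logic as Yield.Bulked from the post.
    public static IEnumerable<T> Bulked<T>(Func<IEnumerable<T>> bulkYielder)
    {
        while (true)
        {
            IEnumerable<T> yielder = bulkYielder();
            if (yielder == null)
                throw new ArgumentException("bulkYielder cannot return a null enumerable", "bulkYielder");

            int itemsGiven = 0;
            foreach (T t in yielder)
            {
                itemsGiven++;
                yield return t;
            }
            if (itemsGiven == 0)
                break;
        }
    }
}

// Hypothetical repository: an in-memory queue standing in for a DB table or a file.
public class InMemoryRepository
{
    private readonly Queue<int> _items = new Queue<int>();
    public int DequeueCalls;

    public InMemoryRepository(int count)
    {
        for (int i = 0; i < count; i++)
            _items.Enqueue(i);
    }

    // No yield here: the whole bulk is copied into a list before returning.
    // A real implementation would open its connection, read, and close it
    // inside this method, so nothing expensive stays open while the caller iterates.
    public List<int> Dequeue(int max)
    {
        DequeueCalls++;
        List<int> bulk = new List<int>(max);
        while (bulk.Count < max && _items.Count > 0)
            bulk.Add(_items.Dequeue());
        return bulk;
    }
}

public static class Program
{
    public static void Main()
    {
        InMemoryRepository repository = new InMemoryRepository(2500);
        int seen = 0;
        foreach (int item in Yield.Bulked<int>(delegate { return repository.Dequeue(1000); }))
            seen++;

        // 2500 items arrive in bulks of 1000, 1000 and 500; a fourth, empty bulk ends the stream.
        Console.WriteLine("items: {0}, Dequeue calls: {1}", seen, repository.DequeueCalls);
    }
}
```

Note that Yield.Bulked always makes one extra call that returns an empty bulk, so an empty Dequeue should be cheap.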
Finally, we can execute billions of tasks in parallel via SpawnEnumerator:
// Yield.Bulked returns an IEnumerable<Task>, so ask it for its enumerator.
IEnumerator<Task> tasksEnumerator = Yield.Bulked<Task>(delegate { return tasksRepository.Dequeue(1000); }).GetEnumerator();
Action<Task> taskHandler = delegate(Task t) { /* execute single task */ };
// Execute the tasks in parallel, using 50 threads and holding ~1000 items in memory.
// Note: don't forget to dispose _tasksExecutor when killing/stopping the app!
_tasksExecutor = SpawnEnumerator<Task>.Start(50, "mypool", 1000, tasksEnumerator, taskHandler);
As long as Dequeue returns a materialized bulk, using Yield.Bulked ensures that we won’t hold expensive resources for too long, while still allowing us to generate a “stream” of Tasks to run in parallel.
The code is easy to read and follow (I hope), and we gain a simple method for executing billions of tasks very efficiently (both CPU- and memory-wise).
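For readers who don’t have Part 2 at hand, the idea of a fixed number of workers pulling from one shared enumerator can be sketched roughly as below. This is not the SpawnEnumerator from Part 2 (which is built on CCR and buffers items): ParallelDrain is a hypothetical, simplified stand-in that uses plain threads and a lock, only to illustrate the pattern:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in, NOT the CCR-based SpawnEnumerator from Part 2.
public static class ParallelDrain
{
    // Starts threadCount workers that pull items from the shared enumerator
    // and blocks until the enumerator is exhausted and all workers are done.
    public static void Run<T>(int threadCount, IEnumerator<T> source, Action<T> handler)
    {
        object gate = new object();
        Thread[] workers = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++)
        {
            workers[i] = new Thread(delegate()
            {
                while (true)
                {
                    T item;
                    // Enumerators are not thread safe, so only one worker advances it at a time.
                    lock (gate)
                    {
                        if (!source.MoveNext())
                            return;
                        item = source.Current;
                    }
                    // The handler runs outside the lock, so items are processed in parallel.
                    handler(item);
                }
            });
            workers[i].Start();
        }

        foreach (Thread worker in workers)
            worker.Join();
    }
}

public static class Program
{
    public static void Main()
    {
        List<int> numbers = new List<int>();
        for (int i = 1; i <= 1000; i++)
            numbers.Add(i);

        long sum = 0;
        ParallelDrain.Run<int>(4, numbers.GetEnumerator(),
            delegate(int n) { Interlocked.Add(ref sum, n); });

        Console.WriteLine(sum); // 500500
    }
}
```

Because the workers only call MoveNext when they need a new item, a lazy source such as Yield.Bulked is read one bulk at a time rather than loaded up front.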