Microsoft CCR: a clean way to write parallel code in .NET

Since we have to parallelize almost every bit of code here at Delver, some common patterns emerged as we wrote a lot of back-end code.
I remember reading about a new framework from the Microsoft Robotics division called “Microsoft CCR” (Concurrency and Coordination Runtime) a few months ago in the “Concurrent Affairs” column of MSDN Magazine, but I didn’t pay much attention to it at the time. Two weeks ago it came back to mind, so I revisited the article and started digging a little deeper, thinking about what sort of problems it could solve in my world and, if it could, where we might use it to our benefit.

If you don’t know anything about the CCR, there is great public content already available, such as the CCR User Guide, but I’ll try to give you a 2-minute intro to the general architecture. The CCR is built much like the SOA world: it uses messages to communicate between “services”. The major components are:

  1. Dispatcher: essentially an array of OS threads.
  2. DispatcherQueue: holds a queue of delegates to run. The Dispatcher “looks” at the DispatcherQueue and, whenever it has a free OS thread, pulls one delegate out of the queue and runs it. So far, we’ve got a classic ThreadPool; there are some differences, but I’ll let you read about them in the User Guide.
  3. Port: the most important component. Think of a Port as a pipe that receives messages (FIFO style: first in, first out) and holds them until someone knows what to do with them.
  4. Arbiter: the “manager”. The Arbiter exposes a set of methods that let you listen to a given Port and, when certain conditions are met by the messages it contains, take the message(s) and turn them into a runnable delegate placed in the DispatcherQueue.
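To make the four roles concrete, here is a toy model written against the plain .NET base class library, not the CCR. ToyPort<T> and ToyArbiter are hypothetical names. They only mimic the flow described above: a FIFO port, a listener that turns each message into a delegate, and a pool of threads that runs it. The real CCR types are far richer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical toy model of the CCR pieces described above; NOT how the CCR is implemented.
// ToyPort<T> ~ Port<T>, ToyArbiter.Receive ~ a persistent Arbiter.Receive,
// and the .NET ThreadPool stands in for Dispatcher + DispatcherQueue.
public class ToyPort<T>
{
    private readonly ConcurrentQueue<T> _messages = new ConcurrentQueue<T>(); // the FIFO pipe
    private Action<T> _receiver; // attached by ToyArbiter.Receive

    public void Post(T message)
    {
        _messages.Enqueue(message);
        Dispatch();
    }

    internal void Attach(Action<T> receiver)
    {
        Volatile.Write(ref _receiver, receiver);
        Dispatch(); // hand over messages that were posted before anyone listened
    }

    private void Dispatch()
    {
        Action<T> receiver = Volatile.Read(ref _receiver);
        if (receiver == null) return; // nobody listening yet: messages wait in the pipe
        while (_messages.TryDequeue(out T message))
        {
            T captured = message; // one copy per work item
            // The "arbiter" step: each message becomes a delegate queued for a free pool thread.
            ThreadPool.QueueUserWorkItem(_ => receiver(captured));
        }
    }
}

public static class ToyArbiter
{
    // Persistent receive: every message posted to the port is handed to the handler.
    public static void Receive<T>(ToyPort<T> port, Action<T> handler) => port.Attach(handler);
}
```

In the real CCR the same flow is written as Arbiter.Activate(queue, Arbiter.Receive(true, port, handler)). This toy version leaves out what makes the CCR interesting, such as receivers that join or choose between several ports.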


One of the goals of this library is to make sure you have far fewer places to go wrong. It does this by exposing a set of common async patterns that help you write clean(er), easier-to-read code. Think about sending messages from one pipe to another, building flow-based code out of messages rather than spaghetti code with a lot of messy indentation. This is a very powerful architecture.
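The “flow-based” idea can also be sketched without the CCR. In the sketch below, BlockingCollection<T> (System.Collections.Concurrent, .NET 4 and later) stands in for a Port<T>. One stage reads URIs from urlsPort and posts each host name to hostsPort, and a second stage consumes them. The stage logic and the FlowSketch name are hypothetical. The point is that each step only knows its input and output pipes, with no nested callbacks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical two-stage pipeline; BlockingCollection<T> plays the role of a CCR Port<T>.
public static class FlowSketch
{
    public static List<string> Run(IEnumerable<Uri> uris)
    {
        using var urlsPort = new BlockingCollection<Uri>();
        using var hostsPort = new BlockingCollection<string>();
        var hosts = new List<string>(); // touched only by the 'collect' stage until WaitAll

        // Stage 1: take a Uri from one pipe, post its host name to the next pipe.
        Task extract = Task.Run(() =>
        {
            try
            {
                foreach (Uri uri in urlsPort.GetConsumingEnumerable())
                    hostsPort.Add(uri.Host);
            }
            finally
            {
                hostsPort.CompleteAdding(); // tell the next stage no more messages are coming
            }
        });

        // Stage 2: consume host names (here: just collect them in FIFO order).
        Task collect = Task.Run(() =>
        {
            foreach (string host in hostsPort.GetConsumingEnumerable())
                hosts.Add(host);
        });

        foreach (Uri uri in uris)
            urlsPort.Add(uri); // "post" to the first pipe
        urlsPort.CompleteAdding();

        Task.WaitAll(extract, collect);
        return hosts;
    }
}
```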
The CCR is thread-safe by design, so there’s no need to wrap its types in locks of your own. A simple example:



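using System;
using Microsoft.Ccr.Core;

using (Dispatcher dispatcher = new Dispatcher(5, "my_dispatcher")) // 5 OS threads
using (DispatcherQueue queue = new DispatcherQueue("my_queue", dispatcher))
{
    Port<Uri> urlsPort = new Port<Uri>();

    // persist = true: keep handling every message posted to urlsPort, not just the first one
    Arbiter.Activate(queue,
        Arbiter.Receive(true, urlsPort, delegate(Uri uri) {
            // runs asynchronously on one of the dispatcher threads,
            // e.g. fetch the URI's HTML content and persist it to disk
        })
    );

    urlsPort.Post(new Uri("http://www.lnbogen.com"));
    urlsPort.Post(new Uri("http://www.delver.com"));

    // Disposing the dispatcher shuts down its threads, so real code should wait
    // for the handlers to finish before leaving this block.
}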


There is no need to create an array of Threads and some sort of Queue<Uri> to pull the items from. The “ThreadPool” (the Dispatcher) is part of the CCR.
So far, no big deal, right? Well, it turns out that you can easily write common patterns with much less complexity: almost no locks, less spaghetti code and much less code in general.


One of the patterns we all use a lot is the “execute-and-wait-till-finish” pattern. You’ve got a list of independent items you want to run in parallel, but you want your main thread to wait for all of them to finish before it continues. The simplest way to achieve this is to create an array of Threads, start each one with a delegate and then call Join() on each of them. Let’s define a few more requirements for this pattern:



  1. We want to be able to know about all the errors that occurred during the execution.
  2. We want to be able to set a timeout so that no single operation (inside a Thread) takes longer than a sensible amount of time.
  3. We want to be able to know which items timed out and when.
  4. We want to be able to know which items were completed successfully.
  5. BONUS: We want to avoid writing the obvious. 
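For comparison, the “simplest way” mentioned above looks roughly like the following. It is a minimal sketch, and NaiveSpawnAndWait is a hypothetical name. It meets none of the requirements: it uses one OS thread per item, has no timeout and collects no errors.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical baseline: one Thread per item, then Join() on each one.
public static class NaiveSpawnAndWait
{
    public static void Execute<T>(IList<T> items, Action<T> work)
    {
        var threads = new Thread[items.Count];
        for (int i = 0; i < items.Count; i++)
        {
            T item = items[i]; // copy so each thread's closure gets its own item
            threads[i] = new Thread(() => work(item));
            threads[i].Start();
        }
        foreach (Thread t in threads)
            t.Join(); // blocks until that thread finishes, however long it takes
    }
}
```

There is a further problem. Since .NET 2.0, an unhandled exception on any thread terminates the whole process, so a single failing item takes everything down.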

Well, to implement these requirements from scratch, we need an outer timer plus some sort of list (for example) that each thread “registers” with when it begins and “unregisters” from when it’s done. The timer should be able to interrupt a thread. It should also be optimized to “wake up” exactly when the next registered thread’s deadline expires. In addition, we need a list to collect all the exceptions that occurred, and we must make sure to lock every shared object. On top of that, we’ll need a Thread[] and some sort of Queue to enqueue/dequeue items to/from. That’s a lot of code for a simple pattern.
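Here is a from-scratch sketch using only System.Threading, to show how much bookkeeping requirements 1 to 4 add when you write them by hand. It is not the code from the attached solution, and all names (ManualSpawnAndWait, ManualResult<T>) are hypothetical. To stay short, it simplifies the design described above in three ways. It uses one thread per item instead of a fixed pool with a queue. It uses a single shared deadline instead of a timer. Timed-out threads are abandoned as background threads rather than interrupted, because Thread.Abort is not supported on .NET Core / .NET 5+.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical result holder covering requirements 1, 3 and 4.
public class ManualResult<T>
{
    public List<T> Succeeded { get; } = new List<T>();
    public List<KeyValuePair<T, Exception>> Faulted { get; } = new List<KeyValuePair<T, Exception>>();
    public List<KeyValuePair<T, DateTime>> TimedOut { get; } = new List<KeyValuePair<T, DateTime>>();
}

// Hypothetical hand-written spawn-and-wait: one background thread per item, shared deadline.
public static class ManualSpawnAndWait
{
    private const int Running = 0, Done = 1, Abandoned = 2;

    public static ManualResult<T> Execute<T>(IList<T> items, TimeSpan timeout, Action<T> work)
    {
        var result = new ManualResult<T>();
        var gate = new object();           // guards 'result' and 'state'
        var state = new int[items.Count];  // resolves the "finished vs. timed out" race
        var threads = new Thread[items.Count];
        DateTime deadline = DateTime.UtcNow + timeout; // all threads start together

        for (int i = 0; i < items.Count; i++)
        {
            T item = items[i];
            int index = i;
            threads[i] = new Thread(() =>
            {
                Exception error = null;
                try { work(item); }
                catch (Exception ex) { error = ex; } // requirement 1: collect, don't crash
                lock (gate)
                {
                    if (state[index] != Running) return; // already reported as timed out
                    state[index] = Done;
                    if (error == null) result.Succeeded.Add(item); // requirement 4
                    else result.Faulted.Add(new KeyValuePair<T, Exception>(item, error));
                }
            }) { IsBackground = true }; // abandoned threads must not keep the process alive
            threads[i].Start();
        }

        for (int i = 0; i < threads.Length; i++)
        {
            TimeSpan remaining = deadline - DateTime.UtcNow;
            if (remaining < TimeSpan.Zero) remaining = TimeSpan.Zero;
            if (threads[i].Join(remaining)) continue; // finished in time and already recorded
            lock (gate)
            {
                if (state[i] != Running) continue;    // finished just after the Join timed out
                state[i] = Abandoned;                 // requirements 2 and 3
                result.TimedOut.Add(new KeyValuePair<T, DateTime>(items[i], DateTime.UtcNow));
            }
        }
        return result;
    }
}
```

Even this trimmed version needs a lock, a per-item state flag to settle the race between “finished” and “timed out”, and careful Join arithmetic.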

With the CCR it’s much easier.
Assume we want to handle a list of strings:


List<string> messagesToWorkOn = new List<string>();
for (int i = 0; i < 10; i++)
   messagesToWorkOn.Add("message " + i);


Here is the final API I’ve implemented on top of the CCR:


using (SpawnAndWaitPattern<string> saw = new SpawnAndWaitPattern<string>(5, "mythreadspool"))
{
   AggregatedResult<string> result = saw.Execute(messagesToWorkOn, TimeSpan.FromMilliseconds(500),
                                             delegate(string msg)
                                             {
                                                if (msg.Contains("1"))
                                                   Thread.Sleep(2000); // simulate time-out
                                                else if (msg.Contains("5"))
                                                   throw new ArgumentException("5 is a very bad value..."); // simulate exception

                                                Console.WriteLine("The message: " + msg + " processed successfully.");
                                             });

   Console.WriteLine("Done!");
   Console.WriteLine("Summarized Report:\n completed results: "
      + result.SuccessfulItems.Count + "\n exceptions occurred: " + result.FaultedItems.Count
      + "\n timed-out results: " + result.TimedoutItems.Count);
}


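We’ve got 5 OS threads, we wait up to 0.5 seconds per item, and we get a full result object that covers all the requirements above. With this input, only “message 1” contains “1” and only “message 5” contains “5”, so the report should show 8 completed items, 1 exception and 1 timed-out item.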


The code of the SpawnAndWaitPattern class is quite neat and contains zero locks of my own (the CCR manages its own locking). The CCR schedules the tasks for me. Combine that with the thread-safe Port and we’ve got a very powerful yet simple framework at hand. I’ve attached the entire solution (with A LOT of nice green comments), including the Ccr.Core.dll file, so you can play with it:


CcrPatterns.rar (162.64 KB)


Have fun.

 

Oren Ellenbogen