Microsoft CCR: a clean way to write parallel code in .NET

Here at Delver we have to parallelize almost every bit of code, and some common patterns emerged as we wrote a lot of back-end code.
A few months ago I read about a new framework from the Microsoft Robotics division called “Microsoft CCR” (Concurrency and Coordination Runtime) in the “Concurrent Affairs” column of MSDN Magazine, but I didn’t pay much attention to it at the time. Two weeks ago it came back to mind, so I revisited the article and dug a little deeper, thinking about what sort of problems it could solve in my world and where we could use it to our benefit.

If you don’t know anything about the CCR, there is great public content available already, such as the CCR User Guide, but I’ll try to give you a two-minute intro to the general architecture. The CCR is built very much like the SOA world, using messages to communicate between “services”. The major components are:

The Dispatcher, which is actually an array of OS threads.

The DispatcherQueue, which holds a queue of delegates to run. The Dispatcher “looks” at the DispatcherQueue and, when it has a free OS thread available, pulls one delegate out of the queue and runs it. So far we’ve got a classic ThreadPool. There are some differences, but I’ll let you read about them in the User Guide.

The Port, which is the most important component. Think of a Port as a pipe that receives messages (FIFO style: first in, first out) and holds them until someone knows what to do with them.

The Arbiter, which is the “manager”. The Arbiter exposes a set of methods that let you listen to a given pipe; if some conditions are met by the messages the pipe contains, it takes the message(s) and transforms them into a runnable delegate placed in the DispatcherQueue.
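To make the flow above a bit more concrete, here is a toy model written against the plain BCL only. It is not the CCR and does not use its API: ToyDispatcherQueue and ToyPort are names I made up for illustration, and the toy folds the Arbiter's job (turning a message into a runnable delegate) into the port itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Toy model only: NOT the CCR implementation and not its API.
// ToyDispatcherQueue plays the role of Dispatcher + DispatcherQueue;
// ToyPort plays the role of a Port with one persistent receiver attached.
public sealed class ToyDispatcherQueue : IDisposable
{
    private readonly object _sync = new object();
    private readonly Queue<Action> _work = new Queue<Action>(); // FIFO of runnable delegates
    private readonly Thread[] _threads;
    private bool _stopping;

    public ToyDispatcherQueue(int threadCount)
    {
        _threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++)
        {
            _threads[i] = new Thread(Run);
            _threads[i].Start();
        }
    }

    public void Enqueue(Action work)
    {
        lock (_sync)
        {
            _work.Enqueue(work);
            Monitor.Pulse(_sync); // wake one idle worker
        }
    }

    private void Run()
    {
        while (true)
        {
            Action next;
            lock (_sync)
            {
                while (_work.Count == 0 && !_stopping)
                    Monitor.Wait(_sync);

                if (_work.Count == 0) return; // stopping and nothing left to run
                next = _work.Dequeue();
            }
            next(); // run the delegate outside the lock
        }
    }

    // Lets the workers drain the queue, then waits for them to exit.
    public void Dispose()
    {
        lock (_sync)
        {
            _stopping = true;
            Monitor.PulseAll(_sync);
        }
        foreach (Thread t in _threads)
            t.Join();
    }
}

public sealed class ToyPort<T>
{
    private readonly ToyDispatcherQueue _queue;
    private readonly Action<T> _handler;

    public ToyPort(ToyDispatcherQueue queue, Action<T> handler)
    {
        _queue = queue;
        _handler = handler;
    }

    // Each posted message becomes one delegate on the work queue.
    public void Post(T message)
    {
        _queue.Enqueue(delegate { _handler(message); });
    }
}
```

Posting ten messages to a ToyPort backed by a three-thread ToyDispatcherQueue and then disposing the queue runs the handler ten times, on whichever worker thread happens to be free. The real CCR does far more (ports hold messages until a receiver is activated, arbiters can join and choose across ports), so treat this purely as a mental model.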


One of the goals of this library is to make sure you’ve got far fewer places to go wrong, by exposing a set of common async patterns you can easily use to get clean(er) code that is easier to read. Think about sending messages from one pipe to another, creating flow-based code via messages rather than spaghetti code with a lot of messy indentation. This is a very powerful architecture.
The entire CCR framework is thread-safe by design, so there is no need to protect the library yourself. A simple example:



using (Dispatcher dispatcher = new Dispatcher(5, "my_dispatcher")) // 5 OS threads
using (DispatcherQueue queue = new DispatcherQueue("my_queue", dispatcher))
{
    Port<Uri> urlsPort = new Port<Uri>();

    Arbiter.Activate(queue,
        Arbiter.Receive(true, urlsPort, delegate(Uri uri) {
            // some code (runs async!): for example, fetch the URI content (HTML) and persist it to disk
        })
    );

    urlsPort.Post(new Uri("http://www.lnbogen.com"));
    urlsPort.Post(new Uri("http://www.delver.com"));
}


There is no need to create an array of Threads and some sort of Queue<Uri> in order to pull the items. The “ThreadPool” is implemented as part of the CCR.
So far no big deal, right? Well, it turns out that you can write common patterns with much less complexity: almost no locks, less spaghetti code and much less code in general.


One of the patterns we (all of us) use a lot is the “execute-and-wait-till-finish” pattern, where you’ve got a list of independent items you want to run in parallel, but you want your main thread to wait for them to finish before continuing. The simplest way to achieve it is to create an array of Threads, start each one with a delegate and then call Join() on each of them. Let’s define a few more requirements for this pattern:



  1. We want to be able to know about all the errors that occurred during the execution.
  2. We want to be able to set a timeout so each operation (inside a Thread) won’t take more than a sensible time.
  3. We want to be able to know which items timed out and when.
  4. We want to be able to know which items were completed successfully.
  5. BONUS: We want to avoid writing the obvious. 

Well, in order to implement these requirements from scratch, we need an outer timer with some sort of List (for example), so each thread “registers” with it when it begins and “unregisters” when it’s done. The timer should be able to interrupt a thread and be optimized to “wake up” as soon as possible (determined by the registered threads and which of them is next in line to time out). In addition, we need some sort of List of exceptions to collect all the exceptions that occurred, and we must make sure we lock shared objects. We’ll need a Thread[] and some sort of Queue to enqueue and dequeue items. A lot of code for a simple pattern.
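To give a feeling for the hand-rolled version, here is a minimal sketch of only the easy part: spawn, Join() and collect the errors (requirement 1). NaiveSpawnAndWait is a name made up for this sketch. It uses one thread per item instead of a fixed Thread[] plus Queue, and it leaves out the timer and timeout bookkeeping completely, which is where most of the pain lives.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Baseline sketch (hypothetical helper): one thread per item, Join() on all of them,
// exceptions collected under a lock. Timeouts (requirements 2 and 3) are deliberately left out.
public static class NaiveSpawnAndWait
{
    public static List<Exception> Run<T>(IList<T> items, Action<T> handler)
    {
        List<Exception> errors = new List<Exception>();
        Thread[] threads = new Thread[items.Count];

        for (int i = 0; i < items.Count; i++)
        {
            T item = items[i]; // copy to a local so each closure captures its own item
            threads[i] = new Thread(delegate()
            {
                try
                {
                    handler(item);
                }
                catch (Exception ex)
                {
                    lock (errors) { errors.Add(ex); } // shared list, so it must be locked
                }
            });
            threads[i].Start();
        }

        // The calling thread blocks here until every worker has finished.
        foreach (Thread t in threads)
            t.Join();

        return errors;
    }
}
```

Even this stripped-down version already needs a lock and careful closure handling, and it still cannot tell you which items timed out.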

With Microsoft CCR it’s much easier.
Assuming that we want to handle a list of strings:


List<string> messagesToWorkOn = new List<string>();
for (int i = 0; i < 10; i++)
   messagesToWorkOn.Add("message " + i);


Here is the final API I’ve implemented on top of the CCR:


using (SpawnAndWaitPattern<string> saw = new SpawnAndWaitPattern<string>(5, "mythreadspool"))
{
   AggregatedResult<string> result = saw.Execute(messagesToWorkOn, TimeSpan.FromMilliseconds(500),
      delegate(string msg)
      {
         if (msg.Contains("1"))
            Thread.Sleep(2000); // simulate time-out
         else if (msg.Contains("5"))
            throw new ArgumentException("5 is a very bad value…"); // simulate exception

         Console.WriteLine("The message: " + msg + " processed successfully.");
      });

   Console.WriteLine("Done!");
   Console.WriteLine("Summarized Report:\n completed results: "
      + result.SuccessfulItems.Count + "\n exceptions occurred: " + result.FaultedItems.Count
      + "\n timed-out results: " + result.TimedoutItems.Count);
}


We’ve got 5 OS threads, we wait up to 0.5 seconds per item, and we get back a full result object that covers all the requirements above.


The code of the SpawnAndWaitPattern class is quite neat and contains 0 locks (on my part; the CCR manages its own locks). The CCR schedules the tasks for me; combine that with the thread-safe Port and we’ve got a very powerful yet simple framework in our hands. I decided to attach the entire solution (with A LOT of nice green comments), including the Ccr.Core.dll file, so you can play with it:


CcrPatterns.rar (162.64 KB)


Have fun.

 

Bring lock back until 12, it has a busy day tomorrow

One of the downsides of using locks is obviously performance. While an object is locked, any other thread trying to acquire the lock on that object waits in line, which can cause a serious performance hit. The rule of thumb when working with locks is to acquire them as late as possible and release them as soon as possible. To demonstrate by how much bad usage of locks can affect your performance, I decided to write a little demo.
So let’s assume we have a component that is responsible for executing tasks while receiving new ones in the process (on different threads). I tried to make this example as simple as possible. Let’s start with our “task” class:


public class Task
{
   private int _id;
   private string _name;

   public Task(int id, string name) {
      _id = id;
      _name = name;
   }

   public int Id {
      get { return _id; }
      set { _id = value; }
   }

   public string Name {
      get { return _name; }
      set { _name = value; }
   }
}


We have a TasksRunner that is responsible for receiving new tasks, saving them to an internal list and executing the current tasks every X milliseconds (via a timer). In order to simulate a real-life process, I’ve made sure that executing a single task is expensive. Let’s start with the non-optimized solution:


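public class TasksRunner
{
   private List<Task> _tasks;
   private System.Timers.Timer _handleTasksTimer;

   // TasksAdded, TasksExecuted and Shutdown() are used by the test program below;
   // the bodies shown here are a plausible sketch of them.
   private int _tasksAdded;
   private int _tasksExecuted;

   public int TasksAdded { get { return _tasksAdded; } }
   public int TasksExecuted { get { return _tasksExecuted; } }

   public TasksRunner()
   {
      _tasks = new List<Task>();

      // Fully qualified to avoid ambiguity with System.Threading.Timer
      _handleTasksTimer = new System.Timers.Timer(200);
      _handleTasksTimer.Elapsed += new System.Timers.ElapsedEventHandler(_handleTasksTimer_Elapsed);
      _handleTasksTimer.Start();
   }

   public void AddTask(Task t)
   {
      lock (_tasks)
      {
         _tasks.Add(t);
         Console.WriteLine("Task added, id: " + t.Id + ", name: " + t.Name);
      }
      Interlocked.Increment(ref _tasksAdded);
   }

   // Execute the (delta) tasks on a thread from the ThreadPool
   private void _handleTasksTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
   {
      ExecuteCurrentTasks();
   }

   public void ExecuteCurrentTasks()
   {
      // The lock is held for the whole batch, so AddTask is blocked until every task has run
      lock (_tasks)
      {
         foreach (Task t in _tasks)
            ExecuteSingleTask(t);

         _tasks.Clear();
      }
   }

   private void ExecuteSingleTask(Task t)
   {
      Console.WriteLine("Handling task, id: " + t.Id + ", name: " + t.Name);
      Thread.Sleep(1000); // simulate long run
      Interlocked.Increment(ref _tasksExecuted);
   }

   public void Shutdown()
   {
      _handleTasksTimer.Stop();
   }
}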


AddTask acquires the lock on _tasks and adds the new task to the list, while ExecuteCurrentTasks acquires the same lock (on _tasks) and simulates real execution of the tasks. Notice that during the execution, any call to AddTask waits until the current execution is finished. Using Roy’s ThreadTester, we can run the following to observe the behavior so far:


static void Main(string[] args)
{
   TasksRunner runner = new TasksRunner();
   ThreadTester threadTester = new ThreadTester();
   threadTester.RunBehavior = ThreadRunBehavior.RunUntilAllThreadsFinish;

   Stopwatch watch = new Stopwatch();
   watch.Start();

   int numberOfTasksToCreate = 100;
   threadTester.AddThreadAction(delegate
      {
         for (int j = 0; j < numberOfTasksToCreate; j++)
         {
            runner.AddTask(new Task(j, "job " + j));
            Thread.Sleep(100);
         }
      });

   threadTester.StartAllThreads(int.MaxValue); // wait, no matter how long

   Console.WriteLine("Total time so far (milliseconds): " + watch.ElapsedMilliseconds);
   Console.WriteLine("Tasks added so far: " + runner.TasksAdded);
   Console.WriteLine("Tasks executed so far: " + runner.TasksExecuted);
   Console.WriteLine("Waiting for tasks to end…");

   while (runner.TasksExecuted < numberOfTasksToCreate)
      Thread.Sleep(1000);

   runner.Shutdown();

   Console.WriteLine("done!");
   Console.WriteLine("Total time so far (milliseconds): " + watch.ElapsedMilliseconds);
   Console.WriteLine("Tasks added so far: " + runner.TasksAdded);
   Console.WriteLine("Tasks executed so far: " + runner.TasksExecuted);
}


Running this test gives us a very poor result: adding & executing 100 tasks takes around 99 seconds.


No doubt, holding the lock on _tasks while executing each and every task in the list is too expensive, as we depend on ExecuteSingleTask (which is expensive by itself). This way, each new task we try to add must wait until the current execution is finished. An elegant solution to this problem, suggested by my teammate Tomer Gabel, is to use a temporary reference that points to the current tasks, thus freeing the lock much sooner. So here is an optimized version of ExecuteCurrentTasks:


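public void ExecuteCurrentTasks()
{
   List<Task> copyOfTasks = null;
   lock (_tasks)
   {
      // Detach the current batch and give AddTask a fresh list to fill.
      // Note: _tasks is reassigned while it is also the lock target, so a dedicated
      // readonly lock object would be a safer thing to lock on than _tasks itself.
      copyOfTasks = _tasks;
      _tasks = new List<Task>();
   }

   // The expensive work now runs outside the lock
   foreach (Task t in copyOfTasks)
      ExecuteSingleTask(t);
}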


This little refactoring gives us around 11 seconds for adding & executing 100 tasks.


Smoking!
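One caveat with swapping a list that is also the lock target: a thread that was waiting on the old list object and a thread that locks the new one can end up inside the "same" critical section at the same time. Here is a minimal sketch of the same swap trick with a dedicated lock object. SwapUnderLock, Add and TakeAll are names I made up for illustration; they are not part of the demo above.

```csharp
using System;
using System.Collections.Generic;

// Sketch (hypothetical class): the swap-under-lock trick with a lock object that is never reassigned.
public class SwapUnderLock<T>
{
    private readonly object _sync = new object(); // dedicated lock target, always the same instance
    private List<T> _items = new List<T>();

    public void Add(T item)
    {
        lock (_sync)
        {
            _items.Add(item);
        }
    }

    // Detaches the current batch while holding the lock only for two reference assignments.
    // The caller then processes the returned list without blocking Add().
    public List<T> TakeAll()
    {
        lock (_sync)
        {
            List<T> batch = _items;
            _items = new List<T>();
            return batch;
        }
    }
}
```

The expensive per-item work then happens on the list returned by TakeAll(), outside of any lock, which is exactly what made the optimized version fast.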

 

Writing Thread Safety tests for instance data

In my last post, I wrote about Implementing a simple multi-threaded TasksQueue. This post concentrates on how to test the thread safety of the queue. Reminder: our queue is used by multiple consumers, which means that I must make sure a lock is obtained on the queue before each Enqueue/Dequeue/Count. Imagine that I have 1 item in the queue and 2 consumers trying to dequeue this item at the same time from different threads: the first dequeue will work just fine, but the second will throw an exception (dequeue from an empty queue). We’re actually trying to make sure that this queue works as expected in a multi-threaded environment. So much for our goal.

So how can we test it?
Testing the queue’s thread safety through TasksQueue, the way it’s written now, can be quite hard and misleading. The ConsumeTask method calls dequeue inside a lock, but what if we had a thread-safety bug there? Do we test only that the dequeue works as expected? Not really. ConsumeTask (1) dequeues an item and then (2) “consumes” it. We’re actually testing 2 behaviors, which makes it really hard to test only the queue’s thread safety. We should always test a single method for a specific behavior and eliminate dependencies. Only when we have covered our bases can we check the integration between multiple components (the underlying queue and the TasksQueue).


One way of allegedly achieving this goal is to create a decorator around the queue, let’s call it SafeQueue, which encapsulates a queue and wraps it with thread-safe forwarding of the calls (it locks some inner object and calls the original queue). The SafeQueue could then be tested on its own and used by our TasksQueue. This would “enable” us to remove the locking in the TasksQueue and use Set/WaitOne instead of Pulse/Wait in order to notify our consumers on the arrival of a new task:


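while (_safeQueue.Count == 0)
   _newTaskEvent.WaitOne(); // a wait handle (e.g. an AutoResetEvent) that the producer Set()s; the field name is illustrative

// NOTICE: by the time we get here, someone could have pulled the last item from the queue on another thread!
string item = _safeQueue.Dequeue();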


WATCH OUT: This is a deadly solution that will make our TasksQueue break in a multi-threaded environment. Just like that, our code is not thread-safe anymore, although we’re using a SafeQueue that exposes (atomic) thread-safe methods and properties: the Count check and the Dequeue call are each atomic, but the pair is not. This is exactly why instance state should not be thread-safe by default (more details in Joe Duffy’s post).
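To spell the problem out, here is a small sketch that puts the broken composition next to the correct one. CheckThenActDemo, UnsafeTake and SafeTake are hypothetical names used only for this illustration; returning null for "nothing to take" is also just a convenience of the sketch.

```csharp
using System;
using System.Collections.Generic;

// Sketch (hypothetical class): why two atomic calls do not add up to one atomic operation.
public class CheckThenActDemo
{
    private readonly object _locker = new object();
    private readonly Queue<string> _queue = new Queue<string>();

    public void Enqueue(string item)
    {
        lock (_locker) { _queue.Enqueue(item); }
    }

    // Broken composition: each step is atomic, but another thread can empty
    // the queue between the Count check and the Dequeue call.
    public string UnsafeTake()
    {
        int count;
        lock (_locker) { count = _queue.Count; }

        if (count == 0) return null;

        // <-- another consumer may dequeue the last item right here
        lock (_locker) { return _queue.Dequeue(); } // can throw InvalidOperationException
    }

    // Correct composition: the check and the dequeue share one critical section.
    public string SafeTake()
    {
        lock (_locker)
        {
            return _queue.Count == 0 ? null : _queue.Dequeue();
        }
    }
}
```

On a single thread both methods behave the same, which is exactly why this kind of bug tends to survive simple tests and only shows up under real contention.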


The locking of the queue should remain in our TasksQueue, but we should separate the dequeue part from the handling part and check each one on its own. We’ll check the dequeue part for thread safety (assuming that the underlying queue was tested by itself) and the handling part for pure logic. We can now test that for X calls to enqueue we get the same X calls to dequeue.


Here is the refactored code:


private void ConsumeTask()
{
   while (true)
   {
      string task = WaitForTask();

      if (task == null) return; // This signals our exit

      try
      {
         // work on the task
      }
      catch (Exception err)
      {
        // log err & eat the exception, we still want to resume consuming other tasks.
      }
   }
}


protected virtual string WaitForTask()
{
   lock (_locker)
   {
      // if no tasks are available, wait until we get something.
      while (_queue.Count == 0)
         Monitor.Wait(_locker);

      // try moving this outside of the lock statement and run the test (below)
      return _queue.Dequeue();
   }
}


public virtual void EnqueueTask(string task)
{
   lock (_locker)
   {
      _queue.Enqueue(task);
      Monitor.Pulse(_locker);
   }
}


Now we can create a simple test for the thread safety by overriding both the enqueue and dequeue methods:


internal class TestableTasksQueue : TasksQueue
{
   private static int _dequeueCount = 0;
   private static int _enqueueCount = 0;

   public TestableTasksQueue(int workerCount) : base(workerCount) {}

   protected override string WaitForTask()
   {
      string item = base.WaitForTask();
      Interlocked.Increment(ref _dequeueCount);
      return item;
   }

   public override void EnqueueTask(string task)
   {
      base.EnqueueTask(task);
      Interlocked.Increment(ref _enqueueCount);
   }

   public static int DequeueCount
   {
      get { return _dequeueCount; }
   }

   public static int EnqueueCount
   {
      get { return _enqueueCount; }
   }
}


The tricky part here is the test itself. Because of subtle multi-threading issues, we can’t actually know when 2 (or more) threads will try to dequeue at the same time, so we have to run this test enough times in order to detect bugs. Here is a little sample:


[TestFixture]
public class TasksQueueTests
{
   [Test]
   public void Counting_DequeueAndEnqueueCountsShouldBeEqual()
   {
      for (int j = 0; j < 1000; j++)
      {
         using (TestableTasksQueue queue = new TestableTasksQueue(5))
         {
            for (int i = 0; i < 100; i++)
               queue.EnqueueTask("test" + i);
         }

         Assert.AreEqual(TestableTasksQueue.DequeueCount, TestableTasksQueue.EnqueueCount);
      }
   }
}
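One small trick that may raise the odds of threads colliding in a test like this is to park them all behind a gate and release them together. This is a minimal sketch using only the BCL; ContentionHelper and RunSimultaneously are names I made up, and it does not make a race deterministic, it only makes contention more likely per run.

```csharp
using System;
using System.Threading;

// Sketch (hypothetical helper): start N threads, hold them at a gate, release them together.
public static class ContentionHelper
{
    public static void RunSimultaneously(int threadCount, ThreadStart action)
    {
        using (ManualResetEvent startGate = new ManualResetEvent(false))
        {
            Thread[] threads = new Thread[threadCount];
            for (int i = 0; i < threadCount; i++)
            {
                threads[i] = new Thread(delegate()
                {
                    startGate.WaitOne(); // block until the gate is opened
                    action();
                });
                threads[i].Start();
            }

            startGate.Set(); // open the gate: all waiting threads are released together

            // Join before the gate is disposed, so no thread touches a closed handle.
            foreach (Thread t in threads)
                t.Join();
        }
    }
}
```

For example, several threads could each call EnqueueTask in a loop through this helper, so that the producers hit the queue together instead of one after another.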


Well, it’s not that elegant, I know, but thread safety is hard to test.
I would love to hear your suggestions regarding this issue.

 

Implementing a simple multi-threaded TasksQueue

In one of my tasks, I had to create a simple mechanism that enables my team to enqueue tasks while multiple threads work on them in the background. I’ve used the Wait/Pulse pattern in order to signal the workers when a new task is available. Here it goes:


public class TasksQueue : IDisposable
{
   private readonly object _locker = new object();
   private Thread[] _workers;
   private Queue<string> _tasksQueue = new Queue<string>();


Nothing fancy so far. Just notice that we’re holding a private lock object and an array of Threads that represent our workers.
The constructor simply initializes the threads and starts them:


public TasksQueue(int workerCount)
{
   if (workerCount <= 0) throw new ArgumentException("The number of working threads can not be equal or less than 0", "workerCount");

   _workers = new Thread[workerCount];
   for (int i = 0; i < workerCount; i++) { // Create and start a separate thread for each worker
      Thread workerThread = new Thread(ConsumeTask);
      workerThread.Name = "WorkerThread #" + i;
      _workers[i] = workerThread;
      workerThread.Start();
   }
}


Notice the ConsumeTask method that each Thread runs. Let’s see what we have there:


private void ConsumeTask()
{
   while (true)
   {
      string task;
      lock (_locker)
      {
         // if no tasks are available, wait until we get something.
         while (_tasksQueue.Count == 0) {
            // Monitor.Wait is roughly equivalent to:
            // (1) Monitor.Exit(_locker); (give others a chance to lock _locker)
            // (2) wait for a pulse
            // (3) on pulse: Monitor.Enter(_locker);
            Monitor.Wait(_locker);
         }

         // The first working thread that is pulsed reacquires the lock on _locker, thus
         // it's impossible for 2 working threads to dequeue the same task.
         task = _tasksQueue.Dequeue();
      }

      if (task == null) return; // This signals our exit

      try
      {
         Console.WriteLine(DateTime.Now + " Start Processing task: " + task + " on thread: " + Thread.CurrentThread.Name);
         // Simulate a time-consuming task
         Console.WriteLine(DateTime.Now + " Done Processing task: " + task + " on thread: " + Thread.CurrentThread.Name);
      }
      catch (Exception err)
      {
         // log err & eat the exception, we still want to resume consuming other tasks.
         //Debug.Fail("Exception occurred while trying to consume the job (thread#: " + Thread.CurrentThread.Name + "): " + err.Message);
      }
   }
}


We loop forever and check whether our queue is empty; if so, we wait for a new task to arrive. Until then, the thread is blocked (inactive). When a new task arrives (or the queue is not empty), we dequeue a task and “process” it. Let’s see the Enqueue method:


public void EnqueueTask(string task)
{
   lock (_locker)
   {
      if (task != null)
         Console.WriteLine(DateTime.Now + " Enqueue task: " + task);

      _tasksQueue.Enqueue(task);
      Monitor.Pulse(_locker); // wake 1 worker (thanks Alan) to handle the new task
   }
}


This leaves us with disposing of the whole thing (shutting down our threads gracefully):


public void Dispose()
{
   EndCurrentWorkingThreads();
}

private void EndCurrentWorkingThreads()
{
   if (_workers == null || _workers.Length == 0)
      return;

   // Signal the threads to shutdown gracefully
   for (int i = 0; i < _workers.Length; i++)
      EnqueueTask(null); //will shutdown the worker that receive a null task.

   // Wait up…
   foreach (Thread worker in _workers)
      worker.Join();
}


That’s it. But how do we use it, you might ask? Here it goes:


using (TasksQueue q = new TasksQueue(2))
{
   for (int i = 0; i < 10; i++)
      q.EnqueueTask("Task " + i);

   Console.WriteLine("Waiting for completion…");
}

Console.WriteLine("All tasks done!");