Implementing a simple multi-threaded TasksQueue

In one of my tasks, I had to create a simple mechanism that lets my team enqueue tasks while multiple threads work on them in the background. I used the Monitor.Wait/Monitor.Pulse pattern to signal the workers when a new task is available. Here it goes:


using System;
using System.Collections.Generic;
using System.Threading;

public class TasksQueue : IDisposable
{
   private readonly object _locker = new object();
   private readonly Thread[] _workers;
   private readonly Queue<string> _tasksQueue = new Queue<string>();


Nothing fancy so far. Just notice that we’re holding a private lock object and an array of Thread objects that represent our workers.
The constructor simply initializes the threads and starts them:


public TasksQueue(int workerCount)
{
   if (workerCount <= 0) throw new ArgumentException("The number of worker threads must be greater than 0", "workerCount");
   
   _workers = new Thread[workerCount];
   for (int i = 0; i < workerCount; i++) { // Create and start a separate thread for each worker
      Thread workerThread = new Thread(ConsumeTask);
      workerThread.Name = "WorkerThread #" + i;
      _workers[i] = workerThread;
      workerThread.Start();
   }
}


Each thread runs the ConsumeTask method. Let’s see what we have there:


private void ConsumeTask()
{
   while (true)
   {
      string task;
      lock (_locker)
      {
         // If no tasks are available, wait until we get something.
         while (_tasksQueue.Count == 0) {
            // Monitor.Wait is equivalent to:
            // (1) Monitor.Exit(_locker); (give others a chance to lock _locker)
            // (2) wait for a pulse
            // (3) on pulse: Monitor.Enter(_locker);
            Monitor.Wait(_locker);
         }

         // Only one thread at a time can hold the lock on _locker, and the count is
         // re-checked after every wake-up, so two workers can never dequeue the same task.
         task = _tasksQueue.Dequeue();
      }

      if (task == null) return; // This signals our exit

      try
      {
         Console.WriteLine(DateTime.Now + " Start Processing task: " + task + " on thread: " + Thread.CurrentThread.Name);
         // Simulate a time-consuming task (the one-second duration is an arbitrary placeholder)
         Thread.Sleep(1000);
         Console.WriteLine(DateTime.Now + " Done Processing task: " + task + " on thread: " + Thread.CurrentThread.Name);
      }
      catch (Exception err)
      {
         // Log the error and swallow the exception; we still want to keep consuming other tasks.
         //Debug.Fail("Exception occurred while trying to consume the job (thread#: " + Thread.CurrentThread.Name + "): " + err.Message);
      }
   }
}


Each worker loops forever. If the queue is empty, the worker waits for a new task to arrive, and until then the thread is blocked (inactive). Once a task is available, the worker dequeues it and “processes” it. Let’s see the Enqueue method:


public void EnqueueTask(string task)
{
   lock (_locker)
   {
      if (task != null)
         Console.WriteLine(DateTime.Now + " Enqueue task: " + task);

      _tasksQueue.Enqueue(task);
      Monitor.Pulse(_locker); // wake 1 worker (thanks Alan) to handle the new task
   }
}
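
One detail worth spelling out: a pulse is not remembered. If Monitor.Pulse is called while no thread is waiting, nothing happens, and that is fine here because the consumer checks the queue count under the lock before it ever calls Monitor.Wait. Below is a minimal sketch of that situation, separate from the TasksQueue class; the PulseDemo class and its Put, Take and Run members are hypothetical names made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, not part of TasksQueue.
public static class PulseDemo
{
    private static readonly object Locker = new object();
    private static readonly Queue<string> Items = new Queue<string>();

    public static void Put(string item)
    {
        lock (Locker)
        {
            Items.Enqueue(item);
            Monitor.Pulse(Locker); // wakes one waiting thread, if there is one
        }
    }

    // Blocks until an item is available, then returns it.
    public static string Take()
    {
        lock (Locker)
        {
            // The condition is checked before waiting and again after every wake-up.
            while (Items.Count == 0)
                Monitor.Wait(Locker);
            return Items.Dequeue();
        }
    }

    // Produces an item BEFORE the consumer thread exists, so the pulse has no
    // receiver. The consumer still gets the item because it checks the count first.
    public static string Run()
    {
        Put("hello");

        string received = null;
        Thread consumer = new Thread(() => received = Take());
        consumer.Start();
        consumer.Join();
        return received;
    }
}
```

The same reasoning explains why the wait sits in a while loop rather than an if: the queue state, not the pulse, is the source of truth.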


This leaves us with disposing the whole thing (shutting down our threads gracefully):


public void Dispose()
{
   EndCurrentWorkingThreads();
}

private void EndCurrentWorkingThreads()
{
   if (_workers == null || _workers.Length == 0)
      return;

   // Signal the threads to shut down gracefully
   for (int i = 0; i < _workers.Length; i++)
      EnqueueTask(null); // each worker that receives a null task will shut down

   // Wait for the workers to finish
   foreach (Thread worker in _workers)
      worker.Join();
}


That’s it. How do we use it, you might ask? Here it goes:


using (TasksQueue q = new TasksQueue(2))
{
   for (int i = 0; i < 10; i++)
      q.EnqueueTask("Task " + i);

   Console.WriteLine("Waiting for completion…");
}

Console.WriteLine("All tasks done!");
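
Why is it safe to print “All tasks done!” right after the using block? The null signals are enqueued behind every real task, and the queue is first-in first-out, so no worker can see a null before all real tasks have been dequeued. Here is a compact sketch that lets you check this by counting. MiniTasksQueue, its handler callback and MiniTasksQueueDemo are hypothetical names added for this illustration, and the sketch leaves out the try/catch around the work, so a throwing handler would take the process down.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Compact, hypothetical variant of the queue above. The handler callback is an
// assumption added so that the outcome can be inspected from the outside.
public sealed class MiniTasksQueue : IDisposable
{
    private readonly object _locker = new object();
    private readonly Queue<string> _tasks = new Queue<string>();
    private readonly Thread[] _workers;
    private readonly Action<string> _handler;

    public MiniTasksQueue(int workerCount, Action<string> handler)
    {
        _handler = handler;
        _workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            _workers[i] = new Thread(Consume);
            _workers[i].Start();
        }
    }

    public void EnqueueTask(string task)
    {
        lock (_locker)
        {
            _tasks.Enqueue(task);
            Monitor.Pulse(_locker);
        }
    }

    private void Consume()
    {
        while (true)
        {
            string task;
            lock (_locker)
            {
                while (_tasks.Count == 0) Monitor.Wait(_locker);
                task = _tasks.Dequeue();
            }
            if (task == null) return; // null is the shutdown signal
            _handler(task);
        }
    }

    public void Dispose()
    {
        // One null per worker; each lands behind every real task in the queue.
        for (int i = 0; i < _workers.Length; i++) EnqueueTask(null);
        foreach (Thread worker in _workers) worker.Join();
    }
}

public static class MiniTasksQueueDemo
{
    // Returns how many tasks had been handled by the time Dispose returned.
    public static int Run()
    {
        int processed = 0;
        using (var q = new MiniTasksQueue(2, task => Interlocked.Increment(ref processed)))
        {
            for (int i = 0; i < 10; i++) q.EnqueueTask("Task " + i);
        }
        return processed;
    }
}
```

With ten tasks enqueued, Run should report that all ten were handled, because Dispose joins both workers and each worker only exits after dequeuing a null.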

 

Oren Ellenbogen