# Wednesday, January 28, 2009

Ron challenged me to write my own implementation of a reader/writer lock. He did a great job with his own implementation.

I came up with this:
(note: this is a really naive implementation; it's far from ideal in terms of fairness and possible race conditions. Just take it as a brain-teaser)

public class StateReaderWriterLock : IDisposable
{
    private readonly AutoResetEvent _changeStateAutoLock = new AutoResetEvent(false);
    private readonly AutoResetEvent _writerDone = new AutoResetEvent(false);
    private readonly object _readersLock = new object();
    private readonly object _writersLock = new object();
    private int _readers;
    private int _state; // 0 is "neutral", 1 is read, 2 is write
    private int _writers;

    public void LockReader()
    {
        while (true)
        {
            // try to bring the state from "neutral" or "read" to "read". If the current state is "write", let's wait.
            while (Interlocked.CompareExchange(ref _state, 1, 0) == 2)
                _changeStateAutoLock.WaitOne();

            // an interesting case here where the last reader is now in ReleaseRead and we're trying to read as well, we might be too late (the writer might have changed the _state already)
            // if that happens - we're back to square one, but we still want to avoid recursive locking!
            bool loseInRace = false;
            lock (_readersLock)
            {
                if (Interlocked.CompareExchange(ref _state, 1, 0) == 2)
                    loseInRace = true;
                else
                    _readers++;
            }
            if (!loseInRace)
                return; // success!
        }
    }

    public void ReleaseReader()
    {
        lock (_readersLock)
        {
            _readers--;

            // if I am the last reader, let's reset the state so any given reader/writer can take it
            if (_readers == 0)
            {
                Thread.VolatileWrite(ref _state, 0);
                _changeStateAutoLock.Set();
            }
        }
    }

    public void LockWriter()
    {
        while (true)
        {
            // try to bring the state from "neutral" or "write" to "write". If the current state is "read", let's wait.
            while (Interlocked.CompareExchange(ref _state, 2, 0) == 1)
                _changeStateAutoLock.WaitOne();

            bool loseInRace = false;
            lock (_writersLock)
            {
                if (Interlocked.CompareExchange(ref _state, 2, 0) == 1)
                    loseInRace = true;
                else
                    _writers++;
            }

            if (!loseInRace)
                break; // great success!
        }

        // allow 1 writer only
        if (Thread.VolatileRead(ref _writers) > 1)
        {
            _writerDone.WaitOne();
        }
    }

    public void ReleaseWriter()
    {
        lock (_writersLock)
        {
            _writers--;

            // if I am the last writer, let's reset the state so any given reader/writer can take it
            if (_writers == 0)
            {
                Thread.VolatileWrite(ref _state, 0);
                _changeStateAutoLock.Set();
            }
            else
            {
                _writerDone.Set();
            }
        }
    }

    public void Dispose()
    {
        _changeStateAutoLock.Close();
        _writerDone.Close();
    }
}


Not sure it's the greatest job interview question, but it is indeed challenging and fun to play with.

   

Posted by Oren Ellenbogen 
28/01/2009 01:20, Israel time UTC-07:00,     Comments [0]  | 

Imagine you've got the following code, running in 2 separate threads T1 and T2:

private bool _go = true;

T1                                      T2
while (_go)                        // ... some code here ...
{                                         _go = false; // something happened, let's stop
   // ... do work
}

Even if T2 sets _go to false, this code might lead to an endless loop in T1. Why is that?
Each CPU has its own local storage (registers and an L1 cache), and both the JIT compiler and the CPU are allowed to keep a copy of _go there and to re-order instructions to optimize the code. So if, for example, T1 is running on processorA and T2 is running on processorB, T1 may keep testing a stale copy of _go and we might have an endless loop here. A simple solution is to add the volatile keyword to the _go field definition to ensure ordering and avoid caching the value:

"The volatile modifier is usually used for a field that is accessed by multiple threads without using the lock statement to serialize access. Using the volatile modifier ensures that one thread retrieves the most up-to-date value written by another thread." (MSDN)

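To make the T1/T2 example concrete, here is a minimal sketch of the volatile flag. The Worker and VolatileFlagDemo names are mine, made up for illustration; only the _go field comes from the example above.

```csharp
using System;
using System.Threading;

public class Worker
{
    // volatile: every test of _go in the loop is a real read, so the JIT
    // may not hoist it into a register and spin on a stale value.
    private volatile bool _go = true;

    // T1: loops until another thread clears the flag.
    public void Run()
    {
        while (_go)
        {
            // ... do work
        }
    }

    // T2: something happened, let's stop.
    public void Stop()
    {
        _go = false;
    }
}

public static class VolatileFlagDemo
{
    // Starts the worker, stops it from the calling thread and reports
    // whether the worker thread exited within the given timeout.
    public static bool StopsInTime(int timeoutMs)
    {
        Worker worker = new Worker();
        Thread t1 = new Thread(worker.Run);
        t1.IsBackground = true; // don't keep the process alive if the loop never ends
        t1.Start();

        Thread.Sleep(50); // let the loop spin for a moment
        worker.Stop();

        return t1.Join(timeoutMs);
    }
}
```

Calling VolatileFlagDemo.StopsInTime(5000) should return true. Without the volatile modifier, a release build is allowed (not guaranteed) to spin forever.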

Now let's examine another example, again 2 threads T1 and T2:

private int _writers = 0;

T1                                                         T2
while (_writers > 1)                              ....
{                                                           Interlocked.Increment(ref _writers);
    _someAutoEvent.WaitOne();
}

You can see that we're using an atomic increment via the Interlocked.Increment method, so we should be thread-safe here, right? Not quite.
We've got one thread that is reading (T1) and another thread that is writing (T2) to _writers. If T1 is running on processorA and T2 on processorB, the plain read in T1 may be served from a stale copy (a register or CPU-level cache) for some time, which means T1 might see a different value than T2. If T2 had captured the value returned from Interlocked.Increment and was the only reader of that field, then it would have been thread-safe. This is not the case here.
The solution is to use Thread.VolatileRead(ref _writers) to make sure T1 gets the latest value. We could of course have used the lock keyword as well to serialize access to the _writers field.
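Here is a small sketch of the counter scenario: writers go through Interlocked and the reader goes through a volatile read. WriterCounter and WriterCounterDemo are hypothetical names. I'm also assuming .NET 4.5 or later and using Volatile.Read, which plays the same role here as the Thread.VolatileRead call mentioned above.

```csharp
using System;
using System.Threading;

public class WriterCounter
{
    private int _writers;

    // Writer side (T2): atomic read-modify-write. The return value is the
    // count as it was right after this particular increment.
    public int Enter()
    {
        return Interlocked.Increment(ref _writers);
    }

    public int Leave()
    {
        return Interlocked.Decrement(ref _writers);
    }

    // Reader side (T1): a volatile read instead of a plain field read.
    public int Current
    {
        get { return Volatile.Read(ref _writers); }
    }
}

public static class WriterCounterDemo
{
    // Increments the counter from several threads and returns the final value.
    public static int CountAfterParallelEnters(int threadCount, int entersPerThread)
    {
        WriterCounter counter = new WriterCounter();
        Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threads.Length; i++)
        {
            threads[i] = new Thread(delegate()
            {
                for (int j = 0; j < entersPerThread; j++)
                    counter.Enter();
            });
            threads[i].Start();
        }

        foreach (Thread t in threads)
            t.Join();

        return counter.Current;
    }
}
```

With 4 threads doing 1000 increments each, the final count is 4000, since no increment is lost.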


Summary:

  • Although setting a word-sized variable is atomic (like in our first example), that doesn't mean it is thread-safe! For "boolean flags" I would recommend using volatile as a clean and simple solution.
  • For "counter scenarios", I would use Interlocked with Thread.VolatileRead. This should outperform lock usage and still keep your code neat and shiny.
  • The lock keyword is probably the safest way to avoid dangerous race conditions, so unless you're sure about your solution, keep it simple and use lock.
   

Posted by Oren Ellenbogen 
28/01/2009 12:31, Israel time UTC-07:00,     Comments [0]  | 
# Tuesday, December 30, 2008

It seems that someone uploaded a 10-minute video some time ago (thanks TristanIce!).
This is far from being a well-prepared lecture (alt.net is all about "everyone can get up and talk"); I just tried to show the charm behind CCR after people started to share their pain developing multi-threaded applications. I'm talking crazy-fast and the material is not well organized, so I had to think while talking/writing. The results are hard to watch. Multi-threading is a bitch eh?

All in all, the vibe was positive and people looked intrigued by it. Anyway, here it is:

Oren explains how to use CCR framework (MSN video, the movie is playing on the right)

 

Makes me want to prepare a lecture on the subject some day...
   

Posted by Oren Ellenbogen 
30/12/2008 09:29, Israel time UTC-07:00,     Comments [0]  | 

Requirements:

  1. Be able to read all the lines in a given file.
  2. Be able to do so even if the file is HUGE ( == don't load it all at once).
  3. Control the number of items I want to receive and whether or not the enumerator ignores empty lines. Always nice to have.
  4. Thread safety should be supported easily. Think about 50 threads, each reading the next line and processing it.
  5. Nice performance is a plus.


Playing with the API with my teammate Ron gave the following (code written in notepad, stupidity won't compile):

"Common foreach" usage:

foreach (string line in FileStreamer.GetLines(@"c:\temp\myfile.txt", true, 1000)) { /* .. code */ } // read 1000 items from the file while ignoring empty lines.


Reading from multiple threads usage:

using (FileStreamer streamer = new FileStreamer(@"c:\temp\myfile.txt", true, -1)) // -1 means no limit, read all non-empty lines
{
    Thread[] threads = new Thread[10];
    for(int i=0; i<threads.Length; i++)
    {
        threads[i] = new Thread((ThreadStart)delegate {
            string line;
            while (streamer.TryGetNextLine(out line)) // thread safe!
            {
                // do work ...
            }
            // end of file, we can exit
        });

        threads[i].Start();
    }

    // join the threads + whatever ...
}

 

After reading a few ideas on Stack Overflow, I thought I'd share my solution:

// written by bogen (30/12/2008)
 
#region using
 
using System;
using System.Collections.Generic;
using System.IO;
 
#endregion
 
namespace Semingo.Common.Utils
{
    /// <summary>
    /// Return a stream of lines for the specified file.
    /// This class is thread safe by design!
    /// Use the static method FileStreamer.GetLines for not thread safe usage (via foreach)
    /// </summary>
    public class FileStreamer : IDisposable
    {
        #region fields
 
        private readonly object _locker = new object();
        private readonly string _path;
        private readonly bool _ignoreEmptyLines;
        private readonly int _limit;
        private readonly IEnumerator<string> _enumerator;
        private int _linesGiven;
        private bool _disposed;
 
        #endregion
 
        #region ctors
 
        /// <summary>
        /// Create a file streamer instance
        /// </summary>
        /// <param name="path">File path</param>
        public FileStreamer(string path) : this(path, false, -1)
        {
        }
 
        /// <summary>
        /// Create a file streamer instance
        /// </summary>
        /// <param name="path">File path</param>
        /// <param name="ignoreEmptyLines">Should the streamer avoid empty lines</param>
        /// <param name="limit">Number of maximum lines the streamer should return. Send -1 for no limit</param>
        public FileStreamer(string path, bool ignoreEmptyLines, int limit)
        {
            if (!File.Exists(path))
                throw new ArgumentException("Cannot find the file: " + path);
            if (limit != -1 && limit <=0 )
                throw new ArgumentException("Limit must be bigger than 0 (or -1 for no limit) but was: " + limit + ". File given was: " + path);
 
            _path = path;
            _ignoreEmptyLines = ignoreEmptyLines;
            _limit = limit;
            
            _enumerator = CreateStream().GetEnumerator();
        }
 
        #endregion
 
        #region public API
 
        public bool TryGetNextLine(out string nextItem)
        {
            lock (_locker)
            {
                return TryGetNextLineAssumingInsideLock(out nextItem);
            }
        }
 
        public bool TryGetNextLines(out ICollection<string> nextItems, int howMany)
        {
            if (howMany <= 0)
                throw new ArgumentException("'howMany' parameter must be > 0 but was " + howMany, "howMany");
 
            nextItems = new List<string>(howMany);
            lock (_locker)
            {
                string nextItem;
                for(int i=0; i<howMany; i++)
                {
                    if (!TryGetNextLineAssumingInsideLock(out nextItem))
                        break; // no more lines (EOF)
                    
                    nextItems.Add(nextItem);
                }
            }
 
            return nextItems.Count > 0;
        }
       
        public static IEnumerable<string> GetLines(string path)
        {
            return GetLines(path, false, -1);
        }
 
        /// <summary>
        /// 
        /// </summary>
        /// <param name="path"></param>
        /// <param name="ignoreEmptyLines"></param>
        /// <param name="limit">send -1 for no limit</param>
        /// <returns></returns>
        public static IEnumerable<string> GetLines(string path, bool ignoreEmptyLines, int limit)
        {
            using (FileStreamer streamer = new FileStreamer(path, ignoreEmptyLines, limit))
            {
                string nextItem;
                while (streamer.TryGetNextLine(out nextItem))
                    yield return nextItem;
 
                yield break; // EOF
            }
        }
 
        ///<summary>
        ///Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        ///</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
 
        #endregion
 
        #region private API
 
        /// <summary>
        /// Get the next line in the file.
        /// dev: assume that the lock is from the outside, by the caller (this is why it's a private method)
        /// </summary>
        private bool TryGetNextLineAssumingInsideLock(out string nextItem)
        {
            nextItem = null;
            if (_linesGiven == _limit)
                return false; // we reached the limit, no more please.
 
            if (!_enumerator.MoveNext())
                return false; // end of stream (EOF)
 
            nextItem = _enumerator.Current;
            _linesGiven++;
            return true;
        }
 
        private IEnumerable<string> CreateStream()
        {
            using (FileStream fs = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.Read, 1024, FileOptions.SequentialScan))
            using (StreamReader reader = new StreamReader(fs))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    if (_ignoreEmptyLines && line == string.Empty)
                        continue; // skip empty lines if needed
 
                    yield return line;
                }
 
                yield break;
            }
        }
 
        protected void Dispose(bool disposing)
        {
            if (_disposed)
                return;
 
            if (disposing)
            {
                _enumerator.Dispose();
            }
 
            _disposed = true;
        }
 
        #endregion
    }
}
   

Posted by Oren Ellenbogen 
30/12/2008 09:17, Israel time UTC-07:00,     Comments [0]  | 
# Friday, November 07, 2008

"Why do you love working as X so much? So much that you're willing to spend so many hours of your life on it?"

Pause for a second. Close your eyes and think: what would your answer to this question be?


For me, the answer is obvious: It's the people and challenges that make my brain tick and my motivation SKY high. It's the feeling that I can really make things better by investing everything I've got into it that makes me proud of my work. I get huge satisfaction from installing TFS 2008, trying to make our Integration Tests run 6x faster, practicing some Agile principles I've read about or taking on any other "dirty yet important work" no one else would like to touch. I'm not scared of hard work, and if I can feel, down there in my stomach, that it would make my teammates more productive - I'll do anything I can to make it happen. Oh, and I'm trying to build one of the most complex search engines the world has to offer with a bunch of b-r-i-l-l-i-a-n-t guys! Can you blame me for working so hard, enjoying every minute of it?

Sure, getting a few bucks more would be great, but that would not make me proud of what I'm doing. One of the main things I've learned in my 8 years of developing software is that highly motivated teams will always make the best products. Leave aside for a moment the productivity boost these teams enjoy and imagine their daily work, their lunches together, their working environment, their joy of talking with one another about day-to-day stuff. Imagine how they dream about their goals together, discussing ways to make things better and more enjoyable. It's the buzz these companies have that draws the best guys to them, so "effortlessly". The commitment to one another will make sure you'll build quality systems, that you'll try your best to deliver on time, to make it better, smarter, BIGGER, every single day. It will allow you to grow in ways you could never have anticipated. Trying to grow this culture in your team is one of the hardest things in the world, way harder than any logical puzzle thrown at you. Believe me.

It's just so damn hard to get it right.


"There are many men who feel a kind of twisted pride in cynicism" (Theodore Roosevelt, The Man In The Arena speech).
Excessive cynicism means death for any joint effort. No matter how strong your team is, negativity and cynicism will break your team spirit. It always does.
Stop being so negative, so cynical about your actions and your dreams. You can do great things by answering the question above and remembering that it's all about the people around you. It's all about you! You can actually make everyone around you better by taking action. Stop listening to people who think they know best and mock you with "you're only a tiny nail in a giant machine". Don't be afraid of constantly trying to make a difference, even if you lose here and there. Read books, talk about them and your ideas, share and try, try, try, and try again!

This attitude will probably make you a winner, someone that others will enjoy working with, being with, taking inspiration from.
I know that these guys are the ones I love working with, or going to a nearby bar with, drinking some beer and talking about how to change the world.

Best people simply do for each other.

   

Posted by Oren Ellenbogen 
07/11/2008 05:40, Israel time UTC-07:00,     Comments [1]  | 
# Friday, October 31, 2008

We're doing a lot of thinking these days about which features will give us the best ROI, trying to prioritize existing features and asking ourselves "did we miss something? is there a new feature out there we left behind?". It's not easy to think about great one-of-a-kind ideas. It is easy though to make it almost impossible.

Why? It's all because of the "parallel" right hemisphere of our brain. Imagine the following brain storming conversation:

me(Left brain): Alright I've got one! The user will enter the screen, do X and Y (we'll do some Z behind the scene) to receive ...
   me(Right brain kicks in): but, doing Z will take me two weeks to develop..
   me(R): gosh, we'll need to build a dictionary and hold it in memory if we want it to scale..
   me(R): reminder, use ReaderWriterLockSlim this time. It's much faster than ReaderWriterLock!
   me(R): I guess that this feature is not as important as feature F1, maybe I'm wasting my time thinking about this feature??!
   me(R): I'm so hungry!
   me(R): Oh, we can use [some service name] to do Z. Cool, so now this feature is feasible.
   me(R): crap, [some service name] cost money, I think.

[To the surrounding, it looks like I'm saying one fluent sentence of course. During that time they have their right brain working on "why not" / "how" / "when"]
 
   joe(R): gosh, is he for real? this is the lamest idea I've heard!
   jack(R): hmm, maybe he has a point there. This feature reminds me of something I've always wanted to do... what was it now??...
   jack(R): naa.. this guy is crazy. for sure.
   joe(R): oh wait! we can use something I wrote to implement this feature! might be cool to use this code finally. It has been lying there for ages.
   sarah(R): wonderful idea! I wonder if I'll be assigned to work on it?
   jack(R): I've been listening to his babbling for 20 minutes now. self reminder: talk about a bonus with the CEO.
   sarah(R): I need coffee! God, if you'll end this meeting now I promise to donate $10 to charity! coffee... please...

me(L): .. a brilliant search result !!


Any wonder that most brain storming meetings are futile?


Brain storming is a process that should be mastered, and I suggest that you jump to the nearest browser to find books on the topic; it's a skill worth investing time in.
Before you do so, here are some rules I use to silence my right brain while doing Left Brain Storming:

  1. Never ever prioritize your ideas during brain storming. I can't stress enough how important this rule is. Don't worry about it now, you'll have time later.
  2. Listen to others.
  3. Be patient = don't judge quality of ideas.
  4. Write everything down. I really mean everything! There are no "stupid ideas" now.
  5. You are not going to execute these ideas. At least that is what you should tell your right brain during that time.
  6. Understand the meaning behind the feature, imagine how it will work, not how it will be executed!
  7. Don't invest more than 2 hours in a single brain storming meeting. If you feel you've missed some ideas, rest a few hours (or even better - a few days) and then give it another shot. "Burned out quickly, left brain does. Burned out leads to impatience. Impatience kicking the right brain in action. Right brain means trouble for your brain storming meeting" -- so does Master Yoda say (well, sort of)
  8. 80/20: after you're done throwing out ideas (or the 2h gong), go over the features you've raised and mark features you think are interesting and feasible with 80 and features that are not with 20. This should take no longer than 2 minutes, so please use only 80 and 20 as numbers.
  9. Set a separate meeting to prioritize features with the existing backlog you've got. Important: don't do it on the same day, you'll probably want to sleep on it.


Happy hunting!

   

Posted by Oren Ellenbogen 
31/10/2008 03:14, Israel time UTC-07:00,     Comments [1]  | 
# Sunday, October 19, 2008

[ Part 1, Part 2, this is Part 3 ]

In part 1 I've talked about the general notion behind SpawnEnumerator. In part 2 I've implemented it and talked a bit about CCR and how it works under the hood.
In the third and last part I'll try to show how we can actually execute billions of tasks very easily.

Iterating over billions of tasks:

In my previous post I tried to show one of the greatest yet underused features in C# 2.0 - the yield statement. I closed the post with:
"Prefer using the yield statement as long as calculating values doesn't require holding an expensive resource like a DB connection or a file handle for a long period of time."

Now, let's assume that we have 100,000,000 Tasks to pull from a repository (DB, files, doesn't really matter) and execute them all via SpawnEnumerator.
Should we define a list of Task, fill it with 100,000,000 items and then iterate over it? Of course not! We couldn't even if we wanted to.
Assuming that reading 1000 (for example) Tasks is much faster than executing them (it usually is), and it's safe to hold them in memory, we can "bulk read & yield" the entire thing:

public delegate T Func<T>();

public static class Yield
{
    public static IEnumerable<T> Bulked<T>(Func<IEnumerable<T>> bulkYielder)
    {
        while (true)
        {
            IEnumerable<T> yielder = bulkYielder();
            if (yielder == null)
                throw new ArgumentException("bulkYielder cannot return a null enumerable", "bulkYielder");

            int itemsGiven = 0;
            foreach (T t in yielder)
            {
                itemsGiven++;
                yield return t;
            }

            if (itemsGiven == 0)
                break;
        }
    }
}

This can be used to return a stream of "tasks bulk" :

Yield.Bulked<Task>(delegate { return tasksRepository.Dequeue(1000); }) // return the next 1000 from the 100,000,000 items queue until no more tasks exists in the repository.

Important note:
In this scenario, the Dequeue method should return its 1000 items without using yield, to avoid holding expensive resources for a long period of time (as discussed).
Once we have those 1000 items, we can yield each one of them to the caller (very cheap, no resources are used).
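To see the "bulk read & yield" idea run end to end, here is a self-contained sketch. InMemoryRepository is a made-up stand-in for tasksRepository (ints instead of Task), BulkYield mirrors the Yield.Bulked method above, and I'm assuming .NET 3.5+ so the built-in System.Func can be used instead of declaring the delegate by hand.

```csharp
using System;
using System.Collections.Generic;

public static class BulkYield
{
    // Same idea as Yield.Bulked above: keep asking for the next bulk and
    // yield its items one by one until an empty bulk comes back.
    public static IEnumerable<T> Bulked<T>(Func<IEnumerable<T>> bulkYielder)
    {
        while (true)
        {
            IEnumerable<T> bulk = bulkYielder();
            if (bulk == null)
                throw new InvalidOperationException("bulkYielder cannot return a null enumerable");

            int itemsGiven = 0;
            foreach (T item in bulk)
            {
                itemsGiven++;
                yield return item;
            }

            if (itemsGiven == 0)
                yield break; // the repository is drained
        }
    }
}

// Hypothetical repository: a queue in memory instead of a DB or files.
public class InMemoryRepository
{
    private readonly Queue<int> _items = new Queue<int>();
    public int DequeueCalls;

    public InMemoryRepository(int count)
    {
        for (int i = 0; i < count; i++)
            _items.Enqueue(i);
    }

    // Materializes the whole bulk in a list (no yield here), so a real
    // implementation could open and close its connection inside this call.
    public List<int> Dequeue(int max)
    {
        DequeueCalls++;
        List<int> bulk = new List<int>(max);
        while (bulk.Count < max && _items.Count > 0)
            bulk.Add(_items.Dequeue());
        return bulk;
    }
}

public static class BulkedDemo
{
    // Drains the repository through Bulked and returns how many items were seen.
    public static int CountAll(InMemoryRepository repository, int bulkSize)
    {
        int seen = 0;
        foreach (int item in BulkYield.Bulked<int>(delegate { return repository.Dequeue(bulkSize); }))
            seen++;
        return seen;
    }
}
```

For a repository of 2500 items and a bulk size of 1000, CountAll sees all 2500 items using 4 Dequeue calls (1000, 1000, 500 and a final empty bulk that ends the stream).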

Finally, we can execute billions of tasks in parallel via SpawnEnumerator:

IEnumerator<Task> tasksEnumerator = Yield.Bulked<Task>(delegate { return tasksRepository.Dequeue(1000); }).GetEnumerator(); // Bulked returns IEnumerable<Task>
Action<Task> taskHandler = delegate(Task t) { /* execute single task */ };

// execute the tasks in parallel, using 50 threads and holding ~1000 items in memory.
_tasksExecutor = SpawnEnumerator<Task>.Start(50, "mypool", 1000, tasksEnumerator, taskHandler);

// note:don't forget to dispose _tasksExecutor when killing/stopping app!


Using Yield.Bulked guarantees that we won't hold expensive resources for too long while allowing us to generate a "stream" of Tasks to run in parallel.
The code is easy to read and follow (I hope) and we gain a simple method for executing billions of tasks very effectively (CPU & memory wise).

   

Posted by Oren Ellenbogen 
19/10/2008 01:39, Israel time UTC-07:00,     Comments [0]  | 

How many of you have played with the C# yield statement? I guess that most of you have.
Anyway, like most of the MSDN examples out there, when used incorrectly it can introduce very bad behavior to your code. Consider the following:

public IEnumerable<User> GetUsers(int count)
{
    using (MySqlConnection connection = new MySqlConnection("..."))
    {
        // MySqlDataReader reader = create a MySqlCommand and execute it
        while (reader.Read())
            yield return new User(/*... fill parameters from the reader ... */);
    }
}

Looks pretty harmless, right? Not quite. The yield statement is actually transformed into a "state machine", which means that every time we yield a result back to the client (the caller of GetUsers in our example), we wait for the client to ask for the next item (via IEnumerator<T>.MoveNext()). The code above will hold the connection open until the client is done iterating over all of the User items. This will lead to major scalability issues very quickly! You should always keep your DB connections open for a short period only, to prevent connection exhaustion (threads waiting a long time for an available DB connection in the pool, until timeout). Because yield returns control to the caller, the caller might "take his time", thus leading to connection exhaustion.
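A quick way to convince yourself is to count open connections while the caller is still iterating. This is only a sketch: FakeConnection is a made-up stand-in for a real pooled connection (its static counter is not thread-safe, it's just for the demo), and strings stand in for User objects.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a DB connection; counts how many are open.
public class FakeConnection : IDisposable
{
    public static int OpenCount;

    public FakeConnection() { OpenCount++; }
    public void Dispose() { OpenCount--; }

    public IEnumerable<string> ReadRows(int count)
    {
        for (int i = 0; i < count; i++)
            yield return "user" + i;
    }
}

public static class UserQueries
{
    // Lazy: the connection stays open for as long as the caller keeps iterating.
    public static IEnumerable<string> GetUsersLazy(int count)
    {
        using (FakeConnection connection = new FakeConnection())
        {
            foreach (string row in connection.ReadRows(count))
                yield return row;
        }
    }

    // Eager: read everything, close the connection, then hand back the list.
    public static List<string> GetUsersEager(int count)
    {
        using (FakeConnection connection = new FakeConnection())
        {
            return new List<string>(connection.ReadRows(count));
        }
    }

    // Reports how many connections are open while the caller is busy
    // processing the first item.
    public static int OpenWhileIterating(IEnumerable<string> users)
    {
        foreach (string user in users)
            return FakeConnection.OpenCount;
        return FakeConnection.OpenCount;
    }
}
```

OpenWhileIterating(UserQueries.GetUsersLazy(3)) reports 1 open connection, while the eager version reports 0: the price of the eager version is holding all the rows in memory at once.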


On the other hand, used wisely, yield yields (lame joke, sorry) HUGE benefits:
[note: code written in notepad, stupidity won't compile]

  • Avoid useless memory allocations

#1:
How many times have you ended up creating something like this:

    public List<T> Filter(List<T> input, Predicate<T> predicate)
    {
        List<T> output = new List<T>(input.Count / 2);
        foreach (T item in input)
            if (predicate(item))
                output.Add(item);

        return output;
    }

We allocate much more memory than we need, only to hold the output during the calculation. A better approach would be:

    public IEnumerable<T> Filter(IEnumerable<T> input, Predicate<T> predicate)
    {
        foreach (T item in input)
            if (predicate(item))
                yield return item;
    }

This way we hold only one T at a time (it is saved in the generated state machine). In addition, the client could choose to produce each item via yield as well, which saves the need to build the whole "input" before calling our Filter method.


#2:

Another oh-(gosh-why)-so-common example is the following:

public void Save(T item)
{
    Save(new T[] { item });
}

public void Save(ICollection<T> items)
{
    // do your magic here to save items
}

Assuming you call Save with a single item quite a lot, you're allocating A LOT of memory to create one-item arrays. A better approach would be:

public void Save(T item)
{
    Save(Yield.One(item));
}

public void Save(IEnumerable<T> items)
{
    // do your magic here to save items
}

public static class Yield
{
    public static IEnumerable<T> One<T>(T item)
    {
        yield return item;
    }
}

~Zero memory allocation here.

  • Avoid "impossible" memory allocation

Let's say you want to read a 20 GB file of emails where every line holds a single email. Trying to declare a List<string> and fill it up will obviously blow up your memory. You simply can't hold that much in memory. Instead, you can use StreamReader.ReadLine and yield each row back to your client, until all of the emails are taken care of. Yes, you can try to read the file in chunks (keeping a pointer), but this is exactly what yield does for you under the hood. Reminder: DRY principle is gold (or Don't Repeat .Net Framework, in our case).
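A minimal sketch of that idea, with hypothetical names (LineReader, ReadLines, CountLines). Note that the file handle stays open while the caller iterates, so this fits when the caller works through the lines quickly.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class LineReader
{
    // Yields one line at a time; only the current line (plus the reader's
    // internal buffer) is held in memory, no matter how big the file is.
    public static IEnumerable<string> ReadLines(string path)
    {
        using (StreamReader reader = new StreamReader(path))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
                yield return line;
        }
    }

    // Example client: walks the whole file without ever building a list.
    public static int CountLines(string path)
    {
        int count = 0;
        foreach (string line in ReadLines(path))
            count++;
        return count;
    }
}
```

The client simply does foreach (string email in LineReader.ReadLines(path)) { ... } and the reader is disposed when the loop ends or is abandoned.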

  • Execute synchronous code asynchronously (nicely achieved via CCR)

A bit advanced, but you can read all about it here. The great benefit is you can transform (almost) any "yield based" code to run async, if needed/wanted.


Recap:
Prefer using the yield statement as long as calculating values doesn't require holding an expensive resource like a DB connection or a file handle for a long period of time.

   

Posted by Oren Ellenbogen 
19/10/2008 12:35, Israel time UTC-07:00,     Comments [3]  | 
# Tuesday, October 14, 2008

It seems that there are MANY ways to perform an HTTP web request poorly. This is a huge problem in today's world, where web services are more common than bankrupt banks. Here is a quick pattern for doing it right:

public string Fetch(Uri requestUri)
{
    HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(requestUri);

    webRequest.Timeout = requestConnectTimeoutInMs; // take timeout from config
    webRequest.ReadWriteTimeout = requestReadWriteTimeoutInMs; // take timeout from config

    using (WebResponse webResponse = webRequest.GetResponse())
    using (StreamReader streamReader = new StreamReader(new TimeoutStream(webResponse.GetResponseStream(), fetchTimeoutInMs))) // take timeout from config
        return streamReader.ReadToEnd();
}

Details:

  1. Setting the Timeout property: to make sure we don't wait the default 100 seconds for the server to start responding (the "ACK"). WAY too much.
  2. Setting ReadWriteTimeout: This is crucial to understand. Under the hood, StreamReader reads data in chunks; this timeout determines how long you wait for a single chunk to be read. The default value, 5 minutes (300,000 ms), is again WAY too much.
  3. Using TimeoutStream (you need to implement your own or let me know if you're interested and I'll send it to you): Alright, let's say you're willing to wait 500ms for ACK (Timeout), up to 500ms for reading every chunk (ReadWriteTimeout) but not more than 5 seconds for the entire read to complete. There is no way to achieve that without TimeoutStream. It starts a timer internally and overrides the Seek/Read/Write (etc.) methods, checking the timer before calling the inner stream's method. TimeoutStream is a very simple wrapper around Stream. For example:

         public override int Read(byte[] buffer, int offset, int count)
         {
             CheckTimeout(); // throw TimeoutException if timeout was reached
             return _stream.Read(buffer, offset, count);
         }
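For reference, here is one way such a wrapper could look. This is my own sketch built from the description above, not the original TimeoutStream; apart from CheckTimeout and _stream, the member names are assumptions, and the clock here starts when the wrapper is constructed.

```csharp
using System;
using System.Diagnostics;
using System.IO;

// Sketch of a stream wrapper that enforces an overall time budget.
public class TimeoutStream : Stream
{
    private readonly Stream _stream;
    private readonly int _timeoutInMs;
    private readonly Stopwatch _elapsed;

    public TimeoutStream(Stream stream, int timeoutInMs)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");

        _stream = stream;
        _timeoutInMs = timeoutInMs;
        _elapsed = Stopwatch.StartNew(); // the budget starts ticking now
    }

    // Throws TimeoutException once the overall budget has been used up.
    private void CheckTimeout()
    {
        if (_elapsed.ElapsedMilliseconds > _timeoutInMs)
            throw new TimeoutException("Stream operations exceeded " + _timeoutInMs + "ms");
    }

    public override bool CanRead { get { return _stream.CanRead; } }
    public override bool CanSeek { get { return _stream.CanSeek; } }
    public override bool CanWrite { get { return _stream.CanWrite; } }
    public override long Length { get { return _stream.Length; } }

    public override long Position
    {
        get { return _stream.Position; }
        set { CheckTimeout(); _stream.Position = value; }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        CheckTimeout();
        return _stream.Read(buffer, offset, count);
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        CheckTimeout();
        _stream.Write(buffer, offset, count);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        CheckTimeout();
        return _stream.Seek(offset, origin);
    }

    public override void SetLength(long value)
    {
        CheckTimeout();
        _stream.SetLength(value);
    }

    public override void Flush()
    {
        _stream.Flush();
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
            _stream.Dispose();
        base.Dispose(disposing);
    }
}
```

The check happens only before each call, so a single Read that blocks is not interrupted by this wrapper; that case is what ReadWriteTimeout is for. The two timeouts complement each other.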

Multiple HttpWebRequest limitation:
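By default, you can't run more than about 2 concurrent HttpWebRequest connections to the same host (the limit comes from ServicePointManager.DefaultConnectionLimit and depends on the hosting environment). In order to override it (the easiest way, IMHO), don't forget to add this under the <configuration> section in the application's config file: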

<system.net>
  <connectionManagement>
     <add address="*" maxconnection="65000" />
  </connectionManagement>
</system.net>
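
If you'd rather not touch the config file, the same limit can be raised from code through ServicePointManager.DefaultConnectionLimit. A minimal sketch, where the ConnectionLimitSetup class and its Apply method are hypothetical names of mine:

```csharp
using System.Net;

// Hypothetical helper: call once at startup, before the first request is created,
// because the limit is picked up when a connection group for a host is first set up.
public static class ConnectionLimitSetup
{
    public static void Apply(int maxConnectionsPerHost)
    {
        ServicePointManager.DefaultConnectionLimit = maxConnectionsPerHost;
    }
}
```

Pick a number that matches what the remote service can actually take; a huge value only moves the bottleneck to the other side.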

Why should you follow these guidelines:

  1. Never trust 3rd party components: avoid excuses like "my site is not responsive because 1000 threads are waiting for web-service-X to respond". By setting those parameters, you decide how long you are willing to wait. Log and monitor these timeouts so you can adjust your application and alert your suppliers.
  2. Be able to determine your own SLA for the world: again, if you need to call a web service internally, make sure you control the time you're willing to spend on it. You've got clients to serve and they want you to meet the SLA you promised!

Important note about reusing HttpWebRequest.GetResponse()
Simply put, it doesn't work, by design. That means that if you fail to get a response in time (due to 1, 2 or 3), don't call webRequest.GetResponse() again, as the result is cached internally (you'll get the same HttpWebResponse). What you should do is re-create the HttpWebRequest and try again. I don't agree with the design Microsoft selected for this method, but at least it's good to be aware of it.

   from MSDN:

   " Multiple calls to GetResponse return the same response object; the request is not reissued. "

Final note:
You should obviously consider writing an HttpWebRequestHelper class (or extension method) and using it instead of copy-pasting this code all over your codebase.
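
Such a helper is also a natural home for the "re-create the request, then retry" rule from the note above. Here is a rough sketch; FetchRetry, Execute and the choice of which exceptions count as retryable are all my assumptions, so adjust them to what you actually see in your logs.

```csharp
using System;
using System.IO;
using System.Net;

// Hypothetical helper: retries an operation, building everything from scratch per attempt.
public static class FetchRetry
{
    // Runs 'attempt' up to maxAttempts times and rethrows the last failure.
    public static T Execute<T>(Func<T> attempt, int maxAttempts)
    {
        if (attempt == null) throw new ArgumentNullException("attempt");
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException("maxAttempts");

        for (int i = 1; ; i++)
        {
            try
            {
                return attempt();
            }
            catch (Exception err)
            {
                // Assumption: only these failures are worth another attempt.
                bool retryable = err is WebException || err is TimeoutException || err is IOException;
                if (!retryable || i == maxAttempts) throw;
            }
        }
    }

    public static string Fetch(Uri requestUri, int timeoutInMs, int maxAttempts)
    {
        return Execute<string>(() =>
        {
            // A brand new HttpWebRequest per attempt: calling GetResponse() again
            // on the old instance would not reissue the request.
            HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create(requestUri);
            webRequest.Timeout = timeoutInMs;
            webRequest.ReadWriteTimeout = timeoutInMs;

            using (WebResponse webResponse = webRequest.GetResponse())
            using (StreamReader streamReader = new StreamReader(webResponse.GetResponseStream()))
                return streamReader.ReadToEnd();
        }, maxAttempts);
    }
}
```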

   

Posted by Oren Ellenbogen 
14/10/2008 05:32, Israel time UTC-07:00,     Comments [1]  | 

[ Part 1, this is Part 2, Part 3 ]

In Part 1 I talked about the general notions behind SpawnEnumerator and played with the API. If you're not familiar with Microsoft's CCR, this post might require a 2nd and 3rd read to understand completely. CCR changes the way you think about and approach async code. It's a game worth playing, and studying the rules is only to your advantage. Alright, enough chit chat, let's make it happen (complete code attached at the end).

Class definition:

  public sealed class SpawnEnumerator<T> : IDisposable
       private SpawnEnumerator(int threadsCount, string threadsPoolName, IEnumerator<T> filler) { /* initialize fields by parameters, nothing more */ }

Now, let's have a look at some of the fields:

  private const double DefaultLowThresholdPrcentage = 0.1; // When "items to process" queue reach 10%, we want to re-fill. Should be exposed of course.

  private event Action<DateTime> _enumeratorDepleted = delegate { }; // trigger when the enumerator is empty
  private event Action<DateTime> _allTasksAreCompleted = delegate { }; // trigger when all tasks are completed

  private readonly Dispatcher _dispatcher; // our "threadpool"
  private readonly DispatcherQueue _dispatcherQueue; // hold actual ITask, waiting for the dispatcher (aka worker threads) to handle them. more about it soon.

  private readonly Port<T> _itemsToProcessPort;  // hold wannabe tasks, currently a queue of items of T we want to process.
  private readonly Port<EmptyValue> _itemCompleteNotificationPort; // soon...
  private readonly Port<EmptyValue> _initializePort;  // soon...

  private readonly IEnumerator<T> _enumerator; // the enumerator we'll use to fill the _itemsToProcessPort

Deeper look on what we have so far:

  • _itemsToProcessPort: will act as the queue of items we want to process. In CCR's world, Port<T> is actually a "smart queue" (more on it later).
  • _itemCompleteNotificationPort: will be used to notify on every completed item. Assuming that we need to fill 100 items to the port, and our lower limit is 10%, we want to re-fill the _itemsToProcessPort every 90 completed items. Notice we're using EmptyValue as T. EmptyValue is a CCR type that holds EmptyValue.SharedInstance to avoid memory allocation.
  • _initializePort: will be used to initialize the _itemsToProcessPort with the 1st bulk of items.

Step back, what's going on?!!? why so many Port ?
Well, CCR is all about messaging. You pass messages to ports (Port<T> is thread-safe of course) and by doing so, you take advantage of the "smart queue" implementation behind Port<T>. When a message is posted to a port, the CCR evaluates the "predicates" registered on that port. If a "predicate" returns true, the CCR dequeues the item(s) that matched it from the Port, creates an ITask out of them and pushes it into the DispatcherQueue as an "actual task".

Using Ports makes it easier for us to define complex async code. Instead of putting locks all over the place, I can simply post a message to _itemCompleteNotificationPort and, after X messages were posted to this port, ask to re-fill the queue when a thread is available. This is much easier than counting each completed item and, if ((counter % X) == 0), locking some object and re-filling. Both will work, but in the CCR world you don't have to think about technical async problems/solutions but rather about the logical async operations you want to perform. You'll write much less code, zero locks of your own, and mostly think in terms of "this could run concurrently" and "this must run exclusively", letting the CCR schedule everything for you.
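
For contrast, here is roughly what the hand-rolled "count and lock" alternative mentioned above could look like. This is only a sketch of the approach we are avoiding; ManualRefillCounter and its members are hypothetical names, not part of SpawnEnumerator.

```csharp
using System;
using System.Threading;

// Hypothetical lock-based alternative: count completed items ourselves and
// re-fill exclusively every 'refillEvery' completions.
public sealed class ManualRefillCounter
{
    private readonly object _refillLock = new object();
    private readonly int _refillEvery;
    private readonly Action _refill;
    private int _completed;

    public ManualRefillCounter(int refillEvery, Action refill)
    {
        if (refillEvery < 1) throw new ArgumentOutOfRangeException("refillEvery");
        if (refill == null) throw new ArgumentNullException("refill");
        _refillEvery = refillEvery;
        _refill = refill;
    }

    // Called by every worker thread when it finishes an item.
    public void ItemCompleted()
    {
        int completed = Interlocked.Increment(ref _completed);
        if (completed % _refillEvery == 0)
        {
            // The enumerator is not thread-safe, so re-filling must be exclusive.
            lock (_refillLock)
            {
                _refill();
            }
        }
    }
}
```

It works, but the counting, the lock and the scheduling decision are now all ours to get right, which is exactly the bookkeeping the receivers take off our hands.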

Start method: (as discussed in Part 1)
public static SpawnEnumerator<T> Start(int threadsCount, string threadsPoolName, int upperQueueSize, IEnumerable<T> filler, Action<T> handler)
{
  // .. validate parameters, nothing interesting...
  SpawnEnumerator<T> sw = new SpawnEnumerator<T>(threadsCount, threadsPoolName, filler.GetEnumerator());
  sw.Initialize(handler, upperQueueSize);
  return sw;
}

API Design: Why Start method with private constructor instead of public constructor alone:
The client of this method should understand that once she supplies the arguments, things start happening: we immediately start processing items from the enumerator. You'll soon find out that Start is a non-blocking method. A "Start" method, so I feel, makes this more explicit than a public constructor would.

Initialize method:
private void Initialize(Action<T> handler, int upperQueueSize)
{
  RegisterRecievers(handler, upperQueueSize); // where CCR *magic* happens. soon...
  _initializePort.Post(EmptyValue.SharedInstance); // post a message to let "someone" know we want to fill the 1st bulk of items
}

RegisterRecievers method:
Before we look at the code, here is a reminder of the main things we want to accomplish:

  1. We should fill the _itemsToProcessPort for the 1st bulk or once we reach the lower limit of the queue, by counting how many items were completed. Keeping in mind that _enumerator is not thread-safe and we don't want to start locking access to it on our own, we should make sure that re-filling is done exclusively from 1 thread only.
  2. We want to handle each one of the items posted to _itemsToProcessPort with the supplied "handler" (given in Start method). Each item is independent so obviously we want to process each item concurrently, according to the amount of threads in the Dispatcher.

private void RegisterRecievers(Action<T> handler, int upperQueueSize)
{
  int numberOfItemsToDepleteBeforePushingNewBulk = (int)Math.Ceiling((1 - DefaultLowThresholdPrcentage) * upperQueueSize);

  Arbiter.Activate(_dispatcherQueue,

     Arbiter.Interleave(
         new TeardownReceiverGroup(),// nothing here
         new ExclusiveReceiverGroup(
                // 1st bulk:
                Arbiter.Receive(false, _initializePort, delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); }), // only once, that's why the "false" is here.

                // enough items were completed which means "items to process" queue reached lower limit:
                Arbiter.MultipleItemReceive(true, _itemCompleteNotificationPort, numberOfItemsToDepleteBeforePushingNewBulk,
                                          delegate { FillItemsToProcessQueueWithNextBulk(upperQueueSize); })
            ),
         new ConcurrentReceiverGroup(
                // process items concurrently
                Arbiter.Receive(true, _itemsToProcessPort, // listen to every post, that's why the "true"
                     delegate(T item)
                     {
                         try
                         {
                             handler(item);
                         }
                         catch (Exception err) { /* log error */ }
                         finally
                         {
                             HandleCompletedItem();
                         }
                     })
            )
        )
    );
}

Alright, this is a bit harder to read but let's try to simplify it by reading it from inside-out.

  • Under ExclusiveReceiverGroup you'll see 2 receivers. The first listens to _initializePort and, once a message is posted, creates an ITask that internally (when a thread is available) calls the FillItemsToProcessQueueWithNextBulk method. The second receiver listens to _itemCompleteNotificationPort and does the same as the first one for every numberOfItemsToDepleteBeforePushingNewBulk items posted to the Port.
  • Under ConcurrentReceiverGroup you'll see a receiver listening to _itemsToProcessPort. For every message, it creates an ITask that internally (again, once a thread is available) runs the given "item handler" with the item dequeued from the Port.
  • We use Arbiter.Activate(_dispatcherQueue, ...) to register each created ITask from the receivers to the internal queue. Queue of actual tasks.
  • This method is non-blocking, we only register receivers that know how to create and enqueue ITask from messages posted to some Port. That's it.

Recap:
We post messages to different Port<T> instances and write receivers that "listen" to the messages based on some rules (one receiver listens to every message, one listens to only one message, one listens to X messages etc). When a receiver's rule applies, the CCR dequeues all the relevant messages (the ones that matched the rule) from the Port<T> and wraps them, together with the supplied delegate, as an ITask. This ITask instance is enqueued to the DispatcherQueue until the Dispatcher has a free thread to handle it. The Dispatcher executes those tasks according to the requested scheduling (some ITasks must run exclusively, some can run concurrently, as we've seen).


You can download the complete code (with some extra features) here: SpawnEnumerator.txt (12KB) - you'll need CCR & log4net dll in order to compile it.

   

Posted by Oren Ellenbogen 
14/10/2008 04:07, Israel time UTC-07:00,     Comments [0]  | 
# Monday, October 13, 2008

[ this is Part 1, Part 2, Part 3 ]

I promised to write more about CCR and how to use it in practice.
There is no better way to explain how infrastructure works than writing some code and playing with it, so let's toy with a simple utility, based on CCR under the hood.

Scenario: you've got billions of *independent* tasks you need to execute as fast as possible.
Requirements:

  1. Amount of "worker threads" should be easy to define. obviously.
  2. Memory consumption - don't eat more than you can chew. We don't want to hold everything in memory, this should be easy to configure.
  3. The process should *not* shutdown due to un-handled exception in one of the worker threads.
  4. We want to know when the enumerator is depleted and/or all items were completed. This is crucial for testing and adjusting parameters.

 

The general notion behind SpawnEnumerator is something like this:

[Image: SpawnEnumerator general idea]

 

We pull a bulk of items from the enumerator; each one of them is a "wannabe" task. Combining the value of each T with a given "item handler" turns it into an actual task, waiting for its turn in the "threads pool". Once the port/queue of "items to execute" reaches the lower limit, we pull another bulk of items from the enumerator, and so on until the enumerator is depleted.


API playground:

SpawnEnumerator<T>.Start(int numberOfThreads, string threadPoolName, int numberOfItemsToHoldInMemory, IEnumerator<T> enumerator, Action<T> singleItemHandler)

T = type of items we would like to run. if we have 10,000,000 items of type int, we'll supply "int" as T.
numberOfThreads = how many worker threads do we want to run?
threadPoolName = name the worker threads for easier debugging.
numberOfItemsToHoldInMemory = rough number of items we want to hold in memory, to make sure memory consumption won't blow up in our face.
enumerator = the enumerator of "items" to execute, assuming that it will hold a *huge* amount of tasks.
singleItemHandler = delegate that receives 1 item and handle it.

event Action<DateTime> EnumeratorDepleted

event Action<DateTime> AllTasksAreCompleted


Test playground:

(written in notepad, sorry for stupid mistakes)

[Test]
public void Execute_LargeEnumeratorOfNumbers_ExecuteAllItems()
{
    // arrange:
    int numberOfThreads = 10;
    string threadPoolName = "mypool";
    int numberOfItemsToHoldInMemory = 100;

    IEnumerator<int> items = Yield.For(1, 10000); // yield return numbers from 1-10000
    Action<int> singleItemHandler = delegate(int item) { /* sum the given number, via Interlocked.Add */ };

    ManualResetEvent trigger = new ManualResetEvent(false);

    // act:
    _executor = SpawnEnumerator<int>.Start(numberOfThreads, threadPoolName, numberOfItemsToHoldInMemory, items, singleItemHandler);
    _executor.AllTasksAreCompleted += delegate { trigger.Set(); };

    // assert:
    bool signaled = trigger.WaitOne(TimeSpan.FromSeconds(1), false);
    Assert.IsTrue(signaled, "timeout reached, that shouldn't happen!");
    // assert that all of the items were called by matching sum of 1-N sequential numbers (simple formula) to what we collected in the handler
}


* In the "test teardown" we can check if _executor is not null and if so Dispose it to close the "worker threads".
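
The test leans on two small pieces it doesn't show: the Yield.For helper and the "simple formula" check. Here is one way they might look. Both are my guesses for illustration (SumCollector is a hypothetical name), not the code used in the real test.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper used by the test: lazily yields the numbers from..to (inclusive).
public static class Yield
{
    public static IEnumerator<int> For(int from, int to)
    {
        for (int i = from; i <= to; i++)
            yield return i;
    }
}

// Hypothetical thread-safe collector that the "single item handler" can delegate to.
public sealed class SumCollector
{
    private long _sum;

    public long Sum { get { return Interlocked.Read(ref _sum); } }

    // Safe to call from many worker threads at once.
    public void Handle(int item)
    {
        Interlocked.Add(ref _sum, item);
    }

    // Sum of 1..n via the closed formula n * (n + 1) / 2.
    public static long ExpectedSum(int n)
    {
        return (long)n * (n + 1) / 2;
    }
}
```

With these in place, the handler becomes collector.Handle and the final assert compares collector.Sum with SumCollector.ExpectedSum(10000).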

 

Next post - Implementing SpawnEnumerator via CCR.

   

Posted by Oren Ellenbogen 
13/10/2008 04:50, Israel time UTC-07:00,     Comments [2]  | 
# Monday, October 06, 2008

This is mostly a self-note but heck, maybe someone will reach this post via Delver (or Google :)), so I'm all about sharing.

Anyway, we're using (for now) Bugzilla and I tried to get all the "open" bugs for my team with status equal to X,Y,Z.

Sounds easy right? well, you are ... sadly mistaken.
After spending 15 minutes banging my head against the nearest wall with “Bugzilla Advanced Search” (well, I'll be polite and say it's "advanced" alright), I gave up on that one.
Instead, I hacked the url a bit to understand the dark voodoo of Bugzilla and voila, 2 minutes later:

http://qabugz/cgi-bin/bugzilla/buglist.cgi
?bug_status=NEW
&bug_status=ASSIGNED
&bug_status=REOPENED
&assigned_to=Joe
&assigned_to=Joe2
&assigned_to=Joe3
&query_format=specific
&field0-0-0=bug_status
&field0-0-1=assigned_to
&type0-0-0=anyexact
&type0-0-1=anyexact
&order=bugs.bug_severity


Just remove the line breaks of course (they're only here to make it easier to edit) and replace "Joe" with your favorite developer's name.
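
If you end up generating such links often, gluing the parameters together in code saves the copy-paste. A tiny sketch, assuming nothing about Bugzilla beyond the query string shown above (BugzillaQuery is a hypothetical helper of mine):

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: joins key/value pairs into a single buglist.cgi URL.
public static class BugzillaQuery
{
    public static string Build(string baseUrl, IEnumerable<KeyValuePair<string, string>> parameters)
    {
        List<string> parts = new List<string>();
        foreach (KeyValuePair<string, string> parameter in parameters)
        {
            // Escape both sides so names with spaces or '&' survive.
            parts.Add(Uri.EscapeDataString(parameter.Key) + "=" + Uri.EscapeDataString(parameter.Value));
        }
        return baseUrl + "?" + string.Join("&", parts.ToArray());
    }
}
```

A list of pairs is used instead of a dictionary because the same key (bug_status, assigned_to) appears more than once.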

If you want to make it a bit more complex, here is a nice site with the supported field names:
http://pkp.sfu.ca/bugzilla/page.cgi?id=quicksearchhack.html

 

Anyway, I hope it will help someone.

* geeky btw – notice the funny “binary index” of the fields! made me laugh quite a bit, being the geek that I am.

   

Posted by Oren Ellenbogen 
06/10/2008 03:20, Israel time UTC-07:00,     Comments [0]  | 
# Tuesday, September 30, 2008

One of the biggest challenges in management is being able to track your own rhythm, making sure your plans are executed and things go smoothly.

In my previous post about Driven By Self Organization I stressed the importance of making things visible and plan for the future. How can one set the environment of the team to drive the entire team to success?

Thinking about it lately, I realized that I have my own ideas about what should be visible, how the team should interact internally and how one should behave in such a team as a Team Lead. Don't get me wrong, I don't have 30 years of experience leading small<->giant teams for small<->giant companies. These ideas are solely based on my gut feelings: making things SIMPLE (KISS) so the entire team will be driven by self-organization without the "burden" of self-management. If things are easy to do, it's easier to get better at them.

The trick is to allow all of the members of the team to be part of the organize->execute game. Some will play, some won't, but they will all be affected by the always-ready environment and notice how inner interaction changes their day. By making it visible, they will be motivated to take action (it will feel natural). I came up with this drawing to expose our sprint plans and progress:

 

[Image: sprint board drawing]

 

This is a very SIMPLE presentation of the current sprint features and every-day progress - "cards" will move between columns as people work on them.
There are some magic "self organization" tricks integrated into this 1-Visible-view:

  1. Achievement-based planning: Before arranging the features/cards, we try to set the "deliveries" for the week. This is free text (don't be tempted to make it a list of features) that "sets the mood" for the week. It describes features we want to finish, some design we need to handle before the next week, some quality check we want to pass ("make sure no critical/major bugs are open") etc. Deliveries help to plan the week as they define measurable goals, driving the team to achievement-based planning rather than like-best-do-first development.
  2. Visibility is key: Every member of the team knows exactly what's left for the entire team on a weekly level. No more "but I finished all my tasks two days earlier than we talked about! What can I do about the fact that Joe is new here and couldn't keep up as you thought? Oh, I didn't know we depend on his task before we can release the package..."
  3. Help each other: remind yourselves that things need to be completed, and help out when someone is behind schedule (alright Joe, I'll take Feature A, don't worry). The board makes it easier to know "where can I help?"
  4. We want Quality: Nothing is DONE until it's tested and fixed. The idea of splitting "coded" tasks from "tested" tasks is to set the mood for "production ready" code.
  5. Small goals are easier to achieve: Splitting the sprint into smaller chunks makes it easier to win small battles. Each week defines small trophies, the "deliveries" we promised for that week.
  6. Plan leftovers for tomorrow: at the end of every week/sprint, you can easily see what was left. Discuss why it failed and plan it for tomorrow (=next week or next sprint).

 

A Team Lead in such a team will mostly act as a coach, helping the team members to split the features into tasks, removing obstacles, motivating cooperation and taking notes about "how can we get better?". Most importantly, it will allow her (or him) to be productive and to feel she can help the team's effort, instead of the constant-chaos feeling managers tend to have when things go poorly. The team members are aware of the plans and can balance their efforts to break loose of this chaos-like feeling.

   

Posted by Oren Ellenbogen 
30/09/2008 06:07, Israel time UTC-07:00,     Comments [0]  | 

Think about a young fellow, wanting to get into shape. Here are 2 scenarios of how one can tackle that wish:

1. Create a plan and "manage yourself" to keep it.

   20:00 - set the clock to 06:00
   06:00 - wake up, make sure you're not falling back to sleep! get dressed etc
   06:15 - make sure to eat something small and drink some water
   06:35 - run 5km
   08:00 - make sure your bag is ready for school
   08:15 - go to school
   ...
   20:00 - set the clock to 06:00


2.
Organize things to drive your day:

   20:00 - put your shoes next to your bed, set your clock to 06:00 and set it FAR from the bed, prepare a little something to eat for tomorrow morning, prepare your bag for (tomorrow's) school.
   06:00 - wake up - go close the damn clock (you need to stand in order to do it), get dressed etc.
   06:15 - eat something small and drink some water
   06:35 - run 5km
   08:00 - relax for a few minutes and then go to school
   ...
   20:00 - put your shoes next to your bed, set your clock to 06:00 and set it FAR from the bed, prepare a little something to eat for tomorrow morning, prepare your bag for (tomorrow's) school.


Where do most of us fail?

I might be wrong, but it seems that a lot of us (myself included) simply can't manage our time wisely during the day, for a long period of time. It's too easy to forget something you need to do TODAY when you had to force yourself to plan it TODAY. It's too easy to fail. It's too easy to break the rhythm.

This is why most of us can't lose weight, can't get into shape, can't read 5 books every month etc.


Driven by Self-organization

To me, it means that I want to set my environment to drive me to success. If I take the time to prepare my tomorrow, there is little chance I'll fail due to laziness: (1) I'm planning for TOMORROW, so why would I mind investing the time? I only plan things and set the environment / mood. I don't need to run the 5km now, I only want to make sure it will be easier to achieve tomorrow and (2) tomorrow morning, surprise surprise, everything is ready for me! I don't need to wake up and find out that I forgot a stupid thing like the fact that my shoes are in the washing machine. The rules are pretty easy: de-couple planning from performing and make sure everything you need for those actions is visible and available at the time you'll need it.


Although it seems like planning is more tedious than actually doing the task, planning for the future is quite relaxing. Try it: plan your tomorrow at the end of today, and think about what should be set up so your tomorrow will go smoothly. It will make it easier for you to come in the morning and simply perform, without the burden of planning in a hurry in order to perform right now:

"hhmmm... alright, it's Wednesday, what should I do today... gosh, so many things to complete! Maybe I'll start with sending those emails... naaa.. don't have power for that now.. maybe I'll finish that task I promised yesterday! naa.. she don't need it for today anyway... crap! I don't have the energy to deal with it! alright, first thing is to grab a cup of coffee".

This leads to "do what I like best first" syndrome.


At the end of the day, before going home, go over the things you've accomplished and try to see what is left for tomorrow. Relax, you don't need to do those chores now. Write everything you should accomplish for tomorrow down (with time estimations) and go home smiling, knowing that your tomorrow is best planned for achievements rather than for your personal whims.

   

Posted by Oren Ellenbogen 
30/09/2008 05:12, Israel time UTC-07:00,     Comments [1]  | 
# Tuesday, September 16, 2008

I'm preparing a set of articles regarding Microsoft CCR and Parallel Extensions, trying to explain the multithreading libraries supplied by MS and how they are going to change our lives in the multi-core world.

I thought about starting with an architectural overview of each library, going over the data structures and then demonstrating usage in real-life scenarios. I'll start with level 100 articles and dive deeper into complex scenarios and how to harness the framework's full power for your needs.

We had the pleasure of working with CCR here at Delver, so I'll share with you some of the code base we created on top of it, some tips for using it correctly and my take on using these libraries in mainstream applications.

 

Some nice articles to get you started if you didn't hear about these libraries so far:

Parallel Programming with .NET : Most Common Performance Issues in Parallel Programs

Concurrent Affairs: Concurrency and Coordination Runtime -- MSDN Magazine, September 2006

Channel9 Wiki: Concurrency Runtime

Parallel Programming with .NET : Coordination Data Structures Overview

Parallel Programming with .NET : Fork/Join parallelism with .NET CountdownEvent

Parallel Programming with .NET : Wrapping an APM implementation with Future<T>

 

Is there anything specific you'd like to read about on these topics?

   

Posted by Oren Ellenbogen 
16/09/2008 02:58, Israel time UTC-07:00,     Comments [0]  |