FileStreamer

Requirements:

  1. Be able to read all the lines in a given file.
  2. Be able to do so even if the file is HUGE ( == don’t load it all at once).
  3. Control the number of items I want to receive and whether or not the enumerator ignore empty lines. Always nice to have.
  4. Thread-safe should be supported easily. Think about 50 threads, each reading the next line and processing it.
  5. Nice performance is a plus.

Playing with the API with my teammate Ron gave the following (code written in notepad, stupidity won’t compile):

“Common foreach” usage:

foreach (string line in FileStreamer.GetLines(@”c:\temp\myfile.txt”, true, 1000)) { /* .. code */ } // read 1000 items from the file while ignoring empty lines.

Reading from multiple threads usage:

using (FileStreamer streamer = new FileStreamer(@”c:\temp\myfile.txt”, true, -1)) // -1 means no limit, read all non-empty lines
{
    Thread[] threads = new Thread[10];
    for(int i=0; i<threads.Length; i++)
    {
        threads[i] = new Thread((ThreadStart)delegate {
            string line;
            if (!streamer.TryGetNextLine(out line)) // thread safe!
                return; // end of file, we can exit

            // do work …
        });

        threads[i].Start();
    }

    // join the threads + whatever …
}

 

After reading a few ideas in stackoverflow, I thought to share my solution:    

// written by bogen (30/12/2008)
 
#region using
 
using System;
using System.Collections.Generic;
using System.IO;
 
#endregion
 
namespace Semingo.Common.Utils
{
    /// <summary>
    /// Return a stream of lines for the specified file.
    /// This class is thread safe by design!
    /// Use the static method FileStreamer.GetLines for not thread safe usage (via foreach)
    /// </summary>
    public class FileStreamer : IDisposable
    {
        #region fields
 
        private readonly object _locker = new object();
        private readonly string _path;
        private readonly bool _ignoreEmptyLines;
        private readonly int _limit;
        private readonly IEnumerator<string> _enumerator;
        private int _linesGiven;
        private bool _disposed;
 
        #endregion
 
        #region ctors
 
        /// <summary>
        /// Create a file streamer instance
        /// </summary>
        /// <param name="path">File path</param>
        public FileStreamer(string path) : this(path, false, -1)
        {
        }
 
        /// <summary>
        /// Create a file streamer instance
        /// </summary>
        /// <param name="path">File path</param>
        /// <param name="ignoreEmptyLines">Should the streamer avoid empty lines</param>
        /// <param name="limit">Number of maximum lines the streamer should return. Send -1 for no limit</param>
        public FileStreamer(string path, bool ignoreEmptyLines, int limit)
        {
            if (!File.Exists(path))
                throw new ArgumentException("Cannot find the file: " + path);
            if (limit != -1 && limit <=0 )
                throw new ArgumentException("Limit must be bigger than 0 (or -1 for no limit) but was: " + limit + ". File given was: " + path);
 
            _path = path;
            _ignoreEmptyLines = ignoreEmptyLines;
            _limit = limit;
            
            _enumerator = CreateStream().GetEnumerator();
        }
 
        #endregion
 
        #region public API
 
        public bool TryGetNextLine(out string nextItem)
        {
            lock (_locker)
            {
                return TryGetNextLineAssumingInsideLock(out nextItem);
            }
        }
 
        public bool TryGetNextLines(out ICollection<string> nextItems, int howMany)
        {
            if (howMany <= 0)
                throw new ArgumentException("'howMany' parameter must be > 0 but was " + howMany, "howMany");
 
            nextItems = new List<string>(howMany);
            lock (_locker)
            {
                string nextItem;
                for(int i=0; i<howMany; i++)
                {
                    if (!TryGetNextLineAssumingInsideLock(out nextItem))
                        break; // no more lines (EOF)
                    
                    nextItems.Add(nextItem);
                }
            }
 
            return nextItems.Count > 0;
        }
       
        public static IEnumerable<string> GetLines(string path)
        {
            return GetLines(path, false, -1);
        }
 
        /// <summary>
        /// 
        /// </summary>
        /// <param name="path"></param>
        /// <param name="ignoreEmptyLines"></param>
        /// <param name="limit">send -1 for no limit</param>
        /// <returns></returns>
        public static IEnumerable<string> GetLines(string path, bool ignoreEmptyLines, int limit)
        {
            using (FileStreamer streamer = new FileStreamer(path, ignoreEmptyLines, limit))
            {
                string nextItem;
                while (streamer.TryGetNextLine(out nextItem))
                    yield return nextItem;
 
                yield break; // EOF
            }
        }
 
        ///<summary>
        ///Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        ///</summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
 
        #endregion
 
        #region private API
 
        /// <summary>
        /// Get the next line in the file.
        /// dev: assume that the lock is from the outside, by the caller (this is why it's a private method)
        /// </summary>
        private bool TryGetNextLineAssumingInsideLock(out string nextItem)
        {
            nextItem = null;
            if (_linesGiven == _limit)
                return false; // we reached the limit, no more please.
 
            if (!_enumerator.MoveNext())
                return false; // end of stream (EOF)
 
            nextItem = _enumerator.Current;
            _linesGiven++;
            return true;
        }
 
        private IEnumerable<string> CreateStream()
        {
            using (FileStream fs = new FileStream(_path, FileMode.Open, FileAccess.Read, FileShare.Read, 1024, FileOptions.SequentialScan))
            using (StreamReader reader = new StreamReader(fs))
            {
                string line;
                while ((line = reader.ReadLine()) != null)
                {
                    if (_ignoreEmptyLines && line == string.Empty)
                        continue; // skip empty lines if needed
 
                    yield return line;
                }
 
                yield break;
            }
        }
 
        protected void Dispose(bool disposing)
        {
            if (_disposed)
                return;
 
            if (disposing)
            {
                _enumerator.Dispose();
            }
 
            _disposed = true;
        }
 
        #endregion
    }
}
 

Oren Ellenbogen