adam-gligor monologue about stuff, brain dump

Pipeline to stream data to a zip file

A .NET coding challenge: how to export huge amounts of data from a data source, such as a database, into flat files efficiently.

Solution

Build a streaming pipeline. The producer end reads and transforms the data, the consumer end archives and writes to a file.

This will be a C# implementation. I’ll use Streams and Enumerables.

Streaming data from a database is supported by ADO.NET's DataReader, or by Entity Framework's deferred-execution operators such as Select. Both can be consumed as an IEnumerable. An IEnumerable exposes an enumerator, which supports iterating over the result one item at a time.
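
As a rough sketch of the DataReader side, an iterator method can wrap a reader so that rows are pulled only when the consumer asks for them. The `ReadRows` helper and the two-column layout are assumptions made for illustration, and an in-memory `DataTable` stands in for a real database so the snippet runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public static class ReaderExtensions
{
    // Hypothetical helper: turns a forward-only IDataReader into a lazy sequence.
    // A row is fetched only when the consumer requests the next item.
    public static IEnumerable<(int Id, string Title)> ReadRows(IDataReader reader)
    {
        while (reader.Read())
        {
            yield return (reader.GetInt32(0), reader.GetString(1));
        }
    }
}

public static class ReaderDemo
{
    public static void Main()
    {
        // In-memory stand-in for a database table (assumption, for illustration only).
        var table = new DataTable();
        table.Columns.Add("Id", typeof(int));
        table.Columns.Add("Title", typeof(string));
        table.Rows.Add(1, "first");
        table.Rows.Add(2, "second");

        using (IDataReader reader = table.CreateDataReader())
        {
            foreach (var row in ReaderExtensions.ReadRows(reader))
            {
                Console.WriteLine($"{row.Id}|{row.Title}");
            }
        }
    }
}
```

With a real database the reader would come from a command's `ExecuteReader()` call instead, and the rest of the loop would stay the same.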

Files work with the Stream abstraction. Streams can be stacked to produce the compressed file directly: wrap a FileStream in a GZipStream to write and compress at the same time. Streams work with byte arrays.

What’s missing is a way to convert the producer format, IEnumerable, into the consumer format, Stream.
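
A minimal sketch of that stacking, writing a small payload through a GZipStream into a file. The file name and the sample payload are made up for the example.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;

public static class StackedStreamsDemo
{
    // Hypothetical helper: writes the bytes to 'path', gzip-compressing them on the way.
    public static void WriteCompressed(string path, byte[] payload)
    {
        // Bytes written to the GZipStream are compressed and forwarded
        // to the FileStream underneath it.
        using (FileStream file = File.Create(path))
        using (var gzip = new GZipStream(file, CompressionMode.Compress))
        {
            gzip.Write(payload, 0, payload.Length);
        }
    }

    public static void Main()
    {
        // Hypothetical file name in the temp directory.
        string path = Path.Combine(Path.GetTempPath(), "stacked-demo.csv.gz");

        // Repetitive sample data, so the compression has something to work with.
        string text = string.Concat(Enumerable.Range(0, 1000).Select(n => $"{n}|{n}\r\n"));
        byte[] payload = Encoding.UTF8.GetBytes(text);

        WriteCompressed(path, payload);

        Console.WriteLine($"raw bytes:        {payload.Length}");
        Console.WriteLine($"compressed bytes: {new FileInfo(path).Length}");

        File.Delete(path);
    }
}
```

Disposing the outer GZipStream flushes the remaining compressed data, so the `using` blocks matter here.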

An enumerable stream

The simplest thing I came up with: say I have an enumerable of bytes, and I want to convert it into a stream that supports reading.

The official documentation on the Stream class provides good guidance on implementing a custom stream.

The only member that needs real logic is Read. Here is the implementation:

 
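    using System;
    using System.Collections.Generic;
    using System.IO;

    public class EnumerableStream : Stream
    {
        private readonly IEnumerator<byte> _source;

        public EnumerableStream(IEnumerable<byte> source)
        {
            _source = source.GetEnumerator();
        }

        // A read-only, forward-only stream. CanSeek and CanWrite return false
        // rather than throw, because callers such as CopyTo may query them.
        public override bool CanRead => true;
        public override bool CanSeek => false;
        public override bool CanWrite => false;

        public override int Read(byte[] buffer, int offset, int count)
        {
            var read = 0;

            // Check the count first, so a zero-length read does not consume a byte.
            while (read < count && _source.MoveNext())
            {
                buffer[offset + read] = _source.Current;
                read++;
            }

            // Returning 0 signals the end of the stream.
            return read;
        }

        // The remaining abstract members make no sense for this stream.
        public override long Length => throw new NotSupportedException();
        public override long Position
        {
            get => throw new NotSupportedException();
            set => throw new NotSupportedException();
        }
        public override void Flush() => throw new NotSupportedException();
        public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
        public override void SetLength(long value) => throw new NotSupportedException();
        public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

        // Release the underlying enumerator together with the stream.
        protected override void Dispose(bool disposing)
        {
            if (disposing) _source.Dispose();
            base.Dispose(disposing);
        }
    }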

The pipeline

With this bit in place, all I have to do is stitch everything together. The main routine looks like this, with comments explaining the relevant lines.

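using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;

class Program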
{
    // this is the producer end 
    // use database calls here, I use just a generator function to illustrate the concept
    internal static IEnumerable<(int Id, string Title)> FetchAll()
    {
        return Enumerable.Range(0, 100000).Select(n => (n, n.ToString()));
    }

    static void Main(string[] args)
    {

        var allBytes = 
            //1. stream the data from the data source
            FetchAll() 
            //2. transform a source object into a string
            .Select(r => $"{r.Id}|{r.Title}\r\n") 
            //3. encode the string into a byte array
            .Select(s => Encoding.UTF8.GetBytes(s)) 
            //4. flatten the sequence
            .SelectMany(l => l);

        //5. conversion from IEnumerable to a readable Stream
        using (var sourceStream = new EnumerableStream(allBytes)) 
        {
            using (FileStream compressedFileStream = File.Create("out.csv.gz.tmp"))
            {
                using (var compressionStream = 
                    new GZipStream(compressedFileStream, CompressionMode.Compress))
                {
                    //6. read the source stream and write it to the destination stream 
                    sourceStream.CopyTo(compressionStream);
                }
            }
        }

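        //7. give the finished archive its final name
        //   (File.Move throws an IOException if out.csv.gz already exists)
        File.Move("out.csv.gz.tmp", "out.csv.gz");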
    }
}
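
To sanity-check the result, the archive can be read back with the same stack of streams in the opposite direction. This is only a sketch: the `ReadLines` helper is a name invented for the example, and it assumes the program above has already written out.csv.gz to the working directory.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Text;

public static class ArchiveCheck
{
    // Hypothetical helper: lazily yields the lines of a gzip-compressed text file.
    public static IEnumerable<string> ReadLines(string path)
    {
        using (FileStream file = File.OpenRead(path))
        using (var gzip = new GZipStream(file, CompressionMode.Decompress))
        using (var reader = new StreamReader(gzip, Encoding.UTF8))
        {
            string line;
            while ((line = reader.ReadLine()) != null)
            {
                yield return line;
            }
        }
    }

    public static void Main()
    {
        // Assumption: the export has already been run in this directory.
        const string path = "out.csv.gz";
        if (!File.Exists(path))
        {
            Console.WriteLine($"{path} not found, run the export first");
            return;
        }

        // Only the first few lines are decompressed; the rest is never read.
        foreach (string line in ReadLines(path).Take(3))
        {
            Console.WriteLine(line);
        }
    }
}
```

Given the generator used in FetchAll, the first lines should be 0|0, 1|1 and 2|2.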

NOTE

The name of the file inside the archive is inferred from the name of the archive. In my case, unpacking out.csv.gz results in a file called out.csv.