System.IO.Pipelines

  System.IO.Pipelines is a new library designed to make it easier to perform high-performance I/O in .NET. It targets .NET Standard and works on all .NET implementations.

What problem does System.IO.Pipelines solve?

  System.IO.Pipelines was built to:

  • Parse streaming data with high performance.
  • Reduce code complexity.

  The following code is typical of a TCP server that receives line-delimited messages (delimited by '\n') from a client:

    async Task ProcessLinesAsync(NetworkStream stream)
    {
        var buffer = new byte[1024];
        await stream.ReadAsync(buffer, 0, buffer.Length);

        // Process a single line from the buffer
        ProcessLine(buffer);
    }

    The previous code has several problems:

    • A single call to ReadAsync might not receive the entire message (up to the end of the line).
    • The result of stream.ReadAsync is ignored, even though stream.ReadAsync returns the amount of data that was read.
    • It doesn't handle the case where multiple lines are read in a single ReadAsync call.
    • It allocates a byte array for each read.
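To make the first and third problems concrete, here is a minimal sketch that reads a stream through a deliberately small buffer and records what each ReadAsync call returns. ChunkedReadDemo is a hypothetical helper, not part of .NET. A MemoryStream stands in for the network, and a real NetworkStream can split data at arbitrary points. Unlike the naive version, this sketch honors the return value of ReadAsync:

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class ChunkedReadDemo
{
    // Reads the whole stream through a buffer of chunkSize bytes and returns
    // the text delivered by each individual ReadAsync call.
    public static async Task<List<string>> ReadChunksAsync(Stream stream, int chunkSize)
    {
        var chunks = new List<string>();
        var buffer = new byte[chunkSize];

        while (true)
        {
            // ReadAsync returns the number of bytes actually read; 0 means end of stream.
            int bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
            if (bytesRead == 0)
            {
                break;
            }

            // Only the first bytesRead bytes of the buffer hold new data.
            chunks.Add(Encoding.ASCII.GetString(buffer, 0, bytesRead));
        }

        return chunks;
    }
}
```

If you read the ASCII bytes of "hello\nworld\n" with a chunk size of 4, you get "hell", "o\nwo" and "rld\n". The first line arrives across two reads, and the second read carries the end of one line and the start of the next.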

    To solve these problems, the following changes are required:

    • Buffer the incoming data until a new line is found.

    • Parse all the lines returned in the buffer.

    • The line might be larger than 1 KB (1024 bytes). The code needs to resize the input buffer until the delimiter is found, so that the complete line fits inside the buffer.

      • If the buffer is resized, more buffer copies are made as longer lines appear in the input.
      • To reduce wasted space, compact the buffer used for reading lines.
    • Consider using buffer pooling to avoid allocating memory repeatedly.

    The following code solves some of these problems:


    using System;
    using System.Buffers;
    using System.Net.Sockets;
    using System.Threading.Tasks;

    async Task ProcessLinesAsync(NetworkStream stream)
    {
        byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
        var bytesBuffered = 0;
        var bytesConsumed = 0;

        while (true)
        {
            // Calculate the amount of bytes remaining in the buffer.
            var bytesRemaining = buffer.Length - bytesBuffered;

            if (bytesRemaining == 0)
            {
                // Double the buffer size and copy the previously buffered data into the new buffer.
                var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
                Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
                // Return the old buffer to the pool.
                ArrayPool<byte>.Shared.Return(buffer);
                buffer = newBuffer;
                bytesRemaining = buffer.Length - bytesBuffered;
            }

            var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
            if (bytesRead == 0)
            {
                // EOF
                break;
            }

            // Keep track of the amount of buffered bytes.
            bytesBuffered += bytesRead;
            var linePosition = -1;

            do
            {
                // Look for an EOL in the buffered data.
                linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                    bytesBuffered - bytesConsumed);

                if (linePosition >= 0)
                {
                    // Calculate the length of the line based on the offset.
                    var lineLength = linePosition - bytesConsumed;

                    // Process the line (ProcessLine is application code, not shown here).
                    ProcessLine(buffer, bytesConsumed, lineLength);

                    // Move bytesConsumed past the line that was consumed (including the \n).
                    bytesConsumed += lineLength + 1;
                }
            }
            while (linePosition >= 0);
        }

        // Return the rented buffer to the pool once reading is finished.
        ArrayPool<byte>.Shared.Return(buffer);
    }

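The code above never compacts its buffer. bytesConsumed only grows, so the space taken by already-processed lines is never reclaimed, and the buffer doubles sooner than necessary. Below is a minimal sketch of the compaction step from the list of required changes. It assumes the same bytesConsumed/bytesBuffered bookkeeping, and BufferCompaction is a hypothetical helper:

```csharp
using System;

public static class BufferCompaction
{
    // Moves the unconsumed bytes [bytesConsumed, bytesBuffered) to the start of
    // the buffer so the free space at the end can be reused by the next read.
    public static void Compact(byte[] buffer, ref int bytesConsumed, ref int bytesBuffered)
    {
        int unconsumed = bytesBuffered - bytesConsumed;
        if (bytesConsumed > 0 && unconsumed > 0)
        {
            // Buffer.BlockCopy copies correctly even when source and destination overlap.
            Buffer.BlockCopy(buffer, bytesConsumed, buffer, 0, unconsumed);
        }

        bytesBuffered = unconsumed;
        bytesConsumed = 0;
    }
}
```

You could call it at the top of the read loop, before bytesRemaining is computed. The buffer would then grow only when a single unfinished line fills it.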

    Pipe

    The Pipe class can be used to create a PipeWriter/PipeReader pair. All data written into the PipeWriter is available in the PipeReader:

    var pipe = new Pipe();
    PipeReader reader = pipe.Reader;
    PipeWriter writer = pipe.Writer;
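Here is a minimal round trip through one Pipe. It assumes .NET Core 3.0 or later with the System.IO.Pipelines package available, and PipeRoundTrip is an illustrative name. PipeWriter.WriteAsync copies the bytes in and flushes them. Completing the writer tells the reader that no more data is coming:

```csharp
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class PipeRoundTrip
{
    // Writes a message into the PipeWriter, then reads it back from the PipeReader.
    public static async Task<string> RunAsync(string message)
    {
        var pipe = new Pipe();

        // WriteAsync copies the bytes into the pipe and flushes them to the reader.
        await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message));
        // Completing the writer signals that no more data is coming.
        await pipe.Writer.CompleteAsync();

        // Everything written so far is now visible to the reader.
        ReadResult result = await pipe.Reader.ReadAsync();
        string text = Encoding.UTF8.GetString(result.Buffer.ToArray());

        // Mark all bytes as consumed, then release the reader.
        pipe.Reader.AdvanceTo(result.Buffer.End);
        await pipe.Reader.CompleteAsync();
        return text;
    }
}
```

PipeRoundTrip.RunAsync("hello") returns "hello".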
    

    Pipe basic usage


    using System;
    using System.Buffers;
    using System.IO.Pipelines;
    using System.Net.Sockets;
    using System.Threading.Tasks;

    async Task ProcessLinesAsync(Socket socket)
    {
        var pipe = new Pipe();
        Task writing = FillPipeAsync(socket, pipe.Writer);
        Task reading = ReadPipeAsync(pipe.Reader);

        await Task.WhenAll(reading, writing);
    }

    async Task FillPipeAsync(Socket socket, PipeWriter writer)
    {
        const int minimumBufferSize = 512;

        while (true)
        {
            // Allocate at least 512 bytes from the PipeWriter.
            Memory<byte> memory = writer.GetMemory(minimumBufferSize);
            try
            {
                int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
                if (bytesRead == 0)
                {
                    break;
                }
                // Tell the PipeWriter how much was read from the Socket.
                writer.Advance(bytesRead);
            }
            catch (Exception ex)
            {
                // LogError is application code, not shown here.
                LogError(ex);
                break;
            }

            // Make the data available to the PipeReader.
            FlushResult result = await writer.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        // By completing PipeWriter, tell the PipeReader that there's no more data coming.
        await writer.CompleteAsync();
    }

    async Task ReadPipeAsync(PipeReader reader)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                // Process the line (ProcessLine is application code, not shown here).
                ProcessLine(line);
            }

            // Tell the PipeReader how much of the buffer has been consumed.
            reader.AdvanceTo(buffer.Start, buffer.End);

            // Stop reading if there's no more data coming.
            if (result.IsCompleted)
            {
                break;
            }
        }

        // Mark the PipeReader as complete.
        await reader.CompleteAsync();
    }

    bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        // Look for an EOL in the buffer.
        SequencePosition? position = buffer.PositionOf((byte)'\n');

        if (position == null)
        {
            line = default;
            return false;
        }

        // Skip the line + the \n.
        line = buffer.Slice(0, position.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        return true;
    }


    There are two loops:

    • FillPipeAsync reads from the Socket and writes to the PipeWriter.
    • ReadPipeAsync reads from the PipeReader and parses incoming lines.

    No explicit buffers are allocated. All buffer management is delegated to the PipeReader and PipeWriter implementations. Delegating buffer management makes it easier for the consuming code to focus solely on the business logic.
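You can try the same two-loop structure without a socket. This sketch assumes .NET Core 3.0 or later with System.IO.Pipelines available. A hypothetical array of string chunks replaces Socket.ReceiveAsync, and a list replaces ProcessLine. PipeWriter.WriteAsync serves as a shortcut for GetMemory, Advance and FlushAsync. The line-splitting logic mirrors TryReadLine:

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class InMemoryLinePipeline
{
    // Runs a writer loop and a reader loop concurrently over one Pipe
    // and returns the lines the reader parsed.
    public static async Task<List<string>> RunAsync(string[] chunks)
    {
        var pipe = new Pipe();
        var lines = new List<string>();

        Task writing = FillPipeAsync(chunks, pipe.Writer);
        Task reading = ReadPipeAsync(pipe.Reader, lines);
        await Task.WhenAll(reading, writing);
        return lines;
    }

    // Writer loop: each chunk plays the role of one socket receive.
    private static async Task FillPipeAsync(string[] chunks, PipeWriter writer)
    {
        foreach (string chunk in chunks)
        {
            FlushResult result = await writer.WriteAsync(Encoding.ASCII.GetBytes(chunk));
            if (result.IsCompleted)
            {
                break; // The reader has stopped, so stop writing.
            }
        }
        await writer.CompleteAsync();
    }

    // Reader loop: parse every complete line, leave a partial line in the pipe.
    private static async Task ReadPipeAsync(PipeReader reader, List<string> lines)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
            {
                lines.Add(Encoding.ASCII.GetString(line.ToArray()));
            }

            // Consumed up to the end of the last complete line; examined everything.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
            {
                break;
            }
        }
        await reader.CompleteAsync();
    }

    // Splits off one '\n'-terminated line, if the buffer contains one.
    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        SequencePosition? position = buffer.PositionOf((byte)'\n');
        if (position == null)
        {
            line = default;
            return false;
        }

        line = buffer.Slice(0, position.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        return true;
    }
}
```

With the chunks "hel", "lo\nwor" and "ld\n", the result is ["hello", "world"] however the writes and reads interleave. A partial line stays in the pipe until its \n arrives.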

    In the first loop:

    • PipeWriter.GetMemory(Int32) is called to get memory from the underlying writer.
    • PipeWriter.Advance(Int32) is called to tell the PipeWriter how much data was written to the buffer.
    • PipeWriter.FlushAsync is called to make the data available to the PipeReader.
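Written out by hand, the GetMemory, Advance and FlushAsync sequence looks like the minimal sketch below. It assumes .NET Core 3.0 or later and ASCII input, and ManualWriteDemo is a hypothetical name. GetMemory may return more space than requested, which is why Advance must report the exact number of bytes written:

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class ManualWriteDemo
{
    // Writes ASCII text with GetMemory / Advance / FlushAsync, then reads it back.
    public static async Task<string> WriteAndReadAsync(string text)
    {
        var pipe = new Pipe();
        PipeWriter writer = pipe.Writer;

        // 1. Request at least text.Length bytes; the pipe may hand out more.
        Memory<byte> memory = writer.GetMemory(text.Length);

        // 2. Fill the memory and report exactly how many bytes are valid.
        int written = Encoding.ASCII.GetBytes(text.AsSpan(), memory.Span);
        writer.Advance(written);

        // 3. Publish the data to the reader, then signal that writing is done.
        await writer.FlushAsync();
        await writer.CompleteAsync();

        ReadResult result = await pipe.Reader.ReadAsync();
        string roundTripped = Encoding.ASCII.GetString(result.Buffer.ToArray());
        pipe.Reader.AdvanceTo(result.Buffer.End);
        await pipe.Reader.CompleteAsync();
        return roundTripped;
    }
}
```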

    In the second loop, the PipeReader consumes the buffers written by the PipeWriter. The buffers come from the socket. The call to PipeReader.ReadAsync:

    • Returns a ReadResult that contains two important pieces of information:

      • The data that was read, in the form of a ReadOnlySequence<byte>.
      • A boolean IsCompleted that indicates whether the end of data (EOF) has been reached.
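Here is a small sketch of the two ReadResult fields. It assumes .NET Core 3.0 or later, and ReadResultDemo is a hypothetical name. The same three bytes are read twice: once while the writer is still open, and once after it has completed:

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class ReadResultDemo
{
    // Returns (buffer length, IsCompleted) for a read before and after the writer completes.
    public static async Task<(long Length, bool IsCompleted)[]> InspectAsync()
    {
        var pipe = new Pipe();

        await pipe.Writer.WriteAsync(new byte[] { 1, 2, 3 });
        ReadResult first = await pipe.Reader.ReadAsync();
        var beforeComplete = (first.Buffer.Length, first.IsCompleted);
        // Consume nothing, but mark all 3 bytes as examined.
        pipe.Reader.AdvanceTo(first.Buffer.Start, first.Buffer.End);

        // Completing the writer wakes the reader even though no new bytes arrive.
        await pipe.Writer.CompleteAsync();
        ReadResult second = await pipe.Reader.ReadAsync();
        var afterComplete = (second.Buffer.Length, second.IsCompleted);
        pipe.Reader.AdvanceTo(second.Buffer.End);
        await pipe.Reader.CompleteAsync();

        return new[] { beforeComplete, afterComplete };
    }
}
```

The first read reports a length of 3 with IsCompleted false. The second read comes after CompleteAsync on the writer and reports the same 3 unconsumed bytes with IsCompleted true.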

    After finding the end-of-line (EOL) delimiter and parsing the line:

    • The logic processes the buffer to skip what has already been processed.
    • PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.
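Consumed and examined are separate positions. In this sketch, the reader consumes the complete line "ab\n" but only examines the trailing "cd". Those bytes are handed back, together with new data, on the next ReadAsync. The sketch assumes .NET Core 3.0 or later, and AdvanceToDemo is a hypothetical name:

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class AdvanceToDemo
{
    // Shows that bytes which were examined but not consumed come back on the next read.
    public static async Task<string> SecondReadAsync()
    {
        var pipe = new Pipe();
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("ab\ncd"));

        ReadResult first = await pipe.Reader.ReadAsync();
        ReadOnlySequence<byte> buffer = first.Buffer;
        // Consume "ab\n" (3 bytes); mark the trailing partial line "cd" as examined only.
        SequencePosition consumed = buffer.GetPosition(3);
        pipe.Reader.AdvanceTo(consumed, buffer.End);

        // Since everything was examined, the next ReadAsync needs new data to complete.
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("\n"));
        ReadResult second = await pipe.Reader.ReadAsync();
        string text = Encoding.ASCII.GetString(second.Buffer.ToArray());

        pipe.Reader.AdvanceTo(second.Buffer.End);
        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return text;
    }
}
```

The second read returns "cd\n". Everything was marked as examined, so a ReadAsync issued before the extra \n was written would have waited for more data. It would not have returned the same bytes again.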

    The reader and writer loops end by calling Complete (CompleteAsync in the code above). Completing both sides lets the underlying Pipe release the memory it allocated.
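Completion also works in the other direction. When the reader completes first, the writer's next flush reports FlushResult.IsCompleted, which is the case the result.IsCompleted check in FillPipeAsync handles. Here is a minimal sketch, assuming .NET Core 3.0 or later; ReaderCompletionDemo is a hypothetical name:

```csharp
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class ReaderCompletionDemo
{
    // Returns FlushResult.IsCompleted as seen by the writer after the reader has completed.
    public static async Task<bool> WriterSeesReaderCompletionAsync()
    {
        var pipe = new Pipe();

        // The reader gives up first, for example after a parse error.
        await pipe.Reader.CompleteAsync();

        // The writer's next write/flush reports that nobody is reading any more.
        FlushResult result = await pipe.Writer.WriteAsync(new byte[] { 42 });
        await pipe.Writer.CompleteAsync();
        return result.IsCompleted;
    }
}
```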

    Backpressure and flow control

    Ideally, reading and parsing work together:

    • The reading thread consumes data from the network and puts it in buffers.
    • The parsing thread is responsible for constructing the appropriate data structures.

    Typically, parsing takes more time than just copying blocks of data from the network:

    • The reading thread gets ahead of the parsing thread.
    • The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.

    For optimal performance, there is a balance to strike between frequent pauses and allocating more memory.

    To solve this problem, the Pipe has two settings to control the flow of data:

    • PauseWriterThreshold: determines how much data should be buffered before calls to PipeWriter.FlushAsync pause.
    • ResumeWriterThreshold: determines how far the buffered data must drop, as the reader consumes it, before calls to PipeWriter.FlushAsync resume.

    [Figure: diagram of ResumeWriterThreshold and PauseWriterThreshold]

    PipeWriter.FlushAsync:

    • Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe crosses PauseWriterThreshold.
    • Completes the ValueTask<FlushResult> when the amount of data drops below ResumeWriterThreshold.

    Two values are used to prevent rapid cycling between pausing and resuming, which can occur if only one value is used.
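You can observe the pause and resume behavior with deliberately tiny thresholds. This minimal sketch assumes .NET Core 3.0 or later. The 10-byte and 5-byte thresholds and the BackpressureDemo name are illustrative only; real workloads use much larger values:

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class BackpressureDemo
{
    // Reports whether a flush paused once PauseWriterThreshold was reached, and
    // whether it resumed after the reader consumed data below ResumeWriterThreshold.
    public static async Task<(bool Paused, bool Resumed)> RunAsync()
    {
        // Tiny, illustrative thresholds so the effect shows up with a few bytes.
        var pipe = new Pipe(new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5));

        // 4 bytes buffered: below the pause threshold, so this flush completes at once.
        await pipe.Writer.WriteAsync(new byte[4]);

        // 12 bytes buffered: at or above the pause threshold, so this flush stays pending.
        ValueTask<FlushResult> flush = pipe.Writer.WriteAsync(new byte[8]);
        bool paused = !flush.IsCompleted;
        Task<FlushResult> flushTask = flush.AsTask(); // consume the ValueTask exactly once

        // The reader consumes 8 bytes, leaving 4, which is below the resume threshold.
        ReadResult result = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(result.Buffer.GetPosition(8));

        // The pending flush should now complete; the timeout only guards the demo.
        Task finished = await Task.WhenAny(flushTask, Task.Delay(TimeSpan.FromSeconds(5)));
        bool resumed = finished == flushTask;

        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return (paused, resumed);
    }
}
```

Both flags should be true. The write that brings the pipe to 12 bytes leaves its flush pending. Consuming 8 bytes leaves 4, which is below ResumeWriterThreshold, and that releases the flush.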
