System.IO.Pipelines is a library designed to make it easier to do high-performance I/O in .NET. It targets .NET Standard, so it works on all .NET implementations.
What problem does System.IO.Pipelines solve
System.IO.Pipelines was designed to parse streaming data with high performance and to reduce the complexity of the code that does so.
The following code is typical of a TCP server that receives line-delimited messages (delimited by '\n') from a client:
async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}
The previous code has several problems:
- A single call to ReadAsync may not receive the entire message (up to the end of the line).
- The result of stream.ReadAsync is ignored. stream.ReadAsync returns the amount of data read.
- It cannot handle the case where multiple lines are read in a single ReadAsync call.
- It allocates a byte array for each read.
To fix the problems above, the following changes are required:
- Buffer the incoming data until a new line is found.
- Parse all the lines returned in the buffer.
- A line may be larger than 1 KB (1024 bytes). The code needs to resize the input buffer until the delimiter is found, so that the complete line fits in the buffer.
  - If the buffer is resized, more buffer copies are made as longer lines appear in the input.
  - To reduce wasted space, compact the buffer used for reading lines.
- Consider using buffer pooling to avoid allocating memory repeatedly.
The following code solves some of these problems:
using System;
using System.Buffers;
using System.Net.Sockets;
using System.Threading.Tasks;

async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for an EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move bytesConsumed past the line just processed (including the \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}
Pipe
The Pipe class can be used to create a PipeWriter/PipeReader pair. All data written to the PipeWriter is available to the PipeReader:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
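To make the writer/reader relationship concrete, here is a minimal sketch that pushes a small message through a Pipe and reads it back in the same method. The class and method names (PipeRoundTrip, RoundTripAsync) are made up for this illustration, and it assumes the message is small enough to be returned by a single read.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper, not part of System.IO.Pipelines.
public static class PipeRoundTrip
{
    public static async Task<string> RoundTripAsync(string message)
    {
        var pipe = new Pipe();

        // WriteAsync copies the bytes into the pipe and flushes them to the reader.
        await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(message));
        await pipe.Writer.CompleteAsync();

        // The writer has completed, so this read returns the buffered data.
        ReadResult result = await pipe.Reader.ReadAsync();
        string text = Encoding.UTF8.GetString(result.Buffer.ToArray());

        // Mark everything as consumed, then complete the reader.
        pipe.Reader.AdvanceTo(result.Buffer.End);
        await pipe.Reader.CompleteAsync();
        return text;
    }
}
```

In a real server the writer and the reader would normally run as two separate loops rather than sequentially as they do here.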
Pipe basic usage
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net.Sockets;
using System.Threading.Tasks;

async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for an EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}
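The line-splitting logic can be tried without a socket. The sketch below runs the same TryReadLine logic over an in-memory buffer and decodes each line as UTF-8. The LineSplitting class and the SplitLines method are hypothetical names introduced only for this illustration, and UTF-8 is an assumption; the listing above does not say how ProcessLine interprets the bytes.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper for experimenting with the parsing logic.
public static class LineSplitting
{
    // Same logic as TryReadLine in the listing, repeated so the sketch is self-contained.
    private static bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
    {
        SequencePosition? position = buffer.PositionOf((byte)'\n');
        if (position == null)
        {
            line = default;
            return false;
        }

        line = buffer.Slice(0, position.Value);
        buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
        return true;
    }

    // Returns every complete line in data; leftover is the number of trailing
    // bytes that do not yet form a complete line.
    public static List<string> SplitLines(byte[] data, out long leftover)
    {
        var buffer = new ReadOnlySequence<byte>(data);
        var lines = new List<string>();

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // ToArray copies the line; acceptable for a demo, avoidable in hot paths.
            lines.Add(Encoding.UTF8.GetString(line.ToArray()));
        }

        leftover = buffer.Length;
        return lines;
    }
}
```

The bytes counted in leftover correspond to the data that ReadPipeAsync leaves unconsumed, so the pipe hands them back on the next read.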
There are two loops:
- FillPipeAsync reads from the Socket and writes to the PipeWriter.
- ReadPipeAsync reads from the PipeReader and parses the incoming lines.
No explicit buffers are allocated. All buffer management is delegated to the PipeReader and PipeWriter implementations. Delegating buffer management makes it easier for the consuming code to focus on the business logic.
In the first loop:
- PipeWriter.GetMemory(Int32) is called to get memory from the underlying writer.
- PipeWriter.Advance(Int32) is called to tell the PipeWriter how much data was written to the buffer.
- PipeWriter.FlushAsync is called to make the data available to the PipeReader.
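The three writer steps can be sketched in isolation, with a byte array standing in for the socket. WriterSteps and WriteWithGetMemoryAsync are hypothetical names used only for this example; the reader side is included just to confirm that the flushed bytes arrive.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical helper, not part of System.IO.Pipelines.
public static class WriterSteps
{
    public static async Task<byte[]> WriteWithGetMemoryAsync(byte[] payload)
    {
        var pipe = new Pipe();
        PipeWriter writer = pipe.Writer;

        // 1. Ask the writer for a buffer of at least payload.Length bytes.
        Memory<byte> memory = writer.GetMemory(payload.Length);

        // 2. Fill the buffer, then tell the writer how many bytes were written.
        payload.CopyTo(memory);
        writer.Advance(payload.Length);

        // 3. Flush so the bytes become visible to the reader.
        await writer.FlushAsync();
        await writer.CompleteAsync();

        // Read the data back to check what the reader sees.
        ReadResult result = await pipe.Reader.ReadAsync();
        byte[] received = result.Buffer.ToArray();
        pipe.Reader.AdvanceTo(result.Buffer.End);
        await pipe.Reader.CompleteAsync();
        return received;
    }
}
```

GetMemory may return a larger buffer than requested, which is why Advance is given the number of bytes actually written rather than the buffer length.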
In the second loop, the PipeReader consumes the buffers written by the PipeWriter. The buffers come from the socket. The call to PipeReader.ReadAsync:
- Returns a ReadResult that contains two important pieces of information:
  - The data that was read, in the form of ReadOnlySequence<byte>.
  - A boolean IsCompleted that indicates whether the end of data (EOF) has been reached.
After finding the end-of-line (EOL) delimiter and parsing the line:
- The logic processes the buffer to skip what has already been processed.
- PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.
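The difference between consumed and examined is easier to see with a partial line. In the sketch below (AdvanceToDemo and RunAsync are hypothetical names), the first read sees one complete line followed by two bytes of an unfinished line. Only the complete line is marked as consumed, so the unfinished bytes come back on the next read together with the newly written data.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical demo, not part of System.IO.Pipelines.
public static class AdvanceToDemo
{
    // Returns the buffer length seen by the first and by the second read.
    public static async Task<(long First, long Second)> RunAsync()
    {
        var pipe = new Pipe();

        // One complete line ("abc\n") plus the start of another ("de").
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("abc\nde"));

        ReadResult result = await pipe.Reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        long first = buffer.Length;

        // Consume through the newline, but report the whole buffer as examined.
        SequencePosition? newline = buffer.PositionOf((byte)'\n');
        SequencePosition consumed = buffer.GetPosition(1, newline.Value);
        pipe.Reader.AdvanceTo(consumed, buffer.End);

        // Everything was examined, so the next read waits for new data.
        await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("f\n"));
        result = await pipe.Reader.ReadAsync();

        // The unconsumed "de" is returned again, followed by "f\n".
        long second = result.Buffer.Length;

        pipe.Reader.AdvanceTo(result.Buffer.End);
        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return (first, second);
    }
}
```

Reporting the whole buffer as examined tells the pipe not to complete another read until more data is written or the writer completes.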
The reader and writer loops end by calling Complete. Complete lets the underlying Pipe release the memory it allocated.
Backpressure and flow control
Ideally, reading and parsing work together:
- The reading thread consumes data from the network and puts it in buffers.
- The parsing thread is responsible for constructing the appropriate data structures.
Typically, parsing takes more time than just copying blocks of data from the network:
- The reading thread gets ahead of the parsing thread.
- The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.
For optimal performance, a balance needs to be struck between frequent pauses and allocating more memory.
To solve this problem, the Pipe has two settings to control the flow of data:
- PauseWriterThreshold: Determines how much data should be buffered before calls to FlushAsync pause.
- ResumeWriterThreshold: Determines how much data the reader has to observe before calls to PipeWriter.FlushAsync resume.

PipeWriter.FlushAsync:
- Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe crosses PauseWriterThreshold.
- Completes the ValueTask<FlushResult> when the amount of data becomes lower than ResumeWriterThreshold.
Two values are used to prevent rapid cycling between pausing and resuming, which can occur if a single value is used.
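The two thresholds are passed to the Pipe through PipeOptions. The following sketch uses deliberately tiny values (8 and 4 bytes) so the pause is easy to trigger; they are illustrative only and not recommended settings. BackpressureDemo and RunAsync are hypothetical names.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical demo, not part of System.IO.Pipelines.
public static class BackpressureDemo
{
    // Returns whether the flush was pending before the reader ran,
    // and whether it had completed once the reader consumed the data.
    public static async Task<(bool PausedBefore, bool CompletedAfter)> RunAsync()
    {
        // Illustrative thresholds: pause at 8 buffered bytes, resume below 4.
        var options = new PipeOptions(pauseWriterThreshold: 8, resumeWriterThreshold: 4);
        var pipe = new Pipe(options);

        // Writing 16 bytes crosses the pause threshold, so the flush stays pending.
        ValueTask<FlushResult> flush = pipe.Writer.WriteAsync(new byte[16]);
        bool pausedBefore = !flush.IsCompleted;

        // Consuming everything drops the buffered amount below the resume threshold.
        ReadResult result = await pipe.Reader.ReadAsync();
        pipe.Reader.AdvanceTo(result.Buffer.End);
        bool completedAfter = flush.IsCompleted;

        await flush;
        await pipe.Writer.CompleteAsync();
        await pipe.Reader.CompleteAsync();
        return (pausedBefore, completedAfter);
    }
}
```

In FillPipeAsync this behavior means that awaiting FlushAsync is what slows the socket reads down whenever the parsing side falls behind.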
