System.IO.Pipelines is a new library designed to make it easier to do high-performance I/O in .NET. It targets .NET Standard, so it works on all .NET implementations.
What problem does System.IO.Pipelines solve
System.IO.Pipelines was built to parse streaming data with high performance while reducing code complexity.
The following code is typical of a TCP server that receives line-delimited messages (delimited by '\n') from a client:
async Task ProcessLinesAsync(NetworkStream stream)
{
    var buffer = new byte[1024];
    await stream.ReadAsync(buffer, 0, buffer.Length);

    // Process a single line from the buffer
    ProcessLine(buffer);
}
The previous code has several problems:
- A single call to ReadAsync may not receive the entire message (up to the end of the line).
- The result of stream.ReadAsync is ignored. stream.ReadAsync returns the amount of data that was read.
- It does not handle the case where multiple lines arrive in a single ReadAsync call.
- It allocates a byte array for each read.
To solve the above problems, the following changes are required:
- Buffer the incoming data until a new line is found.
- Parse all the lines returned in the buffer.
- A line may be larger than 1 KB (1024 bytes). The code needs to resize the input buffer until the delimiter is found, so that the complete line fits in the buffer.
  - If the buffer is resized, more buffer copies are made as longer lines appear in the input.
  - To reduce wasted space, compact the buffer used for reading lines.
- Consider using a buffer pool to avoid allocating memory repeatedly.
The following code solves some of these problems:
async Task ProcessLinesAsync(NetworkStream stream)
{
    byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
    var bytesBuffered = 0;
    var bytesConsumed = 0;

    while (true)
    {
        // Calculate the amount of bytes remaining in the buffer.
        var bytesRemaining = buffer.Length - bytesBuffered;

        if (bytesRemaining == 0)
        {
            // Double the buffer size and copy the previously buffered data into the new buffer.
            var newBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length * 2);
            Buffer.BlockCopy(buffer, 0, newBuffer, 0, buffer.Length);
            // Return the old buffer to the pool.
            ArrayPool<byte>.Shared.Return(buffer);
            buffer = newBuffer;
            bytesRemaining = buffer.Length - bytesBuffered;
        }

        var bytesRead = await stream.ReadAsync(buffer, bytesBuffered, bytesRemaining);
        if (bytesRead == 0)
        {
            // EOF
            break;
        }

        // Keep track of the amount of buffered bytes.
        bytesBuffered += bytesRead;
        var linePosition = -1;

        do
        {
            // Look for a EOL in the buffered data.
            linePosition = Array.IndexOf(buffer, (byte)'\n', bytesConsumed,
                                         bytesBuffered - bytesConsumed);

            if (linePosition >= 0)
            {
                // Calculate the length of the line based on the offset.
                var lineLength = linePosition - bytesConsumed;

                // Process the line.
                ProcessLine(buffer, bytesConsumed, lineLength);

                // Move the bytesConsumed to skip past the line consumed (including \n).
                bytesConsumed += lineLength + 1;
            }
        }
        while (linePosition >= 0);
    }
}
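The code above grows the buffer but never reclaims the space taken by lines that were already processed. As a minimal sketch of the compaction step from the list of required changes, the hypothetical helper `Compact` (not part of the original sample) moves the unconsumed bytes to the front of the array. It assumes the same `bytesConsumed` and `bytesBuffered` counters as the code above.

```csharp
using System;

// Hypothetical helper: shifts the unconsumed bytes to the start of the buffer
// and returns how many bytes remain buffered afterwards.
static int Compact(byte[] buffer, int bytesConsumed, int bytesBuffered)
{
    int remaining = bytesBuffered - bytesConsumed;

    // Array.Copy copies correctly even when source and destination overlap.
    Array.Copy(buffer, bytesConsumed, buffer, 0, remaining);
    return remaining;
}

// Example: 5 bytes buffered, the first 3 already consumed.
var sample = new byte[] { 1, 2, 3, 4, 5, 0, 0, 0 };
int left = Compact(sample, bytesConsumed: 3, bytesBuffered: 5);
Console.WriteLine($"{left} bytes left, first byte is {sample[0]}"); // 2 bytes left, first byte is 4
```

In the read loop, one possible place to call it is after the inner do/while loop, followed by resetting `bytesConsumed` to 0 and assigning the returned value to `bytesBuffered`.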
Pipe
The Pipe class can be used to create a PipeWriter/PipeReader pair. All data written to the PipeWriter is available to the PipeReader:
var pipe = new Pipe();
PipeReader reader = pipe.Reader;
PipeWriter writer = pipe.Writer;
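To make the writer/reader relationship concrete, here is a minimal round-trip sketch. The helper name `RoundTripAsync` is hypothetical, and the sketch assumes the payload is small enough to stay under the default pause threshold, so the write completes without a concurrent reader.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper: writes `text` into a Pipe and returns what the reader sees.
static async Task<string> RoundTripAsync(string text)
{
    var pipe = new Pipe();

    // WriteAsync copies the bytes into the pipe and flushes them to the reader.
    await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes(text));
    await pipe.Writer.CompleteAsync();

    // The writer has completed, so this read returns all the buffered data.
    ReadResult result = await pipe.Reader.ReadAsync();
    string received = Encoding.UTF8.GetString(result.Buffer.ToArray());

    // Mark everything as consumed, then release the reader side.
    pipe.Reader.AdvanceTo(result.Buffer.End);
    await pipe.Reader.CompleteAsync();
    return received;
}

Console.WriteLine(await RoundTripAsync("hello")); // prints "hello"
```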
Pipe basic usage
async Task ProcessLinesAsync(Socket socket)
{
    var pipe = new Pipe();
    Task writing = FillPipeAsync(socket, pipe.Writer);
    Task reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(reading, writing);
}

async Task FillPipeAsync(Socket socket, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Allocate at least 512 bytes from the PipeWriter.
        Memory<byte> memory = writer.GetMemory(minimumBufferSize);
        try
        {
            int bytesRead = await socket.ReceiveAsync(memory, SocketFlags.None);
            if (bytesRead == 0)
            {
                break;
            }
            // Tell the PipeWriter how much was read from the Socket.
            writer.Advance(bytesRead);
        }
        catch (Exception ex)
        {
            LogError(ex);
            break;
        }

        // Make the data available to the PipeReader.
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // By completing PipeWriter, tell the PipeReader that there's no more data coming.
    await writer.CompleteAsync();
}

async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
        {
            // Process the line.
            ProcessLine(line);
        }

        // Tell the PipeReader how much of the buffer has been consumed.
        reader.AdvanceTo(buffer.Start, buffer.End);

        // Stop reading if there's no more data coming.
        if (result.IsCompleted)
        {
            break;
        }
    }

    // Mark the PipeReader as complete.
    await reader.CompleteAsync();
}

bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line)
{
    // Look for a EOL in the buffer.
    SequencePosition? position = buffer.PositionOf((byte)'\n');

    if (position == null)
    {
        line = default;
        return false;
    }

    // Skip the line + the \n.
    line = buffer.Slice(0, position.Value);
    buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
    return true;
}
There are two loops:
- FillPipeAsync reads from the Socket and writes to the PipeWriter.
- ReadPipeAsync reads from the PipeReader and parses the incoming lines.
No explicit buffers are allocated. All buffer management is delegated to the PipeReader and PipeWriter implementations. Delegating buffer management makes it easier for the consuming code to focus on the business logic.
In the first loop:
- PipeWriter.GetMemory(Int32) is called to get memory from the underlying writer.
- PipeWriter.Advance(Int32) is called to tell the PipeWriter how much data was written to the buffer.
- PipeWriter.FlushAsync is called to make the data available to the PipeReader.
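The same three calls can be used for any producer, not only a socket. The sketch below is a hypothetical `WriteLineAsync` helper (not part of the original sample) that encodes a string straight into the memory handed out by the PipeWriter. It assumes UTF-8 text and a '\n' delimiter.

```csharp
using System;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical helper: encodes `line` plus '\n' directly into the pipe's buffer.
static ValueTask<FlushResult> WriteLineAsync(PipeWriter writer, string line)
{
    // Request room for the worst-case UTF-8 size plus the delimiter.
    int sizeHint = Encoding.UTF8.GetMaxByteCount(line.Length) + 1;
    Memory<byte> memory = writer.GetMemory(sizeHint);

    // Encode straight into the memory handed out by the writer.
    int written = Encoding.UTF8.GetBytes(line.AsSpan(), memory.Span);
    memory.Span[written] = (byte)'\n';

    // Commit only the bytes that were actually written.
    writer.Advance(written + 1);

    // Make the committed bytes visible to the PipeReader.
    return writer.FlushAsync();
}

var pipe = new Pipe();
await WriteLineAsync(pipe.Writer, "hello");
ReadResult result = await pipe.Reader.ReadAsync();
Console.WriteLine(result.Buffer.Length); // 6 bytes: "hello" plus '\n'
```

Note that GetMemory may return more memory than requested, which is why Advance is given the exact byte count rather than the length of the returned memory.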
In the second loop, the PipeReader consumes the buffers written by the PipeWriter. The buffers come from the socket. The call to PipeReader.ReadAsync:
- Returns a ReadResult that contains two important pieces of information:
  - The data that was read, in the form of a ReadOnlySequence<byte>.
  - A boolean IsCompleted that indicates whether the end of data (EOF) has been reached.
After finding the end of line (EOL) delimiter and parsing the line:
- The logic slices the buffer to skip what has already been processed.
- PipeReader.AdvanceTo is called to tell the PipeReader how much data has been consumed and examined.
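The difference between consumed and examined shows up when a line is split across two writes. In the following sketch, the helper names `ParseSplitLinesAsync` and `DrainAsync` are hypothetical. The first read parses one full line and leaves a partial line in the pipe. Because only the parsed bytes are reported as consumed, the partial line is returned again on the next read, together with the bytes that complete it.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical demo: feeds two chunks that split a line and collects the parsed lines.
static async Task<List<string>> ParseSplitLinesAsync()
{
    var pipe = new Pipe();
    var lines = new List<string>();

    // First chunk: one full line and the start of a second one.
    await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("one\ntw"));
    await DrainAsync(pipe.Reader, lines);

    // Second chunk completes the line; the unconsumed "tw" is still in the pipe.
    await pipe.Writer.WriteAsync(Encoding.UTF8.GetBytes("o\n"));
    await DrainAsync(pipe.Reader, lines);

    await pipe.Writer.CompleteAsync();
    await pipe.Reader.CompleteAsync();
    return lines;
}

// Hypothetical helper: performs one read and parses every complete line in it.
static async Task DrainAsync(PipeReader reader, List<string> lines)
{
    ReadResult result = await reader.ReadAsync();
    ReadOnlySequence<byte> buffer = result.Buffer;

    while (buffer.PositionOf((byte)'\n') is SequencePosition eol)
    {
        lines.Add(Encoding.UTF8.GetString(buffer.Slice(0, eol).ToArray()));
        buffer = buffer.Slice(buffer.GetPosition(1, eol));
    }

    // consumed = start of the unparsed remainder, examined = everything seen so far.
    reader.AdvanceTo(buffer.Start, buffer.End);
}

Console.WriteLine(string.Join(",", await ParseSplitLinesAsync())); // prints "one,two"
```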
The reader and writer loops end by calling Complete. Complete lets the underlying Pipe release the memory it allocated.
Backpressure and flow control
Ideally, reading and parsing work together:
- The reading thread consumes data from the network and puts it in buffers.
- The parsing thread is responsible for constructing the appropriate data structures.
Typically, parsing takes more time than just copying blocks of data from the network:
- The reading thread gets ahead of the parsing thread.
- The reading thread has to either slow down or allocate more memory to store the data for the parsing thread.
For optimal performance, a balance has to be struck between frequent pauses and allocating more memory.
To solve this problem, the Pipe provides two settings to control the flow of data:
- PauseWriterThreshold: Determines how much data should be buffered before calls to FlushAsync pause.
- ResumeWriterThreshold: Determines how much data the reader has to consume before calls to PipeWriter.FlushAsync resume.
PipeWriter.FlushAsync:
- Returns an incomplete ValueTask<FlushResult> when the amount of data in the Pipe exceeds PauseWriterThreshold.
- Completes the ValueTask<FlushResult> when the amount of data drops below ResumeWriterThreshold.
Using two values prevents rapid cycling between pausing and resuming, which can happen if only one value is used.
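Both thresholds are set through PipeOptions when the Pipe is created. The following sketch uses deliberately tiny values (10 and 5 bytes) so the effect is easy to observe. These numbers and the helper name `DemoBackpressureAsync` are illustrative assumptions, not recommended settings.

```csharp
using System;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Hypothetical demo: reports whether FlushAsync paused, and whether it resumed
// after the reader consumed the buffered data.
static async Task<(bool Paused, bool Resumed)> DemoBackpressureAsync()
{
    // Pause the writer at 10 buffered bytes; resume once it drops below 5.
    var options = new PipeOptions(pauseWriterThreshold: 10, resumeWriterThreshold: 5);
    var pipe = new Pipe(options);

    // Commit 16 bytes (contents do not matter here) and start a flush.
    pipe.Writer.GetMemory(16);
    pipe.Writer.Advance(16);
    Task<FlushResult> flush = pipe.Writer.FlushAsync().AsTask();

    // 16 bytes is above the pause threshold, so the flush should still be pending.
    bool paused = !flush.IsCompleted;

    // The reader consumes everything, which drops the pipe below the resume threshold.
    ReadResult result = await pipe.Reader.ReadAsync();
    pipe.Reader.AdvanceTo(result.Buffer.End);

    // Wait for the flush, with a timeout so a missing resume cannot hang the demo.
    bool resumed = await Task.WhenAny(flush, Task.Delay(5000)) == flush;

    await pipe.Writer.CompleteAsync();
    await pipe.Reader.CompleteAsync();
    return (paused, resumed);
}

Console.WriteLine(await DemoBackpressureAsync());
```

In a real producer loop there is nothing extra to do: awaiting FlushAsync, as FillPipeAsync already does, is what makes the writer wait while the reader catches up.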