@@ -70,67 +70,69 @@ async Task<IResult> IngestAsync(HttpContext context)
7070
7171 async Task < IResult > IngestCompactFormatAsync ( HttpContext context )
7272 {
73+ byte [ ] ? rented = null ;
74+
7375 try
7476 {
75- var cts = CancellationTokenSource . CreateLinkedTokenSource ( context . RequestAborted ) ;
77+ using var cts = CancellationTokenSource . CreateLinkedTokenSource ( context . RequestAborted ) ;
7678 cts . CancelAfter ( TimeSpan . FromSeconds ( 5 ) ) ;
77-
79+
7880 var requestApiKey = GetApiKey ( context . Request ) ;
79- var log = _forwardingChannels . GetForwardingChannel ( requestApiKey ) ;
80-
81+ var log = _forwardingChannels . GetForwardingChannel ( requestApiKey ) ;
82+
8183 // Add one for the extra newline that we have to insert at the end of batches.
8284 var bufferSize = _config . Connection . BatchSizeLimitBytes + 1 ;
83- var rented = ArrayPool < byte > . Shared . Rent ( bufferSize ) ;
84- var buffer = rented [ .. bufferSize ] ;
85+ rented = ArrayPool < byte > . Shared . Rent ( bufferSize ) ;
86+ var buffer = new ArraySegment < byte > ( rented , 0 , bufferSize ) ;
8587 var writeHead = 0 ;
8688 var readHead = 0 ;
87-
89+
8890 var done = false ;
8991 while ( ! done )
9092 {
9193 // Fill the memory buffer from as much of the incoming request payload as possible; buffering in memory increases the
9294 // size of write batches.
9395 while ( ! done )
9496 {
95- var remaining = buffer . Length - 1 - writeHead ;
97+ var remaining = buffer . Count - 1 - writeHead ;
9698 if ( remaining == 0 )
9799 {
98100 IngestionLog . ForClient ( context . Connection . RemoteIpAddress )
99101 . Error ( "An incoming request exceeded the configured batch size limit" ) ;
100102 return Error ( HttpStatusCode . RequestEntityTooLarge , "the request is too large to process" ) ;
101103 }
102-
104+
103105 var read = await context . Request . Body . ReadAsync ( buffer . AsMemory ( writeHead , remaining ) , cts . Token ) ;
104106 if ( read == 0 )
105107 {
106108 done = true ;
107109 }
108-
110+
109111 writeHead += read ;
110-
112+
111113 // Ingested batches must be terminated with `\n`, but this isn't an API requirement.
112- if ( done && writeHead > 0 && writeHead < buffer . Length && buffer [ writeHead - 1 ] != ( byte ) '\n ' )
114+ if ( done && writeHead > 0 && writeHead < buffer . Count && buffer [ writeHead - 1 ] != ( byte ) '\n ' )
113115 {
114116 buffer [ writeHead ] = ( byte ) '\n ' ;
115117 writeHead += 1 ;
116118 }
117119 }
118-
120+
119121 // Validate what we read, marking out a batch of one or more complete newline-delimited events.
120122 var batchStart = readHead ;
121123 var batchEnd = readHead ;
122124 while ( batchEnd < writeHead )
123125 {
124126 var eventStart = batchEnd ;
125127 var nlIndex = buffer . AsSpan ( ) [ eventStart ..] . IndexOf ( ( byte ) '\n ' ) ;
126-
128+
127129 if ( nlIndex == - 1 )
128130 {
129131 break ;
130132 }
131-
133+
132134 var eventEnd = eventStart + nlIndex + 1 ;
133-
135+
134136 batchEnd = eventEnd ;
135137 readHead = batchEnd ;
136138
@@ -142,12 +144,12 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
142144 return Error ( HttpStatusCode . BadRequest , $ "Payload validation failed: { error } .") ;
143145 }
144146 }
145-
147+
146148 if ( batchStart != batchEnd )
147149 {
148- await Write ( log , ArrayPool < byte > . Shared , buffer , batchStart ..batchEnd , cts . Token ) ;
150+ await log . WriteAsync ( buffer [ batchStart ..batchEnd ] , cts . Token ) ;
149151 }
150-
152+
151153 // Copy any unprocessed data into our buffer and continue
152154 if ( ! done && readHead != 0 )
153155 {
@@ -157,10 +159,7 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
157159 writeHead = retain ;
158160 }
159161 }
160-
161- // Exception cases are handled by `Write`
162- ArrayPool < byte > . Shared . Return ( rented ) ;
163-
162+
164163 return SuccessfulIngestion ( ) ;
165164 }
166165 catch ( Exception ex )
@@ -169,6 +168,13 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
169168 . Error ( ex , "Ingestion failed" ) ;
170169 return Error ( HttpStatusCode . InternalServerError , "Ingestion failed." ) ;
171170 }
171+ finally
172+ {
173+ if ( rented != null )
174+ {
175+ ArrayPool < byte > . Shared . Return ( rented ) ;
176+ }
177+ }
172178 }
173179
174180 static bool DefaultedBoolQuery ( HttpRequest request , string queryParameterName )
@@ -263,19 +269,6 @@ bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorFragment
263269 errorFragment = null ;
264270 return true ;
265271 }
266-
267- static async Task Write ( ForwardingChannel forwardingChannel , ArrayPool < byte > pool , byte [ ] storage , Range range , CancellationToken cancellationToken )
268- {
269- try
270- {
271- await forwardingChannel . WriteAsync ( storage , range , cancellationToken ) ;
272- }
273- catch
274- {
275- pool . Return ( storage ) ;
276- throw ;
277- }
278- }
279272
280273 static IResult Error ( HttpStatusCode statusCode , string message )
281274 {
0 commit comments