|
3 | 3 | #include "application.h" |
4 | 4 | #include <async_wrap-inl.h> |
5 | 5 | #include <debug_utils-inl.h> |
| 6 | +#include <ngtcp2/ngtcp2.h> |
6 | 7 | #include <node_bob.h> |
7 | 8 | #include <node_sockaddr-inl.h> |
8 | 9 | #include <uv.h> |
@@ -95,6 +96,20 @@ Maybe<Session::Application_Options> Session::Application_Options::From( |
95 | 96 | return Just<Application_Options>(options); |
96 | 97 | } |
97 | 98 |
|
| 99 | +// ============================================================================ |
| 100 | + |
| 101 | +std::string Session::Application::StreamData::ToString() const { |
| 102 | + DebugIndentScope indent; |
| 103 | + auto prefix = indent.Prefix(); |
| 104 | + std::string res("{"); |
| 105 | + res += prefix + "count: " + std::to_string(count); |
| 106 | + res += prefix + "remaining: " + std::to_string(remaining); |
| 107 | + res += prefix + "id: " + std::to_string(id); |
| 108 | + res += prefix + "fin: " + std::to_string(fin); |
| 109 | + res += indent.Close(); |
| 110 | + return res; |
| 111 | +} |
| 112 | + |
98 | 113 | Session::Application::Application(Session* session, const Options& options) |
99 | 114 | : session_(session) {} |
100 | 115 |
|
@@ -189,7 +204,7 @@ Packet* Session::Application::CreateStreamDataPacket() { |
189 | 204 | return Packet::Create(env(), |
190 | 205 | session_->endpoint_.get(), |
191 | 206 | session_->remote_address_, |
192 | | - ngtcp2_conn_get_max_tx_udp_payload_size(*session_), |
| 207 | + session_->max_packet_size(), |
193 | 208 | "stream data"); |
194 | 209 | } |
195 | 210 |
|
@@ -221,141 +236,188 @@ void Session::Application::StreamReset(Stream* stream, |
221 | 236 | } |
222 | 237 |
|
223 | 238 | void Session::Application::SendPendingData() { |
| 239 | + static constexpr size_t kMaxPackets = 32; |
224 | 240 | Debug(session_, "Application sending pending data"); |
225 | 241 | PathStorage path; |
| 242 | + StreamData stream_data; |
226 | 243 |
|
227 | | - Packet* packet = nullptr; |
228 | | - uint8_t* pos = nullptr; |
229 | | - int err = 0; |
| 244 | + // The maximum size of packet to create. |
| 245 | + const size_t max_packet_size = session_->max_packet_size(); |
230 | 246 |
|
231 | | - size_t maxPacketCount = std::min(static_cast<size_t>(64000), |
232 | | - ngtcp2_conn_get_send_quantum(*session_)); |
233 | | - size_t packetSendCount = 0; |
| 247 | + // The maximum number of packets to send in this call to SendPendingData. |
| 248 | + const size_t max_packet_count = std::min( |
| 249 | + kMaxPackets, ngtcp2_conn_get_send_quantum(*session_) / max_packet_size); |
234 | 250 |
|
235 | | - const auto updateTimer = [&] { |
236 | | - Debug(session_, "Application updating the session timer"); |
237 | | - ngtcp2_conn_update_pkt_tx_time(*session_, uv_hrtime()); |
238 | | - session_->UpdateTimer(); |
239 | | - }; |
| 251 | + // The number of packets that have been sent in this call to SendPendingData. |
| 252 | + size_t packet_send_count = 0; |
240 | 253 |
|
241 | | - const auto congestionLimited = [&](auto packet) { |
242 | | - auto len = pos - ngtcp2_vec(*packet).base; |
243 | | - // We are either congestion limited or done. |
244 | | - if (len) { |
245 | | - // Some data was serialized into the packet. We need to send it. |
246 | | - packet->Truncate(len); |
247 | | - session_->Send(std::move(packet), path); |
248 | | - } |
| 254 | + Packet* packet = nullptr; |
| 255 | + uint8_t* pos = nullptr; |
| 256 | + uint8_t* begin = nullptr; |
249 | 257 |
|
250 | | - updateTimer(); |
| 258 | + auto ensure_packet = [&] { |
| 259 | + if (packet == nullptr) { |
| 260 | + packet = CreateStreamDataPacket(); |
| 261 | + if (packet == nullptr) return false; |
| 262 | + pos = begin = ngtcp2_vec(*packet).base; |
| 263 | + } |
| 264 | + DCHECK_NOT_NULL(packet); |
| 265 | + DCHECK_NOT_NULL(pos); |
| 266 | + DCHECK_NOT_NULL(begin); |
| 267 | + return true; |
251 | 268 | }; |
252 | 269 |
|
| 270 | + // We're going to enter a loop here to prepare and send no more than |
| 271 | + // max_packet_count packets. |
253 | 272 | for (;;) { |
254 | | - ssize_t ndatalen; |
255 | | - StreamData stream_data; |
256 | | - |
257 | | - err = GetStreamData(&stream_data); |
| 273 | + // ndatalen is the amount of stream data that was accepted into the packet. |
| 274 | + ssize_t ndatalen = 0; |
258 | 275 |
|
259 | | - if (err < 0) { |
| 276 | + // Make sure we have a packet to write data into. |
| 277 | + if (!ensure_packet()) { |
| 278 | + Debug(session_, "Failed to create packet for stream data"); |
| 279 | + // Doh! Could not create a packet. Time to bail. |
260 | 280 | session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); |
261 | 281 | return session_->Close(Session::CloseMethod::SILENT); |
262 | 282 | } |
263 | 283 |
|
264 | | - if (packet == nullptr) { |
265 | | - packet = CreateStreamDataPacket(); |
266 | | - if (packet == nullptr) { |
267 | | - session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); |
268 | | - return session_->Close(Session::CloseMethod::SILENT); |
269 | | - } |
270 | | - pos = ngtcp2_vec(*packet).base; |
| 284 | + // The stream_data is the next block of data from the application stream. |
| 285 | + if (GetStreamData(&stream_data) < 0) { |
| 286 | + Debug(session_, "Application failed to get stream data"); |
| 287 | + session_->last_error_ = QuicError::ForNgtcp2Error(NGTCP2_ERR_INTERNAL); |
| 288 | + packet->Done(UV_ECANCELED); |
| 289 | + return session_->Close(Session::CloseMethod::SILENT); |
271 | 290 | } |
272 | 291 |
|
273 | | - ssize_t nwrite = WriteVStream(&path, pos, &ndatalen, stream_data); |
| 292 | + // If we got here, we were at least successful in checking for stream data. |
| 293 | + // There might not be any stream data to send. |
| 294 | + Debug(session_, "Application using stream data: %s", stream_data); |
| 295 | + |
| 296 | + // Awesome, let's write our packet! |
| 297 | + ssize_t nwrite = |
| 298 | + WriteVStream(&path, pos, &ndatalen, max_packet_size, stream_data); |
| 299 | + Debug(session_, "Application accepted %zu bytes into packet", ndatalen); |
274 | 300 |
|
275 | | - if (nwrite <= 0) { |
| 301 | + // A negative nwrite value indicates either an error or that there is more |
| 302 | + // data to write into the packet. |
| 303 | + if (nwrite < 0) { |
276 | 304 | switch (nwrite) { |
277 | | - case 0: |
278 | | - if (stream_data.id >= 0) ResumeStream(stream_data.id); |
279 | | - return congestionLimited(std::move(packet)); |
280 | 305 | case NGTCP2_ERR_STREAM_DATA_BLOCKED: { |
281 | | - session().StreamDataBlocked(stream_data.id); |
282 | | - if (session().max_data_left() == 0) { |
283 | | - if (stream_data.id >= 0) ResumeStream(stream_data.id); |
284 | | - return congestionLimited(std::move(packet)); |
285 | | - } |
286 | | - CHECK_LE(ndatalen, 0); |
| 306 | + // We could not write any data for this stream into the packet because |
| 307 | + // the flow control for the stream itself indicates that the stream |
| 308 | + // is blocked. We'll skip and move on to the next stream. |
| 309 | + // ndatalen = -1 means that no stream data was accepted into the |
| 310 | + // packet, which is what we want here. |
| 311 | + DCHECK_EQ(ndatalen, -1); |
| 312 | + DCHECK(stream_data.stream); |
| 313 | + session_->StreamDataBlocked(stream_data.id); |
287 | 314 | continue; |
288 | 315 | } |
289 | 316 | case NGTCP2_ERR_STREAM_SHUT_WR: { |
290 | | - // Indicates that the writable side of the stream has been closed |
| 317 | + // Indicates that the writable side of the stream should be closed |
291 | 318 | // locally or the stream is being reset. In either case, we can't send |
292 | 319 | // any stream data! |
293 | | - CHECK_GE(stream_data.id, 0); |
294 | | - // We need to notify the stream that the writable side has been closed |
295 | | - // and no more outbound data can be sent. |
296 | | - CHECK_LE(ndatalen, 0); |
297 | | - auto stream = session_->FindStream(stream_data.id); |
298 | | - if (stream) stream->EndWritable(); |
| 320 | + Debug(session_, |
| 321 | + "Stream %" PRIi64 " should be closed for writing", |
| 322 | + stream_data.id); |
| 323 | + // ndatalen = -1 means that no stream data was accepted into the |
| 324 | + // packet, which is what we want here. |
| 325 | + DCHECK_EQ(ndatalen, -1); |
| 326 | + DCHECK(stream_data.stream); |
| 327 | + stream_data.stream->EndWritable(); |
299 | 328 | continue; |
300 | 329 | } |
301 | 330 | case NGTCP2_ERR_WRITE_MORE: { |
302 | | - CHECK_GT(ndatalen, 0); |
303 | | - if (!StreamCommit(&stream_data, ndatalen)) return session_->Close(); |
304 | | - pos += ndatalen; |
| 331 | + // This return value indicates that we should call into WriteVStream |
| 332 | + // again to write more data into the same packet. |
| 333 | + Debug(session_, "Application should write more to packet"); |
| 334 | + DCHECK_GE(ndatalen, 0); |
| 335 | + if (!StreamCommit(&stream_data, ndatalen)) { |
| 336 | + packet->Done(UV_ECANCELED); |
| 337 | + return session_->Close(CloseMethod::SILENT); |
| 338 | + } |
305 | 339 | continue; |
306 | 340 | } |
307 | 341 | } |
308 | 342 |
|
309 | | - packet->Done(UV_ECANCELED); |
310 | | - session_->last_error_ = QuicError::ForNgtcp2Error(nwrite); |
311 | | - return session_->Close(Session::CloseMethod::SILENT); |
312 | | - } |
313 | | - |
314 | | - pos += nwrite; |
315 | | - if (ndatalen > 0 && !StreamCommit(&stream_data, ndatalen)) { |
316 | | - // Since we are closing the session here, we don't worry about updating |
317 | | - // the pkt tx time. The failed StreamCommit should have updated the |
318 | | - // last_error_ appropriately. |
| 343 | + // Some other type of error happened. |
| 344 | + DCHECK_EQ(ndatalen, -1); |
| 345 | + Debug(session_, |
| 346 | + "Application encountered error while writing packet: %s", |
| 347 | + ngtcp2_strerror(nwrite)); |
| 348 | + session_->SetLastError(QuicError::ForNgtcp2Error(nwrite)); |
319 | 349 | packet->Done(UV_ECANCELED); |
320 | 350 | return session_->Close(Session::CloseMethod::SILENT); |
| 351 | + } else if (ndatalen >= 0) { |
| 352 | + // We wrote some data into the packet. We need to update the flow control |
| 353 | + // by committing the data. |
| 354 | + if (!StreamCommit(&stream_data, ndatalen)) { |
| 355 | + packet->Done(UV_ECANCELED); |
| 356 | + return session_->Close(CloseMethod::SILENT); |
| 357 | + } |
321 | 358 | } |
322 | 359 |
|
323 | | - if (stream_data.id >= 0 && ndatalen < 0) ResumeStream(stream_data.id); |
| 360 | + // When nwrite is zero, it means we are congestion limited. |
| 361 | + // We should stop trying to send additional packets. |
| 362 | + if (nwrite == 0) { |
| 363 | + Debug(session_, "Congestion limited."); |
| 364 | + // There might be a partial packet already prepared. If so, send it. |
| 365 | + size_t datalen = pos - begin; |
| 366 | + if (datalen) { |
| 367 | + Debug(session_, "Packet has %zu bytes to send", datalen); |
| 368 | + // At least some data had been written into the packet. We should send |
| 369 | + // it. |
| 370 | + packet->Truncate(datalen); |
| 371 | + session_->Send(packet, path); |
| 372 | + } else { |
| 373 | + packet->Done(UV_ECANCELED); |
| 374 | + } |
324 | 375 |
|
325 | | - packet->Truncate(nwrite); |
326 | | - session_->Send(std::move(packet), path); |
| 376 | + // If there was stream data selected, we should reschedule it to try |
| 377 | + // sending again. |
| 378 | + if (stream_data.id >= 0) ResumeStream(stream_data.id); |
327 | 379 |
|
328 | | - pos = nullptr; |
| 380 | + return session_->UpdatePacketTxTime(); |
| 381 | + } |
329 | 382 |
|
330 | | - if (++packetSendCount == maxPacketCount) { |
331 | | - break; |
| 383 | + // At this point we have a packet prepared to send. |
| 384 | + pos += nwrite; |
| 385 | + size_t datalen = pos - begin; |
| 386 | + Debug(session_, "Sending packet with %zu bytes", datalen); |
| 387 | + packet->Truncate(datalen); |
| 388 | + session_->Send(packet, path); |
| 389 | + |
| 390 | + // If we have sent the maximum number of packets, we're done. |
| 391 | + if (++packet_send_count == max_packet_count) { |
| 392 | + return session_->UpdatePacketTxTime(); |
332 | 393 | } |
333 | | - } |
334 | 394 |
|
335 | | - updateTimer(); |
| 395 | + // Prepare to loop back around to prepare a new packet. |
| 396 | + packet = nullptr; |
| 397 | + pos = begin = nullptr; |
| 398 | + } |
336 | 399 | } |
337 | 400 |
|
338 | 401 | ssize_t Session::Application::WriteVStream(PathStorage* path, |
339 | | - uint8_t* buf, |
| 402 | + uint8_t* dest, |
340 | 403 | ssize_t* ndatalen, |
| 404 | + size_t max_packet_size, |
341 | 405 | const StreamData& stream_data) { |
342 | | - CHECK_LE(stream_data.count, kMaxVectorCount); |
343 | | - uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_NONE; |
344 | | - if (stream_data.remaining > 0) flags |= NGTCP2_WRITE_STREAM_FLAG_MORE; |
| 406 | + DCHECK_LE(stream_data.count, kMaxVectorCount); |
| 407 | + uint32_t flags = NGTCP2_WRITE_STREAM_FLAG_MORE; |
345 | 408 | if (stream_data.fin) flags |= NGTCP2_WRITE_STREAM_FLAG_FIN; |
346 | | - ssize_t ret = ngtcp2_conn_writev_stream( |
347 | | - *session_, |
348 | | - &path->path, |
349 | | - nullptr, |
350 | | - buf, |
351 | | - ngtcp2_conn_get_max_tx_udp_payload_size(*session_), |
352 | | - ndatalen, |
353 | | - flags, |
354 | | - stream_data.id, |
355 | | - stream_data.buf, |
356 | | - stream_data.count, |
357 | | - uv_hrtime()); |
358 | | - return ret; |
| 409 | + ngtcp2_pkt_info pi; |
| 410 | + return ngtcp2_conn_writev_stream(*session_, |
| 411 | + &path->path, |
| 412 | + &pi, |
| 413 | + dest, |
| 414 | + max_packet_size, |
| 415 | + ndatalen, |
| 416 | + flags, |
| 417 | + stream_data.id, |
| 418 | + stream_data.buf, |
| 419 | + stream_data.count, |
| 420 | + uv_hrtime()); |
359 | 421 | } |
360 | 422 |
|
361 | 423 | // The DefaultApplication is the default implementation of Session::Application |
|
0 commit comments