Skip to content

Commit db3b0ad

Browse files
committed
quic: handle control streams correctly
Signed-off-by: James M Snell <[email protected]>
1 parent ba59928 commit db3b0ad

File tree

5 files changed

+120
-55
lines changed

5 files changed

+120
-55
lines changed

src/quic/application.cc

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,13 +429,47 @@ class DefaultApplication final : public Session::Application {
429429
// of the namespace.
430430
using Application::Application; // NOLINT
431431

432-
bool ReceiveStreamData(Stream* stream,
432+
bool ReceiveStreamData(int64_t stream_id,
433433
const uint8_t* data,
434434
size_t datalen,
435-
Stream::ReceiveDataFlags flags) override {
435+
const Stream::ReceiveDataFlags& flags,
436+
void* stream_user_data) override {
436437
Debug(&session(), "Default application receiving stream data");
437-
DCHECK_NOT_NULL(stream);
438-
if (!stream->is_destroyed()) stream->ReceiveData(data, datalen, flags);
438+
439+
BaseObjectPtr<Stream> stream;
440+
if (stream_user_data == nullptr) {
441+
// This is the first time we're seeing this stream. Implicitly create it.
442+
stream = session().CreateStream(stream_id);
443+
if (!stream) {
444+
// We couldn't actually create the stream for whatever reason.
445+
Debug(&session(), "Default application failed to create new stream");
446+
return false;
447+
}
448+
// Let the JavaScript side know about the stream before we emit any data.
449+
session().EmitStream(stream);
450+
} else {
451+
stream = BaseObjectPtr<Stream>(Stream::From(stream_user_data));
452+
if (!stream) {
453+
Debug(&session(),
454+
"Default application failed to get existing stream "
455+
"from user data");
456+
return false;
457+
}
458+
}
459+
460+
DCHECK(stream);
461+
462+
// If the stream is destroyed, we are going to silently ignore the
463+
// data here.
464+
if (stream->is_destroyed()) {
465+
Debug(&session(),
466+
"Data received for a stream that is already "
467+
"destroyed. Ignoring.");
468+
return true;
469+
}
470+
471+
// Now we can actually receive the data! Woo!
472+
stream->ReceiveData(data, datalen, flags);
439473
return true;
440474
}
441475

src/quic/application.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ class Session::Application : public MemoryRetainer {
2727
// Application. The only additional processing the Session does is to
2828
// automatically adjust the session-level flow control window. It is up to
2929
// the Application to do the same for the Stream-level flow control.
30-
virtual bool ReceiveStreamData(Stream* stream,
30+
virtual bool ReceiveStreamData(int64_t stream_id,
3131
const uint8_t* data,
3232
size_t datalen,
33-
Stream::ReceiveDataFlags flags) = 0;
33+
const Stream::ReceiveDataFlags& flags,
34+
void* stream_user_data) = 0;
3435

3536
// Session will forward all data acknowledgements for a stream to the
3637
// Application.

src/quic/http3.cc

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
#include "sessionticket.h"
1919

2020
namespace node::quic {
21-
namespace {
2221

2322
struct Http3HeadersTraits {
2423
typedef nghttp3_nv nv_t;
@@ -116,13 +115,15 @@ class Http3Application final : public Session::Application {
116115
return CreateAndBindControlStreams();
117116
}
118117

119-
bool ReceiveStreamData(Stream* stream,
118+
bool ReceiveStreamData(int64_t stream_id,
120119
const uint8_t* data,
121120
size_t datalen,
122-
Stream::ReceiveDataFlags flags) override {
121+
const Stream::ReceiveDataFlags& flags,
122+
void* unused) override {
123123
Debug(&session(), "HTTP/3 application received %zu bytes of data", datalen);
124+
124125
ssize_t nread = nghttp3_conn_read_stream(
125-
*this, stream->id(), data, datalen, flags.fin ? 1 : 0);
126+
*this, stream_id, data, datalen, flags.fin ? 1 : 0);
126127

127128
if (nread < 0) {
128129
Debug(&session(),
@@ -134,7 +135,7 @@ class Http3Application final : public Session::Application {
134135
Debug(&session(),
135136
"Extending stream and connection offset by %zd bytes",
136137
nread);
137-
session().ExtendStreamOffset(stream->id(), nread);
138+
session().ExtendStreamOffset(stream_id, nread);
138139
session().ExtendOffset(nread);
139140

140141
return true;
@@ -614,11 +615,13 @@ class Http3Application final : public Session::Application {
614615
}
615616

616617
#define NGHTTP3_CALLBACK_SCOPE(name) \
617-
auto name = From(conn, conn_user_data); \
618-
if (name->is_destroyed()) [[unlikely]] { \
618+
auto ptr = From(conn, conn_user_data); \
619+
CHECK_NOT_NULL(ptr); \
620+
auto& name = *ptr; \
621+
if (name.is_destroyed()) [[unlikely]] { \
619622
return NGHTTP3_ERR_CALLBACK_FAILURE; \
620623
} \
621-
NgHttp3CallbackScope scope(name->env());
624+
NgHttp3CallbackScope scope(name.env());
622625

623626
static nghttp3_ssize on_read_data_callback(nghttp3_conn* conn,
624627
int64_t stream_id,
@@ -638,7 +641,7 @@ class Http3Application final : public Session::Application {
638641
NGHTTP3_CALLBACK_SCOPE(app);
639642
auto stream = From(stream_id, stream_user_data);
640643
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
641-
app->AcknowledgeStreamData(stream, static_cast<size_t>(datalen));
644+
app.AcknowledgeStreamData(stream, static_cast<size_t>(datalen));
642645
return NGTCP2_SUCCESS;
643646
}
644647

@@ -650,7 +653,7 @@ class Http3Application final : public Session::Application {
650653
NGHTTP3_CALLBACK_SCOPE(app);
651654
auto stream = From(stream_id, stream_user_data);
652655
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
653-
app->OnStreamClose(stream, app_error_code);
656+
app.OnStreamClose(stream, app_error_code);
654657
return NGTCP2_SUCCESS;
655658
}
656659

@@ -661,10 +664,31 @@ class Http3Application final : public Session::Application {
661664
void* conn_user_data,
662665
void* stream_user_data) {
663666
NGHTTP3_CALLBACK_SCOPE(app);
664-
auto stream = From(stream_id, stream_user_data);
665-
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
666-
app->OnReceiveData(stream,
667-
nghttp3_vec{const_cast<uint8_t*>(data), datalen});
667+
auto& session = app.session();
668+
BaseObjectPtr<Stream> stream;
669+
if (stream_user_data == nullptr) {
670+
// The stream does not exist yet! Create it
671+
stream = session.CreateStream(stream_id);
672+
if (!stream) {
673+
Debug(&session, "HTTP3 application failed to create new stream");
674+
return NGHTTP3_ERR_CALLBACK_FAILURE;
675+
}
676+
// Memoize the stream instance so we can look it up next time.
677+
nghttp3_conn_set_stream_user_data(conn, stream_id, stream.get());
678+
session.EmitStream(stream);
679+
} else {
680+
stream = BaseObjectPtr<Stream>(From(stream_id, stream_user_data));
681+
if (!stream) {
682+
Debug(&session,
683+
"HTTP3 application failed to get existing stream "
684+
"from user data");
685+
return NGHTTP3_ERR_CALLBACK_FAILURE;
686+
}
687+
}
688+
689+
DCHECK(stream);
690+
app.OnReceiveData(stream.get(),
691+
nghttp3_vec{const_cast<uint8_t*>(data), datalen});
668692
return NGTCP2_SUCCESS;
669693
}
670694

@@ -676,7 +700,7 @@ class Http3Application final : public Session::Application {
676700
NGHTTP3_CALLBACK_SCOPE(app);
677701
auto stream = From(stream_id, stream_user_data);
678702
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
679-
app->OnDeferredConsume(stream, consumed);
703+
app.OnDeferredConsume(stream, consumed);
680704
return NGTCP2_SUCCESS;
681705
}
682706

@@ -687,7 +711,7 @@ class Http3Application final : public Session::Application {
687711
NGHTTP3_CALLBACK_SCOPE(app);
688712
auto stream = From(stream_id, stream_user_data);
689713
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
690-
app->OnBeginHeaders(stream);
714+
app.OnBeginHeaders(stream);
691715
return NGTCP2_SUCCESS;
692716
}
693717

@@ -703,8 +727,8 @@ class Http3Application final : public Session::Application {
703727
auto stream = From(stream_id, stream_user_data);
704728
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
705729
if (Http3Header::IsZeroLength(token, name, value)) return NGTCP2_SUCCESS;
706-
app->OnReceiveHeader(stream,
707-
Http3Header(app->env(), token, name, value, flags));
730+
app.OnReceiveHeader(stream,
731+
Http3Header(app.env(), token, name, value, flags));
708732
return NGTCP2_SUCCESS;
709733
}
710734

@@ -716,7 +740,7 @@ class Http3Application final : public Session::Application {
716740
NGHTTP3_CALLBACK_SCOPE(app);
717741
auto stream = From(stream_id, stream_user_data);
718742
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
719-
app->OnEndHeaders(stream, fin);
743+
app.OnEndHeaders(stream, fin);
720744
return NGTCP2_SUCCESS;
721745
}
722746

@@ -727,7 +751,7 @@ class Http3Application final : public Session::Application {
727751
NGHTTP3_CALLBACK_SCOPE(app);
728752
auto stream = From(stream_id, stream_user_data);
729753
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
730-
app->OnBeginTrailers(stream);
754+
app.OnBeginTrailers(stream);
731755
return NGTCP2_SUCCESS;
732756
}
733757

@@ -743,8 +767,8 @@ class Http3Application final : public Session::Application {
743767
auto stream = From(stream_id, stream_user_data);
744768
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
745769
if (Http3Header::IsZeroLength(token, name, value)) return NGTCP2_SUCCESS;
746-
app->OnReceiveTrailer(stream,
747-
Http3Header(app->env(), token, name, value, flags));
770+
app.OnReceiveTrailer(stream,
771+
Http3Header(app.env(), token, name, value, flags));
748772
return NGTCP2_SUCCESS;
749773
}
750774

@@ -756,7 +780,7 @@ class Http3Application final : public Session::Application {
756780
NGHTTP3_CALLBACK_SCOPE(app);
757781
auto stream = From(stream_id, stream_user_data);
758782
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
759-
app->OnEndTrailers(stream, fin);
783+
app.OnEndTrailers(stream, fin);
760784
return NGTCP2_SUCCESS;
761785
}
762786

@@ -767,7 +791,7 @@ class Http3Application final : public Session::Application {
767791
NGHTTP3_CALLBACK_SCOPE(app);
768792
auto stream = From(stream_id, stream_user_data);
769793
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
770-
app->OnEndStream(stream);
794+
app.OnEndStream(stream);
771795
return NGTCP2_SUCCESS;
772796
}
773797

@@ -779,7 +803,7 @@ class Http3Application final : public Session::Application {
779803
NGHTTP3_CALLBACK_SCOPE(app);
780804
auto stream = From(stream_id, stream_user_data);
781805
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
782-
app->OnStopSending(stream, app_error_code);
806+
app.OnStopSending(stream, app_error_code);
783807
return NGTCP2_SUCCESS;
784808
}
785809

@@ -791,21 +815,21 @@ class Http3Application final : public Session::Application {
791815
NGHTTP3_CALLBACK_SCOPE(app);
792816
auto stream = From(stream_id, stream_user_data);
793817
if (stream == nullptr) return NGHTTP3_ERR_CALLBACK_FAILURE;
794-
app->OnResetStream(stream, app_error_code);
818+
app.OnResetStream(stream, app_error_code);
795819
return NGTCP2_SUCCESS;
796820
}
797821

798822
static int on_shutdown(nghttp3_conn* conn, int64_t id, void* conn_user_data) {
799823
NGHTTP3_CALLBACK_SCOPE(app);
800-
app->OnShutdown();
824+
app.OnShutdown();
801825
return NGTCP2_SUCCESS;
802826
}
803827

804828
static int on_receive_settings(nghttp3_conn* conn,
805829
const nghttp3_settings* settings,
806830
void* conn_user_data) {
807831
NGHTTP3_CALLBACK_SCOPE(app);
808-
app->OnReceiveSettings(settings);
832+
app.OnReceiveSettings(settings);
809833
return NGTCP2_SUCCESS;
810834
}
811835

@@ -825,7 +849,6 @@ class Http3Application final : public Session::Application {
825849
on_shutdown,
826850
on_receive_settings};
827851
};
828-
} // namespace
829852

830853
std::unique_ptr<Session::Application> createHttp3Application(
831854
Session* session, const Session::Application_Options& options) {

src/quic/session.cc

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2061,27 +2061,33 @@ struct Session::Impl {
20612061
void* user_data,
20622062
void* stream_user_data) {
20632063
NGTCP2_CALLBACK_SCOPE(session)
2064-
Stream::ReceiveDataFlags f;
2065-
f.early = flags & NGTCP2_STREAM_DATA_FLAG_0RTT;
2066-
f.fin = flags & NGTCP2_STREAM_DATA_FLAG_FIN;
2067-
2068-
if (stream_user_data == nullptr) {
2069-
// We have an implicitly created stream.
2070-
auto stream = session->CreateStream(stream_id);
2071-
if (stream) {
2072-
session->EmitStream(stream);
2073-
session->application().ReceiveStreamData(
2074-
stream.get(), data, datalen, f);
2075-
} else {
2076-
return ngtcp2_conn_shutdown_stream(
2077-
*session, 0, stream_id, NGTCP2_APP_NOERROR) == 0
2078-
? NGTCP2_SUCCESS
2079-
: NGTCP2_ERR_CALLBACK_FAILURE;
2080-
}
2081-
} else {
2082-
session->application().ReceiveStreamData(
2083-
Stream::From(stream_user_data), data, datalen, f);
2064+
Stream::ReceiveDataFlags data_flags{
2065+
// The fin flag indicates that this is the last chunk of data we will
2066+
// receive on this stream.
2067+
.fin = (flags & NGTCP2_STREAM_DATA_FLAG_FIN) ==
2068+
NGTCP2_STREAM_DATA_FLAG_FIN,
2069+
// Stream data is early if it is received before the TLS handshake is
2070+
// complete.
2071+
.early = (flags & NGTCP2_STREAM_DATA_FLAG_0RTT) ==
2072+
NGTCP2_STREAM_DATA_FLAG_0RTT,
2073+
};
2074+
2075+
// We received data for a stream! What we don't know yet at this point
2076+
// is whether the application wants us to treat this as a control stream
2077+
// data (something the application will handle on its own) or a user stream
2078+
// data (something that we should create a Stream handle for that is passed
2079+
// out to JavaScript). HTTP3, for instance, will generally create three
2080+
// control stream in either direction and we want to make sure those are
2081+
// never exposed to users and that we don't waste time creating Stream
2082+
// handles for them. So, what we do here is pass the stream data on to the
2083+
// application for processing. If it ends up being a user stream, the
2084+
// application will handle creating the Stream handle and passing that off
2085+
// to the JavaScript side.
2086+
if (!session->application().ReceiveStreamData(
2087+
stream_id, data, datalen, data_flags, stream_user_data)) {
2088+
return NGTCP2_ERR_CALLBACK_FAILURE;
20842089
}
2090+
20852091
return NGTCP2_SUCCESS;
20862092
}
20872093

src/quic/session.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
427427

428428
friend class Application;
429429
friend class DefaultApplication;
430+
friend class Http3Application;
430431
friend class Endpoint;
431432
friend struct Impl;
432433
friend struct MaybeCloseConnectionScope;

0 commit comments

Comments
 (0)