Skip to content

Commit e2c590c

Browse files
ac-patelcopybara-github
authored andcommitted
[PH2] Defer assigning ID to streams till the first write.
PiperOrigin-RevId: 815642269
1 parent ecb17d4 commit e2c590c

File tree

8 files changed

+256
-173
lines changed

8 files changed

+256
-173
lines changed

src/core/ext/transport/chttp2/transport/header_assembler.h

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ class HeaderAssembler {
146146

147147
// Validate
148148
GRPC_DCHECK_EQ(is_ready_, true);
149+
GRPC_DCHECK_GT(stream_id_, 0u)
150+
<< "Stream id must be set before reading metadata.";
149151

150152
// Generate the gRPC Metadata from buffer_
151153
// RFC9113 : A receiver MUST terminate the connection with a connection
@@ -198,12 +200,11 @@ class HeaderAssembler {
198200
// This value MUST be checked before calling ReadMetadata()
199201
bool IsReady() const { return is_ready_; }
200202

201-
explicit HeaderAssembler(const uint32_t stream_id,
202-
const bool allow_true_binary_metadata_acked)
203+
explicit HeaderAssembler(const bool allow_true_binary_metadata_acked)
203204
: header_in_progress_(false),
204205
is_ready_(false),
205206
allow_true_binary_metadata_acked_(allow_true_binary_metadata_acked),
206-
stream_id_(stream_id) {}
207+
stream_id_(0) {}
207208

208209
~HeaderAssembler() = default;
209210

@@ -212,6 +213,12 @@ class HeaderAssembler {
212213
HeaderAssembler(const HeaderAssembler&) = delete;
213214
HeaderAssembler& operator=(const HeaderAssembler&) = delete;
214215

216+
void SetStreamId(const uint32_t stream_id) {
217+
GRPC_DCHECK_EQ(stream_id_, 0u);
218+
GRPC_DCHECK_NE(stream_id, 0u);
219+
stream_id_ = stream_id;
220+
}
221+
215222
private:
216223
void Cleanup() {
217224
buffer_.Clear();
@@ -222,7 +229,7 @@ class HeaderAssembler {
222229
bool header_in_progress_;
223230
bool is_ready_;
224231
GRPC_UNUSED const bool allow_true_binary_metadata_acked_;
225-
const uint32_t stream_id_;
232+
uint32_t stream_id_;
226233
SliceBuffer buffer_;
227234
};
228235

@@ -244,6 +251,9 @@ class HeaderDisassembler {
244251

245252
Http2Frame GetNextFrame(const uint32_t max_frame_length,
246253
bool& out_end_headers) {
254+
GRPC_DCHECK_GT(stream_id_, 0u) << "Stream id must be set before getting "
255+
"next Header/Continuation frame.";
256+
247257
if (buffer_.Length() == 0 || is_done_) {
248258
GRPC_DCHECK(false) << "Calling code must check size using HasMoreData() "
249259
"before GetNextFrame()";
@@ -273,10 +283,9 @@ class HeaderDisassembler {
273283

274284
// A separate HeaderDisassembler object MUST be made for Initial Metadata and
275285
// Trailing Metadata
276-
explicit HeaderDisassembler(const uint32_t stream_id,
277-
const bool is_trailing_metadata,
286+
explicit HeaderDisassembler(const bool is_trailing_metadata,
278287
const bool allow_true_binary_metadata_peer)
279-
: stream_id_(stream_id),
288+
: stream_id_(0),
280289
end_stream_(is_trailing_metadata),
281290
did_send_header_frame_(false),
282291
is_done_(false),
@@ -290,9 +299,14 @@ class HeaderDisassembler {
290299
HeaderDisassembler& operator=(const HeaderDisassembler&) = delete;
291300

292301
size_t TestOnlyGetMainBufferLength() const { return buffer_.Length(); }
302+
void SetStreamId(const uint32_t stream_id) {
303+
GRPC_DCHECK_EQ(stream_id_, 0u);
304+
GRPC_DCHECK_NE(stream_id, 0u);
305+
stream_id_ = stream_id;
306+
}
293307

294308
private:
295-
const uint32_t stream_id_;
309+
uint32_t stream_id_;
296310
const bool end_stream_;
297311
bool did_send_header_frame_;
298312
bool is_done_; // Protect against the same disassembler from being used twice

src/core/ext/transport/chttp2/transport/http2_client_transport.cc

Lines changed: 86 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "absl/strings/string_view.h"
3232
#include "src/core/call/call_spine.h"
3333
#include "src/core/call/message.h"
34+
#include "src/core/call/metadata.h"
3435
#include "src/core/call/metadata_batch.h"
3536
#include "src/core/ext/transport/chttp2/transport/flow_control.h"
3637
#include "src/core/ext/transport/chttp2/transport/flow_control_manager.h"
@@ -855,6 +856,25 @@ auto Http2ClientTransport::MultiplexerLoop() {
855856
<< stream->GetStreamId()
856857
<< " is_closed_for_writes = " << stream->IsClosedForWrites();
857858

859+
if (stream->GetStreamId() == kInvalidStreamId) {
860+
// TODO(akshitpatel) : [PH2][P4] : We will waste a stream id in
861+
// the rare scenario where the stream is aborted before it can be
862+
// written to. This is a possible area to optimize in future.
863+
absl::Status status =
864+
self->AssignStreamIdAndAddToStreamList(stream);
865+
if (!status.ok()) {
866+
GRPC_HTTP2_CLIENT_DLOG
867+
<< "Http2ClientTransport MultiplexerLoop "
868+
"Failed to assign stream id and add to stream list for "
869+
"stream: "
870+
<< stream.get() << " closing this stream.";
871+
self->BeginCloseStream(
872+
stream, /*reset_stream_error_code=*/std::nullopt,
873+
CancelledServerMetadataFromStatus(status));
874+
continue;
875+
}
876+
}
877+
858878
if (GPR_LIKELY(!stream->IsClosedForWrites())) {
859879
auto stream_frames = self->DequeueStreamFrames(stream);
860880
if (GPR_UNLIKELY(!stream_frames.ok())) {
@@ -909,6 +929,28 @@ auto Http2ClientTransport::OnMultiplexerLoopEnded() {
909929
};
910930
}
911931

932+
absl::Status Http2ClientTransport::AssignStreamIdAndAddToStreamList(
933+
RefCountedPtr<Stream> stream) {
934+
absl::StatusOr<uint32_t> next_stream_id = NextStreamId();
935+
if (!next_stream_id.ok()) {
936+
GRPC_HTTP2_CLIENT_DLOG
937+
<< "Http2ClientTransport AssignStreamIdAndAddToStreamList "
938+
"Failed to get next stream id for stream: "
939+
<< stream.get();
940+
return std::move(next_stream_id).status();
941+
}
942+
GRPC_HTTP2_CLIENT_DLOG
943+
<< "Http2ClientTransport AssignStreamIdAndAddToStreamList "
944+
"Assigned stream id: "
945+
<< next_stream_id.value() << " to stream: " << stream.get();
946+
stream->SetStreamId(next_stream_id.value());
947+
{
948+
MutexLock lock(&transport_mutex_);
949+
stream_list_.emplace(next_stream_id.value(), stream);
950+
}
951+
return absl::OkStatus();
952+
}
953+
912954
///////////////////////////////////////////////////////////////////////////////
913955
// Constructor Destructor
914956

@@ -919,7 +961,7 @@ Http2ClientTransport::Http2ClientTransport(
919961
: channelz::DataSource(http2::CreateChannelzSocketNode(
920962
endpoint.GetEventEngineEndpoint(), channel_args)),
921963
endpoint_(std::move(endpoint)),
922-
stream_id_mutex_(/*Initial Stream Id*/ 1),
964+
next_stream_id_(/*Initial Stream ID*/ 1),
923965
should_reset_ping_clock_(false),
924966
incoming_header_in_progress_(false),
925967
incoming_header_end_stream_(false),
@@ -1308,37 +1350,26 @@ bool Http2ClientTransport::SetOnDone(CallHandler call_handler,
13081350
}
13091351

13101352
std::optional<RefCountedPtr<Stream>> Http2ClientTransport::MakeStream(
1311-
CallHandler call_handler,
1312-
InterActivityMutex<uint32_t>::Lock& next_stream_id_lock) {
1353+
CallHandler call_handler) {
13131354
// https://datatracker.ietf.org/doc/html/rfc9113#name-stream-identifiers
1314-
// TODO(akshitpatel) : [PH2][P1] : Probably do not need this lock. This
1315-
// function is always called under the stream_id_mutex_. The issue is the
1316-
// OnDone needs to be synchronous and hence InterActivityMutex might not be
1317-
// an option to protect the stream_list_.
1318-
MutexLock lock(&transport_mutex_);
1319-
1320-
std::optional<uint32_t> stream_id = NextStreamId(next_stream_id_lock);
1321-
if (!stream_id.has_value()) {
1322-
return std::nullopt;
1355+
RefCountedPtr<Stream> stream;
1356+
{
1357+
MutexLock lock(&transport_mutex_);
1358+
stream = MakeRefCounted<Stream>(
1359+
call_handler, settings_.peer().allow_true_binary_metadata(),
1360+
settings_.acked().allow_true_binary_metadata(), flow_control_);
13231361
}
1324-
1325-
RefCountedPtr<Stream> stream = MakeRefCounted<Stream>(
1326-
call_handler, stream_id.value(),
1327-
settings_.peer().allow_true_binary_metadata(),
1328-
settings_.acked().allow_true_binary_metadata(), flow_control_);
13291362
const bool on_done_added = SetOnDone(call_handler, stream);
13301363
if (!on_done_added) return std::nullopt;
1331-
stream_list_.emplace(stream_id.value(), stream);
13321364
return stream;
13331365
}
13341366

13351367
///////////////////////////////////////////////////////////////////////////////
13361368
// Call Spine related operations
13371369

1338-
auto Http2ClientTransport::CallOutboundLoop(
1339-
CallHandler call_handler, RefCountedPtr<Stream> stream,
1340-
InterActivityMutex<uint32_t>::Lock lock /* Locked stream_id_mutex */,
1341-
ClientMetadataHandle metadata) {
1370+
auto Http2ClientTransport::CallOutboundLoop(CallHandler call_handler,
1371+
RefCountedPtr<Stream> stream,
1372+
ClientMetadataHandle metadata) {
13421373
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport CallOutboundLoop";
13431374
GRPC_DCHECK(stream != nullptr);
13441375

@@ -1384,7 +1415,7 @@ auto Http2ClientTransport::CallOutboundLoop(
13841415
"Ph2CallOutboundLoop",
13851416
TrySeq(
13861417
send_initial_metadata(),
1387-
[call_handler, send_message, lock = std::move(lock)]() {
1418+
[call_handler, send_message]() {
13881419
// The lock will be released once the promise is constructed from
13891420
// this factory. ForEach will be polled after the lock is
13901421
// released.
@@ -1409,43 +1440,38 @@ void Http2ClientTransport::StartCall(CallHandler call_handler) {
14091440
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport StartCall Begin";
14101441
call_handler.SpawnGuarded(
14111442
"OutboundLoop",
1412-
TrySeq(
1413-
call_handler.PullClientInitialMetadata(),
1414-
[self = RefAsSubclass<Http2ClientTransport>()](
1415-
ClientMetadataHandle metadata) {
1416-
// Lock the stream_id_mutex_
1417-
return Staple(self->stream_id_mutex_.Acquire(),
1418-
std::move(metadata));
1419-
},
1420-
[self = RefAsSubclass<Http2ClientTransport>(),
1421-
call_handler](auto args /* Locked stream_id_mutex */) mutable {
1422-
// For a gRPC Client, we only need to check the
1423-
// MAX_CONCURRENT_STREAMS setting compliance at the time of
1424-
// sending (that is write path). A gRPC Client will never
1425-
// receive a stream initiated by a server, so we dont have to
1426-
// check MAX_CONCURRENT_STREAMS compliance on the Read-Path.
1427-
//
1428-
// TODO(tjagtap) : [PH2][P1] Check for MAX_CONCURRENT_STREAMS
1429-
// sent by peer before making a stream. Decide behaviour if we are
1430-
// crossing this threshold.
1431-
//
1432-
// TODO(tjagtap) : [PH2][P1] : For a server we will have to do
1433-
// this for incoming streams only. If a server receives more streams
1434-
// from a client than is allowed by the clients settings, whether or
1435-
// not we should fail is debatable.
1436-
std::optional<RefCountedPtr<Stream>> stream =
1437-
self->MakeStream(call_handler, std::get<0>(args));
1438-
return If(
1439-
stream.has_value(),
1440-
[self, call_handler, stream, args = std::move(args)]() mutable {
1441-
return Map(
1442-
self->CallOutboundLoop(call_handler, stream.value(),
1443-
std::move(std::get<0>(args)),
1444-
std::move(std::get<1>(args))),
1445-
[](absl::Status status) { return status; });
1446-
},
1447-
[]() { return absl::InternalError("Failed to make stream"); });
1448-
}));
1443+
TrySeq(call_handler.PullClientInitialMetadata(),
1444+
[self = RefAsSubclass<Http2ClientTransport>(),
1445+
call_handler](ClientMetadataHandle metadata) mutable {
1446+
// For a gRPC Client, we only need to check the
1447+
// MAX_CONCURRENT_STREAMS setting compliance at the time of
1448+
// sending (that is write path). A gRPC Client will never
1449+
// receive a stream initiated by a server, so we dont have to
1450+
// check MAX_CONCURRENT_STREAMS compliance on the Read-Path.
1451+
//
1452+
// TODO(tjagtap) : [PH2][P1] Check for MAX_CONCURRENT_STREAMS
1453+
// sent by peer before making a stream. Decide behaviour if we
1454+
// are crossing this threshold.
1455+
//
1456+
// TODO(tjagtap) : [PH2][P1] : For a server we will have to do
1457+
// this for incoming streams only. If a server receives more
1458+
// streams from a client than is allowed by the clients settings,
1459+
// whether or not we should fail is debatable.
1460+
std::optional<RefCountedPtr<Stream>> stream =
1461+
self->MakeStream(call_handler);
1462+
return If(
1463+
stream.has_value(),
1464+
[self, call_handler, stream,
1465+
initial_metadata = std::move(metadata)]() mutable {
1466+
return Map(
1467+
self->CallOutboundLoop(call_handler, stream.value(),
1468+
std::move(initial_metadata)),
1469+
[](absl::Status status) { return status; });
1470+
},
1471+
[]() {
1472+
return absl::InternalError("Failed to make stream");
1473+
});
1474+
}));
14491475
GRPC_HTTP2_CLIENT_DLOG << "Http2ClientTransport StartCall End";
14501476
}
14511477

src/core/ext/transport/chttp2/transport/http2_client_transport.h

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,6 @@ class Http2ClientTransport final : public ClientTransport,
234234
// Returns a promise to fetch data from the callhandler and pass it further
235235
// down towards the endpoint.
236236
auto CallOutboundLoop(CallHandler call_handler, RefCountedPtr<Stream> stream,
237-
InterActivityMutex<uint32_t>::Lock lock,
238237
ClientMetadataHandle metadata);
239238

240239
// Force triggers a transport write cycle
@@ -280,11 +279,10 @@ class Http2ClientTransport final : public ClientTransport,
280279
return stream_list_.size();
281280
}
282281

283-
std::optional<uint32_t> NextStreamId(
284-
InterActivityMutex<uint32_t>::Lock& next_stream_id_lock)
285-
ABSL_EXCLUSIVE_LOCKS_REQUIRED(transport_mutex_) {
286-
const uint32_t stream_id = *next_stream_id_lock;
287-
if (stream_id > RFC9113::kMaxStreamId31Bit) {
282+
// Returns the next stream id. If the next stream id is not available, it
283+
// returns std::nullopt. MUST be called from the transport party.
284+
absl::StatusOr<uint32_t> NextStreamId() {
285+
if (next_stream_id_ > RFC9113::kMaxStreamId31Bit) {
288286
// TODO(tjagtap) : [PH2][P3] : Handle case if transport runs out of stream
289287
// ids
290288
// RFC9113 : Stream identifiers cannot be reused. Long-lived connections
@@ -294,29 +292,36 @@ class Http2ClientTransport final : public ClientTransport,
294292
// that is unable to establish a new stream identifier can send a GOAWAY
295293
// frame so that the client is forced to open a new connection for new
296294
// streams.
297-
return std::nullopt;
295+
return absl::ResourceExhaustedError("No more stream ids available");
298296
}
299297
// TODO(akshitpatel) : [PH2][P3] : There is a channel arg to delay
300298
// starting new streams instead of failing them. This needs to be
301299
// implemented.
302-
if (GetActiveStreamCount() >= settings_.peer().max_concurrent_streams()) {
303-
return std::nullopt;
300+
{
301+
MutexLock lock(&transport_mutex_);
302+
if (GetActiveStreamCount() >= settings_.peer().max_concurrent_streams()) {
303+
return absl::ResourceExhaustedError("Reached max concurrent streams");
304+
}
304305
}
306+
305307
// RFC9113 : Streams initiated by a client MUST use odd-numbered stream
306308
// identifiers.
307-
(*next_stream_id_lock) += 2;
308-
return stream_id;
309+
return std::exchange(next_stream_id_, next_stream_id_ + 2);
309310
}
310311

312+
// Returns the next stream id without incrementing it. MUST be called from the
313+
// transport party.
314+
uint32_t PeekNextStreamId() const { return next_stream_id_; }
315+
316+
absl::Status AssignStreamIdAndAddToStreamList(RefCountedPtr<Stream> stream);
317+
311318
Mutex transport_mutex_;
312319
// TODO(tjagtap) : [PH2][P2] : Add to map in StartCall and clean this
313320
// mapping up in the on_done of the CallInitiator or CallHandler
314321
absl::flat_hash_map<uint32_t, RefCountedPtr<Stream>> stream_list_
315322
ABSL_GUARDED_BY(transport_mutex_);
316323

317-
// Mutex to preserve the order of headers being sent out for new streams.
318-
// This also tracks the stream_id for creating new streams.
319-
InterActivityMutex<uint32_t> stream_id_mutex_;
324+
uint32_t next_stream_id_;
320325
HPackCompressor encoder_;
321326
HPackParser parser_;
322327
bool is_transport_closed_ ABSL_GUARDED_BY(transport_mutex_) = false;
@@ -335,9 +340,8 @@ class Http2ClientTransport final : public ClientTransport,
335340
ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(transport_mutex_){
336341
"http2_client", GRPC_CHANNEL_READY};
337342

338-
std::optional<RefCountedPtr<Stream>> MakeStream(
339-
CallHandler call_handler,
340-
InterActivityMutex<uint32_t>::Lock& next_stream_id_lock);
343+
// Runs on the call party.
344+
std::optional<RefCountedPtr<Stream>> MakeStream(CallHandler call_handler);
341345

342346
struct CloseStreamArgs {
343347
bool close_reads;

0 commit comments

Comments
 (0)