Skip to content

Commit 51f8215

Browse files
ac-patelcopybara-github
authored andcommitted
[PH2][StreamQ] Adding priority for streams (grpc#40456)
Streams now track the priority in the writable stream list based on what type of messages have been enqueued. Closes grpc#40456 COPYBARA_INTEGRATE_REVIEW=grpc#40456 from ac-patel:streamq8 9510b31 PiperOrigin-RevId: 797595198
1 parent 9c0c847 commit 51f8215

File tree

6 files changed

+319
-245
lines changed

6 files changed

+319
-245
lines changed

src/core/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8182,6 +8182,7 @@ grpc_cc_library(
81828182
"message",
81838183
"message_assembler",
81848184
"metadata_batch",
8185+
"writable_streams",
81858186
],
81868187
)
81878188

src/core/ext/transport/chttp2/transport/http2_client_transport.cc

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ namespace grpc_core {
6262
namespace http2 {
6363

6464
using grpc_event_engine::experimental::EventEngine;
65+
using EnqueueResult = StreamDataQueue<ClientMetadataHandle>::EnqueueResult;
6566

6667
// Experimental : This is just the initial skeleton of class
6768
// and it is functions. The code will be written iteratively.
@@ -781,7 +782,8 @@ auto Http2ClientTransport::StreamMultiplexerLoop() {
781782
LOG(ERROR) << "Failed to enqueue stream " << stream_id
782783
<< " with status: " << status;
783784
// Close transport if we fail to enqueue stream.
784-
return absl::InternalError("Failed to enqueue stream");
785+
return absl::UnavailableError(
786+
"Failed to enqueue stream to writable stream list");
785787
}
786788
} else if (GPR_UNLIKELY(!result.ok())) {
787789
// Close the corresponding stream if we fail to dequeue frames from
@@ -1137,12 +1139,12 @@ auto Http2ClientTransport::CallOutboundLoop(
11371139
stream != nullptr,
11381140
[self, stream, message = std::move(message), stream_id]() mutable {
11391141
return TrySeq(stream->EnqueueMessage(std::move(message)),
1140-
[self, stream_id](bool became_writable) {
1142+
[self, stream_id](const EnqueueResult result) {
11411143
GRPC_HTTP2_CLIENT_DLOG
11421144
<< "Http2ClientTransport CallOutboundLoop "
11431145
"Enqueued Message";
11441146
return self->MaybeAddStreamToWritableStreamList(
1145-
stream_id, became_writable);
1147+
stream_id, result);
11461148
});
11471149
},
11481150
[]() {
@@ -1160,12 +1162,12 @@ auto Http2ClientTransport::CallOutboundLoop(
11601162
[self, stream, metadata = std::move(metadata), stream_id]() mutable {
11611163
return TrySeq(
11621164
stream->EnqueueInitialMetadata(std::move(metadata)),
1163-
[self, stream_id](bool became_writable) {
1165+
[self, stream_id](const EnqueueResult result) {
11641166
GRPC_HTTP2_CLIENT_DLOG
11651167
<< "Http2ClientTransport CallOutboundLoop "
11661168
"Enqueued Initial Metadata";
1167-
return self->MaybeAddStreamToWritableStreamList(
1168-
stream_id, became_writable);
1169+
return self->MaybeAddStreamToWritableStreamList(stream_id,
1170+
result);
11691171
},
11701172
[stream] {
11711173
// TODO(akshitpatel) : [PH2][P2] : Think how to handle stream
@@ -1188,12 +1190,12 @@ auto Http2ClientTransport::CallOutboundLoop(
11881190
stream != nullptr,
11891191
[self, stream, stream_id]() mutable {
11901192
return TrySeq(stream->EnqueueHalfClosed(),
1191-
[self, stream_id](bool became_writable) {
1193+
[self, stream_id](const EnqueueResult result) {
11921194
GRPC_HTTP2_CLIENT_DLOG
11931195
<< "Http2ClientTransport CallOutboundLoop "
11941196
"Enqueued Half Closed";
11951197
return self->MaybeAddStreamToWritableStreamList(
1196-
stream_id, became_writable);
1198+
stream_id, result);
11971199
});
11981200
},
11991201
[]() {

src/core/ext/transport/chttp2/transport/http2_client_transport.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -739,18 +739,20 @@ class Http2ClientTransport final : public ClientTransport {
739739

740740
WritableStreams writable_stream_list_;
741741

742-
auto MaybeAddStreamToWritableStreamList(const uint32_t stream_id,
743-
const bool became_writable) {
744-
if (became_writable) {
742+
absl::Status MaybeAddStreamToWritableStreamList(
743+
const uint32_t stream_id,
744+
const StreamDataQueue<ClientMetadataHandle>::EnqueueResult result) {
745+
if (result.became_writable) {
745746
GRPC_HTTP2_CLIENT_DLOG
746747
<< "Http2ClientTransport MaybeAddStreamToWritableStreamList "
747748
" Stream id: "
748749
<< stream_id << " became writable";
749-
absl::Status status = writable_stream_list_.Enqueue(
750-
stream_id, WritableStreams::StreamPriority::kDefault);
750+
absl::Status status =
751+
writable_stream_list_.Enqueue(stream_id, result.priority);
751752
if (!status.ok()) {
752753
return HandleError(Http2Status::Http2ConnectionError(
753-
Http2ErrorCode::kProtocolError, "Failed to enqueue stream"));
754+
Http2ErrorCode::kRefusedStream,
755+
"Failed to enqueue stream to writable stream list"));
754756
}
755757
}
756758
return absl::OkStatus();

src/core/ext/transport/chttp2/transport/stream_data_queue.h

Lines changed: 77 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include "absl/log/log.h"
2626
#include "src/core/ext/transport/chttp2/transport/header_assembler.h"
2727
#include "src/core/ext/transport/chttp2/transport/message_assembler.h"
28+
#include "src/core/ext/transport/chttp2/transport/writable_streams.h"
2829

2930
namespace grpc_core {
3031
namespace http2 {
@@ -36,11 +37,6 @@ namespace http2 {
3637
template <typename T>
3738
class SimpleQueue {
3839
public:
39-
struct EnqueueResult {
40-
absl::Status status;
41-
bool became_non_empty;
42-
};
43-
4440
explicit SimpleQueue(const uint32_t max_tokens) : max_tokens_(max_tokens) {}
4541
SimpleQueue(SimpleQueue&& rhs) = delete;
4642
SimpleQueue& operator=(SimpleQueue&& rhs) = delete;
@@ -54,7 +50,7 @@ class SimpleQueue {
5450
// with tokens = 0. Enqueues with tokens = 0 are primarily for sending
5551
// metadata as flow control does not apply to them. This function is NOT
5652
// thread safe.
57-
auto Enqueue(T& data, const uint32_t tokens) {
53+
Poll<absl::StatusOr<bool>> Enqueue(T& data, const uint32_t tokens) {
5854
return PollEnqueue(data, tokens);
5955
}
6056

@@ -80,7 +76,7 @@ class SimpleQueue {
8076
bool TestOnlyIsEmpty() const { return IsEmpty(); }
8177

8278
private:
83-
Poll<EnqueueResult> PollEnqueue(T& data, const uint32_t tokens) {
79+
Poll<absl::StatusOr<bool>> PollEnqueue(T& data, const uint32_t tokens) {
8480
GRPC_STREAM_DATA_QUEUE_DEBUG << "Enqueueing data. Data tokens: " << tokens;
8581
const uint32_t max_tokens_consumed_threshold =
8682
max_tokens_ >= tokens ? max_tokens_ - tokens : 0;
@@ -91,8 +87,7 @@ class SimpleQueue {
9187
GRPC_STREAM_DATA_QUEUE_DEBUG
9288
<< "Enqueue successful. Data tokens: " << tokens
9389
<< " Current tokens consumed: " << tokens_consumed_;
94-
return EnqueueResult{absl::OkStatus(),
95-
/*became_non_empty=*/queue_.size() == 1};
90+
return /*became_non_empty*/ (queue_.size() == 1);
9691
}
9792

9893
GRPC_STREAM_DATA_QUEUE_DEBUG
@@ -197,33 +192,40 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
197192
// end_stream set. If the stream needs to be half closed, the client should
198193
// enqueue a half close message.
199194

195+
struct EnqueueResult {
196+
bool became_writable;
197+
WritableStreams::StreamPriority priority;
198+
};
199+
200200
// Enqueue Initial Metadata.
201201
// 1. MUST be called at most once.
202202
// 2. This MUST be called before any messages are enqueued.
203203
// 3. MUST not be called after trailing metadata is enqueued.
204204
// 4. This function is thread safe.
205-
auto EnqueueInitialMetadata(MetadataHandle metadata) {
205+
auto EnqueueInitialMetadata(MetadataHandle&& metadata) {
206206
DCHECK(!is_initial_metadata_queued_);
207207
DCHECK(!is_trailing_metadata_or_half_close_queued_);
208208
DCHECK(metadata != nullptr);
209209
DCHECK(!is_reset_stream_queued_);
210210

211211
is_initial_metadata_queued_ = true;
212212
return [self = this->Ref(),
213-
entry = QueueEntry{InitialMetadataType{
214-
std::move(metadata)}}]() mutable -> Poll<absl::StatusOr<bool>> {
213+
entry = QueueEntry{InitialMetadataType{std::move(
214+
metadata)}}]() mutable -> Poll<absl::StatusOr<EnqueueResult>> {
215215
MutexLock lock(&self->mu_);
216-
auto result = self->queue_.Enqueue(entry, /*tokens=*/0);
216+
Poll<absl::StatusOr<bool>> result =
217+
self->queue_.Enqueue(entry, /*tokens=*/0);
217218
if (result.ready()) {
218219
GRPC_STREAM_DATA_QUEUE_DEBUG
219220
<< "Enqueued initial metadata for stream " << self->stream_id_
220-
<< " with status: " << result.value().status;
221-
if (result.value().status.ok()) {
222-
DCHECK(result.value().became_non_empty);
221+
<< " with status: " << result.value().status();
222+
if (result.value().ok()) {
223+
DCHECK(/*became_non_empty*/ result.value().value());
223224
return self->UpdateWritableStateLocked(
224-
result.value().became_non_empty);
225+
/*became_non_empty*/ result.value().value(),
226+
WritableStreams::StreamPriority::kDefault);
225227
}
226-
return result.value().status;
228+
return result.value().status();
227229
}
228230
return Pending{};
229231
};
@@ -233,27 +235,29 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
233235
// 1. MUST be called at most once.
234236
// 2. MUST be called only for a server.
235237
// 3. This function is thread safe.
236-
auto EnqueueTrailingMetadata(MetadataHandle metadata) {
238+
auto EnqueueTrailingMetadata(MetadataHandle&& metadata) {
237239
DCHECK(metadata != nullptr);
238240
DCHECK(!is_reset_stream_queued_);
239241
DCHECK(!is_client_);
240242
DCHECK(!is_trailing_metadata_or_half_close_queued_);
241243

242244
is_trailing_metadata_or_half_close_queued_ = true;
243245
return [self = this->Ref(),
244-
entry = QueueEntry{TrailingMetadataType{
245-
std::move(metadata)}}]() mutable -> Poll<absl::StatusOr<bool>> {
246+
entry = QueueEntry{TrailingMetadataType{std::move(
247+
metadata)}}]() mutable -> Poll<absl::StatusOr<EnqueueResult>> {
246248
MutexLock lock(&self->mu_);
247-
auto result = self->queue_.Enqueue(entry, /*tokens=*/0);
249+
Poll<absl::StatusOr<bool>> result =
250+
self->queue_.Enqueue(entry, /*tokens=*/0);
248251
if (result.ready()) {
249252
GRPC_STREAM_DATA_QUEUE_DEBUG
250253
<< "Enqueued trailing metadata for stream " << self->stream_id_
251-
<< " with status: " << result.value().status;
252-
if (result.value().status.ok()) {
254+
<< " with status: " << result.value().status();
255+
if (result.value().ok()) {
253256
return self->UpdateWritableStateLocked(
254-
result.value().became_non_empty);
257+
/*became_non_empty*/ result.value().value(),
258+
WritableStreams::StreamPriority::kStreamClosed);
255259
}
256-
return result.value().status;
260+
return result.value().status();
257261
}
258262
return Pending{};
259263
};
@@ -264,7 +268,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
264268
// 1. MUST be called after initial metadata is enqueued.
265269
// 2. MUST not be called after trailing metadata is enqueued.
266270
// 3. This function is thread safe.
267-
auto EnqueueMessage(MessageHandle message) {
271+
auto EnqueueMessage(MessageHandle&& message) {
268272
DCHECK(is_initial_metadata_queued_);
269273
DCHECK(message != nullptr);
270274
DCHECK(!is_reset_stream_queued_);
@@ -275,19 +279,20 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
275279
const uint32_t tokens =
276280
message->payload()->Length() + kGrpcHeaderSizeInBytes;
277281
return [self = this->Ref(), entry = QueueEntry{std::move(message)},
278-
tokens]() mutable -> Poll<absl::StatusOr<bool>> {
282+
tokens]() mutable -> Poll<absl::StatusOr<EnqueueResult>> {
279283
MutexLock lock(&self->mu_);
280-
auto result = self->queue_.Enqueue(entry, tokens);
284+
Poll<absl::StatusOr<bool>> result = self->queue_.Enqueue(entry, tokens);
281285
if (result.ready()) {
282286
GRPC_STREAM_DATA_QUEUE_DEBUG
283287
<< "Enqueued message for stream " << self->stream_id_
284-
<< " with status: " << result.value().status;
288+
<< " with status: " << result.value().status();
285289
// TODO(akshitpatel) : [PH2][P2] : Add check for flow control tokens.
286-
if (result.value().status.ok()) {
290+
if (result.value().ok()) {
287291
return self->UpdateWritableStateLocked(
288-
result.value().became_non_empty);
292+
/*became_non_empty*/ result.value().value(),
293+
WritableStreams::StreamPriority::kDefault);
289294
}
290-
return result.value().status;
295+
return result.value().status();
291296
}
292297
return Pending{};
293298
};
@@ -305,18 +310,20 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
305310

306311
is_trailing_metadata_or_half_close_queued_ = true;
307312
return [self = this->Ref(), entry = QueueEntry{HalfClosed{}}]() mutable
308-
-> Poll<absl::StatusOr<bool>> {
313+
-> Poll<absl::StatusOr<EnqueueResult>> {
309314
MutexLock lock(&self->mu_);
310-
auto result = self->queue_.Enqueue(entry, /*tokens=*/0);
315+
Poll<absl::StatusOr<bool>> result =
316+
self->queue_.Enqueue(entry, /*tokens=*/0);
311317
if (result.ready()) {
312318
GRPC_STREAM_DATA_QUEUE_DEBUG
313319
<< "Marking stream " << self->stream_id_ << " as half closed"
314-
<< " with status: " << result.value().status;
315-
if (result.value().status.ok()) {
320+
<< " with status: " << result.value().status();
321+
if (result.value().ok()) {
316322
return self->UpdateWritableStateLocked(
317-
result.value().became_non_empty);
323+
/*became_non_empty*/ result.value().value(),
324+
WritableStreams::StreamPriority::kStreamClosed);
318325
}
319-
return result.value().status;
326+
return result.value().status();
320327
}
321328
return Pending{};
322329
};
@@ -332,18 +339,20 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
332339
is_reset_stream_queued_ = true;
333340
return [self = this->Ref(),
334341
entry = QueueEntry{ResetStream{
335-
error_code}}]() mutable -> Poll<absl::StatusOr<bool>> {
342+
error_code}}]() mutable -> Poll<absl::StatusOr<EnqueueResult>> {
336343
MutexLock lock(&self->mu_);
337-
auto result = self->queue_.Enqueue(entry, /*tokens=*/0);
344+
Poll<absl::StatusOr<bool>> result =
345+
self->queue_.Enqueue(entry, /*tokens=*/0);
338346
if (result.ready()) {
339347
GRPC_STREAM_DATA_QUEUE_DEBUG
340348
<< "Enqueueing reset stream for stream " << self->stream_id_
341-
<< " with status: " << result.value().status;
342-
if (result.value().status.ok()) {
349+
<< " with status: " << result.value().status();
350+
if (result.value().ok()) {
343351
return self->UpdateWritableStateLocked(
344-
result.value().became_non_empty);
352+
/*became_non_empty*/ result.value().value(),
353+
WritableStreams::StreamPriority::kStreamClosed);
345354
}
346-
return result.value().status;
355+
return result.value().status();
347356
}
348357
return Pending{};
349358
};
@@ -357,6 +366,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
357366
struct DequeueResult {
358367
std::vector<Http2Frame> frames;
359368
bool is_writable;
369+
WritableStreams::StreamPriority priority;
360370
};
361371

362372
// TODO(akshitpatel) : [PH2][P4] : Measure the performance of this function
@@ -411,7 +421,7 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
411421
GRPC_STREAM_DATA_QUEUE_DEBUG << "Stream id: " << stream_id_
412422
<< " writable state changed to "
413423
<< is_writable_;
414-
return DequeueResult{handle_dequeue.GetFrames(), is_writable_};
424+
return DequeueResult{handle_dequeue.GetFrames(), is_writable_, priority_};
415425
}
416426

417427
// Returns true if the queue is empty. This function is thread safe.
@@ -572,17 +582,29 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
572582
HPackCompressor& encoder_;
573583
};
574584

575-
// Returns true if the queue is now writable. It is expected that the caller
576-
// will hold the lock on the queue when calling this function.
577-
bool UpdateWritableStateLocked(const bool became_non_empty)
585+
// Updates the stream priority. Also sets the writable state to true if the
586+
// stream has become writable. Returns if the stream became writable and
587+
// updated priority. It is expected that the caller will hold the lock on the
588+
// queue when calling this function.
589+
EnqueueResult UpdateWritableStateLocked(
590+
const bool became_non_empty,
591+
const WritableStreams::StreamPriority priority)
578592
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
593+
priority_ = priority;
579594
if (!is_writable_ && became_non_empty) {
580-
GRPC_STREAM_DATA_QUEUE_DEBUG << "Stream id: " << stream_id_
581-
<< " writeable state changed to true";
582595
is_writable_ = true;
583-
return true;
596+
GRPC_STREAM_DATA_QUEUE_DEBUG
597+
<< "UpdateWritableStateLocked for stream id: " << stream_id_
598+
<< " became writable with priority: "
599+
<< WritableStreams::GetPriorityString(priority_);
600+
return EnqueueResult{/*became_writable=*/true, priority_};
584601
}
585-
return false;
602+
603+
GRPC_STREAM_DATA_QUEUE_DEBUG
604+
<< "UpdateWritableStateLocked for stream id: " << stream_id_
605+
<< " with priority: " << WritableStreams::GetPriorityString(priority_)
606+
<< " is_writable: " << is_writable_;
607+
return EnqueueResult{/*became_writable=*/false, priority_};
586608
}
587609

588610
const uint32_t stream_id_;
@@ -597,6 +619,8 @@ class StreamDataQueue : public RefCounted<StreamDataQueue<MetadataHandle>> {
597619
Mutex mu_;
598620
bool is_writable_ ABSL_GUARDED_BY(mu_) = false;
599621
SimpleQueue<QueueEntry> queue_;
622+
WritableStreams::StreamPriority priority_ ABSL_GUARDED_BY(mu_) =
623+
WritableStreams::StreamPriority::kDefault;
600624

601625
// Accessed only during dequeue.
602626
HeaderDisassembler initial_metadata_disassembler_;

test/core/transport/chttp2/stream_data_queue_fuzz_test.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ class SimpleQueueFuzzTest : public YodelTest {
4848
auto EnqueueAndCheckSuccess(SimpleQueue<int>& queue, int data, int tokens) {
4949
return Map([&queue, data,
5050
tokens]() mutable { return queue.Enqueue(data, tokens); },
51-
[](auto result) { EXPECT_EQ(result.status, absl::OkStatus()); });
51+
[](absl::StatusOr<bool> result) {
52+
EXPECT_EQ(result.status(), absl::OkStatus());
53+
});
5254
}
5355

5456
bool DequeueAndCheck(SimpleQueue<int>& queue, int data,

0 commit comments

Comments
 (0)