Skip to content

Commit 17b1db5

Browse files
authored
Guarantee reset after complete for async streams does not crash. (envoyproxy#32408)
--------- Signed-off-by: Paul Ogilby <[email protected]>
1 parent 11fe00a commit 17b1db5

File tree

3 files changed

+95
-0
lines changed

3 files changed

+95
-0
lines changed

source/common/http/async_client_impl.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ void AsyncOngoingRequestImpl::initialize() {
314314
}
315315

316316
void AsyncRequestSharedImpl::onComplete() {
317+
complete_ = true;
317318
callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_, &response_->headers());
318319

319320
Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
@@ -335,6 +336,11 @@ void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) {
335336
}
336337

337338
void AsyncRequestSharedImpl::onReset() {
339+
if (complete_) {
340+
// This request has already been completed; a reset should be ignored.
341+
return;
342+
}
343+
338344
if (!cancelled_) {
339345
// Set "error reason" tag related to reset. The tagging for "error true" is done inside the
340346
// Tracing::HttpTracerUtility::finalizeUpstreamSpan.

source/common/http/async_client_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ class AsyncStreamImpl : public virtual AsyncClient::Stream,
151151
absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
152152
// Callback to listen for low/high/overflow watermark events.
153153
absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
154+
bool complete_{};
154155

155156
private:
156157
void cleanup();

test/common/http/async_client_impl_test.cc

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,50 @@ TEST_F(AsyncClientImplTest, OngoingRequestWithWatermarkingAndReset) {
369369
stream_encoder_.getStream().resetStream(StreamResetReason::RemoteReset);
370370
}
371371

372+
TEST_F(AsyncClientImplTest, OngoingRequestWithResetAfterCompletion) {
373+
auto headers = std::make_unique<TestRequestHeaderMapImpl>();
374+
HttpTestUtility::addDefaultHeaders(*headers);
375+
TestRequestHeaderMapImpl headers_copy = *headers;
376+
377+
Buffer::OwnedImpl data("test data");
378+
const Buffer::OwnedImpl data_copy(data.toString());
379+
380+
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
381+
.WillOnce(Invoke(
382+
[&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
383+
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
384+
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
385+
stream_info_, {});
386+
response_decoder_ = &decoder;
387+
return nullptr;
388+
}));
389+
390+
headers_copy.addCopy("x-envoy-internal", "true");
391+
headers_copy.addCopy("x-forwarded-for", "127.0.0.1");
392+
393+
EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&headers_copy), false));
394+
EXPECT_CALL(stream_encoder_, encodeData(BufferEqual(&data_copy), true));
395+
396+
AsyncClient::OngoingRequest* request =
397+
client_.startRequest(std::move(headers), callbacks_, AsyncClient::RequestOptions());
398+
EXPECT_NE(request, nullptr);
399+
400+
request->sendData(data, true);
401+
402+
expectSuccess(request, 200);
403+
404+
ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
405+
response_decoder_->decodeHeaders(std::move(response_headers), true);
406+
407+
request->reset();
408+
EXPECT_EQ(
409+
1UL,
410+
cm_.thread_local_cluster_.cluster_.info_->stats_store_.counter("upstream_rq_200").value());
411+
EXPECT_EQ(1UL, cm_.thread_local_cluster_.cluster_.info_->stats_store_
412+
.counter("internal.upstream_rq_200")
413+
.value());
414+
}
415+
372416
TEST_F(AsyncClientImplTracingTest, Basic) {
373417
Tracing::MockSpan* child_span{new Tracing::MockSpan()};
374418
message_->body().add("test body");
@@ -1444,6 +1488,50 @@ TEST_F(AsyncClientImplTracingTest, CancelRequest) {
14441488
request->cancel();
14451489
}
14461490

1491+
TEST_F(AsyncClientImplTracingTest, CancelRequestAfterComplete) {
1492+
Tracing::MockSpan* child_span{new Tracing::MockSpan()};
1493+
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
1494+
.WillOnce(Invoke(
1495+
[&](StreamDecoder&, ConnectionPool::Callbacks& callbacks,
1496+
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
1497+
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
1498+
stream_info_, {});
1499+
return nullptr;
1500+
}));
1501+
1502+
EXPECT_CALL(parent_span_, spawnChild_(_, "async fake_cluster egress", _))
1503+
.WillOnce(Return(child_span));
1504+
1505+
AsyncClient::RequestOptions options = AsyncClient::RequestOptions().setParentSpan(parent_span_);
1506+
EXPECT_CALL(*child_span, setSampled(true));
1507+
EXPECT_CALL(*child_span, injectContext(_, _));
1508+
EXPECT_CALL(callbacks_, onBeforeFinalizeUpstreamSpan(_, _))
1509+
.WillOnce(Invoke([](Tracing::Span& span, const Http::ResponseHeaderMap* response_headers) {
1510+
span.setTag("onBeforeFinalizeUpstreamSpan", "called");
1511+
// Since this is a failure, we expect no response headers.
1512+
ASSERT_EQ(nullptr, response_headers);
1513+
}));
1514+
AsyncClient::Request* request = client_.send(std::move(message_), callbacks_, options);
1515+
1516+
EXPECT_CALL(*child_span, setTag(Eq("onBeforeFinalizeUpstreamSpan"), Eq("called")));
1517+
EXPECT_CALL(*child_span,
1518+
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
1519+
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpProtocol), Eq("HTTP/1.1")));
1520+
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamAddress), Eq("10.0.0.1:443")));
1521+
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().PeerAddress), Eq("10.0.0.1:443")));
1522+
1523+
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("fake_cluster")));
1524+
EXPECT_CALL(*child_span,
1525+
setTag(Eq(Tracing::Tags::get().UpstreamClusterName), Eq("observability_name")));
1526+
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpStatusCode), Eq("0")));
1527+
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().ResponseFlags), Eq("-")));
1528+
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().Error), Eq(Tracing::Tags::get().True)));
1529+
EXPECT_CALL(*child_span,
1530+
setTag(Eq(Tracing::Tags::get().Canceled), Eq(Tracing::Tags::get().True)));
1531+
EXPECT_CALL(*child_span, finishSpan());
1532+
request->cancel();
1533+
}
1534+
14471535
TEST_F(AsyncClientImplTest, DestroyWithActiveStream) {
14481536
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
14491537
.WillOnce(Invoke(

0 commit comments

Comments
 (0)