Skip to content

Commit b0f6f49

Browse files
Fix: Propagate Otel Context to Subscriber Callback if Provided (#1429)
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 2b7e423 commit b0f6f49

File tree

3 files changed

+15
-5
lines changed

3 files changed

+15
-5
lines changed

google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def end_subscribe_scheduler_span(self) -> None:
160160
assert self._scheduler_span is not None
161161
self._scheduler_span.end()
162162

163-
def start_process_span(self) -> None:
163+
def start_process_span(self) -> trace.Span:
164164
assert self._subscribe_span is not None
165165
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
166166
publish_create_span_link: Optional[trace.Link] = None
@@ -186,6 +186,7 @@ def start_process_span(self) -> None:
186186
end_on_exit=False,
187187
) as process_span:
188188
self._process_span = process_span
189+
return process_span
189190

190191
def end_process_span(self) -> None:
191192
assert self._process_span is not None
@@ -200,6 +201,13 @@ def add_process_span_event(self, event: str) -> None:
200201
},
201202
)
202203

204+
def __enter__(self) -> trace.Span:
205+
return self.start_process_span()
206+
207+
def __exit__(self, exc_type, exc_val, traceback):
208+
if self._process_span:
209+
self.end_process_span()
210+
203211

204212
def start_modack_span(
205213
subscribe_span_links: List[trace.Link],

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,10 @@ def _wrap_callback_errors(
148148
try:
149149
if message.opentelemetry_data:
150150
message.opentelemetry_data.end_subscribe_concurrency_control_span()
151-
message.opentelemetry_data.start_process_span()
152-
callback(message)
151+
with message.opentelemetry_data:
152+
callback(message)
153+
else:
154+
callback(message)
153155
except BaseException as exc:
154156
# Note: the likelihood of this failing is extremely low. This just adds
155157
# a message to a queue, so if this doesn't work the world is in an

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2956,10 +2956,10 @@ def test_opentelemetry_subscriber_concurrency_control_span_end(span_exporter):
29562956
streaming_pull_manager._wrap_callback_errors(mock.Mock(), mock.Mock(), msg)
29572957

29582958
spans = span_exporter.get_finished_spans()
2959-
assert len(spans) == 1
2959+
assert len(spans) == 2
29602960

29612961
concurrency_control_span = spans[0]
2962-
concurrency_control_span.name == "subscriber concurrency control"
2962+
assert concurrency_control_span.name == "subscriber concurrency control"
29632963

29642964

29652965
def test_opentelemetry_wrap_callback_error(span_exporter):

0 commit comments

Comments
 (0)