diff --git a/pyproject.toml b/pyproject.toml index aca86056..f55134cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,8 @@ dependencies = [ "protobuf>=5.28.3", "pydantic-core>=2.20.1", "websockets>=12.0", + "opentelemetry-sdk>=1.28.2", + "opentelemetry-api>=1.28.2", ] [tool.uv] diff --git a/replit_river/client.py b/replit_river/client.py index fab2dc62..8111c636 100644 --- a/replit_river/client.py +++ b/replit_river/client.py @@ -1,8 +1,12 @@ import logging from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Callable -from typing import Any, Generic, Optional, Union +from contextlib import contextmanager +from typing import Any, Generator, Generic, Literal, Optional, Union + +from opentelemetry import trace from replit_river.client_transport import ClientTransport +from replit_river.error_schema import RiverException from replit_river.transport_options import ( HandshakeMetadataType, TransportOptions, @@ -17,6 +21,7 @@ ) logger = logging.getLogger(__name__) +tracer = trace.get_tracer(__name__) class Client(Generic[HandshakeMetadataType]): @@ -55,15 +60,16 @@ async def send_rpc( response_deserializer: Callable[[Any], ResponseType], error_deserializer: Callable[[Any], ErrorType], ) -> ResponseType: - session = await self._transport.get_or_create_session() - return await session.send_rpc( - service_name, - procedure_name, - request, - request_serializer, - response_deserializer, - error_deserializer, - ) + with _trace_procedure("rpc", service_name, procedure_name): + session = await self._transport.get_or_create_session() + return await session.send_rpc( + service_name, + procedure_name, + request, + request_serializer, + response_deserializer, + error_deserializer, + ) async def send_upload( self, @@ -76,17 +82,18 @@ async def send_upload( response_deserializer: Callable[[Any], ResponseType], error_deserializer: Callable[[Any], ErrorType], ) -> ResponseType: - session = await self._transport.get_or_create_session() - return await session.send_upload( - service_name, - procedure_name, - init, - request, - init_serializer, - request_serializer, - response_deserializer, - error_deserializer, - ) + with _trace_procedure("upload", service_name, procedure_name): + session = await self._transport.get_or_create_session() + return await session.send_upload( + service_name, + procedure_name, + init, + request, + init_serializer, + request_serializer, + response_deserializer, + error_deserializer, + ) async def send_subscription( self, @@ -97,15 +104,16 @@ async def send_subscription( response_deserializer: Callable[[Any], ResponseType], error_deserializer: Callable[[Any], ErrorType], ) -> AsyncIterator[Union[ResponseType, ErrorType]]: - session = await self._transport.get_or_create_session() - return session.send_subscription( - service_name, - procedure_name, - request, - request_serializer, - response_deserializer, - error_deserializer, - ) + with _trace_procedure("subscription", service_name, procedure_name): + session = await self._transport.get_or_create_session() + return session.send_subscription( + service_name, + procedure_name, + request, + request_serializer, + response_deserializer, + error_deserializer, + ) async def send_stream( self, @@ -118,14 +126,33 @@ async def send_stream( response_deserializer: Callable[[Any], ResponseType], error_deserializer: Callable[[Any], ErrorType], ) -> AsyncIterator[Union[ResponseType, ErrorType]]: - session = await self._transport.get_or_create_session() - return session.send_stream( - service_name, - procedure_name, - init, - request, - init_serializer, - request_serializer, - response_deserializer, - error_deserializer, - ) + with _trace_procedure("stream", service_name, procedure_name): + session = await self._transport.get_or_create_session() + return session.send_stream( + service_name, + procedure_name, + init, + request, + init_serializer, + request_serializer, + response_deserializer, + error_deserializer, + ) + + +@contextmanager +def _trace_procedure( + procedure_type: Literal["rpc", "upload", "subscription", "stream"], + service_name: str, + procedure_name: str, +) -> Generator[None, None, None]: + with tracer.start_as_current_span( + f"river.client.{procedure_type}.{service_name}.{procedure_name}", + kind=trace.SpanKind.CLIENT, + ) as span: + try: + yield + except RiverException as e: + span.set_attribute("river.error_code", e.code) + span.set_attribute("river.error_message", e.message) + raise e diff --git a/replit_river/rpc.py b/replit_river/rpc.py index 2362b82a..0845ea46 100644 --- a/replit_river/rpc.py +++ b/replit_river/rpc.py @@ -22,6 +22,7 @@ import grpc from aiochannel import Channel, ChannelClosed +from opentelemetry.propagators.textmap import Setter from pydantic import BaseModel, ConfigDict, Field from replit_river.error_schema import ( @@ -86,6 +87,11 @@ class ControlMessageHandshakeResponse(BaseModel): status: HandShakeStatus +class PropagationContext(BaseModel): + traceparent: Optional[str] = None + tracestate: Optional[str] = None + + class TransportMessage(BaseModel): id: str # from_ is used instead of from because from is a reserved keyword in Python @@ -97,12 +103,30 @@ class TransportMessage(BaseModel): procedureName: Optional[str] = None streamId: str controlFlags: int + tracing: Optional[PropagationContext] = None payload: Any model_config = ConfigDict(populate_by_name=True) # need this because we create TransportMessage objects with destructuring # where the key is "from" +class TransportMessageTracingSetter(Setter[TransportMessage]): + """ + Handles propagating tracing context to the recipient of the message. + """ + + def set(self, carrier: TransportMessage, key: str, value: str) -> None: + if not carrier.tracing: + carrier.tracing = PropagationContext() + match key: + case "traceparent": + carrier.tracing.traceparent = value + case "tracestate": + carrier.tracing.tracestate = value + case _: + logger.warning("unknown trace propagation key", extra={"key": key}) + + class GrpcContext(grpc.aio.ServicerContext): """Represents a gRPC-compatible ServicerContext for River interop.""" diff --git a/replit_river/session.py b/replit_river/session.py index 38d09723..bd7fa5d7 100644 --- a/replit_river/session.py +++ b/replit_river/session.py @@ -6,6 +6,7 @@ import nanoid # type: ignore import websockets from aiochannel import Channel, ChannelClosed +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator from websockets.exceptions import ConnectionClosed from replit_river.message_buffer import MessageBuffer, MessageBufferClosedError @@ -31,6 +32,7 @@ STREAM_OPEN_BIT, GenericRpcHandler, TransportMessage, + TransportMessageTracingSetter, ) logger = logging.getLogger(__name__) @@ -380,6 +382,9 @@ async def send_message( serviceName=service_name, procedureName=procedure_name, ) + TraceContextTextMapPropagator().inject( + msg, None, TransportMessageTracingSetter() + ) try: # We need this lock to ensure the buffer order and message sending order # are the same. diff --git a/scripts/lint.sh b/scripts/lint.sh index 960fdd7c..3116f095 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/usr/bin/env bash set -ex diff --git a/uv.lock b/uv.lock index b495b7fa..3c6980c6 100644 --- a/uv.lock +++ b/uv.lock @@ -125,6 +125,18 @@ toml = [ { name = "tomli", marker = "python_full_version <= '3.11'" }, ] +[[package]] +name = "deprecated" +version = "1.2.15" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "wrapt" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2e/a3/53e7d78a6850ffdd394d7048a31a6f14e44900adedf190f9a165f6b69439/deprecated-1.2.15.tar.gz", hash = "sha256:683e561a90de76239796e6b6feac66b99030d2dd3fcf61ef996330f14bbb9b0d", size = 2977612 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1d/8f/c7f227eb42cfeaddce3eb0c96c60cbca37797fa7b34f8e1aeadf6c5c0983/Deprecated-1.2.15-py2.py3-none-any.whl", hash = "sha256:353bc4a8ac4bfc96800ddab349d89c25dec1079f65fd53acdcc1e0b975b21320", size = 9941 }, +] + [[package]] name = "deptry" version = "0.20.0" @@ -218,6 +230,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/96/d0/f0855a0ccb26ffeb41e6db68b5cbb25d7e9ba1f8f19151eef36210e64efc/grpcio_tools-1.67.1-cp313-cp313-win_amd64.whl", hash = "sha256:6961da86e9856b4ddee0bf51ef6636b4bf9c29c0715aa71f3c8f027c45d42654", size = 1089819 }, ] +[[package]] +name = "importlib-metadata" +version = "8.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "zipp" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cd/12/33e59336dca5be0c398a7482335911a33aa0e20776128f038019f1a95f1b/importlib_metadata-8.5.0.tar.gz", hash = "sha256:71522656f0abace1d072b9e5481a48f07c138e00f079c38c8f883823f9c26bd7", size = 55304 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a0/d9/a1e041c5e7caa9a05c925f4bdbdfb7f006d1f74996af53467bc394c97be7/importlib_metadata-8.5.0-py3-none-any.whl", hash = "sha256:45e54197d28b7a7f1559e60b95e7c567032b602131fbd588f1497f47880aa68b", size = 26514 }, +] + [[package]] name = "iniconfig" version = "2.0.0" @@ -340,6 +364,46 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2e/0d/8630f13998638dc01e187fadd2e5c6d42d127d08aeb4943d231664d6e539/nanoid-2.0.0-py3-none-any.whl", hash = "sha256:90aefa650e328cffb0893bbd4c236cfd44c48bc1f2d0b525ecc53c3187b653bb", size = 5844 }, ] +[[package]] +name = "opentelemetry-api" +version = "1.28.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "deprecated" }, + { name = "importlib-metadata" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/51/34/e4e9245c868c6490a46ffedf6bd5b0f512bbc0a848b19e3a51f6bbad648c/opentelemetry_api-1.28.2.tar.gz", hash = "sha256:ecdc70c7139f17f9b0cf3742d57d7020e3e8315d6cffcdf1a12a905d45b19cc0", size = 62796 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4d/58/b17393cdfc149e14ee84c662abf921993dcce8058628359ef1f49e2abb97/opentelemetry_api-1.28.2-py3-none-any.whl", hash = "sha256:6fcec89e265beb258fe6b1acaaa3c8c705a934bd977b9f534a2b7c0d2d4275a6", size = 64302 }, +] + +[[package]] +name = "opentelemetry-sdk" +version = "1.28.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "opentelemetry-api" }, + { name = "opentelemetry-semantic-conventions" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/4b/f4/840a5af4efe48d7fb4c456ad60fd624673e871a60d6494f7ff8a934755d4/opentelemetry_sdk-1.28.2.tar.gz", hash = "sha256:5fed24c5497e10df30282456fe2910f83377797511de07d14cec0d3e0a1a3110", size = 157272 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/da/8b/4f2b418496c08016d4384f9b1c4725a8af7faafa248d624be4bb95993ce1/opentelemetry_sdk-1.28.2-py3-none-any.whl", hash = "sha256:93336c129556f1e3ccd21442b94d3521759541521861b2214c499571b85cb71b", size = 118757 }, +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.49b2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "deprecated" }, + { name = "opentelemetry-api" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7d/0a/e3b93f94aa3223c6fd8e743502a1fefd4fb3a753d8f501ce2a418f7c0bd4/opentelemetry_semantic_conventions-0.49b2.tar.gz", hash = "sha256:44e32ce6a5bb8d7c0c617f84b9dc1c8deda1045a07dc16a688cc7cbeab679997", size = 95213 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b1/be/6661c8f76708bb3ba38c90be8fa8d7ffe17ccbc5cbbc229334f5535f6448/opentelemetry_semantic_conventions-0.49b2-py3-none-any.whl", hash = "sha256:51e7e1d0daa958782b6c2a8ed05e5f0e7dd0716fc327ac058777b8659649ee54", size = 159199 }, +] + [[package]] name = "packaging" version = "24.1" @@ -515,6 +579,8 @@ dependencies = [ { name = "msgpack" }, { name = "msgpack-types" }, { name = "nanoid" }, + { name = "opentelemetry-api" }, + { name = "opentelemetry-sdk" }, { name = "protobuf" }, { name = "pydantic" }, { name = "pydantic-core" }, @@ -543,6 +609,8 @@ requires-dist = [ { name = "msgpack", specifier = ">=1.0.7" }, { name = "msgpack-types", specifier = ">=0.3.0" }, { name = "nanoid", specifier = ">=2.0.0" }, + { name = "opentelemetry-api", specifier = ">=1.28.2" }, + { name = "opentelemetry-sdk", specifier = ">=1.28.2" }, { name = "protobuf", specifier = ">=5.28.3" }, { name = "pydantic", specifier = "==2.9.2" }, { name = "pydantic-core", specifier = ">=2.20.1" }, @@ -664,3 +732,41 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/41/d8/63d6194aae711d7263df4498200c690a9c39fb437ede10f3e157a6343e0d/websockets-13.1-cp313-cp313-win_amd64.whl", hash = "sha256:c518e84bb59c2baae725accd355c8dc517b4a3ed8db88b4bc93c78dae2974bf2", size = 159144 }, { url = "https://files.pythonhosted.org/packages/56/27/96a5cd2626d11c8280656c6c71d8ab50fe006490ef9971ccd154e0c42cd2/websockets-13.1-py3-none-any.whl", hash = "sha256:a9a396a6ad26130cdae92ae10c36af09d9bfe6cafe69670fd3b6da9b07b4044f", size = 152134 }, ] + +[[package]] +name = "wrapt" +version = "1.16.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/95/4c/063a912e20bcef7124e0df97282a8af3ff3e4b603ce84c481d6d7346be0a/wrapt-1.16.0.tar.gz", hash = "sha256:5f370f952971e7d17c7d1ead40e49f32345a7f7a5373571ef44d800d06b1899d", size = 53972 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fd/03/c188ac517f402775b90d6f312955a5e53b866c964b32119f2ed76315697e/wrapt-1.16.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:1a5db485fe2de4403f13fafdc231b0dbae5eca4359232d2efc79025527375b09", size = 37313 }, + { url = "https://files.pythonhosted.org/packages/0f/16/ea627d7817394db04518f62934a5de59874b587b792300991b3c347ff5e0/wrapt-1.16.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:75ea7d0ee2a15733684badb16de6794894ed9c55aa5e9903260922f0482e687d", size = 38164 }, + { url = "https://files.pythonhosted.org/packages/7f/a7/f1212ba098f3de0fd244e2de0f8791ad2539c03bef6c05a9fcb03e45b089/wrapt-1.16.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a452f9ca3e3267cd4d0fcf2edd0d035b1934ac2bd7e0e57ac91ad6b95c0c6389", size = 80890 }, + { url = "https://files.pythonhosted.org/packages/b7/96/bb5e08b3d6db003c9ab219c487714c13a237ee7dcc572a555eaf1ce7dc82/wrapt-1.16.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:43aa59eadec7890d9958748db829df269f0368521ba6dc68cc172d5d03ed8060", size = 73118 }, + { url = "https://files.pythonhosted.org/packages/6e/52/2da48b35193e39ac53cfb141467d9f259851522d0e8c87153f0ba4205fb1/wrapt-1.16.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:72554a23c78a8e7aa02abbd699d129eead8b147a23c56e08d08dfc29cfdddca1", size = 80746 }, + { url = "https://files.pythonhosted.org/packages/11/fb/18ec40265ab81c0e82a934de04596b6ce972c27ba2592c8b53d5585e6bcd/wrapt-1.16.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:d2efee35b4b0a347e0d99d28e884dfd82797852d62fcd7ebdeee26f3ceb72cf3", size = 85668 }, + { url = "https://files.pythonhosted.org/packages/0f/ef/0ecb1fa23145560431b970418dce575cfaec555ab08617d82eb92afc7ccf/wrapt-1.16.0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:6dcfcffe73710be01d90cae08c3e548d90932d37b39ef83969ae135d36ef3956", size = 78556 }, + { url = "https://files.pythonhosted.org/packages/25/62/cd284b2b747f175b5a96cbd8092b32e7369edab0644c45784871528eb852/wrapt-1.16.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:eb6e651000a19c96f452c85132811d25e9264d836951022d6e81df2fff38337d", size = 85712 }, + { url = "https://files.pythonhosted.org/packages/e5/a7/47b7ff74fbadf81b696872d5ba504966591a3468f1bc86bca2f407baef68/wrapt-1.16.0-cp311-cp311-win32.whl", hash = "sha256:66027d667efe95cc4fa945af59f92c5a02c6f5bb6012bff9e60542c74c75c362", size = 35327 }, + { url = "https://files.pythonhosted.org/packages/cf/c3/0084351951d9579ae83a3d9e38c140371e4c6b038136909235079f2e6e78/wrapt-1.16.0-cp311-cp311-win_amd64.whl", hash = "sha256:aefbc4cb0a54f91af643660a0a150ce2c090d3652cf4052a5397fb2de549cd89", size = 37523 }, + { url = "https://files.pythonhosted.org/packages/92/17/224132494c1e23521868cdd57cd1e903f3b6a7ba6996b7b8f077ff8ac7fe/wrapt-1.16.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:5eb404d89131ec9b4f748fa5cfb5346802e5ee8836f57d516576e61f304f3b7b", size = 37614 }, + { url = "https://files.pythonhosted.org/packages/6a/d7/cfcd73e8f4858079ac59d9db1ec5a1349bc486ae8e9ba55698cc1f4a1dff/wrapt-1.16.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:9090c9e676d5236a6948330e83cb89969f433b1943a558968f659ead07cb3b36", size = 38316 }, + { url = "https://files.pythonhosted.org/packages/7e/79/5ff0a5c54bda5aec75b36453d06be4f83d5cd4932cc84b7cb2b52cee23e2/wrapt-1.16.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:94265b00870aa407bd0cbcfd536f17ecde43b94fb8d228560a1e9d3041462d73", size = 86322 }, + { url = "https://files.pythonhosted.org/packages/c4/81/e799bf5d419f422d8712108837c1d9bf6ebe3cb2a81ad94413449543a923/wrapt-1.16.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f2058f813d4f2b5e3a9eb2eb3faf8f1d99b81c3e51aeda4b168406443e8ba809", size = 79055 }, + { url = "https://files.pythonhosted.org/packages/62/62/30ca2405de6a20448ee557ab2cd61ab9c5900be7cbd18a2639db595f0b98/wrapt-1.16.0-cp312-cp312-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:98b5e1f498a8ca1858a1cdbffb023bfd954da4e3fa2c0cb5853d40014557248b", size = 87291 }, + { url = "https://files.pythonhosted.org/packages/49/4e/5d2f6d7b57fc9956bf06e944eb00463551f7d52fc73ca35cfc4c2cdb7aed/wrapt-1.16.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:14d7dc606219cdd7405133c713f2c218d4252f2a469003f8c46bb92d5d095d81", size = 90374 }, + { url = "https://files.pythonhosted.org/packages/a6/9b/c2c21b44ff5b9bf14a83252a8b973fb84923764ff63db3e6dfc3895cf2e0/wrapt-1.16.0-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:49aac49dc4782cb04f58986e81ea0b4768e4ff197b57324dcbd7699c5dfb40b9", size = 83896 }, + { url = "https://files.pythonhosted.org/packages/14/26/93a9fa02c6f257df54d7570dfe8011995138118d11939a4ecd82cb849613/wrapt-1.16.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:418abb18146475c310d7a6dc71143d6f7adec5b004ac9ce08dc7a34e2babdc5c", size = 91738 }, + { url = "https://files.pythonhosted.org/packages/a2/5b/4660897233eb2c8c4de3dc7cefed114c61bacb3c28327e64150dc44ee2f6/wrapt-1.16.0-cp312-cp312-win32.whl", hash = "sha256:685f568fa5e627e93f3b52fda002c7ed2fa1800b50ce51f6ed1d572d8ab3e7fc", size = 35568 }, + { url = "https://files.pythonhosted.org/packages/5c/cc/8297f9658506b224aa4bd71906447dea6bb0ba629861a758c28f67428b91/wrapt-1.16.0-cp312-cp312-win_amd64.whl", hash = "sha256:dcdba5c86e368442528f7060039eda390cc4091bfd1dca41e8046af7c910dda8", size = 37653 }, + { url = "https://files.pythonhosted.org/packages/ff/21/abdedb4cdf6ff41ebf01a74087740a709e2edb146490e4d9beea054b0b7a/wrapt-1.16.0-py3-none-any.whl", hash = "sha256:6906c4100a8fcbf2fa735f6059214bb13b97f75b1a61777fcf6432121ef12ef1", size = 23362 }, +] + +[[package]] +name = "zipp" +version = "3.21.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/3f/50/bad581df71744867e9468ebd0bcd6505de3b275e06f202c2cb016e3ff56f/zipp-3.21.0.tar.gz", hash = "sha256:2c9958f6430a2040341a52eb608ed6dd93ef4392e02ffe219417c1b28b5dd1f4", size = 24545 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b7/1a/7e4798e9339adc931158c9d69ecc34f5e6791489d469f5e50ec15e35f458/zipp-3.21.0-py3-none-any.whl", hash = "sha256:ac1bbe05fd2991f160ebce24ffbac5f6d11d83dc90891255885223d42b3cd931", size = 9630 }, +]