diff --git a/src/Instrumentation/Symfony/composer.json b/src/Instrumentation/Symfony/composer.json index dcc126c99..24dc54e7d 100644 --- a/src/Instrumentation/Symfony/composer.json +++ b/src/Instrumentation/Symfony/composer.json @@ -30,8 +30,9 @@ "open-telemetry/sdk": "^1.0", "phpunit/phpunit": "^9.5", "vimeo/psalm": "6.4.0", - "symfony/http-client": "^5.4||^6.0", - "symfony/messenger": "^5.4||^6.0", + "symfony/http-client": "^5.4||^6.0||^7.0", + "symfony/messenger": "^5.4||^6.0||^7.0", + "open-telemetry/test-utils": "^0.2.0", "open-telemetry/opentelemetry-propagation-traceresponse": "*", "open-telemetry/opentelemetry-propagation-server-timing": "*" }, diff --git a/src/Instrumentation/Symfony/phpunit.xml.dist b/src/Instrumentation/Symfony/phpunit.xml.dist index d85488537..dd3200529 100644 --- a/src/Instrumentation/Symfony/phpunit.xml.dist +++ b/src/Instrumentation/Symfony/phpunit.xml.dist @@ -33,6 +33,8 @@ + + diff --git a/src/Instrumentation/Symfony/src/MessengerInstrumentation.php b/src/Instrumentation/Symfony/src/MessengerInstrumentation.php index fe8e22347..5c26d4446 100644 --- a/src/Instrumentation/Symfony/src/MessengerInstrumentation.php +++ b/src/Instrumentation/Symfony/src/MessengerInstrumentation.php @@ -4,37 +4,88 @@ namespace OpenTelemetry\Contrib\Instrumentation\Symfony; +use OpenTelemetry\API\Globals; use OpenTelemetry\API\Instrumentation\CachedInstrumentation; use OpenTelemetry\API\Trace\Span; +use OpenTelemetry\API\Trace\SpanBuilderInterface; use OpenTelemetry\API\Trace\SpanKind; use OpenTelemetry\API\Trace\StatusCode; use OpenTelemetry\Context\Context; +use OpenTelemetry\Contrib\Instrumentation\Symfony\Propagation\EnvelopeContextPropagator; use function OpenTelemetry\Instrumentation\hook; use OpenTelemetry\SemConv\TraceAttributes; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\BusNameStamp; +use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; +use Symfony\Component\Messenger\Stamp\DelayStamp; +use Symfony\Component\Messenger\Stamp\HandledStamp; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentStamp; +use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Component\Messenger\Worker; + +// Add Amazon SQS stamp class if available +if (\class_exists('Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp')) { + class_alias( + 'Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsReceivedStamp', + 'OpenTelemetry\Contrib\Instrumentation\Symfony\AmazonSqsReceivedStamp' + ); +} /** - * The messenger instrumentation will create an internal span for each message dispatched. - * - * It is currently not designed to facilitate trace context propagation. - * This should be done at the transport level. + * The messenger instrumentation creates spans for message operations in Symfony's Messenger system. * - * An exception to this will be simple transports like the Doctrine and InMemory transports. + * This implementation follows the OpenTelemetry messaging semantic conventions: + * @see https://opentelemetry.io/docs/specs/semconv/messaging/ * - * Caution: MessageBuses can be nested, so we might wand to add - * a message stamp to keep track of the parent span. + * Key features: + * - Context propagation between producers and consumers + * - Span naming following the "{operation} {destination}" convention + * - Proper span kind assignment based on operation type + * - Standard messaging attributes + * - Support for distributed tracing across message boundaries */ final class MessengerInstrumentation { - const ATTRIBUTE_MESSENGER_BUS = 'symfony.messenger.bus'; - const ATTRIBUTE_MESSENGER_MESSAGE = 'symfony.messenger.message'; - const ATTRIBUTE_MESSENGER_TRANSPORT = 'symfony.messenger.transport'; + // Standard messaging operation types as defined in the spec + private const OPERATION_TYPE_CREATE = 'create'; + private const OPERATION_TYPE_SEND = 'send'; + private const OPERATION_TYPE_RECEIVE = 'receive'; + private const OPERATION_TYPE_PROCESS = 'process'; + + // Symfony-specific operation types + private const OPERATION_TYPE_MIDDLEWARE = 'middleware'; + + // Attribute constants + const ATTRIBUTE_MESSAGING_MESSAGE = 'messaging.message'; + const ATTRIBUTE_MESSAGING_BUS = 'messaging.symfony.bus'; + const ATTRIBUTE_MESSAGING_HANDLER = 'messaging.symfony.handler'; + const ATTRIBUTE_MESSAGING_REDELIVERED_AT = 'messaging.symfony.redelivered_at'; + const ATTRIBUTE_MESSAGING_SENDER = 'messaging.symfony.sender'; + const ATTRIBUTE_MESSAGING_DELAY = 'messaging.symfony.delay'; + const ATTRIBUTE_MESSAGING_RETRY_COUNT = 'messaging.symfony.retry_count'; + const ATTRIBUTE_MESSAGING_STAMPS = 'messaging.symfony.stamps'; + const ATTRIBUTE_MESSAGING_MIDDLEWARE = 'symfony.messenger.middleware'; + const ATTRIBUTE_MESSAGING_CONSUMED_BY_WORKER = 'messaging.symfony.consumed_by_worker'; + + // Constants used in tests + const ATTRIBUTE_MESSENGER_BUS = self::ATTRIBUTE_MESSAGING_BUS; + const ATTRIBUTE_MESSENGER_MESSAGE = self::ATTRIBUTE_MESSAGING_MESSAGE; - /** @psalm-suppress PossiblyUnusedMethod */ + /** + * Registers the instrumentation hooks for Symfony Messenger. + * + * @psalm-suppress PossiblyUnusedMethod + */ public static function register(): void { + // Check if we should use the stable messaging conventions + $useStableConventions = self::shouldUseStableConventions(); + $emitDuplicateConventions = self::shouldEmitDuplicateConventions(); + $instrumentation = new CachedInstrumentation( 'io.opentelemetry.contrib.php.symfony_messenger', null, @@ -43,9 +94,7 @@ public static function register(): void /** * MessageBusInterface dispatches messages to the handlers. - * - * @psalm-suppress UnusedFunctionCall - */ + */ hook( MessageBusInterface::class, 'dispatch', @@ -61,20 +110,32 @@ public static function register(): void $message = $params[0]; $messageClass = \get_class($message); + // Get destination name if available + $destinationName = $message instanceof Envelope + ? self::getDestinationName($message) + : $class; + + // Create span with proper naming convention /** @psalm-suppress ArgumentTypeCoercion */ $builder = $instrumentation ->tracer() - ->spanBuilder(\sprintf('DISPATCH %s', $messageClass)) + ->spanBuilder(\sprintf('%s %s', self::OPERATION_TYPE_CREATE, $destinationName)) ->setSpanKind(SpanKind::KIND_PRODUCER) ->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function) ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) ->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno) - - ->setAttribute(self::ATTRIBUTE_MESSENGER_BUS, $class) - ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass) + ->setAttribute(TraceAttributes::MESSAGING_SYSTEM, 'symfony') + ->setAttribute(TraceAttributes::MESSAGING_OPERATION_TYPE, self::OPERATION_TYPE_CREATE) + ->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass) + ->setAttribute(self::ATTRIBUTE_MESSAGING_BUS, $class) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $destinationName) ; + if ($message instanceof Envelope) { + self::addMessageStampsToSpan($builder, $message); + } + $parent = Context::getCurrent(); $span = $builder ->setParent($parent) @@ -110,14 +171,12 @@ public static function register(): void /** * SenderInterface sends messages to a transport. - * - * @psalm-suppress UnusedFunctionCall - */ + */ hook( SenderInterface::class, 'send', pre: static function ( - SenderInterface $bus, + SenderInterface $sender, array $params, string $class, string $function, @@ -128,34 +187,202 @@ public static function register(): void $envelope = $params[0]; $messageClass = \get_class($envelope->getMessage()); + // Inject OpenTelemetry context into message envelope + $propagator = Globals::propagator(); + $currentContext = Context::getCurrent(); + $headers = []; + $propagator->inject($headers, EnvelopeContextPropagator::getInstance(), $currentContext); + + $envelope = EnvelopeContextPropagator::getInstance()->injectContextIntoEnvelope($envelope, $headers); + + // Get destination name + $destinationName = self::getDestinationName($envelope) ?: $class; + /** @psalm-suppress ArgumentTypeCoercion */ $builder = $instrumentation ->tracer() - ->spanBuilder(\sprintf('SEND %s', $messageClass)) + ->spanBuilder(\sprintf('%s %s', self::OPERATION_TYPE_SEND, $destinationName)) ->setSpanKind(SpanKind::KIND_PRODUCER) ->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function) ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) ->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno) - - ->setAttribute(self::ATTRIBUTE_MESSENGER_TRANSPORT, $class) - ->setAttribute(self::ATTRIBUTE_MESSENGER_MESSAGE, $messageClass) + ->setAttribute(TraceAttributes::MESSAGING_SYSTEM, 'symfony') + ->setAttribute(TraceAttributes::MESSAGING_OPERATION_TYPE, self::OPERATION_TYPE_SEND) + ->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $destinationName) ; + self::addMessageStampsToSpan($builder, $envelope); + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); + Context::storage()->attach($context); + + return [$envelope]; + }, + post: static function ( + SenderInterface $sender, + array $params, + ?Envelope $result, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } + + $span->end(); + } + ); + /** + * Worker handles messages from a transport. + */ + hook( + Worker::class, + 'handleMessage', + pre: static function ( + Worker $worker, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + /** @var Envelope $envelope */ + $envelope = $params[0]; + /** @var string|ReceiverInterface $transport */ + $transport = $params[1]; + $transportName = \is_object($transport) ? \get_class($transport) : $transport; + + // Use ReceivedStamp transport name if available + $receivedStamp = $envelope->last(ReceivedStamp::class); + if ($receivedStamp) { + $transportName = $receivedStamp->getTransportName(); + } + + // Extract OpenTelemetry context from message envelope + $propagator = Globals::propagator(); + $extractedContext = EnvelopeContextPropagator::getInstance()->extractContextFromEnvelope($envelope); + if ($extractedContext !== null) { + $propagator->extract($extractedContext, EnvelopeContextPropagator::getInstance()); + } + + $messageClass = \get_class($envelope->getMessage()); + + /** @psalm-suppress ArgumentTypeCoercion */ + $builder = $instrumentation + ->tracer() + ->spanBuilder(\sprintf('%s %s', self::OPERATION_TYPE_RECEIVE, $transportName)) + ->setSpanKind(SpanKind::KIND_CONSUMER) + ->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno) + ->setAttribute(TraceAttributes::MESSAGING_SYSTEM, 'symfony') + ->setAttribute(TraceAttributes::MESSAGING_OPERATION_TYPE, self::OPERATION_TYPE_RECEIVE) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $transportName) + ->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass) + ; + + self::addMessageStampsToSpan($builder, $envelope); + + $parent = Context::getCurrent(); $span = $builder ->setParent($parent) ->startSpan(); $context = $span->storeInContext($parent); + Context::storage()->attach($context); + + return $params; + }, + post: static function ( + Worker $worker, + array $params, + ?Envelope $result, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } + + $span->end(); + } + ); + /** + * HandleMessageMiddleware processes messages with handlers. + */ + hook( + 'Symfony\Component\Messenger\Middleware\HandleMessageMiddleware', + 'handle', + pre: static function ( + object $middleware, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + /** @var Envelope $envelope */ + $envelope = $params[0]; + $messageClass = \get_class($envelope->getMessage()); + + // Get destination name + $destinationName = self::getDestinationName($envelope) ?: $messageClass; + + /** @psalm-suppress ArgumentTypeCoercion */ + $builder = $instrumentation + ->tracer() + ->spanBuilder(\sprintf('%s %s', self::OPERATION_TYPE_PROCESS, $destinationName)) + ->setSpanKind(SpanKind::KIND_CONSUMER) // Changed from INTERNAL to CONSUMER per spec + ->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno) + ->setAttribute(TraceAttributes::MESSAGING_SYSTEM, 'symfony') + ->setAttribute(TraceAttributes::MESSAGING_OPERATION_TYPE, self::OPERATION_TYPE_PROCESS) + ->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $destinationName) + ; + + self::addMessageStampsToSpan($builder, $envelope); + + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); Context::storage()->attach($context); return $params; }, post: static function ( - SenderInterface $sender, + object $middleware, array $params, ?Envelope $result, ?\Throwable $exception @@ -176,5 +403,256 @@ public static function register(): void $span->end(); } ); + + // Add instrumentation for individual handlers in Symfony 6.2+ + if (method_exists('Symfony\Component\Messenger\Middleware\HandleMessageMiddleware', 'callHandler')) { + hook( + 'Symfony\Component\Messenger\Middleware\HandleMessageMiddleware', + 'callHandler', + pre: static function ( + object $middleware, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + $handler = $params[0]; + $message = $params[1]; + $handlerClass = \get_class($handler); + $messageClass = \get_class($message); + + // For handler-specific spans, use a custom destination format + $destinationName = sprintf('%s::%s', $handlerClass, $messageClass); + + /** @psalm-suppress ArgumentTypeCoercion */ + $builder = $instrumentation + ->tracer() + ->spanBuilder(\sprintf('%s %s', self::OPERATION_TYPE_PROCESS, $destinationName)) + ->setSpanKind(SpanKind::KIND_CONSUMER) // Changed from INTERNAL to CONSUMER per spec + ->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno) + ->setAttribute(TraceAttributes::MESSAGING_SYSTEM, 'symfony') + ->setAttribute(TraceAttributes::MESSAGING_OPERATION_TYPE, self::OPERATION_TYPE_PROCESS) + ->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass) + ->setAttribute(self::ATTRIBUTE_MESSAGING_HANDLER, $handlerClass) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $destinationName) + ; + + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); + Context::storage()->attach($context); + + return $params; + }, + post: static function ( + object $middleware, + array $params, + $result, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } + + $span->end(); + } + ); + } + + if (getenv('OTEL_PHP_MESSENGER_INSTRUMENT_MIDDLEWARES')) { + /** + * Instrument all middlewares + */ + hook( + 'Symfony\Component\Messenger\Middleware\MiddlewareInterface', + 'handle', + pre: static function ( + object $middleware, + array $params, + string $class, + string $function, + ?string $filename, + ?int $lineno, + ) use ($instrumentation): array { + /** @var Envelope $envelope */ + $envelope = $params[0]; + $messageClass = \get_class($envelope->getMessage()); + $middlewareClass = \get_class($middleware); + + // For middleware spans, use a custom destination format + $destinationName = sprintf('%s::%s', $middlewareClass, $messageClass); + + /** @psalm-suppress ArgumentTypeCoercion */ + $builder = $instrumentation + ->tracer() + ->spanBuilder(\sprintf('%s %s', self::OPERATION_TYPE_MIDDLEWARE, $destinationName)) + ->setSpanKind(SpanKind::KIND_INTERNAL) // Keep as INTERNAL since middleware is not a standard messaging operation + ->setAttribute(TraceAttributes::CODE_FUNCTION_NAME, $function) + ->setAttribute(TraceAttributes::CODE_NAMESPACE, $class) + ->setAttribute(TraceAttributes::CODE_FILEPATH, $filename) + ->setAttribute(TraceAttributes::CODE_LINE_NUMBER, $lineno) + ->setAttribute(TraceAttributes::MESSAGING_SYSTEM, 'symfony') + ->setAttribute(TraceAttributes::MESSAGING_OPERATION_TYPE, self::OPERATION_TYPE_MIDDLEWARE) + ->setAttribute(self::ATTRIBUTE_MESSAGING_MESSAGE, $messageClass) + ->setAttribute(self::ATTRIBUTE_MESSAGING_MIDDLEWARE, $middlewareClass) + ->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $destinationName) + ; + + self::addMessageStampsToSpan($builder, $envelope); + + $parent = Context::getCurrent(); + $span = $builder + ->setParent($parent) + ->startSpan(); + + $context = $span->storeInContext($parent); + Context::storage()->attach($context); + + return $params; + }, + post: static function ( + object $middleware, + array $params, + ?Envelope $result, + ?\Throwable $exception + ): void { + $scope = Context::storage()->scope(); + if (null === $scope) { + return; + } + + $scope->detach(); + $span = Span::fromContext($scope->context()); + + if (null !== $exception) { + $span->recordException($exception); + $span->setStatus(StatusCode::STATUS_ERROR, $exception->getMessage()); + } + + $span->end(); + } + ); + } + } + + /** + * Determines if stable messaging conventions should be used. + */ + private static function shouldUseStableConventions(): bool + { + $optIn = getenv('OTEL_SEMCONV_STABILITY_OPT_IN'); + if (!$optIn) { + return false; + } + + $values = explode(',', $optIn); + + return in_array('messaging', $values) || in_array('messaging/dup', $values); + } + + /** + * Determines if both old and new conventions should be emitted. + */ + private static function shouldEmitDuplicateConventions(): bool + { + $optIn = getenv('OTEL_SEMCONV_STABILITY_OPT_IN'); + if (!$optIn) { + return false; + } + + $values = explode(',', $optIn); + + return in_array('messaging/dup', $values); + } + + /** + * Gets the destination name from an envelope. + */ + private static function getDestinationName(Envelope $envelope): ?string + { + $sentStamp = $envelope->last(SentStamp::class); + $receivedStamp = $envelope->last(ReceivedStamp::class); + + if ($sentStamp && $sentStamp->getSenderAlias()) { + return $sentStamp->getSenderAlias(); + } + + if ($receivedStamp) { + return $receivedStamp->getTransportName(); + } + + return null; + } + + /** + * Adds message stamps as span attributes. + */ + private static function addMessageStampsToSpan(SpanBuilderInterface $builder, Envelope $envelope): void + { + $busStamp = $envelope->last(BusNameStamp::class); + $consumedByWorkerStamp = $envelope->last(ConsumedByWorkerStamp::class); + $delayStamp = $envelope->last(DelayStamp::class); + $handledStamp = $envelope->last(HandledStamp::class); + $receivedStamp = $envelope->last(ReceivedStamp::class); + $redeliveryStamp = $envelope->last(RedeliveryStamp::class); + $sentStamp = $envelope->last(SentStamp::class); + $transportMessageIdStamp = $envelope->last(TransportMessageIdStamp::class); + + if ($busStamp) { + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_BUS, $busStamp->getBusName()); + } + + if ($consumedByWorkerStamp) { + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_CONSUMED_BY_WORKER, true); + } + + if ($handledStamp) { + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_HANDLER, $handledStamp->getHandlerName()); + } + + if ($redeliveryStamp) { + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_REDELIVERED_AT, $redeliveryStamp->getRedeliveredAt()->format('Y-m-d\TH:i:sP')); + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_RETRY_COUNT, $redeliveryStamp->getRetryCount()); + } + + if ($sentStamp) { + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_SENDER, $sentStamp->getSenderClass()); + $builder->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $sentStamp->getSenderAlias()); + } elseif ($receivedStamp) { + $builder->setAttribute(TraceAttributes::MESSAGING_DESTINATION_NAME, $receivedStamp->getTransportName()); + } + + if ($transportMessageIdStamp) { + $builder->setAttribute(TraceAttributes::MESSAGING_MESSAGE_ID, $transportMessageIdStamp->getId()); + } + + if ($delayStamp) { + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_DELAY, $delayStamp->getDelay()); + } + + // Add all stamps count + $stamps = []; + foreach ($envelope->all() as $stampFqcn => $instances) { + $stamps[$stampFqcn] = \count($instances); + } + if (!empty($stamps)) { + $builder->setAttribute(self::ATTRIBUTE_MESSAGING_STAMPS, \json_encode($stamps)); + } } } diff --git a/src/Instrumentation/Symfony/src/Propagation/EnvelopeContextPropagator.php b/src/Instrumentation/Symfony/src/Propagation/EnvelopeContextPropagator.php new file mode 100644 index 000000000..d6353aaf8 --- /dev/null +++ b/src/Instrumentation/Symfony/src/Propagation/EnvelopeContextPropagator.php @@ -0,0 +1,88 @@ + $carrier + * @return array + */ + public function keys($carrier): array + { + return array_keys($carrier); + } + + /** + * @param array $carrier + */ + public function get($carrier, string $key): ?string + { + return $carrier[$key] ?? null; + } + + /** + * @param array $carrier + */ + public function set(&$carrier, string $key, string $value): void + { + $carrier[$key] = $value; + } + + /** + * Injects OpenTelemetry context into a Symfony Messenger envelope. + */ + public function injectContextIntoEnvelope(Envelope $envelope, array $context): Envelope + { + if (empty($context)) { + return $envelope; + } + + $serializerContext = [ + self::CONTEXT_KEY => $context, + ]; + + return $envelope->with(new SerializerStamp($serializerContext)); + } + + /** + * Extracts OpenTelemetry context from a Symfony Messenger envelope. + */ + public function extractContextFromEnvelope(Envelope $envelope): ?array + { + $serializerStamps = $envelope->all(SerializerStamp::class); + foreach ($serializerStamps as $serializerStamp) { + /** @var SerializerStamp $serializerStamp */ + $context = $serializerStamp->getContext(); + if (isset($context[self::CONTEXT_KEY])) { + return $context[self::CONTEXT_KEY]; + } + } + + return null; + } +} diff --git a/src/Instrumentation/Symfony/tests/Integration/HttpClientInstrumentationTest.php b/src/Instrumentation/Symfony/tests/Integration/HttpClientInstrumentationTest.php index be9a18f9e..061e166fe 100644 --- a/src/Instrumentation/Symfony/tests/Integration/HttpClientInstrumentationTest.php +++ b/src/Instrumentation/Symfony/tests/Integration/HttpClientInstrumentationTest.php @@ -16,7 +16,7 @@ final class HttpClientInstrumentationTest extends AbstractTest { public static function setUpBeforeClass(): void { - TestHttpServer::start(); + TestHttpServer::start(8058); } protected function getHttpClient(string $_testCase): HttpClientInterface @@ -32,7 +32,8 @@ public function test_send_request(string $method, string $uri, int $statusCode, $client = $this->getHttpClient(__FUNCTION__); $this->assertCount(0, $this->storage); - $response = $client->request($method, $uri, ['bindto' => '127.0.0.1:9876']); + $port = 9876 + (parse_url($uri, PHP_URL_PATH) === '/1' ? 1 : 0); + $response = $client->request($method, $uri, ['bindto' => "127.0.0.1:$port"]); $response->getStatusCode(); $this->assertCount(1, $this->storage); @@ -62,7 +63,7 @@ public function test_throw_exception(): void $this->assertCount(0, $this->storage); try { - $client->request('GET', 'http://localhost:8057', [ + $client->request('GET', 'http://localhost:8058', [ 'bindto' => '127.0.0.1:9876', 'auth_ntlm' => [], ]); @@ -83,10 +84,10 @@ public function test_throw_exception(): void public function requestProvider(): array { return [ - ['GET', 'http://localhost:8057', Response::HTTP_OK, StatusCode::STATUS_UNSET], - ['GET','http://localhost:8057/404', Response::HTTP_NOT_FOUND, StatusCode::STATUS_ERROR], - ['POST','http://localhost:8057/json', Response::HTTP_OK, StatusCode::STATUS_UNSET], - ['DELETE', 'http://localhost:8057/1', Response::HTTP_OK, StatusCode::STATUS_UNSET], + ['GET', 'http://localhost:8058', Response::HTTP_OK, StatusCode::STATUS_UNSET], + ['GET','http://localhost:8058/404', Response::HTTP_NOT_FOUND, StatusCode::STATUS_ERROR], + ['POST','http://localhost:8058/json', Response::HTTP_OK, StatusCode::STATUS_UNSET], + ['DELETE', 'http://localhost:8058/1', Response::HTTP_OK, StatusCode::STATUS_UNSET], ]; } } diff --git a/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php b/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php index c2fa139b2..df7e0b3cd 100644 --- a/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php +++ b/src/Instrumentation/Symfony/tests/Integration/MessengerInstrumentationTest.php @@ -6,6 +6,8 @@ use OpenTelemetry\API\Trace\SpanKind; use OpenTelemetry\Contrib\Instrumentation\Symfony\MessengerInstrumentation; +use OpenTelemetry\SemConv\TraceAttributes; +use OpenTelemetry\TestUtils\TraceStructureAssertionTrait; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBusInterface; @@ -29,8 +31,34 @@ public function getMessage(): string } } +final class SendEmailMessageHandler +{ + public function __invoke(SendEmailMessage $message) + { + // Handler logic + return 'handled'; + } +} + +final class TestMessage +{ + public function __construct() + { + throw new \RuntimeException('test'); + } +} + +final class TestMessageWithRetry +{ + public function __construct() + { + throw new \RuntimeException('test'); + } +} + final class MessengerInstrumentationTest extends AbstractTest { + use TraceStructureAssertionTrait; protected function getMessenger(): MessageBusInterface { return new MessageBus(); @@ -59,17 +87,16 @@ public function test_dispatch_message($message, string $spanName, int $kind, arr $bus->dispatch($message); - $this->assertCount(1, $this->storage); - - $span = $this->storage[0]; - - $this->assertEquals($spanName, $span->getName()); - $this->assertEquals($kind, $span->getKind()); - - foreach ($attributes as $key => $value) { - $this->assertTrue($span->getAttributes()->has($key), sprintf('Attribute %s not found', $key)); - $this->assertEquals($value, $span->getAttributes()->get($key)); - } + $this->assertTraceStructure( + $this->storage, + [ + [ + 'name' => $spanName, + 'kind' => $kind, + 'attributes' => $attributes, + ], + ] + ); } /** @@ -84,17 +111,16 @@ public function test_send_message($message, string $spanName, int $kind, array $ $transport = $this->getTransport(); $transport->send(new Envelope($message)); - $this->assertCount(1, $this->storage); - - $span = $this->storage[0]; - - $this->assertEquals($spanName, $span->getName()); - $this->assertEquals($kind, $span->getKind()); - - foreach ($attributes as $key => $value) { - $this->assertTrue($span->getAttributes()->has($key), sprintf('Attribute %s not found', $key)); - $this->assertEquals($value, $span->getAttributes()->get($key)); - } + $this->assertTraceStructure( + $this->storage, + [ + [ + 'name' => $spanName, + 'kind' => $kind, + 'attributes' => $attributes, + ], + ] + ); } public function test_can_sustain_throw_while_dispatching() @@ -108,9 +134,29 @@ public function dispatch(object $message, array $stamps = []): Envelope try { $bus->dispatch(new SendEmailMessage('Hello Again')); - } catch (\Throwable $e) { - $this->assertCount(1, $this->storage); - $this->assertArrayHasKey(0, $this->storage); + } catch (\Exception $e) { + // Expected exception + $this->assertEquals('booo!', $e->getMessage()); + + // Now check the trace structure + $this->assertTraceStructure( + $this->storage, + [ + [ + 'name' => $this->stringContains('create'), + 'kind' => SpanKind::KIND_PRODUCER, + 'attributes' => [ + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'create', + TraceAttributes::MESSAGING_DESTINATION_NAME => $this->isType('string'), + + // Symfony-specific attributes + MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + ], + ], + ] + ); } } @@ -141,21 +187,234 @@ public function send(Envelope $envelope): Envelope try { $transport->send(new Envelope(new SendEmailMessage('Hello Again'))); } catch (\Throwable $e) { - $this->assertCount(1, $this->storage); - $this->assertArrayHasKey(0, $this->storage); + // Expected exception + $this->assertEquals('booo!', $e->getMessage()); + + // Now check the trace structure + $this->assertTraceStructure( + $this->storage, + [ + [ + 'name' => $this->stringContains('send'), + 'kind' => SpanKind::KIND_PRODUCER, + 'attributes' => [ + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'send', + TraceAttributes::MESSAGING_DESTINATION_NAME => $this->isType('string'), + + // Symfony-specific attributes + MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + ], + ], + ] + ); } } + public function test_handle_message() + { + $bus = $this->getMessenger(); + $transport = $this->getTransport(); + $worker = new \Symfony\Component\Messenger\Worker( + ['transport' => $transport], + $bus + ); + + // Send a message to the transport + $message = new SendEmailMessage('Hello Again'); + $envelope = new Envelope($message); + $transport->send($envelope); + + // Get and handle the message + $messages = iterator_to_array($transport->get()); + $message = $messages[0]; + + // Use reflection to call the protected handleMessage method + $reflection = new \ReflectionClass($worker); + $handleMessageMethod = $reflection->getMethod('handleMessage'); + $handleMessageMethod->setAccessible(true); + $handleMessageMethod->invoke($worker, $message, 'transport'); + + // We should have 2 spans: send and consume + $this->assertTraceStructure( + $this->storage, + [ + [ + 'name' => 'send ' . (class_exists('Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport') ? 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport' : 'Symfony\Component\Messenger\Transport\InMemoryTransport'), + 'kind' => SpanKind::KIND_PRODUCER, + 'attributes' => [ + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'send', + TraceAttributes::MESSAGING_DESTINATION_NAME => $this->isType('string'), + + // Symfony-specific attributes + MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + ], + ], + [ + 'name' => 'receive transport', + 'kind' => SpanKind::KIND_CONSUMER, + 'attributes' => [ + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'receive', + TraceAttributes::MESSAGING_DESTINATION_NAME => 'transport', + + // Symfony-specific attributes + MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + ], + ], + ] + ); + } + + public function test_middleware_instrumentation() + { + // Register the instrumentation first + MessengerInstrumentation::register(); + + $bus = $this->getMessenger(); + $message = new SendEmailMessage('Hello Again'); + $envelope = new Envelope($message); + + // Create a test middleware + $middleware = new class() implements \Symfony\Component\Messenger\Middleware\MiddlewareInterface { + public function handle(Envelope $envelope, \Symfony\Component\Messenger\Middleware\StackInterface $stack): Envelope + { + return $stack->next()->handle($envelope, $stack); + } + }; + + // Handle the message through the middleware + $middleware->handle($envelope, new class() implements \Symfony\Component\Messenger\Middleware\StackInterface { + public function next(): \Symfony\Component\Messenger\Middleware\MiddlewareInterface + { + return new class() implements \Symfony\Component\Messenger\Middleware\MiddlewareInterface { + public function handle(Envelope $envelope, \Symfony\Component\Messenger\Middleware\StackInterface $stack): Envelope + { + return $envelope; + } + }; + } + }); + + // Use assertTraceStructure with PHPUnit constraints + $this->assertTraceStructure( + $this->storage, + [ + [ + 'name' => $this->logicalAnd( + $this->stringStartsWith('middleware'), + $this->stringContains('SendEmailMessage') + ), + 'kind' => SpanKind::KIND_INTERNAL, + 'attributes' => [ + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'middleware', + TraceAttributes::MESSAGING_DESTINATION_NAME => $this->isType('string'), + + // Symfony-specific attributes + MessengerInstrumentation::ATTRIBUTE_MESSAGING_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + MessengerInstrumentation::ATTRIBUTE_MESSAGING_MIDDLEWARE => $this->logicalNot($this->isEmpty()), + ], + ], + ] + ); + } + + public function test_stamp_information() + { + $transport = $this->getTransport(); + $message = new SendEmailMessage('Hello Again'); + + // Add various stamps to the envelope + $envelope = new Envelope($message, [ + new \Symfony\Component\Messenger\Stamp\BusNameStamp('test_bus'), + new \Symfony\Component\Messenger\Stamp\DelayStamp(1000), + new \Symfony\Component\Messenger\Stamp\TransportMessageIdStamp('test-id'), + ]); + + $transport->send($envelope); + + // We should have a send span with all stamp information + $this->assertTraceStructure( + $this->storage, + [ + [ + 'name' => 'send ' . (class_exists('Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport') ? 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport' : 'Symfony\Component\Messenger\Transport\InMemoryTransport'), + 'kind' => SpanKind::KIND_PRODUCER, + 'attributes' => [ + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'send', + TraceAttributes::MESSAGING_DESTINATION_NAME => $this->isType('string'), + TraceAttributes::MESSAGING_MESSAGE_ID => 'test-id', + + // Symfony-specific attributes + MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + 'messaging.symfony.bus' => 'test_bus', + 'messaging.symfony.delay' => 1000, + ], + ], + ] + ); + + // Check stamps count + $sendSpan = $this->storage[0]; + $this->assertTrue($sendSpan->getAttributes()->has('messaging.symfony.stamps')); + $stamps = json_decode($sendSpan->getAttributes()->get('messaging.symfony.stamps'), true); + $this->assertIsArray($stamps); + $this->assertArrayHasKey('Symfony\Component\Messenger\Stamp\BusNameStamp', $stamps); + $this->assertArrayHasKey('Symfony\Component\Messenger\Stamp\DelayStamp', $stamps); + $this->assertArrayHasKey('Symfony\Component\Messenger\Stamp\TransportMessageIdStamp', $stamps); + } + + public function test_throw_exception(): void + { + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('test'); + + $bus = $this->getMessenger(); + $bus->dispatch(new TestMessage()); + } + + public function test_throw_exception_with_retry(): void + { + $this->expectException(\RuntimeException::class); + $this->expectExceptionMessage('test'); + + $bus = $this->getMessenger(); + $bus->dispatch(new TestMessageWithRetry()); + } + public function sendDataProvider(): array { + $transportClass = class_exists('Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport') + ? 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport' + : 'Symfony\Component\Messenger\Transport\InMemoryTransport'; + return [ [ new SendEmailMessage('Hello Again'), - 'SEND OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + 'send ' . $transportClass, SpanKind::KIND_PRODUCER, [ - MessengerInstrumentation::ATTRIBUTE_MESSENGER_TRANSPORT => class_exists('Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport') ? 'Symfony\Component\Messenger\Transport\InMemory\InMemoryTransport' : 'Symfony\Component\Messenger\Transport\InMemoryTransport', + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'send', + TraceAttributes::MESSAGING_DESTINATION_NAME => $transportClass, + + // Symfony-specific attributes MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + + // Code attributes + TraceAttributes::CODE_FUNCTION_NAME => 'send', + TraceAttributes::CODE_NAMESPACE => $this->stringContains('Symfony\Component\Messenger\Transport'), + TraceAttributes::CODE_FILEPATH => $this->isType('string'), + TraceAttributes::CODE_LINE_NUMBER => $this->greaterThan(0), ], ], ]; @@ -166,11 +425,23 @@ public function dispatchDataProvider(): array return [ [ new SendEmailMessage('Hello Again'), - 'DISPATCH OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + 'create Symfony\Component\Messenger\MessageBus', SpanKind::KIND_PRODUCER, [ + // Required attributes + TraceAttributes::MESSAGING_SYSTEM => 'symfony', + TraceAttributes::MESSAGING_OPERATION_TYPE => 'create', + TraceAttributes::MESSAGING_DESTINATION_NAME => 'Symfony\Component\Messenger\MessageBus', + + // Symfony-specific attributes MessengerInstrumentation::ATTRIBUTE_MESSENGER_BUS => 'Symfony\Component\Messenger\MessageBus', MessengerInstrumentation::ATTRIBUTE_MESSENGER_MESSAGE => 'OpenTelemetry\Tests\Instrumentation\Symfony\tests\Integration\SendEmailMessage', + + // Code attributes + TraceAttributes::CODE_FUNCTION_NAME => 'dispatch', + TraceAttributes::CODE_NAMESPACE => $this->stringContains('Symfony\Component\Messenger'), + TraceAttributes::CODE_FILEPATH => $this->isType('string'), + TraceAttributes::CODE_LINE_NUMBER => $this->greaterThan(0), ], ], ]; diff --git a/src/Instrumentation/Symfony/tests/Unit/Propagation/EnvelopeContextPropagatorTest.php b/src/Instrumentation/Symfony/tests/Unit/Propagation/EnvelopeContextPropagatorTest.php new file mode 100644 index 000000000..5013b60c6 --- /dev/null +++ b/src/Instrumentation/Symfony/tests/Unit/Propagation/EnvelopeContextPropagatorTest.php @@ -0,0 +1,129 @@ +propagator = EnvelopeContextPropagator::getInstance(); + $this->message = new TestMessage('test'); + $this->envelope = new Envelope($this->message); + } + + public function test_singleton_instance(): void + { + $instance1 = EnvelopeContextPropagator::getInstance(); + $instance2 = EnvelopeContextPropagator::getInstance(); + + $this->assertSame($instance1, $instance2); + } + + public function test_inject_context_into_envelope(): void + { + $context = [ + 'traceparent' => '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01', + 'tracestate' => 'congo=t61rcWkgMzE', + ]; + + $envelopeWithContext = $this->propagator->injectContextIntoEnvelope($this->envelope, $context); + + /** @var SerializerStamp $stamp */ + $stamp = $envelopeWithContext->last(SerializerStamp::class); + $this->assertNotNull($stamp); + + $stampContext = $stamp->getContext(); + $this->assertArrayHasKey('otel_context', $stampContext); + $this->assertSame($context, $stampContext['otel_context']); + } + + public function test_inject_empty_context_returns_original_envelope(): void + { + $envelopeWithContext = $this->propagator->injectContextIntoEnvelope($this->envelope, []); + + $this->assertSame($this->envelope, $envelopeWithContext); + $this->assertNull($envelopeWithContext->last(SerializerStamp::class)); + } + + public function test_extract_context_from_envelope(): void + { + $context = [ + 'traceparent' => '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01', + 'tracestate' => 'congo=t61rcWkgMzE', + ]; + + $envelopeWithContext = $this->propagator->injectContextIntoEnvelope($this->envelope, $context); + $extractedContext = $this->propagator->extractContextFromEnvelope($envelopeWithContext); + + $this->assertSame($context, $extractedContext); + } + + public function test_extract_context_from_envelope_without_context(): void + { + $extractedContext = $this->propagator->extractContextFromEnvelope($this->envelope); + + $this->assertNull($extractedContext); + } + + public function test_extract_context_from_envelope_with_multiple_serializer_stamps(): void + { + $context = [ + 'traceparent' => '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01', + ]; + + // Add a serializer stamp without context + $envelope = $this->envelope->with(new SerializerStamp(['foo' => 'bar'])); + // Add a serializer stamp with context + $envelope = $this->propagator->injectContextIntoEnvelope($envelope, $context); + + $extractedContext = $this->propagator->extractContextFromEnvelope($envelope); + + $this->assertSame($context, $extractedContext); + } + + public function test_propagator_getter_interface(): void + { + $carrier = [ + 'foo' => 'bar', + 'baz' => 'qux', + ]; + + $this->assertSame(['foo', 'baz'], $this->propagator->keys($carrier)); + $this->assertSame('bar', $this->propagator->get($carrier, 'foo')); + $this->assertNull($this->propagator->get($carrier, 'nonexistent')); + } + + public function test_propagator_setter_interface(): void + { + $carrier = []; + $this->propagator->set($carrier, 'foo', 'bar'); + + $this->assertSame(['foo' => 'bar'], $carrier); + } +} + +/** + * Simple message class for testing + */ +final class TestMessage +{ + public function __construct(private string $content) + { + } + + public function getContent(): string + { + return $this->content; + } +}