diff --git a/docs/instrumentation-list.yaml b/docs/instrumentation-list.yaml index 3b0522e14b20..06c8e7b977ae 100644 --- a/docs/instrumentation-list.yaml +++ b/docs/instrumentation-list.yaml @@ -6867,8 +6867,11 @@ libraries: - org.apache.tomcat:tomcat-jasper:[7.0.19,10) kafka: - name: kafka-clients-0.11 - description: | - This instrumentation enables messaging spans and metrics for Apache Kafka 0.11 clients. It automatically traces message production and consumption, propagates context, and emits metrics for production and consumption. + display_name: Apache Kafka Client + description: This instrumentation enables messaging spans for Kafka producers + and consumers, and collects internal Kafka client metrics. + semantic_conventions: + - MESSAGING_SPANS library_link: https://kafka.apache.org/ source_path: instrumentation/kafka/kafka-clients/kafka-clients-0.11 scope: @@ -6878,33 +6881,13 @@ libraries: - org.apache.kafka:kafka-clients:[0.11.0.0,) configurations: - name: otel.instrumentation.kafka.producer-propagation.enabled - description: Enable context propagation for kafka message producers. + description: Enable context propagation for Kafka message producers. type: boolean default: true - name: otel.instrumentation.kafka.experimental-span-attributes - description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms" - type: boolean - default: false - - name: otel.instrumentation.messaging.experimental.capture-headers - description: A comma-separated list of header names to capture as span attributes. - type: list - default: '' - - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled - description: | - Enables experimental receive telemetry, which will cause consumers to start a new trace, with only a span link connecting it to the producer trace. + description: Enables the capture of the experimental consumer attribute `kafka.record.queue_time_ms`. type: boolean default: false - - name: kafka-clients-2.6 - description: | - This instrumentation provides a library integration that enables messaging spans and metrics for Apache Kafka 2.6+ clients. - library_link: https://kafka.apache.org/ - source_path: instrumentation/kafka/kafka-clients/kafka-clients-2.6 - scope: - name: io.opentelemetry.kafka-clients-2.6 - target_versions: - library: - - org.apache.kafka:kafka-clients:2.6.0 - configurations: - name: otel.instrumentation.messaging.experimental.capture-headers description: A comma-separated list of header names to capture as span attributes. type: list @@ -6914,18 +6897,6 @@ libraries: Enables experimental receive telemetry, which will cause consumers to start a new trace, with only a span link connecting it to the producer trace. type: boolean default: false - - name: kafka-connect-2.6 - description: This instrumentation enables messaging spans for Kafka Connect sink - tasks. - semantic_conventions: - - MESSAGING_SPANS - library_link: https://kafka.apache.org/documentation/#connect - source_path: instrumentation/kafka/kafka-connect-2.6 - scope: - name: io.opentelemetry.kafka-connect-2.6 - target_versions: - javaagent: - - org.apache.kafka:connect-api:[2.6.0,) telemetry: - when: default spans: @@ -6933,24 +6904,1380 @@ libraries: attributes: - name: messaging.batch.message_count type: LONG + - name: messaging.client_id + type: STRING - name: messaging.destination.name type: STRING + - name: messaging.destination.partition.id + type: STRING + - name: messaging.kafka.consumer.group + type: STRING + - name: messaging.kafka.message.key + type: STRING + - name: messaging.kafka.message.offset + type: LONG + - name: messaging.kafka.message.tombstone + type: BOOLEAN + - name: messaging.message.body.size + type: LONG - name: messaging.operation type: STRING - name: messaging.system type: STRING - - name: thread.id + - span_kind: PRODUCER + attributes: + - name: messaging.client_id + type: STRING + - name: messaging.destination.name + type: STRING + - name: messaging.destination.partition.id + type: STRING + - name: messaging.kafka.message.key + type: STRING + - name: messaging.kafka.message.offset type: LONG - - name: thread.name + - name: messaging.kafka.message.tombstone + type: BOOLEAN + - name: messaging.operation type: STRING - - name: kafka-streams-0.11 - library_link: https://kafka.apache.org/documentation/streams/ - source_path: instrumentation/kafka/kafka-streams-0.11 + - name: messaging.system + type: STRING + - name: kafka-clients-2.6 + display_name: Apache Kafka Client + description: This standalone instrumentation enables messaging spans for Kafka + producers and consumers, and collects internal Kafka client metrics. + semantic_conventions: + - MESSAGING_SPANS + library_link: https://kafka.apache.org/ + source_path: instrumentation/kafka/kafka-clients/kafka-clients-2.6 scope: - name: io.opentelemetry.kafka-streams-0.11 + name: io.opentelemetry.kafka-clients-2.6 target_versions: - javaagent: - - org.apache.kafka:kafka-streams:[0.11.0.0,) + library: + - org.apache.kafka:kafka-clients:2.6.0 + telemetry: + - when: default + metrics: + - name: kafka.consumer.assigned_partitions + description: The number of partitions currently assigned to this consumer + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.bytes_consumed_rate + description: The average number of bytes consumed per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.bytes_consumed_total + description: The total number of bytes consumed + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.commit_latency_avg + description: The average time taken for a commit request + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.commit_latency_max + description: The max time taken for a commit request + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.commit_rate + description: The number of commit calls per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.commit_total + description: The total number of commit calls + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.connection_close_rate + description: The number of connections closed per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.connection_close_total + description: The total number of connections closed + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.connection_count + description: The current number of active connections. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.connection_creation_rate + description: The number of new connections established per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.connection_creation_total + description: The total number of new connections established + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.failed_authentication_rate + description: The number of connections with failed authentication per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.failed_authentication_total + description: The total number of connections with failed authentication + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.failed_reauthentication_rate + description: The number of failed re-authentication of connections per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.failed_reauthentication_total + description: The total number of failed re-authentication of connections + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.failed_rebalance_rate_per_hour + description: The number of failed rebalance events per hour + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.failed_rebalance_total + description: The total number of failed rebalance events + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.fetch_latency_avg + description: The average time taken for a fetch request. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.fetch_latency_max + description: The max time taken for any fetch request. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.fetch_rate + description: The number of fetch requests per second. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.fetch_size_avg + description: The average number of bytes fetched per request + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.fetch_size_max + description: The maximum number of bytes fetched per request + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.fetch_throttle_time_avg + description: The average throttle time in ms + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.fetch_throttle_time_max + description: The maximum throttle time in ms + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.fetch_total + description: The total number of fetch requests. + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.heartbeat_rate + description: The number of heartbeats per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.heartbeat_total + description: The total number of heartbeats + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.incoming_byte_rate + description: The number of bytes read off all sockets per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.incoming_byte_total + description: The total number of bytes read off all sockets + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.io_ratio + description: The fraction of time the I/O thread spent doing I/O + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.io_time_ns_avg + description: The average length of time for I/O per select call in nanoseconds. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.io_wait_ratio + description: The fraction of time the I/O thread spent waiting + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.io_wait_time_ns_avg + description: The average length of time the I/O thread spent waiting for a + socket ready for reads or writes in nanoseconds. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.io_waittime_total + description: The total time the I/O thread spent waiting + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.iotime_total + description: The total time the I/O thread spent doing I/O + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.join_rate + description: The number of group joins per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.join_time_avg + description: The average time taken for a group rejoin + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.join_time_max + description: The max time taken for a group rejoin + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.join_total + description: The total number of group joins + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.last_heartbeat_seconds_ago + description: The number of seconds since the last coordinator heartbeat was + sent + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.last_poll_seconds_ago + description: The number of seconds since the last poll() invocation. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.last_rebalance_seconds_ago + description: The number of seconds since the last successful rebalance event + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.network_io_rate + description: The number of network operations (reads or writes) on all connections + per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.network_io_total + description: The total number of network operations (reads or writes) on all + connections + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.outgoing_byte_rate + description: The number of outgoing bytes sent to all servers per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.outgoing_byte_total + description: The total number of outgoing bytes sent to all servers + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.partition_assigned_latency_avg + description: The average time taken for a partition-assigned rebalance listener + callback + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.partition_assigned_latency_max + description: The max time taken for a partition-assigned rebalance listener + callback + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.poll_idle_ratio_avg + description: The average fraction of time the consumer's poll() is idle as + opposed to waiting for the user code to process records. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.rebalance_latency_avg + description: The average time taken for a group to complete a successful rebalance, + which may be composed of several failed re-trials until it succeeded + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.rebalance_latency_max + description: The max time taken for a group to complete a successful rebalance, + which may be composed of several failed re-trials until it succeeded + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.rebalance_latency_total + description: The total number of milliseconds this consumer has spent in successful + rebalances since creation + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.rebalance_rate_per_hour + description: The number of successful rebalance events per hour, each event + is composed of several failed re-trials until it succeeded + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.rebalance_total + description: The total number of successful rebalance events, each event is + composed of several failed re-trials until it succeeded + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.records_consumed_rate + description: The average number of records consumed per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_consumed_total + description: The total number of records consumed + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_lag + description: The latest lag of the partition + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: partition + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_lag_avg + description: The average lag of the partition + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: partition + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_lag_max + description: The maximum lag in terms of number of records for any partition + in this window + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: partition + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_lead + description: The latest lead of the partition + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: partition + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_lead_avg + description: The average lead of the partition + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: partition + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_lead_min + description: The minimum lead in terms of number of records for any partition + in this window + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: partition + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.records_per_request_avg + description: The average number of records in each request + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.consumer.request_rate + description: The number of requests sent per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.request_size_avg + description: The average size of requests sent. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.request_size_max + description: The maximum size of any request sent. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.request_total + description: The total number of requests sent + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.response_rate + description: The number of responses received per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.response_total + description: The total number of responses received + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.consumer.select_rate + description: The number of times the I/O layer checked for new I/O to perform + per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.select_total + description: The total number of times the I/O layer checked for new I/O to + perform + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.successful_authentication_no_reauth_total + description: The total number of connections with successful authentication + where the client does not support re-authentication + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.successful_authentication_rate + description: The number of connections with successful authentication per + second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.successful_authentication_total + description: The total number of connections with successful authentication + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.successful_reauthentication_rate + description: The number of successful re-authentication of connections per + second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.successful_reauthentication_total + description: The total number of successful re-authentication of connections + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.sync_rate + description: The number of group syncs per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.sync_time_avg + description: The average time taken for a group sync + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.sync_time_max + description: The max time taken for a group sync + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.sync_total + description: The total number of group syncs + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.time_between_poll_avg + description: The average delay between invocations of poll(). + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.consumer.time_between_poll_max + description: The max delay between invocations of poll(). + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.batch_size_avg + description: The average number of bytes sent per partition per-request. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.batch_size_max + description: The max number of bytes sent per partition per-request. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.batch_split_rate + description: The average number of batch splits per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.batch_split_total + description: The total number of batch splits + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.buffer_available_bytes + description: The total amount of buffer memory that is not being used (either + unallocated or in the free list). + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.buffer_exhausted_rate + description: The average per-second number of record sends that are dropped + due to buffer exhaustion + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.buffer_exhausted_total + description: The total number of record sends that are dropped due to buffer + exhaustion + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.buffer_total_bytes + description: The maximum amount of buffer memory the client can use (whether + or not it is currently used). + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.bufferpool_wait_ratio + description: The fraction of time an appender waits for space allocation. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.bufferpool_wait_time_total + description: The total time an appender waits for space allocation. + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.byte_rate + description: The average number of bytes sent per second for a topic. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.byte_total + description: The total number of bytes sent for a topic. + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.compression_rate + description: The average compression rate of record batches for a topic. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.compression_rate_avg + description: The average compression rate of record batches. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.connection_close_rate + description: The number of connections closed per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.connection_close_total + description: The total number of connections closed + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.connection_count + description: The current number of active connections. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.connection_creation_rate + description: The number of new connections established per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.connection_creation_total + description: The total number of new connections established + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.failed_authentication_rate + description: The number of connections with failed authentication per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.failed_authentication_total + description: The total number of connections with failed authentication + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.failed_reauthentication_rate + description: The number of failed re-authentication of connections per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.failed_reauthentication_total + description: The total number of failed re-authentication of connections + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.incoming_byte_rate + description: The number of bytes read off all sockets per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.incoming_byte_total + description: The total number of bytes read off all sockets + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.io_ratio + description: The fraction of time the I/O thread spent doing I/O + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.io_time_ns_avg + description: The average length of time for I/O per select call in nanoseconds. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.io_wait_ratio + description: The fraction of time the I/O thread spent waiting + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.io_wait_time_ns_avg + description: The average length of time the I/O thread spent waiting for a + socket ready for reads or writes in nanoseconds. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.io_waittime_total + description: The total time the I/O thread spent waiting + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.iotime_total + description: The total time the I/O thread spent doing I/O + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.metadata_age + description: The age in seconds of the current producer metadata being used. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.network_io_rate + description: The number of network operations (reads or writes) on all connections + per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.network_io_total + description: The total number of network operations (reads or writes) on all + connections + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.outgoing_byte_rate + description: The number of outgoing bytes sent to all servers per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.outgoing_byte_total + description: The total number of outgoing bytes sent to all servers + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.produce_throttle_time_avg + description: The average time in ms a request was throttled by a broker + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.produce_throttle_time_max + description: The maximum time in ms a request was throttled by a broker + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.record_error_rate + description: The average per-second number of record sends that resulted in + errors + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.record_error_total + description: The total number of record sends that resulted in errors + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.record_queue_time_avg + description: The average time in ms record batches spent in the send buffer. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.record_queue_time_max + description: The maximum time in ms record batches spent in the send buffer. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.record_retry_rate + description: The average per-second number of retried record sends + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.record_retry_total + description: The total number of retried record sends + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.record_send_rate + description: The average number of records sent per second. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.record_send_total + description: The total number of records sent. + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: topic + type: STRING + - name: kafka.producer.record_size_avg + description: The average record size + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.record_size_max + description: The maximum record size + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.records_per_request_avg + description: The average number of records per request. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.request_latency_avg + description: The average request latency in ms + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.request_latency_max + description: The maximum request latency in ms + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.request_rate + description: The number of requests sent per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.request_size_avg + description: The average size of requests sent. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.request_size_max + description: The maximum size of any request sent. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.request_total + description: The total number of requests sent + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.requests_in_flight + description: The current number of in-flight requests awaiting a response. + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.response_rate + description: The number of responses received per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.response_total + description: The total number of responses received + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: node-id + type: STRING + - name: kafka.producer.select_rate + description: The number of times the I/O layer checked for new I/O to perform + per second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.select_total + description: The total number of times the I/O layer checked for new I/O to + perform + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.successful_authentication_no_reauth_total + description: The total number of connections with successful authentication + where the client does not support re-authentication + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.successful_authentication_rate + description: The number of connections with successful authentication per + second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.successful_authentication_total + description: The total number of connections with successful authentication + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.successful_reauthentication_rate + description: The number of successful re-authentication of connections per + second + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.successful_reauthentication_total + description: The total number of successful re-authentication of connections + type: DOUBLE_SUM + unit: null + attributes: + - name: client-id + type: STRING + - name: kafka.producer.waiting_threads + description: The number of user threads blocked waiting for buffer memory + to enqueue their records + type: DOUBLE_GAUGE + unit: null + attributes: + - name: client-id + type: STRING + spans: + - span_kind: CONSUMER + attributes: + - name: kafka.record.queue_time_ms + type: LONG + - name: messaging.batch.message_count + type: LONG + - name: messaging.client_id + type: STRING + - name: messaging.destination.name + type: STRING + - name: messaging.destination.partition.id + type: STRING + - name: messaging.kafka.consumer.group + type: STRING + - name: messaging.kafka.message.offset + type: LONG + - name: messaging.message.body.size + type: LONG + - name: messaging.operation + type: STRING + - name: messaging.system + type: STRING + - span_kind: PRODUCER + attributes: + - name: messaging.client_id + type: STRING + - name: messaging.destination.name + type: STRING + - name: messaging.destination.partition.id + type: STRING + - name: messaging.kafka.message.offset + type: LONG + - name: messaging.operation + type: STRING + - name: messaging.system + type: STRING + - name: kafka-connect-2.6 + display_name: Apache Kafka Connect + description: This instrumentation enables messaging spans for Kafka Connect sink + tasks. + semantic_conventions: + - MESSAGING_SPANS + library_link: https://kafka.apache.org/documentation/#connect + source_path: instrumentation/kafka/kafka-connect-2.6 + scope: + name: io.opentelemetry.kafka-connect-2.6 + target_versions: + javaagent: + - org.apache.kafka:connect-api:[2.6.0,) + telemetry: + - when: default + spans: + - span_kind: CONSUMER + attributes: + - name: messaging.batch.message_count + type: LONG + - name: messaging.destination.name + type: STRING + - name: messaging.operation + type: STRING + - name: messaging.system + type: STRING + - name: thread.id + type: LONG + - name: thread.name + type: STRING + - name: kafka-streams-0.11 + display_name: Apache Kafka Streams + description: This instrumentation enables messaging spans for Kafka Streams processing. + semantic_conventions: + - MESSAGING_SPANS + library_link: https://kafka.apache.org/documentation/streams/ + source_path: instrumentation/kafka/kafka-streams-0.11 + scope: + name: io.opentelemetry.kafka-streams-0.11 + target_versions: + javaagent: + - org.apache.kafka:kafka-streams:[0.11.0.0,) + configurations: + - name: otel.instrumentation.kafka.experimental-span-attributes + description: Enables the capture of the experimental consumer attribute `kafka.record.queue_time_ms`. + type: boolean + default: false + - name: otel.instrumentation.messaging.experimental.capture-headers + description: A comma-separated list of header names to capture as span attributes. + type: list + default: '' + - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled + description: | + Enables experimental receive telemetry, which will cause consumers to start a new trace, with only a span link connecting it to the producer trace. + type: boolean + default: false + telemetry: + - when: default + spans: + - span_kind: CONSUMER + attributes: + - name: asdf + type: STRING + - name: messaging.client_id + type: STRING + - name: messaging.destination.name + type: STRING + - name: messaging.destination.partition.id + type: STRING + - name: messaging.kafka.consumer.group + type: STRING + - name: messaging.kafka.message.key + type: STRING + - name: messaging.kafka.message.offset + type: LONG + - name: messaging.message.body.size + type: LONG + - name: messaging.operation + type: STRING + - name: messaging.system + type: STRING + - when: otel.instrumentation.kafka.experimental-span-attributes=true + spans: + - span_kind: CONSUMER + attributes: + - name: asdf + type: STRING + - name: kafka.record.queue_time_ms + type: LONG + - name: messaging.client_id + type: STRING + - name: messaging.destination.name + type: STRING + - name: messaging.destination.partition.id + type: STRING + - name: messaging.kafka.consumer.group + type: STRING + - name: messaging.kafka.message.key + type: STRING + - name: messaging.kafka.message.offset + type: LONG + - name: messaging.message.body.size + type: LONG + - name: messaging.operation + type: STRING + - name: messaging.system + type: STRING kotlinx: - name: kotlinx-coroutines-1.0 source_path: instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0 diff --git a/instrumentation-docs/ci-collect.sh b/instrumentation-docs/ci-collect.sh index dec93c26ea62..1d3414d0f93a 100755 --- a/instrumentation-docs/ci-collect.sh +++ b/instrumentation-docs/ci-collect.sh @@ -8,6 +8,7 @@ set -euo pipefail # shellcheck source=instrumentation-docs/instrumentations.sh source "$(dirname "$0")/instrumentations.sh" +# Collect standard and colima tasks (without testLatestDeps) ALL_TASKS=() for task in "${INSTRUMENTATIONS[@]}"; do ALL_TASKS+=(":instrumentation:${task}") @@ -16,8 +17,23 @@ for task in "${COLIMA_INSTRUMENTATIONS[@]}"; do ALL_TASKS+=(":instrumentation:${task}") done -echo "Processing instrumentations..." +echo "Processing standard instrumentations..." ./gradlew "${ALL_TASKS[@]}" \ -PcollectMetadata=true \ --rerun-tasks --continue + +# Collect and run tasks that need testLatestDeps +LATEST_DEPS_TASKS=() +for task in "${TEST_LATEST_DEPS_INSTRUMENTATIONS[@]}"; do + LATEST_DEPS_TASKS+=(":instrumentation:${task}") +done + +if [[ ${#LATEST_DEPS_TASKS[@]} -gt 0 ]]; then + echo "Processing instrumentations with -PtestLatestDeps=true..." + ./gradlew "${LATEST_DEPS_TASKS[@]}" \ + -PcollectMetadata=true \ + -PtestLatestDeps=true \ + --rerun-tasks --continue +fi + echo "Telemetry file regeneration complete." diff --git a/instrumentation-docs/collect.sh b/instrumentation-docs/collect.sh index 130aaff2d908..34d92bebeb0f 100755 --- a/instrumentation-docs/collect.sh +++ b/instrumentation-docs/collect.sh @@ -134,6 +134,25 @@ run_gradle_tasks() { --rerun-tasks --continue --no-parallel } +run_gradle_tasks_with_latest_deps() { + local -a tasks=("$@") + + if [[ ${#tasks[@]} -eq 0 ]]; then + echo "No tasks to run" + return 0 + fi + + echo + echo "Running Gradle tasks with -PtestLatestDeps=true:" + printf ' %s\n' "${tasks[@]}" + echo + + ./gradlew "${tasks[@]}" \ + -PcollectMetadata=true \ + -PtestLatestDeps=true \ + --rerun-tasks --continue --no-parallel +} + # Cleans any stray .telemetry directories left in the repo. find_and_remove_all_telemetry() { echo "Removing stray .telemetry directories..." @@ -153,6 +172,14 @@ main() { done < <(process_descriptors "${INSTRUMENTATIONS[@]}") run_gradle_tasks "${gradle_tasks[@]}" + # Process instrumentations requiring testLatestDeps + echo "Processing instrumentations with -PtestLatestDeps=true..." + gradle_tasks=() + while IFS= read -r line; do + gradle_tasks+=("$line") + done < <(process_descriptors "${TEST_LATEST_DEPS_INSTRUMENTATIONS[@]}") + run_gradle_tasks_with_latest_deps "${gradle_tasks[@]}" + # Setup colima if needed setup_colima diff --git a/instrumentation-docs/instrumentations.sh b/instrumentation-docs/instrumentations.sh index bb624ca1a48c..0ee9ec03592d 100755 --- a/instrumentation-docs/instrumentations.sh +++ b/instrumentation-docs/instrumentations.sh @@ -146,6 +146,7 @@ readonly INSTRUMENTATIONS=( "jsf:jsf-mojarra-3.0:javaagent:test" "jsf:jsf-myfaces-1.2:javaagent:myfaces2Test" "jsf:jsf-myfaces-3.0:javaagent:test" + "kafka:kafka-clients:kafka-clients-2.6:library:test" "kafka:kafka-connect-2.6:testing:test" "nats:nats-2.17:javaagent:test" "nats:nats-2.17:javaagent:testExperimental" @@ -222,3 +223,13 @@ readonly COLIMA_INSTRUMENTATIONS=( "oracle-ucp-11.2:javaagent:testStableSemconv" "spring:spring-jms:spring-jms-6.0:javaagent:test" ) + +# Some instrumentation test suites need to run with -PtestLatestDeps=true to collect +# metrics telemetry or test against latest library versions. +# shellcheck disable=SC2034 +readonly TEST_LATEST_DEPS_INSTRUMENTATIONS=( + "kafka:kafka-clients:kafka-clients-0.11:javaagent:test" + "kafka:kafka-clients:kafka-clients-0.11:javaagent:testExperimental" + "kafka:kafka-streams-0.11:javaagent:test" + "kafka:kafka-streams-0.11:javaagent:testExperimental" +) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts index dc22e71a5d02..82df1bfccf66 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle.kts @@ -30,9 +30,7 @@ tasks { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) - - // TODO run tests both with and without experimental span attributes - jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false") } val testPropagationDisabled by registering(Test::class) { @@ -54,6 +52,20 @@ tasks { include("**/KafkaClientSuppressReceiveSpansTest.*") } + val testExperimental by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + + filter { + excludeTestsMatching("KafkaClientPropagationDisabledTest") + excludeTestsMatching("KafkaClientSuppressReceiveSpansTest") + } + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + systemProperty("metadataConfig", "otel.instrumentation.kafka.experimental-span-attributes=true") + } + test { filter { excludeTestsMatching("KafkaClientPropagationDisabledTest") @@ -63,7 +75,7 @@ tasks { } check { - dependsOn(testPropagationDisabled, testReceiveSpansDisabled) + dependsOn(testPropagationDisabled, testReceiveSpansDisabled, testExperimental) } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java index 6df0d51d53e8..61e2474a1f72 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientDefaultTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.ProducerRecord; +import org.assertj.core.api.AbstractIterableAssert; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -34,6 +35,9 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest { + private static final boolean testLatestDeps = + Boolean.parseBoolean(System.getProperty("testLatestDeps", "true")); + @RegisterExtension static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); @@ -110,6 +114,13 @@ void testKafkaProducerAndConsumerSpan(boolean testHeaders) throws Exception { .hasAttributesSatisfyingExactly( processAttributes("10", greeting, testHeaders, false)), span -> span.hasName("processing").hasParent(trace.getSpan(1)))); + + if (testLatestDeps) { + testing.waitAndAssertMetrics( + "io.opentelemetry.kafka-clients-0.11", + "kafka.producer.record_send_total", + AbstractIterableAssert::isNotEmpty); + } } @DisplayName("test pass through tombstone") @@ -155,6 +166,7 @@ void testPassThroughTombstone() processAttributes(null, null, false, false)))); } + @ParameterizedTest @DisplayName("test records(TopicPartition) kafka consume") @ValueSource(booleans = {true, false}) void testRecordsWithTopicPartitionKafkaConsume(boolean testListIterator) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml index edf9dc017838..5956d37e36e3 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/metadata.yaml @@ -1,14 +1,15 @@ -description: > - This instrumentation enables messaging spans and metrics for Apache Kafka 0.11 clients. - It automatically traces message production and consumption, propagates context, and emits metrics for production and consumption. +description: This instrumentation enables messaging spans for Kafka producers and consumers, and collects internal Kafka client metrics. +display_name: Apache Kafka Client library_link: https://kafka.apache.org/ +semantic_conventions: + - MESSAGING_SPANS configurations: - name: otel.instrumentation.kafka.producer-propagation.enabled - description: Enable context propagation for kafka message producers. + description: Enable context propagation for Kafka message producers. type: boolean default: true - name: otel.instrumentation.kafka.experimental-span-attributes - description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms" + description: Enables the capture of the experimental consumer attribute `kafka.record.queue_time_ms`. type: boolean default: false - name: otel.instrumentation.messaging.experimental.capture-headers diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java index 6c7a334a48ec..3d4c593d6d85 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafkaclients/common/v0_11/internal/KafkaClientBaseTest.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal; +import static io.opentelemetry.api.common.AttributeKey.longKey; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT; @@ -17,6 +18,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; @@ -71,6 +73,9 @@ public abstract class KafkaClientBaseTest { protected Consumer consumer; private final CountDownLatch consumerReady = new CountDownLatch(1); + static final boolean isExperimentalEnabled = + Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes"); + public static final int partition = 0; public static final TopicPartition topicPartition = new TopicPartition(SHARED_TOPIC, partition); @@ -230,8 +235,11 @@ protected static List processAttributes( satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty), satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative), satisfies( - AttributeKey.longKey("kafka.record.queue_time_ms"), - AbstractLongAssert::isNotNegative))); + longKey("kafka.record.queue_time_ms"), + val -> + val.satisfiesAnyOf( + v -> assertThat(v).isNotNegative(), + v -> assertThat(isExperimentalEnabled).isFalse())))); // consumer group is not available in version 0.11 if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test")); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts index 35e5e0a2b40a..1927b001db07 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts @@ -20,6 +20,7 @@ tasks { withType().configureEach { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) + systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false") } test { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java index 402fada6c6ca..d824454cc4ba 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java @@ -32,6 +32,7 @@ abstract class AbstractInterceptorsTest extends KafkaClientBaseTest { public Map producerProps() { Map props = super.producerProps(); props.putAll(kafkaTelemetry().producerInterceptorConfigProperties()); + props.putAll(kafkaTelemetry().metricConfigProperties()); return props; } @@ -39,6 +40,7 @@ public Map producerProps() { public Map consumerProps() { Map props = super.consumerProps(); props.putAll(kafkaTelemetry().consumerInterceptorConfigProperties()); + props.putAll(kafkaTelemetry().metricConfigProperties()); return props; } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index 3e42dce07622..d96b4d951295 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -25,6 +25,7 @@ import io.opentelemetry.sdk.trace.data.LinkData; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.AbstractIterableAssert; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractStringAssert; @@ -125,5 +126,10 @@ void assertTraces() { trace.hasSpansSatisfyingExactly( span -> span.hasName("producer callback").hasKind(SpanKind.INTERNAL).hasNoParent())); + + testing.waitAndAssertMetrics( + "io.opentelemetry.kafka-clients-2.6", + "kafka.producer.record_send_total", + AbstractIterableAssert::isNotEmpty); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml index 85d9ffa83e6c..8ebe47ff61bf 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml @@ -1,14 +1,5 @@ -description: > - This instrumentation provides a library integration that enables messaging spans and metrics for Apache Kafka 2.6+ clients. +description: This standalone instrumentation enables messaging spans for Kafka producers and consumers, and collects internal Kafka client metrics. +display_name: Apache Kafka Client library_link: https://kafka.apache.org/ -configurations: - - name: otel.instrumentation.messaging.experimental.capture-headers - description: A comma-separated list of header names to capture as span attributes. - type: list - default: '' - - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled - description: > - Enables experimental receive telemetry, which will cause consumers to start a new trace, with - only a span link connecting it to the producer trace. - type: boolean - default: false +semantic_conventions: + - MESSAGING_SPANS diff --git a/instrumentation/kafka/kafka-connect-2.6/metadata.yaml b/instrumentation/kafka/kafka-connect-2.6/metadata.yaml index 824ac80c665f..2bbbf70b023b 100644 --- a/instrumentation/kafka/kafka-connect-2.6/metadata.yaml +++ b/instrumentation/kafka/kafka-connect-2.6/metadata.yaml @@ -1,4 +1,5 @@ -description: "This instrumentation enables messaging spans for Kafka Connect sink tasks." +description: This instrumentation enables messaging spans for Kafka Connect sink tasks. +display_name: Apache Kafka Connect +library_link: https://kafka.apache.org/documentation/#connect semantic_conventions: - MESSAGING_SPANS -library_link: https://kafka.apache.org/documentation/#connect \ No newline at end of file diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/build.gradle.kts b/instrumentation/kafka/kafka-streams-0.11/javaagent/build.gradle.kts index b525e5f0055b..558ec36c20ae 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/build.gradle.kts +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/build.gradle.kts @@ -28,9 +28,7 @@ tasks { usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service) systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) - - // TODO run tests both with and without experimental span attributes - jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false") } val testReceiveSpansDisabled by registering(Test::class) { @@ -42,6 +40,19 @@ tasks { include("**/KafkaStreamsSuppressReceiveSpansTest.*") } + val testExperimental by registering(Test::class) { + testClassesDirs = sourceSets.test.get().output.classesDirs + classpath = sourceSets.test.get().runtimeClasspath + + filter { + excludeTestsMatching("KafkaStreamsSuppressReceiveSpansTest") + } + jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + + jvmArgs("-Dotel.instrumentation.kafka.experimental-span-attributes=true") + systemProperty("metadataConfig", "otel.instrumentation.kafka.experimental-span-attributes=true") + } + test { filter { excludeTestsMatching("KafkaStreamsSuppressReceiveSpansTest") diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java index 345761431682..db49bd484671 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsBaseTest.java @@ -60,6 +60,9 @@ abstract class KafkaStreamsBaseTest { static Consumer consumer; static CountDownLatch consumerReady = new CountDownLatch(1); + protected static final boolean isExperimental = + Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes"); + @BeforeAll static void setup() throws ExecutionException, InterruptedException, TimeoutException { kafka = diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java index 930b639a0e18..66c41ac20641 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsDefaultTest.java @@ -156,10 +156,14 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { k -> k.isInstanceOf(String.class)), equalTo(MESSAGING_KAFKA_MESSAGE_OFFSET, 0), equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"), - satisfies( - longKey("kafka.record.queue_time_ms"), - k -> k.isGreaterThanOrEqualTo(0)), equalTo(stringKey("asdf"), "testing"))); + + if (isExperimental) { + assertions.add( + satisfies( + longKey("kafka.record.queue_time_ms"), k -> k.isGreaterThanOrEqualTo(0))); + } + if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test-application")); } @@ -224,10 +228,14 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { k -> k.isInstanceOf(String.class)), equalTo(MESSAGING_KAFKA_MESSAGE_OFFSET, 0), equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"), - satisfies( - longKey("kafka.record.queue_time_ms"), - k -> k.isGreaterThanOrEqualTo(0)), equalTo(longKey("testing"), 123))); + if (isExperimental) { + assertions.add( + satisfies( + longKey("kafka.record.queue_time_ms"), + k -> k.isGreaterThanOrEqualTo(0))); + } + if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test")); } diff --git a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java index 1d66b2a5abd4..0c9f4cb2faaa 100644 --- a/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java +++ b/instrumentation/kafka/kafka-streams-0.11/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSuppressReceiveSpansTest.java @@ -124,10 +124,13 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { k -> k.isInstanceOf(String.class)), equalTo(MESSAGING_KAFKA_MESSAGE_OFFSET, 0), equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"), - satisfies( - longKey("kafka.record.queue_time_ms"), - k -> k.isGreaterThanOrEqualTo(0)), equalTo(stringKey("asdf"), "testing"))); + if (isExperimental) { + assertions.add( + satisfies( + longKey("kafka.record.queue_time_ms"), + k -> k.isGreaterThanOrEqualTo(0))); + } if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test-application")); } @@ -170,10 +173,13 @@ void testKafkaProduceAndConsumeWithStreamsInBetween() throws Exception { k -> k.isInstanceOf(String.class)), equalTo(MESSAGING_KAFKA_MESSAGE_OFFSET, 0), equalTo(MESSAGING_KAFKA_MESSAGE_KEY, "10"), - satisfies( - longKey("kafka.record.queue_time_ms"), - k -> k.isGreaterThanOrEqualTo(0)), equalTo(longKey("testing"), 123))); + if (isExperimental) { + assertions.add( + satisfies( + longKey("kafka.record.queue_time_ms"), + k -> k.isGreaterThanOrEqualTo(0))); + } if (Boolean.getBoolean("testLatestDeps")) { assertions.add(equalTo(MESSAGING_KAFKA_CONSUMER_GROUP, "test")); } diff --git a/instrumentation/kafka/kafka-streams-0.11/metadata.yaml b/instrumentation/kafka/kafka-streams-0.11/metadata.yaml index 7e93d3199ffe..c40c01df5949 100644 --- a/instrumentation/kafka/kafka-streams-0.11/metadata.yaml +++ b/instrumentation/kafka/kafka-streams-0.11/metadata.yaml @@ -1 +1,20 @@ +description: This instrumentation enables messaging spans for Kafka Streams processing. +display_name: Apache Kafka Streams library_link: https://kafka.apache.org/documentation/streams/ +semantic_conventions: + - MESSAGING_SPANS +configurations: + - name: otel.instrumentation.kafka.experimental-span-attributes + description: Enables the capture of the experimental consumer attribute `kafka.record.queue_time_ms`. + type: boolean + default: false + - name: otel.instrumentation.messaging.experimental.capture-headers + description: A comma-separated list of header names to capture as span attributes. + type: list + default: '' + - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled + description: > + Enables experimental receive telemetry, which will cause consumers to start a new trace, with + only a span link connecting it to the producer trace. + type: boolean + default: false