
Conversation

vasantteja
Contributor

@vasantteja vasantteja commented Aug 20, 2025

Resolves #12322 and #12261 and adds instrumentation for Kafka Connect's sink connector.

@vasantteja vasantteja requested a review from a team as a code owner August 20, 2025 21:49
Contributor

@laurit laurit left a comment


Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allow the trace to propagate to where the message is read after the sink sends it on? I don't know anything about Kafka Connect, so sorry if this didn't make sense.

@vasantteja
Contributor Author

vasantteja commented Aug 28, 2025

Do I understand correctly that this instrumentation only creates a span that is linked to the producer when the message is consumed by the sink? Or does it do something else, like allow the trace to propagate to where the message is read after the sink sends it on? I don't know anything about Kafka Connect, so sorry if this didn't make sense.

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

MongoDB Kafka Connector: Uses collection.bulkWrite(). We don't have instrumentation for the bulkWrite() function (MongoDB instrumentation only covers wire protocol commands like insert, update, delete), hence we will not see parent-child relationships between the Kafka Connect span and the resulting MongoDB spans.

Cosmos DB Connector: Uses Cosmos DB SDK calls. Since Cosmos DB doesn't have OpenTelemetry instrumentation, the trace stops at the Kafka Connect span, but span links to producers are preserved.

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.
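The linking-plus-active-context behavior described above can be sketched with the OpenTelemetry API. This is illustrative only, not the PR's actual code: the method names and the way the producer's SpanContext is obtained are assumptions, and it requires opentelemetry-api and opentelemetry-context on the classpath.

```java
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

class SinkTaskTracingSketch {
  private static final Tracer tracer =
      GlobalOpenTelemetry.getTracer("io.opentelemetry.kafka-connect-2.6");

  // Hypothetical wrapper around the connector's put(): starts a span linked to
  // the producer's span context (extracted from the record headers upstream)
  // and makes it current so instrumented downstream clients (e.g. JDBC)
  // pick it up as their parent.
  static void tracedPut(SpanContext producerContext, Runnable put) {
    SpanBuilder builder = tracer.spanBuilder("KafkaConnect.put");
    if (producerContext.isValid()) {
      builder.addLink(producerContext); // link to the producer, not a parent edge
    }
    Span span = builder.startSpan();
    try (Scope ignored = span.makeCurrent()) {
      put.run(); // connector work runs inside the active context
    } finally {
      span.end();
    }
  }
}
```

Whether any child spans appear under `KafkaConnect.put` then depends, as the summary says, on whether the client library the connector calls is itself instrumented.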

Let me know if this answers your question!!

@laurit
Contributor

laurit commented Sep 2, 2025

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Generally we don't expect databases to propagate context. Even when the context is propagated to the database, the intention is to tag the database query. For example, context propagation to a SQL database could be used to get the trace ID for a slow query, so you could track down what executed that query.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

You are misreading this. https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jdbc/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jdbc/PreparedStatementInstrumentation.java#L58 excludes these methods because they are instrumented by

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.

I'm wondering why you chose to create an internal span, wouldn't a consumer span be more appropriate?

@vasantteja
Contributor Author

@laurit Great question!! Yes, this instrumentation creates a span that is linked to the producer when the message is consumed by the sink. However, it depends on the specific connector implementation when it comes to automatic trace propagation to downstream databases. If we have instrumentation for the database operation that the connector uses, automatic trace propagation happens; if not, the trace is not propagated beyond the Kafka Connect span.

Generally we don't expect databases to propagate context. Even when the context is propagated to the database, the intention is to tag the database query. For example, context propagation to a SQL database could be used to get the trace ID for a slow query, so you could track down what executed that query.

Examples:

JDBC Kafka Connector: Uses PreparedStatement.executeBatch(). Since PreparedStatement.executeBatch() is explicitly excluded from JDBC instrumentation, we don't see a parent-child relationship between the Kafka Connect span and the database operation.

You are misreading this. https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/jdbc/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/jdbc/PreparedStatementInstrumentation.java#L58 excludes these methods because they are instrumented by

Summary: This instrumentation provides span linking (connecting to producer spans) and sets up the foundation for trace propagation by making the Kafka Connect span the active context. Whether downstream spans are created depends entirely on whether the specific database operations used by each connector are instrumented by OpenTelemetry.

I'm wondering why you chose to create an internal span, wouldn't a consumer span be more appropriate?

@laurit Thanks for the reply!! Appreciate the help and thanks for correcting me.

There are three things I want to break down here:

  1. Thanks for the context on the database part; it makes sense now. So I believe if we link the producer trace with the Kafka Connect span, we are good.
  2. I'm sorry, I was mistaken about the JDBC instrumentation. In fact, I wrote a test to confirm whether it's being instrumented, and it is. Here are the logs from the container showing the complete trace flow:
[otel.javaagent 2025-09-02 22:14:56:041 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce 36869b3ce35d42f3 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:041 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=36869b3ce35d42f3, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296025217709, endEpochNanos=1756851296040659750, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? 
WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[2025-09-02 22:14:56,041] INFO Using PostgreSql dialect TABLE "person" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[otel.javaagent 2025-09-02 22:14:56:049 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce 6d767da1a2b935f7 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME, result.COLUMN_NAME, result.KEY_SEQ, result.PK_NAME FROM (SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, ct.relname AS TABLE_NAME, a.attname AS COLUMN_NAME, (information_schema._pg_expandarray(i.indkey)).n AS KEY_SEQ, ci.relname AS PK_NAME, information_schema._pg_expandarray(i.indkey) AS KEYS, a.attnum AS A_ATTNUM FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) JOIN pg_catalog.pg_index i ON ( a.attrelid = i.indrelid) JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE true AND ct.relname = ? AND i.indisprimary ) result where result.A_ATTNUM = (result.KEYS).x ORDER BY result.table_name, result.pk_name, result.key_seq}, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:050 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=6d767da1a2b935f7, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296042146375, endEpochNanos=1756851296049224084, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT result.TABLE_CAT, result.TABLE_SCHEM, result.TABLE_NAME, result.COLUMN_NAME, result.KEY_SEQ, result.PK_NAME FROM (SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, ct.relname AS TABLE_NAME, a.attname AS COLUMN_NAME, (information_schema._pg_expandarray(i.indkey)).n AS KEY_SEQ, ci.relname AS PK_NAME, information_schema._pg_expandarray(i.indkey) AS KEYS, a.attnum AS A_ATTNUM FROM pg_catalog.pg_class ct JOIN pg_catalog.pg_attribute a ON (ct.oid = a.attrelid) JOIN pg_catalog.pg_namespace n ON (ct.relnamespace = n.oid) JOIN pg_catalog.pg_index i ON ( a.attrelid = 
i.indrelid) JOIN pg_catalog.pg_class ci ON (ci.oid = i.indexrelid) WHERE true AND ct.relname = ? AND i.indisprimary ) result where result.A_ATTNUM = (result.KEYS).x ORDER BY result.table_name, result.pk_name, result.key_seq}, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[otel.javaagent 2025-09-02 22:14:56:074 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce bc140d85a1f10b67 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT * FROM (SELECT n.nspname,c.relname,a.attname,a.atttypid,a.attnotnull OR (t.typtype = ? AND t.typnotnull) AS attnotnull,a.atttypmod,a.attlen,t.typtypmod,row_number() OVER (PARTITION BY a.attrelid ORDER BY a.attnum) AS attnum, nullif(a.attidentity, ?) as attidentity,null as attgenerated,pg_catalog.pg_get_expr(def.adbin, def.adrelid) AS adsrc,dsc.description,t.typbasetype,t.typtype FROM pg_catalog.pg_namespace n JOIN pg_catalog.pg_class c ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (a.attrelid=c.oid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef def ON (a.attrelid=def.adrelid AND a.attnum = def.adnum) LEFT JOIN pg_catalog.pg_description dsc ON (c.oid=dsc.objoid AND a.attnum = dsc.objsubid) LEFT JOIN pg_catalog.pg_class dc ON (dc.oid=dsc.classoid AND dc.relname=?) LEFT JOIN pg_catalog.pg_namespace dn ON (dc.relnamespace=dn.oid AND dn.nspname=?) WHERE c.relkind in (?) and a.attnum > ? AND NOT a.attisdropped AND c.relname LIKE ?) c WHERE true ORDER BY nspname,c.relname,attnum }, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:074 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=bc140d85a1f10b67, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296053238250, endEpochNanos=1756851296073618584, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT * FROM (SELECT n.nspname,c.relname,a.attname,a.atttypid,a.attnotnull OR (t.typtype = ? AND t.typnotnull) AS attnotnull,a.atttypmod,a.attlen,t.typtypmod,row_number() OVER (PARTITION BY a.attrelid ORDER BY a.attnum) AS attnum, nullif(a.attidentity, ?) 
as attidentity,null as attgenerated,pg_catalog.pg_get_expr(def.adbin, def.adrelid) AS adsrc,dsc.description,t.typbasetype,t.typtype FROM pg_catalog.pg_namespace n JOIN pg_catalog.pg_class c ON (c.relnamespace = n.oid) JOIN pg_catalog.pg_attribute a ON (a.attrelid=c.oid) JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid) LEFT JOIN pg_catalog.pg_attrdef def ON (a.attrelid=def.adrelid AND a.attnum = def.adnum) LEFT JOIN pg_catalog.pg_description dsc ON (c.oid=dsc.objoid AND a.attnum = dsc.objsubid) LEFT JOIN pg_catalog.pg_class dc ON (dc.oid=dsc.classoid AND dc.relname=?) LEFT JOIN pg_catalog.pg_namespace dn ON (dc.relnamespace=dn.oid AND dn.nspname=?) WHERE c.relkind in (?) and a.attnum > ? AND NOT a.attisdropped AND c.relname LIKE ?) c WHERE true ORDER BY nspname,c.relname,attnum }, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[2025-09-02 22:14:56,082] INFO Checking PostgreSql dialect for type of TABLE "person" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[otel.javaagent 2025-09-02 22:14:56:086 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'SELECT test' : faa1a2f506b412fc46b6daf1c8bb9bce 501aa90eb31583b9 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}
[otel.javaagent 2025-09-02 22:14:56:087 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=501aa90eb31583b9, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=SELECT test, kind=CLIENT, startEpochNanos=1756851296082378959, endEpochNanos=1756851296086323542, attributes=AttributesMap{data={thread.id=129, db.operation=SELECT, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=SELECT NULL AS TABLE_CAT, n.nspname AS TABLE_SCHEM, c.relname AS TABLE_NAME, CASE n.nspname ~ ? OR n.nspname = ? WHEN true THEN CASE WHEN n.nspname = ? OR n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END WHEN n.nspname = ? THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END ELSE CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? ELSE NULL END END WHEN false THEN CASE c.relkind WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? then ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? WHEN ? THEN ? 
WHEN ? THEN ? ELSE NULL END ELSE NULL END AS TABLE_TYPE, d.description AS REMARKS, ? as TYPE_CAT, ? as TYPE_SCHEM, ? as TYPE_NAME, ? AS SELF_REFERENCING_COL_NAME, ? AS REF_GENERATION FROM pg_catalog.pg_namespace n, pg_catalog.pg_class c LEFT JOIN pg_catalog.pg_description d ON (c.oid = d.objoid AND d.objsubid = ? and d.classoid = ?::regclass) WHERE c.relnamespace = n.oid AND c.relname LIKE ? AND (false OR ( c.relkind = ? AND n.nspname !~ ? AND n.nspname <> ? ) ) ORDER BY TABLE_TYPE,TABLE_SCHEM,TABLE_NAME }, capacity=128, totalAddedValues=10}, totalAttributeCount=10, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[2025-09-02 22:14:56,090] INFO Setting metadata for table "person" to Table{name='"person"', type=TABLE columns=[Column{'id', isPrimaryKey=true, allowsNull=false, sqlType=int4}, Column{'name', isPrimaryKey=false, allowsNull=false, sqlType=varchar}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[otel.javaagent 2025-09-02 22:14:56:163 +0000] [task-thread-test-connector-0] DEBUG io.opentelemetry.javaagent.tooling.AgentInstaller$TransformLoggingListener - Transformed org.postgresql.jdbc.PgPreparedStatement -- PluginClassLoader{pluginLocation=file:/usr/share/java/confluentinc-kafka-connect-jdbc/}
[otel.javaagent 2025-09-02 22:14:56:196 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'INSERT test.person' : faa1a2f506b412fc46b6daf1c8bb9bce 999244f75d4a7d29 CLIENT [tracer: io.opentelemetry.jdbc:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, db.operation=INSERT, db.sql.table=person, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=INSERT INTO "person" ("id","name") VALUES (?,?)}, capacity=128, totalAddedValues=11}
[otel.javaagent 2025-09-02 22:14:56:196 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=999244f75d4a7d29, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.jdbc, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=INSERT test.person, kind=CLIENT, startEpochNanos=1756851296189256750, endEpochNanos=1756851296196073625, attributes=AttributesMap{data={thread.id=129, db.operation=INSERT, db.sql.table=person, db.name=test, server.port=5432, thread.name=task-thread-test-connector-0, db.user=postgres, db.connection_string=postgresql://postgres:5432, server.address=postgres, db.system=postgresql, db.statement=INSERT INTO "person" ("id","name") VALUES (?,?)}, capacity=128, totalAddedValues=11}, totalAttributeCount=11, events=[], totalRecordedEvents=0, links=[], totalRecordedLinks=0, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}
[otel.javaagent 2025-09-02 22:14:56:201 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.exporter.logging.LoggingSpanExporter - 'KafkaConnect.put' : faa1a2f506b412fc46b6daf1c8bb9bce ebe3a0102b0469cf INTERNAL [tracer: io.opentelemetry.kafka-connect-2.6:2.20.0-alpha-SNAPSHOT] AttributesMap{data={thread.id=129, thread.name=task-thread-test-connector-0}, capacity=128, totalAddedValues=2}
[otel.javaagent 2025-09-02 22:14:56:202 +0000] [task-thread-test-connector-0] INFO io.opentelemetry.javaagent.testing.exporter.OtlpInMemorySpanExporter - Exporting span SpanData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=ebe3a0102b0469cf, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, parentSpanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=e37a2c4da90a8684, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=true, valid=true}, resource=Resource{schemaUrl=null, attributes={service.instance.id="5b2f150f-21b3-452e-b15d-5af413ba2da8", service.name="unknown_service:java", telemetry.distro.name="opentelemetry-java-instrumentation", telemetry.distro.version="2.20.0-SNAPSHOT", telemetry.sdk.language="java", telemetry.sdk.name="opentelemetry", telemetry.sdk.version="1.53.0"}}, instrumentationScopeInfo=InstrumentationScopeInfo{name=io.opentelemetry.kafka-connect-2.6, version=2.20.0-alpha-SNAPSHOT, schemaUrl=null, attributes={}}, name=KafkaConnect.put, kind=INTERNAL, startEpochNanos=1756851295420017000, endEpochNanos=1756851296201312542, attributes=AttributesMap{data={thread.id=129, thread.name=task-thread-test-connector-0}, capacity=128, totalAddedValues=2}, totalAttributeCount=2, events=[], totalRecordedEvents=0, links=[ImmutableLinkData{spanContext=ImmutableSpanContext{traceId=faa1a2f506b412fc46b6daf1c8bb9bce, spanId=e37a2c4da90a8684, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=true, valid=true}, attributes={}, totalAttributeCount=0}], totalRecordedLinks=1, status=ImmutableStatusData{statusCode=UNSET, description=}, hasEnded=true}

I see that we have complete trace propagation:
Producer span: e37a2c4da90a8684 (remote=true, the original message)
Kafka Connect span: ebe3a0102b0469cf (parent: e37a2c4da90a8684, with a span link)
JDBC spans: all have parent ebe3a0102b0469cf, including the crucial INSERT test.person operation

  3. Regarding the span kind: I felt Kafka Connect acts more like an internal processing/transformation layer than a direct consumer client. It receives messages, processes them (potentially transforming data), and forwards them to sinks. This seems more aligned with internal processing than direct message consumption. To be honest, I started developing this as a CONSUMER and shifted it to INTERNAL midway.

Should I change this to CONSUMER as I am conflicted? I'm open to feedback on this decision.

@laurit
Contributor

laurit commented Sep 3, 2025

3. Regarding the span kind: I felt Kafka Connect acts more like an internal processing/transformation layer rather than a direct consumer client. It receives messages, processes them (potentially transforming data), and forwards them to sinks. This seems more aligned with internal processing than direct message consumption. Tbh I started developing this as a CONSUMER and shifted it to INTERNAL midway.

Should I change this to CONSUMER as I am conflicted? I'm open to feedback on this decision.

Doesn't it read from the topic like a regular consumer? If you believe that an INTERNAL span is the correct choice, then keep in mind that the convention in this project is not to emit telemetry that doesn't have semantic conventions in the default configuration. This means that instrumentations that only emit internal spans usually need to be enabled with a flag, e.g.

@vasantteja
Contributor Author

vasantteja commented Sep 3, 2025

Doesn't it read from the topic like a regular consumer? If you believe that an INTERNAL span is the correct choice, then keep in mind that the convention in this project is not to emit telemetry that doesn't have semantic conventions in the default configuration. This means that instrumentations that only emit internal spans usually need to be enabled with a flag, e.g.

@laurit Yep agree with you on this. Kafka Connect does read from topics like a regular consumer.

Looking at it from that perspective, CONSUMER makes more sense since:

  • Kafka Connect fundamentally consumes messages from Kafka topics
  • The processing/transformation aspect doesn't change the fact that it's consuming from messaging infrastructure
  • It aligns better with the semantic conventions for messaging systems

Regarding the convention about INTERNAL spans needing to be behind a flag - that's a great point I wasn't aware of. Since there are established semantic conventions for messaging consumers, CONSUMER would be the more appropriate choice here.

Edit: I updated the implementation to use CONSUMER span kind. Thank you for the guidance! Let me know your thoughts.
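For reference, the change discussed above amounts to one extra call on the span builder. This is a sketch assuming the OpenTelemetry API (requires opentelemetry-api on the classpath); the actual builder code in the PR may be structured differently.

```java
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;

class SpanKindSketch {
  // Hypothetical helper: the sink span is now built as CONSUMER.
  static Span startSinkSpan(Tracer tracer) {
    return tracer.spanBuilder("KafkaConnect.put")
        .setSpanKind(SpanKind.CONSUMER) // previously SpanKind.INTERNAL
        .startSpan();
  }
}
```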

@otelbot-java-instrumentation
Contributor

🔧 The result from spotlessApply was committed to the PR branch.

@otelbot-java-instrumentation
Contributor

🔧 The result from spotlessApply was committed to the PR branch.

@vasantteja
Contributor Author

vasantteja commented Oct 6, 2025

@laurit Can you please take another look? Kafka Connect's WorkerSinkTask pulls records from multiple topics rather than a single topic. I return the topic name if all the records are from a single topic; otherwise I return null. Here is the SpanData:

{
  "traceId": "835d2dc1437d7f420a28adfab89347bb",
  "spanId": "d9db28e45ce804f7",
  "startEpochNanos": 1759528513964221000,
  "endEpochNanos": 1759528514783972376,
  "kind": "CONSUMER",
  "name": "unknown process",
  "attributes": {
    "messaging.system": "kafka",
    "messaging.batch.message_count": 3,
    "thread.name": "task-thread-test-postgres-connector-multi-0",
    "thread.id": 134,
    "messaging.operation": "process"
  },
  "parentSpanId": "0000000000000000"
}

Is `unknown process` acceptable as the span name here?

Please let me know if you need additional changes.
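The single-topic check described above can be sketched in plain Java. The class and method names here are hypothetical (not the PR's actual helpers); the naming method assumes the messaging convention of `<destination> <operation>` span names, which degrades to `unknown process` when no single destination exists.

```java
import java.util.List;

class SinkBatchNaming {
  // Returns the topic shared by all records in the batch, or null when the
  // batch spans multiple topics (mirrors the behavior described above).
  static String commonTopic(List<String> recordTopics) {
    String first = null;
    for (String topic : recordTopics) {
      if (first == null) {
        first = topic;
      } else if (!first.equals(topic)) {
        return null; // mixed-topic batch: no single destination
      }
    }
    return first;
  }

  // Span name per the messaging convention "<destination> <operation>";
  // a null destination yields the generic "unknown process".
  static String spanName(String topic) {
    return (topic == null ? "unknown" : topic) + " process";
  }
}
```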

@vasantteja
Contributor Author

vasantteja commented Oct 8, 2025

@laurit When you get a chance, could you take another look? Just let me know if there's anything else you'd like me to adjust. Thanks in advance.

@otelbot-java-instrumentation
Contributor

🔧 The result from spotlessApply was committed to the PR branch.

@otelbot-java-instrumentation
Contributor

🔧 The result from spotlessApply was committed to the PR branch.

@laurit laurit added this to the v2.21.0 milestone Oct 15, 2025
@trask trask merged commit 9dce150 into open-telemetry:main Oct 16, 2025
81 checks passed
Contributor

otelbot bot commented Oct 16, 2025

Thank you for your contribution @vasantteja! 🎉 We would like to hear from you about your experience contributing to OpenTelemetry by taking a few minutes to fill out this survey.



Development

Successfully merging this pull request may close these issues.

Multiple Trace-ids are generated for Kafka sink connector
