Skip to content

Commit b5f105d

Browse files
committed
Update kafka-connect test for PostgreSQL db.namespace format
Gate database|schema namespace behind stable semconv flag. The PostgreSQL JDBC URL parser now only sets the database|schema namespace format when stable database semconv is enabled. Under old semconv (default), namespace falls back to just the database name. Added testStableSemconv task for kafka-connect-2.6 testing module. The base test class forwards the semconv-stability.opt-in system property to the container as OTEL_SEMCONV_STABILITY_OPT_IN env var.
1 parent 4a1299c commit b5f105d

File tree

5 files changed

+70
-22
lines changed

5 files changed

+70
-22
lines changed

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractor.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ private GenericDbClientSpanNameExtractor(DbClientAttributesGetter<REQUEST, ?> ge
152152
this.getter = getter;
153153
}
154154

155+
@SuppressWarnings("deprecation") // getDbName is used for old semconv span names
155156
@Override
156157
public String extract(REQUEST request) {
157158
if (emitStableDatabaseSemconv()) {
@@ -162,9 +163,9 @@ public String extract(REQUEST request) {
162163
String operationName = getter.getDbOperationName(request);
163164
return computeSpanNameStable(getter, request, operationName, null, null);
164165
}
165-
String namespace = getter.getDbNamespace(request);
166+
String dbName = getter.getDbName(request);
166167
String operationName = getter.getDbOperationName(request);
167-
return computeSpanName(namespace, operationName, null, null);
168+
return computeSpanName(dbName, operationName, null, null);
168169
}
169170
}
170171

@@ -177,9 +178,9 @@ private SqlClientSpanNameExtractor(SqlClientAttributesGetter<REQUEST, ?> getter)
177178
this.getter = getter;
178179
}
179180

181+
@SuppressWarnings("deprecation") // getDbName is used for old semconv span names
180182
@Override
181183
public String extract(REQUEST request) {
182-
String namespace = getter.getDbNamespace(request);
183184
Collection<String> rawQueryTexts = getter.getRawQueryTexts(request);
184185

185186
if (rawQueryTexts.isEmpty()) {
@@ -191,18 +192,20 @@ public String extract(REQUEST request) {
191192
String operationName = getter.getDbOperationName(request);
192193
return computeSpanNameStable(getter, request, operationName, null, null);
193194
}
195+
String dbName = getter.getDbName(request);
194196
String operationName = getter.getDbOperationName(request);
195-
return computeSpanName(namespace, operationName, null, null);
197+
return computeSpanName(dbName, operationName, null, null);
196198
}
197199

198200
if (!emitStableDatabaseSemconv()) {
201+
String dbName = getter.getDbName(request);
199202
if (rawQueryTexts.size() > 1) { // for backcompat(?)
200-
return computeSpanName(namespace, null, null, null);
203+
return computeSpanName(dbName, null, null, null);
201204
}
202205
SqlQuery sanitizedQuery = SqlQuerySanitizerUtil.sanitize(rawQueryTexts.iterator().next());
203206

204207
return computeSpanName(
205-
namespace,
208+
dbName,
206209
sanitizedQuery.getOperationName(),
207210
sanitizedQuery.getCollectionName(),
208211
sanitizedQuery.getStoredProcedureName());
@@ -252,14 +255,15 @@ private MigratingSqlClientSpanNameExtractor(SqlClientAttributesGetter<REQUEST, ?
252255
this.sqlDelegate = new SqlClientSpanNameExtractor<>(getter);
253256
}
254257

258+
@SuppressWarnings("deprecation") // getDbName is used for old semconv span names
255259
@Override
256260
public String extract(REQUEST request) {
257261
if (emitStableDatabaseSemconv()) {
258262
return sqlDelegate.extract(request);
259263
}
260-
// For old semconv, use the generic span name format (operation + namespace)
264+
// For old semconv, use the generic span name format (operation + db.name)
261265
// without collection name to preserve backward compatibility
262-
String namespace = getter.getDbNamespace(request);
266+
String dbName = getter.getDbName(request);
263267
Collection<String> rawQueryTexts = getter.getRawQueryTexts(request);
264268
String operationName = null;
265269
if (rawQueryTexts.size() == 1) {
@@ -269,7 +273,7 @@ public String extract(REQUEST request) {
269273
if (operationName == null) {
270274
operationName = getter.getDbOperationName(request);
271275
}
272-
return computeSpanName(namespace, operationName, null, null);
276+
return computeSpanName(dbName, operationName, null, null);
273277
}
274278
}
275279
}

instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/db/DbClientSpanNameExtractorTest.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.mockito.Mock;
2020
import org.mockito.junit.jupiter.MockitoExtension;
2121

22+
@SuppressWarnings("deprecation") // getDbName is used for old semconv span names
2223
@ExtendWith(MockitoExtension.class)
2324
class DbClientSpanNameExtractorTest {
2425
@Mock DbClientAttributesGetter<DbRequest, Void> dbAttributesGetter;
@@ -32,7 +33,8 @@ void shouldExtractFullSpanName() {
3233

3334
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
3435
.thenReturn(singleton("SELECT * from table"));
35-
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
36+
lenient().when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
37+
lenient().when(sqlAttributesGetter.getDbName(dbRequest)).thenReturn("database");
3638

3739
SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);
3840

@@ -51,7 +53,8 @@ void shouldSkipNamespaceIfTableAlreadyHasNamespacePrefix() {
5153

5254
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
5355
.thenReturn(singleton("SELECT * from another.table"));
54-
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
56+
lenient().when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
57+
lenient().when(sqlAttributesGetter.getDbName(dbRequest)).thenReturn("database");
5558

5659
SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);
5760

@@ -85,7 +88,8 @@ void shouldExtractOperationAndName() {
8588
DbRequest dbRequest = new DbRequest();
8689

8790
when(dbAttributesGetter.getDbOperationName(dbRequest)).thenReturn("SELECT");
88-
when(dbAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
91+
lenient().when(dbAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
92+
lenient().when(dbAttributesGetter.getDbName(dbRequest)).thenReturn("database");
8993

9094
SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(dbAttributesGetter);
9195

@@ -117,7 +121,8 @@ void shouldExtractNamespace() {
117121
// given
118122
DbRequest dbRequest = new DbRequest();
119123

120-
when(dbAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
124+
lenient().when(dbAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
125+
lenient().when(dbAttributesGetter.getDbName(dbRequest)).thenReturn("database");
121126

122127
SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(dbAttributesGetter);
123128

@@ -152,6 +157,7 @@ void shouldUseQuerySummaryWhenAvailable() {
152157
// Needs to be lenient because not called during this test under new semconv mode
153158
lenient().when(dbAttributesGetter.getDbOperationName(dbRequest)).thenReturn("SELECT");
154159
lenient().when(dbAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
160+
lenient().when(dbAttributesGetter.getDbName(dbRequest)).thenReturn("database");
155161

156162
SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(dbAttributesGetter);
157163

@@ -170,7 +176,8 @@ void shouldExtractFullSpanNameForBatch() {
170176

171177
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
172178
.thenReturn(Arrays.asList("INSERT INTO table VALUES(1)", "INSERT INTO table VALUES(2)"));
173-
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
179+
lenient().when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
180+
lenient().when(sqlAttributesGetter.getDbName(dbRequest)).thenReturn("database");
174181

175182
SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);
176183

@@ -188,7 +195,8 @@ void shouldExtractFullSpanNameForSingleQueryBatch() {
188195

189196
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
190197
.thenReturn(singleton("INSERT INTO table VALUES(?)"));
191-
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
198+
lenient().when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
199+
lenient().when(sqlAttributesGetter.getDbName(dbRequest)).thenReturn("database");
192200
if (emitStableDatabaseSemconv()) {
193201
when(sqlAttributesGetter.getDbOperationBatchSize(dbRequest)).thenReturn(2L);
194202
}
@@ -210,7 +218,8 @@ void shouldFallBackToExplicitOperationNameForEmptySqlQuery() {
210218

211219
when(sqlAttributesGetter.getRawQueryTexts(dbRequest)).thenReturn(emptyList());
212220
when(sqlAttributesGetter.getDbOperationName(dbRequest)).thenReturn("WRITE");
213-
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("mydb");
221+
lenient().when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("mydb");
222+
lenient().when(sqlAttributesGetter.getDbName(dbRequest)).thenReturn("mydb");
214223

215224
SpanNameExtractor<DbRequest> underTest = DbClientSpanNameExtractor.create(sqlAttributesGetter);
216225

@@ -230,6 +239,7 @@ void shouldPreserveOldSemconvSpanNameForMigration() {
230239
when(sqlAttributesGetter.getRawQueryTexts(dbRequest))
231240
.thenReturn(singleton("SELECT * from table"));
232241
lenient().when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("database");
242+
lenient().when(sqlAttributesGetter.getDbName(dbRequest)).thenReturn("database");
233243

234244
SpanNameExtractor<DbRequest> underTest =
235245
DbClientSpanNameExtractor.createForMigration(sqlAttributesGetter);
@@ -250,7 +260,8 @@ void shouldFallBackToExplicitOperationForEmptySqlQueryInMigration() {
250260

251261
when(sqlAttributesGetter.getRawQueryTexts(dbRequest)).thenReturn(emptyList());
252262
when(sqlAttributesGetter.getDbOperationName(dbRequest)).thenReturn("WRITE");
253-
when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("mydb");
263+
lenient().when(sqlAttributesGetter.getDbNamespace(dbRequest)).thenReturn("mydb");
264+
lenient().when(sqlAttributesGetter.getDbName(dbRequest)).thenReturn("mydb");
254265

255266
SpanNameExtractor<DbRequest> underTest =
256267
DbClientSpanNameExtractor.createForMigration(sqlAttributesGetter);

instrumentation/kafka/kafka-connect-2.6/testing/build.gradle.kts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,15 @@ tasks.withType<Test>().configureEach {
3636
systemProperty("io.opentelemetry.smoketest.agent.shadowJar.path", agentShadowJar.get().archiveFile.get().toString())
3737
systemProperty("collectMetadata", findProperty("collectMetadata")?.toString() ?: "false")
3838
}
39+
40+
tasks {
41+
val testStableSemconv by registering(Test::class) {
42+
testClassesDirs = sourceSets.test.get().output.classesDirs
43+
classpath = sourceSets.test.get().runtimeClasspath
44+
jvmArgs("-Dotel.semconv-stability.opt-in=database")
45+
}
46+
47+
check {
48+
dependsOn(testStableSemconv)
49+
}
50+
}

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/KafkaConnectSinkTaskBaseTest.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,15 @@ private void setupKafkaConnect() {
274274
.withEnv("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc")
275275
.withEnv("OTEL_BSP_MAX_EXPORT_BATCH_SIZE", "1")
276276
.withEnv("OTEL_BSP_SCHEDULE_DELAY", "10ms")
277-
.withEnv("OTEL_METRIC_EXPORT_INTERVAL", "1000")
277+
.withEnv("OTEL_METRIC_EXPORT_INTERVAL", "1000");
278+
279+
// Pass semconv stability opt-in to the agent inside the container
280+
String semconvOptIn = System.getProperty("otel.semconv-stability.opt-in");
281+
if (semconvOptIn != null) {
282+
kafkaConnect.withEnv("OTEL_SEMCONV_STABILITY_OPT_IN", semconvOptIn);
283+
}
284+
285+
kafkaConnect
278286
.withEnv("CONNECT_BOOTSTRAP_SERVERS", getInternalKafkaBoostrapServers())
279287
.withEnv("CONNECT_REST_ADVERTISED_HOST_NAME", KAFKA_CONNECT_NETWORK_ALIAS)
280288
.withEnv("CONNECT_PLUGIN_PATH", PLUGIN_PATH_CONTAINER)

instrumentation/kafka/kafka-connect-2.6/testing/src/test/java/io/opentelemetry/instrumentation/kafkaconnect/v2_6/PostgresKafkaConnectSinkTaskTest.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ class PostgresKafkaConnectSinkTaskTest extends KafkaConnectSinkTaskBaseTest {
6767
private static final String DB_USERNAME = "postgres";
6868
private static final String DB_PASSWORD = "password";
6969
private static final String DB_TABLE_PERSON = "person";
70+
// Old semconv span names use db.name (just the database), stable semconv uses
71+
// db.namespace (database|schema) per PostgreSQL semantic conventions
72+
private static final String DB_SPAN_NAMESPACE =
73+
stableDatabaseSemconv() ? DATABASE_NAME + "|" + DB_USERNAME : DATABASE_NAME;
7074
private static final String CONNECTOR_NAME = "test-postgres-connector";
7175
private static final String TOPIC_NAME = "test-postgres-topic";
7276

@@ -163,7 +167,9 @@ void testSingleMessage() throws Exception {
163167
// kafka connect consumer trace, linked to producer span via a span link
164168
Consumer<SpanDataAssert> selectAssertion =
165169
span ->
166-
span.hasName("SELECT test").hasKind(SpanKind.CLIENT).hasParent(trace.getSpan(0));
170+
span.hasName("SELECT " + DB_SPAN_NAMESPACE)
171+
.hasKind(SpanKind.CLIENT)
172+
.hasParent(trace.getSpan(0));
167173

168174
trace.hasSpansSatisfyingExactly(
169175
span ->
@@ -184,7 +190,7 @@ void testSingleMessage() throws Exception {
184190
selectAssertion,
185191
selectAssertion,
186192
span ->
187-
span.hasName("INSERT test." + DB_TABLE_PERSON)
193+
span.hasName("INSERT " + DB_SPAN_NAMESPACE + "." + DB_TABLE_PERSON)
188194
.hasKind(SpanKind.CLIENT)
189195
.hasParent(trace.getSpan(0)));
190196
},
@@ -288,7 +294,9 @@ void testMultiTopic() throws Exception {
288294
// kafka connect consumer trace, linked to producer span via a span link
289295
Consumer<SpanDataAssert> selectAssertion =
290296
span ->
291-
span.hasName("SELECT test").hasKind(SpanKind.CLIENT).hasParent(trace.getSpan(0));
297+
span.hasName("SELECT " + DB_SPAN_NAMESPACE)
298+
.hasKind(SpanKind.CLIENT)
299+
.hasParent(trace.getSpan(0));
292300

293301
trace.hasSpansSatisfyingExactly(
294302
span ->
@@ -311,7 +319,7 @@ void testMultiTopic() throws Exception {
311319
selectAssertion,
312320
selectAssertion,
313321
span ->
314-
span.hasName("INSERT test." + DB_TABLE_PERSON)
322+
span.hasName("INSERT " + DB_SPAN_NAMESPACE + "." + DB_TABLE_PERSON)
315323
.hasKind(SpanKind.CLIENT)
316324
.hasParent(trace.getSpan(0)));
317325
},
@@ -426,4 +434,9 @@ private static void clearPostgresTable() throws SQLException {
426434
logger.info("Cleared PostgreSQL table: {}", DB_TABLE_PERSON);
427435
}
428436
}
437+
438+
private static boolean stableDatabaseSemconv() {
439+
String optIn = System.getProperty("otel.semconv-stability.opt-in", "");
440+
return optIn.contains("database");
441+
}
429442
}

0 commit comments

Comments
 (0)