
Commit 07e5e9a

Track schema registry usage
1 parent b436de8 commit 07e5e9a

File tree

20 files changed, +1816 -1 lines (only the first few changed files are rendered below)


.gitignore

Lines changed: 4 additions & 0 deletions
```diff
@@ -46,6 +46,10 @@ out/
 ######################
 .vscode
 
+# Cursor #
+##########
+.cursor
+
 # Others #
 ##########
 /logs/*
```

dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy

Lines changed: 17 additions & 0 deletions
```diff
@@ -19,6 +19,9 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
   @SuppressWarnings('UnusedPrivateField')
   private final Set<DataStreamsTags> backlogs = []
 
+  @SuppressWarnings('UnusedPrivateField')
+  private final List<StatsBucket.SchemaRegistryKey> schemaRegistryUsages = []
+
   private final Set<String> serviceNameOverrides = []
 
   @Override
@@ -33,6 +36,11 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
           this.@backlogs.add(backlog.getKey())
         }
       }
+      if (bucket.schemaRegistryUsages != null) {
+        for (Map.Entry<StatsBucket.SchemaRegistryKey, StatsBucket.SchemaRegistryCount> usage : bucket.schemaRegistryUsages) {
+          this.@schemaRegistryUsages.add(usage.getKey())
+        }
+      }
     }
   }
@@ -52,10 +60,15 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
     Collections.unmodifiableList(new ArrayList<>(this.@backlogs))
   }
 
+  synchronized List<StatsBucket.SchemaRegistryKey> getSchemaRegistryUsages() {
+    Collections.unmodifiableList(new ArrayList<>(this.@schemaRegistryUsages))
+  }
+
   synchronized void clear() {
     this.@payloads.clear()
     this.@groups.clear()
     this.@backlogs.clear()
+    this.@schemaRegistryUsages.clear()
   }
 
   void waitForPayloads(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) {
@@ -70,6 +83,10 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
     waitFor(count, timeout, this.@backlogs)
   }
 
+  void waitForSchemaRegistryUsages(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) {
+    waitFor(count, timeout, this.@schemaRegistryUsages)
+  }
+
   private static void waitFor(int count, long timeout, Collection collection) {
     long deadline = System.currentTimeMillis() + timeout
     while (System.currentTimeMillis() < deadline) {
```
Lines changed: 141 additions & 0 deletions (new file)

# Confluent Schema Registry Instrumentation

This instrumentation module provides detailed observability for Confluent Schema Registry operations in Kafka applications.

## Features

This instrumentation captures:
### Producer Operations

- **Schema Registration**: Tracks when schemas are registered with the Schema Registry
  - Subject name
  - Schema ID assigned
  - Success/failure status
  - Compatibility check results
- **Serialization**: Logs every message serialization with:
  - Topic name
  - Key schema ID (if applicable)
  - Value schema ID
  - Success/failure status
### Consumer Operations

- **Deserialization**: Tracks every message deserialization with:
  - Topic name
  - Key schema ID (if present in message)
  - Value schema ID (extracted from the Confluent wire format; see the sketch after this list)
  - Success/failure status
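The Confluent wire format frames every record with a magic byte (`0x0`) followed by the schema ID as a 4-byte big-endian integer, so the ID can be read without deserializing the payload. A minimal sketch of that extraction, assuming a hypothetical helper (`extractSchemaId` is illustrative, not part of this module):

```java
import java.nio.ByteBuffer;

final class WireFormatSketch {
  private static final byte MAGIC_BYTE = 0x0;

  /** Returns the schema ID, or -1 if the payload is not Confluent-framed. */
  static int extractSchemaId(byte[] payload) {
    // Confluent framing: [magic byte 0x0][4-byte big-endian schema ID][serialized data]
    if (payload == null || payload.length < 5 || payload[0] != MAGIC_BYTE) {
      return -1;
    }
    return ByteBuffer.wrap(payload, 1, 4).getInt();
  }
}
```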
### Schema Registry Client Operations

- **Schema Registration** (`register()` method)
  - Successful registrations with schema ID
  - Compatibility failures with error details
- **Compatibility Checks** (`testCompatibility()` method)
  - Pass/fail status
  - Error messages for incompatible schemas
- **Schema Retrieval** (`getSchemaById()` method)
  - Schema ID lookups during deserialization
## Metrics Collected

The `SchemaRegistryMetrics` class tracks:

- `schemaRegistrationSuccess` - Count of successful schema registrations
- `schemaRegistrationFailure` - Count of failed schema registrations (compatibility issues)
- `schemaCompatibilitySuccess` - Count of successful compatibility checks
- `schemaCompatibilityFailure` - Count of failed compatibility checks
- `serializationSuccess` - Count of successful message serializations
- `serializationFailure` - Count of failed serializations
- `deserializationSuccess` - Count of successful message deserializations
- `deserializationFailure` - Count of failed deserializations

A sketch of one plausible counter shape follows this list.
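Because these counters are updated from instrumented code paths on many application threads, a plausible shape is a set of `AtomicLong` fields. This is a sketch only; the actual `SchemaRegistryMetrics` internals are not shown in this commit, and the method names here are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch: thread-safe counters of the kind listed above.
final class MetricsSketch {
  private static final AtomicLong schemaRegistrationSuccess = new AtomicLong();
  private static final AtomicLong schemaRegistrationFailure = new AtomicLong();

  // Called from advice on the instrumented register() method.
  static void recordRegistration(boolean succeeded) {
    (succeeded ? schemaRegistrationSuccess : schemaRegistrationFailure).incrementAndGet();
  }

  static long getSchemaRegistrationFailureCount() {
    return schemaRegistrationFailure.get();
  }
}
```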
## Log Output Examples

### Successful Producer Operation

```
[Schema Registry] Schema registered successfully - Subject: myTopic-value, Schema ID: 123, Is Key: false, Topic: myTopic
[Schema Registry] Produce to topic 'myTopic', schema for key: none, schema for value: 123, serializing: VALUE
```

### Failed Schema Registration (Incompatibility)

```
[Schema Registry] Schema registration FAILED - Subject: myTopic-value, Is Key: false, Topic: myTopic, Error: Schema being registered is incompatible with an earlier schema
[Schema Registry] Schema compatibility check FAILED - Subject: myTopic-value, Error: Schema being registered is incompatible with an earlier schema
[Schema Registry] Serialization FAILED for topic 'myTopic', VALUE - Error: Schema being registered is incompatible with an earlier schema
```

### Consumer Operation

```
[Schema Registry] Retrieved schema from registry - Schema ID: 123, Type: Schema
[Schema Registry] Consume from topic 'myTopic', schema for key: none, schema for value: 123, deserializing: VALUE
```
## Supported Serialization Formats

- **Avro** (via `KafkaAvroSerializer`/`KafkaAvroDeserializer`)
- **Protobuf** (via `KafkaProtobufSerializer`/`KafkaProtobufDeserializer`)

A typical producer wiring that the instrumentation observes is sketched after this list.
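For reference, this is standard Confluent producer configuration, not anything specific to the agent; the broker and registry URLs are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AvroProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroSerializer"); // instrumented serializer
    props.put("schema.registry.url", "http://localhost:8081"); // placeholder registry

    // With the agent attached, serialization (and any schema registration it
    // triggers) on this producer is logged and counted automatically.
    KafkaProducer<String, Object> producer = new KafkaProducer<>(props);
    producer.close();
  }
}
```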
## Implementation Details

### Instrumented Classes

1. **CachedSchemaRegistryClient** - The main Schema Registry client
   - `register(String subject, Schema schema)` - Schema registration
   - `testCompatibility(String subject, Schema schema)` - Compatibility testing
   - `getSchemaById(int id)` - Schema retrieval

2. **AbstractKafkaSchemaSerDe and subclasses** - Serializers
   - `serialize(String topic, Object data)` - Message serialization
   - `serialize(String topic, Headers headers, Object data)` - With headers (Kafka 2.1+)

3. **AbstractKafkaSchemaSerDe and subclasses** - Deserializers
   - `deserialize(String topic, byte[] data)` - Message deserialization
   - `deserialize(String topic, Headers headers, byte[] data)` - With headers (Kafka 2.1+)

A rough illustration of what the exit advice on `register()` could look like follows this list.
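The commit's actual advice classes are not shown on this page. As an illustration only, exit advice on `register()` could take this shape using ByteBuddy's `Advice` API; the `SchemaRegistryMetrics.record*` helper calls here are hypothetical names, not the module's real methods:

```java
import net.bytebuddy.asm.Advice;

// Illustrative sketch of exit advice on CachedSchemaRegistryClient.register().
public class RegisterAdviceSketch {
  @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
  public static void onExit(
      @Advice.Argument(0) final String subject,
      @Advice.Return final int schemaId,
      @Advice.Thrown final Throwable error) {
    if (error == null) {
      // Hypothetical helper: count a successful registration with its schema ID.
      SchemaRegistryMetrics.recordRegistrationSuccess(subject, schemaId);
    } else {
      // Hypothetical helper: count a compatibility (or other) failure.
      SchemaRegistryMetrics.recordRegistrationFailure(subject, error.getMessage());
    }
  }
}
```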
### Context Management

The `SchemaRegistryContext` class uses ThreadLocal storage to pass context between:

- Serializer → Schema Registry Client (for logging topic information)
- Deserializer → Schema Registry Client (for logging topic information)

This allows the instrumentation to correlate schema operations with the topics they're associated with.
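A minimal sketch of that hand-off, assuming illustrative method names (the real `SchemaRegistryContext` API may differ):

```java
// Sketch: the (de)serializer advice sets the topic before calling into the
// registry client, the client advice reads it for log enrichment, and the
// (de)serializer clears it afterwards so pooled threads do not leak stale topics.
final class ContextSketch {
  private static final ThreadLocal<String> CURRENT_TOPIC = new ThreadLocal<>();

  static void setTopic(String topic) {
    CURRENT_TOPIC.set(topic);
  }

  static String getTopic() {
    return CURRENT_TOPIC.get();
  }

  static void clear() {
    CURRENT_TOPIC.remove();
  }
}
```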
## Usage

This instrumentation is automatically activated when:

1. Confluent Schema Registry client (version 7.0.0+) is present on the classpath
2. The Datadog Java agent is attached to the JVM

No configuration or code changes are required.
## Metrics Access

To access metrics programmatically:

```java
import datadog.trace.instrumentation.confluentschemaregistry.SchemaRegistryMetrics;

// Get current counts
long registrationFailures = SchemaRegistryMetrics.getSchemaRegistrationFailureCount();
long compatibilityFailures = SchemaRegistryMetrics.getSchemaCompatibilityFailureCount();
long serializationFailures = SchemaRegistryMetrics.getSerializationFailureCount();

// Print summary
SchemaRegistryMetrics.printSummary();
```
## Monitoring Schema Compatibility Issues

The primary use case for this instrumentation is to detect and monitor schema compatibility issues that cause production failures. By tracking the `schemaRegistrationFailure` and `schemaCompatibilityFailure` metrics, you can:

1. **Alert on schema compatibility failures** before they impact production
2. **Track the rate of schema-related errors** per topic
3. **Identify problematic schema changes** that break compatibility
4. **Monitor serialization/deserialization failure rates** as a proxy for schema issues

A sketch of a simple in-process watchdog built on the getters shown under "Metrics Access" follows this list.
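This polling loop uses only the getter the README documents above; the sampling interval and reporting channel are arbitrary choices for illustration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import datadog.trace.instrumentation.confluentschemaregistry.SchemaRegistryMetrics;

// Illustrative watchdog: log whenever the registration-failure counter grows.
public class CompatibilityWatchdog {
  public static void main(String[] args) {
    ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
    AtomicLong lastSeen = new AtomicLong();
    poller.scheduleAtFixedRate(() -> {
      long failures = SchemaRegistryMetrics.getSchemaRegistrationFailureCount();
      long previous = lastSeen.getAndSet(failures);
      if (failures > previous) {
        System.err.println("Schema registration failures increased by " + (failures - previous));
      }
    }, 0, 30, TimeUnit.SECONDS);
  }
}
```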
## Future Enhancements

Potential additions:

- JSON Schema serializer support (currently excluded due to dependency issues)
- Schema evolution tracking
- Schema version diff logging
- Integration with Datadog APM for schema-related span tags
Lines changed: 24 additions & 0 deletions
(new file)

```groovy
apply from: "$rootDir/gradle/java.gradle"

muzzle {
  pass {
    group = "io.confluent"
    module = "kafka-schema-registry-client"
    versions = "[7.0.0,)"
    assertInverse = true
  }
}

dependencies {
  compileOnly group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.0.0'
  compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.0.0'
  compileOnly group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.0.0'
  compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.0.0'

  testImplementation group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.5.2'
  testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.5.2'
  testImplementation group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.5.1'
  testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.5.0'
  testImplementation group: 'org.apache.avro', name: 'avro', version: '1.11.0'
}
```
Lines changed: 45 additions & 0 deletions (new file)

```java
package datadog.trace.instrumentation.confluentschemaregistry;

import static datadog.trace.agent.tooling.bytebuddy.matcher.ClassLoaderMatchers.hasClassNamed;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;

/**
 * Instrumentation module for Confluent Schema Registry to capture schema operations including
 * registration, compatibility checks, serialization, and deserialization.
 */
@AutoService(InstrumenterModule.class)
public class ConfluentSchemaRegistryModule extends InstrumenterModule.Tracing {

  public ConfluentSchemaRegistryModule() {
    super("confluent-schema-registry");
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".SchemaRegistryMetrics", packageName + ".SchemaRegistryContext",
    };
  }

  @Override
  @SuppressWarnings("unchecked")
  public List<Instrumenter> typeInstrumentations() {
    return (List<Instrumenter>)
        (List<?>)
            asList(
                new SchemaRegistryClientInstrumentation(),
                new KafkaAvroSerializerInstrumentation(),
                new KafkaAvroDeserializerInstrumentation());
  }

  @Override
  public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
    return hasClassNamed("io.confluent.kafka.schemaregistry.client.SchemaRegistryClient");
  }
}
```
