
Commit 0531064

CC-7163: Add task information injection (#50)
* Add task id header
* Add task generation header
* Add current iteration header

Signed-off-by: Greg Harris <[email protected]>
1 parent ccca6ec commit 0531064

4 files changed: +38, -3 lines changed


README.md

Lines changed: 13 additions & 0 deletions
@@ -212,6 +212,19 @@ If you are using Avro format for producing data to Kafka, here is the correspond
 
 If you are not using Avro format for producing data to Kafka, there will be no schema in Confluent Schema Registry.
 
+# Utility Headers
+
+The Datagen Connector will capture details about the record's generation in the headers of the records it produces.
+The following fields are populated:
+
+Header Key | Header Value
+-|-
+`task.generation` | Task generation number (starts at 0, incremented each time the task restarts)
+`task.id` | Task id number (0 up to `tasks.max` - 1)
+`current.iteration` | Record iteration number (starts at 0, incremented each time a record is generated)
+
+
+
 # Publishing Docker Images
 
 *Note: The following instructions are only relevant if you are an administrator of this repository and have push access to the https://hub.docker.com/r/cnfldemos/kafka-connect-datagen/ repository. The local Docker daemon must be logged into a proper Docker Hub account.*
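For a quick end-to-end check of the new headers, here is a minimal consumer-side sketch. It assumes the Connect worker runs with its default header converter (`SimpleHeaderConverter`, which serializes header values as UTF-8 strings) and that the connector produces to a topic named `test-topic` on `localhost:9092`; the topic, bootstrap servers, and class name are placeholders, not part of this commit.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

public class UtilityHeaderReader {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");  // placeholder
    props.put("group.id", "datagen-header-reader");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("test-topic"));  // placeholder
      ConsumerRecords<String, byte[]> batch = consumer.poll(Duration.ofSeconds(5));
      for (ConsumerRecord<String, byte[]> record : batch) {
        // With the default SimpleHeaderConverter the long values arrive as
        // their string renderings, e.g. task.id=0, task.generation=0,
        // current.iteration=42.
        for (Header header : record.headers()) {
          System.out.printf("%s=%s%n", header.key(),
              new String(header.value(), StandardCharsets.UTF_8));
        }
      }
    }
  }
}
```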

checkstyle/suppressions.xml

Lines changed: 1 addition & 0 deletions
@@ -6,4 +6,5 @@
 
 <suppressions>
   <suppress checks="ClassDataAbstractionCoupling" files="(DatagenTask|AvroMessageReader|RestService|Errors|SchemaRegistryRestApplication|KafkaSchemaRegistry|KafkaStore|AvroData|KafkaGroupMasterElector).java"/>
+  <suppress checks="(CyclomaticComplexity|NPathComplexity)" files="DatagenTask.java"/>
 </suppressions>

src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java

Lines changed: 13 additions & 3 deletions
@@ -32,6 +32,7 @@
 import org.apache.avro.generic.GenericRecord;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.ConnectHeaders;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.data.Schema;
@@ -66,7 +67,7 @@ public class DatagenTask extends SourceTask {
   private AvroData avroData;
   private int taskId;
   private Map<String, Object> sourcePartition;
-  private int taskGeneration;
+  private long taskGeneration;
   private Random random;
 
   protected enum Quickstart {
@@ -110,6 +111,7 @@ public void start(Map<String, String> props) {
     maxRecords = config.getIterations();
     schemaFilename = config.getSchemaFilename();
     schemaKeyField = config.getSchemaKeyfield();
+    taskGeneration = 0;
     taskId = Integer.parseInt(props.get(TASK_ID));
     sourcePartition = Collections.singletonMap(TASK_ID, taskId);
 
@@ -232,21 +234,29 @@ public List<SourceRecord> poll() throws InterruptedException {
     // Essentially, the "next" state of the connector after this loop completes
     Map<String, Object> sourceOffset = new HashMap<>();
     // The next lifetime will be a member of the next generation.
-    sourceOffset.put(TASK_GENERATION, (long) (taskGeneration + 1));
+    sourceOffset.put(TASK_GENERATION, taskGeneration + 1);
     // We will have produced this record
     sourceOffset.put(CURRENT_ITERATION, count + 1);
     // This is the seed that we just re-seeded for our own next iteration.
     sourceOffset.put(RANDOM_SEED, seed);
 
+    final ConnectHeaders headers = new ConnectHeaders();
+    headers.addLong(TASK_GENERATION, taskGeneration);
+    headers.addLong(TASK_ID, taskId);
+    headers.addLong(CURRENT_ITERATION, count);
+
     final List<SourceRecord> records = new ArrayList<>();
     SourceRecord record = new SourceRecord(
         sourcePartition,
         sourceOffset,
         topic,
+        null,
         KEY_SCHEMA,
         keyString,
         messageSchema,
-        messageValue
+        messageValue,
+        null,
+        headers
     );
     records.add(record);
     count += records.size();
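Note the asymmetry in poll(): the headers carry the current `taskGeneration`, while the committed source offset stores `taskGeneration + 1`, because the offset describes the task's next lifetime. The restore side of that handshake is not part of this diff, so the following is only a sketch of how a restarted task can read the committed generation back through the standard Connect API (`SourceTaskContext` and `OffsetStorageReader` are real interfaces; the class and method names here are illustrative):

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.connect.source.SourceTaskContext;

class GenerationRestoreSketch {
  static final String TASK_ID = "task.id";
  static final String TASK_GENERATION = "task.generation";

  // Called from start(): look up the offset committed under this task's
  // source partition and adopt the generation stored there.
  long restoreGeneration(SourceTaskContext context, int taskId) {
    Map<String, Object> partition = Collections.singletonMap(TASK_ID, taskId);
    Map<String, Object> offset = context.offsetStorageReader().offset(partition);
    if (offset == null) {
      // First lifetime of this task: nothing committed yet, generation 0.
      return 0L;
    }
    // poll() stored taskGeneration + 1, which is exactly the generation
    // this new lifetime should run as.
    return (Long) offset.get(TASK_GENERATION);
  }
}
```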

src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java

Lines changed: 11 additions & 0 deletions
@@ -152,6 +152,17 @@ public void shouldRestoreFromSourceOffsets() throws Exception {
     assertEquals(pollA.value(), pollB.value());
   }
 
+  @Test
+  public void shouldInjectHeaders() throws Exception {
+    createTaskWith(Quickstart.USERS);
+    generateRecords();
+    for (SourceRecord record : records) {
+      assertEquals((long) TASK_ID, record.headers().lastWithName(DatagenTask.TASK_ID).value());
+      assertEquals(0L, record.headers().lastWithName(DatagenTask.TASK_GENERATION).value());
+      assertNotNull(record.headers().lastWithName(DatagenTask.CURRENT_ITERATION));
+    }
+  }
+
   @Test
   public void shouldFailToGenerateMoreRecordsThanSpecified() throws Exception {
     // Generate the expected number of records
