Commit 7367108

CC-7162: Add offset recovery and deterministic seeding (#49)
* Add deterministic seed configuration
* Add offset recovery after task failure
* Add tests for determinism
* Depend on avro-random-generator:0.3.0 for reentrancy features
* Migrate from constructor to builder for Avro Random Generator
* Update documentation for new feature & clarify iterations behavior

Signed-off-by: Greg Harris <[email protected]>
1 parent d90337e commit 7367108

File tree: 7 files changed (+150, -20 lines)

README.md

Lines changed: 1 addition & 1 deletion
@@ -120,7 +120,7 @@ Parameter | Description | Default
 -|-|-
 `kafka.topic` | Topic to write to |
 `max.interval` | Max interval between messages (ms) | 500
-`iterations` | Number of messages to send, or less than 1 for unlimited | -1
+`iterations` | Number of messages to send from each task, or less than 1 for unlimited | -1
 `schema.filename` | Filename of schema to use
 `schema.keyfield` | Name of field to use as the message key
 `quickstart` | Name of [quickstart](https://github.com/confluentinc/kafka-connect-datagen/tree/master/src/main/resources) to use

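A hedged illustration of the parameters above as a connector configuration, written here as the plain `Map<String, String>` that Kafka Connect hands to the connector. The values and the combination chosen are illustrative, not part of this commit; note that `iterations` now counts messages per task, so with `tasks.max=3` and `iterations=100` the topic receives up to 300 messages in total.

```java
import java.util.HashMap;
import java.util.Map;

public class DatagenPropsExample {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("connector.class", "io.confluent.kafka.connect.datagen.DatagenConnector");
    props.put("kafka.topic", "test-topic");
    props.put("quickstart", "orders");
    props.put("max.interval", "500");
    // Per the clarified doc above: each task sends this many messages.
    props.put("iterations", "100");
    props.put("tasks.max", "3");
    // New in this commit: a fixed seed makes the generated data reproducible.
    props.put("random.seed", "42");

    props.forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```
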
pom.xml

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@
   <properties>
     <connect-runtime-version>2.0.0</connect-runtime-version>
     <confluent.version>5.1.0</confluent.version>
-    <confluent.avro.generator.version>0.2.0</confluent.avro.generator.version>
+    <confluent.avro.generator.version>0.3.0</confluent.avro.generator.version>
     <junit.version>4.12</junit.version>
     <avro.version>1.8.1</avro.version>
     <licenses.version>5.1.0</licenses.version>

src/main/java/io/confluent/kafka/connect/datagen/DatagenConnector.java

Lines changed: 4 additions & 1 deletion
@@ -17,6 +17,7 @@
 package io.confluent.kafka.connect.datagen;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -60,7 +61,9 @@ public Class<? extends Task> taskClass() {
   public List<Map<String, String>> taskConfigs(int maxTasks) {
     List<Map<String, String>> taskConfigs = new ArrayList<>();
     for (int i = 0; i < maxTasks; i++) {
-      taskConfigs.add(this.props);
+      Map<String, String> taskConfig = new HashMap<>(this.props);
+      taskConfig.put(DatagenTask.TASK_ID, Integer.toString(i));
+      taskConfigs.add(taskConfig);
     }
     return taskConfigs;
   }

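The important detail in the `taskConfigs` change is that each task now receives its own copy of the connector properties plus a distinct `task.id`; putting the id into the shared `this.props` map would have left every task with the same, last-written value. A self-contained sketch of the same copy-then-tag pattern, with illustrative names such as `baseProps`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {
  public static void main(String[] args) {
    // Illustrative connector-level properties shared by all tasks.
    Map<String, String> baseProps = new HashMap<>();
    baseProps.put("kafka.topic", "test-topic");
    baseProps.put("iterations", "100");

    int maxTasks = 3;
    List<Map<String, String>> taskConfigs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) {
      // Copy first, then tag: each task gets a private map with its own task.id.
      Map<String, String> taskConfig = new HashMap<>(baseProps);
      taskConfig.put("task.id", Integer.toString(i));
      taskConfigs.add(taskConfig);
    }

    // Prints task.id=0, 1, 2 alongside identical base settings.
    taskConfigs.forEach(System.out::println);
  }
}
```
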
src/main/java/io/confluent/kafka/connect/datagen/DatagenConnectorConfig.java

Lines changed: 12 additions & 3 deletions
@@ -30,14 +30,18 @@ public class DatagenConnectorConfig extends AbstractConfig {
   public static final String MAXINTERVAL_CONF = "max.interval";
   private static final String MAXINTERVAL_DOC = "Max interval between messages (ms)";
   public static final String ITERATIONS_CONF = "iterations";
-  private static final String ITERATIONS_DOC = "Number of messages to send, or less than 1 for "
-      + "unlimited";
+  private static final String ITERATIONS_DOC = "Number of messages to send from each task, "
+      + "or less than 1 for unlimited";
   public static final String SCHEMA_FILENAME_CONF = "schema.filename";
   private static final String SCHEMA_FILENAME_DOC = "Filename of schema to use";
   public static final String SCHEMA_KEYFIELD_CONF = "schema.keyfield";
   private static final String SCHEMA_KEYFIELD_DOC = "Name of field to use as the message key";
   public static final String QUICKSTART_CONF = "quickstart";
   private static final String QUICKSTART_DOC = "Name of quickstart to use";
+  public static final String RANDOM_SEED_CONF = "random.seed";
+  private static final String RANDOM_SEED_DOC = "Numeric seed for generating random data. "
+      + "Two connectors started with the same seed will deterministically produce the same data. "
+      + "Each task will generate different data than the other tasks in the same connector.";
 
   public DatagenConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
     super(config, parsedConfig);
@@ -54,7 +58,8 @@ public static ConfigDef conf() {
         .define(ITERATIONS_CONF, Type.INT, -1, Importance.HIGH, ITERATIONS_DOC)
         .define(SCHEMA_FILENAME_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_FILENAME_DOC)
         .define(SCHEMA_KEYFIELD_CONF, Type.STRING, "", Importance.HIGH, SCHEMA_KEYFIELD_DOC)
-        .define(QUICKSTART_CONF, Type.STRING, "", Importance.HIGH, QUICKSTART_DOC);
+        .define(QUICKSTART_CONF, Type.STRING, "", Importance.HIGH, QUICKSTART_DOC)
+        .define(RANDOM_SEED_CONF, Type.LONG, null, Importance.LOW, RANDOM_SEED_DOC);
   }
 
   public String getKafkaTopic() {
@@ -81,5 +86,9 @@ public String getQuickstart() {
     return this.getString(QUICKSTART_CONF);
   }
 
+  public Long getRandomSeed() {
+    return this.getLong(RANDOM_SEED_CONF);
+  }
+
 }
 

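Because `random.seed` is defined as `Type.LONG` with a `null` default and `Importance.LOW`, `getRandomSeed()` returns `null` whenever the property is unset, which is exactly what `DatagenTask.start()` checks before seeding. A minimal sketch of reading the config both ways (property values are illustrative):

```java
import io.confluent.kafka.connect.datagen.DatagenConnectorConfig;

import java.util.HashMap;
import java.util.Map;

public class RandomSeedConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("kafka.topic", "test-topic");
    props.put("quickstart", "orders");

    // Without random.seed the Long-typed config falls back to its null default,
    // so the task seeds its Random from entropy and output is not reproducible.
    DatagenConnectorConfig unseeded = new DatagenConnectorConfig(DatagenConnectorConfig.conf(), props);
    System.out.println(unseeded.getRandomSeed()); // null

    // With random.seed set, connectors started with this value produce the same data.
    props.put("random.seed", "42");
    DatagenConnectorConfig seeded = new DatagenConnectorConfig(DatagenConnectorConfig.conf(), props);
    System.out.println(seeded.getRandomSeed()); // 42
  }
}
```
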
src/main/java/io/confluent/kafka/connect/datagen/DatagenTask.java

Lines changed: 55 additions & 12 deletions
@@ -20,6 +20,7 @@
 import java.io.FileInputStream;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -45,8 +46,10 @@ public class DatagenTask extends SourceTask {
   static final Logger log = LoggerFactory.getLogger(DatagenTask.class);
 
   private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
-  private static final Map<String, ?> SOURCE_PARTITION = Collections.emptyMap();
-  private static final Map<String, ?> SOURCE_OFFSET = Collections.emptyMap();
+  public static final String TASK_ID = "task.id";
+  public static final String TASK_GENERATION = "task.generation";
+  public static final String CURRENT_ITERATION = "current.iteration";
+  public static final String RANDOM_SEED = "random.seed";
 
 
   private DatagenConnectorConfig config;
@@ -61,6 +64,10 @@ public class DatagenTask extends SourceTask {
   private org.apache.avro.Schema avroSchema;
   private org.apache.kafka.connect.data.Schema ksqlSchema;
   private AvroData avroData;
+  private int taskId;
+  private Map<String, Object> sourcePartition;
+  private int taskGeneration;
+  private Random random;
 
   protected enum Quickstart {
     CLICKSTREAM_CODES("clickstream_codes_schema.avro", "code"),
@@ -103,6 +110,26 @@ public void start(Map<String, String> props) {
     maxRecords = config.getIterations();
     schemaFilename = config.getSchemaFilename();
     schemaKeyField = config.getSchemaKeyfield();
+    taskId = Integer.parseInt(props.get(TASK_ID));
+    sourcePartition = Collections.singletonMap(TASK_ID, taskId);
+
+    random = new Random();
+    if (config.getRandomSeed() != null) {
+      random.setSeed(config.getRandomSeed());
+      // Each task will now deterministically advance its random source.
+      // This makes it such that each task will generate different data.
+      for (int i = 0; i < taskId; i++) {
+        random.setSeed(random.nextLong());
+      }
+    }
+
+    Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
+    if (offset != null) {
+      // The offset as it is stored contains our next state, so restore it as-is.
+      taskGeneration = ((Long) offset.get(TASK_GENERATION)).intValue();
+      count = ((Long) offset.get(CURRENT_ITERATION));
+      random.setSeed((Long) offset.get(RANDOM_SEED));
+    }
 
     String quickstartName = config.getQuickstart();
     if (quickstartName != "") {
@@ -112,10 +139,11 @@ public void start(Map<String, String> props) {
       schemaFilename = quickstart.getSchemaFilename();
       schemaKeyField = quickstart.getSchemaKeyField();
       try {
-        generator = new Generator(
-            getClass().getClassLoader().getResourceAsStream(schemaFilename),
-            new Random()
-        );
+        generator = new Generator.Builder()
+            .schemaStream(getClass().getClassLoader().getResourceAsStream(schemaFilename))
+            .random(random)
+            .generation(count)
+            .build();
       } catch (IOException e) {
         throw new ConnectException("Unable to read the '"
             + schemaFilename + "' schema file", e);
@@ -126,10 +154,11 @@ public void start(Map<String, String> props) {
       }
     } else {
       try {
-        generator = new Generator(
-            new FileInputStream(schemaFilename),
-            new Random()
-        );
+        generator = new Generator.Builder()
+            .schemaStream(new FileInputStream(schemaFilename))
+            .random(random)
+            .generation(count)
+            .build();
       } catch (IOException e) {
         throw new ConnectException("Unable to read the '"
             + schemaFilename + "' schema file", e);
@@ -195,10 +224,24 @@ public List<SourceRecord> poll() throws InterruptedException {
       );
     }
 
+    // Re-seed the random each time so that we can save the seed to the source offsets.
+    long seed = random.nextLong();
+    random.setSeed(seed);
+
+    // The source offsets will be the values that the next task lifetime will restore from.
+    // Essentially, the "next" state of the connector after this loop completes.
+    Map<String, Object> sourceOffset = new HashMap<>();
+    // The next lifetime will be a member of the next generation.
+    sourceOffset.put(TASK_GENERATION, (long) (taskGeneration + 1));
+    // We will have produced this record.
+    sourceOffset.put(CURRENT_ITERATION, count + 1);
+    // This is the seed that we just re-seeded for our own next iteration.
+    sourceOffset.put(RANDOM_SEED, seed);
+
     final List<SourceRecord> records = new ArrayList<>();
     SourceRecord record = new SourceRecord(
-        SOURCE_PARTITION,
-        SOURCE_OFFSET,
+        sourcePartition,
+        sourceOffset,
         topic,
         KEY_SCHEMA,
         keyString,

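The determinism scheme rests on two uses of `java.util.Random` visible above: in `start()`, each task derives its own stream by advancing the connector-level seed `taskId` times; in `poll()`, the task re-seeds itself with a freshly drawn seed on every iteration and writes that seed (plus the generation and iteration counters) into the source offset, so a restarted task resumes exactly where it left off. A self-contained sketch of both mechanisms using plain `java.util.Random` only; a single `nextLong()` stands in for the many draws the Avro generator makes per record:

```java
import java.util.Random;

public class SeedRecoverySketch {
  public static void main(String[] args) {
    long connectorSeed = 42L;

    // Per-task seeding, as in DatagenTask.start(): advance the connector-level seed once per
    // task id, so tasks in the same connector produce different (but reproducible) streams.
    for (int taskId = 0; taskId < 3; taskId++) {
      Random random = new Random();
      random.setSeed(connectorSeed);
      for (int i = 0; i < taskId; i++) {
        random.setSeed(random.nextLong());
      }
      System.out.println("task " + taskId + " first draw: " + random.nextLong());
    }

    // Offset recovery, as in DatagenTask.poll(): after producing a record, draw a fresh seed,
    // re-seed with it, and store it in the source offset. Restoring that seed later puts a new
    // Random into exactly the state that will produce the *next* record.
    Random live = new Random();
    live.setSeed(connectorSeed);
    long storedSeed = 0L;
    long expectedNext = 0L;
    for (int iteration = 0; iteration < 5; iteration++) {
      long record = live.nextLong();                 // stands in for generator.generate()
      long seed = live.nextLong();
      live.setSeed(seed);                            // the "next" state of this task
      if (iteration == 2) {
        storedSeed = seed;                           // what poll() writes to the source offset
        expectedNext = new Random(seed).nextLong();  // the record iteration 3 will produce
      }
    }

    Random restored = new Random();
    restored.setSeed(storedSeed);                    // what start() does when an offset is found
    System.out.println(restored.nextLong() == expectedNext);  // true
  }
}
```
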
src/test/java/io/confluent/kafka/connect/datagen/DatagenConnectorTest.java

Lines changed: 5 additions & 2 deletions
@@ -67,8 +67,11 @@ protected void assertTaskConfigs(int maxTasks) {
     List<Map<String, String>> taskConfigs = connector.taskConfigs(maxTasks);
     assertEquals(maxTasks, taskConfigs.size());
     // All task configs should match the connector config
-    for (Map<String, String> taskConfig : taskConfigs) {
-      assertEquals(config, taskConfig);
+    for (int i = 0; i < taskConfigs.size(); i++) {
+      Map<String, String> taskConfig = taskConfigs.get(i);
+      Map<String, String> expectedTaskConfig = new HashMap<>(config);
+      expectedTaskConfig.put(DatagenTask.TASK_ID, Integer.toString(i));
+      assertEquals(expectedTaskConfig, taskConfig);
     }
   }
 

src/test/java/io/confluent/kafka/connect/datagen/DatagenTaskTest.java

Lines changed: 72 additions & 0 deletions
@@ -16,8 +16,11 @@
 
 package io.confluent.kafka.connect.datagen;
 
+import io.confluent.kafka.connect.datagen.DatagenTask.Quickstart;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -27,11 +30,15 @@
 import io.confluent.avro.random.generator.Generator;
 import io.confluent.connect.avro.AvroData;
 
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -46,6 +53,7 @@ public class DatagenTaskTest {
   private static final String TOPIC = "my-topic";
   private static final int NUM_MESSAGES = 100;
   private static final int MAX_INTERVAL_MS = 0;
+  private static final int TASK_ID = 0;
 
   private static final AvroData AVRO_DATA = new AvroData(20);
 
@@ -54,11 +62,13 @@ public class DatagenTaskTest {
   private List<SourceRecord> records;
   private Schema expectedValueConnectSchema;
   private Schema expectedKeyConnectSchema;
+  private Map<String, Object> sourceOffsets;
 
   @Before
   public void setUp() throws Exception {
     config = new HashMap<>();
     records = new ArrayList<>();
+    sourceOffsets = null;
   }
 
   @After
@@ -112,6 +122,36 @@ public void shouldGenerateFilesForStockTradesQuickstart() throws Exception {
     generateAndValidateRecordsFor(DatagenTask.Quickstart.STOCK_TRADES);
   }
 
+  @Test
+  public void shouldRestoreFromSourceOffsets() throws Exception {
+    // Give the task an arbitrary source offset
+    sourceOffsets = new HashMap<>();
+    sourceOffsets.put(DatagenTask.RANDOM_SEED, 100L);
+    sourceOffsets.put(DatagenTask.CURRENT_ITERATION, 50L);
+    sourceOffsets.put(DatagenTask.TASK_GENERATION, 0L);
+    createTaskWith(Quickstart.ORDERS);
+
+    // poll once to advance the generator
+    SourceRecord firstPoll = task.poll().get(0);
+    // poll a second time to predict the future
+    SourceRecord pollA = task.poll().get(0);
+    // extract the offsets after the first poll to restore to the next task instance
+    //noinspection unchecked
+    sourceOffsets = (Map<String, Object>) firstPoll.sourceOffset();
+    createTaskWith(Quickstart.ORDERS);
+    // poll once after the restore
+    SourceRecord pollB = task.poll().get(0);
+
+    // the generation should have incremented, but the remaining details of the record should be identical
+    assertEquals(1L, pollA.sourceOffset().get(DatagenTask.TASK_GENERATION));
+    assertEquals(2L, pollB.sourceOffset().get(DatagenTask.TASK_GENERATION));
+    assertEquals(pollA.sourceOffset().get(DatagenTask.TASK_ID), pollB.sourceOffset().get(DatagenTask.TASK_ID));
+    assertEquals(pollA.sourceOffset().get(DatagenTask.CURRENT_ITERATION), pollB.sourceOffset().get(DatagenTask.CURRENT_ITERATION));
+    assertEquals(pollA.sourcePartition(), pollB.sourcePartition());
+    assertEquals(pollA.valueSchema(), pollB.valueSchema());
+    assertEquals(pollA.value(), pollB.value());
+  }
+
   @Test
   public void shouldFailToGenerateMoreRecordsThanSpecified() throws Exception {
     // Generate the expected number of records
@@ -229,8 +269,40 @@ private void createTask() {
     config.putIfAbsent(DatagenConnectorConfig.KAFKA_TOPIC_CONF, TOPIC);
     config.putIfAbsent(DatagenConnectorConfig.ITERATIONS_CONF, Integer.toString(NUM_MESSAGES));
     config.putIfAbsent(DatagenConnectorConfig.MAXINTERVAL_CONF, Integer.toString(MAX_INTERVAL_MS));
+    config.putIfAbsent(DatagenTask.TASK_ID, Integer.toString(TASK_ID));
 
     task = new DatagenTask();
+    // Initialize an offsetStorageReader that returns mocked sourceOffsets.
+    task.initialize(new SourceTaskContext() {
+      @Override
+      public Map<String, String> configs() {
+        return config;
+      }
+
+      @Override
+      public OffsetStorageReader offsetStorageReader() {
+        return new OffsetStorageReader() {
+          @Override
+          public <T> Map<String, Object> offset(final Map<String, T> partition) {
+            return offsets(Collections.singletonList(partition)).get(partition);
+          }
+
+          @Override
+          public <T> Map<Map<String, T>, Map<String, Object>> offsets(
+              final Collection<Map<String, T>> partitions) {
+            if (sourceOffsets == null) {
+              return Collections.emptyMap();
+            }
+            return partitions
+                .stream()
+                .collect(Collectors.toMap(
                    Function.identity(),
                    ignored -> sourceOffsets
                ));
+          }
+        };
+      }
+    });
     task.start(config);
   }
 
