|
21 | 21 | import java.util.ArrayList;
|
22 | 22 | import java.util.Collections;
|
23 | 23 | import java.util.HashMap;
|
24 |
| -import java.util.HashSet; |
25 | 24 | import java.util.List;
|
26 | 25 | import java.util.Map;
|
27 | 26 | import java.util.Random;
|
28 |
| -import java.util.Set; |
29 |
| -import java.util.stream.Collectors; |
30 |
| -import org.apache.avro.generic.GenericData.Record; |
31 | 27 | import org.apache.avro.generic.GenericRecord;
|
32 |
| -import org.apache.kafka.connect.data.Field; |
33 | 28 | import org.apache.kafka.connect.data.Schema;
|
34 | 29 | import org.apache.kafka.connect.data.SchemaAndValue;
|
35 |
| -import org.apache.kafka.connect.data.SchemaBuilder; |
36 |
| -import org.apache.kafka.connect.data.Struct; |
37 | 30 | import org.apache.kafka.connect.errors.ConnectException;
|
38 | 31 | import org.apache.kafka.connect.header.ConnectHeaders;
|
39 | 32 | import org.apache.kafka.connect.source.SourceRecord;
|
@@ -67,49 +60,6 @@ public class DatagenTask extends SourceTask {
|
67 | 60 | private long taskGeneration;
|
68 | 61 | private final Random random = new Random();
|
69 | 62 |
|
70 |
| - protected enum Quickstart { |
71 |
| - CLICKSTREAM_CODES("clickstream_codes_schema.avro", "code"), |
72 |
| - CLICKSTREAM("clickstream_schema.avro", "ip"), |
73 |
| - CLICKSTREAM_USERS("clickstream_users_schema.avro", "user_id"), |
74 |
| - ORDERS("orders_schema.avro", "orderid"), |
75 |
| - RATINGS("ratings_schema.avro", "rating_id"), |
76 |
| - USERS("users_schema.avro", "userid"), |
77 |
| - USERS_("users_array_map_schema.avro", "userid"), |
78 |
| - PAGEVIEWS("pageviews_schema.avro", "viewtime"), |
79 |
| - STOCK_TRADES("stock_trades_schema.avro", "symbol"), |
80 |
| - INVENTORY("inventory.avro", "id"), |
81 |
| - PRODUCT("product.avro", "id"), |
82 |
| - PURCHASES("purchase.avro", "id"), |
83 |
| - TRANSACTIONS("transactions.avro", "transaction_id"), |
84 |
| - STORES("stores.avro", "store_id"), |
85 |
| - CREDIT_CARDS("credit_cards.avro", "card_id"), |
86 |
| - CAMPAIGN_FINANCE("campaign_finance.avro", "candidate_id"); |
87 |
| - |
88 |
| - static final Set<String> configValues = new HashSet<>(); |
89 |
| - |
90 |
| - static { |
91 |
| - for (Quickstart q : Quickstart.values()) { |
92 |
| - configValues.add(q.name().toLowerCase()); |
93 |
| - } |
94 |
| - } |
95 |
| - |
96 |
| - private final String schemaFilename; |
97 |
| - private final String keyName; |
98 |
| - |
99 |
| - Quickstart(String schemaFilename, String keyName) { |
100 |
| - this.schemaFilename = schemaFilename; |
101 |
| - this.keyName = keyName; |
102 |
| - } |
103 |
| - |
104 |
| - public String getSchemaFilename() { |
105 |
| - return schemaFilename; |
106 |
| - } |
107 |
| - |
108 |
| - public String getSchemaKeyField() { |
109 |
| - return keyName; |
110 |
| - } |
111 |
| - } |
112 |
| - |
113 | 63 | @Override
|
114 | 64 | public String version() {
|
115 | 65 | return VersionUtil.getVersion();
|
@@ -176,19 +126,6 @@ public List<SourceRecord> poll() throws InterruptedException {
|
176 | 126 | }
|
177 | 127 | final GenericRecord randomAvroMessage = (GenericRecord) generatedObject;
|
178 | 128 |
|
179 |
| - final List<Object> genericRowValues = new ArrayList<>(); |
180 |
| - for (org.apache.avro.Schema.Field field : avroSchema.getFields()) { |
181 |
| - final Object value = randomAvroMessage.get(field.name()); |
182 |
| - if (value instanceof Record) { |
183 |
| - final Record record = (Record) value; |
184 |
| - final Object ksqlValue = avroData.toConnectData(record.getSchema(), record).value(); |
185 |
| - Object optionValue = getOptionalValue(ksqlSchema.field(field.name()).schema(), ksqlValue); |
186 |
| - genericRowValues.add(optionValue); |
187 |
| - } else { |
188 |
| - genericRowValues.add(value); |
189 |
| - } |
190 |
| - } |
191 |
| - |
192 | 129 | // Key
|
193 | 130 | SchemaAndValue key = new SchemaAndValue(DEFAULT_KEY_SCHEMA, null);
|
194 | 131 | if (!schemaKeyField.isEmpty()) {
|
@@ -248,84 +185,4 @@ public List<SourceRecord> poll() throws InterruptedException {
|
248 | 185 | @Override
|
249 | 186 | public void stop() {
|
250 | 187 | }
|
251 |
| - |
252 |
| - private org.apache.kafka.connect.data.Schema getOptionalSchema( |
253 |
| - final org.apache.kafka.connect.data.Schema schema |
254 |
| - ) { |
255 |
| - switch (schema.type()) { |
256 |
| - case BOOLEAN: |
257 |
| - return org.apache.kafka.connect.data.Schema.OPTIONAL_BOOLEAN_SCHEMA; |
258 |
| - case INT32: |
259 |
| - return org.apache.kafka.connect.data.Schema.OPTIONAL_INT32_SCHEMA; |
260 |
| - case INT64: |
261 |
| - return org.apache.kafka.connect.data.Schema.OPTIONAL_INT64_SCHEMA; |
262 |
| - case FLOAT64: |
263 |
| - return org.apache.kafka.connect.data.Schema.OPTIONAL_FLOAT64_SCHEMA; |
264 |
| - case STRING: |
265 |
| - return org.apache.kafka.connect.data.Schema.OPTIONAL_STRING_SCHEMA; |
266 |
| - case ARRAY: |
267 |
| - return SchemaBuilder.array(getOptionalSchema(schema.valueSchema())).optional().build(); |
268 |
| - case MAP: |
269 |
| - return SchemaBuilder.map( |
270 |
| - getOptionalSchema(schema.keySchema()), |
271 |
| - getOptionalSchema(schema.valueSchema()) |
272 |
| - ).optional().build(); |
273 |
| - case STRUCT: |
274 |
| - final SchemaBuilder schemaBuilder = SchemaBuilder.struct(); |
275 |
| - for (Field field : schema.fields()) { |
276 |
| - schemaBuilder.field( |
277 |
| - field.name(), |
278 |
| - getOptionalSchema(field.schema()) |
279 |
| - ); |
280 |
| - } |
281 |
| - return schemaBuilder.optional().build(); |
282 |
| - default: |
283 |
| - throw new ConnectException("Unsupported type: " + schema); |
284 |
| - } |
285 |
| - } |
286 |
| - |
287 |
| - private Object getOptionalValue( |
288 |
| - final org.apache.kafka.connect.data.Schema schema, |
289 |
| - final Object value |
290 |
| - ) { |
291 |
| - if (value == null) { |
292 |
| - return null; |
293 |
| - } |
294 |
| - |
295 |
| - switch (schema.type()) { |
296 |
| - case BOOLEAN: |
297 |
| - case INT32: |
298 |
| - case INT64: |
299 |
| - case FLOAT64: |
300 |
| - case STRING: |
301 |
| - return value; |
302 |
| - case ARRAY: |
303 |
| - final List<?> list = (List<?>) value; |
304 |
| - return list.stream() |
305 |
| - .map(listItem -> getOptionalValue(schema.valueSchema(), listItem)) |
306 |
| - .collect(Collectors.toList()); |
307 |
| - case MAP: |
308 |
| - final Map<?, ?> map = (Map<?, ?>) value; |
309 |
| - return map.entrySet().stream() |
310 |
| - .collect(Collectors.toMap( |
311 |
| - k -> getOptionalValue(schema.keySchema(), k), |
312 |
| - v -> getOptionalValue(schema.valueSchema(), v) |
313 |
| - )); |
314 |
| - case STRUCT: |
315 |
| - final Struct struct = (Struct) value; |
316 |
| - final Struct optionalStruct = new Struct(getOptionalSchema(schema)); |
317 |
| - for (Field field : schema.fields()) { |
318 |
| - optionalStruct.put( |
319 |
| - field.name(), |
320 |
| - getOptionalValue( |
321 |
| - field.schema(), |
322 |
| - struct.get(field.name()) |
323 |
| - ) |
324 |
| - ); |
325 |
| - } |
326 |
| - return optionalStruct; |
327 |
| - default: |
328 |
| - throw new ConnectException("Invalid value schema: " + schema + ", value = " + value); |
329 |
| - } |
330 |
| - } |
331 | 188 | }
|
0 commit comments