9
9
import java .util .Map ;
10
10
11
11
/**
12
- * Helper class to construct kafka data schema and kafka data record .
12
+ * Helper class to construct the schema and record for the Kafka Data Field.
13
13
*/
14
14
public class KafkaDataBuilder {
15
15
@@ -18,6 +18,12 @@ public class KafkaDataBuilder {
18
18
public static final String KAFKA_DATA_OFFSET_FIELD_NAME = "offset" ;
19
19
public static final String KAFKA_DATA_INSERT_TIME_FIELD_NAME = "insertTime" ;
20
20
21
+ /**
22
+ * Constructs the schema for the Kafka Data Field.
23
+ *
24
+ * @param kafkaDataFieldName the configured name of the Kafka Data Field
25
+ * @return Field for Kafka Data, with definitions of the Kafka topic, partition, offset, and insertTime.
26
+ */
21
27
public static Field buildKafkaDataField (String kafkaDataFieldName ) {
22
28
Field topicField = com .google .cloud .bigquery .Field .of (KAFKA_DATA_TOPIC_FIELD_NAME , LegacySQLTypeName .STRING );
23
29
Field partitionField = com .google .cloud .bigquery .Field .of (KAFKA_DATA_PARTITION_FIELD_NAME , LegacySQLTypeName .INTEGER );
@@ -32,6 +38,12 @@ public static Field buildKafkaDataField(String kafkaDataFieldName) {
32
38
.setMode (com .google .cloud .bigquery .Field .Mode .NULLABLE ).build ();
33
39
}
34
40
41
+ /**
42
+ * Constructs a map of values for the Kafka Data record.
43
+ *
44
+ * @param kafkaConnectRecord the Kafka sink record to build the Kafka Data values from
45
+ * @return HashMap containing the Kafka topic, partition, offset, and insertTime values.
46
+ */
35
47
public static Map <String , Object > buildKafkaDataRecord (SinkRecord kafkaConnectRecord ) {
36
48
HashMap <String , Object > kafkaData = new HashMap <>();
37
49
kafkaData .put (KAFKA_DATA_TOPIC_FIELD_NAME , kafkaConnectRecord .topic ());
0 commit comments