Skip to content

Commit 04c11eb

Browse files
committed
spotless
1 parent 24bb243 commit 04c11eb

File tree

3 files changed

+50
-44
lines changed

3 files changed

+50
-44
lines changed

v2/cdc-parent/cdc-embedded-connector/src/main/java/com/google/cloud/dataflow/cdc/connector/DebeziumToPubSubDataSender.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,7 @@ public DebeziumToPubSubDataSender(
149149
LOG.info("Setting up in memory offset storage.");
150150
configBuilder =
151151
configBuilder.with(
152-
"offset.storage",
153-
"org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
152+
"offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
154153
} else {
155154
LOG.info("Setting up in File-based offset storage in {}.", this.offsetStorageFile);
156155
configBuilder =

v2/cdc-parent/cdc-embedded-connector/src/main/java/com/google/cloud/dataflow/cdc/connector/PubSubChangeConsumer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
import org.slf4j.LoggerFactory;
4242

4343
/** Implements Debezium's Embedded Engine change consumer to push data to PubSub. */
44-
public class PubSubChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
44+
public class PubSubChangeConsumer
45+
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
4546

4647
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(PubSubChangeConsumer.class);
4748

@@ -101,7 +102,9 @@ private RowCoder getCoderForRow(String tableName, Row record) {
101102
}
102103

103104
@Override
104-
public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records, RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
105+
public void handleBatch(
106+
List<ChangeEvent<SourceRecord, SourceRecord>> records,
107+
RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
105108
throws InterruptedException {
106109

107110
ImmutableList.Builder<ApiFuture<String>> futureListBuilder = ImmutableList.builder();

v2/cdc-parent/cdc-embedded-connector/src/test/java/com/google/cloud/dataflow/cdc/connector/PubSubChangeConsumerTest.java

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -90,51 +90,55 @@ public void testBasicRecordAndFilteredRecordInput() throws InterruptedException
9090
// is whitelisted, therefore, only one will be published to pubsub.
9191
ChangeEvent<SourceRecord, SourceRecord> mockEvent1 = Mockito.mock(ChangeEvent.class);
9292
ChangeEvent<SourceRecord, SourceRecord> mockEvent2 = Mockito.mock(ChangeEvent.class);
93-
94-
SourceRecord record1 = new SourceRecord(
95-
ImmutableMap.of("server", "mainstance"),
96-
ImmutableMap.of(
97-
"file",
98-
"mysql-bin.000023",
99-
"pos",
100-
110489,
101-
"gtids",
102-
"36797132-a366-11e9-ac33-42010a800456:1-6407169",
103-
"row",
104-
1,
105-
"snapshot",
106-
true),
107-
topicName,
108-
keySchema,
109-
key,
110-
valueSchema,
111-
value);
112-
113-
SourceRecord record2 = new SourceRecord(
114-
ImmutableMap.of("server", "mainstance"),
115-
ImmutableMap.of(
116-
"file",
117-
"mysql-bin.000023",
118-
"pos",
119-
110490,
120-
"gtids",
121-
"36797132-a366-11e9-ac33-42010a800456:1-6407169",
122-
"row",
123-
1,
124-
"snapshot",
125-
true),
126-
"NOTWHITELISTEDTOPIC!", // A topic that was NOT whitelisted
127-
keySchema,
128-
key,
129-
valueSchema,
130-
value);
93+
94+
SourceRecord record1 =
95+
new SourceRecord(
96+
ImmutableMap.of("server", "mainstance"),
97+
ImmutableMap.of(
98+
"file",
99+
"mysql-bin.000023",
100+
"pos",
101+
110489,
102+
"gtids",
103+
"36797132-a366-11e9-ac33-42010a800456:1-6407169",
104+
"row",
105+
1,
106+
"snapshot",
107+
true),
108+
topicName,
109+
keySchema,
110+
key,
111+
valueSchema,
112+
value);
113+
114+
SourceRecord record2 =
115+
new SourceRecord(
116+
ImmutableMap.of("server", "mainstance"),
117+
ImmutableMap.of(
118+
"file",
119+
"mysql-bin.000023",
120+
"pos",
121+
110490,
122+
"gtids",
123+
"36797132-a366-11e9-ac33-42010a800456:1-6407169",
124+
"row",
125+
1,
126+
"snapshot",
127+
true),
128+
"NOTWHITELISTEDTOPIC!", // A topic that was NOT whitelisted
129+
keySchema,
130+
key,
131+
valueSchema,
132+
value);
131133

132134
Mockito.when(mockEvent1.value()).thenReturn(record1);
133135
Mockito.when(mockEvent2.value()).thenReturn(record2);
134136

135-
List<ChangeEvent<SourceRecord, SourceRecord>> recordBatch = ImmutableList.of(mockEvent1, mockEvent2);
137+
List<ChangeEvent<SourceRecord, SourceRecord>> recordBatch =
138+
ImmutableList.of(mockEvent1, mockEvent2);
136139

137-
RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> mockCommitter = Mockito.mock(RecordCommitter.class);
140+
RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> mockCommitter =
141+
Mockito.mock(RecordCommitter.class);
138142

139143
changeConsumer.handleBatch(recordBatch, mockCommitter);
140144

0 commit comments

Comments
 (0)