Skip to content

Commit 67aab08

Browse files
committed
Tests: Added offset resetting to the embedded Kafka
1 parent 1485edd commit 67aab08

File tree

4 files changed

+27
-11
lines changed

4 files changed

+27
-11
lines changed

src/integrationTest/java/com/mongodb/kafka/connect/embedded/ConnectStandalone.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
3838
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
3939
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
40-
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
40+
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
4141
import org.apache.kafka.connect.util.ConnectUtils;
4242
import org.apache.kafka.connect.util.FutureCallback;
4343
import org.slf4j.Logger;
@@ -51,6 +51,8 @@ class ConnectStandalone {
5151
private final Herder herder;
5252
private final Connect connect;
5353

54+
private final ResettableOffsetStore resettableOffsetStore;
55+
5456
@SuppressWarnings("unchecked")
5557
ConnectStandalone(final Properties workerProperties) {
5658
Time time = Time.SYSTEM;
@@ -60,6 +62,7 @@ class ConnectStandalone {
6062
initInfo.logAll();
6163

6264
Map<String, String> workerProps = (Map) workerProperties;
65+
resettableOffsetStore = new ResettableOffsetStore();
6366

6467
LOGGER.info("Scanning for plugin classes. This might take a moment ...");
6568
Plugins plugins = new Plugins(workerProps);
@@ -78,12 +81,7 @@ class ConnectStandalone {
7881
new NoneConnectorClientConfigOverridePolicy();
7982
Worker worker =
8083
new Worker(
81-
workerId,
82-
time,
83-
plugins,
84-
config,
85-
new FileOffsetBackingStore(),
86-
clientConfigOverridePolicy);
84+
workerId, time, plugins, config, resettableOffsetStore, clientConfigOverridePolicy);
8785
this.herder = new StandaloneHerder(worker, kafkaClusterId, clientConfigOverridePolicy);
8886
connectionString = advertisedUrl.toString() + herder.kafkaClusterId();
8987

@@ -166,6 +164,10 @@ void deleteConnector(final String name) {
166164
}
167165
}
168166

167+
void resetOffsets() {
168+
resettableOffsetStore.reset();
169+
}
170+
169171
void stop() {
170172
LOGGER.debug("Connect Standalone stop called");
171173
connect.stop();
@@ -177,4 +179,11 @@ class ConnectorConfigurationException extends RuntimeException {
177179
super(cause);
178180
}
179181
}
182+
183+
private static class ResettableOffsetStore extends MemoryOffsetBackingStore {
184+
185+
void reset() {
186+
data.clear();
187+
}
188+
}
180189
}

src/integrationTest/java/com/mongodb/kafka/connect/embedded/EmbeddedKafka.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636

3737
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
3838
import org.apache.kafka.common.security.JaasUtils;
39-
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
4039
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
4140
import org.apache.kafka.streams.integration.utils.KafkaEmbedded;
4241
import org.apache.kafka.test.TestCondition;
@@ -253,6 +252,10 @@ public void deleteSourceConnector() {
253252
}
254253
}
255254

255+
public void resetOffsets() {
256+
connect.resetOffsets();
257+
}
258+
256259
private Properties effectiveBrokerConfigFrom(
257260
final Properties brokerConfig, final ZooKeeperEmbedded zookeeper) {
258261
final Properties effectiveConfig = new Properties();
@@ -273,7 +276,6 @@ private Properties effectiveBrokerConfigFrom(
273276
private Properties connectWorkerConfig() {
274277
Properties workerProps = new Properties();
275278
workerProps.put(StandaloneConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
276-
workerProps.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
277279
workerProps.put(
278280
StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG,
279281
"org.apache.kafka.connect.storage.StringConverter");
@@ -295,6 +297,7 @@ public void beforeAll(final ExtensionContext context) throws Exception {
295297

296298
@Override
297299
public void afterEach(final ExtensionContext context) {
300+
resetOffsets();
298301
deleteSinkConnector();
299302
deleteSourceConnector();
300303
}

src/integrationTest/java/com/mongodb/kafka/connect/mongodb/MongoKafkaTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ public int getMaxWireVersion() {
132132
}
133133

134134
public void cleanUp() {
135+
KAFKA.resetOffsets();
135136
getMongoClient()
136137
.listDatabaseNames()
137138
.into(new ArrayList<>())

src/integrationTest/java/com/mongodb/kafka/connect/source/MongoSourceTaskIntegrationTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -431,15 +431,18 @@ void testSourceGeneratesHeartbeats() {
431431
put(MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
432432
put(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, "heartBeatTopic");
433433
put(MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "1000");
434-
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "50");
434+
put(MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, "10");
435435
}
436436
};
437437

438438
task.start(cfg);
439439

440+
insertMany(rangeClosed(1, 10), coll);
441+
getNextResults(task).forEach(s -> assertNotEquals("heartBeatTopic", s.topic()));
442+
440443
getNextResults(task).forEach(s -> assertEquals("heartBeatTopic", s.topic()));
441444

442-
insertMany(rangeClosed(1, 50), coll);
445+
insertMany(rangeClosed(11, 20), coll);
443446
getNextResults(task).forEach(s -> assertNotEquals("heartBeatTopic", s.topic()));
444447
}
445448
}

0 commit comments

Comments
 (0)