Skip to content

Commit af537fc

Browse files
rozzaortex
andauthored
Added copy existing namespaces by regex configuration (#40)
* Added copy existing namespaces by regex configuration KAFKA-147 Co-authored-by: abushmin <[email protected]>
1 parent 7f65f45 commit af537fc

File tree

6 files changed

+269
-59
lines changed

6 files changed

+269
-59
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
- [KAFKA-155](https://jira.mongodb.org/browse/KAFKA-155) Fix business key update strategies to use dot notation for filters
2424
- [KAFKA-105](https://jira.mongodb.org/browse/KAFKA-105) Improve `errors.tolerance=all` support in the sink and source connectors.
2525
- [KAFKA-106](https://jira.mongodb.org/browse/KAFKA-106) Changed `max.num.retries` default to 1. A safer default especially as the driver now has retryable writes.
26+
- [KAFKA-147](https://jira.mongodb.org/browse/KAFKA-147) Added `copy.existing.namespace.regex` configuration, that allows the filtering of namespaces to be copied.
27+
2628

2729
## 1.2.0
2830
- [KAFKA-92](https://jira.mongodb.org/browse/KAFKA-92) Allow the Sink connector to use multiple tasks.

src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorIntegrationTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,45 @@ void testSourceLoadsDataFromCollectionCopyExistingBson() {
182182
assertProduced(createInserts(1, 100), coll, OutputFormat.BSON);
183183
}
184184

185+
@Test
186+
@DisplayName("Ensure source loads data from collection with copy existing data by regex")
187+
void testSourceLoadsDataFromCollectionCopyExistingByRegex() {
188+
assumeTrue(isGreaterThanFourDotZero());
189+
MongoDatabase db1 = getDatabaseWithPostfix();
190+
MongoDatabase db2 = getDatabaseWithPostfix();
191+
MongoDatabase db3 = getDatabaseWithPostfix();
192+
MongoCollection<Document> coll1 = db1.getCollection("coll1");
193+
MongoCollection<Document> coll21 = db2.getCollection("coll1");
194+
MongoCollection<Document> coll22 = db2.getCollection("coll2");
195+
MongoCollection<Document> coll23 = db2.getCollection("coll3");
196+
MongoCollection<Document> coll3 = db3.getCollection("coll1");
197+
198+
insertMany(rangeClosed(1, 50), coll1);
199+
insertMany(rangeClosed(1, 50), coll21);
200+
insertMany(rangeClosed(1, 50), coll22);
201+
insertMany(rangeClosed(1, 50), coll23);
202+
insertMany(rangeClosed(1, 50), coll3);
203+
204+
Properties sourceProperties = new Properties();
205+
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_CONFIG, "true");
206+
String namespaceRegex =
207+
String.format("(%s\\.coll1|%s\\.coll(1|3))", db1.getName(), db2.getName());
208+
sourceProperties.put(MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG, namespaceRegex);
209+
210+
addSourceConnector(sourceProperties);
211+
212+
insertMany(rangeClosed(51, 100), coll1);
213+
insertMany(rangeClosed(51, 100), coll21);
214+
insertMany(rangeClosed(51, 100), coll22);
215+
insertMany(rangeClosed(51, 100), coll23);
216+
insertMany(rangeClosed(51, 100), coll3);
217+
assertProduced(createInserts(1, 100), coll1);
218+
assertProduced(createInserts(1, 100), coll21);
219+
assertProduced(createInserts(51, 100), coll22);
220+
assertProduced(createInserts(1, 100), coll23);
221+
assertProduced(createInserts(51, 100), coll3);
222+
}
223+
185224
@Test
186225
@DisplayName("Ensure Schema Key and Value output")
187226
void testSchemaKeyAndValueOutput() {

src/main/java/com/mongodb/kafka/connect/source/MongoCopyDataManager.java

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COLLECTION_CONFIG;
1919
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG;
20+
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_NAMESPACE_REGEX_CONFIG;
2021
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG;
2122
import static com.mongodb.kafka.connect.source.MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG;
2223
import static com.mongodb.kafka.connect.source.MongoSourceConfig.DATABASE_CONFIG;
2324
import static java.util.Collections.singletonList;
25+
import static java.util.stream.Collectors.toList;
2426

2527
import java.util.ArrayList;
2628
import java.util.Collection;
@@ -31,7 +33,8 @@
3133
import java.util.concurrent.Executors;
3234
import java.util.concurrent.atomic.AtomicInteger;
3335
import java.util.function.Consumer;
34-
import java.util.stream.Collectors;
36+
import java.util.function.Predicate;
37+
import java.util.regex.Pattern;
3538

3639
import org.apache.kafka.connect.errors.ConnectException;
3740
import org.slf4j.Logger;
@@ -70,17 +73,8 @@ class MongoCopyDataManager implements AutoCloseable {
7073
this.sourceConfig = sourceConfig;
7174
this.mongoClient = mongoClient;
7275

73-
String database = sourceConfig.getString(DATABASE_CONFIG);
74-
String collection = sourceConfig.getString(COLLECTION_CONFIG);
76+
List<MongoNamespace> namespaces = selectNamespaces(sourceConfig, mongoClient);
7577

76-
List<MongoNamespace> namespaces;
77-
if (database.isEmpty()) {
78-
namespaces = getCollections(mongoClient);
79-
} else if (collection.isEmpty()) {
80-
namespaces = getCollections(mongoClient, database);
81-
} else {
82-
namespaces = singletonList(createNamespace(database, collection));
83-
}
8478
LOGGER.info("Copying existing data on the following namespaces: {}", namespaces);
8579
namespacesToCopy = new AtomicInteger(namespaces.size());
8680
queue = new ArrayBlockingQueue<>(sourceConfig.getInt(COPY_EXISTING_QUEUE_SIZE_CONFIG));
@@ -142,6 +136,31 @@ private void putToQueue(final BsonDocument bsonDocument) {
142136
}
143137
}
144138

139+
static List<MongoNamespace> selectNamespaces(
140+
final MongoSourceConfig sourceConfig, final MongoClient mongoClient) {
141+
142+
String database = sourceConfig.getString(DATABASE_CONFIG);
143+
String collection = sourceConfig.getString(COLLECTION_CONFIG);
144+
String namespacesRegex = sourceConfig.getString(COPY_EXISTING_NAMESPACE_REGEX_CONFIG);
145+
146+
List<MongoNamespace> namespaces;
147+
if (database.isEmpty()) {
148+
namespaces = getCollections(mongoClient);
149+
} else if (collection.isEmpty()) {
150+
namespaces = getCollections(mongoClient, database);
151+
} else {
152+
namespaces = singletonList(createNamespace(database, collection));
153+
}
154+
155+
if (!namespacesRegex.isEmpty()) {
156+
Predicate<String> predicate = Pattern.compile(namespacesRegex).asPredicate();
157+
namespaces =
158+
namespaces.stream().filter(n -> predicate.test(n.getFullName())).collect(toList());
159+
}
160+
161+
return namespaces;
162+
}
163+
145164
static List<Bson> createPipeline(final MongoSourceConfig cfg, final MongoNamespace namespace) {
146165
List<Bson> pipeline = new ArrayList<>();
147166
cfg.getPipeline(COPY_EXISTING_PIPELINE_CONFIG).map(pipeline::addAll);
@@ -166,15 +185,15 @@ private static List<MongoNamespace> getCollections(final MongoClient mongoClient
166185
.filter(s -> !(s.startsWith("admin") || s.startsWith("config") || s.startsWith("local")))
167186
.map(d -> getCollections(mongoClient, d))
168187
.flatMap(Collection::stream)
169-
.collect(Collectors.toList());
188+
.collect(toList());
170189
}
171190

172191
private static List<MongoNamespace> getCollections(
173192
final MongoClient mongoClient, final String database) {
174193
return mongoClient.getDatabase(database).listCollectionNames().into(new ArrayList<>()).stream()
175194
.filter(s -> !s.startsWith("system."))
176195
.map(c -> createNamespace(database, c))
177-
.collect(Collectors.toList());
196+
.collect(toList());
178197
}
179198

180199
private static MongoNamespace createNamespace(final String database, final String collection) {

src/main/java/com/mongodb/kafka/connect/source/MongoSourceConfig.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,16 @@ public class MongoSourceConfig extends AbstractConfig {
230230
+ "Example: `[{\"$match\": {\"closed\": \"false\"}}]`";
231231
private static final String COPY_EXISTING_PIPELINE_DEFAULT = "";
232232

233+
public static final String COPY_EXISTING_NAMESPACE_REGEX_CONFIG = "copy.existing.namespace.regex";
234+
private static final String COPY_EXISTING_NAMESPACE_REGEX_DISPLAY =
235+
"Copy existing namespace regex";
236+
private static final String COPY_EXISTING_NAMESPACE_REGEX_DOC =
237+
"Use a regular expression to define from which existing namespaces data should be copied from."
238+
+ " A namespace is the database name and collection separated by a period e.g. `database.collection`.\n"
239+
+ " Example: The following regular expression will only include collections starting with `a` "
240+
+ "in the `demo` database: `demo\\.a.*`";
241+
private static final String COPY_EXISTING_NAMESPACE_REGEX_DEFAULT = "";
242+
233243
public static final String ERRORS_TOLERANCE_CONFIG = "errors.tolerance";
234244
public static final String ERRORS_TOLERANCE_DISPLAY = "Error Tolerance";
235245
public static final ErrorTolerance ERRORS_TOLERANCE_DEFAULT = ErrorTolerance.NONE;
@@ -659,6 +669,18 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
659669
Width.MEDIUM,
660670
COPY_EXISTING_PIPELINE_DISPLAY);
661671

672+
configDef.define(
673+
COPY_EXISTING_NAMESPACE_REGEX_CONFIG,
674+
Type.STRING,
675+
COPY_EXISTING_NAMESPACE_REGEX_DEFAULT,
676+
Validators.emptyString().or(Validators.isAValidRegex()),
677+
Importance.MEDIUM,
678+
COPY_EXISTING_NAMESPACE_REGEX_DOC,
679+
group,
680+
++orderInGroup,
681+
Width.MEDIUM,
682+
COPY_EXISTING_NAMESPACE_REGEX_DISPLAY);
683+
662684
group = "Errors";
663685
orderInGroup = 0;
664686

0 commit comments

Comments
 (0)