Skip to content

Commit d065e37

Browse files
authored
Improved Illegal Operation error messages (#62)
Provides a relevant error message when users use an illegal pipeline operation. KAFKA-174
1 parent e90b027 commit d065e37

File tree

3 files changed

+40
-0
lines changed

3 files changed

+40
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
- [KAFKA-205](https://jira.mongodb.org/browse/KAFKA-205) Updated Source connector to use RawBsonDocuments.
1111
- [KAFKA-201](https://jira.mongodb.org/browse/KAFKA-201) Improved copy.existing namespace handling.
1212
- [KAFKA-207](https://jira.mongodb.org/browse/KAFKA-207) Improved efficiency of heartbeats by making them tombstone messages.
13+
- [KAFKA-174](https://jira.mongodb.org/browse/KAFKA-174) Improved error messages when using invalid pipeline operators.
1314

1415
### Bug Fixes
1516
- [KAFKA-195](https://jira.mongodb.org/browse/KAFKA-195) Fixed topics.regex sink validation issue for synthetic config property

src/integrationTest/java/com/mongodb/kafka/connect/MongoSourceConnectorIntegrationTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,31 @@ void testSourceUsesHeartbeatsForOffsets() {
308308
}
309309
}
310310

311+
@Test
312+
@DisplayName("Ensure Source provides friendly error messages for invalid pipelines")
313+
void testSourceHasFriendlyErrorMessagesForInvalidPipelines() {
314+
assumeTrue(isGreaterThanFourDotZero());
315+
try (LogCapture logCapture = new LogCapture(Logger.getLogger(MongoSourceTask.class))) {
316+
MongoCollection<Document> coll = getAndCreateCollection();
317+
318+
Properties sourceProperties = new Properties();
319+
sourceProperties.put(
320+
MongoSourceConfig.DATABASE_CONFIG, coll.getNamespace().getDatabaseName());
321+
sourceProperties.put(
322+
MongoSourceConfig.COLLECTION_CONFIG, coll.getNamespace().getCollectionName());
323+
sourceProperties.put(PIPELINE_CONFIG, "[{'$group': {_id: 1 }}]");
324+
325+
addSourceConnector(sourceProperties);
326+
327+
boolean containsIllegalChangeStreamOperation =
328+
logCapture.getEvents().stream()
329+
.map(e -> e.getMessage().toString())
330+
.anyMatch(e -> e.startsWith("Illegal $changeStream operation"));
331+
332+
assertTrue(containsIllegalChangeStreamOperation);
333+
}
334+
}
335+
311336
public static class KeyValueDeserializer implements Deserializer<Integer> {
312337

313338
static final JsonDeserializer JSON_DESERIALIZER = new JsonDeserializer();

src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public final class MongoSourceTask extends SourceTask {
117117
private static final String NS_KEY = "ns";
118118
private static final String FULL_DOCUMENT = "fullDocument";
119119
private static final int NAMESPACE_NOT_FOUND_ERROR = 26;
120+
private static final int ILLEGAL_OPERATION_ERROR = 20;
120121
private static final int INVALIDATED_RESUME_TOKEN_ERROR = 260;
121122
private static final int UNKNOWN_FIELD_ERROR = 40415;
122123
private static final int FAILED_TO_PARSE_ERROR = 9;
@@ -404,6 +405,19 @@ private MongoChangeStreamCursor<? extends BsonDocument> tryCreateCursor(
404405
}
405406
if (e.getErrorCode() == NAMESPACE_NOT_FOUND_ERROR) {
406407
LOGGER.info("Namespace not found cursor closed.");
408+
} else if (e.getErrorCode() == ILLEGAL_OPERATION_ERROR) {
409+
LOGGER.warn(
410+
"Illegal $changeStream operation: {} {}\n\n"
411+
+ "=====================================================================================\n"
412+
+ "{}\n\n"
413+
+ "Please Note: Not all aggregation pipeline operations are suitable for modifying the\n"
414+
+ "change stream output. For more information, please see the official documentation:\n"
415+
+ " https://docs.mongodb.com/manual/changeStreams/\n"
416+
+ "=====================================================================================\n",
417+
e.getErrorMessage(),
418+
e.getErrorCode(),
419+
e.getErrorMessage());
420+
throw new ConnectException("Illegal $changeStream operation", e);
407421
} else {
408422
LOGGER.warn(
409423
"Failed to resume change stream: {} {}\n\n"

0 commit comments

Comments
 (0)