File tree Expand file tree Collapse file tree 3 files changed +17
-0
lines changed
src/main/java/com/mongodb/kafka/connect/sink/cdc/debezium/mongodb Expand file tree Collapse file tree 3 files changed +17
-0
lines changed Original file line number Diff line number Diff line change @@ -41,6 +41,10 @@ public WriteModel<BsonDocument> perform(final SinkDocument doc) {
4141 .orElseThrow (
4242 () -> new DataException ("Key document must not be missing for delete operation" ));
4343
44+ if (!keyDoc .containsKey (JSON_ID_FIELD )) {
45+ throw new DataException (format ("Delete document missing `%s` field." , JSON_ID_FIELD ));
46+ }
47+
4448 try {
4549 return new DeleteOneModel <>(
4650 BsonDocument .parse (
Original file line number Diff line number Diff line change 1919package com .mongodb .kafka .connect .sink .cdc .debezium .mongodb ;
2020
2121import static com .mongodb .kafka .connect .sink .cdc .debezium .mongodb .MongoDbHandler .ID_FIELD ;
22+ import static java .lang .String .format ;
2223
2324import org .apache .kafka .connect .errors .DataException ;
2425
@@ -44,6 +45,10 @@ public WriteModel<BsonDocument> perform(final SinkDocument doc) {
4445 .orElseThrow (
4546 () -> new DataException ("Value document must not be missing for insert operation" ));
4647
48+ if (!valueDoc .containsKey (JSON_DOC_FIELD_PATH )) {
49+ throw new DataException (format ("Insert document missing `%s` field." , JSON_DOC_FIELD_PATH ));
50+ }
51+
4752 try {
4853 BsonDocument insertDoc =
4954 BsonDocument .parse (valueDoc .get (JSON_DOC_FIELD_PATH ).asString ().getValue ());
Original file line number Diff line number Diff line change @@ -47,6 +47,10 @@ public WriteModel<BsonDocument> perform(final SinkDocument doc) {
4747 .orElseThrow (
4848 () -> new DataException ("Value document must not be missing for update operation" ));
4949
50+ if (!valueDoc .containsKey (JSON_DOC_FIELD_PATH )) {
51+ throw new DataException (format ("Update document missing `%s` field." , JSON_DOC_FIELD_PATH ));
52+ }
53+
5054 try {
5155 BsonDocument updateDoc =
5256 BsonDocument .parse (valueDoc .getString (JSON_DOC_FIELD_PATH ).getValue ());
@@ -69,6 +73,10 @@ public WriteModel<BsonDocument> perform(final SinkDocument doc) {
6973 .orElseThrow (
7074 () -> new DataException ("Key document must not be missing for update operation" ));
7175
76+ if (!keyDoc .containsKey (JSON_ID_FIELD )) {
77+ throw new DataException (format ("Update document missing `%s` field." , JSON_ID_FIELD ));
78+ }
79+
7280 BsonDocument filterDoc =
7381 BsonDocument .parse (
7482 format ("{%s: %s}" , ID_FIELD , keyDoc .getString (JSON_ID_FIELD ).getValue ()));
You can’t perform that action at this time.
0 commit comments