1818import com .azure .cosmos .kafka .connect .implementation .sink .CosmosSinkTaskConfig ;
1919import com .azure .cosmos .kafka .connect .implementation .sink .IdStrategyType ;
2020import com .azure .cosmos .kafka .connect .implementation .sink .ItemWriteStrategy ;
21- import com .azure .cosmos .kafka .connect .implementation .sink .patch .KafkaCosmosPatchOperationType ;
2221import com .azure .cosmos .models .CosmosContainerProperties ;
2322import org .apache .kafka .common .config .Config ;
2423import org .apache .kafka .common .config .ConfigDef ;
2928import org .testng .annotations .Test ;
3029import reactor .core .publisher .Mono ;
3130
31+ import java .io .ByteArrayInputStream ;
32+ import java .io .ByteArrayOutputStream ;
33+ import java .io .IOException ;
34+ import java .io .InvalidClassException ;
35+ import java .io .ObjectInputStream ;
36+ import java .io .ObjectOutputStream ;
37+ import java .io .Serializable ;
3238import java .util .ArrayList ;
3339import java .util .Arrays ;
40+ import java .util .Base64 ;
3441import java .util .Collections ;
3542import java .util .HashMap ;
3643import java .util .List ;
3744import java .util .Map ;
3845import java .util .UUID ;
3946import java .util .concurrent .atomic .AtomicInteger ;
47+ import java .util .concurrent .atomic .AtomicReference ;
4048import java .util .stream .Collectors ;
4149
4250import static org .assertj .core .api .AssertionsForClassTypes .assertThat ;
@@ -214,6 +222,23 @@ public void taskConfigsForClientMetadataCachesSnapshot() {
214222 }
215223 }
216224
225+ @ Test (groups = "unit" )
226+ public void evilDeserializationIsBlocked () throws Exception {
227+ AtomicReference <String > payload = new AtomicReference <>("Test RCE payload" );
228+ Evil evil = new Evil (payload );
229+ ByteArrayOutputStream baos = new ByteArrayOutputStream ();
230+ try (ObjectOutputStream oos = new ObjectOutputStream (baos )) {
231+ oos .writeObject (evil );
232+ }
233+ String evilBase64 = Base64 .getEncoder ().encodeToString (baos .toByteArray ());
234+
235+ // Through KafkaCosmosUtils: should be blocked and return null
236+ CosmosClientMetadataCachesSnapshot snapshot =
237+ KafkaCosmosUtils .getCosmosClientMetadataFromString (evilBase64 );
238+ assertThat (snapshot ).isNull ();
239+ assertThat (payload .get ()).isEqualTo ("Test RCE payload" );
240+ }
241+
217242 @ Test (groups = "unit" )
218243 public void misFormattedConfig () {
219244 CosmosSinkConnector sinkConnector = new CosmosSinkConnector ();
@@ -450,19 +475,6 @@ public static class SinkConfigs {
450475 "azure.cosmos.sink.write.strategy" ,
451476 ItemWriteStrategy .ITEM_OVERWRITE .getName (),
452477 true ),
453- new KafkaCosmosConfigEntry <String >(
454- "azure.cosmos.sink.write.patch.operationType.default" ,
455- KafkaCosmosPatchOperationType .SET .getName (),
456- true ),
457- new KafkaCosmosConfigEntry <String >(
458- "azure.cosmos.sink.write.patch.property.configs" ,
459- Strings .Emtpy ,
460- true ),
461- new KafkaCosmosConfigEntry <String >(
462- "azure.cosmos.sink.write.patch.filter" ,
463- Strings .Emtpy ,
464- true ),
465- new KafkaCosmosConfigEntry <Integer >("azure.cosmos.sink.maxRetryCount" , 10 , true ),
466478 new KafkaCosmosConfigEntry <String >("azure.cosmos.sink.database.name" , null , false ),
467479 new KafkaCosmosConfigEntry <String >("azure.cosmos.sink.containers.topicMap" , null , false ),
468480 new KafkaCosmosConfigEntry <String >(
@@ -471,4 +483,26 @@ public static class SinkConfigs {
471483 true )
472484 );
473485 }
486+
487+ public static class Evil implements Serializable {
488+ private static final long serialVersionUID = 1L ;
489+
490+ private final AtomicReference <String > payload ;
491+
492+ public Evil (AtomicReference <String > payload ) {
493+ this .payload = payload ;
494+ }
495+
496+ private void readObject (ObjectInputStream in ) throws IOException , ClassNotFoundException {
497+ in .defaultReadObject ();
498+ System .out .println ("Payload executed" );
499+ payload .set ("Payload executed" );
500+ }
501+
502+ @ Override
503+ public String toString () {
504+ return "Evil{payload='" + payload .get () + "'}" ;
505+ }
506+ }
507+
474508}
0 commit comments