Skip to content

Commit ceb1af1

Browse files
Copilotporunov
andcommitted
Add CDC module with core infrastructure for mixed index mutations
Implemented CDC support infrastructure including: - CDC event model and Kafka producer/consumer - CDC-aware index transaction wrapper - Configuration options for CDC modes - Basic unit tests Co-authored-by: porunov <[email protected]>
1 parent 5b06ebe commit ceb1af1

File tree

10 files changed

+1067
-0
lines changed

10 files changed

+1067
-0
lines changed

janusgraph-cdc/pom.xml

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>org.janusgraph</groupId>
9+
<artifactId>janusgraph</artifactId>
10+
<version>1.2.0-SNAPSHOT</version>
11+
</parent>
12+
13+
<artifactId>janusgraph-cdc</artifactId>
14+
<name>JanusGraph-CDC: Change Data Capture Support</name>
15+
<url>https://janusgraph.org</url>
16+
17+
<properties>
18+
<top.level.basedir>${basedir}/..</top.level.basedir>
19+
<kafka.version>3.6.1</kafka.version>
20+
<debezium.version>2.5.0.Final</debezium.version>
21+
</properties>
22+
23+
<dependencies>
24+
<!-- JanusGraph Core -->
25+
<dependency>
26+
<groupId>org.janusgraph</groupId>
27+
<artifactId>janusgraph-core</artifactId>
28+
<version>${project.version}</version>
29+
</dependency>
30+
31+
<!-- Kafka Dependencies -->
32+
<dependency>
33+
<groupId>org.apache.kafka</groupId>
34+
<artifactId>kafka-clients</artifactId>
35+
<version>${kafka.version}</version>
36+
</dependency>
37+
38+
<!-- Debezium Dependencies -->
39+
<dependency>
40+
<groupId>io.debezium</groupId>
41+
<artifactId>debezium-core</artifactId>
42+
<version>${debezium.version}</version>
43+
</dependency>
44+
45+
<!-- Jackson for JSON serialization -->
46+
<dependency>
47+
<groupId>com.fasterxml.jackson.core</groupId>
48+
<artifactId>jackson-databind</artifactId>
49+
</dependency>
50+
51+
<!-- SLF4J for logging -->
52+
<dependency>
53+
<groupId>org.slf4j</groupId>
54+
<artifactId>slf4j-api</artifactId>
55+
</dependency>
56+
57+
<!-- Test Dependencies -->
58+
<dependency>
59+
<groupId>org.janusgraph</groupId>
60+
<artifactId>janusgraph-backend-testutils</artifactId>
61+
<version>${project.version}</version>
62+
<scope>test</scope>
63+
</dependency>
64+
<dependency>
65+
<groupId>org.janusgraph</groupId>
66+
<artifactId>janusgraph-cql</artifactId>
67+
<version>${project.version}</version>
68+
<scope>test</scope>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.janusgraph</groupId>
72+
<artifactId>janusgraph-es</artifactId>
73+
<version>${project.version}</version>
74+
<scope>test</scope>
75+
</dependency>
76+
<dependency>
77+
<groupId>org.junit.jupiter</groupId>
78+
<artifactId>junit-jupiter-api</artifactId>
79+
<scope>test</scope>
80+
</dependency>
81+
<dependency>
82+
<groupId>org.testcontainers</groupId>
83+
<artifactId>testcontainers</artifactId>
84+
<scope>test</scope>
85+
</dependency>
86+
<dependency>
87+
<groupId>org.testcontainers</groupId>
88+
<artifactId>kafka</artifactId>
89+
<scope>test</scope>
90+
</dependency>
91+
<dependency>
92+
<groupId>org.testcontainers</groupId>
93+
<artifactId>cassandra</artifactId>
94+
<scope>test</scope>
95+
</dependency>
96+
<dependency>
97+
<groupId>org.testcontainers</groupId>
98+
<artifactId>elasticsearch</artifactId>
99+
<scope>test</scope>
100+
</dependency>
101+
<dependency>
102+
<groupId>ch.qos.logback</groupId>
103+
<artifactId>logback-classic</artifactId>
104+
<scope>test</scope>
105+
</dependency>
106+
</dependencies>
107+
108+
<build>
109+
<plugins>
110+
<plugin>
111+
<artifactId>maven-surefire-plugin</artifactId>
112+
</plugin>
113+
</plugins>
114+
</build>
115+
</project>
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2025 JanusGraph Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package org.janusgraph.diskstorage.cdc;
16+
17+
import org.janusgraph.diskstorage.configuration.Configuration;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_ENABLED;
22+
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS;
23+
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_KAFKA_TOPIC;
24+
import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_CDC_MODE;
25+
26+
/**
27+
* Configuration holder for CDC settings.
28+
*/
29+
public class CdcConfiguration {
30+
31+
private static final Logger log = LoggerFactory.getLogger(CdcConfiguration.class);
32+
33+
private final boolean enabled;
34+
private final CdcIndexTransaction.CdcMode mode;
35+
private final String kafkaBootstrapServers;
36+
private final String kafkaTopic;
37+
38+
public CdcConfiguration(Configuration config) {
39+
this.enabled = config.get(INDEX_CDC_ENABLED);
40+
this.mode = parseCdcMode(config.get(INDEX_CDC_MODE));
41+
this.kafkaBootstrapServers = config.get(INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS);
42+
this.kafkaTopic = config.get(INDEX_CDC_KAFKA_TOPIC);
43+
44+
if (enabled) {
45+
validate();
46+
log.info("CDC enabled with mode: {}, topic: {}, bootstrap servers: {}",
47+
mode, kafkaTopic, kafkaBootstrapServers);
48+
}
49+
}
50+
51+
private CdcIndexTransaction.CdcMode parseCdcMode(String modeStr) {
52+
if (modeStr == null || modeStr.isEmpty()) {
53+
return CdcIndexTransaction.CdcMode.DUAL;
54+
}
55+
56+
switch (modeStr.toLowerCase()) {
57+
case "skip":
58+
return CdcIndexTransaction.CdcMode.SKIP;
59+
case "dual":
60+
return CdcIndexTransaction.CdcMode.DUAL;
61+
case "cdc-only":
62+
case "cdc_only":
63+
return CdcIndexTransaction.CdcMode.CDC_ONLY;
64+
default:
65+
log.warn("Unknown CDC mode: {}, defaulting to DUAL", modeStr);
66+
return CdcIndexTransaction.CdcMode.DUAL;
67+
}
68+
}
69+
70+
private void validate() {
71+
if (kafkaBootstrapServers == null || kafkaBootstrapServers.isEmpty()) {
72+
throw new IllegalArgumentException("CDC is enabled but kafka bootstrap servers are not configured");
73+
}
74+
if (kafkaTopic == null || kafkaTopic.isEmpty()) {
75+
throw new IllegalArgumentException("CDC is enabled but kafka topic is not configured");
76+
}
77+
}
78+
79+
public boolean isEnabled() {
80+
return enabled;
81+
}
82+
83+
public CdcIndexTransaction.CdcMode getMode() {
84+
return mode;
85+
}
86+
87+
public String getKafkaBootstrapServers() {
88+
return kafkaBootstrapServers;
89+
}
90+
91+
public String getKafkaTopic() {
92+
return kafkaTopic;
93+
}
94+
}

0 commit comments

Comments
 (0)