Skip to content

Commit 1e5b1cf

Browse files
Copilotporunov
andcommitted
Add CDC factory, documentation, and Kafka integration test
- Created CdcIndexTransactionFactory for managing CDC lifecycle - Added comprehensive README with usage examples - Implemented integration test with Kafka testcontainer - Added testcontainers junit-jupiter dependency Co-authored-by: porunov <[email protected]>
1 parent ceb1af1 commit 1e5b1cf

File tree

4 files changed

+351
-0
lines changed

4 files changed

+351
-0
lines changed

janusgraph-cdc/README.md

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
# JanusGraph CDC Module
2+
3+
This module provides Change Data Capture (CDC) support for JanusGraph mixed index mutations to ensure eventual consistency between the storage backend (Cassandra) and mixed indexes (ElasticSearch, Solr, etc.).
4+
5+
## Overview
6+
7+
When using JanusGraph with external index backends, there's a risk of inconsistency if the index update fails while the graph data is successfully written to the storage backend. CDC addresses this by:
8+
9+
1. Publishing all mixed index mutations to a Kafka topic
10+
2. Running separate CDC workers that consume these events and apply them to the index
11+
3. Ensuring eventual consistency even in failure scenarios
12+
13+
## Architecture
14+
15+
### Components
16+
17+
1. **CdcProducer**: Publishes index mutation events to Kafka
18+
2. **CdcWorker**: Consumes events from Kafka and applies them to indexes
19+
3. **CdcIndexTransaction**: Wraps IndexTransaction to capture mutations
20+
4. **CdcConfiguration**: Manages CDC settings
21+
22+
### CDC Modes
23+
24+
- **DUAL** (default): Write to index during transaction AND publish to CDC topic for redundancy
25+
- **SKIP**: Skip index writes during transaction, rely entirely on CDC
26+
- **CDC_ONLY**: Same as SKIP (deprecated naming)
27+
28+
## Configuration
29+
30+
Add the following to your JanusGraph configuration:
31+
32+
```properties
33+
# Enable CDC for mixed indexes
34+
index.search.cdc.enabled=true
35+
36+
# CDC mode (dual, skip, or cdc-only)
37+
index.search.cdc.mode=dual
38+
39+
# Kafka bootstrap servers
40+
index.search.cdc.kafka-bootstrap-servers=localhost:9092
41+
42+
# Kafka topic for CDC events
43+
index.search.cdc.kafka-topic=janusgraph-cdc-index-mutations
44+
```
45+
46+
## Usage
47+
48+
### Running CDC Workers
49+
50+
CDC workers should be run as separate processes:
51+
52+
```java
53+
// Create index provider and retriever
54+
IndexProvider indexProvider = ...;
55+
KeyInformation.IndexRetriever indexRetriever = ...;
56+
57+
// Create and start CDC worker
58+
CdcWorker worker = new CdcWorker(
59+
"localhost:9092", // Kafka bootstrap servers
60+
"janusgraph-cdc-index-mutations", // Topic name
61+
"janusgraph-cdc-group", // Consumer group ID
62+
indexProvider,
63+
indexRetriever
64+
);
65+
66+
worker.start();
67+
68+
// Keep worker running...
69+
70+
// Shutdown gracefully
71+
worker.stop();
72+
worker.close();
73+
```
74+
75+
### Integration with JanusGraph
76+
77+
CDC can be integrated programmatically using the CdcIndexTransactionFactory:
78+
79+
```java
80+
Configuration config = ...; // Your JanusGraph configuration
81+
CdcIndexTransactionFactory cdcFactory = new CdcIndexTransactionFactory(config);
82+
83+
// Wrap index transactions with CDC support
84+
IndexTransaction indexTx = ...; // Original index transaction
85+
IndexTransaction wrappedTx = cdcFactory.wrapIfEnabled(indexTx);
86+
87+
// Use wrappedTx as normal
88+
// Mutations will be captured and published to Kafka
89+
```
90+
91+
## Benefits
92+
93+
1. **Eventual Consistency**: Guarantees that index and storage backend will eventually be consistent
94+
2. **Failure Recovery**: Automatic recovery from index update failures
95+
3. **Operational Flexibility**: CDC workers can be scaled independently
96+
4. **Minimal Performance Impact**: Asynchronous processing offloads index updates
97+
98+
## Dependencies
99+
100+
- Apache Kafka 3.6.1+
101+
- Debezium Core 2.5.0+
102+
- Jackson for JSON serialization
103+
104+
## Limitations
105+
106+
- Requires Kafka infrastructure
107+
- Eventual consistency means slight delay in index updates
108+
- CDC workers must be managed separately from JanusGraph instances
109+
110+
## Future Enhancements
111+
112+
- Automatic integration with Backend
113+
- Support for other message brokers (RabbitMQ, etc.)
114+
- Built-in CDC worker management
115+
- Metrics and monitoring integration

janusgraph-cdc/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@
8383
<artifactId>testcontainers</artifactId>
8484
<scope>test</scope>
8585
</dependency>
86+
<dependency>
87+
<groupId>org.testcontainers</groupId>
88+
<artifactId>junit-jupiter</artifactId>
89+
<scope>test</scope>
90+
</dependency>
8691
<dependency>
8792
<groupId>org.testcontainers</groupId>
8893
<artifactId>kafka</artifactId>
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright 2025 JanusGraph Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package org.janusgraph.diskstorage.cdc;
16+
17+
import org.janusgraph.diskstorage.configuration.Configuration;
18+
import org.janusgraph.diskstorage.indexing.IndexTransaction;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
/**
23+
* Factory for creating CDC-enabled index transactions.
24+
* Manages the lifecycle of CDC producers and provides wrapped index transactions.
25+
*/
26+
public class CdcIndexTransactionFactory implements AutoCloseable {
27+
28+
private static final Logger log = LoggerFactory.getLogger(CdcIndexTransactionFactory.class);
29+
30+
private final CdcConfiguration cdcConfig;
31+
private final CdcProducer cdcProducer;
32+
33+
public CdcIndexTransactionFactory(Configuration configuration) {
34+
this.cdcConfig = new CdcConfiguration(configuration);
35+
36+
if (cdcConfig.isEnabled()) {
37+
this.cdcProducer = new KafkaCdcProducer(
38+
cdcConfig.getKafkaBootstrapServers(),
39+
cdcConfig.getKafkaTopic()
40+
);
41+
log.info("CDC Index Transaction Factory initialized");
42+
} else {
43+
this.cdcProducer = null;
44+
}
45+
}
46+
47+
/**
48+
* Wrap an IndexTransaction with CDC support if enabled.
49+
*
50+
* @param indexTransaction The base index transaction to wrap
51+
* @return The wrapped transaction if CDC is enabled, otherwise null (caller should use original)
52+
*/
53+
public CdcIndexTransaction wrapIfEnabled(IndexTransaction indexTransaction) {
54+
if (cdcConfig.isEnabled() && cdcProducer != null) {
55+
return new CdcIndexTransaction(indexTransaction, cdcProducer, cdcConfig.getMode());
56+
}
57+
return null;
58+
}
59+
60+
/**
61+
* Check if CDC is enabled.
62+
*
63+
* @return true if CDC is enabled
64+
*/
65+
public boolean isEnabled() {
66+
return cdcConfig.isEnabled();
67+
}
68+
69+
@Override
70+
public void close() {
71+
if (cdcProducer != null) {
72+
cdcProducer.close();
73+
log.info("CDC Index Transaction Factory closed");
74+
}
75+
}
76+
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// Copyright 2025 JanusGraph Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package org.janusgraph.diskstorage.cdc;
16+
17+
import com.fasterxml.jackson.databind.ObjectMapper;
18+
import org.apache.kafka.clients.consumer.ConsumerConfig;
19+
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.consumer.ConsumerRecords;
21+
import org.apache.kafka.clients.consumer.KafkaConsumer;
22+
import org.apache.kafka.common.serialization.StringDeserializer;
23+
import org.janusgraph.diskstorage.indexing.IndexEntry;
24+
import org.junit.jupiter.api.AfterEach;
25+
import org.junit.jupiter.api.BeforeEach;
26+
import org.junit.jupiter.api.Test;
27+
import org.testcontainers.containers.KafkaContainer;
28+
import org.testcontainers.junit.jupiter.Container;
29+
import org.testcontainers.junit.jupiter.Testcontainers;
30+
import org.testcontainers.utility.DockerImageName;
31+
32+
import java.time.Duration;
33+
import java.util.Arrays;
34+
import java.util.Collections;
35+
import java.util.List;
36+
import java.util.Properties;
37+
38+
import static org.junit.jupiter.api.Assertions.assertEquals;
39+
import static org.junit.jupiter.api.Assertions.assertNotNull;
40+
41+
/**
42+
* Integration test for CDC producer using Kafka testcontainer.
43+
*/
44+
@Testcontainers
45+
public class CdcKafkaIntegrationTest {
46+
47+
private static final String TOPIC_NAME = "test-cdc-topic";
48+
49+
@Container
50+
private static final KafkaContainer kafka = new KafkaContainer(
51+
DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
52+
);
53+
54+
private KafkaCdcProducer producer;
55+
private KafkaConsumer<String, String> consumer;
56+
private ObjectMapper objectMapper;
57+
58+
@BeforeEach
59+
public void setup() {
60+
String bootstrapServers = kafka.getBootstrapServers();
61+
62+
// Create producer
63+
producer = new KafkaCdcProducer(bootstrapServers, TOPIC_NAME);
64+
65+
// Create consumer for verification
66+
Properties consumerProps = new Properties();
67+
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
68+
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
69+
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
70+
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
71+
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
72+
73+
consumer = new KafkaConsumer<>(consumerProps);
74+
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
75+
76+
objectMapper = new ObjectMapper();
77+
}
78+
79+
@AfterEach
80+
public void teardown() {
81+
if (producer != null) {
82+
producer.close();
83+
}
84+
if (consumer != null) {
85+
consumer.close();
86+
}
87+
}
88+
89+
@Test
90+
public void testProducerSendsEventToKafka() throws Exception {
91+
// Create a CDC event
92+
List<IndexEntry> additions = Arrays.asList(
93+
new IndexEntry("name", "John"),
94+
new IndexEntry("age", 30)
95+
);
96+
97+
CdcMutationEvent event = new CdcMutationEvent(
98+
"testStore",
99+
"doc123",
100+
additions,
101+
null,
102+
true,
103+
false,
104+
System.currentTimeMillis()
105+
);
106+
107+
// Send event
108+
producer.send(event);
109+
producer.flush();
110+
111+
// Consume and verify
112+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
113+
assertEquals(1, records.count(), "Should receive one message");
114+
115+
ConsumerRecord<String, String> record = records.iterator().next();
116+
assertEquals("testStore:doc123", record.key());
117+
118+
// Deserialize and verify
119+
CdcMutationEvent receivedEvent = objectMapper.readValue(record.value(), CdcMutationEvent.class);
120+
assertNotNull(receivedEvent);
121+
assertEquals("testStore", receivedEvent.getStoreName());
122+
assertEquals("doc123", receivedEvent.getDocumentId());
123+
assertEquals(CdcMutationEvent.MutationType.ADDED, receivedEvent.getMutationType());
124+
}
125+
126+
@Test
127+
public void testMultipleEventsPreserveOrder() throws Exception {
128+
// Send multiple events
129+
for (int i = 0; i < 5; i++) {
130+
CdcMutationEvent event = new CdcMutationEvent(
131+
"store" + i,
132+
"doc" + i,
133+
Arrays.asList(new IndexEntry("field", "value" + i)),
134+
null,
135+
false,
136+
false,
137+
System.currentTimeMillis()
138+
);
139+
producer.send(event);
140+
}
141+
producer.flush();
142+
143+
// Verify all events received
144+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
145+
assertEquals(5, records.count(), "Should receive five messages");
146+
147+
int count = 0;
148+
for (ConsumerRecord<String, String> record : records) {
149+
CdcMutationEvent event = objectMapper.readValue(record.value(), CdcMutationEvent.class);
150+
assertNotNull(event);
151+
count++;
152+
}
153+
assertEquals(5, count);
154+
}
155+
}

0 commit comments

Comments
 (0)