Skip to content

Commit ab082a0

Browse files
Copilotporunov
andcommitted
Add CDC configuration tests and fix lazy initialization
- Added comprehensive configuration tests - Fixed CdcConfiguration to lazily load config values when enabled - All tests passing successfully Co-authored-by: porunov <[email protected]>
1 parent 1e5b1cf commit ab082a0

File tree

2 files changed

+92
-4
lines changed

2 files changed

+92
-4
lines changed

janusgraph-cdc/src/main/java/org/janusgraph/diskstorage/cdc/CdcConfiguration.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,18 @@ public class CdcConfiguration {
3737

3838
public CdcConfiguration(Configuration config) {
3939
this.enabled = config.get(INDEX_CDC_ENABLED);
40-
this.mode = parseCdcMode(config.get(INDEX_CDC_MODE));
41-
this.kafkaBootstrapServers = config.get(INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS);
42-
this.kafkaTopic = config.get(INDEX_CDC_KAFKA_TOPIC);
43-
40+
4441
if (enabled) {
42+
this.mode = parseCdcMode(config.get(INDEX_CDC_MODE));
43+
this.kafkaBootstrapServers = config.get(INDEX_CDC_KAFKA_BOOTSTRAP_SERVERS);
44+
this.kafkaTopic = config.get(INDEX_CDC_KAFKA_TOPIC);
4545
validate();
4646
log.info("CDC enabled with mode: {}, topic: {}, bootstrap servers: {}",
4747
mode, kafkaTopic, kafkaBootstrapServers);
48+
} else {
49+
this.mode = CdcIndexTransaction.CdcMode.DUAL;
50+
this.kafkaBootstrapServers = null;
51+
this.kafkaTopic = null;
4852
}
4953
}
5054

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2025 JanusGraph Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package org.janusgraph.diskstorage.cdc;
16+
17+
import org.janusgraph.diskstorage.configuration.BasicConfiguration;
18+
import org.janusgraph.diskstorage.configuration.WriteConfiguration;
19+
import org.janusgraph.diskstorage.configuration.backend.CommonsConfiguration;
20+
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
21+
import org.junit.jupiter.api.Test;
22+
23+
import static org.junit.jupiter.api.Assertions.assertEquals;
24+
import static org.junit.jupiter.api.Assertions.assertFalse;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
27+
/**
28+
* Tests for CDC configuration and transaction wrapping.
29+
*/
30+
public class CdcConfigurationTest {
31+
32+
@Test
33+
public void testCdcConfigurationDisabled() {
34+
WriteConfiguration config = new CommonsConfiguration();
35+
config.set("index.search.backend", "elasticsearch");
36+
37+
BasicConfiguration basicConfig = new BasicConfiguration(
38+
GraphDatabaseConfiguration.ROOT_NS,
39+
config,
40+
BasicConfiguration.Restriction.NONE
41+
);
42+
43+
CdcConfiguration cdcConfig = new CdcConfiguration(basicConfig.restrictTo("search"));
44+
assertFalse(cdcConfig.isEnabled());
45+
}
46+
47+
@Test
48+
public void testCdcConfigurationEnabled() {
49+
WriteConfiguration config = new CommonsConfiguration();
50+
config.set("index.search.cdc.enabled", true);
51+
config.set("index.search.cdc.kafka-bootstrap-servers", "localhost:9092");
52+
config.set("index.search.cdc.kafka-topic", "test-topic");
53+
config.set("index.search.cdc.mode", "dual");
54+
55+
BasicConfiguration basicConfig = new BasicConfiguration(
56+
GraphDatabaseConfiguration.ROOT_NS,
57+
config,
58+
BasicConfiguration.Restriction.NONE
59+
);
60+
61+
CdcConfiguration cdcConfig = new CdcConfiguration(basicConfig.restrictTo("search"));
62+
assertTrue(cdcConfig.isEnabled());
63+
assertEquals("localhost:9092", cdcConfig.getKafkaBootstrapServers());
64+
assertEquals("test-topic", cdcConfig.getKafkaTopic());
65+
assertEquals(CdcIndexTransaction.CdcMode.DUAL, cdcConfig.getMode());
66+
}
67+
68+
@Test
69+
public void testCdcModeDefaults() {
70+
WriteConfiguration config = new CommonsConfiguration();
71+
config.set("index.search.cdc.enabled", true);
72+
config.set("index.search.cdc.kafka-bootstrap-servers", "localhost:9092");
73+
config.set("index.search.cdc.kafka-topic", "test-topic");
74+
75+
BasicConfiguration basicConfig = new BasicConfiguration(
76+
GraphDatabaseConfiguration.ROOT_NS,
77+
config,
78+
BasicConfiguration.Restriction.NONE
79+
);
80+
81+
CdcConfiguration cdcConfig = new CdcConfiguration(basicConfig.restrictTo("search"));
82+
assertEquals(CdcIndexTransaction.CdcMode.DUAL, cdcConfig.getMode());
83+
}
84+
}

0 commit comments

Comments
 (0)