Skip to content

Commit 215a1d1

Browse files
Merge pull request #36 from RADAR-CNS/fix_client_id
Fix client Id
2 parents aa09d7d + 69b652c commit 215a1d1

File tree

4 files changed

+10
-29
lines changed

4 files changed

+10
-29
lines changed

.idea/.name

Lines changed: 0 additions & 1 deletion
This file was deleted.

.idea/compiler.xml

Lines changed: 1 addition & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/main/java/org/radarcns/monitor/AbstractKafkaMonitor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,11 @@ public AbstractKafkaMonitor(RadarPropertyHandler radar, Collection<String> topic
8888

8989
properties = new Properties();
9090
String deserializer = KafkaAvroDeserializer.class.getName();
91+
String monitorClientId = getClass().getName() + "-" + clientId;
9192
properties.setProperty(KEY_DESERIALIZER_CLASS_CONFIG, deserializer);
9293
properties.setProperty(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
9394
properties.setProperty(GROUP_ID_CONFIG, groupId);
94-
properties.setProperty(CLIENT_ID_CONFIG, clientId);
95+
properties.setProperty(CLIENT_ID_CONFIG, monitorClientId);
9596
properties.setProperty(ENABLE_AUTO_COMMIT_CONFIG, "true");
9697
properties.setProperty(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1001");
9798
properties.setProperty(SESSION_TIMEOUT_MS_CONFIG, "15101");
@@ -105,7 +106,7 @@ public AbstractKafkaMonitor(RadarPropertyHandler radar, Collection<String> topic
105106
this.topics = topics;
106107
this.pollTimeout = new AtomicLong(Long.MAX_VALUE);
107108
this.done = false;
108-
this.clientId = clientId;
109+
this.clientId = monitorClientId;
109110
this.groupId = groupId;
110111

111112
PersistentStateStore localStateStore;
@@ -121,7 +122,7 @@ public AbstractKafkaMonitor(RadarPropertyHandler radar, Collection<String> topic
121122
S localState = stateDefault;
122123
if (stateStore != null && stateDefault != null) {
123124
try {
124-
localState = stateStore.retrieveState(groupId, clientId, stateDefault);
125+
localState = stateStore.retrieveState(groupId, monitorClientId, stateDefault);
125126
logger.info("Using existing {} from persistence store.",
126127
stateDefault.getClass().getName());
127128
} catch (IOException ex) {

src/test/java/org/radarcns/monitor/KafkaMonitorFactoryTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public void createBatteryMonitor() throws Exception {
5555
assertEquals(BatteryLevelMonitor.class, monitor.getClass());
5656
BatteryLevelMonitor batteryMonitor = (BatteryLevelMonitor) monitor;
5757
batteryMonitor.evaluateRecords(new ConsumerRecords<>(Collections.emptyMap()));
58-
assertTrue(new File(config.getPersistencePath(), "battery_monitors_1.yml").isFile());
58+
assertTrue(new File(config.getPersistencePath(), "battery_monitors_" +
59+
BatteryLevelMonitor.class.getName() + "-1.yml").isFile());
5960
}
6061

6162
@Test(expected = IOException.class)
@@ -79,7 +80,8 @@ public void createDisconnectMonitor() throws Exception {
7980
assertEquals(DisconnectMonitor.class, monitor.getClass());
8081
DisconnectMonitor disconnectMonitor = (DisconnectMonitor) monitor;
8182
disconnectMonitor.evaluateRecords(new ConsumerRecords<>(Collections.emptyMap()));
82-
assertTrue(new File(config.getPersistencePath(), "disconnect_monitor_1.yml").isFile());
83+
assertTrue(new File(config.getPersistencePath(), "disconnect_monitor_" +
84+
DisconnectMonitor.class.getName() + "-1.yml").isFile());
8385
}
8486

8587
@Test
@@ -148,4 +150,4 @@ public static ConfigRadar getBatteryMonitorConfig(int port, TemporaryFolder fold
148150
config.setBatteryMonitor(getBatteryMonitorConfig(port));
149151
return config;
150152
}
151-
}
153+
}

0 commit comments

Comments
 (0)