Skip to content

Commit f68b0e8

Browse files
authored
[To dev/1.3] Avoiding error log by removing ThreadLocal for Mqtt Client
(cherry picked from commit 8a8a7be)
1 parent dc5a36e commit f68b0e8

File tree

2 files changed

+16
-0
lines changed

2 files changed

+16
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public void onConnect(InterceptConnectMessage msg) {
8888
ZoneId.systemDefault().toString(),
8989
TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3,
9090
ClientVersion.V_1_0);
91+
sessionManager.registerSessionForMqtt(session);
9192
clientIdToSessionMap.put(msg.getClientID(), session);
9293
}
9394
}
@@ -96,6 +97,7 @@ public void onConnect(InterceptConnectMessage msg) {
9697
public void onDisconnect(InterceptDisconnectMessage msg) {
9798
MqttClientSession session = clientIdToSessionMap.remove(msg.getClientID());
9899
if (null != session) {
100+
sessionManager.removeCurrSessionForMqtt(session);
99101
sessionManager.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
100102
}
101103
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/session/SessionManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,12 @@ public void removeCurrSession() {
316316
currSessionIdleTime.remove();
317317
}
318318

319+
public void removeCurrSessionForMqtt(MqttClientSession mqttClientSession) {
320+
if (mqttClientSession != null) {
321+
sessions.remove(mqttClientSession);
322+
}
323+
}
324+
319325
/**
320326
* this method can be only used in client-thread model. Do not use this method in message-thread
321327
* model based service.
@@ -333,6 +339,14 @@ public boolean registerSession(IClientSession session) {
333339
return true;
334340
}
335341

342+
/**
343+
* this method can be only used in mqtt model. Do not use this method in client-thread model based
344+
* service.
345+
*/
346+
public void registerSessionForMqtt(IClientSession session) {
347+
sessions.put(session, placeHolder);
348+
}
349+
336350
/** must be called after registerSession()) will mark the session login. */
337351
public void supplySession(
338352
IClientSession session,

0 commit comments

Comments
 (0)