Skip to content

Commit 531e73b

Browse files
Merge pull request #25 from Milesight-IoT/feat/msc_integration/supports_multi_tenant
feat(msc-integration): supports multi-tenant
2 parents d04d4a5 + b904928 commit 531e73b

File tree

5 files changed

+300
-85
lines changed

5 files changed

+300
-85
lines changed

integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/MscIntegrationBootstrap.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import lombok.extern.slf4j.*;
99
import org.apache.camel.CamelContext;
1010
import org.springframework.beans.factory.annotation.Autowired;
11-
import org.springframework.context.annotation.PropertySource;
1211
import org.springframework.stereotype.Component;
1312

1413
@Slf4j
@@ -27,15 +26,20 @@ public class MscIntegrationBootstrap implements IntegrationBootstrap {
2726

2827
@Override
2928
public void onPrepared(Integration integrationConfig) {
30-
29+
// do nothing
3130
}
3231

3332
@Override
3433
public void onStarted(Integration integrationConfig) {
34+
// do nothing
35+
}
36+
37+
@Override
38+
public void onEnabled(String tenantId, Integration integrationConfig) {
3539
log.info("MSC integration starting");
36-
mscConnectionService.init();
37-
mscDataFetchingService.init();
38-
mscWebhookService.init();
40+
mscConnectionService.init(tenantId);
41+
mscDataFetchingService.init(tenantId);
42+
mscWebhookService.init(tenantId);
3943
log.info("MSC integration started");
4044
}
4145

integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscConnectionService.java

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.milesight.beaveriot.context.api.EntityValueServiceProvider;
44
import com.milesight.beaveriot.context.integration.model.ExchangePayload;
5+
import com.milesight.beaveriot.context.security.TenantContext;
56
import com.milesight.beaveriot.eventbus.annotations.EventSubscribe;
67
import com.milesight.beaveriot.eventbus.api.Event;
78
import com.milesight.beaveriot.integration.msc.entity.MscConnectionPropertiesEntities;
@@ -15,6 +16,7 @@
1516

1617
import java.util.Map;
1718
import java.util.Objects;
19+
import java.util.concurrent.ConcurrentHashMap;
1820

1921

2022
@Slf4j
@@ -26,31 +28,32 @@ public class MscConnectionService implements IMscClientProvider {
2628
@Autowired
2729
private EntityValueServiceProvider entityValueServiceProvider;
2830

29-
@Getter
30-
private MscClient mscClient;
31+
private final Map<String, MscClient> tenantIdToMscClient = new ConcurrentHashMap<>();
3132

3233
@EventSubscribe(payloadKeyExpression = "msc-integration.integration.openapi.*")
3334
public void onOpenapiPropertiesUpdate(Event<MscConnectionPropertiesEntities.Openapi> event) {
35+
val tenantId = TenantContext.getTenantId();
3436
if (isConfigChanged(event)) {
3537
val openapiSettings = event.getPayload();
36-
initConnection(openapiSettings);
38+
initConnection(tenantId, openapiSettings);
3739
entityValueServiceProvider.saveValuesAndPublishSync(new ExchangePayload(Map.of(OPENAPI_STATUS_KEY, IntegrationStatus.NOT_READY.name())));
3840
}
39-
testConnection();
41+
testConnection(tenantId);
4042
}
4143

42-
private void initConnection(MscConnectionPropertiesEntities.Openapi openapiSettings) {
43-
mscClient = MscClient.builder()
44+
private void initConnection(String tenantId, MscConnectionPropertiesEntities.Openapi openapiSettings) {
45+
tenantIdToMscClient.put(tenantId, MscClient.builder()
4446
.endpoint(openapiSettings.getServerUrl())
4547
.credentials(Credentials.builder()
4648
.clientId(openapiSettings.getClientId())
4749
.clientSecret(openapiSettings.getClientSecret())
4850
.build())
49-
.build();
51+
.build());
5052
}
5153

52-
private void testConnection() {
54+
private void testConnection(String tenantId) {
5355
try {
56+
val mscClient = tenantIdToMscClient.get(tenantId);
5457
mscClient.test();
5558
entityValueServiceProvider.saveValuesAndPublishSync(new ExchangePayload(Map.of(OPENAPI_STATUS_KEY, IntegrationStatus.READY.name())));
5659
} catch (Exception e) {
@@ -71,6 +74,8 @@ private boolean isConfigChanged(Event<MscConnectionPropertiesEntities.Openapi> e
7174
return false;
7275
}
7376
// check if mscClient is initiated
77+
val tenantId = TenantContext.getTenantId();
78+
val mscClient = tenantIdToMscClient.get(tenantId);
7479
if (mscClient == null) {
7580
return true;
7681
}
@@ -90,18 +95,22 @@ private boolean isConfigChanged(Event<MscConnectionPropertiesEntities.Openapi> e
9095
return !Objects.equals(mscClient.getConfig().getCredentials().getClientSecret(), event.getPayload().getClientSecret());
9196
}
9297

93-
public void init() {
98+
public void init(String tenantId) {
9499
try {
95100
val settings = entityValueServiceProvider.findValuesByKey(
96101
MscConnectionPropertiesEntities.getKey(MscConnectionPropertiesEntities.Fields.openapi), MscConnectionPropertiesEntities.Openapi.class);
97102
if (!settings.isEmpty()) {
98-
initConnection(settings);
99-
testConnection();
103+
initConnection(tenantId, settings);
104+
testConnection(tenantId);
100105
}
101106
} catch (Exception e) {
102107
log.error("Error occurs while initializing connection", e);
103108
entityValueServiceProvider.saveValuesAndPublishSync(new ExchangePayload(Map.of(OPENAPI_STATUS_KEY, IntegrationStatus.NOT_READY.name())));
104109
}
105110
}
106111

112+
public MscClient getMscClient() {
113+
return tenantIdToMscClient.get(TenantContext.getTenantId());
114+
}
115+
107116
}

integrations/msc-integration/src/main/java/com/milesight/beaveriot/integration/msc/service/MscDataSyncService.java

Lines changed: 61 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.milesight.beaveriot.context.constants.ExchangeContextKeys;
77
import com.milesight.beaveriot.context.integration.model.Device;
88
import com.milesight.beaveriot.context.integration.model.ExchangePayload;
9+
import com.milesight.beaveriot.context.security.TenantContext;
910
import com.milesight.beaveriot.eventbus.annotations.EventSubscribe;
1011
import com.milesight.beaveriot.eventbus.api.Event;
1112
import com.milesight.beaveriot.integration.msc.constant.MscIntegrationConstants;
@@ -25,9 +26,8 @@
2526
import javax.annotation.Nonnull;
2627
import javax.annotation.Nullable;
2728
import java.io.IOException;
29+
import java.util.Map;
2830
import java.util.Optional;
29-
import java.util.Timer;
30-
import java.util.TimerTask;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.ExecutorService;
@@ -57,9 +57,10 @@ public class MscDataSyncService {
5757
@Autowired
5858
private EntityValueServiceProvider entityValueServiceProvider;
5959

60-
private Timer timer;
60+
@Autowired
61+
private MscTimerService mscTimerService;
6162

62-
private int periodSeconds = 0;
63+
private final Map<String, Long> tenantIdToPeriodSecond = new ConcurrentHashMap<>();
6364

6465
// Only two existing tasks allowed at a time (one running and one waiting)
6566
private static final ExecutorService syncAllDataExecutor = new ThreadPoolExecutor(1, 1,
@@ -75,18 +76,23 @@ public class MscDataSyncService {
7576

7677
@EventSubscribe(payloadKeyExpression = "msc-integration.integration.scheduled_data_fetch.*")
7778
public void onScheduledDataFetchPropertiesUpdate(Event<MscConnectionPropertiesEntities.ScheduledDataFetch> event) {
79+
val tenantId = TenantContext.getTenantId();
7880
if (event.getPayload().getPeriod() != null) {
79-
periodSeconds = event.getPayload().getPeriod();
81+
tenantIdToPeriodSecond.put(tenantId, event.getPayload().getPeriod().longValue());
8082
}
81-
restart();
83+
restart(tenantId);
8284
}
8385

8486
@EventSubscribe(payloadKeyExpression = "msc-integration.integration.openapi_status")
8587
public void onOpenapiStatusUpdate(Event<MscConnectionPropertiesEntities> event) {
8688
val status = event.getPayload().getOpenapiStatus();
8789
if (IntegrationStatus.READY.name().equals(status)) {
8890
try {
89-
syncAllDataExecutor.submit(this::syncDeltaData);
91+
val tenantId = TenantContext.getTenantId();
92+
syncAllDataExecutor.submit(() -> {
93+
TenantContext.setTenantId(tenantId);
94+
this.syncDeltaData();
95+
});
9096
} catch (RejectedExecutionException e) {
9197
log.error("Task rejected: ", e);
9298
}
@@ -96,66 +102,77 @@ public void onOpenapiStatusUpdate(Event<MscConnectionPropertiesEntities> event)
96102
@SneakyThrows
97103
@EventSubscribe(payloadKeyExpression = "msc-integration.integration.sync_device")
98104
public void onSyncDevice(Event<MscServiceEntities.SyncDevice> event) {
99-
syncAllDataExecutor.submit(this::syncAllData).get();
105+
val tenantId = TenantContext.getTenantId();
106+
syncAllDataExecutor.submit(() -> {
107+
TenantContext.setTenantId(tenantId);
108+
this.syncAllData();
109+
}).get();
100110
}
101111

102112

103-
public void restart() {
104-
stop();
105-
start();
113+
public void restart(String tenantId) {
114+
stop(tenantId);
115+
start(tenantId);
106116
}
107117

108118
public void stop() {
109-
if (timer != null) {
110-
timer.cancel();
111-
timer = null;
112-
}
113-
log.info("timer stopped");
119+
mscTimerService.clear();
120+
}
121+
122+
public void stop(String tenantId) {
123+
mscTimerService.cancelTask(tenantId);
124+
log.info("msc-integration timer stopped, tenant: '{}'", tenantId);
114125
}
115126

116-
public void init() {
117-
start();
127+
public void init(String tenantId) {
128+
start(tenantId);
118129
}
119130

120-
public void start() {
121-
log.info("timer starting");
122-
if (timer != null) {
131+
public void start(String tenantId) {
132+
log.info("msc-integration timer starting, tenant: '{}'", tenantId);
133+
if (mscTimerService.isScheduled(tenantId)) {
123134
return;
124135
}
125-
if (periodSeconds == 0) {
136+
long periodSecond = tenantIdToPeriodSecond.getOrDefault(tenantId, 0L);
137+
if (periodSecond == 0) {
126138
val scheduledDataFetchSettings = entityValueServiceProvider.findValuesByKey(
127139
MscConnectionPropertiesEntities.getKey(MscConnectionPropertiesEntities.Fields.scheduledDataFetch),
128140
MscConnectionPropertiesEntities.ScheduledDataFetch.class);
141+
129142
if (scheduledDataFetchSettings.isEmpty()) {
130-
periodSeconds = -1;
143+
periodSecond = -1L;
144+
tenantIdToPeriodSecond.put(tenantId, periodSecond);
131145
return;
132146
}
147+
133148
if (!Boolean.TRUE.equals(scheduledDataFetchSettings.getEnabled())
134149
|| scheduledDataFetchSettings.getPeriod() == null
135150
|| scheduledDataFetchSettings.getPeriod() == 0) {
136151
// not enabled or invalid period
137-
periodSeconds = -1;
152+
periodSecond = -1L;
153+
tenantIdToPeriodSecond.put(tenantId, periodSecond);
138154
} else if (scheduledDataFetchSettings.getPeriod() > 0) {
139-
periodSeconds = scheduledDataFetchSettings.getPeriod();
155+
periodSecond = scheduledDataFetchSettings.getPeriod();
156+
tenantIdToPeriodSecond.put(tenantId, periodSecond);
140157
}
141158
}
142-
if (periodSeconds < 0) {
159+
160+
if (periodSecond < 0) {
143161
return;
144162
}
145-
timer = new Timer();
146163

147164
// setup timer
148-
val periodMills = periodSeconds * 1000L;
149-
timer.scheduleAtFixedRate(new TimerTask() {
150-
@Override
151-
public void run() {
152-
try {
153-
syncAllDataExecutor.submit(() -> syncDeltaData());
154-
} catch (RejectedExecutionException e) {
155-
log.error("Task rejected: ", e);
156-
}
165+
TenantContext.getTenantId();
166+
mscTimerService.scheduleTask(() -> {
167+
try {
168+
syncAllDataExecutor.submit(() -> {
169+
TenantContext.setTenantId(tenantId);
170+
this.syncDeltaData();
171+
});
172+
} catch (RejectedExecutionException e) {
173+
log.error("Task rejected: ", e);
157174
}
158-
}, periodMills, periodMills);
175+
}, tenantId, periodSecond, periodSecond);
159176

160177
log.info("timer started");
161178
}
@@ -296,13 +313,17 @@ public CompletableFuture<Boolean> syncDeviceData(Task task) {
296313
log.info("Skip execution because device task is running: {}", task.identifier);
297314
return CompletableFuture.completedFuture(null);
298315
}
316+
val tenantId = TenantContext.getTenantId();
299317
return CompletableFuture.supplyAsync(() -> {
318+
TenantContext.setTenantId(tenantId);
300319
try {
301320
Device device = null;
302321
switch (task.type) {
303-
case REMOVE_LOCAL_DEVICE -> device = removeLocalDevice(task.identifier);
304322
case ADD_LOCAL_DEVICE -> device = addLocalDevice(task);
305323
case UPDATE_LOCAL_DEVICE -> device = updateLocalDevice(task);
324+
default -> {
325+
// do nothing
326+
}
306327
}
307328

308329
if (task.type != Task.Type.REMOVE_LOCAL_DEVICE && device == null) {
@@ -357,21 +378,15 @@ private void syncPropertiesHistory(Device device, long lastSyncTime) {
357378
page.getData().getList().forEach(item -> {
358379
val objectMapper = mscClientProvider.getMscClient().getObjectMapper();
359380
val properties = objectMapper.convertValue(item.getProperties(), JsonNode.class);
360-
saveHistoryData(device.getKey(), null, properties, truncateTimestampMs(item.getTs()), isLatestData.get());
381+
val timestamp = item.getTs() != null ? item.getTs() : TimeUtils.currentTimeMillis();
382+
saveHistoryData(device.getKey(), null, properties, timestamp, isLatestData.get());
361383
if (isLatestData.get()) {
362384
isLatestData.set(false);
363385
}
364386
});
365387
}
366388
}
367389

368-
private static long truncateTimestampMs(Long ts) {
369-
if (ts == null) {
370-
return TimeUtils.currentTimeMillis();
371-
}
372-
return ts - ts % 1000;
373-
}
374-
375390
public void saveHistoryData(String deviceKey, String eventId, JsonNode data, long timestampMs, boolean isLatestData) {
376391
val payload = eventId == null
377392
? MscTslUtils.convertJsonNodeToExchangePayload(deviceKey, data)
@@ -427,11 +442,6 @@ private DeviceDetailResponse getDeviceDetails(Task task)
427442
return details;
428443
}
429444

430-
private Device removeLocalDevice(String identifier) {
431-
// delete is unsupported currently
432-
return null;
433-
}
434-
435445

436446
public record Task(@Nonnull Type type, @Nonnull String identifier, @Nullable DeviceDetailResponse details) {
437447

0 commit comments

Comments
 (0)