Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit 78f14c8

Browse files
the connectorID is now either configured via a config entry, or assigned randomly
1 parent 208e4d1 commit 78f14c8

File tree

9 files changed

+89
-20
lines changed

9 files changed

+89
-20
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation.
3+
* All rights reserved.
4+
*
5+
*/
6+
7+
package com.microsoft.dagx.common.settings;
8+
9+
import com.microsoft.dagx.common.string.StringUtils;
10+
import com.microsoft.dagx.spi.system.ServiceExtensionContext;
11+
12+
import java.util.UUID;
13+
14+
public class SettingsHelper {
15+
private static String connectorId;
16+
17+
/**
18+
* Fetches the unique ID of the connector. If the {@code dagx.ids.connector.name} config value has been set, that value
19+
* is returned, else a random name is chosen.
20+
* The connector id is non-transient, that means two subsequent calls will produce the same result.
21+
*/
22+
public static synchronized String getConnectorId(ServiceExtensionContext context) {
23+
if (StringUtils.isNullOrEmpty(connectorId)) {
24+
connectorId = context.getSetting("dagx.ids.connector.name", "dagx-connector-" + UUID.randomUUID());
25+
}
26+
return connectorId;
27+
}
28+
}

extensions/events/events-azure/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ val eventGridSdkVersion: String by project
1212
dependencies {
1313
api(project(":spi"))
1414
implementation(project(":extensions:schema"))
15+
implementation(project(":common"))
1516
implementation("com.azure:azure-messaging-eventgrid:${eventGridSdkVersion}")
1617
}
1718

extensions/events/events-azure/src/main/java/com/microsoft/dagx/events/azure/AzureEventExtension.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import java.util.Objects;
1818
import java.util.Set;
1919

20+
import static com.microsoft.dagx.common.settings.SettingsHelper.getConnectorId;
21+
2022
public class AzureEventExtension implements ServiceExtension {
2123

2224
private static final String DEFAULT_TOPIC_NAME = "connector-events";
@@ -64,7 +66,8 @@ private void registerListeners(ServiceExtensionContext context) {
6466
.endpoint(endpoint)
6567
.buildEventGridEventPublisherAsyncClient();
6668

67-
final AzureEventGridPublisher publisher = new AzureEventGridPublisher(monitor, publisherClient);
69+
70+
final AzureEventGridPublisher publisher = new AzureEventGridPublisher(getConnectorId(context), monitor, publisherClient);
6871

6972
var processObservable = context.getService(TransferProcessObservable.class, true);
7073
if (processObservable != null) {

extensions/events/events-azure/src/main/java/com/microsoft/dagx/events/azure/AzureEventGridPublisher.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ class AzureEventGridPublisher implements TransferProcessListener, MetadataListen
2424
private final EventGridPublisherAsyncClient<EventGridEvent> client;
2525
private final String eventTypeTransferprocess = "dagx/transfer/transferprocess";
2626
private final String eventTypeMetadata = "dagx/metadata/store";
27+
private final String connectorId;
2728

28-
public AzureEventGridPublisher(Monitor monitor, EventGridPublisherAsyncClient<EventGridEvent> client) {
29+
public AzureEventGridPublisher(String connectorId, Monitor monitor, EventGridPublisherAsyncClient<EventGridEvent> client) {
30+
this.connectorId = connectorId;
2931
this.monitor = monitor;
3032
this.client = client;
3133
}
3234

3335
@Override
3436
public void created(TransferProcess process) {
35-
var dto = createDto(process);
37+
var dto = createTransferProcessDto(process);
3638
if (process.getType() == TransferProcess.Type.CLIENT) {
3739
sendEvent("createdClient", eventTypeTransferprocess, dto).subscribe(new LoggingSubscriber<>("Transfer process created"));
3840
} else {
@@ -42,46 +44,46 @@ public void created(TransferProcess process) {
4244

4345
@Override
4446
public void completed(TransferProcess process) {
45-
sendEvent("completed", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process completed"));
47+
sendEvent("completed", eventTypeTransferprocess, createTransferProcessDto(process)).subscribe(new LoggingSubscriber<>("Transfer process completed"));
4648
}
4749

4850

4951
@Override
5052
public void deprovisioned(TransferProcess process) {
51-
sendEvent("deprovisioned", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process resources deprovisioned"));
53+
sendEvent("deprovisioned", eventTypeTransferprocess, createTransferProcessDto(process)).subscribe(new LoggingSubscriber<>("Transfer process resources deprovisioned"));
5254

5355
}
5456

5557
@Override
5658
public void ended(TransferProcess process) {
57-
sendEvent("ended", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process ended"));
59+
sendEvent("ended", eventTypeTransferprocess, createTransferProcessDto(process)).subscribe(new LoggingSubscriber<>("Transfer process ended"));
5860

5961
}
6062

6163
@Override
6264
public void error(TransferProcess process) {
63-
sendEvent("error", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process errored!"));
65+
sendEvent("error", eventTypeTransferprocess, createTransferProcessDto(process)).subscribe(new LoggingSubscriber<>("Transfer process errored!"));
6466

6567
}
6668

6769
@Override
6870
public void querySubmitted() {
69-
sendEvent("querySubmitted", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("query submitted"));
71+
sendEvent("queryReceived", eventTypeMetadata, new EventDto(connectorId)).subscribe(new LoggingSubscriber<>("query submitted"));
7072
}
7173

7274
@Override
7375
public void searchInitiated() {
74-
sendEvent("searchInitiated", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("search initiated"));
76+
sendEvent("searchInitiated", eventTypeMetadata, new EventDto(connectorId)).subscribe(new LoggingSubscriber<>("search initiated"));
7577
}
7678

7779
@Override
7880
public void metadataItemAdded() {
79-
sendEvent("itemAdded", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("AzureEventGrid: metadata item added"));
81+
sendEvent("itemAdded", eventTypeMetadata, new EventDto(connectorId)).subscribe(new LoggingSubscriber<>("AzureEventGrid: metadata item added"));
8082
}
8183

8284
@Override
8385
public void metadataItemUpdated() {
84-
sendEvent("itemUpdated", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("metadata item updated"));
86+
sendEvent("itemUpdated", eventTypeMetadata, new EventDto(connectorId)).subscribe(new LoggingSubscriber<>("metadata item updated"));
8587
}
8688

8789
private Mono<Void> sendEvent(String what, String where, Object payload) {
@@ -91,8 +93,9 @@ private Mono<Void> sendEvent(String what, String where, Object payload) {
9193
}
9294

9395
@NotNull
94-
private TransferProcessDto createDto(TransferProcess process) {
96+
private TransferProcessDto createTransferProcessDto(TransferProcess process) {
9597
return TransferProcessDto.Builder.newInstance()
98+
.connector(connectorId)
9699
.state(TransferProcessStates.from(process.getState()))
97100
.requestId(process.getDataRequest().getId())
98101
.type(process.getType())
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation.
3+
* All rights reserved.
4+
*
5+
*/
6+
7+
package com.microsoft.dagx.events.azure;
8+
9+
public class EventDto {
10+
private final String connectorId;
11+
12+
protected EventDto(String connectorId) {
13+
this.connectorId = connectorId;
14+
}
15+
16+
public String getConnectorId() {
17+
return connectorId;
18+
}
19+
}

extensions/events/events-azure/src/main/java/com/microsoft/dagx/events/azure/TransferProcessDto.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package com.microsoft.dagx.events.azure;
88

99
import com.fasterxml.jackson.annotation.JsonCreator;
10+
import com.fasterxml.jackson.annotation.JsonProperty;
1011
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
1112
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
1213
import com.microsoft.dagx.spi.types.domain.transfer.TransferProcess;
@@ -19,14 +20,19 @@
1920
* reasons of security.
2021
*/
2122
@JsonDeserialize(builder = TransferProcessDto.Builder.class)
22-
public class TransferProcessDto {
23+
public class TransferProcessDto extends EventDto {
24+
@JsonProperty("requestId")
2325
private String requestId;
26+
@JsonProperty("transferProcessType")
2427
private TransferProcess.Type type;
28+
@JsonProperty("transferProcessState")
2529
private String state;
30+
@JsonProperty("transferProcessStateCode")
2631
private int stateCode;
2732

28-
private TransferProcessDto() {
33+
private TransferProcessDto(String connectorId) {
2934

35+
super(connectorId);
3036
}
3137

3238
public String getRequestId() {
@@ -45,12 +51,14 @@ public int getStateCode() {
4551
return stateCode;
4652
}
4753

54+
4855
@JsonPOJOBuilder(withPrefix = "")
4956
public static final class Builder {
5057
private String requestId;
5158
private TransferProcess.Type type;
5259
private String state;
5360
private int stateCode;
61+
private String connectorId;
5462

5563
private Builder() {
5664
}
@@ -78,12 +86,17 @@ public Builder state(TransferProcessStates state) {
7886

7987

8088
public TransferProcessDto build() {
81-
TransferProcessDto transferProcessDto = new TransferProcessDto();
89+
TransferProcessDto transferProcessDto = new TransferProcessDto(connectorId);
8290
transferProcessDto.state = state;
8391
transferProcessDto.type = type;
8492
transferProcessDto.requestId = requestId;
8593
transferProcessDto.stateCode = stateCode;
8694
return transferProcessDto;
8795
}
96+
97+
public Builder connector(String connectorId) {
98+
this.connectorId = connectorId;
99+
return this;
100+
}
88101
}
89102
}

extensions/ids/ids-core/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@ plugins {
1313

1414
dependencies {
1515
api(project(":spi"))
16+
api(project(":common"))
1617
api(project(":extensions:ids:ids-spi"))
1718

1819
api("de.fraunhofer.iais.eis.ids.infomodel:java:${infoModelVersion}")
19-
20+
2021
implementation("jakarta.ws.rs:jakarta.ws.rs-api:${rsApi}")
2122
implementation(project(":policy:policy-engine"))
2223
}

extensions/ids/ids-core/src/main/java/com/microsoft/dagx/ids/core/IdsCoreServiceExtension.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525

2626
import java.util.Set;
2727

28-
import static com.microsoft.dagx.ids.spi.IdsConfiguration.CONNECTOR_NAME;
28+
import static com.microsoft.dagx.common.settings.SettingsHelper.getConnectorId;
2929

3030
/**
3131
* Implements the IDS Controller REST API.
@@ -51,7 +51,7 @@ public void initialize(ServiceExtensionContext context) {
5151
context.registerService(IdsDescriptorService.class, descriptorService);
5252

5353
var identityService = context.getService(IdentityService.class);
54-
var connectorName = context.getSetting(CONNECTOR_NAME, "connectorName");
54+
var connectorName = getConnectorId(context);
5555
var dapsService = new DapsServiceImpl(connectorName, identityService);
5656
context.registerService(DapsService.class, dapsService);
5757

extensions/transfer/transfer-store-cosmos/src/main/java/com/microsoft/dagx/transfer/store/cosmos/CosmosTransferProcessStoreExtension.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323

2424
import java.util.ArrayList;
2525
import java.util.Set;
26-
import java.util.UUID;
26+
27+
import static com.microsoft.dagx.common.settings.SettingsHelper.getConnectorId;
2728

2829
/**
2930
* Provides an in-memory implementation of the {@link com.microsoft.dagx.spi.transfer.store.TransferProcessStore} for testing.
@@ -91,7 +92,7 @@ public void initialize(ServiceExtensionContext context) {
9192
var partitionKey = context.getSetting(COSMOS_PARTITION_KEY_SETTING, DEFAULT_PARTITION_KEY);
9293

9394
// get unique connector name
94-
var connectorId = context.getSetting("dagx.ids.connector.name", "dagx-connector-" + UUID.randomUUID());
95+
var connectorId = getConnectorId(context);
9596

9697
monitor.info("CosmosTransferProcessStore will use connector id '" + connectorId + "'");
9798
context.registerService(TransferProcessStore.class, new CosmosTransferProcessStore(container, context.getTypeManager(), partitionKey, connectorId));

0 commit comments

Comments
 (0)