Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit 721a52d

Browse files
added event sourcing and forwarding to Azure Event Grid (#106)
* added event sourcing and forwarding to Azure Event Grid * supply default values for topic and topic endpoint * added the topic props to the dockerfile and terraform * added DTO for events Co-authored-by: Paul Latzelsperger <[email protected]>
1 parent 8c0a3bd commit 721a52d

File tree

32 files changed

+598
-98
lines changed

32 files changed

+598
-98
lines changed

core/src/main/java/com/microsoft/dagx/system/DefaultServiceExtensionContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,15 @@ public <T> T getService(Class<T> type) {
9898
return service;
9999
}
100100

101+
@Override
102+
@SuppressWarnings("unchecked")
103+
public <T> T getService(Class<T> type, boolean isOptional) {
104+
if (!isOptional) {
105+
return getService(type);
106+
}
107+
return (T) services.get(type);
108+
}
109+
101110
@Override
102111
public <T> void registerService(Class<T> type, T service) {
103112
services.put(type, service);

distributions/demo-e2e/Dockerfile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,6 @@ ENTRYPOINT java \
1717
-Ddagx.nifi.flow.url=${NIFI_FLOW_URL} \
1818
-Ddagx.cosmos.account.name=${COSMOS_ACCOUNT} \
1919
-Ddagx.cosmos.database.name=${COSMOS_DB} \
20+
-Ddagx.events.topic.name=${TOPIC_NAME} \
21+
-Ddagx.events.topic.endpoint=${TOPIC_ENDPOINT} \
2022
-Djava.security.edg=file:/dev/.urandom -jar dagx-demo-e2e.jar

distributions/demo-e2e/build.gradle.kts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ dependencies {
2222
implementation(project(":extensions:transfer:transfer-provision-azure"))
2323
implementation(project(":extensions:transfer:transfer-nifi"))
2424

25+
implementation(project(":extensions:events:events-azure"))
26+
2527
implementation(project(":extensions:ids"))
2628

27-
// todo: replace with atlas - but we need this for the time being to provide catalog entries
2829
implementation(project(":extensions:catalog:catalog-atlas"))
2930
implementation(project(":extensions:dataseed"))
3031

extensions/catalog/catalog-atlas/src/main/java/com/microsoft/dagx/catalog/atlas/metadata/AtlasExtension.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package com.microsoft.dagx.catalog.atlas.metadata;
77

88
import com.microsoft.dagx.schema.SchemaRegistry;
9+
import com.microsoft.dagx.spi.metadata.MetadataObservable;
910
import com.microsoft.dagx.spi.metadata.MetadataStore;
1011
import com.microsoft.dagx.spi.security.Vault;
1112
import com.microsoft.dagx.spi.system.ServiceExtension;
@@ -24,7 +25,7 @@ public class AtlasExtension implements ServiceExtension {
2425

2526
@Override
2627
public Set<String> provides() {
27-
return Set.of(ATLAS_FEATURE);
28+
return Set.of(ATLAS_FEATURE, "dagx:metadata-store-observable");
2829
}
2930

3031
@Override
@@ -47,7 +48,10 @@ public void initialize(ServiceExtensionContext context) {
4748
var pwd = vault.resolveSecret(SECRET_ATLAS_PWD);
4849
var api = new AtlasApiImpl(atlasUrl, user, pwd, context.getService(OkHttpClient.class), context.getTypeManager());
4950
context.registerService(AtlasApi.class, api);
50-
context.registerService(MetadataStore.class, new AtlasMetadataStore(api, context.getMonitor(), context.getService(SchemaRegistry.class)));
51+
final AtlasMetadataStore store = new AtlasMetadataStore(api, context.getMonitor(), context.getService(SchemaRegistry.class));
52+
context.registerService(MetadataStore.class, store);
53+
context.registerService(MetadataObservable.class, store);
54+
5155
context.getMonitor().info("Initialized Atlas API extension.");
5256

5357
context.getTypeManager().registerTypes(AtlasDataCatalogEntry.class);

extensions/catalog/catalog-atlas/src/main/java/com/microsoft/dagx/catalog/atlas/metadata/AtlasMetadataStore.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import com.microsoft.dagx.schema.DataSchema;
1515
import com.microsoft.dagx.schema.SchemaRegistry;
1616
import com.microsoft.dagx.spi.DagxException;
17+
import com.microsoft.dagx.spi.metadata.MetadataListener;
18+
import com.microsoft.dagx.spi.metadata.MetadataObservable;
1719
import com.microsoft.dagx.spi.metadata.MetadataStore;
1820
import com.microsoft.dagx.spi.monitor.Monitor;
1921
import com.microsoft.dagx.spi.types.domain.metadata.DataEntry;
@@ -30,7 +32,7 @@
3032
import static java.util.stream.Collectors.toSet;
3133

3234
@SuppressWarnings("unchecked")
33-
public class AtlasMetadataStore implements MetadataStore {
35+
public class AtlasMetadataStore extends MetadataObservable implements MetadataStore {
3436
private static final String ATLAS_PROPERTY_KEYNAME = "keyName";
3537
private static final String ATLAS_PROPERTY_TYPE = "type";
3638
private final AtlasApi atlasApi;
@@ -46,7 +48,7 @@ public AtlasMetadataStore(AtlasApi atlasApi, Monitor monitor, SchemaRegistry sch
4648

4749
@Override
4850
public @Nullable DataEntry findForId(String id) {
49-
51+
getListeners().forEach(MetadataListener::searchInitiated);
5052
var properties = atlasApi.getEntityById(id);
5153

5254
if (properties == null) {
@@ -101,11 +103,13 @@ private String getPolicyIdForEntity(AtlasEntity entry) {
101103

102104
@Override
103105
public void save(DataEntry entry) {
106+
getListeners().forEach(MetadataListener::metadataItemAdded);
104107
monitor.severe("Save not yet implemented");
105108
}
106109

107110
@Override
108111
public @NotNull Collection<DataEntry> queryAll(Collection<Policy> policies) {
112+
getListeners().forEach(MetadataListener::querySubmitted);
109113
if (policies.isEmpty()) {
110114
return Collections.emptyList();
111115
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation.
3+
* All rights reserved.
4+
*/
5+
6+
plugins {
7+
`java-library`
8+
}
9+
10+
val eventGridSdkVersion: String by project
11+
12+
dependencies {
13+
api(project(":spi"))
14+
implementation(project(":extensions:schema"))
15+
implementation("com.azure:azure-messaging-eventgrid:${eventGridSdkVersion}")
16+
}
17+
18+
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation.
3+
* All rights reserved.
4+
*
5+
*/
6+
package com.microsoft.dagx.events.azure;
7+
8+
import com.azure.core.credential.AzureKeyCredential;
9+
import com.azure.messaging.eventgrid.EventGridPublisherClientBuilder;
10+
import com.microsoft.dagx.spi.metadata.MetadataObservable;
11+
import com.microsoft.dagx.spi.monitor.Monitor;
12+
import com.microsoft.dagx.spi.security.Vault;
13+
import com.microsoft.dagx.spi.system.ServiceExtension;
14+
import com.microsoft.dagx.spi.system.ServiceExtensionContext;
15+
import com.microsoft.dagx.spi.transfer.TransferProcessObservable;
16+
17+
import java.util.Objects;
18+
import java.util.Set;
19+
20+
public class AzureEventExtension implements ServiceExtension {
21+
22+
private static final String DEFAULT_TOPIC_NAME = "connector-events";
23+
private static final String DEFAULT_ENDPOINT_NAME_TEMPLATE = "https://%s.westeurope-1.eventgrid.azure.net/api/events";
24+
private static final String TOPIC_NAME_SETTING = "dagx.events.topic.name";
25+
private static final String TOPIC_ENDPOINT_SETTING = "dagx.events.topic.endpoint";
26+
private Monitor monitor;
27+
28+
@Override
29+
public Set<String> requires() {
30+
return Set.of("dagx:transfer-process-observable");
31+
}
32+
33+
@Override
34+
public void initialize(ServiceExtensionContext context) {
35+
monitor = context.getMonitor();
36+
37+
monitor.info("AzureEventsExtension: create event grid appender");
38+
registerListeners(context);
39+
40+
monitor.info("Initialized Azure Events Extension");
41+
}
42+
43+
44+
@Override
45+
public void start() {
46+
monitor.info("Started Azure Events Extension");
47+
}
48+
49+
@Override
50+
public void shutdown() {
51+
monitor.info("Shutdown Azure Events Extension");
52+
}
53+
54+
private void registerListeners(ServiceExtensionContext context) {
55+
56+
var vault = context.getService(Vault.class);
57+
58+
var topicName = getTopic(context);
59+
var endpoint = getEndpoint(context, topicName);
60+
monitor.info("AzureEventExtension: will use topic endpoint " + endpoint);
61+
62+
var publisherClient = new EventGridPublisherClientBuilder()
63+
.credential(new AzureKeyCredential(Objects.requireNonNull(vault.resolveSecret(topicName), "Did not find secret in vault: " + endpoint)))
64+
.endpoint(endpoint)
65+
.buildEventGridEventPublisherAsyncClient();
66+
67+
final AzureEventGridPublisher publisher = new AzureEventGridPublisher(monitor, publisherClient);
68+
69+
var processObservable = context.getService(TransferProcessObservable.class, true);
70+
if (processObservable != null) {
71+
processObservable.registerListener(publisher);
72+
}
73+
74+
var metadataObservable = context.getService(MetadataObservable.class, true);
75+
if (metadataObservable != null) {
76+
metadataObservable.registerListener(publisher);
77+
}
78+
79+
80+
}
81+
82+
private String getTopic(ServiceExtensionContext context) {
83+
return context.getSetting(TOPIC_NAME_SETTING, DEFAULT_TOPIC_NAME);
84+
}
85+
86+
private String getEndpoint(ServiceExtensionContext context, String topicName) {
87+
var ep = context.getSetting(TOPIC_ENDPOINT_SETTING, null);
88+
if (ep == null) {
89+
ep = String.format(DEFAULT_ENDPOINT_NAME_TEMPLATE, topicName);
90+
}
91+
return ep;
92+
}
93+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright (c) Microsoft Corporation.
3+
* All rights reserved.
4+
*
5+
*/
6+
7+
package com.microsoft.dagx.events.azure;
8+
9+
import com.azure.core.util.BinaryData;
10+
import com.azure.messaging.eventgrid.EventGridEvent;
11+
import com.azure.messaging.eventgrid.EventGridPublisherAsyncClient;
12+
import com.microsoft.dagx.spi.metadata.MetadataListener;
13+
import com.microsoft.dagx.spi.monitor.Monitor;
14+
import com.microsoft.dagx.spi.transfer.TransferProcessListener;
15+
import com.microsoft.dagx.spi.types.domain.transfer.TransferProcess;
16+
import com.microsoft.dagx.spi.types.domain.transfer.TransferProcessStates;
17+
import org.jetbrains.annotations.NotNull;
18+
import reactor.core.publisher.BaseSubscriber;
19+
import reactor.core.publisher.Mono;
20+
21+
class AzureEventGridPublisher implements TransferProcessListener, MetadataListener {
22+
23+
private final Monitor monitor;
24+
private final EventGridPublisherAsyncClient<EventGridEvent> client;
25+
private final String eventTypeTransferprocess = "dagx/transfer/transferprocess";
26+
private final String eventTypeMetadata = "dagx/metadata/store";
27+
28+
public AzureEventGridPublisher(Monitor monitor, EventGridPublisherAsyncClient<EventGridEvent> client) {
29+
this.monitor = monitor;
30+
this.client = client;
31+
}
32+
33+
@Override
34+
public void created(TransferProcess process) {
35+
var dto = createDto(process);
36+
if (process.getType() == TransferProcess.Type.CLIENT) {
37+
sendEvent("createdClient", eventTypeTransferprocess, dto).subscribe(new LoggingSubscriber<>("Transfer process created"));
38+
} else {
39+
sendEvent("createdProvider", eventTypeTransferprocess, dto).subscribe(new LoggingSubscriber<>("Transfer process created"));
40+
}
41+
}
42+
43+
@Override
44+
public void completed(TransferProcess process) {
45+
sendEvent("completed", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process completed"));
46+
}
47+
48+
49+
@Override
50+
public void deprovisioned(TransferProcess process) {
51+
sendEvent("deprovisioned", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process resources deprovisioned"));
52+
53+
}
54+
55+
@Override
56+
public void ended(TransferProcess process) {
57+
sendEvent("ended", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process ended"));
58+
59+
}
60+
61+
@Override
62+
public void error(TransferProcess process) {
63+
sendEvent("error", eventTypeTransferprocess, createDto(process)).subscribe(new LoggingSubscriber<>("Transfer process errored!"));
64+
65+
}
66+
67+
@Override
68+
public void querySubmitted() {
69+
sendEvent("querySubmitted", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("query submitted"));
70+
}
71+
72+
@Override
73+
public void searchInitiated() {
74+
sendEvent("searchInitiated", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("search initiated"));
75+
}
76+
77+
@Override
78+
public void metadataItemAdded() {
79+
sendEvent("itemAdded", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("AzureEventGrid: metadata item added"));
80+
}
81+
82+
@Override
83+
public void metadataItemUpdated() {
84+
sendEvent("itemUpdated", eventTypeMetadata, null).subscribe(new LoggingSubscriber<>("metadata item updated"));
85+
}
86+
87+
private Mono<Void> sendEvent(String what, String where, Object payload) {
88+
final BinaryData data = BinaryData.fromObject(payload);
89+
var evt = new EventGridEvent(what, where, data, "0.1");
90+
return client.sendEvent(evt);
91+
}
92+
93+
@NotNull
94+
private TransferProcessDto createDto(TransferProcess process) {
95+
return TransferProcessDto.Builder.newInstance()
96+
.state(TransferProcessStates.from(process.getState()))
97+
.requestId(process.getDataRequest().getId())
98+
.type(process.getType())
99+
.build();
100+
}
101+
102+
private class LoggingSubscriber<T> extends BaseSubscriber<T> {
103+
104+
private final String message;
105+
106+
LoggingSubscriber(String message) {
107+
this.message = message;
108+
}
109+
110+
@Override
111+
protected void hookOnComplete() {
112+
monitor.info("AzureEventGrid: " + message);
113+
}
114+
115+
@Override
116+
protected void hookOnError(@NotNull Throwable throwable) {
117+
monitor.severe("Error during event publishing", throwable);
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)