Skip to content
This repository was archived by the owner on Apr 1, 2025. It is now read-only.

Commit b6ce3ad

Browse files
committed
HTTP/webhook events
1 parent d8c69a0 commit b6ce3ad

File tree

9 files changed

+189
-36
lines changed

9 files changed

+189
-36
lines changed

extensions/transfer/transfer-demo-protocols/src/main/java/com/microsoft/dagx/transfer/demo/protocols/DemoProtocolsTransferExtension.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,19 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import com.microsoft.dagx.spi.DagxSetting;
55
import com.microsoft.dagx.spi.monitor.Monitor;
6+
import com.microsoft.dagx.spi.protocol.web.WebService;
67
import com.microsoft.dagx.spi.security.Vault;
78
import com.microsoft.dagx.spi.system.ServiceExtension;
89
import com.microsoft.dagx.spi.system.ServiceExtensionContext;
910
import com.microsoft.dagx.spi.transfer.flow.DataFlowManager;
1011
import com.microsoft.dagx.spi.transfer.provision.ProvisionManager;
1112
import com.microsoft.dagx.spi.transfer.provision.ResourceManifestGenerator;
13+
import com.microsoft.dagx.transfer.demo.protocols.http.PubSubHttpEndpoint;
1214
import com.microsoft.dagx.transfer.demo.protocols.object.DemoObjectStorage;
1315
import com.microsoft.dagx.transfer.demo.protocols.object.ObjectStorageFlowController;
1416
import com.microsoft.dagx.transfer.demo.protocols.spi.object.ObjectStorageMessage;
15-
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.TopicManager;
1617
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.StreamPublisherRegistry;
18+
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.TopicManager;
1719
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.message.ConnectMessage;
1820
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.message.DataMessage;
1921
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.message.PubSubMessage;
@@ -48,11 +50,15 @@
4850
public class DemoProtocolsTransferExtension implements ServiceExtension {
4951
@DagxSetting
5052
private static final String WS_PUBSUB_ENDPOINT = "dagx.demo.protocol.ws.pubsub";
51-
5253
private static final String DEFAULT_WS_PUBSUB_ENDPOINT = "ws://localhost:8181/pubsub/";
5354

55+
@DagxSetting
56+
private static final String HTTP_PUBSUB_ENDPOINT = "dagx.demo.protocol.http.pubsub";
57+
private static final String DEFAULT_HTTP_PUBSUB_ENDPOINT = "http://localhost:8181/api/demo/pubsub/";
58+
5459
DemoObjectStorage objectStorage;
5560
DemoTopicManager topicManager;
61+
PubSubHttpEndpoint httpEndpoint;
5662

5763
private Monitor monitor;
5864

@@ -78,6 +84,9 @@ public void initialize(ServiceExtensionContext context) {
7884
var jettyService = context.getService(JettyService.class);
7985
new WebSocketFactory().publishEndpoint(PubSubServerEndpoint.class, () -> new PubSubServerEndpoint(topicManager, objectMapper, monitor), jettyService);
8086

87+
httpEndpoint = new PubSubHttpEndpoint(topicManager);
88+
context.getService(WebService.class).registerController(httpEndpoint);
89+
8190
registerGenerators(context);
8291

8392
registerProvisioners(topicManager, context);
@@ -91,6 +100,7 @@ public void initialize(ServiceExtensionContext context) {
91100
public void start() {
92101
objectStorage.start();
93102
topicManager.start();
103+
httpEndpoint.start();
94104
}
95105

96106
@Override
@@ -101,13 +111,17 @@ public void shutdown() {
101111
if (topicManager != null) {
102112
topicManager.stop();
103113
}
114+
if (httpEndpoint != null) {
115+
httpEndpoint.stop();
116+
}
104117
}
105118

106119
private void registerGenerators(ServiceExtensionContext context) {
107120
var manifestGenerator = context.getService(ResourceManifestGenerator.class);
108121

109-
var endpointAddress = context.getSetting(WS_PUBSUB_ENDPOINT, DEFAULT_WS_PUBSUB_ENDPOINT);
110-
manifestGenerator.registerClientGenerator(new PushStreamResourceGenerator(endpointAddress));
122+
var wsEndpointAddress = context.getSetting(WS_PUBSUB_ENDPOINT, DEFAULT_WS_PUBSUB_ENDPOINT);
123+
var httpEndpointAddress = context.getSetting(HTTP_PUBSUB_ENDPOINT, DEFAULT_HTTP_PUBSUB_ENDPOINT);
124+
manifestGenerator.registerClientGenerator(new PushStreamResourceGenerator(wsEndpointAddress, httpEndpointAddress));
111125
}
112126

113127
private void registerProvisioners(TopicManager topicManager, ServiceExtensionContext context) {

extensions/transfer/transfer-demo-protocols/src/main/java/com/microsoft/dagx/transfer/demo/protocols/http/HttpStreamSession.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,49 @@
11
package com.microsoft.dagx.transfer.demo.protocols.http;
22

3+
import com.microsoft.dagx.spi.DagxException;
34
import com.microsoft.dagx.transfer.demo.protocols.stream.StreamSession;
5+
import okhttp3.MediaType;
46
import okhttp3.OkHttpClient;
7+
import okhttp3.Request;
8+
import okhttp3.RequestBody;
59

10+
import java.io.IOException;
611
import java.net.URL;
712

813
/**
914
* Publishes to an HTTP endpoint.
1015
*/
1116
public class HttpStreamSession implements StreamSession {
1217
private URL endpointURL;
18+
private String destinationToken;
1319
private OkHttpClient httpClient;
1420

15-
public HttpStreamSession(URL endpointURL, OkHttpClient httpClient) {
21+
public HttpStreamSession(URL endpointURL, String destinationToken, OkHttpClient httpClient) {
1622
this.endpointURL = endpointURL;
23+
this.destinationToken = destinationToken;
1724
this.httpClient = httpClient;
1825
}
1926

2027
@Override
2128
public void publish(byte[] data) {
22-
throw new UnsupportedOperationException();
29+
try {
30+
var body = RequestBody.create(data, MediaType.get("application/json"));
31+
Request request = new Request.Builder()
32+
.url(endpointURL)
33+
.addHeader("Content-Type", "application/json")
34+
.addHeader("X-Authorization", destinationToken)
35+
.post(body)
36+
.build();
37+
38+
try (var response = httpClient.newCall(request).execute()) {
39+
if (response.code() != 200) {
40+
throw new DagxException("Invalid response received from destination: " + response.code());
41+
}
42+
}
43+
44+
} catch (IOException e) {
45+
throw new DagxException(e);
46+
}
2347
}
2448

2549
@Override
Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,88 @@
11
package com.microsoft.dagx.transfer.demo.protocols.http;
22

3+
import com.microsoft.dagx.spi.DagxException;
4+
import com.microsoft.dagx.transfer.demo.protocols.spi.stream.TopicManager;
35
import jakarta.ws.rs.Consumes;
6+
import jakarta.ws.rs.HeaderParam;
47
import jakarta.ws.rs.POST;
58
import jakarta.ws.rs.Path;
69
import jakarta.ws.rs.PathParam;
710
import jakarta.ws.rs.Produces;
811
import jakarta.ws.rs.core.MediaType;
12+
import jakarta.ws.rs.core.Response;
13+
14+
import java.util.concurrent.ExecutorService;
15+
import java.util.concurrent.Executors;
16+
import java.util.concurrent.LinkedBlockingQueue;
17+
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.function.Consumer;
919

1020
/**
11-
*
21+
* A JAX-RS endpoint that accepts data to be published to a topic.
1222
*/
1323
@Consumes({MediaType.APPLICATION_JSON})
1424
@Produces({MediaType.APPLICATION_JSON})
1525
@Path("/demo/pubsub")
1626
public class PubSubHttpEndpoint {
27+
private TopicManager topicManager;
28+
29+
private AtomicBoolean active = new AtomicBoolean();
30+
31+
private ExecutorService executorService;
32+
private LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<>();
33+
34+
public PubSubHttpEndpoint(TopicManager topicManager) {
35+
this.topicManager = topicManager;
36+
executorService = Executors.newSingleThreadExecutor();
37+
}
38+
39+
public void start() {
40+
active.set(true);
41+
executorService.submit(this::run);
42+
}
43+
44+
public void stop() {
45+
active.set(false);
46+
if (executorService != null) {
47+
executorService.shutdownNow();
48+
}
49+
}
1750

1851
@POST
1952
@Path("{destinationName}")
20-
public void publish(@PathParam("destinationName") String destinationName) {
53+
public Response publish(@PathParam("destinationName") String topicName, @HeaderParam("X-Authorization") String token, byte[] data) {
54+
var result = topicManager.connect(topicName, token);
55+
if (result.success()) {
56+
var entry = new QueueEntry(result.getConsumer(), data);
57+
try {
58+
queue.put(entry);
59+
} catch (InterruptedException e) {
60+
Thread.interrupted();
61+
throw new DagxException(e);
62+
}
63+
return Response.ok().build();
64+
} else {
65+
return Response.status(Response.Status.BAD_REQUEST).build();
66+
}
67+
}
68+
69+
private static class QueueEntry {
70+
Consumer<byte[]> connection;
71+
byte[] data;
72+
73+
public QueueEntry(Consumer<byte[]> connection, byte[] data) {
74+
this.connection = connection;
75+
this.data = data;
76+
}
77+
}
2178

79+
private void run() {
80+
while (active.get()) {
81+
var entry = queue.poll();
82+
if (entry != null) {
83+
entry.connection.accept(entry.data);
84+
}
85+
}
2286
}
2387

2488
}

extensions/transfer/transfer-demo-protocols/src/main/java/com/microsoft/dagx/transfer/demo/protocols/spi/DemoProtocols.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ public interface DemoProtocols {
99

1010
String PULL_STREAM = "dagx:demo:pullstream";
1111

12-
String PUSH_STREAM = "dagx:demo:pushstream";
12+
String PUSH_STREAM_WS = "dagx:demo:pushstream:ws";
13+
14+
String PUSH_STREAM_HTTP = "dagx:demo:pushstream:http";
1315

1416
String ENDPOINT_ADDRESS = "endpointAddress";
1517

extensions/transfer/transfer-demo-protocols/src/main/java/com/microsoft/dagx/transfer/demo/protocols/stream/PushStreamContext.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,15 @@ public StreamSession createSession(String uri, String topicName, String secretNa
3737
var accessToken = vault.resolveSecret(secretName);
3838
var destinationToken = readAccessToken(accessToken, secretName);
3939
var endpointUri = URI.create(uri);
40+
4041
if ("ws".equalsIgnoreCase(endpointUri.getScheme())) {
4142
var session = new WsPushStreamSession(endpointUri, topicName, destinationToken, objectMapper, monitor);
4243
session.connect();
4344
return session;
4445
} else if ("https".equalsIgnoreCase(endpointUri.getScheme()) || "http".equalsIgnoreCase(endpointUri.getScheme())) {
4546
try {
46-
var endpointUrl = endpointUri.toURL();
47-
return new HttpStreamSession(endpointUrl, httpClient);
47+
var endpointUrl = endpointUri.resolve(topicName).toURL();
48+
return new HttpStreamSession(endpointUrl, destinationToken, httpClient);
4849
} catch (MalformedURLException e) {
4950
throw new DagxException(e);
5051
}

extensions/transfer/transfer-demo-protocols/src/main/java/com/microsoft/dagx/transfer/demo/protocols/stream/PushStreamFlowController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public PushStreamFlowController(StreamPublisherRegistry registry) {
1919

2020
@Override
2121
public boolean canHandle(DataRequest dataRequest) {
22-
return DemoProtocols.PUSH_STREAM.equals(dataRequest.getDestinationType());
22+
return DemoProtocols.PUSH_STREAM_WS.equals(dataRequest.getDestinationType()) || DemoProtocols.PUSH_STREAM_HTTP.equals(dataRequest.getDestinationType()) ;
2323
}
2424

2525
@Override

extensions/transfer/transfer-demo-protocols/src/main/java/com/microsoft/dagx/transfer/demo/protocols/stream/PushStreamProvisionedResourceDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public String getResourceName() {
3636
@Override
3737
public DataAddress createDataDestination() {
3838
return DataAddress.Builder.newInstance()
39-
.type(DemoProtocols.PUSH_STREAM)
39+
.type(DemoProtocols.PUSH_STREAM_WS)
4040
.keyName("demo-temp-" + destinationName)
4141
.property(DemoProtocols.ENDPOINT_ADDRESS, endpointAddress)
4242
.property(DemoProtocols.DESTINATION_NAME, destinationName)

extensions/transfer/transfer-demo-protocols/src/main/java/com/microsoft/dagx/transfer/demo/protocols/stream/PushStreamResourceGenerator.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,30 @@
1212
* generated. Otherwise, a definition containing metadata to create a destination topic will be returned.
1313
*/
1414
public class PushStreamResourceGenerator implements ResourceDefinitionGenerator {
15-
private final String endpointAddress;
15+
private final String wsEndpointAddress;
16+
private final String httpEndpointAddress;
1617

17-
public PushStreamResourceGenerator(String endpointAddress) {
18-
this.endpointAddress = endpointAddress;
18+
public PushStreamResourceGenerator(String wsEndpointAddress, String httpEndpointAddress) {
19+
this.wsEndpointAddress = wsEndpointAddress;
20+
this.httpEndpointAddress = httpEndpointAddress;
1921
}
2022

2123
@Override
2224
public @Nullable PushStreamResourceDefinition generate(TransferProcess process) {
2325
var dataRequest = process.getDataRequest();
24-
25-
if (!DemoProtocols.PUSH_STREAM.equals(dataRequest.getDestinationType())) {
26-
return null;
27-
}
2826
if (!dataRequest.isManagedResources()) {
2927
// The resource is unmanaged, which means it was created by an external system. In this case it does not need to be provisioned.
3028
return null;
3129
}
3230

31+
String endpointAddress;
32+
if (DemoProtocols.PUSH_STREAM_WS.equals(dataRequest.getDestinationType())) {
33+
endpointAddress = wsEndpointAddress;
34+
} else if (DemoProtocols.PUSH_STREAM_HTTP.equals(dataRequest.getDestinationType())) {
35+
endpointAddress = httpEndpointAddress;
36+
} else {
37+
return null;
38+
}
3339
var destinationName = dataRequest.getDataDestination().getProperty(DemoProtocols.DESTINATION_NAME);
3440
if (destinationName == null) {
3541
destinationName = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)