Skip to content

Commit 80b417e

Browse files
committed
Rename integration agent to connector
1 parent 5034181 commit 80b417e

10 files changed

+341
-205
lines changed

README.md

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,25 @@ Using the SDK, you can build Java applications to automate data platform operati
1111
- Notify downstream consumers when data contract tests have failed
1212
- Publish data product costs and usage data to Data Mesh Manager
1313

14-
This SDK is designed as a foundation for building data platform integrations that run as long-running agents on customer's data platform, e.g., as containers running in a Kubernetes cluster or any other container-runtime.
14+
This SDK is designed as a foundation for building data platform integrations that run as long-running connectors on customer's data platform, e.g., as containers running in a Kubernetes cluster or any other container-runtime.
1515

1616
It interacts with the Data Mesh Manager APIs to send metadata and to subscribe to events to trigger actions in the data platform or with other services.
1717

1818

19-
Existing Integrations
19+
Existing Connectors
2020
---
2121

22-
We provide some agents for commonly-used platforms that that use this SDK and that can be used out-of-the-box or as a template for custom integrations:
22+
We provide some connectors for commonly-used platforms that that use this SDK and that can be used out-of-the-box or as a template for custom integrations:
2323

24-
| Platform | Integration | Synchronize Assets | Access Management | Remarks |
25-
|-------------|------------------------------------------------------------------------------------------------------------|--------------------|-------------------|-------------------------|
26-
| Databricks | [datamesh-manager-agent-databricks](https://github.com/datamesh-manager/datamesh-manager-agent-databricks) ||| Uses Unity Catalog APIs |
27-
| Snowflake | [datamesh-manager-agent-snowflake](https://github.com/datamesh-manager/datamesh-manager-agent-snowflake) | | | Uses the Snowflake REST API |
28-
| AWS | | | | Coming soon |
29-
| Google Cloud Platform | [datamesh-manager-agent-gcp](https://github.com/datamesh-manager/datamesh-manager-agent-gcp) | | | Uses BigQuery APIs |
30-
| Azure | | | | Coming soon |
31-
| datahub | | | | Coming soon |
32-
| Collibra | | | | Coming soon |
24+
| Platform | Connector | Synchronize Assets | Access Management | Remarks |
25+
|-----------------------|------------------------------------------------------------------------------------------------------------|--------------------|-------------------|-----------------------------|
26+
| Databricks | [datamesh-manager-agent-databricks](https://github.com/datamesh-manager/datamesh-manager-agent-databricks) ||| Uses Unity Catalog APIs |
27+
| Snowflake | [datamesh-manager-agent-snowflake](https://github.com/datamesh-manager/datamesh-manager-agent-snowflake) ||| Uses the Snowflake REST API |
28+
| AWS | | | | Coming soon |
29+
| Google Cloud Platform | [datamesh-manager-agent-gcp](https://github.com/datamesh-manager/datamesh-manager-agent-gcp) || | Uses BigQuery APIs |
30+
| Azure | | | | Coming soon |
31+
| datahub | | | | Coming soon |
32+
| Collibra | | | | Coming soon |
3333

3434
If you are interested in further integration, please [contact us](https://entropy-data.atlassian.net/servicedesk/customer/portals).
3535

@@ -90,10 +90,10 @@ public class MyAssetsProvider implements DataMeshManagerAssetsProvider {
9090
With this implementation, you can start an `DataMeshManagerAssetsSynchronizer`:
9191

9292
```java
93-
var agentid = "my-unique-assets-synchronization-agent-id";
93+
var connectorid = "my-unique-assets-synchronization-connector-id";
9494
var assetsProvider = new MyAssetsProvider();
95-
var assetsSynchronizer = new DataMeshManagerAssetsSynchronizer(agentid, client, assetsSupplier);
96-
assetsSynchronizer.start(); // This will start a long-running agent that calls the fetchAssets method periodically
95+
var assetsSynchronizer = new DataMeshManagerAssetsSynchronizer(connectorid, client, assetsSupplier);
96+
assetsSynchronizer.start(); // This will start a long-running connector that calls the fetchAssets method periodically
9797
```
9898

9999
### Implement an EventListener (optional)
@@ -121,19 +121,19 @@ You can listen to any event from Data Mesh Manager. The SDK provides a method fo
121121
With this implementation, you can start an `DataMeshManagerEventListener`:
122122

123123
```java
124-
var agentid = "my-unique-event-listener-agent-id";
124+
var connectorid = "my-unique-event-listener-connector-id";
125125
var eventHandler = new MyEventHandler();
126126
var stateRepository = ... // see below
127-
var eventListener = new DataMeshManagerEventListener(agentid, client, eventHandler, stateRepository);
128-
eventListener.start(); // This will start a long-running agent that listens to events from Data Mesh Manager
127+
var eventListener = new DataMeshManagerEventListener(connectorid, client, eventHandler, stateRepository);
128+
eventListener.start(); // This will start a long-running connector that listens to events from Data Mesh Manager
129129
```
130130

131-
If you have multiple agents in an application, make sure to start the `start()` methods in separate threads.
131+
If you have multiple connectors in an application, make sure to start the `start()` methods in separate threads.
132132

133133
### State Repository
134134

135135
The `DataMeshManagerEventListener` requires a `DataMeshManagerStateRepository` to store the `lastEventId` that has been processed.
136-
Also, you can use the state repository in other agents, if you need to store information what has been processed or what is the current state of your agent.
136+
Also, you can use the state repository in other connectors, if you need to store information what has been processed or what is the current state of your connector.
137137
You can implement this interface to store the state in a database, a file, or any other storage:
138138

139139
```java
@@ -146,8 +146,8 @@ public interface DataMeshManagerStateRepository {
146146
For your convenience, you can use the `DataMeshManagerStateRepositoryRemote` to store the state directly in the Data Mesh Manager:
147147

148148
```java
149-
var agentId = "my-unique-event-listener-agent-id";
150-
var stateRepository = new DataMeshManagerStateRepositoryRemote(agentId, client);
149+
var connectorId = "my-unique-event-listener-connector-id";
150+
var stateRepository = new DataMeshManagerStateRepositoryRemote(connectorId, client);
151151
```
152152

153153
and for testing there is also a `DataMeshManagerStateRepositoryInMemory`.

src/main/java/datameshmanager/sdk/DataMeshManagerAgentRegistration.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

src/main/java/datameshmanager/sdk/DataMeshManagerAssetsSynchronizer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,30 @@ public class DataMeshManagerAssetsSynchronizer {
1313

1414
private static final Logger log = LoggerFactory.getLogger(DataMeshManagerAssetsSynchronizer.class);
1515

16-
private final String agentId;
16+
private final String connectorId;
1717
private final DataMeshManagerClient client;
18-
private final DataMeshManagerAgentRegistration agentRegistration;
18+
private final DataMeshManagerConnectorRegistration connectorRegistration;
1919
private final DataMeshManagerAssetsProvider assetsProvider;
2020
private volatile boolean stopped = false;
2121

2222
private Duration delay = Duration.parse("PT60M");
2323

2424
public DataMeshManagerAssetsSynchronizer(
25-
String agentId,
25+
String connectorId,
2626
DataMeshManagerClient client,
2727
DataMeshManagerAssetsProvider assetsProvider) {
28-
this.agentId = agentId;
28+
this.connectorId = connectorId;
2929
this.client = client;
3030
this.assetsProvider = assetsProvider;
31-
this.agentRegistration = new DataMeshManagerAgentRegistration(client, agentId, "assets-synchronizer");
31+
this.connectorRegistration = new DataMeshManagerConnectorRegistration(client, connectorId, "assets-synchronizer");
3232

33-
this.agentRegistration.register();
33+
this.connectorRegistration.register();
3434
}
3535

3636
public void start() {
37-
log.info("{}: start syncing assets", agentId);
37+
log.info("{}: start syncing assets", connectorId);
3838

39-
// TODO error handling for agentRegistration
39+
// TODO error handling for connectorRegistration
4040
// TODO error handling during while loop
4141

4242
while (!this.stopped) {
@@ -52,11 +52,11 @@ public void start() {
5252

5353
public void stop() {
5454
if (this.stopped) {
55-
log.info("{}: Already stopped asset synchronization", agentId);
55+
log.info("{}: Already stopped asset synchronization", connectorId);
5656
return;
5757
}
5858
this.stopped = true;
59-
log.info("{}: Stopped syncing assets", agentId);
59+
log.info("{}: Stopped syncing assets", connectorId);
6060
}
6161

6262
public void synchronizeAssets() {

src/main/java/datameshmanager/sdk/DataMeshManagerClient.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
import datameshmanager.sdk.client.ApiClient;
44
import datameshmanager.sdk.client.api.AccessApi;
55
import datameshmanager.sdk.client.api.AssetsApi;
6+
import datameshmanager.sdk.client.api.ConnectorsApi;
67
import datameshmanager.sdk.client.api.CostsApi;
78
import datameshmanager.sdk.client.api.DataContractsApi;
89
import datameshmanager.sdk.client.api.DataProductsApi;
910
import datameshmanager.sdk.client.api.DefinitionsApi;
1011
import datameshmanager.sdk.client.api.EventsApi;
11-
import datameshmanager.sdk.client.api.IntegrationsApi;
1212
import datameshmanager.sdk.client.api.SourceSystemsApi;
1313
import datameshmanager.sdk.client.api.TagsApi;
1414
import datameshmanager.sdk.client.api.TeamsApi;
@@ -29,7 +29,7 @@ public class DataMeshManagerClient {
2929
private final TagsApi tagsApi;
3030
private final TeamsApi teamsApi;
3131
private final TestResultsApi testResultsApi;
32-
private final IntegrationsApi integrationsApi;
32+
private final ConnectorsApi connectorsApi;
3333

3434
public DataMeshManagerClient(String host, String apiKey) {
3535
var apiClient = new ApiClient();
@@ -48,7 +48,7 @@ public DataMeshManagerClient(String host, String apiKey) {
4848
this.tagsApi = new TagsApi(apiClient);
4949
this.teamsApi = new TeamsApi(apiClient);
5050
this.testResultsApi = new TestResultsApi(apiClient);
51-
this.integrationsApi = new IntegrationsApi(apiClient);
51+
this.connectorsApi = new ConnectorsApi(apiClient);
5252
}
5353

5454
public ApiClient getApiClient() {
@@ -99,7 +99,7 @@ public TestResultsApi getTestResultsApi() {
9999
return testResultsApi;
100100
}
101101

102-
public IntegrationsApi getIntegrationsApi() {
103-
return integrationsApi;
102+
public ConnectorsApi getConnectorsApi() {
103+
return connectorsApi;
104104
}
105105
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package datameshmanager.sdk;
2+
3+
import datameshmanager.sdk.client.ApiException;
4+
import datameshmanager.sdk.client.model.Connector;
5+
import datameshmanager.sdk.client.model.ConnectorInfo;
6+
import java.util.Objects;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
public class DataMeshManagerConnectorRegistration {
11+
12+
private static final Logger log = LoggerFactory.getLogger(DataMeshManagerConnectorRegistration.class);
13+
14+
private final DataMeshManagerClient client;
15+
16+
/**
17+
* The unique identifier of the connector.
18+
* This is used to identify the connector in the Data Mesh Manager and to exchange state.
19+
* The identifier should be constant across restarts of the connector.
20+
*/
21+
private final String id;
22+
23+
/**
24+
* The type of the connector (e.g. databricks-assets, databricks-access, aws-costs, ...).
25+
*/
26+
private final String type;
27+
28+
public DataMeshManagerConnectorRegistration(DataMeshManagerClient client, String connectorId, String type) {
29+
this.client = client;
30+
this.id = Objects.requireNonNull(connectorId, "Connector ID is required");
31+
this.type = Objects.requireNonNull(type, "Connector type is required");
32+
}
33+
34+
public void register() {
35+
36+
Connector connector;
37+
try {
38+
log.debug("Checking if integration connector {} already exists", id);
39+
connector = client.getConnectorsApi().getConnector(id);
40+
} catch (ApiException e) {
41+
if (e.getCode() == 404) {
42+
connector = new Connector();
43+
} else {
44+
throw e;
45+
}
46+
}
47+
48+
connector
49+
.id(id)
50+
.info(new ConnectorInfo()
51+
.type(type)
52+
);
53+
54+
log.info("Registering integration connector {}", id);
55+
client.getConnectorsApi().putConnector(id, connector);
56+
}
57+
58+
public void delete() {
59+
log.info("Deleting integration connector {}", this.id);
60+
try {
61+
client.getConnectorsApi().deleteConnector(this.id);
62+
} catch (ApiException e) {
63+
if (e.getCode() == 404) {
64+
log.error("Integration connector with id {} already deleted", this.id);
65+
} else {
66+
throw e;
67+
}
68+
}
69+
}
70+
71+
}

src/main/java/datameshmanager/sdk/DataMeshManagerEventListener.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,39 +52,39 @@ public class DataMeshManagerEventListener {
5252

5353
private static final Logger log = LoggerFactory.getLogger(DataMeshManagerEventListener.class);
5454

55-
private final String agentId;
55+
private final String connectorId;
5656
private final DataMeshManagerEventHandler eventHandler;
5757
private final DataMeshManagerClient client;
5858
private final DataMeshManagerStateRepository stateRepository;
5959

6060
private final ObjectMapper objectMapper;
61-
private final DataMeshManagerAgentRegistration agentRegistration;
61+
private final DataMeshManagerConnectorRegistration connectorRegistration;
6262

6363
private boolean stopped = false;
6464
private Duration pollInterval = Duration.ofSeconds(5);
6565

66-
public DataMeshManagerEventListener(String agentId, String type, DataMeshManagerClient client, DataMeshManagerEventHandler eventHandler,
66+
public DataMeshManagerEventListener(String connectorId, String type, DataMeshManagerClient client, DataMeshManagerEventHandler eventHandler,
6767
DataMeshManagerStateRepository stateRepository) {
68-
this.agentId = Objects.requireNonNull(agentId, "agentId must not be null");
68+
this.connectorId = Objects.requireNonNull(connectorId, "connectorId must not be null");
6969
this.eventHandler = Objects.requireNonNull(eventHandler, "eventHandler must not be null");
7070
this.client = Objects.requireNonNull(client, "client must not be null");
7171
this.stateRepository = Objects.requireNonNull(stateRepository, "stateRepository must not be null");
72-
this.agentRegistration = new DataMeshManagerAgentRegistration(client, agentId, type);
72+
this.connectorRegistration = new DataMeshManagerConnectorRegistration(client, connectorId, type);
7373

7474
this.objectMapper = new ObjectMapper()
7575
.findAndRegisterModules()
7676
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
7777

78-
this.agentRegistration.register();
78+
this.connectorRegistration.register();
7979
}
8080

8181
/**
8282
* Starts the event listener to poll for events from the DataMeshManager in an infinite loop.
8383
*/
8484
public void start() {
85-
log.info("{}: Start polling for events", agentId);
85+
log.info("{}: Start polling for events", connectorId);
8686

87-
// TODO error handling for agentRegistration
87+
// TODO error handling for connectorRegistration
8888

8989
var lastEventId = getLastEventId();
9090
while (!this.stopped) {

0 commit comments

Comments
 (0)