
Commit 9558471

Feature/100 implement cosmos based transferprocessstore (#102)
* added cosmos-based transfer process store
* added barebones cosmos-db-based transferprocess store
* fixed GH actions config error
* fixed missing line breaks
* fixed serialization problems with TransferProcesses
* made DataEntry non-generic
* fixed i-tests
* added more tests for the CosmosDbStore
* re-enabled terraform plan
* really re-enabled terraform plan
* disabled wrongly enabled test
* replaced nextForState with a transaction-safe stored procedure (sketched below)
* removed explicit lock/unlock feature
* only run the test in CI
* fixed terraform format, removed secret
* same connector should be able to renew the lease
* deploy the connector instance only after the storage account is ready
* code cosmetics
* try to run terraform after docker images are built
* removed branch restriction
* disabled azure vault tests
* removed commented code
* re-enabled terraform
* re-enabled taint and apply steps on pushes to main
* added lease/break-lease behaviour to all modifying calls
* code cleanup; added a retry policy that retries after request throttling
* added unique connectorId to the lease mechanism
* fixed CI switch
* PR updates

Co-authored-by: Paul Latzelsperger <[email protected]>
1 parent e1a887f commit 9558471
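The commit message says nextForState was replaced by a transaction-safe stored procedure, with a lease keyed to a unique connectorId. Below is a minimal sketch of what such a call might look like with the azure-cosmos 4.x SDK; the procedure name nextForState follows the commit message, but the parameters, database/container names, partition key, and environment variables are assumptions, not the commit's actual code.

```java
import com.azure.cosmos.CosmosClient;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.CosmosContainer;
import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions;
import com.azure.cosmos.models.PartitionKey;

import java.util.List;

public class NextForStateSketch {
    public static void main(String[] args) {
        CosmosClient client = new CosmosClientBuilder()
                .endpoint(System.getenv("COSMOS_ENDPOINT")) // hypothetical env vars
                .key(System.getenv("COSMOS_KEY"))
                .buildClient();
        CosmosContainer container = client.getDatabase("dagx") // assumed names
                .getContainer("transferprocess");

        var options = new CosmosStoredProcedureRequestOptions();
        // Cosmos stored procedures execute transactionally, but only within one partition.
        options.setPartitionKey(new PartitionKey("transferprocess"));

        // Server-side, the procedure would select documents in the requested state
        // whose lease is free or expired, write a lease owned by this connector,
        // and return them -- all in a single transaction, so two connector
        // instances cannot grab the same item.
        var response = container.getScripts()
                .getStoredProcedure("nextForState")
                .execute(List.of(/* state */ 100, /* batch size */ 5, /* connectorId */ "connector-1"), options);
        System.out.println(response.getResponseAsString());
    }
}
```

Because the procedure runs atomically inside a single partition, it can replace the explicit lock/unlock feature the commit removes, and a connector presenting the same connectorId can renew its own lease.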

File tree: 44 files changed (+1563, −126 lines)


.github/workflows/terraform.yaml

Lines changed: 18 additions & 14 deletions

@@ -44,6 +44,10 @@
 
 name: 'Terraform'
 
+# on:
+#   workflow_run:
+#     workflows: [ "Build Docker Images" ]
+#     types: [ completed ]
 on:
   push:
     branches:
@@ -54,7 +58,8 @@ jobs:
   terraform:
     name: 'Terraform'
     runs-on: ubuntu-latest
-    environment: production
+    environment: dev
+
     env:
       ARM_CLIENT_ID: ${{ secrets.AZURE_AD_CIENT_ID }}
       ARM_CLIENT_SECRET: ${{ secrets.AZURE_AD_CLIENT_SECRET }}
@@ -86,19 +91,18 @@ jobs:
         id: validate
         run: terraform -chdir=scripts validate -no-color
 
-      # deactivated until the GithubActions-User has correct credentials
-      # - name: Terraform Plan
-      #   id: plan
-      #   if: github.event_name == 'pull_request'
-      #   run: terraform -chdir=scripts plan -no-color -var "resourcesuffix=dev" -var "backend_account_key=${{ secrets.TF_BACKEND_KEY }}"
+      - name: Terraform Plan
+        id: plan
+        run: terraform -chdir=scripts plan -no-color -var "resourcesuffix=dev" -var "backend_account_key=${{ secrets.TF_BACKEND_KEY }}"
 
 
+      - name: Taint the connector instance
+        id: taint
+        if: github.ref == 'refs/heads/main' && github.event_name == 'push'
+        run: terraform -chdir=scripts taint azurerm_container_group.connector-instance
+        continue-on-error: true
 
-      # - name: Taint the connector instance
-      #   if: github.ref == 'refs/heads/main' && github.event_name == 'push'
-      #   run: terraform -chdir=scripts taint azurerm_container_group.connector-instance
-      #   continue-on-error: true
-      #
-      # - name: Terraform Apply
-      #   if: github.ref == 'refs/heads/main' && github.event_name == 'push'
-      #   run: terraform -chdir=scripts apply -var "resourcesuffix=dev" -var "backend_account_key=${{ secrets.TF_BACKEND_KEY }}" -auto-approve
+      - name: Terraform Apply
+        id: apply
+        if: github.ref == 'refs/heads/main' && github.event_name == 'push'
+        run: terraform -chdir=scripts apply -var "resourcesuffix=dev" -var "backend_account_key=${{ secrets.TF_BACKEND_KEY }}" -auto-approve

.github/workflows/verify.yaml

Lines changed: 9 additions & 2 deletions

@@ -22,7 +22,7 @@ jobs:
           NIFI_API_AUTH: ${{ secrets.NIFI_API_AUTH }}
           S3_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_ACCESS_KEY }}
           S3_ACCESS_KEY_ID: ${{ secrets.S3_ACCESS_KEY_ID }}
-        run: ./gradlew clean check -x :extensions:catalog:catalog-atlas:check -x :extensions:security:security-azure:check -x :extensions:transfer:transfer-nifi:check -x :external:nifi:processors:check
+        run: ./gradlew clean check -x :extensions:catalog:catalog-atlas:check -x :extensions:security:security-azure:check -x :extensions:transfer:transfer-nifi:check -x :external:nifi:processors:check -x extensions:transfer:transfer-store-cosmos:check
 
       - name: Publish Unit Test Results
         id: publish-results
@@ -89,7 +89,14 @@ jobs:
         run: ./gradlew external:nifi:processors:check
 
       - name: Test Azure Vault Integration
+        if: ${{ false }} # disabled for now, because it's VERY slow
         id: azure-vault-tests
         env:
           AZ_STORAGE_SAS: ${{ secrets.AZ_STORAGE_SAS }}
-        run: ./gradlew extensions:security:security-azure:check
+        run: ./gradlew extensions:security:security-azure:check
+
+      - name: Test Cosmos-based TransferProcessStore
+        id: cosmos-transferprocessstore-test
+        env:
+          COSMOS_KEY: ${{ secrets.COSMOS_DB_MASTERKEY }}
+        run: ./gradlew extensions:transfer:transfer-store-cosmos:check
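The new step supplies COSMOS_KEY only in CI, which matches the "only run the test in CI" bullet in the commit message. A plausible way to gate such a test on that variable with JUnit 5 is sketched below; the test class and method body are hypothetical, not the commit's actual test.

```java
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;

// Skips automatically on machines where the workflow secret is not present.
@EnabledIfEnvironmentVariable(named = "COSMOS_KEY", matches = ".+")
class CosmosTransferProcessStoreIntegrationTest {

    @Test
    void nextForState_leasesEachProcessAtMostOnce() {
        String key = System.getenv("COSMOS_KEY"); // injected by the CI step above
        // ... build a Cosmos client with the key and exercise the store ...
    }
}
```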

.gitignore

Lines changed: 2 additions & 0 deletions

@@ -48,3 +48,5 @@ distributions/azure/azure.properties
 **/secrets
 kubeconfig
 **/dagx-config.properties
+
+scripts/terraform.tfvars

distributions/demo-e2e/Dockerfile

Lines changed: 2 additions & 0 deletions

@@ -15,4 +15,6 @@ ENTRYPOINT java \
     -Ddagx.atlas.url=${ATLAS_URL} \
     -Ddagx.nifi.url=${NIFI_URL} \
     -Ddagx.nifi.flow.url=${NIFI_FLOW_URL} \
+    -Ddagx.cosmos.account.name=${COSMOS_ACCOUNT} \
+    -Ddagx.cosmos.database.name=${COSMOS_DB} \
     -Djava.security.edg=file:/dev/.urandom -jar dagx-demo-e2e.jar
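The two new -D properties reach the runtime as configuration settings. A rough sketch of how the cosmos extension might pick them up in its ServiceExtension follows; the class name and the getSetting signature mirror the dagx conventions visible elsewhere in this commit, but both are assumptions.

```java
import com.microsoft.dagx.spi.system.ServiceExtension;
import com.microsoft.dagx.spi.system.ServiceExtensionContext;

// Hypothetical extension class; the real one lives in transfer-store-cosmos.
public class CosmosTransferProcessStoreExtension implements ServiceExtension {
    @Override
    public void initialize(ServiceExtensionContext context) {
        // The -Ddagx.cosmos.* flags from the Dockerfile surface here as settings.
        String account = context.getSetting("dagx.cosmos.account.name", null);
        String database = context.getSetting("dagx.cosmos.database.name", null);
        context.getMonitor().info("Cosmos store targets account=" + account + ", db=" + database);
    }
}
```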

distributions/demo-e2e/build.gradle.kts

Lines changed: 1 addition & 1 deletion

@@ -17,7 +17,7 @@ dependencies {
     implementation(project(":extensions:control-http"))
 
     implementation(project(":extensions:transfer:transfer-core"))
-    implementation(project(":extensions:transfer:transfer-store-memory"))
+    implementation(project(":extensions:transfer:transfer-store-cosmos"))
     implementation(project(":extensions:transfer:transfer-provision-aws"))
     implementation(project(":extensions:transfer:transfer-provision-azure"))
     implementation(project(":extensions:transfer:transfer-nifi"))

extensions/catalog/catalog-atlas/src/main/java/com/microsoft/dagx/catalog/atlas/metadata/AtlasDataCatalogEntry.java

Lines changed: 5 additions & 1 deletion

@@ -5,14 +5,18 @@
 
 package com.microsoft.dagx.catalog.atlas.metadata;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.microsoft.dagx.spi.types.domain.metadata.DataCatalogEntry;
 import com.microsoft.dagx.spi.types.domain.transfer.DataAddress;
 
+@JsonTypeName("dagx:atlascatalogentry")
 public class AtlasDataCatalogEntry implements DataCatalogEntry {
 
+    @JsonProperty
    private final DataAddress address;
 
-    public AtlasDataCatalogEntry(DataAddress address) {
+    public AtlasDataCatalogEntry(@JsonProperty("address") DataAddress address) {
 
         this.address = address;
     }
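The @JsonTypeName annotation and the @JsonProperty-annotated constructor let Jackson round-trip DataCatalogEntry subtypes by name once they are registered (see the registerTypes call in the next file). The standalone sketch below illustrates the mechanism; the "@type" discriminator property and the simplified types are assumptions, not dagx's actual wiring.

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PolymorphicEntryDemo {
    // The base type declares where the type id lives; the property name here is
    // an assumption, not necessarily what dagx's TypeManager configures.
    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type")
    interface CatalogEntry {
    }

    @JsonTypeName("dagx:atlascatalogentry")
    static class AtlasEntry implements CatalogEntry {
        @JsonProperty
        private final String address;

        // Annotating the parameter lets Jackson use this constructor to set the
        // final field, mirroring AtlasDataCatalogEntry above.
        AtlasEntry(@JsonProperty("address") String address) {
            this.address = address;
        }
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Presumably what TypeManager.registerTypes delegates to.
        mapper.registerSubtypes(AtlasEntry.class);

        String json = mapper.writeValueAsString(new AtlasEntry("azblob://container/blob"));
        CatalogEntry restored = mapper.readValue(json, CatalogEntry.class);
        System.out.println(json + " -> " + restored.getClass().getSimpleName());
    }
}
```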

extensions/catalog/catalog-atlas/src/main/java/com/microsoft/dagx/catalog/atlas/metadata/AtlasExtension.java

Lines changed: 2 additions & 0 deletions

@@ -49,6 +49,8 @@ public void initialize(ServiceExtensionContext context) {
         context.registerService(AtlasApi.class, api);
         context.registerService(MetadataStore.class, new AtlasMetadataStore(api, context.getMonitor(), context.getService(SchemaRegistry.class)));
         context.getMonitor().info("Initialized Atlas API extension.");
+
+        context.getTypeManager().registerTypes(AtlasDataCatalogEntry.class);
     }
 
 
extensions/catalog/catalog-atlas/src/main/java/com/microsoft/dagx/catalog/atlas/metadata/AtlasMetadataStore.java

Lines changed: 3 additions & 3 deletions

@@ -45,7 +45,7 @@ public AtlasMetadataStore(AtlasApi atlasApi, Monitor monitor, SchemaRegistry sch
     }
 
     @Override
-    public @Nullable DataEntry<?> findForId(String id) {
+    public @Nullable DataEntry findForId(String id) {
 
         var properties = atlasApi.getEntityById(id);
 
@@ -100,12 +100,12 @@ private String getPolicyIdForEntity(AtlasEntity entry) {
     }
 
     @Override
-    public void save(DataEntry<?> entry) {
+    public void save(DataEntry entry) {
         monitor.severe("Save not yet implemented");
     }
 
     @Override
-    public @NotNull Collection<DataEntry<?>> queryAll(Collection<Policy> policies) {
+    public @NotNull Collection<DataEntry> queryAll(Collection<Policy> policies) {
         if (policies.isEmpty()) {
             return Collections.emptyList();
         }

extensions/catalog/catalog-atlas/src/test/java/com/microsoft/dagx/catalog/atlas/metadata/AtlasMetadataStoreTest.java

Lines changed: 2 additions & 2 deletions

@@ -174,7 +174,7 @@ void queryAll() {
         replay(atlasApiMock);
         replay(monitorMock);
 
-        final Collection<DataEntry<?>> entries = atlasMetadataStore.queryAll(Collections.singleton(policy));
+        final Collection<DataEntry> entries = atlasMetadataStore.queryAll(Collections.singleton(policy));
         assertThat(entries).isNotNull().isNotEmpty().doesNotContainNull();
 
         assertThat(entries).allSatisfy(this::assertAzureEntry);
@@ -232,7 +232,7 @@ private AtlasEntity createS3Entity(String name) {
                 "region", "neverland"));
     }
 
-    private void assertAzureEntry(DataEntry<?> entry) {
+    private void assertAzureEntry(DataEntry entry) {
         assertThat(entry.getCatalogEntry().getAddress().getProperties()).isNotNull()
                 .hasFieldOrPropertyWithValue("keyName", KEY_NAME)
                 .hasFieldOrPropertyWithValue("type", AzureBlobStoreSchema.TYPE)

extensions/demo/demo-nifi/src/main/java/com/microsoft/dagx/demo/nifi/NifiDemoServiceExtension.java

Lines changed: 2 additions & 3 deletions

@@ -12,7 +12,6 @@
 import com.microsoft.dagx.spi.policy.PolicyRegistry;
 import com.microsoft.dagx.spi.system.ServiceExtension;
 import com.microsoft.dagx.spi.system.ServiceExtensionContext;
-import com.microsoft.dagx.spi.types.domain.metadata.DataCatalogEntry;
 import com.microsoft.dagx.spi.types.domain.metadata.DataEntry;
 import com.microsoft.dagx.spi.types.domain.metadata.GenericDataCatalogEntry;
 
@@ -60,10 +59,10 @@ private void saveDataEntries() {
                 .property("type", AzureBlobStoreSchema.TYPE)
                 .build();
 
-        DataEntry<DataCatalogEntry> entry1 = DataEntry.Builder.newInstance().id("test123").policyId(USE_EU_POLICY).catalogEntry(sourceFileCatalog).build();
+        DataEntry entry1 = DataEntry.Builder.newInstance().id("test123").policyId(USE_EU_POLICY).catalogEntry(sourceFileCatalog).build();
         metadataStore.save(entry1);
 
-        DataEntry<DataCatalogEntry> entry2 = DataEntry.Builder.newInstance().id("test456").policyId(USE_US_OR_EU_POLICY).catalogEntry(sourceFileCatalog).build();
+        DataEntry entry2 = DataEntry.Builder.newInstance().id("test456").policyId(USE_US_OR_EU_POLICY).catalogEntry(sourceFileCatalog).build();
         metadataStore.save(entry2);
     }
 
