Skip to content

Commit 65aaa60

Browse files
authored
fix: Logic for naming objects at destination (#171)
* Change the logic of how the file is named on sink side * Mocking DestinationBlobName into test that are Units * CodeStyle Ok * refactoring unit tests * Fix Header and unitTests structure * fix header * Added NullSafeCapabilities * ensure use of var, using isNullOrBlank over optinal * Removing StringBuilder usage
1 parent 60cad22 commit 65aaa60

File tree

8 files changed

+217
-69
lines changed

8 files changed

+217
-69
lines changed

extensions/data-plane/data-plane-azure-storage/src/main/java/org/eclipse/edc/connector/dataplane/azure/storage/DataPlaneAzureStorageExtension.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public void initialize(ServiceExtensionContext context) {
7070

7171
var sourceFactory = new AzureStorageDataSourceFactory(blobStoreApi, retryPolicy, monitor, vault);
7272
pipelineService.registerFactory(sourceFactory);
73-
7473
var sinkFactory = new AzureStorageDataSinkFactory(blobStoreApi, executorContainer.getExecutorService(), 5, monitor, vault, typeManager, metadataProvider);
7574
pipelineService.registerFactory(sinkFactory);
7675
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Marco Pirmo (BMW AG)
12+
*
13+
*/
14+
15+
package org.eclipse.edc.connector.dataplane.azure.storage;
16+
17+
import org.eclipse.edc.util.string.StringUtils;
18+
19+
/**
20+
* Utility class responsible for determining the name under which a file will be saved.
21+
*/
22+
public class DestinationBlobName {
23+
24+
private final String folderName;
25+
private final String blobName;
26+
27+
public DestinationBlobName(String blobName, String folderName) {
28+
29+
this.blobName = blobName;
30+
this.folderName = folderName;
31+
}
32+
33+
/**
34+
* Resolves the name under which a resource should be saved based on its part name and size.
35+
*
36+
* @param partName The name of the resource part.
37+
* @param partsSize The size of the resource part.
38+
* @return A String representing the resolved name for the resource.
39+
*/
40+
public String resolve(String partName, int partsSize) {
41+
var name = "";
42+
if (!StringUtils.isNullOrBlank(folderName)) {
43+
if (folderName.endsWith("/")) {
44+
name += folderName;
45+
} else {
46+
name += folderName + "/";
47+
}
48+
}
49+
if (partsSize == 1 && !StringUtils.isNullOrBlank(blobName)) {
50+
name += blobName;
51+
} else {
52+
name += partName;
53+
}
54+
55+
return name;
56+
}
57+
58+
}
59+

extensions/data-plane/data-plane-azure-storage/src/main/java/org/eclipse/edc/connector/dataplane/azure/storage/pipeline/AzureStorageDataSink.java

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717
import com.azure.core.credential.AzureSasCredential;
1818
import org.eclipse.edc.azure.blob.adapter.BlobAdapter;
1919
import org.eclipse.edc.azure.blob.api.BlobStoreApi;
20+
import org.eclipse.edc.connector.dataplane.azure.storage.DestinationBlobName;
2021
import org.eclipse.edc.connector.dataplane.azure.storage.metadata.BlobMetadataProvider;
2122
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
2223
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
2324
import org.eclipse.edc.connector.dataplane.util.sink.ParallelSink;
2425
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
25-
import org.eclipse.edc.util.string.StringUtils;
2626
import org.jetbrains.annotations.NotNull;
2727

2828
import java.util.ArrayList;
@@ -40,13 +40,11 @@ public class AzureStorageDataSink extends ParallelSink {
4040
private final List<String> completedFiles = new ArrayList<>();
4141
private String accountName;
4242
private String containerName;
43-
private String folderName;
44-
private String blobName;
45-
private String blobPrefix;
4643
private String sharedAccessSignature;
4744
private BlobStoreApi blobStoreApi;
4845
private DataFlowStartMessage request;
4946
private BlobMetadataProvider metadataProvider;
47+
private DestinationBlobName destinationBlobName;
5048

5149
private AzureStorageDataSink() {
5250
}
@@ -55,22 +53,15 @@ void registerCompletedFile(String name) {
5553
completedFiles.add(name + COMPLETE_BLOB_NAME);
5654
}
5755

58-
String getDestinationBlobName(String partName) {
59-
var name = !StringUtils.isNullOrEmpty(blobName) && StringUtils.isNullOrBlank(blobPrefix) ? blobName : partName;
60-
if (!StringUtils.isNullOrEmpty(folderName)) {
61-
return folderName.endsWith("/") ? folderName + name : folderName + "/" + name;
62-
} else {
63-
return name;
64-
}
65-
}
6656

6757
/**
6858
* Writes data into an Azure storage container.
6959
*/
60+
7061
@Override
7162
protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
7263
for (DataSource.Part part : parts) {
73-
var name = getDestinationBlobName(part.name());
64+
var name = destinationBlobName.resolve(part.name(), parts.size());
7465
try (var input = part.openStream()) {
7566
try (var output = getAdapter(name).getOutputStream()) {
7667
try {
@@ -94,6 +85,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
9485
return StreamResult.success();
9586
}
9687

88+
9789
@Override
9890
protected StreamResult<Object> complete() {
9991
for (var completedFile : completedFiles) {
@@ -139,21 +131,6 @@ public Builder containerName(String containerName) {
139131
return this;
140132
}
141133

142-
public Builder folderName(String folderName) {
143-
sink.folderName = folderName;
144-
return this;
145-
}
146-
147-
public Builder blobName(String blobName) {
148-
sink.blobName = blobName;
149-
return this;
150-
}
151-
152-
public Builder blobPrefix(String blobPrefix) {
153-
sink.blobPrefix = blobPrefix;
154-
return this;
155-
}
156-
157134
public Builder sharedAccessSignature(String sharedAccessSignature) {
158135
sink.sharedAccessSignature = sharedAccessSignature;
159136
return this;
@@ -174,6 +151,11 @@ public Builder metadataProvider(BlobMetadataProvider metadataProvider) {
174151
return this;
175152
}
176153

154+
public Builder destinationBlobName(DestinationBlobName destinationBlobName) {
155+
sink.destinationBlobName = destinationBlobName;
156+
return this;
157+
}
158+
177159
@Override
178160
protected void validate() {
179161
Objects.requireNonNull(sink.accountName, "accountName");
@@ -185,4 +167,4 @@ protected void validate() {
185167
Objects.requireNonNull(sink.monitor, "monitor");
186168
}
187169
}
188-
}
170+
}

extensions/data-plane/data-plane-azure-storage/src/main/java/org/eclipse/edc/connector/dataplane/azure/storage/pipeline/AzureStorageDataSinkFactory.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.eclipse.edc.azure.blob.AzureBlobStoreSchema;
1818
import org.eclipse.edc.azure.blob.AzureSasToken;
1919
import org.eclipse.edc.azure.blob.api.BlobStoreApi;
20+
import org.eclipse.edc.connector.dataplane.azure.storage.DestinationBlobName;
2021
import org.eclipse.edc.connector.dataplane.azure.storage.metadata.BlobMetadataProvider;
2122
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink;
2223
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSinkFactory;
@@ -74,18 +75,18 @@ public DataSink createSink(DataFlowStartMessage request) {
7475
}
7576

7677
var dataAddress = request.getDestinationDataAddress();
77-
var dataSourceAddress = request.getSourceDataAddress();
7878
var requestId = request.getId();
7979

8080
var secret = vault.resolveSecret(dataAddress.getKeyName());
8181
var token = typeManager.readValue(secret, AzureSasToken.class);
82+
var folderName = dataAddress.getStringProperty(AzureBlobStoreSchema.FOLDER_NAME);
83+
var blobName = dataAddress.getStringProperty(AzureBlobStoreSchema.BLOB_NAME);
84+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
8285

8386
return AzureStorageDataSink.Builder.newInstance()
8487
.accountName(dataAddress.getStringProperty(ACCOUNT_NAME))
8588
.containerName(dataAddress.getStringProperty(AzureBlobStoreSchema.CONTAINER_NAME))
86-
.folderName(dataAddress.getStringProperty(AzureBlobStoreSchema.FOLDER_NAME))
87-
.blobName(dataAddress.getStringProperty(AzureBlobStoreSchema.BLOB_NAME))
88-
.blobPrefix(dataSourceAddress.getStringProperty(BLOB_PREFIX))
89+
.destinationBlobName(destinationBlobName)
8990
.sharedAccessSignature(token.getSas())
9091
.requestId(requestId)
9192
.partitionSize(partitionSize)

extensions/data-plane/data-plane-azure-storage/src/test/java/org/eclipse/edc/connector/dataplane/azure/storage/AzureDataPlaneCopyIntegrationTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,8 @@ public BlobAdapter getBlobAdapter(String accountName, String containerName, Stri
139139

140140
var metadataProvider = new BlobMetadataProviderImpl(monitor);
141141
metadataProvider.registerDecorator(new CommonBlobMetadataDecorator(typeManager, context));
142-
143142
when(context.getConnectorId()).thenReturn("connector-id");
144143
when(context.getParticipantId()).thenReturn("participant-id");
145-
146144
var dataSinkFactory = new AzureStorageDataSinkFactory(account2ApiPatched, executor, partitionSize, monitor, vault, typeManager, metadataProvider);
147145
var dataSink = dataSinkFactory.createSink(request);
148146

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Marco Pirmo (BMW AG)
12+
*
13+
*/
14+
15+
package org.eclipse.edc.connector.dataplane.azure.storage;
16+
17+
import org.junit.jupiter.api.Test;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
public class DestinationBlobNameTest {
22+
23+
@Test
24+
void shouldReturnBlobName_whenFolderNameIsEmptyAndPartSizeIsEqualToOne() {
25+
26+
var partName = "partName";
27+
var blobName = "blobName";
28+
var folderName = "";
29+
var expected = "blobName";
30+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
31+
assertThat(destinationBlobName.resolve(partName, 1)).isEqualTo(expected);
32+
}
33+
34+
@Test
35+
void shouldReturnPartName_whenFolderNameIsEmptyAndPartSizeIsGraterThanOne() {
36+
37+
var partName = "partName";
38+
var blobName = "blobName";
39+
var folderName = "";
40+
var expected = "partName";
41+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
42+
assertThat(destinationBlobName.resolve(partName, 2)).isEqualTo(expected);
43+
}
44+
45+
@Test
46+
void shouldAppendFolderNameWithPartNameUsingSlash_whenBlobNameIsEmpty() {
47+
48+
var partName = "partName";
49+
var blobName = "";
50+
var folderName = "folderName";
51+
var expected = "folderName/partName";
52+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
53+
assertThat(destinationBlobName.resolve(partName, 1)).isEqualTo(expected);
54+
}
55+
56+
57+
@Test
58+
void shouldAppendFolderNameWithPartNameUsingSlash_whenBlobNameIsEmptyAndFolderNameEndsWithSlash() {
59+
60+
var partName = "partName";
61+
var blobName = "";
62+
var folderName = "folderName/";
63+
var expected = "folderName/partName";
64+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
65+
assertThat(destinationBlobName.resolve(partName, 1)).isEqualTo(expected);
66+
}
67+
68+
@Test
69+
void shouldUsePartName_whenBothBlobNameAndFolderNameIsEmpty() {
70+
71+
var partName = "partName";
72+
var blobName = "";
73+
var folderName = "";
74+
var expected = "partName";
75+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
76+
assertThat(destinationBlobName.resolve(partName, 1)).isEqualTo(expected);
77+
}
78+
79+
@Test
80+
void shouldAppendFolderNameWithBlobNameUsingSlash_whenPartSizeEqualsOne() {
81+
82+
var partName = "partName";
83+
var blobName = "blobName";
84+
var folderName = "folderName";
85+
var expected = "folderName/blobName";
86+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
87+
assertThat(destinationBlobName.resolve(partName, 1)).isEqualTo(expected);
88+
}
89+
90+
@Test
91+
void shouldAppendFolderNameWithBlobNameUsingSlash_whenPartSizeEqualsOneAndFolderNameEndsWithSlash() {
92+
93+
var partName = "partName";
94+
var blobName = "blobName";
95+
var folderName = "folderName/";
96+
var expected = "folderName/blobName";
97+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
98+
assertThat(destinationBlobName.resolve(partName, 1)).isEqualTo(expected);
99+
}
100+
101+
@Test
102+
void shouldAppendFolderNameWithPartNameUsingSlash_whenPartSizeIsGraterThanOne() {
103+
104+
var partName = "partName";
105+
var blobName = "blobName";
106+
var folderName = "folderName";
107+
var expected = "folderName/partName";
108+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
109+
assertThat(destinationBlobName.resolve(partName, 2)).isEqualTo(expected);
110+
}
111+
112+
@Test
113+
void shouldAppendFolderNameWithPartNameUsingSlash_whenPartSizeIsGraterThanOneAndFolderNameEndsWithSlash() {
114+
115+
var partName = "partName";
116+
var blobName = "blobName";
117+
var folderName = "folderName";
118+
var expected = "folderName/partName";
119+
var destinationBlobName = new DestinationBlobName(blobName, folderName);
120+
assertThat(destinationBlobName.resolve(partName, 2)).isEqualTo(expected);
121+
}
122+
123+
}

extensions/data-plane/data-plane-azure-storage/src/test/java/org/eclipse/edc/connector/dataplane/azure/storage/pipeline/AzureDataSourceToDataSinkTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919
import org.eclipse.edc.azure.blob.adapter.BlobAdapter;
2020
import org.eclipse.edc.azure.blob.api.BlobStoreApi;
2121
import org.eclipse.edc.azure.blob.testfixtures.AzureStorageTestFixtures;
22+
import org.eclipse.edc.connector.dataplane.azure.storage.DestinationBlobName;
2223
import org.eclipse.edc.connector.dataplane.azure.storage.metadata.BlobMetadataProviderImpl;
2324
import org.eclipse.edc.spi.monitor.Monitor;
2425
import org.eclipse.edc.spi.system.ServiceExtensionContext;
2526
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
27+
import org.junit.jupiter.api.BeforeEach;
2628
import org.junit.jupiter.api.Test;
2729

2830
import java.io.ByteArrayInputStream;
@@ -61,6 +63,12 @@ class AzureDataSourceToDataSinkTest {
6163
private final String sinkAccountName = AzureStorageTestFixtures.createAccountName();
6264
private final String sinkContainerName = AzureStorageTestFixtures.createContainerName();
6365
private final String sinkSharedAccessSignature = AzureStorageTestFixtures.createSharedAccessSignature();
66+
private final DestinationBlobName destinationBlobName = mock(DestinationBlobName.class);
67+
68+
@BeforeEach
69+
void setUp() {
70+
when(destinationBlobName.resolve(fakeSource.name, 1)).thenReturn(fakeSource.name);
71+
}
6472

6573
/**
6674
* Verifies a sink is able to pull data from the source without exceptions if both endpoints are functioning.
@@ -115,6 +123,7 @@ void transfer_success() {
115123
.monitor(monitor)
116124
.request(request)
117125
.metadataProvider(metadataProvider)
126+
.destinationBlobName(destinationBlobName)
118127
.build();
119128

120129
assertThat(dataSink.transfer(dataSource)).succeedsWithin(500, TimeUnit.MILLISECONDS)
@@ -174,6 +183,7 @@ void transfer_WhenSourceFails_fails() throws Exception {
174183
.blobStoreApi(fakeSinkFactory)
175184
.executorService(executor)
176185
.metadataProvider(metadataProvider)
186+
.destinationBlobName(destinationBlobName)
177187
.request(request)
178188
.monitor(monitor)
179189
.build();
@@ -229,6 +239,7 @@ void transfer_whenSinkFails_fails() throws Exception {
229239
.blobStoreApi(blobApi)
230240
.executorService(executor)
231241
.metadataProvider(metadataProvider)
242+
.destinationBlobName(destinationBlobName)
232243
.request(request)
233244
.monitor(monitor)
234245
.build();

0 commit comments

Comments
 (0)