Skip to content

Commit 7d2c480

Browse files
harshachsiddhant1Sriharsha Chintalapani
authored
Add validations for input/output ports (#25601)
* Add validations for input/output ports * Add validations for input/output ports * Fix tests * addresss comments --------- Co-authored-by: Sid <30566406+siddhant1@users.noreply.github.com> Co-authored-by: Sriharsha Chintalapani <harsha.ch@gmail.com>
1 parent 4cbd287 commit 7d2c480

File tree

9 files changed

+512
-25
lines changed

9 files changed

+512
-25
lines changed

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/DataProductDomainMigrationIT.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,18 @@ void testDataProductDomainMigrationWithInputOutputPorts(TestNamespace ns) throws
169169
Table inputTable = createTestTableInDomain(ns, "input_port_table", sourceDomain);
170170
Table outputTable = createTestTableInDomain(ns, "output_port_table", sourceDomain);
171171

172-
// Add input port
172+
// Output port assets must first be added as HAS assets (data product assets)
173+
// before they can be designated as output ports
174+
List<EntityReference> outputAssetRefs =
175+
List.of(
176+
new EntityReference()
177+
.withId(outputTable.getId())
178+
.withType("table")
179+
.withFullyQualifiedName(outputTable.getFullyQualifiedName()));
180+
BulkAssets outputAssetRequest = new BulkAssets().withAssets(outputAssetRefs);
181+
client.dataProducts().bulkAddAssets(dataProduct.getFullyQualifiedName(), outputAssetRequest);
182+
183+
// Add input port (no HAS requirement - input ports can come from any domain)
173184
List<EntityReference> inputPorts =
174185
List.of(
175186
new EntityReference()
@@ -179,7 +190,7 @@ void testDataProductDomainMigrationWithInputOutputPorts(TestNamespace ns) throws
179190
BulkAssets inputRequest = new BulkAssets().withAssets(inputPorts);
180191
client.dataProducts().bulkAddInputPorts(dataProduct.getFullyQualifiedName(), inputRequest);
181192

182-
// Add output port
193+
// Add output port (requires HAS relationship first)
183194
List<EntityReference> outputPorts =
184195
List.of(
185196
new EntityReference()

openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/DataProductResourceIT.java

Lines changed: 306 additions & 6 deletions
Large diffs are not rendered by default.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,29 @@
11
package org.openmetadata.sdk.exceptions;
22

33
public class InvalidRequestException extends OpenMetadataException {
4+
private final String responseBody;
5+
46
public InvalidRequestException(String message) {
57
super(message, 400);
8+
this.responseBody = null;
69
}
710

811
public InvalidRequestException(String message, Throwable cause) {
912
super(message, cause, 400, null);
13+
this.responseBody = null;
1014
}
1115

1216
public InvalidRequestException(String message, String errorCode) {
1317
super(message, 400, errorCode);
18+
this.responseBody = null;
19+
}
20+
21+
public InvalidRequestException(String message, String errorCode, String responseBody) {
22+
super(message, 400, errorCode);
23+
this.responseBody = responseBody;
24+
}
25+
26+
public String getResponseBody() {
27+
return responseBody;
1428
}
1529
}

openmetadata-sdk/src/main/java/org/openmetadata/sdk/network/OpenMetadataHttpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ private void handleErrorResponse(Response response) throws OpenMetadataException
323323
// Throw appropriate exception based on status code
324324
switch (statusCode) {
325325
case 400:
326-
throw new InvalidRequestException(errorMessage);
326+
throw new InvalidRequestException(errorMessage, (String) null, responseBodyString);
327327
case 401:
328328
throw new AuthenticationException(errorMessage);
329329
case 409:

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1802,6 +1802,16 @@ List<UUID> findToIds(
18021802
@Bind("relation") int relation,
18031803
@Bind("toEntity") String toEntity);
18041804

1805+
@SqlQuery(
1806+
"SELECT COUNT(*) FROM entity_relationship "
1807+
+ "WHERE fromId = :fromId AND toId = :toId AND fromEntity = :fromEntity AND toEntity = :toEntity AND relation = :relation")
1808+
int existsRelationship(
1809+
@BindUUID("fromId") UUID fromId,
1810+
@BindUUID("toId") UUID toId,
1811+
@Bind("fromEntity") String fromEntity,
1812+
@Bind("toEntity") String toEntity,
1813+
@Bind("relation") int relation);
1814+
18051815
@SqlQuery(
18061816
"SELECT fromId, COUNT(toId) FROM entity_relationship "
18071817
+ "WHERE fromId IN (<fromIds>) AND fromEntity = :fromEntity AND relation = :relation AND toEntity = :toEntity "

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,13 @@
3535
import java.util.ArrayList;
3636
import java.util.Collections;
3737
import java.util.HashMap;
38+
import java.util.HashSet;
3839
import java.util.LinkedHashMap;
3940
import java.util.List;
4041
import java.util.Map;
42+
import java.util.Set;
4143
import java.util.UUID;
44+
import java.util.stream.Collectors;
4245
import lombok.extern.slf4j.Slf4j;
4346
import org.jdbi.v3.sqlobject.transaction.Transaction;
4447
import org.openmetadata.schema.EntityInterface;
@@ -287,16 +290,68 @@ private BulkOperationResult executeBulkPortsOperation(
287290
BulkOperationResult result =
288291
new BulkOperationResult().withStatus(ApiStatus.SUCCESS).withDryRun(false);
289292
List<BulkResponse> success = new ArrayList<>();
293+
List<BulkResponse> failed = new ArrayList<>();
290294

291295
List<EntityReference> assets = new ArrayList<>(listOrEmpty(request.getAssets()));
292296
EntityUtil.populateEntityReferences(assets);
293297

294298
String fieldName = relationship == Relationship.INPUT_PORT ? "inputPorts" : "outputPorts";
295299

300+
Relationship oppositeRelationship =
301+
relationship == Relationship.INPUT_PORT
302+
? Relationship.OUTPUT_PORT
303+
: Relationship.INPUT_PORT;
304+
305+
Set<UUID> oppositePortIds = Set.of();
306+
Set<UUID> dataProductAssetIds = Set.of();
307+
if (isAdd) {
308+
oppositePortIds =
309+
daoCollection
310+
.relationshipDAO()
311+
.findTo(dataProduct.getId(), DATA_PRODUCT, oppositeRelationship.ordinal())
312+
.stream()
313+
.map(CollectionDAO.EntityRelationshipRecord::getId)
314+
.collect(Collectors.toCollection(HashSet::new));
315+
if (relationship == Relationship.OUTPUT_PORT) {
316+
dataProductAssetIds =
317+
daoCollection
318+
.relationshipDAO()
319+
.findTo(dataProduct.getId(), DATA_PRODUCT, Relationship.HAS.ordinal())
320+
.stream()
321+
.map(CollectionDAO.EntityRelationshipRecord::getId)
322+
.collect(Collectors.toCollection(HashSet::new));
323+
}
324+
}
325+
296326
for (EntityReference ref : assets) {
297327
result.setNumberOfRowsProcessed(result.getNumberOfRowsProcessed() + 1);
298328

299329
if (isAdd) {
330+
if (oppositePortIds.contains(ref.getId())) {
331+
String oppositePortType =
332+
oppositeRelationship == Relationship.INPUT_PORT ? "input" : "output";
333+
String msg =
334+
String.format(
335+
"Asset '%s' is already part of %s ports and cannot be added to %s",
336+
ref.getFullyQualifiedName(), oppositePortType, fieldName);
337+
failed.add(new BulkResponse().withRequest(ref).withMessage(msg));
338+
result.setNumberOfRowsFailed(result.getNumberOfRowsFailed() + 1);
339+
result.setStatus(ApiStatus.PARTIAL_SUCCESS);
340+
continue;
341+
}
342+
343+
if (relationship == Relationship.OUTPUT_PORT
344+
&& !dataProductAssetIds.contains(ref.getId())) {
345+
String msg =
346+
String.format(
347+
"Asset '%s' must belong to the data product before it can be added as an output port",
348+
ref.getFullyQualifiedName());
349+
failed.add(new BulkResponse().withRequest(ref).withMessage(msg));
350+
result.setNumberOfRowsFailed(result.getNumberOfRowsFailed() + 1);
351+
result.setStatus(ApiStatus.PARTIAL_SUCCESS);
352+
continue;
353+
}
354+
300355
addRelationship(
301356
dataProduct.getId(), ref.getId(), DATA_PRODUCT, ref.getType(), relationship);
302357
} else {
@@ -311,11 +366,17 @@ private BulkOperationResult executeBulkPortsOperation(
311366
.onEntityUpdated(dataProduct.getEntityReference(), null);
312367
}
313368

314-
result.withSuccessRequest(success);
369+
result.withSuccessRequest(success).withFailedRequest(failed);
315370

316-
if (result.getStatus().equals(ApiStatus.SUCCESS)) {
371+
if (success.isEmpty() && !failed.isEmpty()) {
372+
result.setStatus(ApiStatus.FAILURE);
373+
}
374+
375+
if (!success.isEmpty()) {
376+
List<EntityReference> successAssets =
377+
success.stream().map(r -> (EntityReference) r.getRequest()).collect(Collectors.toList());
317378
ChangeDescription change =
318-
addBulkAddRemoveChangeDescription(dataProduct.getVersion(), isAdd, assets, null);
379+
addBulkAddRemoveChangeDescription(dataProduct.getVersion(), isAdd, successAssets, null);
319380
if (!change.getFieldsAdded().isEmpty()) {
320381
change.getFieldsAdded().get(0).setName(fieldName);
321382
}

0 commit comments

Comments
 (0)