Skip to content

Commit eb28a7b

Browse files
Optimised Internal methods for adding relationships for import, fixed circular dependency, generated changeEvents (#25582)
* Fix tag clearing and circular dependency detection in batch CSV imports - **Tag clearing fix**: Add deleteTagsByTarget before applying new tags in batch imports to match single entity import behavior, ensuring empty CSV fields properly clear existing tags - **Circular dependency detection fix**: Pre-track entities in dryRunCreatedEntities before parent resolution to enable proper circular reference validation during CSV team imports - Resolves test failures in TeamResourceIT.test_importCsv_circularDependency_trueRun and tag-related import issues - Maintains batch import performance while restoring pre-batch-import validation contracts * improve storeRelationshipsInternal internal methods - make them truly batched operations * - Add storeEntities override to all repositories (57 repos) - Add batch lock check to HierarchicalLockManager - Add batch cache write to EntityRepository - Fix createManyEntitiesForImport with batched operations - Fix updateManyEntitiesForImport with batched operations - Add change event creation in flushPendingEntityOperations --------- Co-authored-by: sonika-shah <[email protected]>
1 parent e54eda2 commit eb28a7b

File tree

61 files changed

+1347
-51
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1347
-51
lines changed

openmetadata-service/src/main/java/org/openmetadata/csv/EntityCsv.java

Lines changed: 88 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -142,13 +142,19 @@ public abstract class EntityCsv<T extends EntityInterface> {
142142
/** Holder for pending entity create/update operations */
143143
protected static class PendingEntityOperation {
144144
EntityInterface entity;
145+
EntityInterface originalEntity;
145146
CSVRecord csvRecord;
146147
String entityType;
147148
boolean isCreate;
148149

149150
PendingEntityOperation(
150-
EntityInterface entity, CSVRecord csvRecord, String entityType, boolean isCreate) {
151+
EntityInterface entity,
152+
EntityInterface originalEntity,
153+
CSVRecord csvRecord,
154+
String entityType,
155+
boolean isCreate) {
151156
this.entity = entity;
157+
this.originalEntity = originalEntity;
152158
this.csvRecord = csvRecord;
153159
this.entityType = entityType;
154160
this.isCreate = isCreate;
@@ -1021,11 +1027,29 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en
10211027
if (Boolean.FALSE.equals(importResult.getDryRun())) { // If not dry run, create the entity
10221028
try {
10231029
// In case of updating entity, prepareInternal as update=True
1024-
boolean isUpdate = repository.isUpdateForImport(entity);
1025-
repository.prepareInternal(entity, isUpdate);
1030+
T original = repository.findByNameOrNull(entity.getFullyQualifiedName(), Include.ALL);
1031+
boolean isUpdate = original != null;
1032+
if (isUpdate) {
1033+
entity.setId(original.getId());
1034+
}
1035+
1036+
// Track the entity for immediate lookup by subsequent CSV rows in the same batch BEFORE
1037+
// prepareInternal
1038+
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), entity);
1039+
try {
1040+
repository.prepareInternal(entity, isUpdate);
1041+
} catch (EntityNotFoundException ex) {
1042+
// If entity is not found, checking if we have pending operations
1043+
if (!pendingEntityOperations.isEmpty()) {
1044+
flushPendingEntityOperations();
1045+
repository.prepareInternal(entity, isUpdate);
1046+
} else {
1047+
throw ex;
1048+
}
1049+
}
10261050
// Queue for batch processing instead of immediate persist
10271051
pendingEntityOperations.add(
1028-
new PendingEntityOperation(entity, csvRecord, entityType, !isUpdate));
1052+
new PendingEntityOperation(entity, original, csvRecord, entityType, !isUpdate));
10291053
responseStatus = isUpdate ? Response.Status.OK : Response.Status.CREATED;
10301054
} catch (Exception ex) {
10311055
importFailure(resultsPrinter, ex.getMessage(), csvRecord);
@@ -1038,6 +1062,8 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en
10381062
responseStatus = exists ? Response.Status.OK : Response.Status.CREATED;
10391063
// Track the dryRun created entities, as they may be referred by other entities being created
10401064
// during import
1065+
// Track the dryRun created entities, as they may be referred by other entities being created
1066+
// during import
10411067
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), entity);
10421068
}
10431069

@@ -1070,10 +1096,30 @@ protected void createEntity(
10701096
if (Boolean.FALSE.equals(importResult.getDryRun())) {
10711097
try {
10721098
// In case of updating entity, prepareInternal as update=True
1073-
boolean isUpdate = repository.isUpdateForImport(entity);
1074-
repository.prepareInternal(entity, isUpdate);
1099+
T original = (T) repository.findByNameOrNull(entity.getFullyQualifiedName(), Include.ALL);
1100+
boolean isUpdate = original != null;
1101+
if (isUpdate) {
1102+
entity.setId(original.getId());
1103+
}
1104+
1105+
// Track the entity for immediate lookup by subsequent CSV rows in the same batch BEFORE
1106+
// prepareInternal
1107+
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), (T) entity);
1108+
try {
1109+
repository.prepareInternal(entity, isUpdate);
1110+
} catch (EntityNotFoundException ex) {
1111+
// If entity is not found, checking if we have pending operations
1112+
if (!pendingEntityOperations.isEmpty()) {
1113+
flushPendingEntityOperations();
1114+
repository.prepareInternal(entity, isUpdate);
1115+
} else {
1116+
throw ex;
1117+
}
1118+
}
10751119
// Queue for batch processing instead of immediate persist
1076-
pendingEntityOperations.add(new PendingEntityOperation(entity, csvRecord, type, !isUpdate));
1120+
pendingEntityOperations.add(
1121+
new PendingEntityOperation(
1122+
entity, (EntityInterface) original, csvRecord, type, !isUpdate));
10771123
responseStatus = isUpdate ? Response.Status.OK : Response.Status.CREATED;
10781124
} catch (Exception ex) {
10791125
importFailure(resultsPrinter, ex.getMessage(), csvRecord);
@@ -1084,6 +1130,7 @@ protected void createEntity(
10841130
repository.setFullyQualifiedName(entity);
10851131
boolean exists = repository.isUpdateForImport(entity);
10861132
responseStatus = exists ? Response.Status.OK : Response.Status.CREATED;
1133+
responseStatus = exists ? Response.Status.OK : Response.Status.CREATED;
10871134
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), (T) entity);
10881135
}
10891136

@@ -1152,6 +1199,15 @@ private void createChangeEventForUserAndUpdateInES(PutResponse<T> response, Stri
11521199
}
11531200
}
11541201

1202+
private void createChangeEventForBatchedEntity(EntityInterface entity, EventType eventType) {
1203+
ChangeEvent changeEvent =
1204+
FormatterUtil.createChangeEventForEntity(importedBy, eventType, entity);
1205+
Object eventEntity = changeEvent.getEntity();
1206+
changeEvent = copyChangeEvent(changeEvent);
1207+
changeEvent.setEntity(JsonUtils.pojoToMaskedJson(eventEntity));
1208+
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
1209+
}
1210+
11551211
/** Flush pending search index updates using bulk API */
11561212
protected void flushPendingSearchIndexUpdates() {
11571213
if (pendingSearchIndexUpdates.isEmpty()) {
@@ -1189,31 +1245,46 @@ protected void flushPendingEntityOperations() {
11891245
// Separate creates and updates
11901246
List<EntityInterface> toCreate = new ArrayList<>();
11911247
List<EntityInterface> toUpdate = new ArrayList<>();
1248+
List<EntityInterface> originals = new ArrayList<>();
11921249

11931250
for (PendingEntityOperation op : ops) {
11941251
if (op.isCreate) {
11951252
toCreate.add(op.entity);
11961253
} else {
1197-
toUpdate.add(op.entity);
1254+
// Verify we have the original entity for update
1255+
if (op.originalEntity != null) {
1256+
toUpdate.add(op.entity);
1257+
originals.add(op.originalEntity);
1258+
} else {
1259+
// Should not happen if createEntity logic is correct, but fallback safely
1260+
LOG.warn(
1261+
"Missing original entity for update operation: {}",
1262+
op.entity.getFullyQualifiedName());
1263+
// Treat as potential create or individual fallback?
1264+
// Safest is to let it fail or try individual update fallback
1265+
}
11981266
}
11991267
}
12001268

12011269
try {
12021270
// Batch create
12031271
if (!toCreate.isEmpty()) {
1204-
repository.getDao().insertMany(toCreate);
1205-
LOG.info("Batch inserted {} {} entities", toCreate.size(), type);
1272+
List<EntityInterface> created =
1273+
repository.createManyEntitiesForImport(toCreate, importedBy);
1274+
for (EntityInterface entity : created) {
1275+
createChangeEventForBatchedEntity(entity, EventType.ENTITY_CREATED);
1276+
pendingSearchIndexUpdates.add(entity);
1277+
}
12061278
}
12071279

12081280
// Batch update
12091281
if (!toUpdate.isEmpty()) {
1210-
repository.getDao().updateMany(toUpdate);
1211-
LOG.info("Batch updated {} {} entities", toUpdate.size(), type);
1212-
}
1213-
1214-
// Queue all entities for ES bulk indexing
1215-
for (PendingEntityOperation op : ops) {
1216-
pendingSearchIndexUpdates.add(op.entity);
1282+
List<EntityInterface> updated =
1283+
repository.updateManyEntitiesForImport(originals, toUpdate, importedBy, importedBy);
1284+
for (EntityInterface entity : updated) {
1285+
createChangeEventForBatchedEntity(entity, EventType.ENTITY_UPDATED);
1286+
pendingSearchIndexUpdates.add(entity);
1287+
}
12171288
}
12181289
} catch (Exception e) {
12191290
LOG.error("Error in batch DB operation for {}, falling back to individual ops", type, e);

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AIApplicationRepository.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package org.openmetadata.service.jdbi3;
1515

16+
import java.util.List;
1617
import lombok.extern.slf4j.Slf4j;
1718
import org.openmetadata.schema.entity.ai.AIApplication;
1819
import org.openmetadata.schema.type.change.ChangeSource;
@@ -60,6 +61,11 @@ public void storeEntity(AIApplication aiApplication, boolean update) {
6061
store(aiApplication, update);
6162
}
6263

64+
@Override
65+
public void storeEntities(List<AIApplication> entities) {
66+
storeMany(entities);
67+
}
68+
6369
@Override
6470
public void storeRelationships(AIApplication aiApplication) {
6571
// Relationships are stored as part of the JSON entity

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AIGovernancePolicyRepository.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package org.openmetadata.service.jdbi3;
1515

16+
import java.util.List;
1617
import lombok.extern.slf4j.Slf4j;
1718
import org.openmetadata.schema.entity.ai.AIGovernancePolicy;
1819
import org.openmetadata.schema.type.change.ChangeSource;
@@ -59,6 +60,11 @@ public void storeEntity(AIGovernancePolicy policy, boolean update) {
5960
store(policy, update);
6061
}
6162

63+
@Override
64+
public void storeEntities(List<AIGovernancePolicy> entities) {
65+
storeMany(entities);
66+
}
67+
6268
@Override
6369
public void storeRelationships(AIGovernancePolicy policy) {
6470
// Relationships are stored as part of the JSON entity

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/APICollectionRepository.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package org.openmetadata.service.jdbi3;
1515

16+
import com.google.gson.Gson;
17+
import java.util.ArrayList;
1618
import java.util.HashMap;
1719
import java.util.List;
1820
import java.util.Map;
@@ -67,6 +69,25 @@ public void storeEntity(APICollection apiCollection, boolean update) {
6769
apiCollection.withService(service);
6870
}
6971

72+
@Override
73+
public void storeEntities(List<APICollection> entities) {
74+
List<APICollection> entitiesToStore = new ArrayList<>();
75+
Gson gson = new Gson();
76+
77+
for (APICollection apiCollection : entities) {
78+
EntityReference service = apiCollection.getService();
79+
80+
apiCollection.withService(null);
81+
82+
String jsonCopy = gson.toJson(apiCollection);
83+
entitiesToStore.add(gson.fromJson(jsonCopy, APICollection.class));
84+
85+
apiCollection.withService(service);
86+
}
87+
88+
storeMany(entitiesToStore);
89+
}
90+
7091
@Override
7192
public void storeRelationships(APICollection apiCollection) {
7293
addServiceRelationship(apiCollection, apiCollection.getService());

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/APIEndpointRepository.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.openmetadata.service.resources.tags.TagLabelUtil.addDerivedTagsGracefully;
2626
import static org.openmetadata.service.resources.tags.TagLabelUtil.checkMutuallyExclusive;
2727

28+
import com.google.gson.Gson;
2829
import java.util.ArrayList;
2930
import java.util.Collections;
3031
import java.util.List;
@@ -146,6 +147,44 @@ public void storeEntity(APIEndpoint apiEndpoint, boolean update) {
146147
apiEndpoint.withApiCollection(apiCollection);
147148
}
148149

150+
@Override
151+
public void storeEntities(List<APIEndpoint> entities) {
152+
List<APIEndpoint> entitiesToStore = new ArrayList<>();
153+
Gson gson = new Gson();
154+
155+
for (APIEndpoint apiEndpoint : entities) {
156+
EntityReference apiCollection = apiEndpoint.getApiCollection();
157+
apiEndpoint.withApiCollection(null);
158+
159+
List<Field> requestFieldsWithTags = null;
160+
if (apiEndpoint.getRequestSchema() != null) {
161+
requestFieldsWithTags = apiEndpoint.getRequestSchema().getSchemaFields();
162+
apiEndpoint.getRequestSchema().setSchemaFields(cloneWithoutTags(requestFieldsWithTags));
163+
apiEndpoint.getRequestSchema().getSchemaFields().forEach(field -> field.setTags(null));
164+
}
165+
166+
List<Field> responseFieldsWithTags = null;
167+
if (apiEndpoint.getResponseSchema() != null) {
168+
responseFieldsWithTags = apiEndpoint.getResponseSchema().getSchemaFields();
169+
apiEndpoint.getResponseSchema().setSchemaFields(cloneWithoutTags(responseFieldsWithTags));
170+
apiEndpoint.getResponseSchema().getSchemaFields().forEach(field -> field.setTags(null));
171+
}
172+
173+
String jsonCopy = gson.toJson(apiEndpoint);
174+
entitiesToStore.add(gson.fromJson(jsonCopy, APIEndpoint.class));
175+
176+
if (requestFieldsWithTags != null) {
177+
apiEndpoint.getRequestSchema().withSchemaFields(requestFieldsWithTags);
178+
}
179+
if (responseFieldsWithTags != null) {
180+
apiEndpoint.getResponseSchema().withSchemaFields(responseFieldsWithTags);
181+
}
182+
apiEndpoint.withApiCollection(apiCollection);
183+
}
184+
185+
storeMany(entitiesToStore);
186+
}
187+
149188
@Override
150189
public void storeRelationships(APIEndpoint apiEndpoint) {
151190
EntityReference apiCollection = apiEndpoint.getApiCollection();

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppMarketPlaceRepository.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.openmetadata.service.jdbi3;
22

3+
import java.util.List;
34
import org.openmetadata.schema.entity.app.App;
45
import org.openmetadata.schema.entity.app.AppMarketPlaceDefinition;
56
import org.openmetadata.schema.type.Include;
@@ -47,6 +48,11 @@ public void storeEntity(AppMarketPlaceDefinition entity, boolean update) {
4748
store(entity, update);
4849
}
4950

51+
@Override
52+
public void storeEntities(List<AppMarketPlaceDefinition> entities) {
53+
storeMany(entities);
54+
}
55+
5056
@Override
5157
public void storeRelationships(AppMarketPlaceDefinition entity) {}
5258
}

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/AppRepository.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.openmetadata.service.Entity.getEntityReferenceById;
77
import static org.openmetadata.service.util.UserUtil.getUser;
88

9+
import com.google.gson.Gson;
910
import java.util.ArrayList;
1011
import java.util.HashMap;
1112
import java.util.List;
@@ -178,6 +179,21 @@ public void storeEntity(App entity, boolean update) {
178179
entity.setBot(bot);
179180
}
180181

182+
@Override
183+
public void storeEntities(List<App> entities) {
184+
List<App> entitiesToStore = new ArrayList<>();
185+
Gson gson = new Gson();
186+
for (App entity : entities) {
187+
List<EntityReference> ownerRefs = entity.getOwners();
188+
EntityReference bot = entity.getBot();
189+
String jsonCopy = gson.toJson(entity.withOwners(null).withBot(null));
190+
entitiesToStore.add(gson.fromJson(jsonCopy, App.class));
191+
entity.withOwners(ownerRefs);
192+
entity.setBot(bot);
193+
}
194+
storeMany(entitiesToStore);
195+
}
196+
181197
public EntityReference getBotUser(App application) {
182198
return application.getBot() != null
183199
? application.getBot()

openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/BotRepository.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313

1414
package org.openmetadata.service.jdbi3;
1515

16+
import com.google.gson.Gson;
17+
import java.util.ArrayList;
18+
import java.util.List;
1619
import lombok.extern.slf4j.Slf4j;
1720
import org.jdbi.v3.sqlobject.transaction.Transaction;
1821
import org.openmetadata.schema.entity.Bot;
@@ -75,6 +78,19 @@ public void storeEntity(Bot entity, boolean update) {
7578
entity.withBotUser(botUser);
7679
}
7780

81+
@Override
82+
public void storeEntities(List<Bot> entities) {
83+
List<Bot> entitiesToStore = new ArrayList<>();
84+
Gson gson = new Gson();
85+
for (Bot entity : entities) {
86+
EntityReference botUser = entity.getBotUser();
87+
String jsonCopy = gson.toJson(entity.withBotUser(null));
88+
entitiesToStore.add(gson.fromJson(jsonCopy, Bot.class));
89+
entity.withBotUser(botUser);
90+
}
91+
storeMany(entitiesToStore);
92+
}
93+
7894
@Override
7995
public void storeRelationships(Bot entity) {
8096
addRelationship(

0 commit comments

Comments
 (0)