Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,19 @@ public abstract class EntityCsv<T extends EntityInterface> {
/** Holder for pending entity create/update operations */
protected static class PendingEntityOperation {
EntityInterface entity;
EntityInterface originalEntity;
CSVRecord csvRecord;
String entityType;
boolean isCreate;

PendingEntityOperation(
EntityInterface entity, CSVRecord csvRecord, String entityType, boolean isCreate) {
EntityInterface entity,
EntityInterface originalEntity,
CSVRecord csvRecord,
String entityType,
boolean isCreate) {
this.entity = entity;
this.originalEntity = originalEntity;
this.csvRecord = csvRecord;
this.entityType = entityType;
this.isCreate = isCreate;
Expand Down Expand Up @@ -1021,11 +1027,29 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en
if (Boolean.FALSE.equals(importResult.getDryRun())) { // If not dry run, create the entity
try {
// In case of updating entity, prepareInternal as update=True
boolean isUpdate = repository.isUpdateForImport(entity);
repository.prepareInternal(entity, isUpdate);
T original = repository.findByNameOrNull(entity.getFullyQualifiedName(), Include.ALL);
boolean isUpdate = original != null;
if (isUpdate) {
entity.setId(original.getId());
}

// Track the entity for immediate lookup by subsequent CSV rows in the same batch BEFORE
// prepareInternal
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), entity);
try {
repository.prepareInternal(entity, isUpdate);
} catch (EntityNotFoundException ex) {
// If entity is not found, checking if we have pending operations
if (!pendingEntityOperations.isEmpty()) {
flushPendingEntityOperations();
repository.prepareInternal(entity, isUpdate);
} else {
throw ex;
}
}
// Queue for batch processing instead of immediate persist
pendingEntityOperations.add(
new PendingEntityOperation(entity, csvRecord, entityType, !isUpdate));
new PendingEntityOperation(entity, original, csvRecord, entityType, !isUpdate));
responseStatus = isUpdate ? Response.Status.OK : Response.Status.CREATED;
} catch (Exception ex) {
importFailure(resultsPrinter, ex.getMessage(), csvRecord);
Expand All @@ -1038,6 +1062,8 @@ protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, T en
responseStatus = exists ? Response.Status.OK : Response.Status.CREATED;
// Track the dryRun created entities, as they may be referred by other entities being created
// during import
// Track the dryRun created entities, as they may be referred by other entities being created
// during import
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), entity);
}

Expand Down Expand Up @@ -1070,10 +1096,30 @@ protected void createEntity(
if (Boolean.FALSE.equals(importResult.getDryRun())) {
try {
// In case of updating entity, prepareInternal as update=True
boolean isUpdate = repository.isUpdateForImport(entity);
repository.prepareInternal(entity, isUpdate);
T original = (T) repository.findByNameOrNull(entity.getFullyQualifiedName(), Include.ALL);
boolean isUpdate = original != null;
if (isUpdate) {
entity.setId(original.getId());
}

// Track the entity for immediate lookup by subsequent CSV rows in the same batch BEFORE
// prepareInternal
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), (T) entity);
try {
repository.prepareInternal(entity, isUpdate);
} catch (EntityNotFoundException ex) {
// If entity is not found, checking if we have pending operations
if (!pendingEntityOperations.isEmpty()) {
flushPendingEntityOperations();
repository.prepareInternal(entity, isUpdate);
} else {
throw ex;
}
}
// Queue for batch processing instead of immediate persist
pendingEntityOperations.add(new PendingEntityOperation(entity, csvRecord, type, !isUpdate));
pendingEntityOperations.add(
new PendingEntityOperation(
entity, (EntityInterface) original, csvRecord, type, !isUpdate));
responseStatus = isUpdate ? Response.Status.OK : Response.Status.CREATED;
} catch (Exception ex) {
importFailure(resultsPrinter, ex.getMessage(), csvRecord);
Expand All @@ -1084,6 +1130,7 @@ protected void createEntity(
repository.setFullyQualifiedName(entity);
boolean exists = repository.isUpdateForImport(entity);
responseStatus = exists ? Response.Status.OK : Response.Status.CREATED;
responseStatus = exists ? Response.Status.OK : Response.Status.CREATED;
dryRunCreatedEntities.put(entity.getFullyQualifiedName(), (T) entity);
}

Expand Down Expand Up @@ -1189,31 +1236,43 @@ protected void flushPendingEntityOperations() {
// Separate creates and updates
List<EntityInterface> toCreate = new ArrayList<>();
List<EntityInterface> toUpdate = new ArrayList<>();
List<EntityInterface> originals = new ArrayList<>();

for (PendingEntityOperation op : ops) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Edge Case: Missing originalEntity in update silently drops operation

Details

When an update operation has a null originalEntity, the code logs a warning but doesn't actually handle the entity:

} else {
  // Verify we have the original entity for update
  if (op.originalEntity != null) {
    toUpdate.add(op.entity);
    originals.add(op.originalEntity);
  } else {
    // Should not happen if createEntity logic is correct, but fallback safely
    LOG.warn(
        "Missing original entity for update operation: {}",
        op.entity.getFullyQualifiedName());
    // Treat as potential create or individual fallback?
    // Safest is to let it fail or try individual update fallback
  }
}

The entity is silently dropped from processing - no exception is thrown, no fallback is executed, and no failure is recorded to the CSV import results. This could lead to data loss where users think their update succeeded (since no error is reported in the import results) but the entity was never actually updated.

Consider either:

  1. Adding the entity to a separate fallback list for individual processing
  2. Recording an import failure for this row
  3. Throwing an exception if this is truly unexpected

Was this helpful? React with 👍 / 👎

if (op.isCreate) {
toCreate.add(op.entity);
} else {
toUpdate.add(op.entity);
// Verify we have the original entity for update
if (op.originalEntity != null) {
toUpdate.add(op.entity);
originals.add(op.originalEntity);
} else {
// Should not happen if createEntity logic is correct, but fallback safely
LOG.warn(
"Missing original entity for update operation: {}",
op.entity.getFullyQualifiedName());
// Treat as potential create or individual fallback?
// Safest is to let it fail or try individual update fallback
}
}
}

try {
// Batch create
if (!toCreate.isEmpty()) {
repository.getDao().insertMany(toCreate);
LOG.info("Batch inserted {} {} entities", toCreate.size(), type);
List<EntityInterface> created = repository.createManyEntitiesForImport(toCreate);
for (EntityInterface entity : created) {
pendingSearchIndexUpdates.add(entity);
}
}

// Batch update
if (!toUpdate.isEmpty()) {
repository.getDao().updateMany(toUpdate);
LOG.info("Batch updated {} {} entities", toUpdate.size(), type);
}

// Queue all entities for ES bulk indexing
for (PendingEntityOperation op : ops) {
pendingSearchIndexUpdates.add(op.entity);
List<EntityInterface> updated =
repository.updateManyEntitiesForImport(originals, toUpdate, importedBy);
for (EntityInterface entity : updated) {
pendingSearchIndexUpdates.add(entity);
}
}
} catch (Exception e) {
LOG.error("Error in batch DB operation for {}, falling back to individual ops", type, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1569,7 +1569,7 @@ public List<PutResponse<T>> createOrUpdateBatchForImport(List<T> entities, Strin
}

@Transaction
private List<T> createManyEntitiesForImport(List<T> entities) {
public List<T> createManyEntitiesForImport(List<T> entities) {
storeEntities(entities);
storeExtensions(entities);
storeRelationshipsInternal(entities);
Expand All @@ -1579,8 +1579,7 @@ private List<T> createManyEntitiesForImport(List<T> entities) {
}

@Transaction
private List<T> updateManyEntitiesForImport(
List<T> originals, List<T> updates, String updatedBy) {
public List<T> updateManyEntitiesForImport(List<T> originals, List<T> updates, String updatedBy) {
List<T> updatedEntities = new ArrayList<>();
for (int i = 0; i < originals.size(); i++) {
T original = originals.get(i);
Expand All @@ -1596,7 +1595,11 @@ private List<T> updateManyEntitiesForImport(
updateMany(updatedEntities);
// Update relationships
for (T entity : updatedEntities) {
storeRelationships(entity);
// For imports, delete existing tags before applying new ones to match single entity behavior
if (supportsTags) {
daoCollection.tagUsageDAO().deleteTagsByTarget(entity.getFullyQualifiedName());
}
storeRelationshipsInternal(entity);
}
return updatedEntities;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.csv.CsvExportProgressCallback;
import org.openmetadata.csv.CsvImportProgressCallback;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.AddGlossaryToAssetsRequest;
import org.openmetadata.schema.api.ValidateGlossaryTagsRequest;
Expand Down Expand Up @@ -1840,6 +1841,18 @@ public String exportToCsv(
@Override
public CsvImportResult importFromCsv(
String name, String csv, boolean dryRun, String user, boolean recursive) throws IOException {
return importFromCsv(name, csv, dryRun, user, recursive, (CsvImportProgressCallback) null);
}

@Override
public CsvImportResult importFromCsv(
String name,
String csv,
boolean dryRun,
String user,
boolean recursive,
CsvImportProgressCallback callback)
throws IOException {
GlossaryTerm glossaryTerm = getByName(null, name, Fields.EMPTY_FIELDS);
GlossaryRepository glossaryRepository =
(GlossaryRepository) Entity.getEntityRepository(GLOSSARY);
Expand All @@ -1848,6 +1861,6 @@ public CsvImportResult importFromCsv(
glossaryRepository.getByName(
null, glossaryTerm.getGlossary().getName(), Fields.EMPTY_FIELDS);
GlossaryRepository.GlossaryCsv glossaryCsv = new GlossaryRepository.GlossaryCsv(glossary, user);
return glossaryCsv.importCsv(csv, dryRun);
return glossaryCsv.importCsv(csv, dryRun, callback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1060,23 +1060,39 @@ protected void createEntity(CSVPrinter printer, List<CSVRecord> csvRecords) thro
.withDefaultRoles(getEntityReferences(printer, csvRecord, 7, ROLE))
.withPolicies(getEntityReferences(printer, csvRecord, 8, POLICY));

// Pre-track the entity so getParents can find it for circular dependency detection
if (processRecord) {
dryRunCreatedEntities.put(team.getName(), team);
}

// Field 5 - parent teams
getParents(printer, csvRecord, team);

// Validate during dry run to catch logical errors early
TeamRepository repository = (TeamRepository) Entity.getEntityRepository(TEAM);
if (processRecord && importResult.getDryRun()) {
if (processRecord) {
createEntity(printer, csvRecord, team);
}
}

@Override
protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, Team entity)
throws IOException {

// Validate hierarchy now that entity is pre-tracked
if (processRecord) {
TeamRepository repository = (TeamRepository) Entity.getEntityRepository(TEAM);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: Redundant processRecord check in overridden createEntity

Details

In the overridden createEntity method, processRecord is checked but this check is likely incorrect:

@Override
protected void createEntity(CSVPrinter resultsPrinter, CSVRecord csvRecord, Team entity)
    throws IOException {

  // Validate hierarchy now that entity is pre-tracked
  if (processRecord) {  // This check is redundant/suspicious
    TeamRepository repository = (TeamRepository) Entity.getEntityRepository(TEAM);

The createEntity method is only called when processRecord is already true (from the createRecord method). Checking processRecord again inside createEntity is redundant and could be confusing. Additionally, if processRecord is modified elsewhere between the call and this check, it could lead to inconsistent state.

Consider removing the redundant check or documenting why it's necessary.


Was this helpful? React with 👍 / 👎

try {
repository.validateForDryRun(team, dryRunCreatedEntities);
repository.validateForDryRun(entity, dryRunCreatedEntities);
} catch (Exception ex) {
importFailure(printer, ex.getMessage(), csvRecord);
importFailure(resultsPrinter, ex.getMessage(), csvRecord);
processRecord = false;
// Remove from dryRunCreatedEntities since validation failed
dryRunCreatedEntities.remove(entity.getName());
return; // Don't proceed with creation
}
}

if (processRecord) {
createEntity(printer, csvRecord, team);
}
// Now call the parent method for normal processing
super.createEntity(resultsPrinter, csvRecord, entity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,8 @@ public void updateEntitiesBulk(List<EntityInterface> entities) {
return;
}

String entityType = entities.get(0).getEntityReference().getType();

int batchSize = 100;
int maxConcurrentRequests = 5;
long maxPayloadSizeBytes = 10 * 1024 * 1024; // 10MB
Expand All @@ -667,6 +669,7 @@ public void updateEntitiesBulk(List<EntityInterface> entities) {
try {
bulkSink = createBulkSink(batchSize, maxConcurrentRequests, maxPayloadSizeBytes);
Map<String, Object> contextData = new HashMap<>();
contextData.put(ReindexingUtil.ENTITY_TYPE_KEY, entityType);
bulkSink.write(entities, contextData);
bulkSink.flushAndAwait(60); // Wait up to 60 seconds for completion
} catch (Exception e) {
Expand Down
Loading