Skip to content

Commit e786c4b

Browse files
committed
[fix] add update
1 parent 5b2cbcc commit e786c4b

File tree

12 files changed

+102
-74
lines changed

12 files changed

+102
-74
lines changed

fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ private void setExternalTableAutoAnalyzePolicy(ExternalTable table, List<AlterOp
388388

389389
private void processAlterTableForExternalTable(
390390
ExternalTable table, List<AlterOp> alterOps) throws UserException {
391+
long updateTime = System.currentTimeMillis();
391392
for (AlterOp alterOp : alterOps) {
392393
if (alterOp instanceof ModifyTablePropertiesOp) {
393394
setExternalTableAutoAnalyzePolicy(table, alterOps);
@@ -430,23 +431,23 @@ private void processAlterTableForExternalTable(
430431
AddPartitionFieldOp addPartitionField = (AddPartitionFieldOp) alterOp;
431432
if (table instanceof IcebergExternalTable) {
432433
((IcebergExternalCatalog) table.getCatalog()).addPartitionField(
433-
(IcebergExternalTable) table, addPartitionField);
434+
(IcebergExternalTable) table, addPartitionField, updateTime);
434435
} else {
435436
throw new UserException("ADD PARTITION KEY is only supported for Iceberg tables");
436437
}
437438
} else if (alterOp instanceof DropPartitionFieldOp) {
438439
DropPartitionFieldOp dropPartitionField = (DropPartitionFieldOp) alterOp;
439440
if (table instanceof IcebergExternalTable) {
440441
((IcebergExternalCatalog) table.getCatalog()).dropPartitionField(
441-
(IcebergExternalTable) table, dropPartitionField);
442+
(IcebergExternalTable) table, dropPartitionField, updateTime);
442443
} else {
443444
throw new UserException("DROP PARTITION KEY is only supported for Iceberg tables");
444445
}
445446
} else if (alterOp instanceof ReplacePartitionFieldOp) {
446447
ReplacePartitionFieldOp replacePartitionField = (ReplacePartitionFieldOp) alterOp;
447448
if (table instanceof IcebergExternalTable) {
448449
((IcebergExternalCatalog) table.getCatalog()).replacePartitionField(
449-
(IcebergExternalTable) table, replacePartitionField);
450+
(IcebergExternalTable) table, replacePartitionField, updateTime);
450451
} else {
451452
throw new UserException("REPLACE PARTITION KEY is only supported for Iceberg tables");
452453
}

fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,10 @@ public void handleRefreshTable(String catalogName, String dbName, String tableNa
146146
}
147147
return;
148148
}
149-
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, 0);
150-
149+
long updateTime = System.currentTimeMillis();
150+
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, updateTime);
151151
ExternalObjectLog log = ExternalObjectLog.createForRefreshTable(catalog.getId(), db.getFullName(),
152-
table.getName());
152+
table.getName(), updateTime);
153153
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
154154
}
155155

@@ -194,6 +194,9 @@ public void replayRefreshTable(ExternalObjectLog log) {
194194
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
195195
.getMetaStoreCache((HMSExternalCatalog) catalog);
196196
cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
197+
if (table.get() instanceof HMSExternalTable && log.getLastUpdateTime() > 0) {
198+
((HMSExternalTable) table.get()).setEventUpdateTime(log.getLastUpdateTime());
199+
}
197200
LOG.info("replay refresh partitions for table {}, "
198201
+ "modified partitions count: {}, "
199202
+ "new partitions count: {}",
@@ -233,8 +236,8 @@ public void refreshTableInternal(ExternalDatabase db, ExternalTable table, long
233236
((HMSExternalTable) table).setEventUpdateTime(updateTime);
234237
}
235238
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(table);
236-
LOG.info("refresh table {}, id {} from db {} in catalog {}",
237-
table.getName(), table.getId(), db.getFullName(), db.getCatalog().getName());
239+
LOG.info("refresh table {}, id {} from db {} in catalog {}, update time: {}",
240+
table.getName(), table.getId(), db.getFullName(), db.getCatalog().getName(), updateTime);
238241
}
239242

240243
// Refresh partition

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,8 +1137,9 @@ public void truncateTable(String dbName, String tableName, PartitionNamesInfo pa
11371137
partitions = partitionNamesInfo.getPartitionNames();
11381138
}
11391139
ExternalTable dorisTable = getDbOrDdlException(dbName).getTableOrDdlException(tableName);
1140-
metadataOps.truncateTable(dorisTable, partitions);
1141-
TruncateTableInfo info = new TruncateTableInfo(getName(), dbName, tableName, partitions);
1140+
long updateTime = System.currentTimeMillis();
1141+
metadataOps.truncateTable(dorisTable, partitions, updateTime);
1142+
TruncateTableInfo info = new TruncateTableInfo(getName(), dbName, tableName, partitions, updateTime);
11421143
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
11431144
} catch (Exception e) {
11441145
LOG.warn("Failed to truncate table {}.{} in catalog {}", dbName, tableName, getName(), e);
@@ -1148,7 +1149,7 @@ public void truncateTable(String dbName, String tableName, PartitionNamesInfo pa
11481149

11491150
public void replayTruncateTable(TruncateTableInfo info) {
11501151
if (metadataOps != null) {
1151-
metadataOps.afterTruncateTable(info.getDb(), info.getTable());
1152+
metadataOps.afterTruncateTable(info.getDb(), info.getTable(), info.getUpdateTime());
11521153
}
11531154
}
11541155

@@ -1320,11 +1321,11 @@ public void resetMetaCacheNames() {
13201321
}
13211322

13221323
// log the refresh external table operation
1323-
private void logRefreshExternalTable(ExternalTable dorisTable) {
1324+
private void logRefreshExternalTable(ExternalTable dorisTable, long updateTime) {
13241325
Env.getCurrentEnv().getEditLog()
13251326
.logRefreshExternalTable(
13261327
ExternalObjectLog.createForRefreshTable(dorisTable.getCatalog().getId(),
1327-
dorisTable.getDbName(), dorisTable.getName()));
1328+
dorisTable.getDbName(), dorisTable.getName(), updateTime));
13281329
}
13291330

13301331
@Override
@@ -1336,8 +1337,9 @@ public void addColumn(TableIf dorisTable, Column column, ColumnPosition position
13361337
throw new DdlException("Add column operation is not supported for catalog: " + getName());
13371338
}
13381339
try {
1339-
metadataOps.addColumn(externalTable, column, position);
1340-
logRefreshExternalTable(externalTable);
1340+
long updateTime = System.currentTimeMillis();
1341+
metadataOps.addColumn(externalTable, column, position, updateTime);
1342+
logRefreshExternalTable(externalTable, updateTime);
13411343
} catch (Exception e) {
13421344
LOG.warn("Failed to add column {} to table {}.{} in catalog {}",
13431345
column.getName(), externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1354,8 +1356,9 @@ public void addColumns(TableIf dorisTable, List<Column> columns) throws UserExce
13541356
throw new DdlException("Add columns operation is not supported for catalog: " + getName());
13551357
}
13561358
try {
1357-
metadataOps.addColumns(externalTable, columns);
1358-
logRefreshExternalTable(externalTable);
1359+
long updateTime = System.currentTimeMillis();
1360+
metadataOps.addColumns(externalTable, columns, updateTime);
1361+
logRefreshExternalTable(externalTable, updateTime);
13591362
} catch (Exception e) {
13601363
LOG.warn("Failed to add columns to table {}.{} in catalog {}",
13611364
externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1372,8 +1375,9 @@ public void dropColumn(TableIf dorisTable, String columnName) throws UserExcepti
13721375
throw new DdlException("Drop column operation is not supported for catalog: " + getName());
13731376
}
13741377
try {
1375-
metadataOps.dropColumn(externalTable, columnName);
1376-
logRefreshExternalTable(externalTable);
1378+
long updateTime = System.currentTimeMillis();
1379+
metadataOps.dropColumn(externalTable, columnName, updateTime);
1380+
logRefreshExternalTable(externalTable, updateTime);
13771381
} catch (Exception e) {
13781382
LOG.warn("Failed to drop column {} from table {}.{} in catalog {}",
13791383
columnName, externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1390,8 +1394,9 @@ public void renameColumn(TableIf dorisTable, String oldName, String newName) thr
13901394
throw new DdlException("Rename column operation is not supported for catalog: " + getName());
13911395
}
13921396
try {
1393-
metadataOps.renameColumn(externalTable, oldName, newName);
1394-
logRefreshExternalTable(externalTable);
1397+
long updateTime = System.currentTimeMillis();
1398+
metadataOps.renameColumn(externalTable, oldName, newName, updateTime);
1399+
logRefreshExternalTable(externalTable, updateTime);
13951400
} catch (Exception e) {
13961401
LOG.warn("Failed to rename column {} to {} in table {}.{} in catalog {}",
13971402
oldName, newName, externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1408,8 +1413,9 @@ public void modifyColumn(TableIf dorisTable, Column column, ColumnPosition colum
14081413
throw new DdlException("Modify column operation is not supported for catalog: " + getName());
14091414
}
14101415
try {
1411-
metadataOps.modifyColumn(externalTable, column, columnPosition);
1412-
logRefreshExternalTable(externalTable);
1416+
long updateTime = System.currentTimeMillis();
1417+
metadataOps.modifyColumn(externalTable, column, columnPosition, updateTime);
1418+
logRefreshExternalTable(externalTable, updateTime);
14131419
} catch (Exception e) {
14141420
LOG.warn("Failed to modify column {} in table {}.{} in catalog {}",
14151421
column.getName(), externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1426,8 +1432,9 @@ public void reorderColumns(TableIf dorisTable, List<String> newOrder) throws Use
14261432
throw new DdlException("Reorder columns operation is not supported for catalog: " + getName());
14271433
}
14281434
try {
1429-
metadataOps.reorderColumns(externalTable, newOrder);
1430-
logRefreshExternalTable(externalTable);
1435+
long updateTime = System.currentTimeMillis();
1436+
metadataOps.reorderColumns(externalTable, newOrder, updateTime);
1437+
logRefreshExternalTable(externalTable, updateTime);
14311438
} catch (Exception e) {
14321439
LOG.warn("Failed to reorder columns in table {}.{} in catalog {}",
14331440
externalTable.getDbName(), externalTable.getName(), getName(), e);

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,22 +75,25 @@ public static ExternalObjectLog createForRefreshDb(long catalogId, String dbName
7575
return externalObjectLog;
7676
}
7777

78-
public static ExternalObjectLog createForRefreshTable(long catalogId, String dbName, String tblName) {
78+
public static ExternalObjectLog createForRefreshTable(long catalogId, String dbName, String tblName,
79+
long updateTime) {
7980
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
8081
externalObjectLog.setCatalogId(catalogId);
8182
externalObjectLog.setDbName(dbName);
8283
externalObjectLog.setTableName(tblName);
84+
externalObjectLog.setLastUpdateTime(updateTime);
8385
return externalObjectLog;
8486
}
8587

8688
public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
87-
List<String> modifiedPartNames, List<String> newPartNames) {
89+
List<String> modifiedPartNames, List<String> newPartNames, long updateTime) {
8890
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
8991
externalObjectLog.setCatalogId(catalogId);
9092
externalObjectLog.setDbName(dbName);
9193
externalObjectLog.setTableName(tblName);
9294
externalObjectLog.setPartitionNames(modifiedPartNames);
9395
externalObjectLog.setNewPartitionNames(newPartNames);
96+
externalObjectLog.setLastUpdateTime(updateTime);
9497
return externalObjectLog;
9598
}
9699

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,15 +337,15 @@ public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions)
337337
}
338338

339339
@Override
340-
public void afterTruncateTable(String dbName, String tblName) {
340+
public void afterTruncateTable(String dbName, String tblName, long updateTime) {
341341
try {
342342
// Invalidate cache.
343343
Optional<ExternalDatabase<?>> db = catalog.getDbForReplay(dbName);
344344
if (db.isPresent()) {
345345
Optional tbl = db.get().getTableForReplay(tblName);
346346
if (tbl.isPresent()) {
347347
Env.getCurrentEnv().getRefreshManager()
348-
.refreshTableInternal(db.get(), (ExternalTable) tbl.get(), 0);
348+
.refreshTableInternal(db.get(), (ExternalTable) tbl.get(), updateTime);
349349
}
350350
}
351351
} catch (Exception e) {

fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -221,46 +221,48 @@ public boolean viewExists(String dbName, String viewName) {
221221
/**
222222
* Add partition field to Iceberg table for partition evolution
223223
*/
224-
public void addPartitionField(IcebergExternalTable table, AddPartitionFieldOp op) throws UserException {
224+
public void addPartitionField(IcebergExternalTable table, AddPartitionFieldOp op, long updateTime)
225+
throws UserException {
225226
makeSureInitialized();
226227
if (metadataOps == null) {
227228
throw new UserException("Add partition field operation is not supported for catalog: " + getName());
228229
}
229-
((IcebergMetadataOps) metadataOps).addPartitionField(table, op);
230+
((IcebergMetadataOps) metadataOps).addPartitionField(table, op, updateTime);
230231
Env.getCurrentEnv().getEditLog()
231232
.logRefreshExternalTable(
232233
ExternalObjectLog.createForRefreshTable(table.getCatalog().getId(),
233-
table.getDbName(), table.getName()));
234+
table.getDbName(), table.getName(), updateTime));
234235
}
235236

236237
/**
237238
* Drop partition field from Iceberg table for partition evolution
238239
*/
239-
public void dropPartitionField(IcebergExternalTable table, DropPartitionFieldOp op) throws UserException {
240+
public void dropPartitionField(IcebergExternalTable table, DropPartitionFieldOp op, long updateTime)
241+
throws UserException {
240242
makeSureInitialized();
241243
if (metadataOps == null) {
242244
throw new UserException("Drop partition field operation is not supported for catalog: " + getName());
243245
}
244-
((IcebergMetadataOps) metadataOps).dropPartitionField(table, op);
246+
((IcebergMetadataOps) metadataOps).dropPartitionField(table, op, updateTime);
245247
Env.getCurrentEnv().getEditLog()
246248
.logRefreshExternalTable(
247249
ExternalObjectLog.createForRefreshTable(table.getCatalog().getId(),
248-
table.getDbName(), table.getName()));
250+
table.getDbName(), table.getName(), updateTime));
249251
}
250252

251253
/**
252254
* Replace partition field in Iceberg table for partition evolution
253255
*/
254256
public void replacePartitionField(IcebergExternalTable table,
255-
ReplacePartitionFieldOp op) throws UserException {
257+
ReplacePartitionFieldOp op, long updateTime) throws UserException {
256258
makeSureInitialized();
257259
if (metadataOps == null) {
258260
throw new UserException("Replace partition field operation is not supported for catalog: " + getName());
259261
}
260-
((IcebergMetadataOps) metadataOps).replacePartitionField(table, op);
262+
((IcebergMetadataOps) metadataOps).replacePartitionField(table, op, updateTime);
261263
Env.getCurrentEnv().getEditLog()
262264
.logRefreshExternalTable(
263265
ExternalObjectLog.createForRefreshTable(table.getCatalog().getId(),
264-
table.getDbName(), table.getName()));
266+
table.getDbName(), table.getName(), updateTime));
265267
}
266268
}

0 commit comments

Comments
 (0)