Skip to content

Commit 43cd66d

Browse files
authored
[fix](catalog) update the table's last update time after related operations. (#59387)
### What problem does this PR solve? We should update the external table's last update time after operations like schema change, insert or refresh. So the the SQL cache feature can get the right time to validate the cache. Also merge `schemaUpdateTime` and `eventUpdateTime` into one `updateTime`, and move it to `ExternalTable`. So that all kinds of external table can use this field.
1 parent ceaca62 commit 43cd66d

File tree

23 files changed

+218
-135
lines changed

23 files changed

+218
-135
lines changed

fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,7 @@ private void setExternalTableAutoAnalyzePolicy(ExternalTable table, List<AlterOp
388388

389389
private void processAlterTableForExternalTable(
390390
ExternalTable table, List<AlterOp> alterOps) throws UserException {
391+
long updateTime = System.currentTimeMillis();
391392
for (AlterOp alterOp : alterOps) {
392393
if (alterOp instanceof ModifyTablePropertiesOp) {
393394
setExternalTableAutoAnalyzePolicy(table, alterOps);
@@ -430,23 +431,23 @@ private void processAlterTableForExternalTable(
430431
AddPartitionFieldOp addPartitionField = (AddPartitionFieldOp) alterOp;
431432
if (table instanceof IcebergExternalTable) {
432433
((IcebergExternalCatalog) table.getCatalog()).addPartitionField(
433-
(IcebergExternalTable) table, addPartitionField);
434+
(IcebergExternalTable) table, addPartitionField, updateTime);
434435
} else {
435436
throw new UserException("ADD PARTITION KEY is only supported for Iceberg tables");
436437
}
437438
} else if (alterOp instanceof DropPartitionFieldOp) {
438439
DropPartitionFieldOp dropPartitionField = (DropPartitionFieldOp) alterOp;
439440
if (table instanceof IcebergExternalTable) {
440441
((IcebergExternalCatalog) table.getCatalog()).dropPartitionField(
441-
(IcebergExternalTable) table, dropPartitionField);
442+
(IcebergExternalTable) table, dropPartitionField, updateTime);
442443
} else {
443444
throw new UserException("DROP PARTITION KEY is only supported for Iceberg tables");
444445
}
445446
} else if (alterOp instanceof ReplacePartitionFieldOp) {
446447
ReplacePartitionFieldOp replacePartitionField = (ReplacePartitionFieldOp) alterOp;
447448
if (table instanceof IcebergExternalTable) {
448449
((IcebergExternalCatalog) table.getCatalog()).replacePartitionField(
449-
(IcebergExternalTable) table, replacePartitionField);
450+
(IcebergExternalTable) table, replacePartitionField, updateTime);
450451
} else {
451452
throw new UserException("REPLACE PARTITION KEY is only supported for Iceberg tables");
452453
}

fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,10 @@ public void handleRefreshTable(String catalogName, String dbName, String tableNa
146146
}
147147
return;
148148
}
149-
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, 0);
150-
149+
long updateTime = System.currentTimeMillis();
150+
refreshTableInternal((ExternalDatabase) db, (ExternalTable) table, updateTime);
151151
ExternalObjectLog log = ExternalObjectLog.createForRefreshTable(catalog.getId(), db.getFullName(),
152-
table.getName());
152+
table.getName(), updateTime);
153153
Env.getCurrentEnv().getEditLog().logRefreshExternalTable(log);
154154
}
155155

@@ -194,6 +194,9 @@ public void replayRefreshTable(ExternalObjectLog log) {
194194
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
195195
.getMetaStoreCache((HMSExternalCatalog) catalog);
196196
cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
197+
if (table.get() instanceof HMSExternalTable && log.getLastUpdateTime() > 0) {
198+
((HMSExternalTable) table.get()).setUpdateTime(log.getLastUpdateTime());
199+
}
197200
LOG.info("replay refresh partitions for table {}, "
198201
+ "modified partitions count: {}, "
199202
+ "new partitions count: {}",
@@ -229,12 +232,12 @@ public void refreshExternalTableFromEvent(String catalogName, String dbName, Str
229232

230233
public void refreshTableInternal(ExternalDatabase db, ExternalTable table, long updateTime) {
231234
table.unsetObjectCreated();
232-
if (table instanceof HMSExternalTable && updateTime > 0) {
233-
((HMSExternalTable) table).setEventUpdateTime(updateTime);
235+
if (updateTime > 0) {
236+
table.setUpdateTime(updateTime);
234237
}
235238
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(table);
236-
LOG.info("refresh table {}, id {} from db {} in catalog {}",
237-
table.getName(), table.getId(), db.getFullName(), db.getCatalog().getName());
239+
LOG.info("refresh table {}, id {} from db {} in catalog {}, update time: {}",
240+
table.getName(), table.getId(), db.getFullName(), db.getCatalog().getName(), updateTime);
238241
}
239242

240243
// Refresh partition
@@ -268,7 +271,7 @@ public void refreshPartitions(String catalogName, String dbName, String tableNam
268271
}
269272

270273
Env.getCurrentEnv().getExtMetaCacheMgr().invalidatePartitionsCache((ExternalTable) table, partitionNames);
271-
((HMSExternalTable) table).setEventUpdateTime(updateTime);
274+
((HMSExternalTable) table).setUpdateTime(updateTime);
272275
}
273276

274277
public void addToRefreshMap(long catalogId, Integer[] sec) {

fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ public void addExternalPartitions(String catalogName, String dbName, String tabl
726726

727727
HMSExternalTable hmsTable = (HMSExternalTable) table;
728728
Env.getCurrentEnv().getExtMetaCacheMgr().addPartitionsCache(catalog.getId(), hmsTable, partitionNames);
729-
hmsTable.setEventUpdateTime(updateTime);
729+
hmsTable.setUpdateTime(updateTime);
730730
}
731731

732732
public void dropExternalPartitions(String catalogName, String dbName, String tableName,
@@ -757,7 +757,7 @@ public void dropExternalPartitions(String catalogName, String dbName, String tab
757757

758758
HMSExternalTable hmsTable = (HMSExternalTable) table;
759759
Env.getCurrentEnv().getExtMetaCacheMgr().dropPartitionsCache(catalog.getId(), hmsTable, partitionNames);
760-
hmsTable.setEventUpdateTime(updateTime);
760+
hmsTable.setUpdateTime(updateTime);
761761
}
762762

763763
public void registerCatalogRefreshListener(Env env) {

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1137,8 +1137,9 @@ public void truncateTable(String dbName, String tableName, PartitionNamesInfo pa
11371137
partitions = partitionNamesInfo.getPartitionNames();
11381138
}
11391139
ExternalTable dorisTable = getDbOrDdlException(dbName).getTableOrDdlException(tableName);
1140-
metadataOps.truncateTable(dorisTable, partitions);
1141-
TruncateTableInfo info = new TruncateTableInfo(getName(), dbName, tableName, partitions);
1140+
long updateTime = System.currentTimeMillis();
1141+
metadataOps.truncateTable(dorisTable, partitions, updateTime);
1142+
TruncateTableInfo info = new TruncateTableInfo(getName(), dbName, tableName, partitions, updateTime);
11421143
Env.getCurrentEnv().getEditLog().logTruncateTable(info);
11431144
} catch (Exception e) {
11441145
LOG.warn("Failed to truncate table {}.{} in catalog {}", dbName, tableName, getName(), e);
@@ -1148,7 +1149,7 @@ public void truncateTable(String dbName, String tableName, PartitionNamesInfo pa
11481149

11491150
public void replayTruncateTable(TruncateTableInfo info) {
11501151
if (metadataOps != null) {
1151-
metadataOps.afterTruncateTable(info.getDb(), info.getTable());
1152+
metadataOps.afterTruncateTable(info.getDb(), info.getTable(), info.getUpdateTime());
11521153
}
11531154
}
11541155

@@ -1320,11 +1321,11 @@ public void resetMetaCacheNames() {
13201321
}
13211322

13221323
// log the refresh external table operation
1323-
private void logRefreshExternalTable(ExternalTable dorisTable) {
1324+
private void logRefreshExternalTable(ExternalTable dorisTable, long updateTime) {
13241325
Env.getCurrentEnv().getEditLog()
13251326
.logRefreshExternalTable(
13261327
ExternalObjectLog.createForRefreshTable(dorisTable.getCatalog().getId(),
1327-
dorisTable.getDbName(), dorisTable.getName()));
1328+
dorisTable.getDbName(), dorisTable.getName(), updateTime));
13281329
}
13291330

13301331
@Override
@@ -1336,8 +1337,9 @@ public void addColumn(TableIf dorisTable, Column column, ColumnPosition position
13361337
throw new DdlException("Add column operation is not supported for catalog: " + getName());
13371338
}
13381339
try {
1339-
metadataOps.addColumn(externalTable, column, position);
1340-
logRefreshExternalTable(externalTable);
1340+
long updateTime = System.currentTimeMillis();
1341+
metadataOps.addColumn(externalTable, column, position, updateTime);
1342+
logRefreshExternalTable(externalTable, updateTime);
13411343
} catch (Exception e) {
13421344
LOG.warn("Failed to add column {} to table {}.{} in catalog {}",
13431345
column.getName(), externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1354,8 +1356,9 @@ public void addColumns(TableIf dorisTable, List<Column> columns) throws UserExce
13541356
throw new DdlException("Add columns operation is not supported for catalog: " + getName());
13551357
}
13561358
try {
1357-
metadataOps.addColumns(externalTable, columns);
1358-
logRefreshExternalTable(externalTable);
1359+
long updateTime = System.currentTimeMillis();
1360+
metadataOps.addColumns(externalTable, columns, updateTime);
1361+
logRefreshExternalTable(externalTable, updateTime);
13591362
} catch (Exception e) {
13601363
LOG.warn("Failed to add columns to table {}.{} in catalog {}",
13611364
externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1372,8 +1375,9 @@ public void dropColumn(TableIf dorisTable, String columnName) throws UserExcepti
13721375
throw new DdlException("Drop column operation is not supported for catalog: " + getName());
13731376
}
13741377
try {
1375-
metadataOps.dropColumn(externalTable, columnName);
1376-
logRefreshExternalTable(externalTable);
1378+
long updateTime = System.currentTimeMillis();
1379+
metadataOps.dropColumn(externalTable, columnName, updateTime);
1380+
logRefreshExternalTable(externalTable, updateTime);
13771381
} catch (Exception e) {
13781382
LOG.warn("Failed to drop column {} from table {}.{} in catalog {}",
13791383
columnName, externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1390,8 +1394,9 @@ public void renameColumn(TableIf dorisTable, String oldName, String newName) thr
13901394
throw new DdlException("Rename column operation is not supported for catalog: " + getName());
13911395
}
13921396
try {
1393-
metadataOps.renameColumn(externalTable, oldName, newName);
1394-
logRefreshExternalTable(externalTable);
1397+
long updateTime = System.currentTimeMillis();
1398+
metadataOps.renameColumn(externalTable, oldName, newName, updateTime);
1399+
logRefreshExternalTable(externalTable, updateTime);
13951400
} catch (Exception e) {
13961401
LOG.warn("Failed to rename column {} to {} in table {}.{} in catalog {}",
13971402
oldName, newName, externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1408,8 +1413,9 @@ public void modifyColumn(TableIf dorisTable, Column column, ColumnPosition colum
14081413
throw new DdlException("Modify column operation is not supported for catalog: " + getName());
14091414
}
14101415
try {
1411-
metadataOps.modifyColumn(externalTable, column, columnPosition);
1412-
logRefreshExternalTable(externalTable);
1416+
long updateTime = System.currentTimeMillis();
1417+
metadataOps.modifyColumn(externalTable, column, columnPosition, updateTime);
1418+
logRefreshExternalTable(externalTable, updateTime);
14131419
} catch (Exception e) {
14141420
LOG.warn("Failed to modify column {} in table {}.{} in catalog {}",
14151421
column.getName(), externalTable.getDbName(), externalTable.getName(), getName(), e);
@@ -1426,8 +1432,9 @@ public void reorderColumns(TableIf dorisTable, List<String> newOrder) throws Use
14261432
throw new DdlException("Reorder columns operation is not supported for catalog: " + getName());
14271433
}
14281434
try {
1429-
metadataOps.reorderColumns(externalTable, newOrder);
1430-
logRefreshExternalTable(externalTable);
1435+
long updateTime = System.currentTimeMillis();
1436+
metadataOps.reorderColumns(externalTable, newOrder, updateTime);
1437+
logRefreshExternalTable(externalTable, updateTime);
14311438
} catch (Exception e) {
14321439
LOG.warn("Failed to reorder columns in table {}.{} in catalog {}",
14331440
externalTable.getDbName(), externalTable.getName(), getName(), e);

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,22 +75,25 @@ public static ExternalObjectLog createForRefreshDb(long catalogId, String dbName
7575
return externalObjectLog;
7676
}
7777

78-
public static ExternalObjectLog createForRefreshTable(long catalogId, String dbName, String tblName) {
78+
public static ExternalObjectLog createForRefreshTable(long catalogId, String dbName, String tblName,
79+
long updateTime) {
7980
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
8081
externalObjectLog.setCatalogId(catalogId);
8182
externalObjectLog.setDbName(dbName);
8283
externalObjectLog.setTableName(tblName);
84+
externalObjectLog.setLastUpdateTime(updateTime);
8385
return externalObjectLog;
8486
}
8587

8688
public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
87-
List<String> modifiedPartNames, List<String> newPartNames) {
89+
List<String> modifiedPartNames, List<String> newPartNames, long updateTime) {
8890
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
8991
externalObjectLog.setCatalogId(catalogId);
9092
externalObjectLog.setDbName(dbName);
9193
externalObjectLog.setTableName(tblName);
9294
externalObjectLog.setPartitionNames(modifiedPartNames);
9395
externalObjectLog.setNewPartitionNames(newPartNames);
96+
externalObjectLog.setLastUpdateTime(updateTime);
9497
return externalObjectLog;
9598
}
9699

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
8585
@SerializedName(value = "ta")
8686
private final TableAttributes tableAttributes = new TableAttributes();
8787

88-
// this field will be refreshed after reloading schema
89-
protected volatile long schemaUpdateTime;
88+
// record the table update time, like insert/alter/delete
89+
protected volatile long updateTime = 0;
9090

9191
protected long dbId;
9292
protected boolean objectCreated;
@@ -276,16 +276,15 @@ public long getCreateTime() {
276276
return 0;
277277
}
278278

279-
// return schema update time as default
280-
// override this method if there is some other kinds of update time
281-
// use getSchemaUpdateTime if just need the schema update time
282279
@Override
280+
// Returns the table update time, tracking when the table was last modified
281+
// (for example, by insert, alter, or refresh operations).
283282
public long getUpdateTime() {
284-
return this.schemaUpdateTime;
283+
return updateTime;
285284
}
286285

287-
public void setUpdateTime(long schemaUpdateTime) {
288-
this.schemaUpdateTime = schemaUpdateTime;
286+
public void setUpdateTime(long updateTime) {
287+
this.updateTime = updateTime;
289288
}
290289

291290
@Override
@@ -345,7 +344,7 @@ public Optional<ColumnStatistic> getColumnStatistic(String colName) {
345344
* @return
346345
*/
347346
public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) {
348-
schemaUpdateTime = System.currentTimeMillis();
347+
setUpdateTime(System.currentTimeMillis());
349348
return initSchema(key);
350349
}
351350

@@ -484,10 +483,6 @@ public int hashCode() {
484483
return Objects.hashCode(name, db);
485484
}
486485

487-
public long getSchemaUpdateTime() {
488-
return schemaUpdateTime;
489-
}
490-
491486
public long getDbId() {
492487
return dbId;
493488
}

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalDatabase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public boolean registerTable(TableIf tableIf) {
5151
super.registerTable(tableIf);
5252
HMSExternalTable table = getTableNullable(tableIf.getName());
5353
if (table != null) {
54-
table.setEventUpdateTime(tableIf.getUpdateTime());
54+
table.setUpdateTime(tableIf.getUpdateTime());
5555
}
5656
return true;
5757
}

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -190,9 +190,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
190190

191191
private HMSDlaTable dlaTable;
192192

193-
// record the event update time when enable hms event listener
194-
protected volatile long eventUpdateTime;
195-
196193
public enum DLAType {
197194
UNKNOWN, HIVE, HUDI, ICEBERG
198195
}
@@ -648,11 +645,11 @@ public Set<String> getPartitionNames() {
648645
public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey key) {
649646
Table table = loadHiveTable();
650647
// try to use transient_lastDdlTime from hms client
651-
schemaUpdateTime = MapUtils.isNotEmpty(table.getParameters())
648+
setUpdateTime(MapUtils.isNotEmpty(table.getParameters())
652649
&& table.getParameters().containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME)
653650
? Long.parseLong(table.getParameters().get(TBL_PROP_TRANSIENT_LAST_DDL_TIME)) * 1000
654651
// use current timestamp if lastDdlTime does not exist (hive views don't have this prop)
655-
: System.currentTimeMillis();
652+
: System.currentTimeMillis());
656653
return initSchema(key);
657654
}
658655

@@ -902,17 +899,6 @@ private void setStatData(Column col, ColumnStatisticsData data, ColumnStatisticB
902899
builder.setMaxValue(Double.POSITIVE_INFINITY);
903900
}
904901

905-
public void setEventUpdateTime(long updateTime) {
906-
this.eventUpdateTime = updateTime;
907-
}
908-
909-
@Override
910-
// get the max value of `schemaUpdateTime` and `eventUpdateTime`
911-
// eventUpdateTime will be refreshed after processing events with hms event listener enabled
912-
public long getUpdateTime() {
913-
return Math.max(this.schemaUpdateTime, this.eventUpdateTime);
914-
}
915-
916902
@Override
917903
public void gsonPostProcess() throws IOException {
918904
super.gsonPostProcess();

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -337,15 +337,15 @@ public void truncateTableImpl(ExternalTable dorisTable, List<String> partitions)
337337
}
338338

339339
@Override
340-
public void afterTruncateTable(String dbName, String tblName) {
340+
public void afterTruncateTable(String dbName, String tblName, long updateTime) {
341341
try {
342342
// Invalidate cache.
343343
Optional<ExternalDatabase<?>> db = catalog.getDbForReplay(dbName);
344344
if (db.isPresent()) {
345345
Optional tbl = db.get().getTableForReplay(tblName);
346346
if (tbl.isPresent()) {
347347
Env.getCurrentEnv().getRefreshManager()
348-
.refreshTableInternal(db.get(), (ExternalTable) tbl.get(), 0);
348+
.refreshTableInternal(db.get(), (ExternalTable) tbl.get(), updateTime);
349349
}
350350
}
351351
} catch (Exception e) {

0 commit comments

Comments
 (0)