Skip to content

Commit aa2eb58

Browse files
authored
[Opt](cloud) cache table version for cloud mode (#59339)
1 parent af4e2dd commit aa2eb58

File tree

3 files changed

+165
-0
lines changed

3 files changed

+165
-0
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,11 @@ public enum OlapTableState {
234234
// Ensures only one creation task runs for a given partition at a time.
235235
private ConcurrentHashMap<String, CompletableFuture<Void>> partitionCreationFutures = new ConcurrentHashMap<>();
236236

237+
// Cache for table version in cloud mode
238+
// This value is set when get the table version from meta-service, 0 means version is not cached yet
239+
private long lastTableVersionCachedTimeMs = 0;
240+
private long cachedTableVersion = -1;
241+
237242
public OlapTable() {
238243
// for persist
239244
super(TableType.OLAP);
@@ -3280,11 +3285,43 @@ public long getNextVersion() {
32803285
}
32813286
}
32823287

3288+
public boolean isCachedTableVersionExpired() {
3289+
// -1 means no cache yet, need to fetch from MS
3290+
if (cachedTableVersion == -1) {
3291+
return true;
3292+
}
3293+
ConnectContext ctx = ConnectContext.get();
3294+
if (ctx == null) {
3295+
return true;
3296+
}
3297+
long cacheExpirationMs = ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
3298+
if (cacheExpirationMs <= 0) { // always expired
3299+
return true;
3300+
}
3301+
return System.currentTimeMillis() - lastTableVersionCachedTimeMs > cacheExpirationMs;
3302+
}
3303+
3304+
public void setCachedTableVersion(long version) {
3305+
if (version > cachedTableVersion) {
3306+
cachedTableVersion = version;
3307+
lastTableVersionCachedTimeMs = System.currentTimeMillis();
3308+
}
3309+
}
3310+
3311+
public long getCachedTableVersion() {
3312+
return cachedTableVersion;
3313+
}
3314+
32833315
public long getVisibleVersion() throws RpcException {
32843316
if (Config.isNotCloudMode()) {
32853317
return tableAttributes.getVisibleVersion();
32863318
}
32873319

3320+
// check if cache is not expired
3321+
if (!isCachedTableVersionExpired()) {
3322+
return getCachedTableVersion();
3323+
}
3324+
32883325
// get version rpc
32893326
Cloud.GetVersionRequest request = Cloud.GetVersionRequest.newBuilder()
32903327
.setRequestIp(FrontendOptions.getLocalHostAddressCached())
@@ -3309,6 +3346,8 @@ public long getVisibleVersion() throws RpcException {
33093346
if (version == 0) {
33103347
version = 1;
33113348
}
3349+
// update cache
3350+
setCachedTableVersion(version);
33123351
return version;
33133352
} catch (RpcException e) {
33143353
LOG.warn("get version from meta service failed", e);

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,8 @@ public class SessionVariable implements Serializable, Writable {
772772
public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune";
773773
public static final String CLOUD_PARTITION_VERSION_CACHE_TTL_MS =
774774
"cloud_partition_version_cache_ttl_ms";
775+
public static final String CLOUD_TABLE_VERSION_CACHE_TTL_MS =
776+
"cloud_table_version_cache_ttl_ms";
775777
// CLOUD_VARIABLES_BEGIN
776778

777779
public static final String ENABLE_MATCH_WITHOUT_INVERTED_INDEX = "enable_match_without_inverted_index";
@@ -2846,6 +2848,8 @@ public void setDetailShapePlanNodes(String detailShapePlanNodes) {
28462848
public boolean disableEmptyPartitionPrune = false;
28472849
@VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
28482850
public static long cloudPartitionVersionCacheTtlMs = 0;
2851+
@VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
2852+
public long cloudTableVersionCacheTtlMs = 0;
28492853
// CLOUD_VARIABLES_END
28502854

28512855
// fetch remote schema rpc timeout

fe/fe-core/src/test/java/org/apache/doris/catalog/OlapTableTest.java

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919

2020
import org.apache.doris.analysis.UserIdentity;
2121
import org.apache.doris.catalog.TableIf.TableType;
22+
import org.apache.doris.cloud.proto.Cloud;
23+
import org.apache.doris.cloud.rpc.VersionHelper;
24+
import org.apache.doris.common.Config;
2225
import org.apache.doris.common.FeConstants;
2326
import org.apache.doris.common.io.FastByteArrayOutputStream;
2427
import org.apache.doris.common.util.UnitTestUtil;
2528
import org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition;
2629
import org.apache.doris.qe.ConnectContext;
30+
import org.apache.doris.qe.SessionVariable;
2731
import org.apache.doris.resource.Tag;
2832
import org.apache.doris.resource.computegroup.ComputeGroup;
2933
import org.apache.doris.system.Backend;
@@ -283,4 +287,122 @@ public void testTopNPushDownWithTag() throws Exception {
283287
ConnectContext.remove();
284288

285289
}
290+
291+
@Test
292+
public void testTableVersionCacheWithRpc() throws Exception {
293+
// Mock cloud mode
294+
new MockUp<Config>() {
295+
@Mock
296+
public boolean isNotCloudMode() {
297+
return false;
298+
}
299+
};
300+
301+
// Create table and database
302+
final Database db = new Database(1L, "test_db");
303+
304+
// Create a custom OlapTable that overrides getDatabase()
305+
OlapTable table = new OlapTable() {
306+
@Override
307+
public Database getDatabase() {
308+
return db;
309+
}
310+
};
311+
table.id = 1000L;
312+
313+
// Mock VersionHelper.getVersionFromMeta()
314+
final long[] versions = {100L, 200L, 300L};
315+
final int[] callCount = {0};
316+
317+
new MockUp<VersionHelper>() {
318+
@Mock
319+
public Cloud.GetVersionResponse getVersionFromMeta(Cloud.GetVersionRequest req) {
320+
Cloud.GetVersionResponse.Builder builder = Cloud.GetVersionResponse.newBuilder();
321+
builder.setStatus(Cloud.MetaServiceResponseStatus.newBuilder()
322+
.setCode(Cloud.MetaServiceCode.OK).build());
323+
builder.setVersion(versions[callCount[0]]);
324+
callCount[0]++;
325+
return builder.build();
326+
}
327+
};
328+
329+
// Create ConnectContext with SessionVariable
330+
ConnectContext ctx = new ConnectContext();
331+
ctx.setSessionVariable(new SessionVariable());
332+
ctx.setThreadLocalInfo();
333+
334+
try {
335+
// Test 1: Initial state with TTL set, should still call RPC for first time
336+
ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 100000; // Set long TTL
337+
Assert.assertEquals(-1, table.getCachedTableVersion()); // Initial state
338+
Assert.assertTrue(table.isCachedTableVersionExpired()); // Should be expired due to -1
339+
340+
long ver0 = table.getVisibleVersion();
341+
Assert.assertEquals(100, ver0); // Should get from MS
342+
Assert.assertEquals(1, callCount[0]); // First RPC call
343+
Assert.assertEquals(100, table.getCachedTableVersion()); // Cache updated
344+
345+
// Second call should use cache
346+
long ver0Again = table.getVisibleVersion();
347+
Assert.assertEquals(100, ver0Again); // Should use cached version
348+
Assert.assertEquals(1, callCount[0]); // No new RPC call
349+
350+
// Test 2: Disable cache (TTL = 0), should always call RPC
351+
ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 0;
352+
long ver1 = table.getVisibleVersion();
353+
Assert.assertEquals(200, ver1);
354+
Assert.assertEquals(2, callCount[0]); // Second RPC call
355+
356+
long ver2 = table.getVisibleVersion();
357+
Assert.assertEquals(300, ver2);
358+
Assert.assertEquals(3, callCount[0]); // Third RPC call
359+
Assert.assertEquals(300, table.getCachedTableVersion()); // Cache updated to 300
360+
361+
// Test 3: Enable cache with long TTL, should use cached version
362+
ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 100000; // 100 seconds
363+
table.setCachedTableVersion(350); // Set cache to a larger version
364+
long ver3 = table.getVisibleVersion();
365+
Assert.assertEquals(350, ver3); // Should return cached version (350)
366+
Assert.assertEquals(3, callCount[0]); // No new RPC call
367+
368+
// Test 4: Test setCachedTableVersion only updates when version is greater
369+
ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 500; // 500ms TTL
370+
371+
// At this point, cache is 350 from Test 3
372+
// Set a larger version to 400
373+
table.setCachedTableVersion(400);
374+
Assert.assertEquals(400, table.getCachedTableVersion());
375+
Assert.assertFalse(table.isCachedTableVersionExpired()); // Not expired yet
376+
377+
Thread.sleep(300); // Sleep 300ms
378+
379+
// Try to set a smaller version (380), should NOT update version or timestamp
380+
table.setCachedTableVersion(380);
381+
Assert.assertEquals(400, table.getCachedTableVersion()); // Version should remain 400
382+
383+
Thread.sleep(300); // Total 600ms since setCachedTableVersion(400)
384+
// Cache should be expired (600ms > 500ms TTL)
385+
// If timestamp was incorrectly reset by setCachedTableVersion(380), cache would not be expired
386+
Assert.assertTrue(table.isCachedTableVersionExpired());
387+
388+
// Test 5: Setting a greater version should update both version and timestamp
389+
ctx.getSessionVariable().cloudTableVersionCacheTtlMs = 500; // 500ms TTL
390+
table.setCachedTableVersion(500); // Set to 500
391+
Assert.assertEquals(500, table.getCachedTableVersion());
392+
Assert.assertFalse(table.isCachedTableVersionExpired()); // Not expired
393+
394+
Thread.sleep(300); // Sleep 300ms
395+
396+
// Set a greater version (550), should update both version and timestamp
397+
table.setCachedTableVersion(550);
398+
Assert.assertEquals(550, table.getCachedTableVersion()); // Version updated to 550
399+
Assert.assertFalse(table.isCachedTableVersionExpired()); // Timestamp reset, not expired yet
400+
401+
Thread.sleep(300); // Sleep another 300ms (total 600ms from first setCachedTableVersion(500), but only 300ms from setCachedTableVersion(550))
402+
Assert.assertFalse(table.isCachedTableVersionExpired()); // Still not expired (300ms < 500ms TTL)
403+
404+
} finally {
405+
ConnectContext.remove();
406+
}
407+
}
286408
}

0 commit comments

Comments
 (0)