Skip to content

Commit ccb8bc4

Browse files
huaxingao and aiborodin
authored
[1.10.x] Cherry-pick Flink: fix cache refreshing in dynamic sink (14406, 14765) (#14862)
* Flink: Fix cache refreshing in dynamic sink (#14406) (cherry picked from commit 8db3d21) * Flink: Backport fix cache refreshing in dynamic sink (#14765) Backport #14406 (cherry picked from commit c4ba60d) --------- Co-authored-by: aiborodin <[email protected]>
1 parent eb26eb3 commit ccb8bc4

File tree

6 files changed

+162
-12
lines changed

6 files changed

+162
-12
lines changed

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg.flink.sink.dynamic;
2020

21+
import java.time.Clock;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
5051

5152
private final Catalog catalog;
5253
private final long refreshMs;
54+
private final Clock cacheRefreshClock;
5355
private final int inputSchemasPerTableCacheMaximumSize;
5456
private final Map<TableIdentifier, CacheItem> tableCache;
5557

5658
TableMetadataCache(
5759
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
60+
this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
61+
}
62+
63+
@VisibleForTesting
64+
TableMetadataCache(
65+
Catalog catalog,
66+
int maximumSize,
67+
long refreshMs,
68+
int inputSchemasPerTableCacheMaximumSize,
69+
Clock cacheRefreshClock) {
5870
this.catalog = catalog;
5971
this.refreshMs = refreshMs;
72+
this.cacheRefreshClock = cacheRefreshClock;
6073
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
6174
this.tableCache = new LRUCache<>(maximumSize);
6275
}
@@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
88101
tableCache.put(
89102
identifier,
90103
new CacheItem(
104+
cacheRefreshClock.millis(),
91105
true,
92106
table.refs().keySet(),
93107
table.schemas(),
@@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
186200
return EXISTS;
187201
} catch (NoSuchTableException e) {
188202
LOG.debug("Table doesn't exist {}", identifier, e);
189-
tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
203+
tableCache.put(
204+
identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
190205
return Tuple2.of(false, e);
191206
}
192207
}
193208

194209
private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
195210
return allowRefresh
196-
&& (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
211+
&& (cacheItem == null
212+
|| cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
197213
}
198214

199215
public void invalidate(TableIdentifier identifier) {
@@ -202,20 +218,21 @@ public void invalidate(TableIdentifier identifier) {
202218

203219
/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
204220
static class CacheItem {
205-
private final long created = System.currentTimeMillis();
206-
221+
private final long createdTimestampMillis;
207222
private final boolean tableExists;
208223
private final Set<String> branches;
209224
private final Map<Integer, Schema> tableSchemas;
210225
private final Map<Integer, PartitionSpec> specs;
211226
private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
212227

213228
private CacheItem(
229+
long createdTimestampMillis,
214230
boolean tableExists,
215231
Set<String> branches,
216232
Map<Integer, Schema> tableSchemas,
217233
Map<Integer, PartitionSpec> specs,
218234
int inputSchemaCacheMaximumSize) {
235+
this.createdTimestampMillis = createdTimestampMillis;
219236
this.tableExists = tableExists;
220237
this.branches = branches;
221238
this.tableSchemas = tableSchemas;

flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
2222

23+
import java.time.Clock;
24+
import java.time.Instant;
25+
import java.time.ZoneId;
2326
import org.apache.commons.lang3.SerializationUtils;
2427
import org.apache.iceberg.PartitionSpec;
2528
import org.apache.iceberg.Schema;
29+
import org.apache.iceberg.Table;
2630
import org.apache.iceberg.catalog.Catalog;
2731
import org.apache.iceberg.catalog.TableIdentifier;
2832
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -91,4 +95,33 @@ void testCachingDisabled() {
9195

9296
assertThat(cache.getInternalCache()).isEmpty();
9397
}
98+
99+
@Test
100+
void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
101+
// Create table
102+
Catalog catalog = CATALOG_EXTENSION.catalog();
103+
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
104+
Table table = catalog.createTable(tableIdentifier, SCHEMA2);
105+
106+
// Init cache
107+
TableMetadataCache cache =
108+
new TableMetadataCache(
109+
catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault()));
110+
cache.update(tableIdentifier, table);
111+
112+
// Cache schema
113+
Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
114+
assertThat(schema.sameSchema(SCHEMA2)).isTrue();
115+
116+
// Cache schema with fewer fields
117+
TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
118+
assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
119+
assertThat(schemaInfo.compareResult())
120+
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
121+
122+
// Assert both schemas are in cache
123+
TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
124+
assertThat(cacheItem).isNotNull();
125+
assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
126+
}
94127
}

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg.flink.sink.dynamic;
2020

21+
import java.time.Clock;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
5051

5152
private final Catalog catalog;
5253
private final long refreshMs;
54+
private final Clock cacheRefreshClock;
5355
private final int inputSchemasPerTableCacheMaximumSize;
5456
private final Map<TableIdentifier, CacheItem> tableCache;
5557

5658
TableMetadataCache(
5759
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
60+
this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
61+
}
62+
63+
@VisibleForTesting
64+
TableMetadataCache(
65+
Catalog catalog,
66+
int maximumSize,
67+
long refreshMs,
68+
int inputSchemasPerTableCacheMaximumSize,
69+
Clock cacheRefreshClock) {
5870
this.catalog = catalog;
5971
this.refreshMs = refreshMs;
72+
this.cacheRefreshClock = cacheRefreshClock;
6073
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
6174
this.tableCache = new LRUCache<>(maximumSize);
6275
}
@@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
88101
tableCache.put(
89102
identifier,
90103
new CacheItem(
104+
cacheRefreshClock.millis(),
91105
true,
92106
table.refs().keySet(),
93107
table.schemas(),
@@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
186200
return EXISTS;
187201
} catch (NoSuchTableException e) {
188202
LOG.debug("Table doesn't exist {}", identifier, e);
189-
tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
203+
tableCache.put(
204+
identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
190205
return Tuple2.of(false, e);
191206
}
192207
}
193208

194209
private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
195210
return allowRefresh
196-
&& (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
211+
&& (cacheItem == null
212+
|| cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
197213
}
198214

199215
public void invalidate(TableIdentifier identifier) {
@@ -202,20 +218,21 @@ public void invalidate(TableIdentifier identifier) {
202218

203219
/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
204220
static class CacheItem {
205-
private final long created = System.currentTimeMillis();
206-
221+
private final long createdTimestampMillis;
207222
private final boolean tableExists;
208223
private final Set<String> branches;
209224
private final Map<Integer, Schema> tableSchemas;
210225
private final Map<Integer, PartitionSpec> specs;
211226
private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
212227

213228
private CacheItem(
229+
long createdTimestampMillis,
214230
boolean tableExists,
215231
Set<String> branches,
216232
Map<Integer, Schema> tableSchemas,
217233
Map<Integer, PartitionSpec> specs,
218234
int inputSchemaCacheMaximumSize) {
235+
this.createdTimestampMillis = createdTimestampMillis;
219236
this.tableExists = tableExists;
220237
this.branches = branches;
221238
this.tableSchemas = tableSchemas;

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
2222

23+
import java.time.Clock;
24+
import java.time.Instant;
25+
import java.time.ZoneId;
2326
import org.apache.commons.lang3.SerializationUtils;
2427
import org.apache.iceberg.PartitionSpec;
2528
import org.apache.iceberg.Schema;
29+
import org.apache.iceberg.Table;
2630
import org.apache.iceberg.catalog.Catalog;
2731
import org.apache.iceberg.catalog.TableIdentifier;
2832
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -91,4 +95,33 @@ void testCachingDisabled() {
9195

9296
assertThat(cache.getInternalCache()).isEmpty();
9397
}
98+
99+
@Test
100+
void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
101+
// Create table
102+
Catalog catalog = CATALOG_EXTENSION.catalog();
103+
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
104+
Table table = catalog.createTable(tableIdentifier, SCHEMA2);
105+
106+
// Init cache
107+
TableMetadataCache cache =
108+
new TableMetadataCache(
109+
catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault()));
110+
cache.update(tableIdentifier, table);
111+
112+
// Cache schema
113+
Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
114+
assertThat(schema.sameSchema(SCHEMA2)).isTrue();
115+
116+
// Cache schema with fewer fields
117+
TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
118+
assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
119+
assertThat(schemaInfo.compareResult())
120+
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
121+
122+
// Assert both schemas are in cache
123+
TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
124+
assertThat(cacheItem).isNotNull();
125+
assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
126+
}
94127
}

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.iceberg.flink.sink.dynamic;
2020

21+
import java.time.Clock;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import org.apache.flink.annotation.Internal;
@@ -50,13 +51,25 @@ class TableMetadataCache {
5051

5152
private final Catalog catalog;
5253
private final long refreshMs;
54+
private final Clock cacheRefreshClock;
5355
private final int inputSchemasPerTableCacheMaximumSize;
5456
private final Map<TableIdentifier, CacheItem> tableCache;
5557

5658
TableMetadataCache(
5759
Catalog catalog, int maximumSize, long refreshMs, int inputSchemasPerTableCacheMaximumSize) {
60+
this(catalog, maximumSize, refreshMs, inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
61+
}
62+
63+
@VisibleForTesting
64+
TableMetadataCache(
65+
Catalog catalog,
66+
int maximumSize,
67+
long refreshMs,
68+
int inputSchemasPerTableCacheMaximumSize,
69+
Clock cacheRefreshClock) {
5870
this.catalog = catalog;
5971
this.refreshMs = refreshMs;
72+
this.cacheRefreshClock = cacheRefreshClock;
6073
this.inputSchemasPerTableCacheMaximumSize = inputSchemasPerTableCacheMaximumSize;
6174
this.tableCache = new LRUCache<>(maximumSize);
6275
}
@@ -88,6 +101,7 @@ void update(TableIdentifier identifier, Table table) {
88101
tableCache.put(
89102
identifier,
90103
new CacheItem(
104+
cacheRefreshClock.millis(),
91105
true,
92106
table.refs().keySet(),
93107
table.schemas(),
@@ -186,14 +200,16 @@ private Tuple2<Boolean, Exception> refreshTable(TableIdentifier identifier) {
186200
return EXISTS;
187201
} catch (NoSuchTableException e) {
188202
LOG.debug("Table doesn't exist {}", identifier, e);
189-
tableCache.put(identifier, new CacheItem(false, null, null, null, 1));
203+
tableCache.put(
204+
identifier, new CacheItem(cacheRefreshClock.millis(), false, null, null, null, 1));
190205
return Tuple2.of(false, e);
191206
}
192207
}
193208

194209
private boolean needsRefresh(CacheItem cacheItem, boolean allowRefresh) {
195210
return allowRefresh
196-
&& (cacheItem == null || cacheItem.created + refreshMs > System.currentTimeMillis());
211+
&& (cacheItem == null
212+
|| cacheRefreshClock.millis() - cacheItem.createdTimestampMillis > refreshMs);
197213
}
198214

199215
public void invalidate(TableIdentifier identifier) {
@@ -202,20 +218,21 @@ public void invalidate(TableIdentifier identifier) {
202218

203219
/** Handles timeout for missing items only. Caffeine performance causes noticeable delays. */
204220
static class CacheItem {
205-
private final long created = System.currentTimeMillis();
206-
221+
private final long createdTimestampMillis;
207222
private final boolean tableExists;
208223
private final Set<String> branches;
209224
private final Map<Integer, Schema> tableSchemas;
210225
private final Map<Integer, PartitionSpec> specs;
211226
private final Map<Schema, ResolvedSchemaInfo> inputSchemas;
212227

213228
private CacheItem(
229+
long createdTimestampMillis,
214230
boolean tableExists,
215231
Set<String> branches,
216232
Map<Integer, Schema> tableSchemas,
217233
Map<Integer, PartitionSpec> specs,
218234
int inputSchemaCacheMaximumSize) {
235+
this.createdTimestampMillis = createdTimestampMillis;
219236
this.tableExists = tableExists;
220237
this.branches = branches;
221238
this.tableSchemas = tableSchemas;

flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,13 @@
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
2222

23+
import java.time.Clock;
24+
import java.time.Instant;
25+
import java.time.ZoneId;
2326
import org.apache.commons.lang3.SerializationUtils;
2427
import org.apache.iceberg.PartitionSpec;
2528
import org.apache.iceberg.Schema;
29+
import org.apache.iceberg.Table;
2630
import org.apache.iceberg.catalog.Catalog;
2731
import org.apache.iceberg.catalog.TableIdentifier;
2832
import org.apache.iceberg.flink.sink.TestFlinkIcebergSinkBase;
@@ -91,4 +95,33 @@ void testCachingDisabled() {
9195

9296
assertThat(cache.getInternalCache()).isEmpty();
9397
}
98+
99+
@Test
100+
void testNoCacheRefreshingBeforeRefreshIntervalElapses() {
101+
// Create table
102+
Catalog catalog = CATALOG_EXTENSION.catalog();
103+
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
104+
Table table = catalog.createTable(tableIdentifier, SCHEMA2);
105+
106+
// Init cache
107+
TableMetadataCache cache =
108+
new TableMetadataCache(
109+
catalog, 10, 100L, 10, Clock.fixed(Instant.now(), ZoneId.systemDefault()));
110+
cache.update(tableIdentifier, table);
111+
112+
// Cache schema
113+
Schema schema = cache.schema(tableIdentifier, SCHEMA2).resolvedTableSchema();
114+
assertThat(schema.sameSchema(SCHEMA2)).isTrue();
115+
116+
// Cache schema with fewer fields
117+
TableMetadataCache.ResolvedSchemaInfo schemaInfo = cache.schema(tableIdentifier, SCHEMA);
118+
assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
119+
assertThat(schemaInfo.compareResult())
120+
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
121+
122+
// Assert both schemas are in cache
123+
TableMetadataCache.CacheItem cacheItem = cache.getInternalCache().get(tableIdentifier);
124+
assertThat(cacheItem).isNotNull();
125+
assertThat(cacheItem.inputSchemas()).containsKeys(SCHEMA, SCHEMA2);
126+
}
94127
}

0 commit comments

Comments (0)