Skip to content

Commit 6dd1c39

Browse files
committed
add filter to watermark query
1 parent 5f9cd73 commit 6dd1c39

File tree

4 files changed

+52
-23
lines changed

4 files changed

+52
-23
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,23 @@ public class AsyncWatermarkCache implements WatermarkCache {
4242
private static final Object MIN_WATERMARK_KEY = new Object();
4343
private final LoadingCache<Object, Optional<Timestamp>> cache;
4444

45+
// Most recent non-null minimum watermark returned by the DAO. It is fed back into the next query
// as a lower bound. The loader that reads and writes it may run on different threads (with
// CacheLoader.asyncReloading the initial load runs on the calling thread and reloads run on the
// executor), so the field is volatile to make each update visible to the next loader invocation.
private volatile Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;

/**
 * Creates a cache whose single entry is the minimum watermark among unfinished partitions.
 *
 * <p>The entry becomes eligible for refresh {@code refreshRate} after it was last written, and
 * reloads are executed on a dedicated single-thread executor.
 *
 * @param dao the DAO used to query the partition metadata table
 * @param refreshRate how long a loaded value is served before a refresh is triggered
 */
public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
  this.cache =
      CacheBuilder.newBuilder()
          .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
          .build(
              CacheLoader.asyncReloading(
                  CacheLoader.from(key -> loadUnfinishedMinWatermark(dao)),
                  Executors.newSingleThreadExecutor(
                      new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
}

// Queries the DAO using the last observed minimum watermark as the lower bound and remembers the
// new result when there is one; a null result leaves the remembered bound untouched.
private Optional<Timestamp> loadUnfinishedMinWatermark(PartitionMetadataDao dao) {
  final Timestamp unfinishedMin =
      dao.getUnfinishedMinWatermark(Optional.of(lastCachedMinWatermark));
  if (unfinishedMin != null) {
    lastCachedMinWatermark = unfinishedMin;
  }
  return Optional.ofNullable(unfinishedMin);
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
1919

2020
import com.google.cloud.Timestamp;
21+
import java.util.Optional;
2122
import javax.annotation.Nullable;
2223
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
2324

@@ -35,6 +36,6 @@ public NoOpWatermarkCache(PartitionMetadataDao dao) {
3536

3637
@Override
3738
public @Nullable Timestamp getUnfinishedMinWatermark() {
38-
return dao.getUnfinishedMinWatermark();
39+
return dao.getUnfinishedMinWatermark(Optional.empty());
3940
}
4041
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.List;
4848
import java.util.Map;
4949
import java.util.function.Function;
50+
import java.util.Optional;
5051
import java.util.stream.Collectors;
5152
import javax.annotation.Nullable;
5253
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
@@ -178,47 +179,56 @@ public List<String> findAllTableIndexes() {
178179
*
179180
* @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
180181
*/
181-
public @Nullable Timestamp getUnfinishedMinWatermark() {
182+
public @Nullable Timestamp getUnfinishedMinWatermark(Optional<Timestamp> since) {
183+
Timestamp sinceTimestamp = since.orElse(Timestamp.MIN_VALUE);
182184
Statement statement;
185+
final String MIN_WATERMARK = "min_watermark";
183186
if (this.isPostgres()) {
184187
statement =
185188
Statement.newBuilder(
186-
"SELECT \""
189+
"SELECT min(\""
187190
+ COLUMN_WATERMARK
188-
+ "\" FROM \""
191+
+ "\") as "
192+
+ MIN_WATERMARK
193+
+ " FROM \""
189194
+ metadataTableName
190195
+ "\" WHERE \""
191196
+ COLUMN_STATE
192197
+ "\" != $1"
193-
+ " ORDER BY \""
198+
+ " AND \""
194199
+ COLUMN_WATERMARK
195-
+ "\" ASC LIMIT 1")
200+
+ "\" >= $2")
196201
.bind("p1")
197202
.to(State.FINISHED.name())
203+
.bind("p2")
204+
.to(sinceTimestamp)
198205
.build();
199206
} else {
200207
statement =
201-
Statement.newBuilder(
202-
"SELECT "
203-
+ COLUMN_WATERMARK
204-
+ " FROM "
205-
+ metadataTableName
206-
+ " WHERE "
207-
+ COLUMN_STATE
208-
+ " != @state"
209-
+ " ORDER BY "
210-
+ COLUMN_WATERMARK
211-
+ " ASC LIMIT 1")
208+
"SELECT min("
209+
+ COLUMN_WATERMARK
210+
+ ") as "
211+
+ MIN_WATERMARK
212+
+ " FROM "
213+
+ metadataTableName
214+
+ " WHERE "
215+
+ COLUMN_STATE
216+
+ " != @state"
217+
+ " AND "
218+
+ COLUMN_WATERMARK
219+
+ " >= @since;")
212220
.bind("state")
213-
.to(State.FINISHED.name())
214-
.build();
221+
.to(State.FINISHED.name())
222+
.bind("since")
223+
.to(sinceTimestamp)
224+
.build();
215225
}
216226
try (ResultSet resultSet =
217227
databaseClient
218228
.singleUse()
219229
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) {
220230
if (resultSet.next()) {
221-
return resultSet.getTimestamp(COLUMN_WATERMARK);
231+
return resultSet.getTimestamp(MIN_WATERMARK);
222232
}
223233
return null;
224234
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,19 +460,27 @@ private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionR
460460
}
461461

462462
private void mockGetWatermark(Timestamp watermark) {
463+
final String MIN_WATERMARK = "min_watermark";
464+
// This query must be kept in sync with the one built by getUnfinishedMinWatermark() in PartitionMetadataDao.
463465
Statement watermarkStatement =
464466
Statement.newBuilder(
465-
"SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1")
467+
"SELECT min(Watermark) as "
468+
+ MIN_WATERMARK
469+
+ " FROM my-metadata-table "
470+
+ "WHERE State != @state "
471+
+ "AND Watermark >= @since;")
466472
.bind("state")
467473
.to(State.FINISHED.name())
474+
.bind("since")
475+
.to(Timestamp.MIN_VALUE)
468476
.build();
469477
ResultSetMetadata watermarkResultSetMetadata =
470478
ResultSetMetadata.newBuilder()
471479
.setRowType(
472480
StructType.newBuilder()
473481
.addFields(
474482
Field.newBuilder()
475-
.setName("Watermark")
483+
.setName(MIN_WATERMARK)
476484
.setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
477485
.build())
478486
.build())

0 commit comments

Comments
 (0)