|
27 | 27 | import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; |
28 | 28 |
|
29 | 29 | import org.apache.flink.table.api.StatementSet; |
30 | | -import org.apache.flink.table.api.ValidationException; |
31 | 30 | import org.apache.flink.table.planner.factories.TestValuesTableFactory; |
32 | 31 | import org.apache.flink.types.Row; |
33 | 32 | import org.apache.flink.types.RowKind; |
@@ -227,7 +226,7 @@ public void testContinuousLatest() throws Exception { |
227 | 226 |
|
228 | 227 | BlockingIterator<Row, Row> iterator = |
229 | 228 | BlockingIterator.of( |
230 | | - streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest') */")); |
| 229 | + streamSqlIter("SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest') */")); |
231 | 230 |
|
232 | 231 | batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')"); |
233 | 232 | assertThat(iterator.collect(2)) |
@@ -255,7 +254,7 @@ public void testContinuousLatestStartingFromEmpty() throws Exception { |
255 | 254 | @Test |
256 | 255 | public void testContinuousFromTimestamp() throws Exception { |
257 | 256 | String sql = |
258 | | - "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp', 'log.scan.timestamp-millis'='%s') */"; |
| 257 | + "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='from-timestamp', 'scan.timestamp-millis'='%s') */"; |
259 | 258 |
|
260 | 259 | // empty table |
261 | 260 | BlockingIterator<Row, Row> iterator = BlockingIterator.of(streamSqlIter(sql, 0)); |
@@ -323,31 +322,31 @@ public void testLackStartupTimestamp() { |
323 | 322 | assertThatThrownBy( |
324 | 323 | () -> |
325 | 324 | streamSqlIter( |
326 | | - "SELECT * FROM T1 /*+ OPTIONS('log.scan'='from-timestamp') */")) |
| 325 | + "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='from-timestamp') */")) |
327 | 326 | .hasCauseInstanceOf(IllegalArgumentException.class) |
328 | 327 | .hasRootCauseMessage( |
329 | 328 | "must set only one key in [scan.timestamp-millis,scan.timestamp] when you use from-timestamp for scan.mode"); |
330 | 329 | } |
331 | 330 |
|
332 | 331 | @Test |
333 | 332 | public void testConfigureStartupTimestamp() throws Exception { |
334 | | - // Configure 'log.scan.timestamp-millis' without 'log.scan'. |
| 333 | + // Configure 'scan.timestamp-millis' without 'scan.mode'. |
335 | 334 | BlockingIterator<Row, Row> iterator = |
336 | 335 | BlockingIterator.of( |
337 | 336 | streamSqlIter( |
338 | | - "SELECT * FROM T1 /*+ OPTIONS('log.scan.timestamp-millis'='%s') */", |
| 337 | + "SELECT * FROM T1 /*+ OPTIONS('scan.timestamp-millis'='%s') */", |
339 | 338 | 0)); |
340 | 339 | batchSql("INSERT INTO T1 VALUES ('1', '2', '3'), ('4', '5', '6')"); |
341 | 340 | batchSql("INSERT INTO T1 VALUES ('7', '8', '9'), ('10', '11', '12')"); |
342 | 341 | assertThat(iterator.collect(2)) |
343 | 342 | .containsExactlyInAnyOrder(Row.of("1", "2", "3"), Row.of("4", "5", "6")); |
344 | 343 | iterator.close(); |
345 | 344 |
|
346 | | - // Configure 'log.scan.timestamp-millis' with 'log.scan=latest'. |
| 345 | + // Configure 'scan.timestamp-millis' with 'scan.mode=latest'. |
347 | 346 | assertThatThrownBy( |
348 | 347 | () -> |
349 | 348 | streamSqlIter( |
350 | | - "SELECT * FROM T1 /*+ OPTIONS('log.scan'='latest', 'log.scan.timestamp-millis'='%s') */", |
| 349 | + "SELECT * FROM T1 /*+ OPTIONS('scan.mode'='latest', 'scan.timestamp-millis'='%s') */", |
351 | 350 | 0)) |
352 | 351 | .hasCauseInstanceOf(IllegalArgumentException.class) |
353 | 352 | .hasRootCauseMessage( |
@@ -466,28 +465,6 @@ public void testIgnoreOverwrite() throws Exception { |
466 | 465 | iterator.close(); |
467 | 466 | } |
468 | 467 |
|
469 | | - @Test |
470 | | - public void testUnsupportedUpsert() { |
471 | | - assertThatThrownBy( |
472 | | - () -> |
473 | | - streamSqlIter( |
474 | | - "SELECT * FROM T1 /*+ OPTIONS('log.changelog-mode'='upsert') */")) |
475 | | - .hasCauseInstanceOf(ValidationException.class) |
476 | | - .hasRootCauseMessage( |
477 | | - "File store continuous reading does not support upsert changelog mode."); |
478 | | - } |
479 | | - |
480 | | - @Test |
481 | | - public void testUnsupportedEventual() { |
482 | | - assertThatThrownBy( |
483 | | - () -> |
484 | | - streamSqlIter( |
485 | | - "SELECT * FROM T1 /*+ OPTIONS('log.consistency'='eventual') */")) |
486 | | - .hasCauseInstanceOf(ValidationException.class) |
487 | | - .hasRootCauseMessage( |
488 | | - "File store continuous reading does not support eventual consistency mode."); |
489 | | - } |
490 | | - |
491 | 468 | @Test |
492 | 469 | public void testFlinkMemoryPool() { |
493 | 470 | // Check if the configuration is effective |
|
0 commit comments