[Bug] Flink testAsyncLookupNonPkAppendTable failed #6878

@zhangdove

Description

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

release-1.2.0

Compute Engine

Flink-1.20.0

Minimal reproduce step

Add the following unit test to the LookupJoinITCase class:

    @Test
    public void testAsyncLookupNonPkAppendTable() throws Exception {
        sql(
                "CREATE TABLE DIM_NO_PK (i INT, j INT, dt varchar) "
                        + "PARTITIONED BY (`dt`) WITH ('continuous.discovery-interval'='1 ms')");

        String query =
                "SELECT T.i, D.j FROM T LEFT JOIN DIM_NO_PK "
                        + "/*+ OPTIONS('scan.partitions' = 'max_pt()', "
                        + "'lookup.dynamic-partition.refresh-interval' = '1 ms', "
                        + "'lookup.async' = 'true') */ "
                        + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.i = D.i";
        BlockingIterator<Row, Row> iterator = BlockingIterator.of(sEnv.executeSql(query).collect());

        sql(
                "INSERT INTO DIM_NO_PK VALUES (1, 11, '2025-12-24'), (1, 12, '2025-12-24'), (1, 11, '2025-12-24')");
        Thread.sleep(2000); // wait refresh
        sql("INSERT INTO T VALUES (1)");
        List<Row> result = iterator.collect(3);
        assertThat(result)
                .containsExactlyInAnyOrder(
                        Row.of(1, 11),
                        Row.of(1, 11),
                        Row.of(1, 12));
        iterator.close();
    }

What doesn't meet your expectations?

Running the test fails with the following exception:

Caused by: java.lang.IllegalArgumentException: Row arity: 4, but serializer arity: 3
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:124)
	at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:52)
	at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64)
	at org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:400)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:633)
	at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:613)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)

Anything else?

I tried debugging the code locally and found that Flink's PushProjectIntoTableSourceScanRule prunes the partition field dt from the projection, so the serializer generated by codegen only covers three columns, while the rows it receives still carry four columns. I'm not sure whether Paimon simply does not support this usage pattern.
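To make the failure mode concrete, here is a minimal standalone sketch (not Paimon or Flink code; the class and method names are hypothetical) that mimics the arity check performed when a serializer built against a pruned 3-column schema receives a 4-column row, producing the same message as the stack trace above:

```java
// Hypothetical sketch of the arity mismatch behind the reported exception.
// A serializer generated for a 3-column schema (dt pruned by projection
// pushdown) is handed a row that still has all 4 physical columns.
public class ArityMismatchSketch {

    // Mimics the defensive check in a row serializer's copy(): the row's
    // field count must match the arity the serializer was generated for.
    static Object[] copy(Object[] row, int serializerArity) {
        if (row.length != serializerArity) {
            throw new IllegalArgumentException(
                    "Row arity: " + row.length
                            + ", but serializer arity: " + serializerArity);
        }
        return row.clone();
    }

    public static void main(String[] args) {
        int serializerArity = 3; // codegen saw only the projected columns
        Object[] row = {1, 11, "2025-12-24", "extra"}; // physical row: 4 fields
        try {
            copy(row, serializerArity);
        } catch (IllegalArgumentException e) {
            // prints: Row arity: 4, but serializer arity: 3
            System.out.println(e.getMessage());
        }
    }
}
```

The real fix would need the lookup operator's output type and the codegen serializer to agree on whether the pruned partition column is present; the sketch only illustrates why the mismatch surfaces as an IllegalArgumentException rather than a planning-time error.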

Are you willing to submit a PR?

  • I'm willing to submit a PR!
