Skip to content

[Bug] [flinkcdc yaml] flinkcdc yml not support transform #4372

@boerzi

Description

@boerzi

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

I successfully started the flink task with console commands, but an exception occurred when using dinky。
i add transform,start job error:
java.lang.NoSuchFieldError: names at org.apache.flink.cdc.runtime.parser.TransformParser.generateProjectionColumns(TransformParser.java:348) at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.lambda$cacheSchema$7(PostTransformOperator.java:252) at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:271) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1654) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.cacheSchema(PostTransformOperator.java:260) at org.apache.flink.cdc.runtime.operators.transform.PostTransformOperator.processElement(PostTransformOperator.java:233) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.base/java.lang.Thread.run(Thread.java:834)
flink-version:1.81
flinkcdc-version:3.2.0
dinky:dinky-release-1.18-1.2.1

What you expected to happen

The yml task is working properly

How to reproduce

`SET 'execution.checkpointing.interval' = '60s';

EXECUTE PIPELINE WITHYAML (
source:
type: mysql
name: xka-order-business-db
hostname: 10.21.203.13
port: 3306
username: xxxx
password: xxx
tables: order.order_detail,order.order,marketing.goods_sku
scan.newly-added-table.enabled: true
server-id: 5430-5434
server-time-zone: Asia/Shanghai

-- transform:
-- - source-table: order.order_detail
-- projection: *, concat('20',substr(REGEXP_REPLACE(order_no,'[a-zA-Z]',''),0,6)) as dt
-- primary-keys: dt, id
-- partition-keys: dt
-- - source-table: order.order
-- projection: *, concat('20',substr(REGEXP_REPLACE(order_no,'[a-zA-Z]',''),0,6)) as dt
-- primary-keys: dt, order_no
-- partition-keys: dt
-- - source-table: marketing.goods_sku
-- projection: *
-- partition-keys: dt

sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://xxx:9030
load-url: xx:8080
username: xx
password: ''
table.create.properties.replication_num: 1

pipeline:
name: Sync MySQL Database to StarRocks
parallelism: 1

route:

  • source-table: order.order_detail
    sink-table: ods.ods_order_detail
  • source-table: order.order
    sink-table: ods.ods_order
  • source-table: marketing.goods_sku
    sink-table: ods.ods_goods_sku
    )
    `

Anything else

No response

Version

1.2.1

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Labels

BugSomething isn't workingInvalidInvalid

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions