Description
Search before asking
- I have searched the issues and found no similar issues.
What happened
Dinky: 1.2.1
Flink: 1.17.2
CREATE TABLE mysql_cdc_table(
menu_id INT,
menu_name STRING,
order_num INT
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '192.168.6.192',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'all_lizc',
'server-time-zone' = 'UTC',
'scan.incremental.snapshot.enabled' = 'true',
'debezium.snapshot.mode'='latest-offset' ,-- alternatively the key is scan.startup.mode: initial means historical data is read, latest-offset means historical data is skipped
'debezium.datetime.format.date'='yyyy-MM-dd',
'debezium.datetime.format.time'='HH-mm-ss',
'debezium.datetime.format.datetime'='yyyy-MM-dd HH-mm-ss',
'debezium.datetime.format.timestamp'='yyyy-MM-dd HH-mm-ss',
'debezium.datetime.format.timestamp.zone'='UTC+8',
'scan.incremental.snapshot.chunk.key-column' ='true',
'table-name' = 'sys_menu');
SELECT * FROM mysql_cdc_table;
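The comment on the `debezium.snapshot.mode` line names `scan.startup.mode` as the alternative key. For comparison, here is a minimal sketch of the same table using that key instead. It is not a confirmed fix for the error below. It assumes that `menu_id` is the primary key of `sys_menu`, and it leaves out the `debezium.datetime.format.*` options and the `scan.incremental.snapshot.chunk.key-column` option from the original statement.

```sql
-- Sketch only: same source table, with the startup position set through
-- the connector's own scan.startup.mode option.
CREATE TABLE mysql_cdc_table (
  menu_id INT,
  menu_name STRING,
  order_num INT,
  -- Assumption: menu_id is the primary key of sys_menu.
  PRIMARY KEY (menu_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '192.168.6.192',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'all_lizc',
  'table-name' = 'sys_menu',
  'server-time-zone' = 'UTC',
  'scan.incremental.snapshot.enabled' = 'true',
  -- 'initial' reads the existing rows first and then the binlog;
  -- 'latest-offset' reads only changes made after the job starts.
  'scan.startup.mode' = 'initial'
);

SELECT * FROM mysql_cdc_table;
```

With `latest-offset`, rows that already exist in MySQL would not appear in the result until they change, so `initial` is used here because the expectation is to see the existing data.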
Query output:
2025-01-22 02:18:38.080 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher(222): Failed to get job status so we assume that the job has terminated. Some data might be lost. java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.cancelJob(CollectResultFetcher.java:232) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.close(CollectResultFetcher.java:157) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:108) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[flink-table-planner_2.12-1.17.2.jar:1.17.2]
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:1.8.0_422]
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_422]
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:1.8.0_422]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_422]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) ~[?:1.8.0_422]
at org.dinky.data.result.ResultRunnable.catchData(ResultRunnable.java:143) ~[dinky-core-1.2.1.jar:?]
at org.dinky.data.result.ResultRunnable.lambda$run$0(ResultRunnable.java:93) ~[dinky-core-1.2.1.jar:?]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_422]
at org.dinky.data.result.ResultRunnable.run(ResultRunnable.java:85) ~[dinky-core-1.2.1.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
What you expected to happen
The corresponding MySQL database contains data, so the query should return that data.
How to reproduce
2025-01-22 02:22:50.632 INFO org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager(234): Shutting down TaskExecutorStateChangelogStoragesManager.
2025-01-22 02:22:50.633 INFO org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager(95): Shutting down TaskExecutorChannelStateExecutorFactoryManager.
2025-01-22 02:22:50.619 WARN org.apache.flink.streaming.api.operators.collect.CollectResultFetcher(222): Failed to get job status so we assume that the job has terminated. Some data might be lost. java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.cancelJob(CollectResultFetcher.java:232) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.close(CollectResultFetcher.java:157) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:108) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) ~[flink-dist-1.17.2.jar:1.17.2]
at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) ~[flink-table-planner_2.12-1.17.2.jar:1.17.2]
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:1.8.0_422]
at java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_422]
at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) ~[?:1.8.0_422]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) ~[?:1.8.0_422]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_422]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) ~[?:1.8.0_422]
at org.dinky.data.result.ResultRunnable.catchData(ResultRunnable.java:143) ~[dinky-core-1.2.1.jar:?]
at org.dinky.data.result.ResultRunnable.lambda$run$0(ResultRunnable.java:93) ~[dinky-core-1.2.1.jar:?]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_422]
at org.dinky.data.result.ResultRunnable.run(ResultRunnable.java:85) ~[dinky-core-1.2.1.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_422]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_422]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_422]
2025-01-22 02:22:50.633 ERROR org.dinky.data.result.ResultRunnable(103): java.lang.RuntimeException: Failed to fetch next result
2025-01-22 02:22:50.634 INFO org.dinky.data.result.ResultPool(62): Remove job result from cache. Job id: 139
2025-01-22 02:22:50.637 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager(1125): Closing TaskExecutor connection c0a022ec-82e0-4f3c-a688-2ad5195b38d3 because: The TaskExecutor is shutting down.
2025-01-22 02:22:50.637 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService(140): Stop job leader service.
2025-01-22 02:22:50.638 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager(314): Shutting down TaskExecutorLocalStateStoresManager.
2025-01-22 02:22:50.638 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager(968): Registered job manager a8d1377c44494e9f4c90ff180b37418a@akka://flink/user/rpc/jobmanager_23 for job 7c5f408bb04ecdc4a2b7372ea05254da.
2025-01-22 02:22:50.650 INFO org.apache.flink.runtime.jobmaster.JobMaster(1174): JobManager successfully registered at ResourceManager, leader id: a39d118cbda66f9ad48140168f674605.
2025-01-22 02:22:50.656 INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl(149): FileChannelManager removed spill file directory /tmp/flink-io-2b291a3a-3b44-4ed3-9607-43ba5a7d5d22
2025-01-22 02:22:50.657 INFO org.apache.flink.runtime.io.network.NettyShuffleEnvironment(372): Shutting down the network environment and its components.
2025-01-22 02:22:50.658 INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl(149): FileChannelManager removed spill file directory /tmp/flink-netty-shuffle-c7925e83-be8d-4328-9015-e247f7d9fb66
2025-01-22 02:22:50.660 INFO org.apache.flink.runtime.taskexecutor.KvStateService(122): Shutting down the kvState service and its components.
2025-01-22 02:22:50.660 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService(140): Stop job leader service.
2025-01-22 02:22:50.661 INFO org.apache.flink.runtime.filecache.FileCache(160): removed file cache directory /tmp/flink-dist-cache-e349e855-eeb2-47da-9c31-278ef8f07a2a
2025-01-22 02:22:50.678 INFO org.apache.flink.runtime.history.FsJobArchivist(91): Job 7c5f408bb04ecdc4a2b7372ea05254da has been archived at rs:/tmp/flink-job-archive/7c5f408bb04ecdc4a2b7372ea05254da.
2025-01-22 02:22:50.680 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager(635): Shut down cluster because application is in CANCELED, diagnostics DispatcherResourceManagerComponent has been closed..
2025-01-22 02:22:50.681 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent(165): Closing components.
2025-01-22 02:22:50.681 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher(1299): Job 7c5f408bb04ecdc4a2b7372ea05254da has been registered for cleanup in the JobResultStore after reaching a terminal state.
2025-01-22 02:22:50.681 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess(136): Stopping SessionDispatcherLeaderProcess.
2025-01-22 02:22:50.695 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher(456): Stopping dispatcher akka://flink/user/rpc/dispatcher_22.
2025-01-22 02:22:50.695 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl(167): Stopping resource manager service.
2025-01-22 02:22:50.695 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher(1198): Stopping all currently running jobs of dispatcher akka://flink/user/rpc/dispatcher_22.
2025-01-22 02:22:50.694 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor(521): Stopped TaskExecutor akka://flink/user/rpc/taskmanager_20.
2025-01-22 02:22:50.696 INFO org.apache.flink.runtime.security.token.DefaultDelegationTokenManager(382): Stopping credential renewal
2025-01-22 02:22:50.696 INFO org.apache.flink.runtime.jobmaster.JobMaster(436): Stopping the JobMaster for job 'demo1' (7c5f408bb04ecdc4a2b7372ea05254da).
2025-01-22 02:22:50.696 INFO org.apache.flink.runtime.security.token.DefaultDelegationTokenManager(386): Stopped credential renewal
2025-01-22 02:22:50.696 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager(269): Closing the slot manager.
2025-01-22 02:22:50.696 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager(240): Suspending the slot manager.
2025-01-22 02:22:50.705 INFO org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator(92): Closing the CollectSinkOperatorCoordinator.
2025-01-22 02:22:50.705 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator(278): Closing SourceCoordinator for source Source: mysql_cdc_table[19].
2025-01-22 02:22:50.706 INFO org.apache.flink.runtime.jobmaster.JobMaster(1226): Close ResourceManager connection 0784a8cbeccaed13bd39bb14e2edd87c: ResourceManager leader changed to new address null
2025-01-22 02:22:50.707 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator(283): Source coordinator for source Source: mysql_cdc_table[19] closed.
2025-01-22 02:22:50.707 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore(153): Shutting down
Process FlinkSubmit/2 exit with status:FINISHED
Anything else
No response
Version
1.2.0
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct