-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Open
Labels
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
我用dinky平台,在k8s上运行flink 任务,用的是applicaion模式,pod启动后报错
(408d5ba84207ecc714162bebb47f677c_6cdc5bb954874d922eaee11a8e7b5dd5_0_0) switched from INITIALIZING to FAILED on job-progress-taskmanager-1-1 @ 10.42.0.247 (dataPort=37893).
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.mindflow.dws.app.JobProgressApplication$12
ClassLoader info: URL ClassLoader:
file: '/tmp/tm_job-progress-taskmanager-1-1/blobStorage/job_96f62a81c094504c3d8562c1a9c937d6/blob_p-ac702d7581d28abc3dbee8d2a725d3eb19cbbc54-cd79280239b2190219f2c29876cfceae' (valid JAR)
Class not resolvable through given classloader.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:414) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:869) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:836) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:825) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:732) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:202) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:789) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.0.jar:1.20.0]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.ClassNotFoundException: com.mindflow.dws.app.JobProgressApplication$12
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51) ~[flink-dist-1.20.0.jar:1.20.0]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:197) ~[flink-dist-1.20.0.jar:1.20.0]
at java.lang.Class.forName0(Native Method) ~[?:?]
at java.lang.Class.forName(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) ~[flink-dist-1.20.0.jar:1.20.0]
at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:533) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:521) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:475) ~[flink-dist-1.20.0.jar:1.20.0]
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:400) ~[flink-dist-1.20.0.jar:1.20.0]
... 16 more
What you expected to happen
jar包在session集群中是可以运行的,dinky上的资源应该也是配置正确的,现在不知道哪里有问题
How to reproduce
步骤如下:
- 制作镜像
Dockerfile如下:
FROM flink:1.20.0-java11
ADD ./extends/ /opt/flink/lib/
RUN rm -rf /opt/flink/lib/flink-table-planner-loader-*.jar
RUN mv /opt/flink/opt/flink-table-planner_2.12-*.jar /opt/flink/lib
extends目录下jar包:
commons-cli-1.3.1.jar
dinky-app-1.20-1.2.1-jar-with-dependencies.jar
flink-connector-jdbc-3.2.0-1.19.jar
flink-connector-kafka-3.3.0-1.20.jar
flink-connector-starrocks-1.2.8_flink-1.12-MF.jar
flink-hadoop-fs-1.20.0.jar
flink-s3-fs-hadoop-1.20.0.jar
flink-s3-fs-presto-1.20.0.jar
flink-sql-connector-mysql-cdc-3.2.1.jar
flink-sql-connector-postgres-cdc-3.2.1.jar
flink-table-planner_2.12-1.20.0.jar
kafka-clients-3.3.0.jar
mysql-connector-j-8.0.31.jar
postgresql-42.7.4.jar
然后build和push
docker build -t xxx/dinky-flink:1.20.0-java11 . --no-cache
docker push xxx/dinky-flink:1.20.0-java11
- 制作自定义jar
POM如下:
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<flink-cdc-version>2.2.1.MF</flink-cdc-version>
<kafka.version>3.3.0</kafka.version>
<kafka-flink.version>3.3.0-1.20</kafka-flink.version>
<starrocks.connector.version>1.2.10_flink-1.19</starrocks.connector.version>
</properties>
<dependencies>
<!-- flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-presto</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${kafka-flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- starrocks -->
<dependency>
<groupId>com.starrocks</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>${starrocks.connector.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.0</version>
<configuration>
<verbose>true</verbose>
<fork>true</fork>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<!--打jar包-->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>com.allen.capturewebdata.Main</mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
打包命令
mvn clean assembly:assembly -f pom.xml
然后将target下的data-warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar,重命名data-test.jar,上传到minio,目录:dinky/resource/bigdata/data-test.jar。
- 配置dinky
【注册中心】->【集群配置】->【新建】
- 类型:kubernetes Application
- 名称:jar-test
- 暴漏端口类型:NodePort
- k8s命名空间:test
- k8s提交账号:flink
- Flink镜像地址:xxx/dinky-flink:1.20.0-java11(第一步打包的镜像)
- CPU:2G,内存4G
- Jar文件路径:local:///opt/flink/lib/dinky-app-1.20-1.2.1-jar-with-dependencies.jar
【配置中心】->【Resource配置】
- 是否启用Resource:启用
- 存储模式:OSS
- 上传根目录:resource
- URL:minio.xxx.work
- AK/SK:
- 桶名称:dinky
- path style:启用
【数据开发】-> 【资源】
点击同步目录结构,可以看到上传的jar包。
【数据开发】->【项目】->【创建作业】
- 程序路径:rs:/bigdata/data-test.jar
- 程序运行类:com.xxx.dws.app.JobProgressApplication
点击运行按钮,k8s创建Pod,运行报错:
Anything else
No response
Version
1.2.1
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Reactions are currently unavailable