Hi all,
I don't understand why the code below fails when it is added to the Hudi connector example (amazon-kinesis-data-analytics-java-examples/HudiConnector/):
private static StreamingFileSink<UserInfo> createS3SinkFromStaticConfig() {
    final StreamingFileSink<UserInfo> sink = StreamingFileSink
            .forBulkFormat(new Path(local), ParquetAvroWriters.forReflectRecord(UserInfo.class))
            // Use hive style partitioning
            .withBucketAssigner(new DateTimeBucketAssigner<>("'year='yyyy'/month='MM'/day='dd'/hour='HH/"))
            .withOutputFileConfig(OutputFileConfig.builder()
                    .withPartSuffix(".parquet")
                    .build())
            .build();
    return sink;
}
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
            env, EnvironmentSettings.newInstance().useBlinkPlanner().build());
    Configuration configuration = tableEnv.getConfig().getConfiguration();
    configuration.setString("execution.checkpointing.interval", "1 min");
    LOG.info("main started");
    DataStream<String> input = createSourceFromStaticConfig(env);
    LOG.info("read from source");
    DataStream<UserInfo> UserInfoDataStream = input.map((MapFunction<String, UserInfo>) CopData::getDataObject);
    UserInfoDataStream.addSink(createS3SinkFromStaticConfig()).name("S3 Parquet Sink");
    env.execute("Flink S3 Streaming Sink Job");
}
The problem is isolated to the createS3SinkFromStaticConfig method. With it in place, the job fails with this exception:
Caused by: java.io.IOException: unexpected exception type
at java.base/java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1641)
at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1271)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2205)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2464)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2358)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1679)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:493)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:451)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:600)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:587)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:541)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:322)
... 12 more
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:237)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at java.base/java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1265)
... 41 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
at org.apache.flink.formats.parquet.avro.ParquetAvroWriters.$deserializeLambda$(ParquetAvroWriters.java:40)
... 51 more
When I remove the dependency below from the pom, the job works:
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
    <version>${hudi.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.esotericsoftware.kryo</groupId>
            <artifactId>kryo</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Note: nothing else is changed from the example pom.
Please help.
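Since the job works once hudi-flink-bundle is removed, one thing worth checking is whether the same classes are visible from more than one JAR. This is only a guess at the cause, not a confirmed diagnosis. Below is a minimal diagnostic sketch that lists every location from which a class file can be loaded. The class name ClasspathProbe and the default class names it probes are my own choices for illustration and are not part of the example project.

```java
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of the example project): reports where a class file is visible from.
public class ClasspathProbe {

    /** Converts a fully qualified class name to its class-file resource path. */
    public static String resourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns every location from which the class file is visible to the class loader. */
    public static List<URL> locate(String className) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        return new ArrayList<>(Collections.list(loader.getResources(resourceName(className))));
    }

    public static void main(String[] args) throws IOException {
        // Assumed defaults: the class from the stack trace plus an Avro class it depends on.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.flink.formats.parquet.avro.ParquetAvroWriters",
            "org.apache.avro.Schema"
        };
        for (String name : names) {
            List<URL> hits = locate(name);
            System.out.println(name + " -> " + hits.size() + " location(s)");
            for (URL hit : hits) {
                System.out.println("  " + hit);
            }
        }
    }
}
```

Run it with the project's dependency classpath (for example from the IDE) rather than against the shaded fat JAR, because shading merges everything into a single archive and hides duplicates. More than one location for a class would point to a conflict between the bundle and the regular Flink Parquet dependency; a single location would suggest looking elsewhere.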