This repository was archived by the owner on Dec 14, 2022. It is now read-only.

[BUG] Flink pulsar source upgrade from 1.13.1.4 to 1.13.6.2 fails #608

@nikolasten

Description

Upgrading a Flink pipeline that uses org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource from the Pulsar Flink connector 1.13.1.4 to 1.13.6.2 fails. Restoring the job from a savepoint taken with 1.13.1.4 throws the following error:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for StreamSource_e851a344fc332b3e7b727e57889fc262_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:285)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:173)
... 10 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createOperatorStateBackend(EmbeddedRocksDBStateBackend.java:485)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:276)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 12 common frames omitted
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.pulsar.internal.SerializableRange; local class incompatible: stream classdesc serialVersionUID = -6297347936093846291, local class serialVersionUID = -4628744661831747115
at java.base/java.io.ObjectStreamClass.initNonProxy(Unknown Source)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readClassDesc(Unknown Source)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject0(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at java.base/java.io.ObjectInputStream.readObject(Unknown Source)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.toObject(TopicSubscriptionSerializer.java:103)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.deserialize(TopicSubscriptionSerializer.java:121)
at org.apache.flink.streaming.connectors.pulsar.internal.TopicSubscriptionSerializer.deserialize(TopicSubscriptionSerializer.java:32)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:151)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:37)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.deserializeOperatorStateValues(OperatorStateRestoreOperation.java:217)
at org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:188)
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:80)
... 16 common frames omitted
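
The innermost cause in the trace is a `java.io.InvalidClassException`: the `serialVersionUID` recorded in the savepoint for `SerializableRange` differs from the one in the class loaded from the new connector jar. Below is a minimal sketch of that failure mode using plain Java serialization. It is not connector code: `SerialVersionUidDemo`, `PinnedRange`, and the helper methods are hypothetical names chosen for illustration, and the "other build" is simulated by flipping one bit of the UID in the serialized bytes rather than by loading a second version of the class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionUidDemo {

    /** Hypothetical stand-in for a state class; not the connector's SerializableRange. */
    static class PinnedRange implements Serializable {
        // An explicit UID stays stable when the class body changes between releases.
        private static final long serialVersionUID = 1L;
        int start;
        int end;
    }

    /** Returns the serialVersionUID the JVM associates with the locally loaded class. */
    static long localUid(Class<?> cls) {
        return ObjectStreamClass.lookup(cls).getSerialVersionUID();
    }

    /** Serializes a value with standard Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    /**
     * Simulates bytes written by a different build of the class by flipping one bit
     * of the UID stored in the stream. Stream layout for a top-level object:
     * 4-byte header, TC_OBJECT, TC_CLASSDESC, 2-byte name length, name, 8-byte UID.
     * Assumes an ASCII class name, so name length in bytes equals its length in chars.
     */
    static byte[] withForeignUid(byte[] bytes, Class<?> cls) {
        byte[] copy = bytes.clone();
        int uidOffset = 8 + cls.getName().length();
        copy[uidOffset + 7] ^= 0x01; // change the lowest bit of the stored UID
        return copy;
    }

    /** Returns true if deserialization succeeds, false if the class is rejected. */
    static boolean canRestore(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.readObject();
            return true;
        } catch (InvalidClassException e) {
            // Message has the same "local class incompatible" shape as in the trace above.
            System.out.println(e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] saved = serialize(new PinnedRange());
        System.out.println("local UID: " + localUid(PinnedRange.class));
        System.out.println("same build restores: " + canRestore(saved));
        System.out.println("other build restores: "
                + canRestore(withForeignUid(saved, PinnedRange.class)));
    }
}
```

When a `Serializable` class does not declare `serialVersionUID`, the JVM derives one from the class structure, so changes to the class between releases can change the value. Whether that is what happened to `SerializableRange` between 1.13.1.4 and 1.13.6.2 is an assumption here; the trace only shows that the two values differ.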

To Reproduce
Steps to reproduce the behavior:

  1. Deploy a Flink pipeline with the Pulsar Flink connector 1.13.1.4, using org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSource as the streaming source.
  2. Enable checkpointing, then stop the Flink job with a savepoint.
  3. Upgrade the flink-pulsar dependency to 1.13.6.2.
  4. Deploy the Flink job from the savepoint.

Expected behavior
The upgrade succeeds and the job restores its state from the savepoint.

Additional context
The same issue seems to occur in apache/flink-cdc#78.
