[Improve][Flink] Support multiple parallelisms and remove Flink-specific logic from API #10107
Conversation
private void sendSchemaChangeEventToDownstream(SchemaChangeEvent schemaChangeEvent) {
Suggested change:
- private void sendSchemaChangeEventToDownstream(SchemaChangeEvent schemaChangeEvent) {
+ private void sendSchemaChangeEventToDownStream(SchemaChangeEvent schemaChangeEvent) {
Is there a problem here?
private final Config pluginConfig;
private volatile Long lastProcessedEventTime;
private transient LocalSchemaCoordinator coordinator;
private transient Map<String, List<BufferedDataRow>> bufferedDataRows;
Doesn't it need to be stored in the checkpoint?
It is also possible to store the data there, but I have already stored it at the sink end. Operators closer to the source do not necessarily need to be materialized either, especially if the source can provide a replay strategy.
The coordinator does not need to be persisted because it is only a communication component; the BroadcastSchemaSinkOperator is the component that actually holds the coordinator's response state. The coordinator can be rebuilt after a crash, and the job manager will read the state from the sink operator. The buffered data can be persisted, but the latest processing time does not need to be, because it has already been persisted by the sink.
I'm going to finalize the following variables; the buffered data rows can be rebuilt through reprocessing:
- localSchemaState - stores the schema state of each table; updated when the schema changes.
- lastProcessedEventTime - the time of the last processed event, used to ensure event order.
- schemaChangePending - indicates whether a schema change is currently in progress.
If the downstream schema change completes, a checkpoint is performed, and a failure then occurs while re-consuming the cache, is there a possibility that this batch of data will be lost? It seems this segment of data does not take part in the source and sink checkpointing, so it cannot be recovered from state on restart.
You're right, this situation can indeed happen. After the checkpoint, the source side commits its offset on the assumption that this portion of the data has been processed and won't be replayed, so the data is lost. Checkpointing the cached rows is indeed necessary.
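For illustration, carrying that buffer through a checkpoint could be sketched roughly as follows. This is only a minimal sketch assuming a CheckpointedFunction with a plain String row type; the class and field names are illustrative and are not the PR's actual code.

```java
// Minimal sketch only: buffering rows during a schema change and carrying the
// buffer through a Flink checkpoint. The row type (String), the class name, and
// the point where schemaChangePending is toggled are assumptions, not the PR's code.
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingSchemaChangeFunction extends ProcessFunction<String, String>
        implements CheckpointedFunction {

    // In-memory buffer of rows held back while a schema change is in flight.
    private transient List<String> bufferedRows;
    // Flink-managed operator state backing the in-memory buffer.
    private transient ListState<String> checkpointedRows;
    // Flipped by the (omitted) schema-change handling logic.
    private transient boolean schemaChangePending;

    @Override
    public void processElement(String row, Context ctx, Collector<String> out) {
        if (schemaChangePending) {
            bufferedRows.add(row); // hold back until the change is acknowledged
        } else {
            out.collect(row);
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Copy the current buffer into managed state so it becomes part of the checkpoint.
        checkpointedRows.update(new ArrayList<>(bufferedRows));
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedRows =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("buffered-rows", String.class));
        bufferedRows = new ArrayList<>();
        if (context.isRestored()) {
            // Re-populate the buffer after a failure so held-back rows are not lost
            // even though the source offset has already been committed.
            for (String row : checkpointedRows.get()) {
                bufferedRows.add(row);
            }
        }
    }
}
```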
private void performPeriodicCleanup() {
Under what circumstances will an expired task occur?
For example, the following scenario could lead to a timeout:
- The SchemaOperator initiates an alter table add column n request.
- The coordinator waits for acknowledgments from 3 sink subtasks (parallelism=3).
- Subtask-0 and Subtask-1 successfully apply the schema change and send acknowledgments.
- Subtask-2 crashes or is restarted during the application process.
- Subtask-2 does not recover and send an acknowledgment within 5 minutes.
- The request times out and is cleaned up.
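A rough sketch of the cleanup described in that scenario might look like the following. The PendingRequest holder, the method on the tracker, and the five-minute constant are illustrative assumptions, not the PR's actual code; only the "tableId#eventTime" key format is taken from the thread below.

```java
// Illustrative sketch: expiring pending schema-change requests that never
// received acknowledgements from all sink subtasks.
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SchemaChangeRequestTracker {

    private static final long REQUEST_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes

    /** A pending request waiting for acknowledgements from all sink subtasks. */
    static final class PendingRequest {
        final long createdTime;
        final int expectedAcks;
        int receivedAcks;

        PendingRequest(long createdTime, int expectedAcks) {
            this.createdTime = createdTime;
            this.expectedAcks = expectedAcks;
        }
    }

    // Keyed by "tableId#eventTime", mirroring the key format used in the review thread.
    private final Map<String, PendingRequest> pendingRequests = new ConcurrentHashMap<>();

    /** Drops requests whose sinks never acknowledged within the timeout window. */
    void performPeriodicCleanup() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<String, PendingRequest>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest request = it.next().getValue();
            boolean expired = now - request.createdTime > REQUEST_TIMEOUT_MS;
            boolean incomplete = request.receivedAcks < request.expectedAcks;
            if (expired && incomplete) {
                // e.g. subtask-2 crashed and never sent its acknowledgement
                it.remove();
            }
        }
    }
}
```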
long eventTime = schemaChangeEvent.getCreatedTime();

try {
    if (lastProcessedEventTime != null && eventTime <= lastProcessedEventTime) {
Could you give an example of a scenario where this occurs?
Each time a change event is processed, lastProcessedEventTime is updated. Any event with a timestamp less than or equal to the current watermark is rejected; otherwise, it would lead to inconsistencies in the evolution process.
For example, consider executing the following SQL statements:
Original table structure: orders(id, user_id, amount)
-- T1=1000: Add discount field
ALTER TABLE orders ADD COLUMN discount DECIMAL(10,2);
-- T2=2000: Add status field
ALTER TABLE orders ADD COLUMN status VARCHAR(20);
-- T3=3000: Drop discount field
ALTER TABLE orders DROP COLUMN discount;
The expected final correct schema should be: orders(id, user_id, amount, status)
- The correct epoch processing order should be: 1000 → 3000 → Completion
- However, if T2 (2000) is processed late, the following will happen:
- After processing T3 (3000), the Sink considers the current schema to be: orders(id, user_id, amount). If this late data is not rejected, and T2 (2000) is suddenly processed, the schema becomes: orders(id, user_id, amount, status). At this point, if data is written:
SeaTunnelRow row = new SeaTunnelRow(new Object[]{
1001L, // id
501L, // user_id
new BigDecimal("99.99"), // amount
"PAID" // status - but the Sink might interpret this as the discount field
});
This would result in a field-mapping mismatch, so I added this check.
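Condensed into code, the guard described above behaves roughly like this; it is an illustrative sketch, not the operator's actual implementation.

```java
// Illustrative sketch of the stale-event guard: events at or before the current
// "watermark" (the last processed event time) are rejected.
public class SchemaChangeEventGuard {

    // Timestamp of the most recently applied schema change.
    private Long lastProcessedEventTime;

    /** Returns true if the event should be applied, false if it arrived out of order. */
    public boolean accept(long eventTime) {
        if (lastProcessedEventTime != null && eventTime <= lastProcessedEventTime) {
            // e.g. T2=2000 arriving after T3=3000 has already been applied
            return false;
        }
        lastProcessedEventTime = eventTime;
        return true;
    }

    public static void main(String[] args) {
        SchemaChangeEventGuard guard = new SchemaChangeEventGuard();
        System.out.println(guard.accept(1000)); // true  - ADD COLUMN discount
        System.out.println(guard.accept(3000)); // true  - DROP COLUMN discount
        System.out.println(guard.accept(2000)); // false - late ADD COLUMN status is rejected
    }
}
```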
jobId,
eventTime);

String key = tableId.toString() + "#" + eventTime;
This could be extracted into a single method; I see the same expression is used in several places.
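For example, something along these lines, with a purely illustrative class and method name:

```java
// Illustrative helper only: centralizes the repeated "tableId#eventTime" key construction.
public final class SchemaChangeKeys {

    private SchemaChangeKeys() {}

    /** Builds the pending-request key from a table identifier and the event time. */
    public static String buildRequestKey(Object tableId, long eventTime) {
        return tableId.toString() + "#" + eventTime;
    }
}
```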
TyrantLucifer left a comment:
LGTM, thank you for your contribution.
LiJie20190102 left a comment:
LGTM, thank you for your contribution.
@CloverDew Thank you for your contribution.
Purpose of this pull request
Fixes #9980
Problem
After implementing CDC schema evolution support in the Flink engine, several issues were identified:
Solution
This PR makes some minor adjustments to the architecture:
Key Changes:
1. Enhanced LocalSchemaCoordinator
   - Map<String, WeakReference<LocalSchemaCoordinator>> (see the sketch after this list)
3. Streamlined BroadcastSchemaSinkOperator
   - lastProcessedEpoch using Flink state
4. API Compliance (Addresses #9980)
   - SupportSchemaEvolutionSinkWriter contains only the generic applySchemaChange() method
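As a rough illustration of the weak-reference map under item 1, a per-job coordinator registry could be shaped as follows. The registry class, its method, and the empty placeholder coordinator are assumptions for the sketch; only the map type comes from the change list above.

```java
// Illustrative sketch only: a per-job registry built around
// Map<String, WeakReference<LocalSchemaCoordinator>>. Not the PR's implementation.
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LocalSchemaCoordinatorRegistry {

    /** Placeholder standing in for the real coordinator class named in the PR. */
    public static class LocalSchemaCoordinator {}

    // Weak references allow a coordinator to be garbage-collected once its job is
    // gone, so long-running TaskManagers do not accumulate stale coordinators.
    private static final Map<String, WeakReference<LocalSchemaCoordinator>> COORDINATORS =
            new ConcurrentHashMap<>();

    /** Returns the coordinator for the job, recreating it if it was collected. */
    public static synchronized LocalSchemaCoordinator getOrCreate(String jobId) {
        WeakReference<LocalSchemaCoordinator> ref = COORDINATORS.get(jobId);
        LocalSchemaCoordinator coordinator = (ref == null) ? null : ref.get();
        if (coordinator == null) {
            coordinator = new LocalSchemaCoordinator();
            COORDINATORS.put(jobId, new WeakReference<>(coordinator));
        }
        return coordinator;
    }
}
```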
Testing
Breaking Changes
None. This is a refactoring that maintains API compatibility while improving internal implementation.
Related Issues
Check list
[New License Guide](https://github.com/apache/seatunnel/blob/dev/docs/en/contribution/new-license.md)
incompatible-changes.md to describe the incompatibility caused by this PR.