Skip to content

Commit 2354f05

Browse files
authored (author name not captured in extraction)
[FLINK-38784][pipeline-connector][paimon] Bump Paimon version to 1.3.1 (#4189)
Signed-off-by: Pei Yu <[email protected]>
1 parent 001cba0 commit 2354f05

File tree

3 files changed

+16
-9
lines changed

3 files changed

+16
-9
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ limitations under the License.
2929
<artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
3030

3131
<properties>
32-
<paimon.version>1.2.0</paimon.version>
32+
<paimon.version>1.3.1</paimon.version>
3333
<hadoop.version>2.8.5</hadoop.version>
3434
<hive.version>2.3.9</hive.version>
3535
<mockito.version>3.4.6</mockito.version>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.paimon.memory.MemoryPoolFactory;
3434
import org.apache.paimon.memory.MemorySegmentPool;
3535
import org.apache.paimon.operation.FileStoreWrite;
36+
import org.apache.paimon.operation.WriteRestore;
3637
import org.apache.paimon.table.FileStoreTable;
3738
import org.apache.paimon.table.sink.CommitMessage;
3839
import org.apache.paimon.table.sink.SinkRecord;
@@ -118,7 +119,7 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
118119
"memoryPool and memoryPoolFactory cannot be set at the same time.");
119120

120121
TableWriteImpl<?> tableWrite =
121-
table.newWrite(commitUser, (part, bucket) -> true, null)
122+
table.newWrite(commitUser)
122123
.withIOManager(paimonIOManager)
123124
.withIgnorePreviousFiles(ignorePreviousFiles);
124125

@@ -129,19 +130,25 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
129130
if (memoryPoolFactory != null) {
130131
return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
131132
} else {
132-
return tableWrite.withMemoryPool(
133-
memoryPool != null
134-
? memoryPool
135-
: new HeapMemorySegmentPool(
136-
table.coreOptions().writeBufferSize(),
137-
table.coreOptions().pageSize()));
133+
return (TableWriteImpl<?>)
134+
tableWrite.withMemoryPool(
135+
memoryPool != null
136+
? memoryPool
137+
: new HeapMemorySegmentPool(
138+
table.coreOptions().writeBufferSize(),
139+
table.coreOptions().pageSize()));
138140
}
139141
}
140142

141143
public void withCompactExecutor(ExecutorService compactExecutor) {
142144
write.withCompactExecutor(compactExecutor);
143145
}
144146

147+
@Override
148+
public void setWriteRestore(WriteRestore writeRestore) {
149+
this.write.withWriteRestore(writeRestore);
150+
}
151+
145152
@Override
146153
public SinkRecord write(InternalRow internalRow) throws Exception {
147154
return write.writeAndReturn(internalRow);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
184184
bucket = 0;
185185
break;
186186
}
187-
case CROSS_PARTITION:
187+
case KEY_DYNAMIC:
188188
default:
189189
{
190190
throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);

0 commit comments

Comments (0)