Skip to content

Commit cf724bc

Browse files
authored
fix(worker): update CommitResponse to use partition type from writerFactory (#2677)
* fix(worker): update CommitResponse to use partition type from writerFactory * fix(worker): mock partitionSpec in TopicPartitionsWorkerTest for unpartitioned partitions * fix(worker): reorganize imports in TopicPartitionsWorkerTest for clarity
1 parent 4eeb7d6 commit cf724bc

File tree

2 files changed

+3
-1
lines changed

2 files changed

+3
-1
lines changed

core/src/main/java/kafka/automq/table/worker/TopicPartitionsWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,7 @@ private CompletableFuture<Void> commitResponse(RequestWrapper requestWrapper, bo
685685
.collect(Collectors.toList());
686686
TopicMetric topicMetric = new TopicMetric(fieldCount);
687687

688-
CommitResponse commitResponse = new CommitResponse(Types.StructType.of(), fastCommit ? Errors.MORE_DATA : Errors.NONE,
688+
CommitResponse commitResponse = new CommitResponse(writerFactory.partitionSpec().partitionType(), fastCommit ? Errors.MORE_DATA : Errors.NONE,
689689
requestWrapper.request.commitId(), topic, nextOffsets, dataFiles, deleteFiles, topicMetric, partitionMetrics);
690690
return channel.asyncSend(topic, new Event(System.currentTimeMillis(), EventType.COMMIT_RESPONSE, commitResponse))
691691
.thenAccept(rst -> LOGGER.info("[COMMIT_RESPONSE],{}", commitResponse));

core/src/test/java/kafka/automq/table/worker/TopicPartitionsWorkerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.automq.stream.utils.Threads;
3737
import com.automq.stream.utils.threads.EventLoop;
3838

39+
import org.apache.iceberg.PartitionSpec;
3940
import org.junit.jupiter.api.BeforeEach;
4041
import org.junit.jupiter.api.Tag;
4142
import org.junit.jupiter.api.Test;
@@ -102,6 +103,7 @@ public void setup() {
102103
when(channel.asyncSend(eq(TOPIC), any())).thenReturn(CompletableFuture.completedFuture(null));
103104

104105
writerFactory = mock(WriterFactory.class);
106+
when(writerFactory.partitionSpec()).thenReturn(PartitionSpec.unpartitioned());
105107
writers = new ArrayList<>();
106108
when(writerFactory.newWriter()).thenAnswer(invocation -> {
107109
MemoryWriter writer = new MemoryWriter(config);

0 commit comments

Comments
 (0)