Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1ac7121
Conversion to guidToUtf8 usage. 👠
KaiSernLim Nov 11, 2025
7eb9cc5
OffsetRecord LCVP sync moved to be after the current message is queue…
KaiSernLim Nov 18, 2025
27add66
Add a TODO for handling skipping Global RT DIV messages from repushes…
KaiSernLim Dec 9, 2025
166013f
Added support for `Delete` in `shouldSyncOffsetFromSnapshot()`. 🏈
KaiSernLim Dec 9, 2025
2c80173
Minor style tweaks. 🛝
KaiSernLim Dec 9, 2025
608c8a7
Added integration test to create scenarios to stop server and restart…
KaiSernLim Dec 9, 2025
a83b1cd
Parametrized the integration test. ☪️
KaiSernLim Dec 9, 2025
b8403c5
Skip Global RT DIV messages in `PubSubSplitIterator`. 🩷
KaiSernLim Dec 9, 2025
1786763
Include repush integration test. 🫨
KaiSernLim Dec 9, 2025
fa278be
Removed outdated comment. 🙃
KaiSernLim Dec 9, 2025
f4ae940
Simplified GuidUtils import. 👻
KaiSernLim Dec 9, 2025
2ea71e5
Removed `performKafkaInputRepush()`. 🥠
KaiSernLim Dec 10, 2025
1ba29cc
Review changes. 📲
KaiSernLim Dec 11, 2025
f526e88
Removed the try-finally. 📻
KaiSernLim Dec 11, 2025
fc743a5
Do not sync the VT DIV node if the previous record failed to persist. ⚱️
KaiSernLim Dec 16, 2025
f351d86
Quick unit test for test coverage. 👾
KaiSernLim Dec 17, 2025
fe775ce
Refactor LeaderProducerCallback to introduce onCompletionCallback for…
KaiSernLim Dec 19, 2025
5320030
Revised the CompletableFuture from both leader and follower code path…
KaiSernLim Dec 23, 2025
268cf3b
Last review change. 🫩
KaiSernLim Jan 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ public abstract CompletableFuture<Void> execSyncOffsetCommandAsync(
/**
 * Asynchronously hands a VT DIV snapshot off so the OffsetRecord can be synced for the given
 * topic-partition. The concrete implementation is expected to gate the sync on
 * {@code lastRecordPersistedFuture} so the OffsetRecord is not synced ahead of a record that
 * failed to persist — NOTE(review): confirm against the implementing class.
 *
 * @param topicPartition the version-topic partition whose offset metadata is being synced
 * @param vtDivSnapshot snapshot of the VT DIV state (including the latest consumed VT position)
 * @param lastRecordPersistedFuture future tracking persistence of the most recently queued record
 * @param ingestionTask the ingestion task that owns this partition's consumption
 * @throws InterruptedException if interrupted while enqueuing the sync work
 */
public abstract void execSyncOffsetFromSnapshotAsync(
PubSubTopicPartition topicPartition,
PartitionTracker vtDivSnapshot,
CompletableFuture<Void> lastRecordPersistedFuture,
StoreIngestionTask ingestionTask) throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2485,19 +2485,8 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
// Update the latest consumed VT position (LCVP) since we're consuming from the version topic
if (isGlobalRtDivEnabled()) {
getConsumerDiv().updateLatestConsumedVtPosition(partition, consumerRecord.getPosition());

if (shouldSyncOffsetFromSnapshot(consumerRecord, partitionConsumptionState)) {
PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(getVersionTopic(), partition);
PartitionTracker vtDiv = consumerDiv.cloneVtProducerStates(partition); // has latest consumed VT position
storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, this);
// TODO: remove. this is a temporary log for debugging while the feature is in its infancy
int partitionStateMapSize = vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).size();
LOGGER.info(
"event=globalRtDiv Syncing LCVP for OffsetRecord topic-partition: {} position: {} size: {}",
topicPartition,
consumerRecord.getPosition(),
partitionStateMapSize);
}
// Only after the current message is queued to drainer
// The Offset Record's LCVP may be synced in syncOffsetFromSnapshotIfNeeded()
}

/**
Expand Down Expand Up @@ -2788,16 +2777,48 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
}
}

/**
 * Syncs the OffsetRecord's latest consumed VT position (LCVP) via the drainer when Global RT DIV
 * is enabled and the current record warrants a sync (see {@code shouldSyncOffsetFromSnapshot}).
 * The sync is gated on the last queued record's persistence future so the OffsetRecord is never
 * synced ahead of a record that failed to reach disk.
 *
 * @param record the consumer record that may trigger the sync
 * @param topicPartition the version-topic partition being consumed
 */
void syncOffsetFromSnapshotIfNeeded(DefaultPubSubMessage record, PubSubTopicPartition topicPartition) {
  final int partition = topicPartition.getPartitionNumber();
  // Look up the PCS once and reuse it for both the should-sync check and the persistence future.
  final PartitionConsumptionState pcs = getPartitionConsumptionState(partition);
  if (!isGlobalRtDivEnabled() || !shouldSyncOffsetFromSnapshot(record, pcs)) {
    return; // without Global RT DIV enabled, the offset record is synced in the drainer in syncOffset()
  }

  // Capture the future once to avoid a race between the null check and its use below.
  final CompletableFuture<Void> lastRecordPersistedFuture =
      (pcs == null) ? null : pcs.getLastQueuedRecordPersistedFuture();
  if (lastRecordPersistedFuture == null) {
    LOGGER.warn(
        "event=globalRtDiv No PCS or lastRecordPersistedFuture found for topic-partition: {}. "
            + "Will not sync OffsetRecord without waiting for any record to be persisted",
        topicPartition);
    return;
  }

  try {
    PartitionTracker vtDiv = consumerDiv.cloneVtProducerStates(partition); // has latest consumed VT position
    storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, lastRecordPersistedFuture, this);

    // TODO: remove. this is a temporary log for debugging while the feature is in its infancy
    LOGGER.info(
        "event=globalRtDiv Syncing LCVP for OffsetRecord topic-partition: {} position: {} size: {}",
        topicPartition,
        record.getPosition(),
        vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).size());
  } catch (InterruptedException e) {
    // Restore the interrupt flag so the ingestion thread's shutdown path can still observe it.
    Thread.currentThread().interrupt();
    LOGGER.error("event=globalRtDiv Unable to sync Offset Record to update the latest consumed vt position", e);
  }
}

/**
* Followers should sync the VT DIV to the OffsetRecord if the consumer sees a Global RT DIV message
* (sync only once for a Global RT DIV, which can either be one singular message or multiple chunks + one manifest.
* thus, the condition is to check that it's not a chunk) or if it sees a non-segment control message.
* Followers should sync the VT DIV to the OffsetRecord if the consumer sees a non-segment control message or a
* Global RT DIV message.
* Each Global RT DIV sync will create one singular Put or multiple Puts (chunks + one manifest + Deletes). Thus
* if we want to sync only once, checking if it's a singular Put or the manifest Put should only trigger once.
*/
boolean shouldSyncOffsetFromSnapshot(DefaultPubSubMessage consumerRecord, PartitionConsumptionState pcs) {
if (consumerRecord.getKey().isGlobalRtDiv()) {
Put put = (Put) consumerRecord.getValue().getPayloadUnion();
if (put.getSchemaId() != CHUNK_SCHEMA_ID) {
return true;
Object payloadUnion = consumerRecord.getValue().getPayloadUnion();
if (payloadUnion instanceof Put && ((Put) payloadUnion).getSchemaId() != CHUNK_SCHEMA_ID) {
return true; // Global RT DIV message can be multiple chunks + deletes, only sync on one Put (manifest or value)
}
}
return isNonSegmentControlMessage(consumerRecord, null);
Expand Down Expand Up @@ -3538,19 +3559,19 @@ private byte[] createGlobalRtDivValueBytes(
}

private LeaderProducerCallback createGlobalRtDivCallback(
DefaultPubSubMessage previousMessage,
PartitionConsumptionState partitionConsumptionState,
DefaultPubSubMessage prevMessage,
PartitionConsumptionState pcs,
int partition,
String brokerUrl,
long beforeProcessingRecordTimestampNs,
LeaderProducedRecordContext context,
LeaderProducedRecordContext prevContext,
byte[] keyBytes,
byte[] valueBytes,
PubSubTopicPartition topicPartition,
PartitionTracker vtDiv) {
final int schemaId = AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion();
KafkaKey divKey = new KafkaKey(MessageType.GLOBAL_RT_DIV, keyBytes);
KafkaMessageEnvelope divEnvelope = getVeniceWriter(partitionConsumptionState).get()
KafkaMessageEnvelope divEnvelope = getVeniceWriter(pcs).get()
.getKafkaMessageEnvelope(
MessageType.PUT,
false,
Expand All @@ -3567,25 +3588,23 @@ private LeaderProducerCallback createGlobalRtDivCallback(
divKey,
divEnvelope,
topicPartition,
previousMessage.getPosition(),
prevMessage.getPosition(),
System.currentTimeMillis(),
divKey.getKeyLength() + valueBytes.length);
LeaderProducerCallback divCallback = createProducerCallback(
divMessage,
partitionConsumptionState,
LeaderProducedRecordContext
.newPutRecord(context.getConsumedKafkaClusterId(), context.getConsumedPosition(), keyBytes, put),
partition,
brokerUrl,
beforeProcessingRecordTimestampNs);

// After producing RT DIV to local VT, the VT DIV should be sent to the drainer to sync to the OffsetRecord
divCallback.setOnCompletionFunction(produceResult -> {
LeaderProducedRecordContext context = LeaderProducedRecordContext
.newPutRecord(prevContext.getConsumedKafkaClusterId(), prevContext.getConsumedPosition(), keyBytes, put);
LeaderProducerCallback divCallback =
createProducerCallback(divMessage, pcs, context, partition, brokerUrl, beforeProcessingRecordTimestampNs);

// After producing the RT DIV to local VT and LeaderProducerCallback.onCompletion() enqueuing RT DIV in the drainer,
// the VT DIV should be sent to the drainer to persist the LCVP onto disk by syncing the OffsetRecord
divCallback.setOnCompletionCallback(produceResult -> {
try {
CompletableFuture<Void> lastRecordPersistedFuture = context.getPersistedToDBFuture();
vtDiv.updateLatestConsumedVtPosition(produceResult.getPubSubPosition()); // LCVP = produced position in local VT
storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, this);
storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, lastRecordPersistedFuture, this);
} catch (InterruptedException e) {
LOGGER.error("Failed to sync VT DIV to OffsetRecord for replica: {}", topicPartition, e);
LOGGER.error("event=globalRtDiv Failed to sync VT DIV to OffsetRecord for replica: {}", topicPartition, e);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public class LeaderProducerCallback implements ChunkAwareCallback {
private static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER =
RedundantExceptionFilter.getRedundantExceptionFilter();
private static final Consumer<PubSubProduceResult> NO_OP = produceResult -> {};
private Consumer<PubSubProduceResult> onCompletionFunction = NO_OP;
private Consumer<PubSubProduceResult> onCompletionFunction = NO_OP; // ran before onCompletion() runs
private Consumer<PubSubProduceResult> onCompletionCallback = NO_OP; // ran after onCompletion() runs

protected static final ChunkedValueManifestSerializer CHUNKED_VALUE_MANIFEST_SERIALIZER =
new ChunkedValueManifestSerializer(false);
Expand Down Expand Up @@ -208,6 +209,7 @@ public void onCompletion(PubSubProduceResult produceResult, Exception e) {
LatencyUtils.getElapsedTimeFromMsToMs(currentTimeForMetricsMs),
currentTimeForMetricsMs);
}
this.onCompletionCallback.accept(produceResult);
} catch (Exception oe) {
boolean endOfPushReceived = partitionConsumptionState.isEndOfPushReceived();
LOGGER.error(
Expand Down Expand Up @@ -357,6 +359,10 @@ public void setOnCompletionFunction(Consumer<PubSubProduceResult> onCompletionFu
this.onCompletionFunction = onCompletionFunction;
}

/**
 * Registers a hook invoked after {@link #onCompletion} finishes its regular processing
 * (it is accepted at the tail of the try block in {@code onCompletion}).
 *
 * @param callback consumer of the produce result; replaces the default no-op
 */
public void setOnCompletionCallback(Consumer<PubSubProduceResult> callback) {
  this.onCompletionCallback = callback;
}

// Visible for VeniceWriter unit test.
public PartitionConsumptionState getPartitionConsumptionState() {
return partitionConsumptionState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ public CompletableFuture<Void> execSyncOffsetCommandAsync(
public void execSyncOffsetFromSnapshotAsync(
PubSubTopicPartition topicPartition,
PartitionTracker vtDivSnapshot,
CompletableFuture<Void> lastRecordPersistedFuture,
StoreIngestionTask ingestionTask) throws InterruptedException {
getDelegate(ingestionTask).execSyncOffsetFromSnapshotAsync(topicPartition, vtDivSnapshot, ingestionTask);
getDelegate(ingestionTask)
.execSyncOffsetFromSnapshotAsync(topicPartition, vtDivSnapshot, lastRecordPersistedFuture, ingestionTask);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,12 +336,19 @@ public CompletableFuture<Void> execSyncOffsetCommandAsync(
return syncOffsetCmd.getCmdExecutedFuture();
}

/**
* lastRecordPersistedFuture indicates whether the last record was persisted successfully and will have two sources.
* 1. From the follower code path, it is lastQueuedRecordPersistedFuture from the PCS set in putConsumeRecord().
* 2. From the leader side, it is the persistedToDBFuture from the LeaderProducedRecordContext from when the
* LeaderProducerCallback was created.
*/
public void execSyncOffsetFromSnapshotAsync(
PubSubTopicPartition topicPartition,
PartitionTracker vtDivSnapshot,
CompletableFuture<Void> lastRecordPersistedFuture,
StoreIngestionTask ingestionTask) throws InterruptedException {
DefaultPubSubMessage fakeRecord = new FakePubSubMessage(topicPartition);
SyncVtDivNode syncDivNode = new SyncVtDivNode(fakeRecord, vtDivSnapshot, ingestionTask);
SyncVtDivNode syncDivNode = new SyncVtDivNode(fakeRecord, vtDivSnapshot, lastRecordPersistedFuture, ingestionTask);
getDrainerForConsumerRecord(fakeRecord, topicPartition.getPartitionNumber()).put(syncDivNode);
}

Expand Down Expand Up @@ -688,20 +695,31 @@ protected int getBaseClassOverhead() {
/**
* Allows the ConsumptionTask to command the Drainer to sync the VT DIV to the OffsetRecord.
*/
private static class SyncVtDivNode extends QueueNode {
static class SyncVtDivNode extends QueueNode {
private static final int PARTIAL_CLASS_OVERHEAD = getClassOverhead(SyncVtDivNode.class);

private PartitionTracker vtDivSnapshot;
private final PartitionTracker vtDivSnapshot;
private final CompletableFuture<Void> lastRecordPersistedFuture;

/**
 * @param consumerRecord placeholder record carrying the topic-partition (see FakePubSubMessage usage
 *        in execSyncOffsetFromSnapshotAsync)
 * @param vtDivSnapshot snapshot of the VT DIV state to sync to the OffsetRecord
 * @param lastRecordPersistedFuture future that gates the sync on the preceding record's persistence
 * @param ingestionTask the ingestion task whose offset will be synced
 */
public SyncVtDivNode(
DefaultPubSubMessage consumerRecord,
PartitionTracker vtDivSnapshot,
CompletableFuture<Void> lastRecordPersistedFuture,
StoreIngestionTask ingestionTask) {
super(consumerRecord, ingestionTask, StringUtils.EMPTY, 0);
this.vtDivSnapshot = vtDivSnapshot;
this.lastRecordPersistedFuture = lastRecordPersistedFuture;
}

/**
 * Syncs the VT DIV snapshot into the OffsetRecord, but only when the preceding record's
 * persistence future completed successfully; otherwise logs a warning and skips the sync.
 */
public void execute() {
  final boolean persisted = lastRecordPersistedFuture.isDone();
  final boolean failed = lastRecordPersistedFuture.isCompletedExceptionally();
  if (persisted && !failed) {
    getIngestionTask().updateAndSyncOffsetFromSnapshot(vtDivSnapshot, getConsumerRecord().getTopicPartition());
  } else {
    LOGGER.warn(
        "event=globalRtDiv Skipping SyncVtDivNode for {} because preceding record failed (done={} exception={})",
        getConsumerRecord().getTopicPartition(),
        persisted,
        failed);
  }
}

Expand Down Expand Up @@ -845,7 +863,7 @@ public void run() {
}
}

private static class FakePubSubMessage implements DefaultPubSubMessage {
static class FakePubSubMessage implements DefaultPubSubMessage {
private static final int SHALLOW_CLASS_OVERHEAD = ClassSizeEstimator.getClassOverhead(FakePubSubMessage.class);
private final PubSubTopicPartition topicPartition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,10 @@ private int handleSingleMessage(
elapsedTimeForPuttingIntoQueue.setValue(
elapsedTimeForPuttingIntoQueue.getValue() + LatencyUtils.getElapsedTimeFromNSToMS(queuePutStartTimeInNS));
}

// Intentionally not protecting against exceptions thrown by putConsumerRecord()
// Only sync OffsetRecord if the message that triggered the sync was successfully enqueued into the drainer
syncOffsetFromSnapshotIfNeeded(record, topicPartition); // latest consumed VT position (LCVP) in offset record
break;
case PRODUCED_TO_KAFKA:
case SKIPPED_MESSAGE:
Expand Down Expand Up @@ -2797,6 +2801,8 @@ boolean shouldSendGlobalRtDiv(DefaultPubSubMessage record, PartitionConsumptionS
return syncBytesInterval > 0 && (getConsumedBytesSinceLastSync().getOrDefault(brokerUrl, 0L) >= syncBytesInterval);
}

// Syncs the latest consumed VT position (LCVP) into the OffsetRecord from a DIV snapshot when needed.
// Invoked after a record is queued to the drainer in handleSingleMessage(); implementations are
// expected to no-op unless Global RT DIV applies.
abstract void syncOffsetFromSnapshotIfNeeded(DefaultPubSubMessage record, PubSubTopicPartition topicPartition);

/**
* Update the offset metadata in OffsetRecord in the following cases:
* 1. A ControlMessage other than Start_of_Segment and End_of_Segment is processed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public Map<CharSequence, ProducerPartitionState> getPartitionStates(TopicType ty
.stream()
.collect(
Collectors.toMap(
entry -> GuidUtils.getHexFromGuid(entry.getKey()),
entry -> GuidUtils.guidToUtf8(entry.getKey()),
entry -> entry.getValue().toProducerPartitionState()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.kafka.protocol.ControlMessage;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.LeaderMetadata;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
Expand Down Expand Up @@ -113,6 +114,8 @@

public class LeaderFollowerStoreIngestionTaskTest {
private static final PubSubTopicRepository TOPIC_REPOSITORY = new PubSubTopicRepository();
private static final int GLOBAL_RT_DIV_VERSION =
AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion();

private Store mockStore;
private LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask;
Expand Down Expand Up @@ -605,8 +608,7 @@ public void testSendGlobalRtDivMessage() throws InterruptedException, IOExceptio
byte[] valueBytes = ByteUtils.extractByteArray(compressor.decompress(ByteBuffer.wrap(compressedBytes)));
InternalAvroSpecificSerializer<GlobalRtDivState> serializer =
leaderFollowerStoreIngestionTask.globalRtDivStateSerializer;
GlobalRtDivState globalRtDiv =
serializer.deserialize(valueBytes, AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion());
GlobalRtDivState globalRtDiv = serializer.deserialize(valueBytes, GLOBAL_RT_DIV_VERSION);
assertNotNull(globalRtDiv);

// Verify the callback has DivSnapshot (VT + RT DIV)
Expand All @@ -618,19 +620,20 @@ public void testSendGlobalRtDivMessage() throws InterruptedException, IOExceptio
assertEquals(callbackPayload.getPartition(), partition);
assertTrue(callbackPayload.getValue().payloadUnion instanceof Put);
Put put = (Put) callbackPayload.getValue().payloadUnion;
assertEquals(put.getSchemaId(), AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion());
assertEquals(put.getSchemaId(), GLOBAL_RT_DIV_VERSION);
assertNotNull(put.getPutValue());

// Verify that completing the future from put() causes execSyncOffsetFromSnapshotAsync to be called
// and that produceResult should override the LCVP of the VT DIV sent to the drainer
verify(mockStoreBufferService, never()).execSyncOffsetFromSnapshotAsync(any(), any(), any());
verify(mockStoreBufferService, never()).execSyncOffsetFromSnapshotAsync(any(), any(), any(), any());
PubSubProduceResult produceResult = mock(PubSubProduceResult.class);
PubSubPosition specificPosition = InMemoryPubSubPosition.of(11L);
when(produceResult.getPubSubPosition()).thenReturn(specificPosition);
when(produceResult.getSerializedSize()).thenReturn(keyBytes.length + put.putValue.remaining());
callback.onCompletion(produceResult, null);
ArgumentCaptor<PartitionTracker> vtDivCaptor = ArgumentCaptor.forClass(PartitionTracker.class);
verify(mockStoreBufferService, times(1)).execSyncOffsetFromSnapshotAsync(any(), vtDivCaptor.capture(), any());
verify(mockStoreBufferService, times(1))
.execSyncOffsetFromSnapshotAsync(any(), vtDivCaptor.capture(), any(), any());
assertEquals(vtDivCaptor.getValue().getLatestConsumedVtPosition(), specificPosition);
}

Expand Down Expand Up @@ -668,17 +671,22 @@ public void testShouldSyncOffsetFromSnapshot() throws InterruptedException {
Put mockPut = mock(Put.class);
KafkaMessageEnvelope mockKme = globalRtDivMessage.getValue();

// The method should only return true for non-chunk Global RT DIV messages
// The method should only return true for non-chunked Global RT DIV messages
assertFalse(mockIngestionTask.shouldSyncOffsetFromSnapshot(globalRtDivMessage, mockPartitionConsumptionState));
doReturn(true).when(mockKey).isGlobalRtDiv();
doReturn(mockPut).when(mockKme).getPayloadUnion();
doReturn(AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion()).when(mockPut).getSchemaId();
doReturn(GLOBAL_RT_DIV_VERSION).when(mockPut).getSchemaId();
assertTrue(mockIngestionTask.shouldSyncOffsetFromSnapshot(globalRtDivMessage, mockPartitionConsumptionState));
doReturn(AvroProtocolDefinition.CHUNK.getCurrentProtocolVersion()).when(mockPut).getSchemaId();
assertFalse(mockIngestionTask.shouldSyncOffsetFromSnapshot(globalRtDivMessage, mockPartitionConsumptionState));
doReturn(AvroProtocolDefinition.CHUNKED_VALUE_MANIFEST.getCurrentProtocolVersion()).when(mockPut).getSchemaId();
assertTrue(mockIngestionTask.shouldSyncOffsetFromSnapshot(globalRtDivMessage, mockPartitionConsumptionState));

// The method should not error when a non-Put value is passed in
Delete mockDelete = mock(Delete.class);
doReturn(mockDelete).when(mockKme).getPayloadUnion();
assertFalse(mockIngestionTask.shouldSyncOffsetFromSnapshot(globalRtDivMessage, mockPartitionConsumptionState));

// Set up Control Message
final DefaultPubSubMessage nonSegmentControlMessage = getMockMessage(2).getMessage();
mockKey = nonSegmentControlMessage.getKey();
Expand Down
Loading
Loading