@@ -2485,19 +2485,8 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
     // Update the latest consumed VT position (LCVP) since we're consuming from the version topic
     if (isGlobalRtDivEnabled()) {
       getConsumerDiv().updateLatestConsumedVtPosition(partition, consumerRecord.getPosition());
-
-      if (shouldSyncOffsetFromSnapshot(consumerRecord, partitionConsumptionState)) {
-        PubSubTopicPartition topicPartition = new PubSubTopicPartitionImpl(getVersionTopic(), partition);
-        PartitionTracker vtDiv = consumerDiv.cloneVtProducerStates(partition); // has latest consumed VT position
-        storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, this);
-        // TODO: remove. this is a temporary log for debugging while the feature is in its infancy
-        int partitionStateMapSize = vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).size();
-        LOGGER.info(
-            "event=globalRtDiv Syncing LCVP for OffsetRecord topic-partition: {} position: {} size: {}",
-            topicPartition,
-            consumerRecord.getPosition(),
-            partitionStateMapSize);
-      }
+      // The OffsetRecord's LCVP may be synced later in syncOffsetFromSnapshotIfNeeded(),
+      // but only after the current message has been queued to the drainer
     }

   /**
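To make the new ordering explicit: the consumer updates the in-memory LCVP as soon as it sees the record, but the OffsetRecord sync is deferred until the record has been queued to the drainer. A minimal sketch of that flow, where `onConsumedVtRecord()` and `queueToDrainer()` are hypothetical stand-ins for the surrounding ingestion code; only the other calls appear in this diff:

```java
// Hypothetical sketch of the intended consumer-side ordering (simplified signatures).
void onConsumedVtRecord(DefaultPubSubMessage consumerRecord, int partition) throws InterruptedException {
  if (isGlobalRtDivEnabled()) {
    // 1. Track the latest consumed VT position (LCVP) in memory right away.
    getConsumerDiv().updateLatestConsumedVtPosition(partition, consumerRecord.getPosition());
  }

  // 2. Hand the record itself to the drainer, which persists it asynchronously (hypothetical helper).
  queueToDrainer(consumerRecord, partition);

  // 3. Only now may the OffsetRecord sync be scheduled: the checkpointed LCVP must
  //    never get ahead of a record that has not yet been queued for persistence.
  syncOffsetFromSnapshotIfNeeded(consumerRecord, new PubSubTopicPartitionImpl(getVersionTopic(), partition));
}
```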
@@ -2788,16 +2777,48 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
     }
   }

+  void syncOffsetFromSnapshotIfNeeded(DefaultPubSubMessage record, PubSubTopicPartition topicPartition) {
+    int partition = topicPartition.getPartitionNumber();
+    if (!isGlobalRtDivEnabled() || !shouldSyncOffsetFromSnapshot(record, getPartitionConsumptionState(partition))) {
+      return; // without Global RT DIV enabled, the OffsetRecord is synced in the drainer in syncOffset()
+    }
+
+    PartitionConsumptionState pcs = getPartitionConsumptionState(partition);
+    if (pcs == null || pcs.getLastQueuedRecordPersistedFuture() == null) {
+      LOGGER.warn(
+          "event=globalRtDiv No PCS or lastQueuedRecordPersistedFuture found for topic-partition: {}. "
+              + "Skipping the OffsetRecord sync, since there is no persisted record to wait on",
+          topicPartition);
+      return;
+    }
+
+    try {
+      PartitionTracker vtDiv = consumerDiv.cloneVtProducerStates(partition); // has latest consumed VT position
+      CompletableFuture<Void> lastFuture = pcs.getLastQueuedRecordPersistedFuture();
+      storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, lastFuture, this);
+
+      // TODO: remove. this is a temporary log for debugging while the feature is in its infancy
+      LOGGER.info(
+          "event=globalRtDiv Syncing LCVP for OffsetRecord topic-partition: {} position: {} size: {}",
+          topicPartition,
+          record.getPosition(),
+          vtDiv.getPartitionStates(PartitionTracker.VERSION_TOPIC).size());
+    } catch (InterruptedException e) {
+      LOGGER.error("event=globalRtDiv Unable to sync the OffsetRecord to update the latest consumed VT position", e);
+    }
+  }
+
   /**
-   * Followers should sync the VT DIV to the OffsetRecord if the consumer sees a Global RT DIV message
-   * (sync only once for a Global RT DIV, which can either be one singular message or multiple chunks + one manifest.
-   * thus, the condition is to check that it's not a chunk) or if it sees a non-segment control message.
+   * Followers should sync the VT DIV to the OffsetRecord if the consumer sees a non-segment control message or a
+   * Global RT DIV message.
+   * Each Global RT DIV write consists of either one singular Put or multiple Puts (chunks + one manifest) plus
+   * Deletes. Thus, to sync only once per DIV, we check for the one non-chunk Put (singular value or manifest).
   */
   boolean shouldSyncOffsetFromSnapshot(DefaultPubSubMessage consumerRecord, PartitionConsumptionState pcs) {
     if (consumerRecord.getKey().isGlobalRtDiv()) {
-      Put put = (Put) consumerRecord.getValue().getPayloadUnion();
-      if (put.getSchemaId() != CHUNK_SCHEMA_ID) {
-        return true;
+      Object payloadUnion = consumerRecord.getValue().getPayloadUnion();
+      if (payloadUnion instanceof Put && ((Put) payloadUnion).getSchemaId() != CHUNK_SCHEMA_ID) {
+        return true; // a Global RT DIV can be multiple chunks + Deletes; sync only on the one non-chunk Put
       }
     }
     return isNonSegmentControlMessage(consumerRecord, null);
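To illustrate the sync-once guarantee, here is a hedged restatement of the gate with a hypothetical message trace; the helper below is illustrative, not part of the change:

```java
// Illustrative trace over one chunked Global RT DIV write (the sequence is hypothetical,
// only the gating logic comes from this diff):
//   Put(schemaId == CHUNK_SCHEMA_ID)  // chunk 1 of DIV state    -> false (chunk)
//   Put(schemaId == CHUNK_SCHEMA_ID)  // chunk 2 of DIV state    -> false (chunk)
//   Put(schemaId != CHUNK_SCHEMA_ID)  // chunk manifest          -> true  (syncs exactly once)
//   Delete(...)                       // cleanup of stale chunks -> false (not a Put)
boolean triggersOffsetSync(DefaultPubSubMessage msg) {
  if (!msg.getKey().isGlobalRtDiv()) {
    return false; // only Global RT DIV messages are gated here
  }
  Object payload = msg.getValue().getPayloadUnion();
  // Chunk Puts and Deletes fall through, so a chunked DIV matches exactly once, on the
  // manifest Put; an unchunked DIV matches on its single Put. The instanceof guard
  // prevents a Delete payload from being cast to Put.
  return payload instanceof Put && ((Put) payload).getSchemaId() != CHUNK_SCHEMA_ID;
}
```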
@@ -3538,19 +3559,19 @@ private byte[] createGlobalRtDivValueBytes(
   }

   private LeaderProducerCallback createGlobalRtDivCallback(
-      DefaultPubSubMessage previousMessage,
-      PartitionConsumptionState partitionConsumptionState,
+      DefaultPubSubMessage prevMessage,
+      PartitionConsumptionState pcs,
       int partition,
       String brokerUrl,
       long beforeProcessingRecordTimestampNs,
-      LeaderProducedRecordContext context,
+      LeaderProducedRecordContext prevContext,
       byte[] keyBytes,
       byte[] valueBytes,
       PubSubTopicPartition topicPartition,
       PartitionTracker vtDiv) {
     final int schemaId = AvroProtocolDefinition.GLOBAL_RT_DIV_STATE.getCurrentProtocolVersion();
     KafkaKey divKey = new KafkaKey(MessageType.GLOBAL_RT_DIV, keyBytes);
-    KafkaMessageEnvelope divEnvelope = getVeniceWriter(partitionConsumptionState).get()
+    KafkaMessageEnvelope divEnvelope = getVeniceWriter(pcs).get()
         .getKafkaMessageEnvelope(
             MessageType.PUT,
             false,
@@ -3567,25 +3588,23 @@ private LeaderProducerCallback createGlobalRtDivCallback(
         divKey,
         divEnvelope,
         topicPartition,
-        previousMessage.getPosition(),
+        prevMessage.getPosition(),
         System.currentTimeMillis(),
         divKey.getKeyLength() + valueBytes.length);
-    LeaderProducerCallback divCallback = createProducerCallback(
-        divMessage,
-        partitionConsumptionState,
-        LeaderProducedRecordContext
-            .newPutRecord(context.getConsumedKafkaClusterId(), context.getConsumedPosition(), keyBytes, put),
-        partition,
-        brokerUrl,
-        beforeProcessingRecordTimestampNs);
-
-    // After producing RT DIV to local VT, the VT DIV should be sent to the drainer to sync to the OffsetRecord
-    divCallback.setOnCompletionFunction(produceResult -> {
+    LeaderProducedRecordContext context = LeaderProducedRecordContext
+        .newPutRecord(prevContext.getConsumedKafkaClusterId(), prevContext.getConsumedPosition(), keyBytes, put);
+    LeaderProducerCallback divCallback =
+        createProducerCallback(divMessage, pcs, context, partition, brokerUrl, beforeProcessingRecordTimestampNs);
+
+    // After the RT DIV is produced to the local VT and LeaderProducerCallback.onCompletion() enqueues it in the
+    // drainer, the VT DIV is sent to the drainer to persist the LCVP to disk by syncing the OffsetRecord
+    divCallback.setOnCompletionCallback(produceResult -> {
       try {
+        CompletableFuture<Void> lastRecordPersistedFuture = context.getPersistedToDBFuture();
         vtDiv.updateLatestConsumedVtPosition(produceResult.getPubSubPosition()); // LCVP = produced position in local VT
-        storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, this);
+        storeBufferService.execSyncOffsetFromSnapshotAsync(topicPartition, vtDiv, lastRecordPersistedFuture, this);
       } catch (InterruptedException e) {
-        LOGGER.error("Failed to sync VT DIV to OffsetRecord for replica: {}", topicPartition, e);
+        LOGGER.error("event=globalRtDiv Failed to sync VT DIV to OffsetRecord for replica: {}", topicPartition, e);
       }
     });

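On the receiving side, the drainer presumably chains the OffsetRecord sync onto the persisted future. A sketch under that assumption, with `updateOffsetRecordFromSnapshot()` and `persistOffsetRecord()` as hypothetical helpers; only `execSyncOffsetFromSnapshotAsync()` and its new future parameter come from this diff:

```java
// Hypothetical drainer-side gating of the checkpoint on persistence (simplified signature).
void execSyncOffsetFromSnapshot(
    PubSubTopicPartition topicPartition,
    PartitionTracker vtDiv,
    CompletableFuture<Void> lastRecordPersistedFuture) {
  lastRecordPersistedFuture.whenComplete((ignored, throwable) -> {
    if (throwable != null) {
      return; // the gating record never hit disk, so checkpointing the LCVP would be unsafe
    }
    // Every record up to the snapshot's LCVP is now persisted, so it is safe to checkpoint:
    // copy the snapshot's producer states and LCVP into the OffsetRecord, then flush it.
    updateOffsetRecordFromSnapshot(topicPartition, vtDiv); // hypothetical helper
    persistOffsetRecord(topicPartition); // hypothetical helper
  });
}
```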