Skip to content

Commit 42ef2d9

Browse files
authored
update the parquet gap detector result to use better name and fix offset issue to start with correction version (#489)
1 parent 8a095c6 commit 42ef2d9

File tree

2 files changed

+6
-6
lines changed

2 files changed

+6
-6
lines changed

rust/processor/src/gap_detectors/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ pub async fn create_gap_detector_status_tracker_loop(
138138
if res.num_gaps >= gap_detection_batch_size {
139139
tracing::warn!(
140140
processor_name,
141-
gap_start_version = res.next_version_to_process,
141+
gap_start_version = res.last_success_version,
142142
num_gaps = res.num_gaps,
143143
"[Parser] Processed batches with a gap",
144144
);
@@ -149,13 +149,13 @@ pub async fn create_gap_detector_status_tracker_loop(
149149
>= UPDATE_PROCESSOR_STATUS_SECS
150150
{
151151
tracing::info!(
152-
last_processed_version = res.next_version_to_process,
152+
last_processed_version = res.last_success_version,
153153
processor_name,
154154
"Updating last processed version"
155155
);
156156
processor
157157
.update_last_processed_version(
158-
res.next_version_to_process,
158+
res.last_success_version,
159159
res.last_transaction_timestamp,
160160
)
161161
.await

rust/processor/src/gap_detectors/parquet_gap_detector.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ pub struct ParquetFileGapDetectorInner {
2626
}
2727

2828
pub struct ParquetFileGapDetectorResult {
29-
pub next_version_to_process: u64,
29+
pub last_success_version: u64,
3030
pub num_gaps: u64,
3131
pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
3232
}
@@ -138,7 +138,7 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
138138
self.update_next_version_to_process(self.max_version, &result.table_name);
139139
return Ok(GapDetectorResult::ParquetFileGapDetectorResult(
140140
ParquetFileGapDetectorResult {
141-
next_version_to_process: self.next_version_to_process as u64,
141+
last_success_version: self.next_version_to_process as u64 - 1,
142142
num_gaps: (self.max_version - self.next_version_to_process) as u64,
143143
last_transaction_timestamp: result.last_transaction_timestamp,
144144
},
@@ -164,7 +164,7 @@ impl GapDetectorTrait for ParquetFileGapDetectorInner {
164164

165165
Ok(GapDetectorResult::ParquetFileGapDetectorResult(
166166
ParquetFileGapDetectorResult {
167-
next_version_to_process: self.next_version_to_process as u64,
167+
last_success_version: self.next_version_to_process as u64 - 1,
168168
num_gaps: (self.max_version - self.next_version_to_process) as u64,
169169
last_transaction_timestamp: result.last_transaction_timestamp,
170170
},

0 commit comments

Comments
 (0)