
Commit a3a16af

fix parquet to start from where it left off when it's re-deployed (#468)
* fix parquet to start from where it left off when it's re-deployed
* lint
* temp
* remove logs
* add log for dashboard
* add metrics/logs
* add more metrics
* remove mutable and let the buffer drop natively
* lint
1 parent fa1ce49 commit a3a16af

File tree: 12 files changed (+287, -156 lines)


rust/processor/src/bq_analytics/gcs_handler.rs

Lines changed: 16 additions & 4 deletions
@@ -1,14 +1,15 @@
-use crate::bq_analytics::ParquetProcessorError;
-use anyhow::Result;
+use crate::{bq_analytics::ParquetProcessorError, utils::counters::PARQUET_BUFFER_SIZE};
+use anyhow::{Context, Result};
 use chrono::{Datelike, Timelike};
 use google_cloud_storage::{
     client::Client as GCSClient,
     http::objects::upload::{Media, UploadObjectRequest, UploadType},
 };
-use hyper::Body;
+use hyper::{body::HttpBody, Body};
 use std::path::{Path, PathBuf};
 use tokio::time::{sleep, timeout, Duration};
 use tracing::{debug, error, info};
+
 const MAX_RETRIES: usize = 3;
 const INITIAL_DELAY_MS: u64 = 500;
 const TIMEOUT_SECONDS: u64 = 300;
@@ -18,6 +19,7 @@ pub async fn upload_parquet_to_gcs(
     table_name: &str,
     bucket_name: &str,
     bucket_root: &Path,
+    processor_name: String,
 ) -> Result<(), ParquetProcessorError> {
     if buffer.is_empty() {
         error!("The file is empty and has no data to upload.",);
@@ -57,6 +59,12 @@ pub async fn upload_parquet_to_gcs(
 
     loop {
         let data = Body::from(buffer.clone());
+        let size_hint = data.size_hint();
+        let size = size_hint.exact().context("Failed to get size hint")?;
+        PARQUET_BUFFER_SIZE
+            .with_label_values(&[&processor_name, table_name])
+            .set(size as i64);
+
         let upload_result = timeout(
             Duration::from_secs(TIMEOUT_SECONDS),
             client.upload_object(&upload_request, data, &upload_type),
@@ -65,7 +73,11 @@ pub async fn upload_parquet_to_gcs(
 
         match upload_result {
            Ok(Ok(result)) => {
-                info!("File uploaded successfully to GCS: {}", result.name);
+                info!(
+                    table_name = table_name,
+                    file_name = result.name,
+                    "File uploaded successfully to GCS",
+                );
                 return Ok(());
             },
             Ok(Err(e)) => {
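The PARQUET_BUFFER_SIZE gauge set in the upload loop above is defined in utils/counters.rs, which is not among the hunks shown for this commit. A minimal sketch of what such a per-processor, per-table gauge could look like, assuming the prometheus crate with lazy_static; the metric name and help text below are assumptions, not the repo's actual values:

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge_vec, IntGaugeVec};

lazy_static! {
    /// Hypothetical sketch of the PARQUET_BUFFER_SIZE gauge used in gcs_handler.rs.
    /// The real definition lives in utils/counters.rs (not shown in this diff);
    /// the metric name and help string here are assumptions.
    pub static ref PARQUET_BUFFER_SIZE: IntGaugeVec = register_int_gauge_vec!(
        "indexer_parquet_buffer_size_bytes",
        "Size in bytes of the parquet buffer being uploaded to GCS",
        &["processor_name", "table_name"]
    )
    .unwrap();
}
```

With a gauge like this in place, the handler reads the exact request body size through hyper's HttpBody::size_hint() and records it per processor and table before every upload attempt, as the hunk above shows.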

rust/processor/src/bq_analytics/generic_parquet_processor.rs

Lines changed: 39 additions & 36 deletions
@@ -3,7 +3,7 @@ use crate::{
     bq_analytics::gcs_handler::upload_parquet_to_gcs,
     gap_detectors::ProcessingResult,
     utils::{
-        counters::{PARQUET_HANDLER_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
+        counters::{PARQUET_HANDLER_CURRENT_BUFFER_SIZE, PARQUET_STRUCT_SIZE},
         util::naive_datetime_to_timestamp,
     },
 };
@@ -23,7 +23,6 @@ use tracing::{debug, error, info};
 #[derive(Debug, Default, Clone)]
 pub struct ParquetDataGeneric<ParquetType> {
     pub data: Vec<ParquetType>,
-    pub transaction_version_to_struct_count: AHashMap<i64, i64>,
 }
 
 pub trait NamedTable {
@@ -71,6 +70,7 @@ where
     pub upload_interval: Duration,
     pub max_buffer_size: usize,
     pub last_upload_time: Instant,
+    pub processor_name: String,
 }
 fn create_new_writer(schema: Arc<Type>) -> Result<SerializedFileWriter<Vec<u8>>> {
     let props = WriterProperties::builder()
@@ -103,6 +103,7 @@ where
         schema: Arc<Type>,
         upload_interval: Duration,
         max_buffer_size: usize,
+        processor_name: String,
     ) -> Result<Self> {
         // had to append unique id to avoid concurrent write issues
         let writer = create_new_writer(schema.clone())?;
@@ -119,6 +120,7 @@ where
             upload_interval,
             max_buffer_size,
             last_upload_time: Instant::now(),
+            processor_name,
         })
     }
 
@@ -128,47 +130,54 @@ where
         changes: ParquetDataGeneric<ParquetType>,
     ) -> Result<()> {
         let parquet_structs = changes.data;
-        self.transaction_version_to_struct_count
-            .extend(changes.transaction_version_to_struct_count);
-
+        let processor_name = self.processor_name.clone();
         for parquet_struct in parquet_structs {
             let size_of_struct = allocative::size_of_unique(&parquet_struct);
             PARQUET_STRUCT_SIZE
-                .with_label_values(&[ParquetType::TABLE_NAME])
+                .with_label_values(&[&processor_name, ParquetType::TABLE_NAME])
                 .set(size_of_struct as i64);
             self.buffer_size_bytes += size_of_struct;
             self.buffer.push(parquet_struct);
 
             if self.buffer_size_bytes >= self.max_buffer_size {
-                info!("Max buffer size reached, uploading to GCS.");
+                debug!(
+                    table_name = ParquetType::TABLE_NAME,
+                    buffer_size = self.buffer_size_bytes,
+                    max_buffer_size = self.max_buffer_size,
+                    "Max buffer size reached, uploading to GCS."
+                );
                 if let Err(e) = self.upload_buffer(gcs_client).await {
                     error!("Failed to upload buffer: {}", e);
                     return Err(e);
                 }
                 self.last_upload_time = Instant::now();
             }
+        }
 
-            if self.last_upload_time.elapsed() >= self.upload_interval {
-                info!(
-                    "Time has elapsed more than {} since last upload.",
-                    self.upload_interval.as_secs()
-                );
-                if let Err(e) = self.upload_buffer(gcs_client).await {
-                    error!("Failed to upload buffer: {}", e);
-                    return Err(e);
-                }
-                self.last_upload_time = Instant::now();
+        if self.last_upload_time.elapsed() >= self.upload_interval {
+            info!(
+                "Time has elapsed more than {} since last upload for {}",
+                self.upload_interval.as_secs(),
+                ParquetType::TABLE_NAME
+            );
+            if let Err(e) = self.upload_buffer(gcs_client).await {
+                error!("Failed to upload buffer: {}", e);
+                return Err(e);
             }
+            self.last_upload_time = Instant::now();
         }
 
-        PARQUET_HANDLER_BUFFER_SIZE
-            .with_label_values(&[ParquetType::TABLE_NAME])
-            .set(self.buffer.len() as i64);
+        PARQUET_HANDLER_CURRENT_BUFFER_SIZE
+            .with_label_values(&[&self.processor_name, ParquetType::TABLE_NAME])
+            .set(self.buffer_size_bytes as i64);
+
         Ok(())
     }
 
     async fn upload_buffer(&mut self, gcs_client: &GCSClient) -> Result<()> {
+        // This is to cover the case when interval duration has passed but buffer is empty
         if self.buffer.is_empty() {
+            debug!("Buffer is empty, skipping upload.");
             return Ok(());
         }
         let start_version = self
@@ -183,9 +192,7 @@ where
         let end_version = last.version();
         let last_transaction_timestamp = naive_datetime_to_timestamp(last.get_timestamp());
 
-        let txn_version_to_struct_count =
-            process_struct_count_map(&self.buffer, &mut self.transaction_version_to_struct_count);
-
+        let parquet_processed_transactions = build_parquet_processed_transactions(&self.buffer);
         let struct_buffer = std::mem::take(&mut self.buffer);
 
         let mut row_group_writer = self
@@ -206,12 +213,6 @@ where
             .into_inner()
             .context("Failed to get inner buffer")?;
 
-        debug!(
-            table_name = ParquetType::TABLE_NAME,
-            start_version = start_version,
-            end_version = end_version,
-            "Max buffer size reached, uploading to GCS."
-        );
         let bucket_root = PathBuf::from(&self.bucket_root);
 
         upload_parquet_to_gcs(
@@ -220,6 +221,7 @@ where
             ParquetType::TABLE_NAME,
             &self.bucket_name,
             &bucket_root,
+            self.processor_name.clone(),
         )
         .await?;
 
@@ -229,7 +231,9 @@ where
             start_version,
             end_version,
             last_transaction_timestamp: Some(last_transaction_timestamp),
-            txn_version_to_struct_count,
+            txn_version_to_struct_count: None,
+            parquet_processed_structs: Some(parquet_processed_transactions),
+            table_name: ParquetType::TABLE_NAME.to_string(),
         };
 
         self.gap_detector_sender
@@ -243,19 +247,18 @@ where
     }
 }
 
-fn process_struct_count_map<ParquetType: NamedTable + HasVersion>(
+fn build_parquet_processed_transactions<ParquetType: NamedTable + HasVersion>(
     buffer: &[ParquetType],
-    txn_version_to_struct_count: &mut AHashMap<i64, i64>,
 ) -> AHashMap<i64, i64> {
     let mut txn_version_to_struct_count_for_gap_detector = AHashMap::new();
 
     for item in buffer.iter() {
         let version = item.version();
 
-        if let Some(count) = txn_version_to_struct_count.get(&(version)) {
-            txn_version_to_struct_count_for_gap_detector.insert(version, *count);
-            txn_version_to_struct_count.remove(&(version));
-        }
+        txn_version_to_struct_count_for_gap_detector
+            .entry(version)
+            .and_modify(|count| *count += 1)
+            .or_insert(1);
     }
     txn_version_to_struct_count_for_gap_detector
 }
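The substantive change in this file is that build_parquet_processed_transactions derives the per-version struct counts directly from the buffer that is about to be uploaded, instead of carrying a separate transaction_version_to_struct_count map through ParquetDataGeneric. A self-contained sketch of that counting strategy, with a toy Row type standing in for the repo's NamedTable + HasVersion structs and std's HashMap standing in for ahash::AHashMap:

```rust
// Self-contained illustration of the counting strategy; `Row` stands in for any
// buffered parquet struct, and std's HashMap stands in for ahash::AHashMap.
use std::collections::HashMap;

struct Row {
    txn_version: i64,
}

fn count_structs_per_version(buffer: &[Row]) -> HashMap<i64, i64> {
    let mut counts = HashMap::new();
    for row in buffer {
        // Same entry/and_modify/or_insert pattern as build_parquet_processed_transactions.
        counts
            .entry(row.txn_version)
            .and_modify(|c| *c += 1)
            .or_insert(1);
    }
    counts
}

fn main() {
    let buffer = vec![
        Row { txn_version: 100 },
        Row { txn_version: 100 },
        Row { txn_version: 101 },
    ];
    let counts = count_structs_per_version(&buffer);
    assert_eq!(counts[&100], 2);
    assert_eq!(counts[&101], 1);
}
```

Because the counts are recomputed from the upload buffer at upload time, what gets reported to the gap detector always matches what was actually written to GCS, which is what allows a re-deployed processor to resume from where it left off, per the commit title.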

rust/processor/src/bq_analytics/mod.rs

Lines changed: 7 additions & 3 deletions
@@ -30,7 +30,10 @@ pub struct ParquetProcessingResult {
     pub start_version: i64,
     pub end_version: i64,
     pub last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
-    pub txn_version_to_struct_count: AHashMap<i64, i64>,
+    pub txn_version_to_struct_count: Option<AHashMap<i64, i64>>,
+    // This is used to store the processed structs in the parquet file
+    pub parquet_processed_structs: Option<AHashMap<i64, i64>>,
+    pub table_name: String,
 }
 
 #[derive(Debug)]
@@ -115,13 +118,14 @@ where
         "[Parquet Handler] Starting parquet handler loop",
     );
 
-    let mut parquet_manager = GenericParquetHandler::new(
+    let mut parquet_handler = GenericParquetHandler::new(
         bucket_name.clone(),
         bucket_root.clone(),
         new_gap_detector_sender.clone(),
         ParquetType::schema(),
         upload_interval,
         max_buffer_size,
+        processor_name.clone(),
     )
     .expect("Failed to create parquet manager");
 
@@ -135,7 +139,7 @@ where
     loop {
         match parquet_receiver.recv().await {
             Ok(txn_pb_res) => {
-                let result = parquet_manager.handle(&gcs_client, txn_pb_res).await;
+                let result = parquet_handler.handle(&gcs_client, txn_pb_res).await;
 
                 match result {
                     Ok(_) => {
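As the generic_parquet_processor.rs hunk above shows, the parquet path now sets txn_version_to_struct_count to None and puts the buffer-derived counts into parquet_processed_structs, tagged with the table name. A runnable sketch using a simplified local mirror of the struct (std types instead of ahash, and without the timestamp field), purely for illustration:

```rust
// Simplified local mirror of ParquetProcessingResult, for illustration only;
// the real struct also carries last_transaction_timestamp and uses ahash::AHashMap.
use std::collections::HashMap;

#[derive(Debug)]
struct ParquetProcessingResultLite {
    start_version: i64,
    end_version: i64,
    txn_version_to_struct_count: Option<HashMap<i64, i64>>,
    parquet_processed_structs: Option<HashMap<i64, i64>>,
    table_name: String,
}

fn main() {
    // Counts per transaction version, as built from the upload buffer.
    let counts = HashMap::from([(100_i64, 2_i64), (101, 1)]);
    let result = ParquetProcessingResultLite {
        start_version: 100,
        end_version: 101,
        txn_version_to_struct_count: None,
        parquet_processed_structs: Some(counts),
        table_name: "transactions".to_string(),
    };
    println!("{result:?}");
}
```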

rust/processor/src/db/common/models/default_models/parquet_transactions.rs

Lines changed: 5 additions & 4 deletions
@@ -384,15 +384,16 @@ impl Transaction {
 
             if let Some(a) = block_metadata {
                 block_metadata_txns.push(a.clone());
-                // transaction_version_to_struct_count.entry(a.version).and_modify(|e| *e += 1);
             }
-            wscs.append(&mut wsc_list);
 
             if !wsc_list.is_empty() {
                 transaction_version_to_struct_count
-                    .entry(wsc_list[0].txn_version)
-                    .and_modify(|e| *e += wsc_list.len() as i64);
+                    .entry(txn.txn_version)
+                    .and_modify(|e| *e += wsc_list.len() as i64)
+                    .or_insert(wsc_list.len() as i64);
             }
+            wscs.append(&mut wsc_list);
+
             wsc_details.append(&mut wsc_detail_list);
         }
         (txns, block_metadata_txns, wscs, wsc_details)
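Two things change in this hunk: the counter is keyed by txn.txn_version rather than wsc_list[0].txn_version, and .or_insert(...) is added so a count is also recorded when the version is not yet present in the map. The second point matters because and_modify alone is a no-op for a missing key, which silently dropped write-set-change counts. A minimal demonstration with a plain HashMap (toy key and values, not the repo's data):

```rust
// Why the `.or_insert(...)` matters: `and_modify` alone does nothing when the key
// is absent, so counts for versions not already in the map were silently dropped.
use std::collections::HashMap;

fn main() {
    let mut counts: HashMap<i64, i64> = HashMap::new();

    // Old pattern: only modifies an existing entry; nothing is inserted.
    counts.entry(42).and_modify(|c| *c += 3);
    assert!(counts.get(&42).is_none());

    // Fixed pattern: bump the count if present, otherwise insert it.
    counts.entry(42).and_modify(|c| *c += 3).or_insert(3);
    assert_eq!(counts[&42], 3);
}
```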

rust/processor/src/gap_detectors/gap_detector.rs

Lines changed: 1 addition & 0 deletions
@@ -8,6 +8,7 @@ use crate::{
 use ahash::AHashMap;
 use anyhow::Result;
 
+#[derive(Clone)]
 pub struct DefaultGapDetector {
     next_version_to_process: u64,
     seen_versions: AHashMap<u64, DefaultProcessingResult>,
