
Commit 605dedf

experimental: use COPY TO STDOUT (BINARY) instead of normal query

1 parent 7e69b77 · commit 605dedf

File tree: 4 files changed (+101 −27 lines)
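
For context on the change: the commit swaps a row-by-row SELECT for PostgreSQL's binary COPY protocol. A minimal stand-alone sketch of that technique with the postgres crate (connection string, table name, and column types here are illustrative assumptions, not taken from this repo):

use postgres::{Client, NoTls};
use postgres::binary_copy::BinaryCopyOutIter;
use postgres::fallible_iterator::FallibleIterator;
use postgres::types::Type;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = Client::connect("host=localhost user=postgres", NoTls)?;

    // COPY ... TO STDOUT (FORMAT BINARY) streams rows in the binary wire
    // format, avoiding the text round-trip of a regular SELECT.
    let reader = client.copy_out("COPY my_table TO STDOUT (FORMAT BINARY)")?;

    // The binary reader must be told the column types up front.
    let mut rows = BinaryCopyOutIter::new(reader, &[Type::INT4, Type::TEXT]);
    while let Some(row) = rows.next()? {
        let id: i32 = row.get(0);
        let name: &str = row.get(1);
        println!("{}: {}", id, name);
    }
    Ok(())
}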

cli/src/main.rs

Lines changed: 5 additions & 4 deletions
@@ -243,10 +243,11 @@ fn perform_export(args: ExportArgs) {
         decimal_precision: args.schema_settings.decimal_precision,
         array_handling: args.schema_settings.array_handling,
     };
-    let query = args.query.unwrap_or_else(|| {
-        format!("SELECT * FROM {}", args.table.unwrap())
-    });
-    let result = postgres_cloner::execute_copy(&args.postgres, &query, &args.output_file, props, args.quiet, &settings);
+    let result = if let Some(table) = &args.table {
+        postgres_cloner::execute_copy_table(&args.postgres, table, &args.output_file, props, args.quiet, &settings)
+    } else {
+        postgres_cloner::execute_copy_query(&args.postgres, &args.query.unwrap(), &args.output_file, props, args.quiet, &settings)
+    };
     let _stats = handle_result(result);
 
     // eprintln!("Wrote {} rows, {} bytes of raw data in {} groups", stats.rows, stats.bytes, stats.groups);
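
The new dispatch assumes the argument parser guarantees at least one of the table or query arguments, since args.query.unwrap() would otherwise panic. A stand-alone sketch of a more defensive shape (hypothetical helper, not the project's code):

fn choose_source(table: Option<&str>, query: Option<&str>) -> Result<String, String> {
    match (table, query) {
        // A table name takes the binary COPY path.
        (Some(t), _) => Ok(format!("COPY {} TO STDOUT (FORMAT BINARY)", t)),
        // An explicit query keeps the normal query path.
        (None, Some(q)) => Ok(q.to_string()),
        // Neither: report an error instead of panicking on unwrap().
        (None, None) => Err("either a table or a query must be specified".to_string()),
    }
}

fn main() {
    assert_eq!(
        choose_source(Some("events"), None).unwrap(),
        "COPY events TO STDOUT (FORMAT BINARY)"
    );
    assert!(choose_source(None, None).is_err());
}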

cli/src/parquet_writer.rs

Lines changed: 6 additions & 6 deletions
@@ -19,11 +19,11 @@ pub struct WriterSettings {
     pub row_group_row_limit: usize
 }
 
-pub struct ParquetRowWriter<W: Write + Send> {
+pub struct ParquetRowWriter<W: Write + Send, T: PgAbstractRow = postgres::Row> {
     writer: SerializedFileWriter<W>,
     schema: parquet::schema::types::TypePtr,
     // row_group_writer: SerializedRowGroupWriter<'a, W>,
-    appender: DynColumnAppender<Arc<postgres::Row>>,
+    appender: DynColumnAppender<Arc<T>>,
     stats: WriterStats,
     last_timestep_stats: WriterStats,
     last_timestep_time: std::time::Instant,
@@ -35,11 +35,11 @@ pub struct ParquetRowWriter<W: Write + Send> {
     current_group_rows: usize
 }
 
-impl <W: Write + Send> ParquetRowWriter<W> {
+impl<W: Write + Send, T: PgAbstractRow> ParquetRowWriter<W, T> {
     pub fn new(
         writer: SerializedFileWriter<W>,
         schema: parquet::schema::types::TypePtr,
-        appender: DynColumnAppender<Arc<postgres::Row>>,
+        appender: DynColumnAppender<Arc<T>>,
         quiet: bool,
         settings: WriterSettings
     ) -> parquet::errors::Result<Self> {
@@ -83,10 +83,10 @@ impl <W: Write + Send> ParquetRowWriter<W> {
         Ok(())
     }
 
-    pub fn write_row(&mut self, row: Arc<postgres::Row>) -> Result<(), String> {
+    pub fn write_row(&mut self, row: Arc<T>) -> Result<(), String> {
         let lvl = LevelIndexList::new_i(self.stats.rows);
         let bytes = self.appender.copy_value(&lvl, Cow::Borrowed(&row))
-            .map_err(|e| format!("Could not copy Row[{}]:", identify_row(&row)) + &e)?;
+            .map_err(|e| format!("Could not copy row: {}", e))?;
 
         self.current_group_bytes += bytes;
         self.current_group_rows += 1;
cli/src/pg_custom_types.rs

Lines changed: 12 additions & 0 deletions
@@ -2,6 +2,7 @@ use std::{sync::Arc, any::TypeId, io::Read};
 
 use byteorder::{ReadBytesExt, BigEndian};
 use postgres::types::{FromSql, Kind, WrongType, Field};
+use postgres::binary_copy::BinaryCopyOutRow;
 use postgres_protocol::types as pgtypes;
 
 fn read_pg_len(bytes: &[u8]) -> i32 {
@@ -315,6 +316,17 @@ impl<TRow: PgAbstractRow> PgAbstractRow for Arc<TRow> {
     }
 }
 
+impl PgAbstractRow for BinaryCopyOutRow {
+    fn ab_get<'a, T: FromSql<'a>>(&'a self, index: usize) -> T {
+        self.get(index)
+    }
+
+    fn ab_len(&self) -> usize {
+        // ab_len is not used in the current implementation
+        0
+    }
+}
+
 pub struct UnclonableHack<T>(pub T);
 
 impl<T> Clone for UnclonableHack<T> {
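
With this impl, the generic appender pipeline can read columns from a binary COPY row exactly as it does from a postgres::Row; ab_len may return 0 because, per the comment, nothing in the current pipeline consults it (the column count comes from the prepared statement instead). A minimal illustration of the delegation (the trait shape is inferred from this diff; the project's actual definition lives elsewhere in pg_custom_types.rs):

use postgres::binary_copy::BinaryCopyOutRow;
use postgres::types::FromSql;

// Inferred trait shape, for illustration only.
trait AbstractRow {
    fn ab_get<'a, T: FromSql<'a>>(&'a self, index: usize) -> T;
}

impl AbstractRow for BinaryCopyOutRow {
    fn ab_get<'a, T: FromSql<'a>>(&'a self, index: usize) -> T {
        // BinaryCopyOutRow::get panics on a type mismatch, mirroring
        // postgres::Row::get, so the delegation preserves behavior.
        self.get(index)
    }
}

// Generic consumers no longer care where the row came from.
fn first_as_i32<R: AbstractRow>(row: &R) -> i32 {
    row.ab_get(0)
}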

cli/src/postgres_cloner.rs

Lines changed: 78 additions & 17 deletions
@@ -19,6 +19,7 @@ use pg_bigdecimal::PgNumeric;
 use postgres::error::SqlState;
 use postgres::types::{Kind, Type as PgType, FromSql};
 use postgres::{self, Client, RowIter, Row, Column, Statement, NoTls};
+use postgres::binary_copy::{BinaryCopyOutIter, BinaryCopyOutRow};
 use postgres::fallible_iterator::FallibleIterator;
 use parquet::schema::types::{Type as ParquetType, TypePtr, GroupTypeBuilder};
 
@@ -31,7 +32,7 @@ use crate::datatypes::money::PgMoney;
 use crate::datatypes::numeric::{new_decimal_bytes_appender, new_decimal_int_appender};
 use crate::myfrom::{MyFrom, self};
 use crate::parquet_writer::{WriterStats, ParquetRowWriter, WriterSettings};
-use crate::pg_custom_types::{PgEnum, PgRawRange, PgAbstractRow, PgRawRecord, PgAny, PgAnyRef, UnclonableHack};
+use crate::pg_custom_types::{PgAbstractRow, PgAny, PgEnum, PgRawRecord, PgRawRange, PgAnyRef, UnclonableHack};
 
 type ResolvedColumn<TRow> = (DynColumnAppender<TRow>, ParquetType);
 
@@ -216,33 +217,93 @@ fn pg_connect(args: &PostgresConnArgs) -> Result<Client, String> {
     Ok(client)
 }
 
-pub fn execute_copy(pg_args: &PostgresConnArgs, query: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
-
+pub fn execute_copy_query(pg_args: &PostgresConnArgs, query: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
     let mut client = pg_connect(pg_args)?;
+
     let statement = client.prepare(query).map_err(|db_err| { db_err.to_string() })?;
+    let (row_appender, schema) = map_schema_root::<Row>(statement.columns(), schema_settings)?;
+
+    execute_with_writer(output_file, output_props, quiet, schema, row_appender, |row_writer| {
+        let rows: RowIter = client.query_raw::<Statement, &i32, &[i32]>(&statement, &[])
+            .map_err(|err| format!("Failed to execute the SQL query: {}", err))?;
+        for row in rows.iterator() {
+            let row = row.map_err(|err| err.to_string())?;
+            let row = Arc::new(row);
+            row_writer.write_row(row)?;
+        }
+        Ok(())
+    })
+}
 
-    let (row_appender, schema) = map_schema_root(statement.columns(), schema_settings)?;
+pub fn execute_copy_table(pg_args: &PostgresConnArgs, table_name: &str, output_file: &PathBuf, output_props: WriterPropertiesPtr, quiet: bool, schema_settings: &SchemaSettings) -> Result<WriterStats, String> {
+    let mut client = pg_connect(pg_args)?;
+
+    if !quiet {
+        println!("Copying from table {} to {} using COPY with binary format...", table_name, output_file.display());
+    }
+
+    // Get the table schema using a LIMIT 0 query
+    let schema_query = format!("SELECT * FROM {} LIMIT 0", table_name);
+    let statement = client.prepare(&schema_query)
+        .map_err(|err| format!("Failed to prepare schema query: {}", err))?;
+
+    let (row_appender, schema) = map_schema_root::<Arc<BinaryCopyOutRow>>(statement.columns(), schema_settings)?;
+
+    execute_with_writer(output_file, output_props, quiet, schema, row_appender, |row_writer| {
+        // Execute COPY TO STDOUT with binary format
+        let copy_query = format!("COPY {} TO STDOUT (FORMAT BINARY)", table_name);
+        let copy_reader = client.copy_out(&copy_query)
+            .map_err(|err| format!("Failed to execute COPY command: {}", err))?;
+
+        // Get column types for the binary copy reader
+        let column_types: Vec<postgres::types::Type> = statement.columns()
+            .iter()
+            .map(|col| col.type_().clone())
+            .collect();
+
+        let mut binary_iter = BinaryCopyOutIter::new(copy_reader, &column_types);
+
+        // Process each binary row
+        while let Some(binary_row) = binary_iter.next()
+            .map_err(|err| format!("Failed to read binary row: {}", err))? {
+
+            // Wrap in Arc for the generic system
+            let row = Arc::new(Arc::new(binary_row));
+            row_writer.write_row(row)?;
+        }
+        Ok(())
+    })
+}
+
+fn execute_with_writer<T: PgAbstractRow + Clone, F>(
+    output_file: &PathBuf,
+    output_props: WriterPropertiesPtr,
+    quiet: bool,
+    schema: ParquetType,
+    row_appender: DynColumnAppender<Arc<T>>,
+    data_processor: F
+) -> Result<WriterStats, String>
+where
+    F: FnOnce(&mut ParquetRowWriter<std::fs::File, T>) -> Result<(), String>
+{
     if !quiet {
         eprintln!("Schema: {}", format_schema(&schema, 0));
     }
     let schema = Arc::new(schema);
 
-    let settings = WriterSettings { row_group_byte_limit: 500 * 1024 * 1024, row_group_row_limit: output_props.max_row_group_size() };
+    let settings = WriterSettings {
+        row_group_byte_limit: 500 * 1024 * 1024,
+        row_group_row_limit: output_props.max_row_group_size()
+    };
 
-    let output_file_f = std::fs::File::create(output_file).unwrap();
+    let output_file_f = std::fs::File::create(output_file)
+        .map_err(|e| format!("Failed to create output file: {}", e))?;
     let pq_writer = SerializedFileWriter::new(output_file_f, schema.clone(), output_props)
         .map_err(|e| format!("Failed to create parquet writer: {}", e))?;
     let mut row_writer = ParquetRowWriter::new(pq_writer, schema.clone(), row_appender, quiet, settings)
         .map_err(|e| format!("Failed to create row writer: {}", e))?;
 
-    let rows: RowIter = client.query_raw::<Statement, &i32, &[i32]>(&statement, &[])
-        .map_err(|err| format!("Failed to execute the SQL query: {}", err))?;
-    for row in rows.iterator() {
-        let row = row.map_err(|err| err.to_string())?;
-        let row = Arc::new(row);
-
-        row_writer.write_row(row)?;
-    }
+    data_processor(&mut row_writer)?;
 
     Ok(row_writer.close()?)
 }
@@ -329,8 +390,8 @@ fn count_columns(p: &ParquetType) -> usize {
 }
 
 
-fn map_schema_root<'a>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedColumn<Arc<Row>>, String> {
-    let mut fields: Vec<ResolvedColumn<Arc<Row>>> = vec![];
+fn map_schema_root<TRow: PgAbstractRow + 'static>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedColumn<Arc<TRow>>, String> {
+    let mut fields: Vec<ResolvedColumn<Arc<TRow>>> = vec![];
     for (col_i, c) in row.iter().enumerate() {
 
         let t = c.type_();
@@ -342,7 +403,7 @@ fn map_schema_root<'a>(row: &[Column], s: &SchemaSettings) -> Result<ResolvedCol
 
     let (column_appenders, parquet_types): (Vec<_>, Vec<_>) = fields.into_iter().unzip();
 
-    let merged_appender: DynColumnAppender<Arc<Row>> = Box::new(DynamicMergedAppender::new(column_appenders, 0, 0));
+    let merged_appender: DynColumnAppender<Arc<TRow>> = Box::new(DynamicMergedAppender::new(column_appenders, 0, 0));
     let struct_type = ParquetType::group_type_builder("root")
         .with_fields(parquet_types.into_iter().map(Arc::new).collect())
         .build()
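
Two details of this refactor are easy to miss. First, execute_copy_table wraps rows as Arc::new(Arc::new(binary_row)): map_schema_root::<Arc<BinaryCopyOutRow>> builds appenders over Arc<TRow>, i.e. Arc<Arc<BinaryCopyOutRow>>, while the query path uses TRow = Row and a single Arc. Second, execute_with_writer shares the writer setup and teardown between both paths by taking the row source as a closure. A toy sketch of that closure-injection shape (stand-in types, not project code):

fn with_writer<F>(produce_rows: F) -> Result<usize, String>
where
    F: FnOnce(&mut Vec<String>) -> Result<(), String>,
{
    let mut writer = Vec::new(); // stands in for ParquetRowWriter
    produce_rows(&mut writer)?;  // the query or COPY path plugs in here
    Ok(writer.len())             // stands in for row_writer.close()
}

fn main() -> Result<(), String> {
    let rows_written = with_writer(|w| {
        w.push("row 1".to_string());
        w.push("row 2".to_string());
        Ok(())
    })?;
    println!("wrote {} rows", rows_written);
    Ok(())
}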
