
Commit 89ab186

Add more tracing and instrumentation
Signed-off-by: Abhi Agarwal <[email protected]>
Parent: f1521ae

File tree

5 files changed (+39 −1 lines)

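The new debug!/trace! output is only visible when the consuming binary installs a tracing subscriber at a verbose enough level. A minimal sketch of such a setup, assuming the tracing-subscriber crate with its env-filter feature enabled (the etl_destinations target name is inferred from the crate path, not confirmed by this commit):

    use tracing_subscriber::EnvFilter;

    fn main() {
        // Honor RUST_LOG when set (e.g. RUST_LOG=etl_destinations=trace),
        // otherwise fall back to `debug` so the new events show up.
        tracing_subscriber::fmt()
            .with_env_filter(
                EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("debug")),
            )
            .init();
    }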

etl-destinations/src/deltalake/maintenance.rs

Lines changed: 13 additions & 1 deletion
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use deltalake::DeltaTable;
 use tokio::sync::Mutex;
 use tokio::task::JoinHandle;
-use tracing::{error, trace};
+use tracing::{debug, error, instrument, trace};
 
 use etl::types::TableId;
 
@@ -38,6 +38,7 @@ impl TableMaintenanceState {
 
     /// Await any in-flight compaction, then if the `compact_after_commits` threshold is met,
     /// run compaction. This guarantees serialization of compaction runs relative to table writes.
+    #[instrument(skip(self, table, config), fields(table_id = table_id.0, table_version))]
     pub async fn maybe_run_compaction(
         self: &Arc<Self>,
         table_id: TableId,
@@ -69,6 +70,11 @@ impl TableMaintenanceState {
         };
 
         if !should_compact {
+            debug!(
+                table_id = table_id.0,
+                version = table_version,
+                "Skipping Delta table compaction task"
+            );
             return;
         }
 
@@ -103,6 +109,7 @@ impl TableMaintenanceState {
 
     /// Await any in-flight Z-ordering, then if the `z_order_after_commits` threshold is met,
     /// run Z-order. Serializes Z-order runs relative to table writes.
+    #[instrument(skip(self, table, config), fields(table_id = table_id.0, table_version))]
     pub async fn maybe_run_zorder(
         self: &Arc<Self>,
         table_id: TableId,
@@ -140,6 +147,11 @@ impl TableMaintenanceState {
         };
 
         if !should_zorder {
+            debug!(
+                table_id = table_id.0,
+                version = table_version,
+                "Skipping Delta table Z-order task"
+            );
             return;
         }
 
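For readers unfamiliar with the attribute: #[instrument] wraps the function in a tracing span, skip(...) keeps the listed parameters out of the span's fields, and fields(...) records extra key/value pairs; the debug! events fired inside then carry that span's context. A self-contained sketch of the same pattern, using illustrative stand-in types rather than the crate's own:

    use tracing::{debug, instrument};

    // Stand-in for etl::types::TableId.
    #[derive(Debug, Clone, Copy)]
    struct TableId(u64);

    // `fields(table_id = table_id.0)` replaces the auto-captured Debug
    // rendering of the `table_id` argument with its inner value.
    #[instrument(skip(config), fields(table_id = table_id.0))]
    fn maybe_run_maintenance(table_id: TableId, config: &str) {
        let _ = config;
        // Emitted inside the `maybe_run_maintenance` span.
        debug!(version = 7u64, "Skipping maintenance task");
    }

    fn main() {
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .init();
        maybe_run_maintenance(TableId(42), "unused");
    }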

etl-destinations/src/deltalake/operations/append.rs

Lines changed: 5 additions & 0 deletions
@@ -3,18 +3,23 @@ use deltalake::{
     arrow::array::RecordBatch,
     writer::{DeltaWriter, RecordBatchWriter},
 };
+use tracing::{instrument, trace};
 
 use crate::deltalake::config::DeltaTableConfig;
 
 /// Appends a record batch to a Delta table
+#[instrument(skip(table, config, record_batch), fields(num_rows = record_batch.num_rows()))]
 pub async fn append_to_table(
     table: &mut DeltaTable,
     config: &DeltaTableConfig,
     record_batch: RecordBatch,
 ) -> DeltaResult<()> {
+    trace!("Creating RecordBatchWriter for append");
     let mut writer = RecordBatchWriter::for_table(table)?;
     writer = writer.with_writer_properties(config.into());
+    trace!("Writing record batch to Delta table");
     writer.write(record_batch).await?;
+    trace!("Flushing and committing append");
     writer.flush_and_commit(table).await?;
     Ok(())
 }
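Because append_to_table returns a future, call sites can also nest its instrumented span under one of their own via the tracing Instrument extension trait, grouping the three trace! steps into a single unit of work. A hedged sketch; the flush_batch wrapper, its span name, and the import path for append_to_table are hypothetical:

    use deltalake::{DeltaResult, DeltaTable, arrow::array::RecordBatch};
    use tracing::{Instrument, info_span};

    use crate::deltalake::config::DeltaTableConfig;
    use crate::deltalake::operations::append::append_to_table;

    // Hypothetical call site: the parent span makes the append's internal
    // trace! events searchable as one `flush_batch` operation.
    async fn flush_batch(
        table: &mut DeltaTable,
        config: &DeltaTableConfig,
        batch: RecordBatch,
    ) -> DeltaResult<()> {
        let span = info_span!("flush_batch", num_rows = batch.num_rows());
        append_to_table(table, config, batch).instrument(span).await
    }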

etl-destinations/src/deltalake/operations/delete.rs

Lines changed: 5 additions & 0 deletions
@@ -3,17 +3,22 @@ use deltalake::{
 };
 
 use crate::deltalake::config::DeltaTableConfig;
+use tracing::{instrument, trace};
 
+#[instrument(skip(table, config, delete_predicate))]
 pub async fn delete_from_table(
     table: &mut DeltaTable,
     config: &DeltaTableConfig,
     delete_predicate: Expr,
 ) -> DeltaResult<()> {
+    trace!("Building delete builder with predicate");
     let delete_builder = DeleteBuilder::new((*table).log_store(), table.snapshot()?.clone())
         .with_predicate(delete_predicate)
         .with_writer_properties(config.into());
     // TODO(abhi): Do something with the metrics
+    trace!("Executing delete operation");
     let (deleted_table, _metrics) = delete_builder.await?;
     *table = deleted_table;
+    trace!("Delete operation completed");
     Ok(())
 }
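For context on what flows through delete_predicate: it is a DataFusion Expr, so callers build it with the usual expression helpers. A small illustration (the id column name is made up):

    use deltalake::datafusion::prelude::{Expr, col, lit};

    // Predicate matching rows whose hypothetical `id` column equals 5.
    fn example_delete_predicate() -> Expr {
        col("id").eq(lit(5_i64))
    }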

etl-destinations/src/deltalake/operations/merge.rs

Lines changed: 9 additions & 0 deletions
@@ -4,6 +4,7 @@ use deltalake::datafusion::prelude::SessionContext;
 use deltalake::operations::merge::MergeBuilder;
 use deltalake::{DeltaResult, DeltaTable, datafusion::prelude::Expr};
 use etl::types::{TableRow as PgTableRow, TableSchema as PgTableSchema};
+use tracing::{instrument, trace};
 
 use crate::arrow::rows_to_record_batch;
 use crate::deltalake::config::DeltaTableConfig;
@@ -14,13 +15,18 @@ pub(crate) fn source_qualified_column_expr(column_name: &str, source_alias: &str
     Expr::Column(Column::new(Some(source_alias), column_name))
 }
 
+#[instrument(
+    skip(table, config, table_schema, upsert_rows, delete_predicate),
+    fields(upsert_count = upsert_rows.len(), has_delete = delete_predicate.is_some())
+)]
 pub async fn merge_to_table(
     table: &mut DeltaTable,
     config: &DeltaTableConfig,
     table_schema: &PgTableSchema,
     upsert_rows: &[PgTableRow],
     delete_predicate: Option<Expr>,
 ) -> DeltaResult<()> {
+    trace!("Building Arrow schema and source batch for merge");
     let arrow_schema = postgres_to_arrow_schema(table_schema)?;
     let rows = rows_to_record_batch(upsert_rows, arrow_schema)?;
 
@@ -38,6 +44,7 @@ pub async fn merge_to_table(
     let qualified_primary_keys = qualify_primary_keys(primary_keys, "source", "target")
         .ok_or(DeltaTableError::generic("Failed to qualify primary keys"))?;
 
+    trace!("Creating merge builder");
     let merge_builder = MergeBuilder::new(
         // TODO(abhi): Is there a way to do this while avoiding the clone/general hackiness?
         (*table).log_store(),
@@ -79,7 +86,9 @@
             .when_not_matched_by_source_delete(|delete| delete.predicate(delete_predicate))?;
     }
     // TODO(abhi): Do something with the metrics
+    trace!("Executing merge operation");
     let (merged_table, _metrics) = merge_builder.await?;
+    trace!("Merge operation completed");
     *table = merged_table;
     Ok(())
 }
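The metrics TODO could eventually feed tracing as well; one hedged option is to emit them as a structured event instead of discarding them. The MergeMetrics import path and its field names below are assumptions based on the deltalake crate's documentation, not verified against the version pinned here:

    use deltalake::operations::merge::MergeMetrics;
    use tracing::trace;

    // Hypothetical follow-up to the TODO above.
    fn trace_merge_metrics(metrics: &MergeMetrics) {
        trace!(
            num_target_rows_inserted = metrics.num_target_rows_inserted,
            num_target_rows_updated = metrics.num_target_rows_updated,
            num_target_rows_deleted = metrics.num_target_rows_deleted,
            "Merge metrics"
        );
    }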
Lines changed: 7 additions & 0 deletions
@@ -1,33 +1,40 @@
 use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
 use deltalake::parquet::file::properties::WriterProperties;
 use deltalake::{DeltaResult, DeltaTable};
+use tracing::{instrument, trace};
 
 use crate::deltalake::config::DeltaTableConfig;
 
 /// Optimizes a Delta table by compacting small files into larger ones.
+#[instrument(skip(table, config))]
 pub async fn compact_table(table: &mut DeltaTable, config: &DeltaTableConfig) -> DeltaResult<()> {
+    trace!("Starting table compaction");
     let writer_properties = WriterProperties::from(config);
     let optimize_builder = OptimizeBuilder::new(table.log_store(), table.snapshot()?.clone());
     let (optimized_table, _metrics) = optimize_builder
         .with_writer_properties(writer_properties)
         .with_type(OptimizeType::Compact)
         .await?;
     *table = optimized_table;
+    trace!("Finished table compaction");
     Ok(())
 }
 
 /// Optimizes a Delta table by performing Z-order clustering on the provided columns.
+#[instrument(skip(table, config, columns), fields(columns = ?columns))]
 pub async fn zorder_table(
     table: &mut DeltaTable,
     config: &DeltaTableConfig,
     columns: Vec<String>,
 ) -> DeltaResult<()> {
+    trace!("Starting table Z-order optimization");
     let writer_properties = WriterProperties::from(config);
     let optimize_builder = OptimizeBuilder::new(table.log_store(), table.snapshot()?.clone());
     let (optimized_table, _metrics) = optimize_builder
         .with_writer_properties(writer_properties)
         .with_type(OptimizeType::ZOrder(columns))
         .await?;
     *table = optimized_table;
+    trace!("Finished table Z-order optimization");
     Ok(())
 }
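One detail in zorder_table's attribute: columns sits in the skip list but is re-added through fields(columns = ?columns), where the ? sigil records the value with its Debug implementation; that is why the skipped vector still appears on the span. A minimal sketch of that sigil in isolation:

    use tracing::{instrument, trace};

    // `skip(columns)` suppresses automatic capture; `columns = ?columns`
    // re-records it explicitly, formatted with Debug via the `?` sigil.
    #[instrument(skip(columns), fields(columns = ?columns))]
    fn zorder_example(columns: Vec<String>) {
        trace!("Starting Z-order");
        let _ = columns;
    }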
