Commit 8207ff4

srawat98-dev and srawat authored
Refactor TableStatsCollectorUtil (#417)
Refactors TableStatsCollectorUtil by extracting reusable helper methods from the populateCommitEventTablePartitions implementation. This improves code organization and testability, and enables future code reuse without changing any functionality.

## Summary

<!--- HINT: Replace #nnn with corresponding Issue number, if you are fixing an existing issue -->

This is a pure refactoring PR that extracts well-designed, reusable helper methods from inline code in populateCommitEventTablePartitions. The goal is to:

- Improve code organization and readability
- Create reusable building blocks for future features
- Reduce code duplication
- No functional changes - behavior remains identical.

## Changes

- [ ] Client-facing API Changes
- [ ] Internal API Changes
- [ ] Bug Fixes
- [ ] New Features
- [ ] Performance Improvements
- [ ] Code Style
- [x] Refactoring
- [ ] Documentation
- [ ] Tests

For all the boxes checked, please include additional details of the changes made in this pull request.

## Testing Done

<!--- Check any relevant boxes with "x" -->

- [x] Manually Tested on local docker setup. Please include commands ran, and their output.
- [ ] Added new tests for the changes made.
- [ ] Updated existing tests to reflect the changes made.
- [ ] No tests added or updated. Please explain why. If unsure, please feel free to ask for help.
- [ ] Some other form of testing like staging or soak time in production. Please explain.

For all the boxes checked, include a detailed description of the testing done for the changes made in this pull request.

# Additional Information

- [ ] Breaking Changes
- [ ] Deprecations
- [ ] Large PR broken into smaller PRs, and PR plan linked in the description.

For all the boxes checked, include additional details of the changes made in this pull request.

---------

Co-authored-by: srawat <[email protected]>
1 parent 14bd9f0 commit 8207ff4

File tree

2 files changed: +90 -53 lines changed


apps/spark/src/main/java/com/linkedin/openhouse/jobs/spark/TableStatsCollectionSparkApp.java

Lines changed: 2 additions & 2 deletions
```diff
@@ -96,7 +96,7 @@ protected void publishStats(IcebergTableStats icebergTableStats) {
   }
 
   /**
-   * Publish commit events. Override this method in li-openhouse to send to Kafka.
+   * Publish commit events.
    *
    * @param commitEvents List of commit events to publish
    */
@@ -110,7 +110,7 @@ protected void publishCommitEvents(List<CommitEventTable> commitEvents) {
   }
 
   /**
-   * Publish partition-level commit events. Override this method in li-openhouse to send to Kafka.
+   * Publish partition-level commit events.
    *
    * @param partitionEvents List of partition events to publish
    */
```

apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java

Lines changed: 88 additions & 51 deletions
```diff
@@ -504,6 +504,74 @@ public static List<CommitEventTable> populateCommitEventTable(Table table, Spark
     return commitEventTableList;
   }
 
+  /**
+   * Builds an enriched DataFrame containing partition data joined with commit metadata.
+   *
+   * <p>This shared helper method queries Iceberg metadata tables (all_entries and snapshots) and
+   * creates a DataFrame with partition information enriched with commit metadata.
+   *
+   * <p>This is a pure query builder - it does not manage caching or counting. The caller is
+   * responsible for the DataFrame lifecycle (cache, count, collect, unpersist).
+   *
+   * <p><b>Output Schema:</b>
+   *
+   * <ul>
+   *   <li>snapshot_id: long - Iceberg snapshot ID
+   *   <li>committed_at: long - Commit timestamp in epoch seconds
+   *   <li>operation: string - Commit operation (append, overwrite, delete, etc.)
+   *   <li>summary: map&lt;string,string&gt; - Commit summary metadata
+   *   <li>partition: struct - Partition column values as a struct
+   * </ul>
+   *
+   * <p><b>For unpartitioned tables:</b> Returns null to indicate no partition data available.
+   *
+   * <p><b>Visibility:</b> Package-private for testing purposes.
+   *
+   * @param table Iceberg Table
+   * @param spark SparkSession
+   * @return DataFrame with enriched partition and commit data, or null if unpartitioned
+   */
+  static Dataset<Row> buildEnrichedPartitionDataFrame(Table table, SparkSession spark) {
+    String fullTableName = table.name();
+
+    // Check if table is partitioned
+    PartitionSpec spec = table.spec();
+    if (spec.isUnpartitioned()) {
+      log.info("Table {} is unpartitioned, no enriched partition data to build", fullTableName);
+      return null;
+    }
+
+    // Query all_entries metadata table for partitions per commit
+    // Use DISTINCT to deduplicate (snapshot_id, partition) pairs
+    // No status filter - captures all affected partitions (ADDED or DELETED files)
+    String allEntriesQuery =
+        String.format(
+            "SELECT DISTINCT snapshot_id, data_file.partition " + "FROM %s.all_entries",
+            table.name());
+
+    log.info("Executing all_entries query for table {}: {}", fullTableName, allEntriesQuery);
+    Dataset<Row> partitionsPerCommitDF = spark.sql(allEntriesQuery);
+
+    // Query snapshots to get commit metadata
+    String snapshotsQuery =
+        String.format(
+            "SELECT snapshot_id, committed_at, operation, summary " + "FROM %s.snapshots",
+            table.name());
+
+    Dataset<Row> snapshotsDF = spark.sql(snapshotsQuery);
+
+    // Join partitions with commit metadata and return
+    // Caller manages the lifecycle (cache, count, collect, unpersist)
+    return partitionsPerCommitDF
+        .join(snapshotsDF, "snapshot_id")
+        .select(
+            functions.col("snapshot_id"),
+            functions.col("committed_at").cast("long"), // Cast timestamp to epoch seconds
+            functions.col("operation"),
+            functions.col("summary"),
+            functions.col("partition")); // Keep partition struct for transformation
+  }
+
   /**
    * Collect partition-level commit events for a table.
    *
```
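The contract spelled out in the Javadoc above (the helper only builds the query; the caller owns cache, count, collect, and unpersist) can be summarized with a short sketch. This is an illustrative example only, not code from this PR: the wrapping class and method names are invented, and it assumes the caller sits in the same package as TableStatsCollectorUtil, since the helper is package-private. Only buildEnrichedPartitionDataFrame and the Dataset lifecycle calls come from the diff above.

```java
// Illustrative sketch of the caller-managed lifecycle described in the Javadoc above.
// Not part of this PR; the class and method names here are invented for the example.
package com.linkedin.openhouse.jobs.util; // same package, since the helper is package-private

import java.util.Collections;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class EnrichedPartitionLifecycleSketch {
  static List<Row> collectEnrichedRows(Table table, SparkSession spark) {
    // The helper is a pure query builder; it returns null for unpartitioned tables.
    Dataset<Row> enrichedDF =
        TableStatsCollectorUtil.buildEnrichedPartitionDataFrame(table, spark);
    if (enrichedDF == null) {
      return Collections.emptyList();
    }

    // The caller owns the DataFrame lifecycle: cache before the first action,
    // count to materialize the join once, collect, then unpersist to free memory.
    enrichedDF.cache();
    try {
      if (enrichedDF.count() == 0) {
        return Collections.emptyList();
      }
      return enrichedDF.collectAsList();
    } finally {
      enrichedDF.unpersist();
    }
  }
}
```

The refactored populateCommitEventTablePartitions in the next hunks follows the same shape, with the count doubling as the action that materializes the cache.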
```diff
@@ -532,14 +600,17 @@ public static List<CommitEventTablePartitions> populateCommitEventTablePartition
     String fullTableName = table.name();
     log.info("Collecting partition-level commit events for table: {}", fullTableName);
 
-    // Step 1: Check if table is partitioned
-    PartitionSpec spec = table.spec();
-    if (spec.isUnpartitioned()) {
-      log.info("Table {} is unpartitioned, no partition events to collect", fullTableName);
+    // Step 1: Build enriched DataFrame with partition and commit data using shared helper
+    Dataset<Row> enrichedDF = buildEnrichedPartitionDataFrame(table, spark);
+
+    // Check if any data was found
+    if (enrichedDF == null) {
+      log.info("No partition-level commit events found for table: {}", fullTableName);
       return Collections.emptyList();
     }
 
-    // Step 2: Parse table name components
+    // Step 2: Parse table name components for transformation
+    PartitionSpec spec = table.spec();
     String dbName = getDatabaseName(fullTableName);
     if (dbName == null) {
       return Collections.emptyList();
```
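The Javadoc marks the helper package-private "for testing purposes", and the refactored caller above relies on its null return for unpartitioned tables. That contract can be pinned down with a small test. A minimal sketch, assuming JUnit 5 and Mockito are available and that the test class lives in the same package as TableStatsCollectorUtil; this test is not part of the PR:

```java
// Hypothetical test sketch, not included in this PR.
package com.linkedin.openhouse.jobs.util; // same package to reach the package-private helper

import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.junit.jupiter.api.Test;

class BuildEnrichedPartitionDataFrameSketchTest {

  @Test
  void returnsNullForUnpartitionedTable() {
    Table table = mock(Table.class);
    when(table.name()).thenReturn("db.unpartitioned_table");
    when(table.spec()).thenReturn(PartitionSpec.unpartitioned());

    // The unpartitioned check runs before any Spark query, so no SparkSession is needed here.
    assertNull(TableStatsCollectorUtil.buildEnrichedPartitionDataFrame(table, null));
  }
}
```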
```diff
@@ -555,61 +626,27 @@ public static List<CommitEventTablePartitions> populateCommitEventTablePartition
     List<String> partitionColumnNames =
         spec.fields().stream().map(f -> f.name()).collect(Collectors.toList());
 
-    // Step 3: Query all_entries metadata table for partitions per commit
-    // Use DISTINCT to deduplicate (snapshot_id, partition) pairs
-    // No status filter - captures all affected partitions (ADDED or DELETED files)
-    // Note: We query snapshots here even though populateCommitEventTable() also queries it.
-    // This is intentional to maintain parallel execution (both methods run simultaneously).
-    // Snapshots query is fast (~10-50ms, hits Iceberg metadata cache).
-    String allEntriesQuery =
-        String.format(
-            "SELECT DISTINCT snapshot_id, data_file.partition " + "FROM %s.all_entries",
-            table.name());
-
-    log.info("Executing all_entries query: {}", allEntriesQuery);
-    Dataset<Row> partitionsPerCommitDF = spark.sql(allEntriesQuery);
+    // Step 3: Manage DataFrame lifecycle and collect to driver
+    // Cache BEFORE first action to materialize and reuse for collection
+    enrichedDF.cache();
 
-    // Cache for reuse
-    partitionsPerCommitDF.cache();
-    long totalRecords = partitionsPerCommitDF.count();
+    // Count triggers cache materialization (single join execution)
+    long totalRecords = enrichedDF.count();
 
+    // Early return if no data found (after cache materialization)
     if (totalRecords == 0) {
-      log.info("No partition-level commit events found for table: {}", fullTableName);
-      partitionsPerCommitDF.unpersist();
+      log.info("No partition-level records found for table: {}", fullTableName);
+      enrichedDF.unpersist();
       return Collections.emptyList();
     }
 
-    log.info(
-        "Found {} partition-level commit event records for table: {}", totalRecords, fullTableName);
-
-    // Step 4: Join with snapshots to get commit metadata
-    String snapshotsQuery =
-        String.format(
-            "SELECT snapshot_id, committed_at, operation, summary " + "FROM %s.snapshots",
-            table.name());
-
-    Dataset<Row> snapshotsDF = spark.sql(snapshotsQuery);
-
-    // Join partitions with commit metadata
-    Dataset<Row> enrichedDF =
-        partitionsPerCommitDF
-            .join(snapshotsDF, "snapshot_id")
-            .select(
-                functions.col("snapshot_id"),
-                functions.col("committed_at").cast("long"), // Cast timestamp to epoch seconds
-                functions.col("operation"),
-                functions.col("summary"),
-                functions.col("partition")); // Keep partition struct for Java transformation
-
-    // Step 6: Collect to driver and transform in Java with type safety
-    // This matches populateCommitEventTable() pattern which also uses collectAsList()
-    // Size is manageable: typically 100K rows × 200 bytes = 20MB
     log.info("Collecting {} rows to driver for transformation", totalRecords);
-    List<Row> rows = enrichedDF.collectAsList();
+    List<Row> rows = enrichedDF.collectAsList(); // Uses cached data
 
-    partitionsPerCommitDF.unpersist();
+    // Unpersist immediately after collection to free memory
+    enrichedDF.unpersist();
 
-    // Step 7: Delegate transformation to helper method
+    // Step 4: Delegate transformation to helper method
     // Separated for testability and readability
     List<CommitEventTablePartitions> result =
         transformRowsToPartitionEvents(
```
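For reference, the rows collected to the driver carry the schema documented on buildEnrichedPartitionDataFrame: snapshot_id, committed_at (epoch seconds), operation, summary, and the partition struct. The actual transformRowsToPartitionEvents implementation is not shown in this diff, so the following is only an illustrative sketch of unpacking one such Row with Spark's Java Row API:

```java
// Illustrative only; the real transformRowsToPartitionEvents may unpack rows differently.
import java.util.Map;
import org.apache.spark.sql.Row;

final class EnrichedRowReaderSketch {
  static void printRow(Row row) {
    long snapshotId = row.getLong(row.fieldIndex("snapshot_id"));
    long committedAtEpochSeconds = row.getLong(row.fieldIndex("committed_at"));
    String operation = row.getString(row.fieldIndex("operation"));
    Map<String, String> summary = row.getJavaMap(row.fieldIndex("summary"));
    Row partition = row.getStruct(row.fieldIndex("partition")); // one field per partition column

    System.out.printf(
        "snapshot=%d committedAt=%d op=%s summaryKeys=%d partitionFields=%d%n",
        snapshotId, committedAtEpochSeconds, operation, summary.size(), partition.size());
  }
}
```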
