Add upsert support for offline tables#17789
Conversation
Extend Apache Pinot's upsert (primary-key deduplication) feature to work with OFFLINE tables, not just REALTIME tables. This enables batch-ingested data to leverage primary-key-based deduplication. Key changes: - Relax REALTIME-only validation to allow OFFLINE tables with upsert config - Add three-level comparison column fallback: configured columns -> time column -> segment creation time - Add ConstantComparisonColumnReader for segment-creation-time-based comparison - Extend OfflineTableDataManager with full upsert lifecycle (init, addSegment, replaceSegment, getSegmentContexts, shutdown) - Update query executor and admin APIs to support offline upsert tables - Add integration test validating end-to-end offline upsert behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
5449717 to
a966e7f
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #17789 +/- ##
============================================
- Coverage 63.22% 63.20% -0.03%
Complexity 1454 1454
============================================
Files 3183 3183
Lines 191425 191611 +186
Branches 29273 29307 +34
============================================
+ Hits 121023 121100 +77
- Misses 60964 61034 +70
- Partials 9438 9477 +39
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
Extends Pinot’s upsert (primary-key deduplication) feature to OFFLINE tables, enabling batch-ingested segments to participate in upsert semantics, including comparison-column fallback logic and server-side introspection APIs.
Changes:
- Enable/validate upsert & dedup configurations for OFFLINE tables and allow empty comparison columns (fallback behavior).
- Add comparison fallback down to segment creation time and wire segment-context/upsert-metadata paths for offline querying.
- Extend server APIs/metrics paths to surface primary-key counts for offline upsert tables and add an offline upsert integration test.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java |
Relaxes REALTIME-only validation and adjusts validations for OFFLINE upsert/dedup. |
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java |
Adds comparison-column fallback: configured → time column → empty (segment creation time). |
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertContext.java |
Allows empty comparison-column list (non-null requirement only). |
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java |
Adds constant comparison-value reader to support segment-creation-time comparison. |
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java |
Creates RecordInfoReader with segment creation time when comparison columns are empty; adjusts TTL gating. |
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/OfflineTableDataManager.java |
Implements upsert lifecycle for offline tables (init/add/replace/segment contexts/shutdown). |
pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java |
Includes offline upsert tables in the “newly added segments” and segment-context path. |
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java |
Exposes offline partition→PK-count in table metadata response. |
pinot-server/src/main/java/org/apache/pinot/server/api/resources/PrimaryKeyCount.java |
Counts PKs for offline upsert tables on the server instance. |
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java |
Updates tests for relaxed OFFLINE validation and updated error messages. |
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java |
Adds end-to-end integration test coverage for offline upsert. |
...ests/src/test/java/org/apache/pinot/integration/tests/OfflineUpsertTableIntegrationTest.java
Outdated
Show resolved
Hide resolved
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
Show resolved
Hide resolved
- Fix dropOfflineTable() call to use raw table name instead of already-suffixed OFFLINE_TABLE_NAME (avoids double _OFFLINE suffix) - Remove unused OFFLINE_TABLE_NAME constant and TableNameBuilder import - Add null check for time column in validateTTLForUpsertConfig() to prevent NPE when TTL is configured without a time column or comparison columns Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| Integer partitionId = SegmentUtils.getSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, null); | ||
| Preconditions.checkNotNull(partitionId, "Failed to get partition id for segment: " + segmentName | ||
| + " (upsert-enabled table: " + _tableNameWithType + ")"); | ||
| PartitionUpsertMetadataManager partitionUpsertMetadataManager = | ||
| _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId); | ||
|
|
||
| _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.DOCUMENT_COUNT, | ||
| immutableSegment.getSegmentMetadata().getTotalDocs()); | ||
| _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L); | ||
| ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment); | ||
| SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName); | ||
| if (oldSegmentManager == null) { | ||
| // When adding a new segment, we should register it 'before' it is fully initialized by | ||
| // partitionUpsertMetadataManager. Because when processing docs in the new segment, the docs in the other | ||
| // segments may be invalidated, making the queries see less valid docs than expected. We should let query | ||
| // access the new segment asap even though its validDocId bitmap is still being filled by | ||
| // partitionUpsertMetadataManager. | ||
| registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager); | ||
| partitionUpsertMetadataManager.trackNewlyAddedSegment(segmentName); | ||
| partitionUpsertMetadataManager.addSegment(immutableSegment); |
There was a problem hiding this comment.
Offline upsert path doesn’t handle the partition upsert metadata manager’s preload mode. Unlike RealtimeTableDataManager, this code never calls preloadSegments(...) and doesn’t branch on partitionUpsertMetadataManager.isPreloading() to use preloadSegment(...). If preload is enabled in the upsert config, the preload flag may remain true and segments will be processed via the slower add/replace path (and potentially with different registration ordering than intended). Consider adding the preload trigger (based on ZK metadata partition id) and a preload branch mirroring the realtime implementation.
| TableUpsertMetadataManager tumm = getTableUpsertMetadataManager(tableDataManager); | ||
| boolean isUsingConsistencyMode = | ||
| rtdm.getTableUpsertMetadataManager().getContext().getConsistencyMode() != UpsertConfig.ConsistencyMode.NONE; | ||
| tumm.getContext().getConsistencyMode() != UpsertConfig.ConsistencyMode.NONE; | ||
| if (isUsingConsistencyMode) { |
There was a problem hiding this comment.
getTableUpsertMetadataManager(...) is annotated as nullable but its return value is dereferenced unconditionally when isUpsertTable(tableDataManager) is true. While current isUpsertTable checks make this safe, the nullable contract is misleading and can mask future regressions (or trigger nullness warnings). Consider enforcing non-null here (e.g., Preconditions.checkState(tumm != null, ...) / Objects.requireNonNull) and/or making the helper return non-null for the supported manager types.
Summary
OfflineTableDataManager(init, addSegment, replaceSegment, getSegmentContexts, shutdown)TablesResource,PrimaryKeyCount), and validation to support offline upsert tablesFiles Changed
TableConfigUtils.javaBaseTableUpsertMetadataManager.javaUpsertContext.javaUpsertUtils.javaConstantComparisonColumnReaderfor segment-creation-time comparisonBasePartitionUpsertMetadataManager.javacreateRecordInfoReader()helper with segment creation time fallbackOfflineTableDataManager.javaSingleTableExecutionInfo.javaOfflineTableDataManagerin query pathTablesResource.javaPrimaryKeyCount.javaTableConfigUtilsTest.javaOfflineUpsertTableIntegrationTest.javaTest plan
TableConfigUtilsTest— 45 tests pass (updated expected error messages for OFFLINE tables)BasePartitionUpsertMetadataManagerTest+ConcurrentMapPartitionUpsertMetadataManager*— 50 tests passTablesResourceTest— 16 tests passPrimaryKeyCountTest— 7 tests passOfflineUpsertTableIntegrationTest— new integration test validates end-to-end offline upsert (dedup query results, segment replacement, skipUpsert option)🤖 Generated with Claude Code