Skip to content

Commit 972d2bf

Browse files
authored
[Kernel]Incrementally loading domain metadata from CRC (delta-io#4502)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [x] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> This PR enhanced Domain metadata loading with CRC - If current CRC not found and previous found, combined with log replay from current version until crc's version + 1 ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> Adjusted unit tests, by moving the P&M only ones to the shared suite ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. --> No
1 parent d47c09a commit 972d2bf

File tree

4 files changed

+100
-95
lines changed

4 files changed

+100
-95
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,12 @@ public LogReplay(
165165
loadTableProtocolAndMetadata(
166166
engine, logSegment, newerSnapshotHint, logSegment.getVersion()));
167167
// Lazy loading of domain metadata only when needed
168-
this.activeDomainMetadataMap = new Lazy<>(() -> loadDomainMetadataMap(engine));
168+
this.activeDomainMetadataMap =
169+
new Lazy<>(
170+
() ->
171+
loadDomainMetadataMap(engine).entrySet().stream()
172+
.filter(entry -> !entry.getValue().isRemoved())
173+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
169174
}
170175

171176
/////////////////
@@ -387,18 +392,34 @@ engine, getLogReplayFiles(logSegment), SET_TRANSACTION_READ_SCHEMA, Optional.emp
387392
*/
388393
private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) {
389394
// First try to load from CRC info if available
390-
Optional<CRCInfo> currentCrcInfo = getCurrentCrcInfo();
391-
if (currentCrcInfo.isPresent() && currentCrcInfo.get().getDomainMetadata().isPresent()) {
392-
return currentCrcInfo.get().getDomainMetadata().get().stream()
395+
Optional<CRCInfo> lastSeenCrcInfoOpt = crcInfoContext.getLastSeenCrcInfo();
396+
if (!lastSeenCrcInfoOpt.isPresent()
397+
|| !lastSeenCrcInfoOpt.get().getDomainMetadata().isPresent()) {
398+
logger.info("No domain metadata available in CRC info, loading from log");
399+
return loadDomainMetadataMapFromLog(engine, Optional.empty());
400+
}
401+
CRCInfo lastSeenCrcInfo = lastSeenCrcInfoOpt.get();
402+
if (lastSeenCrcInfo.getVersion() == logSegment.getVersion()) {
403+
return lastSeenCrcInfo.getDomainMetadata().get().stream()
393404
.collect(Collectors.toMap(DomainMetadata::getDomain, Function.identity()));
394405
}
395-
// TODO https://github.com/delta-io/delta/issues/4454: Incrementally load domain metadata from
396-
// CRC when current CRC is not available.
397-
// Fall back to loading from the log
398-
logger.info("No domain metadata available in CRC info, loading from log");
399-
return loadDomainMetadataMapFromLog(engine).entrySet().stream()
400-
.filter(entry -> !entry.getValue().isRemoved())
401-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
406+
407+
Map<String, DomainMetadata> finalDomainMetadataMap =
408+
loadDomainMetadataMapFromLog(engine, Optional.of(lastSeenCrcInfo.getVersion() + 1));
409+
// Add domains from the CRC that don't exist in the incremental log data
410+
// - If a domain is updated to the newer versions or removed, it will exist in
411+
// finalDomainMetadataMap, use the one in the map.
412+
// - If a domain is only in the CRC file, use the one from CRC.
413+
lastSeenCrcInfo
414+
.getDomainMetadata()
415+
.get()
416+
.forEach(
417+
domainMetadataInCrc -> {
418+
if (!finalDomainMetadataMap.containsKey(domainMetadataInCrc.getDomain())) {
419+
finalDomainMetadataMap.put(domainMetadataInCrc.getDomain(), domainMetadataInCrc);
420+
}
421+
});
422+
return finalDomainMetadataMap;
402423
}
403424

404425
/**
@@ -409,11 +430,15 @@ private Map<String, DomainMetadata> loadDomainMetadataMap(Engine engine) {
409430
* #loadTableProtocolAndMetadata}.
410431
*
411432
* @param engine The engine used to process the log files.
433+
* @param minLogVersion The minimum log version to read (inclusive). When provided, only reads log
434+
* files * starting from this version. When not provided, reads the entire log. * For
435+
* incremental loading from crc, this is typically set to (crc version + 1).
412436
* @return A map where the keys are domain names and the values are the corresponding {@link
413437
* DomainMetadata} objects.
414438
* @throws UncheckedIOException if an I/O error occurs while closing the iterator.
415439
*/
416-
private Map<String, DomainMetadata> loadDomainMetadataMapFromLog(Engine engine) {
440+
private Map<String, DomainMetadata> loadDomainMetadataMapFromLog(
441+
Engine engine, Optional<Long> minLogVersion) {
417442
try (CloseableIterator<ActionWrapper> reverseIter =
418443
new ActionsIterator(
419444
engine,
@@ -422,14 +447,19 @@ private Map<String, DomainMetadata> loadDomainMetadataMapFromLog(Engine engine)
422447
Optional.empty() /* checkpointPredicate */)) {
423448
Map<String, DomainMetadata> domainMetadataMap = new HashMap<>();
424449
while (reverseIter.hasNext()) {
425-
final ColumnarBatch columnarBatch = reverseIter.next().getColumnarBatch();
450+
final ActionWrapper nextElem = reverseIter.next();
451+
final long version = nextElem.getVersion();
452+
final ColumnarBatch columnarBatch = nextElem.getColumnarBatch();
426453
assert (columnarBatch.getSchema().equals(DOMAIN_METADATA_READ_SCHEMA));
427454

428455
final ColumnVector dmVector = columnarBatch.getColumnVector(0);
429456

430457
// We are performing a reverse log replay. This function ensures that only the first
431458
// encountered domain metadata for each domain is added to the map.
432459
DomainMetadataUtils.populateDomainMetadataMap(dmVector, domainMetadataMap);
460+
if (minLogVersion.isPresent() && minLogVersion.get() == version) {
461+
break;
462+
}
433463
}
434464
return domainMetadataMap;
435465
} catch (IOException ex) {

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ChecksumLogReplayMetricsTestBase.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,4 +145,61 @@ trait ChecksumLogReplayMetricsTestBase extends LogReplayBaseSuite {
145145
readVersion = 10)
146146
}
147147
}
148+
149+
test("checksum not found at the read version, but found at a previous version") {
150+
withTableWithCrc { (table, tablePath, engine) =>
151+
deleteChecksumFileForTable(tablePath, Seq(10, 11, 5, 6))
152+
153+
loadSnapshotFieldsCheckMetrics(
154+
table,
155+
engine,
156+
expJsonVersionsRead = Seq(11),
157+
expParquetVersionsRead = Seq(10),
158+
expParquetReadSetSizes = getExpectedCheckpointReadSize(Seq(1)),
159+
expChecksumReadSet = Nil)
160+
161+
loadSnapshotFieldsCheckMetrics(
162+
table,
163+
engine,
164+
// We find the checksum from crc at version 4, but still read commit files 5 and 6
165+
// to find the P&M which could have been updated in version 5 and 6.
166+
expJsonVersionsRead = Seq(6, 5),
167+
expParquetVersionsRead = Nil,
168+
expParquetReadSetSizes = Nil,
169+
expChecksumReadSet = Seq(4),
170+
readVersion = 6)
171+
172+
// now try to load version 3 and it should get P&M from checksum files only
173+
loadSnapshotFieldsCheckMetrics(
174+
table,
175+
engine,
176+
// We find the checksum from crc at version 3, so shouldn't read anything else
177+
expJsonVersionsRead = Nil,
178+
expParquetVersionsRead = Nil,
179+
expParquetReadSetSizes = Nil,
180+
expChecksumReadSet = Seq(3),
181+
readVersion = 3)
182+
}
183+
}
184+
185+
test(
186+
"checksum missing read version, " +
187+
"both checksum and checkpoint exist the read version the previous version => use checksum") {
188+
withTableWithCrc { (table, tablePath, engine) =>
189+
val checkpointVersion = 10
190+
val readVersion = checkpointVersion + 1
191+
deleteChecksumFileForTable(tablePath, Seq(checkpointVersion + 1))
192+
193+
// 11.crc, missing, 10.crc and 10.checkpoint.parquet exist.
194+
// read 10.crc and 11.json.
195+
loadSnapshotFieldsCheckMetrics(
196+
table,
197+
engine,
198+
expJsonVersionsRead = Seq(readVersion),
199+
expParquetVersionsRead = Nil,
200+
expParquetReadSetSizes = Nil,
201+
expChecksumReadSet = Seq(checkpointVersion),
202+
readVersion = readVersion)
203+
}
204+
}
148205
}

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DomainMetadataCheckSumReplayMetricsSuite.scala

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,26 +54,6 @@ class DomainMetadataCheckSumReplayMetricsSuite extends ChecksumLogReplayMetricsT
5454
sizes.flatMap(size => Seq(size, size))
5555
}
5656

57-
// TODO: Remove to ChecksumLogReplayMetricsTestBase
58-
// after incremental CRC loading for domain metadata implemented
59-
test("checksum missing at read version and checkpoint version (readVersion - 1) " +
60-
" => use checkpoint") {
61-
withTableWithCrc { (table, tablePath, engine) =>
62-
val checkpointVersion = 10
63-
val readVersion = checkpointVersion + 1
64-
deleteChecksumFileForTable(tablePath, Seq(readVersion))
65-
66-
loadSnapshotFieldsCheckMetrics(
67-
table,
68-
engine,
69-
expJsonVersionsRead = Seq(readVersion),
70-
expParquetVersionsRead = Seq(checkpointVersion),
71-
expParquetReadSetSizes = Seq(1),
72-
expChecksumReadSet = Seq(checkpointVersion),
73-
readVersion = readVersion)
74-
}
75-
}
76-
7757
test("read domain metadata fro checksum even if snapshot hint exists") {
7858
withTableWithCrc { (table, _, engine) =>
7959
val readVersion = 11

kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/PandMCheckSumLogReplayMetricsSuite.scala

Lines changed: 0 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -96,66 +96,4 @@ class PandMCheckSumLogReplayMetricsSuite extends ChecksumLogReplayMetricsTestBas
9696
readVersion = 6)
9797
}
9898
}
99-
100-
// TODO: Move to ChecksumLogReplayMetricsTestBase
101-
// after incremental CRC loading for domain metadata implemented
102-
test("checksum not found at the read version, but found at a previous version") {
103-
withTableWithCrc { (table, tablePath, engine) =>
104-
deleteChecksumFileForTable(tablePath, Seq(10, 11, 5, 6))
105-
106-
loadSnapshotFieldsCheckMetrics(
107-
table,
108-
engine,
109-
// 10.checkpoint found, so use it and combined with 11.crc
110-
expJsonVersionsRead = Seq(11),
111-
expParquetVersionsRead = Seq(10),
112-
expParquetReadSetSizes = getExpectedCheckpointReadSize(Seq(1)),
113-
expChecksumReadSet = Nil)
114-
115-
loadSnapshotFieldsCheckMetrics(
116-
table,
117-
engine,
118-
// We find the checksum from crc at version 4, but still read commit files 5 and 6
119-
// to find the P&M which could have been updated in version 5 and 6.
120-
expJsonVersionsRead = Seq(6, 5),
121-
expParquetVersionsRead = Nil,
122-
expParquetReadSetSizes = Nil,
123-
expChecksumReadSet = Seq(4),
124-
readVersion = 6)
125-
126-
// now try to load version 3 and it should get P&M from checksum files only
127-
loadSnapshotFieldsCheckMetrics(
128-
table,
129-
engine,
130-
// We find the checksum from crc at version 3, so shouldn't read anything else
131-
expJsonVersionsRead = Nil,
132-
expParquetVersionsRead = Nil,
133-
expParquetReadSetSizes = Nil,
134-
expChecksumReadSet = Seq(3),
135-
readVersion = 3)
136-
}
137-
}
138-
139-
// TODO: Move to ChecksumLogReplayMetricsTestBase
140-
// after incremental CRC loading for domain metadata implemented
141-
test(
142-
"checksum missing read version, " +
143-
"both checksum and checkpoint exist the read version the previous version => use checksum") {
144-
withTableWithCrc { (table, tablePath, engine) =>
145-
val checkpointVersion = 10
146-
val readVersion = checkpointVersion + 1
147-
deleteChecksumFileForTable(tablePath, Seq(checkpointVersion + 1))
148-
149-
// 11.crc, missing, 10.crc and 10.checkpoint.parquet exist.
150-
// read 10.crc and 11.json.
151-
loadSnapshotFieldsCheckMetrics(
152-
table,
153-
engine,
154-
expJsonVersionsRead = Seq(readVersion),
155-
expParquetVersionsRead = Nil,
156-
expParquetReadSetSizes = Nil,
157-
expChecksumReadSet = Seq(checkpointVersion),
158-
readVersion = readVersion)
159-
}
160-
}
16199
}

0 commit comments

Comments
 (0)