Skip to content

Commit 741e85c

Browse files
committed
Finish basic SnapshotManager
1 parent d512a22 commit 741e85c

File tree

10 files changed

+186
-22
lines changed

10 files changed

+186
-22
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,7 @@ lazy val core = (project in file("core"))
803803
commonSettings,
804804
skipReleaseSettings,
805805
libraryDependencies ++= Seq(
806-
// "com.google.guava" % "guava" % "31.1-jre" // Streams.stream
806+
807807
)
808808
)
809809

core/src/main/java/io/delta/core/internal/TableImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.delta.core.Table;
88
import io.delta.core.helpers.TableHelper;
99
import io.delta.core.internal.checkpoint.Checkpointer;
10+
import io.delta.core.internal.snapshot.SnapshotManager;
1011

1112
public class TableImpl implements Table {
1213

core/src/main/java/io/delta/core/internal/checkpoint/CheckpointInstance.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.delta.core.internal.checkpoint;
22

3+
import java.util.Collections;
4+
import java.util.List;
35
import java.util.Optional;
46

57
import io.delta.core.internal.lang.Ordered;
@@ -29,6 +31,13 @@ boolean isNotLaterThan(CheckpointInstance other) {
2931
return version <= other.version;
3032
}
3133

34+
public List<String> getCorrespondingFiles(String path) {
35+
assert (this != CheckpointInstance.MAX_VALUE) : "Can't get files for CheckpointVersion.MaxValue.";
36+
return numParts
37+
.map(parts -> FileNames.checkpointFileWithParts(path, version, parts))
38+
.orElseGet(() -> Collections.singletonList(FileNames.checkpointFileSingular(path, version)));
39+
}
40+
3241
@Override
3342
public int compareTo(CheckpointInstance that) {
3443
if (version == that.version) {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.delta.core.internal.checksum;
2+
3+
public class VersionChecksum {
4+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package io.delta.core.internal.lang;
2+
3+
public class Optional {
4+
// exists: Optional.map(v -> f(v)).orElse(false)
5+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package io.delta.core.internal.snapshot;
2+
3+
public class InitialSnapshot extends SnapshotImpl {
4+
public InitialSnapshot() {
5+
// TODO
6+
super(null, -1, null, null, -1);
7+
}
8+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.delta.core.internal.snapshot;
2+
3+
import io.delta.core.ScanBuilder;
4+
import io.delta.core.Snapshot;
5+
import io.delta.core.internal.LogSegment;
6+
import io.delta.core.internal.TableImpl;
7+
import io.delta.core.types.StructType;
8+
9+
public class SnapshotImpl implements Snapshot {
10+
private final String path;
11+
private final long version;
12+
private final LogSegment logSegment;
13+
private final TableImpl tableImpl;
14+
private final long timestamp;
15+
16+
public SnapshotImpl(
17+
String path,
18+
long version,
19+
LogSegment logSegment,
20+
TableImpl tableImpl,
21+
long timestamp) {
22+
this.path = path;
23+
this.version = version;
24+
this.logSegment = logSegment;
25+
this.tableImpl = tableImpl;
26+
this.timestamp = timestamp;
27+
}
28+
29+
@Override
30+
public StructType getSchema() {
31+
return null;
32+
}
33+
34+
@Override
35+
public ScanBuilder getScanBuilder() {
36+
return null;
37+
}
38+
}

core/src/main/java/io/delta/core/internal/SnapshotManager.java renamed to core/src/main/java/io/delta/core/internal/snapshot/SnapshotManager.java

Lines changed: 82 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,28 @@
1-
package io.delta.core.internal;
1+
package io.delta.core.internal.snapshot;
22

33
import java.io.FileNotFoundException;
44
import java.util.*;
55
import java.util.stream.Collectors;
66

77
import io.delta.core.Snapshot;
88
import io.delta.core.fs.FileStatus;
9+
import io.delta.core.internal.LogSegment;
10+
import io.delta.core.internal.TableImpl;
911
import io.delta.core.internal.checkpoint.CheckpointInstance;
1012
import io.delta.core.internal.checkpoint.CheckpointMetaData;
1113
import io.delta.core.internal.checkpoint.Checkpointer;
14+
import io.delta.core.internal.checksum.VersionChecksum;
1215
import io.delta.core.internal.lang.ListUtils;
1316
import io.delta.core.internal.lang.Tuple2;
1417
import io.delta.core.internal.util.FileNames;
18+
import io.delta.core.internal.util.Logging;
1519
import io.delta.core.utils.CloseableIterator;
1620

17-
public class SnapshotManager {
21+
public class SnapshotManager implements Logging {
1822

19-
////////////////////
20-
// Static Methods //
21-
////////////////////
23+
/////////////////////////////
24+
// Static Fields / Methods //
25+
/////////////////////////////
2226

2327
/**
2428
* - Verify the versions are contiguous.
@@ -62,7 +66,8 @@ public SnapshotManager(TableImpl tableImpl) {
6266
* Update current snapshot by applying the new delta files if any.
6367
*/
6468
public Snapshot update() {
65-
69+
// TODO
70+
return null;
6671
}
6772

6873
//////////////////
@@ -154,11 +159,35 @@ protected final Optional<List<FileStatus>> listDeltaAndCheckpointFiles(
154159
* `lastCheckpoint` file as a hint on where to start listing the transaction log directory. If
155160
* the _delta_log directory doesn't exist, this method will return an `InitialSnapshot`.
156161
*/
157-
private Snapshot getSnapshotAtInit() {
162+
private SnapshotImpl getSnapshotAtInit() {
158163
final long currentTimestamp = System.currentTimeMillis();
159164
final Optional<CheckpointMetaData> lastCheckpointOpt =
160165
tableImpl.checkpointer.readLastCheckpointFile();
161-
getLogSegmentFrom(lastCheckpointOpt);
166+
final Optional<LogSegment> logSegmentOpt = getLogSegmentFrom(lastCheckpointOpt);
167+
return logSegmentOpt
168+
.map(logSegment -> createSnapshot(logSegment, lastCheckpointOpt, Optional.empty()))
169+
.orElse(new InitialSnapshot());
170+
}
171+
172+
private SnapshotImpl createSnapshot(
173+
LogSegment initSegment,
174+
Optional<CheckpointMetaData> checkpointMetadataOptHint,
175+
Optional<VersionChecksum> checksumOpt) {
176+
final String startingFromStr = initSegment
177+
.checkpointVersionOpt
178+
.map(v -> String.format(" starting from checkpoint version %s.", v))
179+
.orElse(".");
180+
logInfo(() -> String.format("Loading version %s%s", initSegment.version, startingFromStr));
181+
182+
// TODO(SCOTT): createSnapshotFromGivenOrEquivalentLogSegment
183+
184+
return new SnapshotImpl(
185+
tableImpl.logPath,
186+
initSegment.version,
187+
initSegment,
188+
tableImpl,
189+
initSegment.lastCommitTimestamp
190+
);
162191
}
163192

164193
/**
@@ -301,20 +330,52 @@ private Optional<LogSegment> getLogSegmentForVersion(
301330
verifyDeltaVersions(deltaVersions, Optional.of(newCheckpointVersion + 1), versionToLoadOpt);
302331
}
303332

333+
// TODO(SCOTT): double check newCheckpointOpt.get() won't error out
334+
304335
final long newVersion = deltaVersions.isEmpty() ? newCheckpointOpt.get().version : deltaVersions.getLast();
305-
final List<FileStatus> newCheckpointFiles
306-
307-
308-
val newVersion = deltaVersions.lastOption.getOrElse(newCheckpoint.get.version)
309-
val newCheckpointFiles: Seq[FileStatus] = newCheckpoint.map { newCheckpoint =>
310-
val newCheckpointPaths = newCheckpoint.getCorrespondingFiles(logPath).toSet
311-
val newCheckpointFileArray = checkpoints.filter(f => newCheckpointPaths.contains(f.getPath))
312-
assert(newCheckpointFileArray.length == newCheckpointPaths.size,
313-
"Failed in getting the file information for:\n" +
314-
newCheckpointPaths.mkString(" -", "\n -", "") + "\n" +
315-
"among\n" + checkpoints.map(_.getPath).mkString(" -", "\n -", ""))
316-
newCheckpointFileArray.toSeq
317-
}.getOrElse(Nil)
336+
337+
// In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
338+
// they may just be before the checkpoint version unless we have a bug in log cleanup.
339+
if (deltas.isEmpty()) {
340+
throw new IllegalStateException(
341+
String.format("Could not find any delta files for version %s", newVersion)
342+
);
343+
}
344+
345+
if (versionToLoadOpt.map(v -> v != newVersion).orElse(false)) {
346+
throw new IllegalStateException(
347+
String.format("Trying to load a non-existent version %s", versionToLoadOpt.get())
348+
);
349+
}
350+
351+
final long lastCommitTimestamp = deltas.get(deltas.size() - 1).modificationTime();
352+
353+
final List<FileStatus> newCheckpointFiles = newCheckpointOpt.map(newCheckpoint -> {
354+
final Set<String> newCheckpointPaths =
355+
new HashSet<>(newCheckpoint.getCorrespondingFiles(tableImpl.logPath));
356+
final List<FileStatus> newCheckpointFileList = checkpoints
357+
.stream()
358+
.filter(f -> newCheckpointPaths.contains(f.path()))
359+
.collect(Collectors.toList());
360+
assert (newCheckpointFileList.size() == newCheckpointPaths.size()) :
361+
String.format(
362+
"Filed in getting the file information for:\n%s\namong\n%s",
363+
String.join("\n -", newCheckpointPaths),
364+
checkpoints.stream().map(FileStatus::path).collect(Collectors.joining("\n - "))
365+
);
366+
return newCheckpointFileList;
367+
}).orElse(Collections.emptyList());
368+
369+
return Optional.of(
370+
new LogSegment(
371+
tableImpl.logPath,
372+
newVersion,
373+
deltasAfterCheckpoint,
374+
newCheckpointFiles,
375+
newCheckpointOpt.map(x -> x.version),
376+
lastCommitTimestamp
377+
)
378+
);
318379
}
319380

320381
/**

core/src/main/java/io/delta/core/internal/util/FileNames.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.delta.core.internal.util;
22

3+
import java.util.ArrayList;
4+
import java.util.List;
5+
36
public final class FileNames {
47

58
private FileNames() { }
@@ -19,6 +22,32 @@ public static String listingPrefix(final String path, final long version) {
1922
return String.format("%s/%020d.", path, version);
2023
}
2124

25+
/**
26+
* Returns the path for a singular checkpoint up to the given version.
27+
*
28+
* In a future protocol version this path will stop being written.
29+
*/
30+
public static String checkpointFileSingular(String path, long version) {
31+
return String.format("%s/%020d.checkpoint.parquet", path, version);
32+
}
33+
34+
/**
35+
* Returns the paths for all parts of the checkpoint up to the given version.
36+
*
37+
* In a future protocol version we will write this path instead of checkpointFileSingular.
38+
*
39+
* Example of the format: 00000000000000004915.checkpoint.0000000020.0000000060.parquet is
40+
* checkpoint part 20 out of 60 for the snapshot at version 4915. Zero padding is for
41+
* lexicographic sorting.
42+
*/
43+
public static List<String> checkpointFileWithParts(String path, long version, int numParts) {
44+
final List<String> output = new ArrayList<>();
45+
for (int i = 1; i < numParts + 1; i++) {
46+
output.add(String.format("%s/%020d.checkpoint.%010d.%010d.parquet", path, i, numParts, version));
47+
}
48+
return output;
49+
}
50+
2251
public static boolean isCheckpointFile(final String path) {
2352
return false;
2453
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.delta.core.internal.util;
2+
3+
import java.util.function.Supplier;
4+
5+
public interface Logging {
6+
default void logInfo(Supplier<String> msg) {
7+
System.out.println(msg.get());
8+
}
9+
}

0 commit comments

Comments
 (0)