Skip to content

Commit 303b14a

Browse files
committed
Add some TableImpl/SnapshotManager/Checkpointer skeletons
1 parent beea715 commit 303b14a

File tree

5 files changed

+133
-1
lines changed

5 files changed

+133
-1
lines changed

core/src/main/java/io/delta/core/Table.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package io.delta.core;
22

33
import io.delta.core.helpers.TableHelper;
4+
import io.delta.core.internal.TableImpl;
45

56
public interface Table {
67

78
static Table forPath(String path, TableHelper helper) {
8-
return null;
9+
return TableImpl.forPath(path, helper);
910
}
1011

1112
Snapshot getLatestSnapshot();
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.delta.core.internal;
2+
3+
import java.util.Optional;
4+
5+
import io.delta.core.Snapshot;
6+
import io.delta.core.internal.checkpoint.CheckpointMetaData;
7+
8+
public class SnapshotManager {
9+
10+
private final TableImpl tableImpl;
11+
volatile private Snapshot currentSnapshot;
12+
13+
public SnapshotManager(TableImpl tableImpl) {
14+
this.tableImpl = tableImpl;
15+
this.currentSnapshot = getSnapshotAtInit();
16+
}
17+
18+
/**
19+
* Update current snapshot by applying the new delta files if any.
20+
*/
21+
public Snapshot update() {
22+
23+
}
24+
25+
/**
26+
* Load the Snapshot for this Delta table at initialization. This method uses the
27+
* `lastCheckpoint` file as a hint on where to start listing the transaction log directory. If
28+
* the _delta_log directory doesn't exist, this method will return an `InitialSnapshot`.
29+
*/
30+
private Snapshot getSnapshotAtInit() {
31+
final long currentTimestamp = System.currentTimeMillis();
32+
final Optional<CheckpointMetaData> lastCheckpointOpt =
33+
tableImpl.checkpointer.readLastCheckpointFile();
34+
}
35+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package io.delta.core.internal;
2+
3+
import java.util.concurrent.Callable;
4+
import java.util.concurrent.locks.ReentrantLock;
5+
6+
import io.delta.core.Snapshot;
7+
import io.delta.core.Table;
8+
import io.delta.core.helpers.TableHelper;
9+
import io.delta.core.internal.checkpoint.Checkpointer;
10+
11+
public class TableImpl implements Table {
12+
13+
public static Table forPath(String path, TableHelper helper) {
14+
final String logPath = path + "/_delta_log"; // TODO: FileUtils, makeQualified, etc.
15+
16+
return new TableImpl(logPath, path, helper);
17+
}
18+
19+
public final String logPath;
20+
public final String dataPath;
21+
public final TableHelper tableHelper;
22+
public final Checkpointer checkpointer;
23+
public final SnapshotManager snapshotManager;
24+
25+
private final ReentrantLock lock;
26+
27+
public TableImpl(
28+
String logPath,
29+
String dataPath,
30+
TableHelper tableHelper) {
31+
this.logPath = logPath;
32+
this.dataPath = dataPath;
33+
this.tableHelper = tableHelper;
34+
35+
this.lock = new ReentrantLock();
36+
this.checkpointer = new Checkpointer(this);
37+
this.snapshotManager = new SnapshotManager(this);
38+
}
39+
40+
@Override
41+
public Snapshot getLatestSnapshot() {
42+
return snapshotManager.update();
43+
}
44+
45+
public <T> T lockInterruptibly(Callable<T> body) {
46+
try {
47+
lock.lockInterruptibly();
48+
49+
try {
50+
return body.call();
51+
} catch (Exception e) {
52+
// failed body.call()
53+
throw new RuntimeException(e);
54+
} finally {
55+
lock.unlock();
56+
}
57+
} catch (InterruptedException e) {
58+
// failed lock.lockInterruptibly();
59+
throw new RuntimeException(e);
60+
}
61+
}
62+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.delta.core.internal.checkpoint;
2+
3+
public class CheckpointMetaData {
4+
public final long version;
5+
public final long size;
6+
7+
public CheckpointMetaData(long version, long size) {
8+
this.version = version;
9+
this.size = size;
10+
}
11+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.delta.core.internal.checkpoint;
2+
3+
import java.util.Optional;
4+
5+
import io.delta.core.internal.TableImpl;
6+
7+
public class Checkpointer {
8+
private final TableImpl tableImpl;
9+
10+
public Checkpointer(TableImpl tableImpl) {
11+
this.tableImpl = tableImpl;
12+
}
13+
14+
/** Returns information about the most recent checkpoint. */
15+
public Optional<CheckpointMetaData> readLastCheckpointFile() {
16+
return loadMetadataFromFile(0);
17+
}
18+
19+
/** Loads the checkpoint metadata from the _last_checkpoint file. */
20+
private Optional<CheckpointMetaData> loadMetadataFromFile(int tries) {
21+
22+
}
23+
}

0 commit comments

Comments
 (0)