Skip to content

Commit d512a22

Browse files
committed
WIP
1 parent 303b14a commit d512a22

File tree

14 files changed

+630
-8
lines changed

14 files changed

+630
-8
lines changed

build.sbt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,9 @@ lazy val core = (project in file("core"))
802802
name := "delta-core",
803803
commonSettings,
804804
skipReleaseSettings,
805-
libraryDependencies ++= Seq()
805+
libraryDependencies ++= Seq(
806+
// "com.google.guava" % "guava" % "31.1-jre" // Streams.stream
807+
)
806808
)
807809

808810
lazy val defaultCore = (project in file("default-core"))
@@ -811,5 +813,8 @@ lazy val defaultCore = (project in file("default-core"))
811813
name := "delta-core-default",
812814
commonSettings,
813815
skipReleaseSettings,
814-
libraryDependencies ++= Seq()
816+
libraryDependencies ++= Seq(
817+
818+
"com.fasterxml.jackson.core" % "jackson-databind" % "2.14.2" // ObjectMapper
819+
)
815820
)

core/src/main/java/io/delta/core/fs/FileStatus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
/**
 * Describes a single file by its path, its length and its modification time.
 *
 * <p>NOTE(review): the unit of the length (presumably bytes) and of the modification time
 * (presumably milliseconds since the epoch) is not stated here; confirm against implementers.
 */
public interface FileStatus {
44

5-
String filePath();
5+
/** Returns the path of this file (replaces the former {@code filePath()} accessor). */
String path();
66

7-
long size();
7+
/** Returns the length of this file (replaces the former {@code size()} accessor). */
long length();
88

99
/** Returns the modification time of this file. */
long modificationTime();
1010
}

core/src/main/java/io/delta/core/helpers/TableHelper.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.delta.core.helpers;
22

3+
import java.io.FileNotFoundException;
4+
35
import io.delta.core.data.Row;
46
import io.delta.core.expressions.Expression;
57
import io.delta.core.fs.FileStatus;
@@ -8,11 +10,17 @@
810

911
public interface TableHelper {
1012

11-
CloseableIterator<FileStatus> listFiles(String path);
13+
CloseableIterator<FileStatus> listFiles(String path) throws FileNotFoundException;
1214

13-
CloseableIterator<FileStatus> listFiles(String path, String prefixToListFrom);
15+
CloseableIterator<FileStatus> listFiles(String path, String prefixToListFrom) throws FileNotFoundException;
1416

17+
// DECISION 1 OPTION 1
1518
CloseableIterator<Row> readJsonFile(String path, StructType readSchema);
19+
// With option 1, each class needs its own fromRow() method.
20+
21+
// DECISION 1 OPTION 2
22+
CloseableIterator<String> readJsonFile(String path);
23+
<T> T fromJson(String json, TypeReference<T> typeReference);
1624

1725
/** Uses the readSchema for partition pruning. */
1826
CloseableIterator<Row> readParquetFile(String path, StructType readSchema);
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.delta.core.helpers;
2+
3+
import java.lang.reflect.ParameterizedType;
4+
import java.lang.reflect.Type;
5+
6+
/**
 * Captures a generic type argument so that it is still available at runtime after erasure.
 *
 * <p>Instantiate it through an anonymous subclass that spells out the type argument, e.g.
 * {@code new TypeReference<List<String>>() {}}. The type argument is read from the immediate
 * generic superclass of the created instance, so subclass {@code TypeReference} directly.
 *
 * @param <T> the type being captured
 */
public abstract class TypeReference<T> {

    /** The captured type argument {@code T}. */
    private final Type type;

    /**
     * Reads the actual type argument from the generic superclass of the concrete subclass.
     *
     * @throws IllegalArgumentException if the subclass was declared without a type argument
     *         (for example the raw form {@code new TypeReference() {}}), in which case there is
     *         no type information to capture
     */
    protected TypeReference() {
        Type superclass = getClass().getGenericSuperclass();
        // A raw subclass reports a plain Class here; casting it blindly would surface as an
        // unexplained ClassCastException, so fail with a message that says how to fix the call.
        if (!(superclass instanceof ParameterizedType)) {
            throw new IllegalArgumentException(
                "TypeReference must be created with an actual type argument, e.g. "
                    + "new TypeReference<List<String>>() {}; found superclass: " + superclass);
        }
        type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
    }

    /**
     * Returns the captured type argument.
     *
     * @return the {@link Type} that was supplied for {@code T}
     */
    public Type getType() {
        return type;
    }
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.delta.core.internal;
2+
3+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import io.delta.core.fs.FileStatus;
7+
8+
public class LogSegment {
9+
public final String logPath;
10+
public final long version;
11+
public final List<FileStatus> deltas;
12+
public final List<FileStatus> checkpoints;
13+
public final Optional<Long> checkpointVersionOpt;
14+
public final long lastCommitTimestamp;
15+
16+
/**
17+
* Provides information around which files in the transaction log need to be read to create
18+
* the given version of the log.
19+
*
20+
* @param logPath The path to the _delta_log directory
21+
* @param version The Snapshot version to generate
22+
* @param deltas The delta commit files (.json) to read
23+
* @param checkpoints The checkpoint file(s) to read
24+
* @param checkpointVersionOpt The checkpoint version used to start replay
25+
* @param lastCommitTimestamp The "unadjusted" timestamp of the last commit within this segment.
26+
* By unadjusted, we mean that the commit timestamps may not
27+
* necessarily be monotonically increasing for the commits within
28+
* this segment.
29+
*/
30+
public LogSegment(
31+
String logPath,
32+
long version,
33+
List<FileStatus> deltas,
34+
List<FileStatus> checkpoints,
35+
Optional<Long> checkpointVersionOpt,
36+
long lastCommitTimestamp) {
37+
this.logPath = logPath;
38+
this.version = version;
39+
this.deltas = deltas;
40+
this.checkpoints = checkpoints;
41+
this.checkpointVersionOpt = checkpointVersionOpt;
42+
this.lastCommitTimestamp = lastCommitTimestamp;
43+
}
44+
}

0 commit comments

Comments
 (0)