Skip to content

Commit 0ff007c

Browse files
committed
persist
1 parent be6898c commit 0ff007c

File tree

3 files changed

+92
-14
lines changed

3 files changed

+92
-14
lines changed

kernel/kernel-api/src/main/java/io/delta/kernel/internal/TransactionImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,7 @@ private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
592592
metricsResult.getTotalAddFilesSizeInBytes(),
593593
metricsResult.getNumAddFiles(),
594594
Optional.of(txnId.toString()),
595+
Optional.empty(), // TODO: populate domain metadata
595596
metricsResult
596597
.getTableFileSizeHistogram()
597598
.map(FileSizeHistogram::fromFileSizeHistogramResult)));
@@ -615,6 +616,7 @@ private Optional<CRCInfo> buildPostCommitCrcInfoIfCurrentCrcAvailable(
615616
+ metricsResult.getNumAddFiles()
616617
- metricsResult.getNumRemoveFiles(),
617618
Optional.of(txnId.toString()),
619+
Optional.empty(), // TODO: populate domain metadata
618620
metricsResult
619621
.getTableFileSizeHistogram()
620622
.map(FileSizeHistogram::fromFileSizeHistogramResult)));

kernel/kernel-api/src/main/java/io/delta/kernel/internal/checksum/CRCInfo.java

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,20 @@
2121
import io.delta.kernel.data.ColumnVector;
2222
import io.delta.kernel.data.ColumnarBatch;
2323
import io.delta.kernel.data.Row;
24+
import io.delta.kernel.internal.actions.DomainMetadata;
2425
import io.delta.kernel.internal.actions.Metadata;
2526
import io.delta.kernel.internal.actions.Protocol;
2627
import io.delta.kernel.internal.data.GenericRow;
28+
import io.delta.kernel.internal.data.StructRow;
2729
import io.delta.kernel.internal.stats.FileSizeHistogram;
2830
import io.delta.kernel.internal.util.InternalUtils;
31+
import io.delta.kernel.internal.util.VectorUtils;
32+
import io.delta.kernel.types.ArrayType;
2933
import io.delta.kernel.types.LongType;
3034
import io.delta.kernel.types.StringType;
3135
import io.delta.kernel.types.StructType;
32-
import java.util.HashMap;
33-
import java.util.Map;
34-
import java.util.Objects;
35-
import java.util.Optional;
36+
import java.util.*;
37+
import java.util.stream.Collectors;
3638
import org.slf4j.Logger;
3739
import org.slf4j.LoggerFactory;
3840

@@ -47,6 +49,7 @@ public class CRCInfo {
4749
private static final String METADATA = "metadata";
4850
private static final String PROTOCOL = "protocol";
4951
private static final String TXN_ID = "txnId";
52+
private static final String DOMAIN_METADATA = "domainMetadata";
5053
private static final String FILE_SIZE_HISTOGRAM = "fileSizeHistogram";
5154

5255
public static final StructType CRC_FILE_SCHEMA =
@@ -58,6 +61,7 @@ public class CRCInfo {
5861
.add(METADATA, Metadata.FULL_SCHEMA)
5962
.add(PROTOCOL, Protocol.FULL_SCHEMA)
6063
.add(TXN_ID, StringType.STRING, /*nullable*/ true)
64+
.add(DOMAIN_METADATA, new ArrayType(DomainMetadata.FULL_SCHEMA, false), /*nullable*/ true)
6165
.add(FILE_SIZE_HISTOGRAM, FileSizeHistogram.FULL_SCHEMA, /*nullable*/ true);
6266

6367
public static Optional<CRCInfo> fromColumnarBatch(
@@ -82,6 +86,14 @@ public static Optional<CRCInfo> fromColumnarBatch(
8286
txnIdColumnVector.isNullAt(rowId)
8387
? Optional.empty()
8488
: Optional.of(txnIdColumnVector.getString(rowId));
89+
ColumnVector domainMetadataVector = batch.getColumnVector(getSchemaIndex(DOMAIN_METADATA));
90+
Optional<List<DomainMetadata>> domainMetadata =
91+
domainMetadataVector.isNullAt(rowId)
92+
? Optional.empty()
93+
: Optional.of(
94+
VectorUtils.toJavaList(domainMetadataVector.getArray(rowId)).stream()
95+
.map(row -> DomainMetadata.fromRow((StructRow) row))
96+
.collect(Collectors.toList()));
8597
Optional<FileSizeHistogram> fileSizeHistogram =
8698
FileSizeHistogram.fromColumnVector(
8799
batch.getColumnVector(getSchemaIndex(FILE_SIZE_HISTOGRAM)), rowId);
@@ -93,7 +105,14 @@ public static Optional<CRCInfo> fromColumnarBatch(
93105
}
94106
return Optional.of(
95107
new CRCInfo(
96-
version, metadata, protocol, tableSizeBytes, numFiles, txnId, fileSizeHistogram));
108+
version,
109+
metadata,
110+
protocol,
111+
tableSizeBytes,
112+
numFiles,
113+
txnId,
114+
domainMetadata,
115+
fileSizeHistogram));
97116
}
98117

99118
private final long version;
@@ -102,6 +121,7 @@ public static Optional<CRCInfo> fromColumnarBatch(
102121
private final long tableSizeBytes;
103122
private final long numFiles;
104123
private final Optional<String> txnId;
124+
private final Optional<List<DomainMetadata>> domainMetadata;
105125
private final Optional<FileSizeHistogram> fileSizeHistogram;
106126

107127
public CRCInfo(
@@ -111,15 +131,19 @@ public CRCInfo(
111131
long tableSizeBytes,
112132
long numFiles,
113133
Optional<String> txnId,
134+
Optional<List<DomainMetadata>> domainMetadata,
114135
Optional<FileSizeHistogram> fileSizeHistogram) {
115136
checkArgument(tableSizeBytes >= 0);
116137
checkArgument(numFiles >= 0);
138+
// domainMetadata must hold only the live domain metadata actions at this version; tombstones (removed entries) are rejected.
139+
domainMetadata.ifPresent(dms -> dms.forEach(dm -> checkArgument(!dm.isRemoved())));
117140
this.version = version;
118141
this.metadata = requireNonNull(metadata);
119142
this.protocol = requireNonNull(protocol);
120143
this.tableSizeBytes = tableSizeBytes;
121144
this.numFiles = numFiles;
122145
this.txnId = requireNonNull(txnId);
146+
this.domainMetadata = requireNonNull(domainMetadata);
123147
this.fileSizeHistogram = requireNonNull(fileSizeHistogram);
124148
}
125149

@@ -150,6 +174,10 @@ public Optional<String> getTxnId() {
150174
return txnId;
151175
}
152176

177+
public Optional<List<DomainMetadata>> getDomainMetadata() {
178+
return domainMetadata;
179+
}
180+
153181
/** The {@link FileSizeHistogram} stored in this CRCInfo. */
154182
public Optional<FileSizeHistogram> getFileSizeHistogram() {
155183
return fileSizeHistogram;
@@ -172,6 +200,15 @@ public Row toRow() {
172200

173201
// Add optional fields
174202
txnId.ifPresent(txn -> values.put(getSchemaIndex(TXN_ID), txn));
203+
domainMetadata.ifPresent(
204+
domainMetadataList ->
205+
values.put(
206+
getSchemaIndex(DOMAIN_METADATA),
207+
VectorUtils.buildArrayValue(
208+
domainMetadataList.stream()
209+
.map(DomainMetadata::toRow)
210+
.collect(Collectors.toList()),
211+
DomainMetadata.FULL_SCHEMA)));
175212
fileSizeHistogram.ifPresent(
176213
fileSizeHistogram ->
177214
values.put(getSchemaIndex(FILE_SIZE_HISTOGRAM), fileSizeHistogram.toRow()));
@@ -196,6 +233,7 @@ public boolean equals(Object o) {
196233
&& metadata.equals(other.metadata)
197234
&& protocol.equals(other.protocol)
198235
&& txnId.equals(other.txnId)
236+
&& domainMetadata.equals(other.domainMetadata)
199237
&& fileSizeHistogram.equals(other.fileSizeHistogram);
200238
}
201239

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/checksum/ChecksumWriterSuite.scala

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@ package io.delta.kernel.internal.checksum
1818
import java.util
1919
import java.util.{Collections, Optional}
2020

21+
import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}
22+
2123
import io.delta.kernel.data.Row
22-
import io.delta.kernel.internal.actions.{Format, Metadata, Protocol}
24+
import io.delta.kernel.exceptions.TableNotFoundException
25+
import io.delta.kernel.internal.actions.{DomainMetadata, Format, Metadata, Protocol}
2326
import io.delta.kernel.internal.checksum.CRCInfo.CRC_FILE_SCHEMA
24-
import io.delta.kernel.internal.data.GenericRow
27+
import io.delta.kernel.internal.data.{GenericRow, StructRow}
2528
import io.delta.kernel.internal.fs.Path
2629
import io.delta.kernel.internal.types.DataTypeJsonSerDe
2730
import io.delta.kernel.internal.util.VectorUtils
@@ -45,6 +48,7 @@ class ChecksumWriterSuite extends AnyFunSuite with MockEngineUtils {
4548
private val NUM_METADATA_IDX = CRC_FILE_SCHEMA.indexOf("numMetadata")
4649
private val NUM_PROTOCOL_IDX = CRC_FILE_SCHEMA.indexOf("numProtocol")
4750
private val TXN_ID_IDX = CRC_FILE_SCHEMA.indexOf("txnId")
51+
private val DOMAIN_METADATA_IDX = CRC_FILE_SCHEMA.indexOf("domainMetadata")
4852
private val METADATA_IDX = CRC_FILE_SCHEMA.indexOf("metadata")
4953
private val PROTOCOL_IDX = CRC_FILE_SCHEMA.indexOf("protocol")
5054
private val FILE_SIZE_HISTOGRAM_IDX = CRC_FILE_SCHEMA.indexOf("fileSizeHistogram")
@@ -55,15 +59,25 @@ class ChecksumWriterSuite extends AnyFunSuite with MockEngineUtils {
5559
val protocol = createTestProtocol()
5660
val metadata = createTestMetadata()
5761

58-
def testChecksumWrite(txn: Optional[String]): Unit = {
62+
def testChecksumWrite(
63+
txn: Optional[String],
64+
domainMetadata: Optional[util.List[DomainMetadata]]): Unit = {
5965
val version = 1L
6066
val tableSizeBytes = 100L
6167
val numFiles = 1L
6268

6369
// TODO: when we support writing fileSizeHistogram as part of the CRC, update this to be non-empty
6470
checksumWriter.writeCheckSum(
6571
mockEngine(jsonHandler = jsonHandler),
66-
new CRCInfo(version, metadata, protocol, tableSizeBytes, numFiles, txn, Optional.empty()))
72+
new CRCInfo(
73+
version,
74+
metadata,
75+
protocol,
76+
tableSizeBytes,
77+
numFiles,
78+
txn,
79+
domainMetadata,
80+
Optional.empty()))
6781

6882
verifyChecksumFile(jsonHandler, version)
6983
verifyChecksumContent(
@@ -72,12 +86,26 @@ class ChecksumWriterSuite extends AnyFunSuite with MockEngineUtils {
7286
numFiles,
7387
metadata,
7488
protocol,
75-
txn)
89+
txn,
90+
domainMetadata)
7691
}
7792

78-
// Test with and without transaction ID
79-
testChecksumWrite(Optional.of("txn"))
80-
testChecksumWrite(Optional.empty())
93+
// Test with and without a transaction ID, and with and without domain metadata
94+
testChecksumWrite(Optional.of("txn"), Optional.empty())
95+
testChecksumWrite(Optional.empty(), Optional.empty())
96+
testChecksumWrite(
97+
Optional.empty(),
98+
Optional.of(Seq(
99+
new DomainMetadata("domain1", "", false /* removed */ ),
100+
new DomainMetadata("domain2", "", false /* removed */ )).asJava))
101+
// Per the protocol, the domain metadata list must exclude tombstones (entries marked as removed).
102+
intercept[IllegalArgumentException] {
103+
testChecksumWrite(
104+
Optional.empty(),
105+
Optional.of(Seq(
106+
new DomainMetadata("domain1", "", true /* removed */ ),
107+
new DomainMetadata("domain2", "", false /* removed */ )).asJava))
108+
}
81109
}
82110

83111
private def verifyChecksumFile(jsonHandler: MockCheckSumFileJsonWriter, version: Long): Unit = {
@@ -92,7 +120,8 @@ class ChecksumWriterSuite extends AnyFunSuite with MockEngineUtils {
92120
expectedNumFiles: Long,
93121
expectedMetadata: Metadata,
94122
expectedProtocol: Protocol,
95-
expectedTxnId: Optional[String]): Unit = {
123+
expectedTxnId: Optional[String],
124+
domainMetadata: Optional[util.List[DomainMetadata]]): Unit = {
96125
assert(!actualCheckSumRow.isNullAt(TABLE_SIZE_BYTES_IDX) && actualCheckSumRow.getLong(
97126
TABLE_SIZE_BYTES_IDX) == expectedTableSizeBytes)
98127
assert(!actualCheckSumRow.isNullAt(
@@ -109,6 +138,15 @@ class ChecksumWriterSuite extends AnyFunSuite with MockEngineUtils {
109138
} else {
110139
assert(actualCheckSumRow.isNullAt(TXN_ID_IDX))
111140
}
141+
142+
if (domainMetadata.isPresent) {
143+
assert(VectorUtils.toJavaList[Row](actualCheckSumRow.getArray(DOMAIN_METADATA_IDX)).asScala
144+
.map(DomainMetadata.fromRow)
145+
=== domainMetadata.get().asScala)
146+
} else {
147+
assert(actualCheckSumRow.isNullAt(DOMAIN_METADATA_IDX))
148+
}
149+
112150
// TODO: once we support writing fileSizeHistogram as part of the CRC, check it here
113151
assert(actualCheckSumRow.isNullAt(FILE_SIZE_HISTOGRAM_IDX))
114152
}

0 commit comments

Comments
 (0)