Skip to content

Commit e05af8c

Browse files
authored
chore!: rename log_schema to commit_schema (delta-io#1419)
Rename the schemas so that their names describe them more accurately. This is a follow-up to delta-io#1407.
1 parent 3efdae7 commit e05af8c

File tree

12 files changed

+71
-66
lines changed

12 files changed

+71
-66
lines changed

kernel/examples/inspect-table/src/main.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use delta_kernel::actions::visitors::{
44
SetTransactionVisitor,
55
};
66
use delta_kernel::actions::{
7-
get_log_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
7+
get_commit_schema, ADD_NAME, CDC_NAME, METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME,
88
SET_TRANSACTION_NAME,
99
};
1010
use delta_kernel::engine_data::{GetData, RowVisitor, TypedGetData as _};
@@ -70,7 +70,7 @@ enum Action {
7070
}
7171

7272
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
73-
LazyLock::new(|| get_log_schema().leaves(None));
73+
LazyLock::new(|| get_commit_schema().leaves(None));
7474

7575
struct LogVisitor {
7676
actions: Vec<(Action, usize)>,
@@ -205,10 +205,11 @@ fn try_main() -> DeltaResult<()> {
205205
}
206206
}
207207
Commands::Actions { oldest_first } => {
208-
let log_schema = get_log_schema();
209-
let actions = snapshot
210-
.log_segment()
211-
.read_actions(&engine, log_schema.clone(), None)?;
208+
let commit_schema = get_commit_schema();
209+
let actions =
210+
snapshot
211+
.log_segment()
212+
.read_actions(&engine, commit_schema.clone(), None)?;
212213

213214
let mut visitor = LogVisitor::new();
214215
for action in actions {

kernel/src/actions/mod.rs

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ pub(crate) const DOMAIN_METADATA_NAME: &str = "domainMetadata";
6767

6868
pub(crate) const INTERNAL_DOMAIN_PREFIX: &str = "delta.";
6969

70-
static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
70+
static COMMIT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
7171
Arc::new(StructType::new_unchecked([
7272
StructField::nullable(ADD_NAME, Add::to_schema()),
7373
StructField::nullable(REMOVE_NAME, Remove::to_schema()),
@@ -76,17 +76,16 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
7676
StructField::nullable(SET_TRANSACTION_NAME, SetTransaction::to_schema()),
7777
StructField::nullable(COMMIT_INFO_NAME, CommitInfo::to_schema()),
7878
StructField::nullable(CDC_NAME, Cdc::to_schema()),
79-
StructField::nullable(CHECKPOINT_METADATA_NAME, CheckpointMetadata::to_schema()),
8079
StructField::nullable(DOMAIN_METADATA_NAME, DomainMetadata::to_schema()),
8180
]))
8281
});
8382

8483
static ALL_ACTIONS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
8584
Arc::new(StructType::new_unchecked(
86-
get_log_schema()
87-
.fields()
88-
.cloned()
89-
.chain([StructField::nullable(SIDECAR_NAME, Sidecar::to_schema())]),
85+
get_commit_schema().fields().cloned().chain([
86+
StructField::nullable(CHECKPOINT_METADATA_NAME, CheckpointMetadata::to_schema()),
87+
StructField::nullable(SIDECAR_NAME, Sidecar::to_schema()),
88+
]),
9089
))
9190
});
9291

@@ -121,8 +120,8 @@ static LOG_DOMAIN_METADATA_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
121120
#[internal_api]
122121
/// Gets the schema for all actions that can appear in commits
123122
/// logs. This excludes actions that can only appear in checkpoints.
124-
pub(crate) fn get_log_schema() -> &'static SchemaRef {
125-
&LOG_SCHEMA
123+
pub(crate) fn get_commit_schema() -> &'static SchemaRef {
124+
&COMMIT_SCHEMA
126125
}
127126

128127
#[internal_api]
@@ -1047,7 +1046,7 @@ mod tests {
10471046

10481047
#[test]
10491048
fn test_metadata_schema() {
1050-
let schema = get_log_schema()
1049+
let schema = get_commit_schema()
10511050
.project(&[METADATA_NAME])
10521051
.expect("Couldn't get metaData field");
10531052

@@ -1081,7 +1080,7 @@ mod tests {
10811080

10821081
#[test]
10831082
fn test_add_schema() {
1084-
let schema = get_log_schema()
1083+
let schema = get_commit_schema()
10851084
.project(&[ADD_NAME])
10861085
.expect("Couldn't get add field");
10871086

@@ -1139,7 +1138,7 @@ mod tests {
11391138

11401139
#[test]
11411140
fn test_remove_schema() {
1142-
let schema = get_log_schema()
1141+
let schema = get_commit_schema()
11431142
.project(&[REMOVE_NAME])
11441143
.expect("Couldn't get remove field");
11451144
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
@@ -1162,7 +1161,7 @@ mod tests {
11621161

11631162
#[test]
11641163
fn test_cdc_schema() {
1165-
let schema = get_log_schema()
1164+
let schema = get_commit_schema()
11661165
.project(&[CDC_NAME])
11671166
.expect("Couldn't get cdc field");
11681167
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
@@ -1195,7 +1194,7 @@ mod tests {
11951194

11961195
#[test]
11971196
fn test_checkpoint_metadata_schema() {
1198-
let schema = get_log_schema()
1197+
let schema = get_all_actions_schema()
11991198
.project(&[CHECKPOINT_METADATA_NAME])
12001199
.expect("Couldn't get checkpointMetadata field");
12011200
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
@@ -1210,7 +1209,7 @@ mod tests {
12101209

12111210
#[test]
12121211
fn test_transaction_schema() {
1213-
let schema = get_log_schema()
1212+
let schema = get_commit_schema()
12141213
.project(&["txn"])
12151214
.expect("Couldn't get transaction field");
12161215

@@ -1227,7 +1226,7 @@ mod tests {
12271226

12281227
#[test]
12291228
fn test_commit_info_schema() {
1230-
let schema = get_log_schema()
1229+
let schema = get_commit_schema()
12311230
.project(&["commitInfo"])
12321231
.expect("Couldn't get commitInfo field");
12331232

@@ -1251,7 +1250,7 @@ mod tests {
12511250

12521251
#[test]
12531252
fn test_domain_metadata_schema() {
1254-
let schema = get_log_schema()
1253+
let schema = get_commit_schema()
12551254
.project(&[DOMAIN_METADATA_NAME])
12561255
.expect("Couldn't get domainMetadata field");
12571256
let expected = Arc::new(StructType::new_unchecked([StructField::nullable(
@@ -1789,9 +1788,9 @@ mod tests {
17891788
let metadata_id = metadata.id.clone();
17901789

17911790
// test with the full log schema that wraps metadata in a "metaData" field
1792-
let log_schema = get_log_schema().project(&[METADATA_NAME]).unwrap();
1791+
let commit_schema = get_commit_schema().project(&[METADATA_NAME]).unwrap();
17931792
let actual: RecordBatch = metadata
1794-
.into_engine_data(log_schema, &engine)
1793+
.into_engine_data(commit_schema, &engine)
17951794
.unwrap()
17961795
.into_any()
17971796
.downcast::<ArrowEngineData>()
@@ -1886,8 +1885,8 @@ mod tests {
18861885
assert_eq!(record_batch, expected);
18871886

18881887
// test with the full log schema that wraps protocol in a "protocol" field
1889-
let log_schema = get_log_schema().project(&[PROTOCOL_NAME]).unwrap();
1890-
let engine_data = protocol.into_engine_data(log_schema, &engine);
1888+
let commit_schema = get_commit_schema().project(&[PROTOCOL_NAME]).unwrap();
1889+
let engine_data = protocol.into_engine_data(commit_schema, &engine);
18911890

18921891
let schema = Arc::new(Schema::new(vec![Field::new(
18931892
"protocol",
@@ -1996,7 +1995,7 @@ mod tests {
19961995

19971996
#[test]
19981997
fn test_schema_contains_file_actions_with_add() {
1999-
let schema = get_log_schema()
1998+
let schema = get_commit_schema()
20001999
.project(&[ADD_NAME, PROTOCOL_NAME])
20012000
.unwrap();
20022001
assert!(schema_contains_file_actions(&schema));
@@ -2007,7 +2006,7 @@ mod tests {
20072006

20082007
#[test]
20092008
fn test_schema_contains_file_actions_with_remove() {
2010-
let schema = get_log_schema()
2009+
let schema = get_commit_schema()
20112010
.project(&[REMOVE_NAME, METADATA_NAME])
20122011
.unwrap();
20132012
assert!(schema_contains_file_actions(&schema));
@@ -2018,13 +2017,15 @@ mod tests {
20182017

20192018
#[test]
20202019
fn test_schema_contains_file_actions_with_both() {
2021-
let schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME]).unwrap();
2020+
let schema = get_commit_schema()
2021+
.project(&[ADD_NAME, REMOVE_NAME])
2022+
.unwrap();
20222023
assert!(schema_contains_file_actions(&schema));
20232024
}
20242025

20252026
#[test]
20262027
fn test_schema_contains_file_actions_with_neither() {
2027-
let schema = get_log_schema()
2028+
let schema = get_commit_schema()
20282029
.project(&[PROTOCOL_NAME, METADATA_NAME])
20292030
.unwrap();
20302031
assert!(!schema_contains_file_actions(&schema));

kernel/src/actions/visitors.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1087,7 +1087,7 @@ mod tests {
10871087
engine
10881088
.evaluation_handler()
10891089
.new_expression_evaluator(
1090-
get_log_schema().clone(),
1090+
get_commit_schema().clone(),
10911091
expression.into(),
10921092
InCommitTimestampVisitor::schema().into(),
10931093
)

kernel/src/engine/arrow_data.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ impl ArrowEngineData {
343343
mod tests {
344344
use std::sync::Arc;
345345

346-
use crate::actions::{get_log_schema, Metadata, Protocol};
346+
use crate::actions::{get_commit_schema, Metadata, Protocol};
347347
use crate::arrow::array::types::Int32Type;
348348
use crate::arrow::array::{Array, AsArray, Int32Array, RecordBatch, StringArray};
349349
use crate::arrow::datatypes::{
@@ -366,7 +366,7 @@ mod tests {
366366
r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
367367
]
368368
.into();
369-
let output_schema = get_log_schema().clone();
369+
let output_schema = get_commit_schema().clone();
370370
let parsed = handler
371371
.parse_json(string_array_to_engine_data(json_strings), output_schema)
372372
.unwrap();
@@ -385,7 +385,7 @@ mod tests {
385385
r#"{"protocol": {"minReaderVersion": 3, "minWriterVersion": 7, "readerFeatures": ["rw1"], "writerFeatures": ["rw1", "w2"]}}"#,
386386
]
387387
.into();
388-
let output_schema = get_log_schema().project(&["protocol"])?;
388+
let output_schema = get_commit_schema().project(&["protocol"])?;
389389
let parsed = handler
390390
.parse_json(string_array_to_engine_data(json_strings), output_schema)
391391
.unwrap();

kernel/src/engine/default/json.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ mod tests {
252252
use std::sync::{mpsc, Arc, Mutex};
253253
use std::task::Waker;
254254

255-
use crate::actions::get_log_schema;
255+
use crate::actions::get_commit_schema;
256256
use crate::arrow::array::{AsArray, Int32Array, RecordBatch, StringArray};
257257
use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
258258
use crate::engine::arrow_data::ArrowEngineData;
@@ -488,7 +488,7 @@ mod tests {
488488
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
489489
r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
490490
]);
491-
let output_schema = get_log_schema().clone();
491+
let output_schema = get_commit_schema().clone();
492492

493493
let batch = handler
494494
.parse_json(string_array_to_engine_data(json_strings), output_schema)
@@ -503,7 +503,7 @@ mod tests {
503503
let json_strings = StringArray::from(vec![
504504
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2, "maxRowId": 3}}}"#,
505505
]);
506-
let output_schema = get_log_schema().clone();
506+
let output_schema = get_commit_schema().clone();
507507

508508
let batch: RecordBatch = handler
509509
.parse_json(string_array_to_engine_data(json_strings), output_schema)
@@ -542,7 +542,7 @@ mod tests {
542542

543543
let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));
544544
let data: Vec<RecordBatch> = handler
545-
.read_json_files(files, get_log_schema().clone(), None)
545+
.read_json_files(files, get_commit_schema().clone(), None)
546546
.unwrap()
547547
.map_ok(into_record_batch)
548548
.try_collect()
@@ -554,7 +554,7 @@ mod tests {
554554
// limit batch size
555555
let handler = handler.with_batch_size(2);
556556
let data: Vec<RecordBatch> = handler
557-
.read_json_files(files, get_log_schema().clone(), None)
557+
.read_json_files(files, get_commit_schema().clone(), None)
558558
.unwrap()
559559
.map_ok(into_record_batch)
560560
.try_collect()

kernel/src/log_segment.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::sync::{Arc, LazyLock};
55

66
use crate::actions::visitors::SidecarVisitor;
77
use crate::actions::{
8-
get_log_schema, schema_contains_file_actions, Metadata, Protocol, Sidecar, METADATA_NAME,
8+
get_commit_schema, schema_contains_file_actions, Metadata, Protocol, Sidecar, METADATA_NAME,
99
PROTOCOL_NAME, SIDECAR_NAME,
1010
};
1111
use crate::last_checkpoint_hint::LastCheckpointHint;
@@ -556,7 +556,7 @@ impl LogSegment {
556556
&self,
557557
engine: &dyn Engine,
558558
) -> DeltaResult<impl Iterator<Item = DeltaResult<ActionsBatch>> + Send> {
559-
let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
559+
let schema = get_commit_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?;
560560
// filter out log files that do not contain metadata or protocol information
561561
static META_PREDICATE: LazyLock<Option<PredicateRef>> = LazyLock::new(|| {
562562
Some(Arc::new(Predicate::or(

kernel/src/log_segment/tests.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use url::Url;
99

1010
use crate::actions::visitors::AddVisitor;
1111
use crate::actions::{
12-
get_all_actions_schema, get_log_schema, Add, Sidecar, ADD_NAME, METADATA_NAME, REMOVE_NAME,
12+
get_all_actions_schema, get_commit_schema, Add, Sidecar, ADD_NAME, METADATA_NAME, REMOVE_NAME,
1313
SIDECAR_NAME,
1414
};
1515
use crate::engine::arrow_data::ArrowEngineData;
@@ -1070,15 +1070,15 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_as_is_if_schema_has_
10701070
add_checkpoint_to_store(
10711071
&store,
10721072
// Create a checkpoint batch with sidecar actions to verify that the sidecar actions are not read.
1073-
sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], get_log_schema().clone()),
1073+
sidecar_batch_with_given_paths(vec!["sidecar1.parquet"], get_commit_schema().clone()),
10741074
"00000000000000000001.checkpoint.parquet",
10751075
)?;
10761076

10771077
let checkpoint_one_file = log_root
10781078
.join("00000000000000000001.checkpoint.parquet")?
10791079
.to_string();
10801080

1081-
let v2_checkpoint_read_schema = get_log_schema().project(&[METADATA_NAME])?;
1081+
let v2_checkpoint_read_schema = get_commit_schema().project(&[METADATA_NAME])?;
10821082

10831083
let log_segment = LogSegment::try_new(
10841084
ListedLogFiles::try_new(
@@ -1138,7 +1138,7 @@ fn test_create_checkpoint_stream_returns_checkpoint_batches_if_checkpoint_is_mul
11381138
let checkpoint_one_file = log_root.join(checkpoint_part_1)?.to_string();
11391139
let checkpoint_two_file = log_root.join(checkpoint_part_2)?.to_string();
11401140

1141-
let v2_checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;
1141+
let v2_checkpoint_read_schema = get_commit_schema().project(&[ADD_NAME])?;
11421142

11431143
let log_segment = LogSegment::try_new(
11441144
ListedLogFiles::try_new(
@@ -1185,7 +1185,7 @@ fn test_create_checkpoint_stream_reads_parquet_checkpoint_batch_without_sidecars
11851185

11861186
add_checkpoint_to_store(
11871187
&store,
1188-
add_batch_simple(get_log_schema().clone()),
1188+
add_batch_simple(get_commit_schema().clone()),
11891189
"00000000000000000001.checkpoint.parquet",
11901190
)?;
11911191

@@ -1295,12 +1295,12 @@ fn test_create_checkpoint_stream_reads_checkpoint_file_and_returns_sidecar_batch
12951295

12961296
add_sidecar_to_store(
12971297
&store,
1298-
add_batch_simple(get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?),
1298+
add_batch_simple(get_commit_schema().project(&[ADD_NAME, REMOVE_NAME])?),
12991299
"sidecarfile1.parquet",
13001300
)?;
13011301
add_sidecar_to_store(
13021302
&store,
1303-
add_batch_with_remove(get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?),
1303+
add_batch_with_remove(get_commit_schema().project(&[ADD_NAME, REMOVE_NAME])?),
13041304
"sidecarfile2.parquet",
13051305
)?;
13061306

0 commit comments

Comments
 (0)