Skip to content

Commit 895a7de

Browse files
committed
add ensureFields intrinsic to allow for adding missing fields
1 parent 3657c80 commit 895a7de

File tree

6 files changed

+138
-39
lines changed

6 files changed

+138
-39
lines changed

tessellate-main/src/main/antora/modules/reference/pages/transforms.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,9 @@ Params:::
135135
The default.
136136
- `upperUnderscore` - converts to upper case and replaces `/\ .-` with underscores
137137
- `camelCase` - converts to camel case
138+
139+
`ensureFields`:: Add any missing fields/columns.
140+
Use this when there are many files with overlapping but not consistent field names and the files need to be normalized so they can be processed as a single unit.
141+
Note any fields not declared as the result fields will be discarded.
142+
Def:::
143+
- `^ensureFields{} -> to_field1 + to_field2` - Ensure the results have all declared fields

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Intrinsics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ private static void add(IntrinsicBuilder intrinsicBuilder) {
2424
static {
2525
add(new TsidIntrinsic());
2626
add(new FixedWidthIntrinsic());
27+
add(new EnsureIntrinsic());
2728
add(new ToJsonIntrinsic());
2829
add(new FromJsonIntrinsic());
2930
add(new FormatFieldsIntrinsic());

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transformer.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,23 @@ private PipelineContext discardAndEval(PipelineContext context) {
5454

5555
if (operation.exp() != null) {
5656
IntrinsicBuilder.Result result = create(context, operation);
57-
// SWAP allows for efficient replacement of arguments with non-argument and results
58-
Fields selector = result.arguments().isAll() || (context.currentFields.contains(result.arguments()) && context.currentFields.size() == result.arguments().size()) ? Fields.RESULTS : Fields.SWAP;
57+
58+
boolean argsIsAll = result.arguments().isAll() ||
59+
containsAll(context.currentFields, result.arguments());
60+
61+
boolean resultsEqualsDeclared = containsAll(result.function().getFieldDeclaration(), result.results());
62+
63+
Fields selector;
64+
65+
if ((argsIsAll && result.results().isNone()) || resultsEqualsDeclared) {
66+
selector = Fields.RESULTS;
67+
} else if (argsIsAll) {
68+
selector = result.results();
69+
} else {
70+
// SWAP allows for efficient replacement of arguments with non-argument and results
71+
selector = Fields.SWAP;
72+
}
73+
5974
Pipe pipe = new Each(context.pipe, result.arguments(), result.function(), selector);
6075

6176
Fields currentFields = context.currentFields.subtract(result.arguments()).append(result.results());
@@ -78,6 +93,10 @@ private PipelineContext discardAndEval(PipelineContext context) {
7893
}
7994
}
8095

96+
private static boolean containsAll(Fields lhs, Fields rhs) {
97+
return lhs.contains(rhs) && lhs.size() == rhs.size();
98+
}
99+
81100
private PipelineContext copyAndEval(PipelineContext context) {
82101
Operation operation = (Operation) statement;
83102

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright (c) 2023-2025 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.pipeline.intrinsic;
10+
11+
import cascading.operation.Insert;
12+
import cascading.tuple.Fields;
13+
import io.clusterless.tessellate.parser.ast.Operation;
14+
15+
public class EnsureIntrinsic extends IntrinsicBuilder {
16+
17+
public EnsureIntrinsic() {
18+
super("ensureFields");
19+
}
20+
21+
@Override
22+
public Result create(Fields currentFields, Operation operation) {
23+
Fields toFields = fieldsParser().asFields(operation.results());
24+
25+
if (currentFields.isNone()) {
26+
throw new IllegalArgumentException("this intrinsic requires fields to be declared, got: " + currentFields.print());
27+
}
28+
29+
if (toFields.isNone()) {
30+
throw new IllegalArgumentException("result fields must be declared");
31+
}
32+
33+
Fields insertFields = toFields.subtract(currentFields);
34+
35+
Object[] values = new Object[insertFields.size()];
36+
37+
Insert function = new Insert(insertFields, values);
38+
39+
return new Result(Fields.ALL, function, toFields);
40+
}
41+
}

tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineTest.java

Lines changed: 55 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ void noHeaders(@PathForResource("/data/delimited.csv") URI input, @PathForOutput
7878

7979
CascadingTesting.validateEntries(
8080
pipeline.flow().openSink(),
81-
l -> assertEquals(12, l, "wrong length"),
82-
l -> assertEquals(5, l, "wrong size"),
81+
l -> assertEquals(12, l, "wrong file length"),
82+
l -> assertEquals(5, l, "wrong tuple size"),
8383
l -> {
8484
}
8585
);
@@ -93,7 +93,7 @@ void badWidth(@PathForResource("/data/delimited-variable-width.csv") URI input,
9393
"^fixedWidth{ width:5, insertAt:3 } ->"
9494
);
9595

96-
fixedWidthBase(input, output, transform, List.of());
96+
fixedSchemaBase(input, output, transform, List.of(), false, 5, 5);
9797
}
9898

9999
@Test
@@ -102,7 +102,7 @@ void badWidthWithFields(@PathForResource("/data/delimited-variable-width.csv") U
102102
"^fixedWidth{ width:5, insertAt:3 } -> _0+_1+_2+_3+_4"
103103
);
104104

105-
fixedWidthBase(input, output, transform, List.of());
105+
fixedSchemaBase(input, output, transform, List.of(), false, 5, 5);
106106
}
107107

108108
@Test
@@ -111,10 +111,28 @@ void badWidthWithFieldsAndTypes(@PathForResource("/data/delimited-variable-width
111111
"^fixedWidth{ width:5, insertAt:3 } -> _0|string+_1|string+_2|string+_3|string+_4|string"
112112
);
113113

114-
fixedWidthBase(input, output, transform, Field.asField("a|string", "b|string", "c|string", "d|string", "e|string"));
114+
fixedSchemaBase(input, output, transform, Field.asField("a|string", "b|string", "c|string", "d|string", "e|string"), false, 5, 5);
115115
}
116116

117-
private static void fixedWidthBase(URI input, URI output, Transform transform, List<@NotNull Field> declared) throws IOException {
117+
@Test
118+
void overlappingSchema(@PathForResource("/data/delimited-header-missing-col.csv") URI input, @PathForOutput URI output) throws IOException {
119+
Transform transform = new Transform(
120+
"^ensureFields{} -> first|string+second|string+third|string+fourth|string+fifth|string"
121+
);
122+
123+
fixedSchemaBase(input, output, transform, List.of(), true, 13, 5);
124+
}
125+
126+
@Test
127+
void overlappingSchemaWithTail(@PathForResource("/data/delimited-header-missing-col.csv") URI input, @PathForOutput URI output) throws IOException {
128+
Transform transform = new Transform(
129+
"^ensureFields{} -> first|string+second|string+third|string+fourth|string+fifth|string+sixth|string"
130+
);
131+
132+
fixedSchemaBase(input, output, transform, List.of(), true, 13, 6);
133+
}
134+
135+
private static void fixedSchemaBase(URI input, URI output, Transform transform, List<@NotNull Field> declared, boolean embedsSchema, int fileLength, int tupleSize) throws IOException {
118136
PipelineOptions pipelineOptions = new PipelineOptions();
119137

120138
PipelineDef def = PipelineDef.builder()
@@ -124,7 +142,7 @@ private static void fixedWidthBase(URI input, URI output, Transform transform, L
124142
.withSchema(Schema.builder()
125143
.withDeclared(declared)
126144
.withFormat(Format.csv)
127-
.withEmbedsSchema(false)
145+
.withEmbedsSchema(embedsSchema)
128146
.withStrictParsing(declared.isEmpty())
129147
.build())
130148
.build())
@@ -147,8 +165,8 @@ private static void fixedWidthBase(URI input, URI output, Transform transform, L
147165

148166
CascadingTesting.validateEntries(
149167
pipeline.flow().openSink(),
150-
l -> assertEquals(5, l, "wrong length"),
151-
l -> assertEquals(5, l, "wrong size"),
168+
l -> assertEquals(fileLength, l, "wrong file length"),
169+
l -> assertEquals(tupleSize, l, "wrong tuple size"),
152170
l -> {
153171
}
154172
);
@@ -189,8 +207,8 @@ void headers(@PathForResource("/data/delimited-header.csv") URI input, @PathForO
189207

190208
CascadingTesting.validateEntries(
191209
pipeline.flow().openSink(),
192-
l -> assertEquals(13, l, "wrong length"), // headers are declared so aren't counted
193-
l -> assertEquals(5, l, "wrong size"),
210+
l -> assertEquals(13, l, "wrong file length"), // headers are declared so aren't counted
211+
l -> assertEquals(5, l, "wrong tuple size"),
194212
l -> {
195213
}
196214
);
@@ -259,8 +277,8 @@ private static void headersPartitionedBase(URI input, URI output, List<SourcePar
259277

260278
CascadingTesting.validateEntries(
261279
iterator,
262-
l -> assertEquals(13, l, "wrong length"), // headers are declared so aren't counted
263-
l -> assertEquals(8, l, "wrong size"),
280+
l -> assertEquals(13, l, "wrong file length"), // headers are declared so aren't counted
281+
l -> assertEquals(8, l, "wrong tuple size"),
264282
l -> {
265283
}
266284
);
@@ -304,8 +322,8 @@ void awsS3AccessLog(@PathForResource("/data/aws-s3-access-log.txt") URI input, @
304322

305323
CascadingTesting.validateEntries(
306324
pipeline.flow().openSink(),
307-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
308-
l -> assertEquals(merged.source().schema().declared().size(), l, "wrong size"),
325+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
326+
l -> assertEquals(merged.source().schema().declared().size(), l, "wrong tuple size"),
309327
l -> {
310328
}
311329
);
@@ -358,8 +376,8 @@ void awsS3AccessLogWithTransforms(@PathForResource("/data/aws-s3-access-log.txt"
358376

359377
CascadingTesting.validateEntries(
360378
pipeline.flow().openSink(),
361-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
362-
l -> assertEquals(merged.source().schema().declared().size() + 1 + 1 + 1, l, "wrong size"),
379+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
380+
l -> assertEquals(merged.source().schema().declared().size() + 1 + 1 + 1, l, "wrong tuple size"),
363381
l -> {
364382
}
365383
);
@@ -403,8 +421,8 @@ void writeReadParquet(@PathForResource("/data/aws-s3-access-log.txt") URI input,
403421

404422
CascadingTesting.validateEntries(
405423
pipelineWrite.flow().openSink(),
406-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
407-
l -> assertEquals(merged.source().schema().declared().size(), l, "wrong size"),
424+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
425+
l -> assertEquals(merged.source().schema().declared().size(), l, "wrong tuple size"),
408426
l -> {
409427
}
410428
);
@@ -438,8 +456,8 @@ void writeReadParquet(@PathForResource("/data/aws-s3-access-log.txt") URI input,
438456

439457
CascadingTesting.validateEntries(
440458
pipelineRead.flow().openSink(),
441-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
442-
l -> assertEquals(merged.source().schema().declared().size(), l, "wrong size"),
459+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
460+
l -> assertEquals(merged.source().schema().declared().size(), l, "wrong tuple size"),
443461
l -> {
444462
}
445463
);
@@ -526,8 +544,8 @@ void writeReadParquetPartitioned(@PathForResource("/data/aws-s3-access-log.txt")
526544

527545
CascadingTesting.validateEntries(
528546
iterator,
529-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
530-
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong size"),
547+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
548+
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong tuple size"),
531549
l -> {
532550
}
533551
);
@@ -598,8 +616,8 @@ void writeReadParquetPartitioned(@PathForResource("/data/aws-s3-access-log.txt")
598616

599617
CascadingTesting.validateEntries(
600618
finalIterator,
601-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
602-
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong size"),
619+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
620+
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong tuple size"),
603621
l -> {
604622
}
605623
);
@@ -685,8 +703,8 @@ void writeReadParquetPartitionedWithManifests(
685703

686704
CascadingTesting.validateEntries(
687705
pipelineWrite.flow().openSink(),
688-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
689-
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong size"),
706+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
707+
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong tuple size"),
690708
l -> {
691709
}
692710
);
@@ -729,8 +747,8 @@ void writeReadParquetPartitionedWithManifests(
729747

730748
CascadingTesting.validateEntries(
731749
pipelineRead.flow().openSink(),
732-
l -> assertEquals(4, l, "wrong length"), // headers are declared so aren't counted
733-
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong size"),
750+
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
751+
l -> assertEquals(merged.source().schema().declared().size() + 3, l, "wrong tuple size"),
734752
l -> {
735753
}
736754
);
@@ -777,8 +795,8 @@ void toJsonAndBack(
777795

778796
CascadingTesting.validateEntries(
779797
writeJson.flow().openSink(),
780-
l -> assertEquals(13, l, "wrong length"), // headers are declared so aren't counted
781-
l -> assertEquals(1, l, "wrong size"),
798+
l -> assertEquals(13, l, "wrong file length"), // headers are declared so aren't counted
799+
l -> assertEquals(1, l, "wrong tuple size"),
782800
l -> {
783801
}
784802
);
@@ -820,8 +838,8 @@ void toJsonAndBack(
820838

821839
CascadingTesting.validateEntries(
822840
readJson.flow().openSink(),
823-
l -> assertEquals(13, l, "wrong length"), // headers are declared so aren't counted
824-
l -> assertEquals(5, l, "wrong size"),
841+
l -> assertEquals(13, l, "wrong file length"), // headers are declared so aren't counted
842+
l -> assertEquals(5, l, "wrong tuple size"),
825843
l -> {
826844
}
827845
);
@@ -882,8 +900,8 @@ void headersBadWidth(
882900

883901
CascadingTesting.validateEntries(
884902
pipeline.flow().openSink(),
885-
l -> assertEquals(11, l, "wrong length"), // headers are declared so aren't counted
886-
l -> assertEquals(5, l, "wrong size"),
903+
l -> assertEquals(11, l, "wrong file length"), // headers are declared so aren't counted
904+
l -> assertEquals(5, l, "wrong tuple size"),
887905
l -> {
888906
}
889907
);
@@ -983,8 +1001,8 @@ private static void joinTest(URI lhs, URI rhs, URI output, String transform, int
9831001

9841002
CascadingTesting.validateEntries(
9851003
pipeline.flow().openSink(),
986-
l -> assertEquals(length, l, "wrong length"), // headers are declared so aren't counted
987-
l -> assertEquals(size, l, "wrong size"),
1004+
l -> assertEquals(length, l, "wrong file length"), // headers are declared so aren't counted
1005+
l -> assertEquals(size, l, "wrong tuple size"),
9881006
l -> {
9891007
}
9901008
);
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
first,second,fourth,fifth
2+
foo,bar,bin,1
3+
foo,bar,bin,2
4+
foo,"bar,bar",bin,3
5+
foo,"bar"",bar",bin,4
6+
foo,"bar"""",bar",bin,5
7+
,"",,6
8+
,,,7
9+
foo,,,8
10+
,"",,9
11+
f,,,10
12+
f,,",bin",11
13+
f,,",bin",
14+
f,,,

0 commit comments

Comments
 (0)