Skip to content

Commit c039b14

Browse files
committed
add sourcePath intrinsic function
1 parent 61a4573 commit c039b14

File tree

5 files changed

+50
-5
lines changed

5 files changed

+50
-5
lines changed

tessellate-main/src/main/antora/modules/reference/pages/transforms.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,3 +141,7 @@ Use this when there are many files with overlapping but not consistent field nam
141141
Note any fields not declared as the result fields will be discarded.
142142
Def:::
143143
- `^ensureFields{} -> to_field1 + to_field2` - Ensure the results have all declared fields
144+
145+
`sourcePath`:: Add the URI of the data currently being processed.
146+
Def:::
147+
- `^sourcePath{} -> to_field` - Assign the current URI to `to_field`

tessellate-main/src/main/java/io/clusterless/tessellate/operation/SourcePathFunction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,11 @@ public void prepare(FlowProcess flowProcess, OperationCall<Tuple> operationCall)
5252
public void operate(FlowProcess flowProcess, FunctionCall<Tuple> functionCall) {
5353
String sourcePath = flowProcess.getFlowProcessContext().getSourcePath();
5454

55-
if (sourcePath == null)
55+
if (sourcePath == null) {
5656
sourcePath = defaultPath;
57-
else if (fileNameOnly)
57+
} else if (fileNameOnly) {
5858
sourcePath = getFileName(sourcePath);
59+
}
5960

6061
functionCall.getContext().set(0, sourcePath);
6162

@@ -65,8 +66,9 @@ else if (fileNameOnly)
6566
protected String getFileName(String sourcePath) {
6667
int i = sourcePath.lastIndexOf(File.separatorChar);
6768

68-
if (i == -1)
69+
if (i == -1) {
6970
return sourcePath;
71+
}
7072

7173
return sourcePath.substring(i + 1);
7274
}

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Intrinsics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ private static void add(IntrinsicBuilder intrinsicBuilder) {
2323

2424
static {
2525
add(new TsidIntrinsic());
26+
add(new SourcePathIntrinsic());
2627
add(new FixedWidthIntrinsic());
2728
add(new EnsureIntrinsic());
2829
add(new ToJsonIntrinsic());
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright (c) 2023-2025 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.pipeline.intrinsic;
10+
11+
import cascading.tuple.Fields;
12+
import io.clusterless.tessellate.operation.SourcePathFunction;
13+
import io.clusterless.tessellate.parser.ast.Operation;
14+
15+
public class SourcePathIntrinsic extends IntrinsicBuilder {
16+
17+
public SourcePathIntrinsic() {
18+
super("sourcePath");
19+
}
20+
21+
@Override
22+
public Result create(Fields currentFields, Operation operation) {
23+
Fields toFields = fieldsParser().asFields(operation.results());
24+
25+
if (toFields.isNone()) {
26+
toFields = new Fields("source_path", String.class);
27+
}
28+
29+
if (toFields.size() != 1) {
30+
throw new IllegalArgumentException("results may only have one field");
31+
}
32+
33+
SourcePathFunction function = new SourcePathFunction(toFields);
34+
35+
return new Result(Fields.NONE, function, toFields);
36+
}
37+
}

tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,8 @@ void awsS3AccessLogWithTransforms(@PathForResource("/data/aws-s3-access-log.txt"
350350
"httpStatusString|Integer",// coerce: "httpStatusString|String"
351351
"requestID->",// discard: "httpStatusString|String"
352352
"200=>code|Integer", // insert
353-
"=> _empty|Integer" // insert
353+
"=> _empty|Integer", // insert
354+
"^sourcePath{} +> source_path"
354355
))
355356
.withSink(Sink.builder()
356357
.withOutput(output)
@@ -377,7 +378,7 @@ void awsS3AccessLogWithTransforms(@PathForResource("/data/aws-s3-access-log.txt"
377378
CascadingTesting.validateEntries(
378379
pipeline.flow().openSink(),
379380
l -> assertEquals(4, l, "wrong file length"), // headers are declared so aren't counted
380-
l -> assertEquals(merged.source().schema().declared().size() + 1 + 1 + 1, l, "wrong tuple size"),
381+
l -> assertEquals(merged.source().schema().declared().size() + 1 + 1 + 1 + 1, l, "wrong tuple size"),
381382
l -> {
382383
}
383384
);

0 commit comments

Comments
 (0)