Skip to content

Commit 3f2205b

Browse files
committed
Add option for FanOut on CSV Extract, readd schema init on BigQueryLoad
1 parent e3c377a commit 3f2205b

File tree

9 files changed

+20
-9
lines changed

9 files changed

+20
-9
lines changed

API/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
<parent>
3232
<groupId>org.ohnlp.backbone</groupId>
3333
<artifactId>backbone-parent</artifactId>
34-
<version>3.0.21</version>
34+
<version>3.0.22</version>
3535
</parent>
3636

3737
<artifactId>api</artifactId>

Core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>3.0.21</version>
10+
<version>3.0.22</version>
1111
</parent>
1212

1313
<artifactId>core</artifactId>

Example-Backbone-Configs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>3.0.21</version>
10+
<version>3.0.22</version>
1111
</parent>
1212

1313
<artifactId>example-backbone-configs</artifactId>

IO/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>3.0.21</version>
10+
<version>3.0.22</version>
1111
</parent>
1212

1313
<groupId>org.ohnlp.backbone.io</groupId>

IO/src/main/java/org/ohnlp/backbone/io/bigquery/BigQueryLoad.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public void init() throws ComponentInitializationException {
3434

3535
@Override
3636
public POutput expand(PCollection<Row> input) {
37+
this.writeSchema = input.getSchema();
3738
return input.apply(
3839
"Transform output rows to BigQuery TableRow format", ParDo.of(
3940
new DoFn<Row, TableRow>() {

IO/src/main/java/org/ohnlp/backbone/io/local/CSVExtract.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public class CSVExtract extends ExtractToMany {
5252
)
5353
private boolean skipFirstRow;
5454

55+
@ConfigurationProperty(
56+
path = "fanout",
57+
desc = "Whether to reshuffle records after read. Defaults to false. Set to true if fanout is needed (i.e., if your individual CSV files are very large)",
58+
required = false
59+
)
60+
private boolean fanout = false;
61+
5562
@ConfigurationProperty(
5663
path = "schema",
5764
desc = "CSV File Schema"
@@ -65,11 +72,11 @@ public class CSVExtract extends ExtractToMany {
6572

6673
@Override
6774
public void init() throws ComponentInitializationException {
68-
6975
}
7076

7177
@Override
7278
public PCollectionRowTuple expand(PBegin input) {
79+
7380
PCollection<String> fileURIs = input.apply("Scan Input Directory for Partitioned Files", Create.of(Arrays.stream(Objects.requireNonNull(new File(dir).listFiles())).map(f -> f.toURI().toString()).collect(Collectors.toList()))).setCoder(StringUtf8Coder.of());
7481
PCollectionTuple readColls = fileURIs.apply("Read CSV Records and Map to Rows", ParDo.of(new DoFn<String, Row>() {
7582
private String[] header;
@@ -127,7 +134,10 @@ public void processFile(ProcessContext pc) {
127134
}).withOutputTags(new TupleTag<>("CSV Records"), TupleTagList.of(new TupleTag<>("Errored Records"))));
128135
PCollection<Row> read = readColls.get("CSV Records");
129136
read.setRowSchema(this.schema);
130-
PCollectionRowTuple ret = PCollectionRowTuple.of("CSV Records", read.apply("Break Fusion", Repartition.of())).and("Errored Records", readColls.get("Errored Records"));
137+
if (this.fanout) {
138+
read = read.apply("Break Fusion", Repartition.of());
139+
}
140+
PCollectionRowTuple ret = PCollectionRowTuple.of("CSV Records", read).and("Errored Records", readColls.get("Errored Records"));
131141
// Set Coders
132142
ret.get("CSV Records").setRowSchema(this.schema);
133143
ret.get("Errored Records").setRowSchema(this.errorSchema);

Plugin-Manager/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>3.0.21</version>
10+
<version>3.0.22</version>
1111
</parent>
1212

1313
<artifactId>plugin-manager</artifactId>

Transforms/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>3.0.21</version>
10+
<version>3.0.22</version>
1111
</parent>
1212

1313
<groupId>org.ohnlp.backbone.transforms</groupId>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>org.ohnlp.backbone</groupId>
88
<artifactId>backbone-parent</artifactId>
9-
<version>3.0.21</version>
9+
<version>3.0.22</version>
1010

1111

1212
<properties>

0 commit comments

Comments
 (0)