
Commit c4a8080

Nullcheck JDBC Load. Fix(?) valid options being overwritten

1 parent: 3f2205b

10 files changed: +111, -31 lines

API/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@
     <parent>
         <groupId>org.ohnlp.backbone</groupId>
         <artifactId>backbone-parent</artifactId>
-        <version>3.0.22</version>
+        <version>3.0.23</version>
     </parent>

     <artifactId>api</artifactId>

Core/pom.xml

Lines changed: 2 additions & 2 deletions
@@ -7,7 +7,7 @@
     <parent>
         <groupId>org.ohnlp.backbone</groupId>
         <artifactId>backbone-parent</artifactId>
-        <version>3.0.22</version>
+        <version>3.0.23</version>
     </parent>

     <artifactId>core</artifactId>
@@ -198,7 +198,7 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.beam</groupId>
-            <artifactId>beam-runners-flink-1.13</artifactId>
+            <artifactId>beam-runners-flink-1.16</artifactId>
             <version>${beam.version}</version>
         </dependency>
         <!-- Note: this dependency is needed for every non-spark compile (where it is provided in runtime env)

Core/src/main/java/org/ohnlp/backbone/core/BackboneRunner.java

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@ public class BackboneRunner {
     public static void main(String... args) throws IOException, ComponentInitializationException {
         PipelineOptionsFactory.register(BackbonePipelineOptions.class);
         BackbonePipelineOptions options =
-                PipelineOptionsFactory.fromArgs(args).create().as(BackbonePipelineOptions.class);
+                PipelineOptionsFactory.fromArgs(args).as(BackbonePipelineOptions.class);
         Pipeline p = Pipeline.create(options);
         // First read in the config and create an execution plan
         BackboneConfiguration config = new ObjectMapper().readValue(BackboneRunner.class.getResourceAsStream("/configs/" + options.getConfig()), BackboneConfiguration.class);
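
The one-line change above is the "Fix(?) valid options being overwritten" part of the commit: instead of materializing a generic PipelineOptions via create() and then viewing it as BackbonePipelineOptions, the parsed arguments are converted directly to the target interface. Below is a minimal, self-contained sketch of the corrected Beam idiom using a hypothetical MyOptions interface (not part of this repository); whether the old create().as(...) chain actually drops command-line values depends on Beam internals, which the commit message itself leaves open with "Fix(?)".

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class OptionsParsingSketch {
        // Hypothetical options interface for illustration only; the real
        // BackbonePipelineOptions declares a --config flag in a similar fashion.
        public interface MyOptions extends PipelineOptions {
            @Default.String("pipeline.json")
            String getConfig();
            void setConfig(String value);
        }

        public static void main(String... args) {
            PipelineOptionsFactory.register(MyOptions.class);
            // Corrected idiom: convert the parsed args directly to the target
            // interface so flags such as --config=... are carried through.
            MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
            System.out.println("config = " + options.getConfig());
        }
    }

Running this sketch with --config=my_pipeline.json should print that value; with no arguments the declared default applies.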

Example-Backbone-Configs/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
     <parent>
         <groupId>org.ohnlp.backbone</groupId>
         <artifactId>backbone-parent</artifactId>
-        <version>3.0.22</version>
+        <version>3.0.23</version>
     </parent>

     <artifactId>example-backbone-configs</artifactId>

IO/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
     <parent>
         <groupId>org.ohnlp.backbone</groupId>
         <artifactId>backbone-parent</artifactId>
-        <version>3.0.22</version>
+        <version>3.0.23</version>
     </parent>

     <groupId>org.ohnlp.backbone.io</groupId>

IO/src/main/java/org/ohnlp/backbone/io/jdbc/JDBCLoad.java

Lines changed: 72 additions & 11 deletions
@@ -6,6 +6,10 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
+import org.joda.time.ReadableDateTime;
 import org.ohnlp.backbone.api.annotations.ComponentDescription;
 import org.ohnlp.backbone.api.annotations.ConfigurationProperty;
 import org.ohnlp.backbone.api.annotations.InputColumnProperty;
@@ -14,9 +18,11 @@
 import org.ohnlp.backbone.api.exceptions.ComponentInitializationException;

 import java.io.Serializable;
+import java.math.BigDecimal;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.sql.Types;
 import java.util.LinkedList;
 import java.util.List;

@@ -143,57 +149,112 @@ public void map(Row row, PreparedStatement ps) throws SQLException {
         switch (type) {
             case BYTE:
                 this.execute = (data, statement) -> {
-                    statement.setByte(this.idx, data.getByte(this.fieldName));
+                    @Nullable @UnknownKeyFor @Initialized Byte val = data.getByte(this.fieldName);
+                    if (val != null) {
+                        statement.setByte(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.TINYINT);
+                    }
                 };
                 break;
             case INT16:
                 this.execute = (data, statement) -> {
-                    statement.setShort(this.idx, data.getInt16(this.fieldName));
+                    Short val = data.getInt16(this.fieldName);
+                    if (val != null) {
+                        statement.setShort(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.SMALLINT);
+                    }
                 };
                 break;
             case INT32:
                 this.execute = (data, statement) -> {
-                    statement.setInt(this.idx, data.getInt32(this.fieldName));
+                    Integer val = data.getInt32(this.fieldName);
+                    if (val != null) {
+                        statement.setInt(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.INTEGER);
+                    }
                 };
                 break;
             case INT64:
                 this.execute = (data, statement) -> {
-                    statement.setLong(this.idx, data.getInt64(this.fieldName));
+                    Long val = data.getInt64(this.fieldName);
+                    if (val != null) {
+                        statement.setLong(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.BIGINT);
+                    }
                 };
                 break;
             case DECIMAL:
                 this.execute = (data, statement) -> {
-                    statement.setBigDecimal(this.idx, data.getDecimal(this.fieldName));
+                    BigDecimal val = data.getDecimal(this.fieldName);
+                    if (val != null) {
+                        statement.setBigDecimal(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.DECIMAL);
+                    }
                 };
                 break;
             case FLOAT:
                 this.execute = (data, statement) -> {
-                    statement.setFloat(this.idx, data.getFloat(this.fieldName));
+                    Float val = data.getFloat(this.fieldName);
+                    if (val != null) {
+                        statement.setFloat(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.FLOAT);
+                    }
                 };
                 break;
             case DOUBLE:
                 this.execute = (data, statement) -> {
-                    statement.setDouble(this.idx, data.getDouble(this.fieldName));
+                    Double val = data.getDouble(this.fieldName);
+                    if (val != null) {
+                        statement.setDouble(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.DOUBLE);
+                    }
                 };
                 break;
             case STRING:
                 this.execute = (data, statement) -> {
-                    statement.setString(this.idx, data.getString(this.fieldName));
+                    String val = data.getString(this.fieldName);
+                    if (val != null) {
+                        statement.setString(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.VARCHAR);
+                    }
                 };
                 break;
             case DATETIME:
                 this.execute = (data, statement) -> {
-                    statement.setTimestamp(this.idx, new Timestamp(data.getDateTime(this.fieldName).getMillis()));
+                    ReadableDateTime val = data.getDateTime(this.fieldName);
+                    if (val != null) {
+                        statement.setTimestamp(this.idx, new Timestamp(val.getMillis()));
+                    } else {
+                        statement.setNull(this.idx, Types.TIMESTAMP);
+                    }
                 };
                 break;
             case BOOLEAN:
                 this.execute = (data, statement) -> {
-                    statement.setBoolean(this.idx, data.getBoolean(this.fieldName));
+                    Boolean val = data.getBoolean(this.fieldName);
+                    if (val != null) {
+                        statement.setBoolean(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.BOOLEAN);
+                    }
                 };
                 break;
             case BYTES:
                 this.execute = (data, statement) -> {
-                    statement.setBytes(this.idx, data.getBytes(this.fieldName));
+                    byte[] val = data.getBytes(this.fieldName);
+                    if (val != null) {
+                        statement.setBytes(this.idx, val);
+                    } else {
+                        statement.setNull(this.idx, Types.VARBINARY);
+                    }
                 };
                 break;
             default:
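
Every rewritten branch above follows the same shape: read the possibly-null value out of the Beam Row, bind it if present, otherwise bind a typed SQL NULL via statement.setNull with the matching java.sql.Types constant. A minimal standalone sketch of that pattern for a single string column, using a hypothetical bindNullableString helper that is not part of the commit:

    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.sql.Types;

    import org.apache.beam.sdk.values.Row;

    public class NullableBindSketch {
        // Hypothetical helper illustrating the null-check pattern added to JDBCLoad:
        // bind the Row value when present, otherwise bind a typed SQL NULL so the
        // driver is not handed a null where a primitive is expected.
        static void bindNullableString(Row data, String fieldName, int idx, PreparedStatement ps) throws SQLException {
            String val = data.getString(fieldName); // may be null for nullable schema fields
            if (val != null) {
                ps.setString(idx, val);
            } else {
                ps.setNull(idx, Types.VARCHAR);
            }
        }
    }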

Plugin-Manager/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
     <parent>
         <groupId>org.ohnlp.backbone</groupId>
         <artifactId>backbone-parent</artifactId>
-        <version>3.0.22</version>
+        <version>3.0.23</version>
     </parent>

     <artifactId>plugin-manager</artifactId>

Transforms/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
     <parent>
         <groupId>org.ohnlp.backbone</groupId>
         <artifactId>backbone-parent</artifactId>
-        <version>3.0.22</version>
+        <version>3.0.23</version>
     </parent>

     <groupId>org.ohnlp.backbone.transforms</groupId>

pom.xml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@

     <groupId>org.ohnlp.backbone</groupId>
     <artifactId>backbone-parent</artifactId>
-    <version>3.0.22</version>
+    <version>3.0.23</version>


     <properties>

scripts/run_pipeline_local.sh

Lines changed: 30 additions & 11 deletions
@@ -7,24 +7,43 @@
 BACKBONEDIR=$(cd `dirname $0` && pwd)
 cd $BACKBONEDIR

-echo "Repackaging Backbone with Current Configs, Modules, and Resources"
-java -cp bin/Plugin-Manager.jar org.ohnlp.backbone.pluginmanager.PluginManager Flink

-FLINK_DIR=flink-1.13.6/bin
+OLD_FLINK_DIRS=(flink-1.17.1 flink-1.13.6)
+UPGRADE=false
+UPGRADE_PATH=NONE
+for olddir in $OLD_FLINK_DIRS; do
+  if [ -f "$olddir/conf/flink-conf.yaml" ]; then
+    UPGRADE=true
+    UPGRADE_PATH=$olddir/conf/flink-conf.yaml
+    break
+  fi
+done
+
+FLINK_DIR=flink-1.18.0/bin
 FLINK_EXECUTABLE=$FLINK_DIR/flink
 if [ -f "$FLINK_EXECUTABLE" ]; then
     echo "Embedded Flink Cluster Already Setup - Skipping New Install"
 else
-    echo "Downloading Apache Flink for Local Run -"
-    wget https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz -O flink.tgz
+    if $UPGRADE; then
+        echo "Flink Update Detected, Upgrading to v1.18.0 from " $UPGRADE_PATH
+    else
+        echo "Downloading Apache Flink for Local Run"
+    fi
+    wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz -O flink.tgz
     tar -xf flink.tgz
-    echo "***Important***: Please adjust default flink settings located at flink-1.13.6/conf/flink-conf.yaml to match your hardware"
-    echo "Particularly taskmanager.numberOfTaskSlots (generally number of cores available for use, good starting point is CPU * .8 rounded down), "
-    echo "parallelism.default (set equal to number of task slots), "
-    echo "and taskmanager.memory.process.size (good start is 2GB * number of task slots)"
-    read -p "When done, press [ENTER] to continue"
+    if $UPGRADE; then
+        cp $UPGRADE_PATH $FLINK_DIR/../conf/flink-conf.yaml
+    else
+        echo "***Important***: Please adjust default flink settings located at flink-1.13.6/conf/flink-conf.yaml to match your hardware"
+        echo "Particularly taskmanager.numberOfTaskSlots (generally number of cores available for use, good starting point is CPU * .8 rounded down), "
+        echo "parallelism.default (set equal to number of task slots), "
+        echo "and taskmanager.memory.process.size (good start is 2GB * number of task slots)"
+        read -p "When done, press [ENTER] to continue"
 fi

+echo "Repackaging Backbone with Current Configs, Modules, and Resources"
+java -cp bin/Plugin-Manager.jar org.ohnlp.backbone.pluginmanager.PluginManager Flink
+
 if [ $# -eq 0 ]; then
     cd configs
     echo "No configuration parameter supplied, scanning for Available Configurations..."
@@ -56,7 +75,7 @@ if [ -f "$BACKBONE_PACKAGED_FILE" ]; then
     echo "Flink Cluster Started - Job Progress Can be Seen via Configured WebUI Port (Default: localhost:8081)"
     echo "Submitting Job..."
     $FLINK_DIR/flink run -c org.ohnlp.backbone.core.BackboneRunner bin/Backbone-Core-Flink-Packaged.jar --runner=FlinkRunner --config=$BACKBONE_CONFIG
-    echo "Job Complete, Shutting Down Embedded Flink Cluster"
+    read -p "Job Complete, Press [ENTER] to Shut Down Embedded Flink Cluster"
     $FLINK_DIR/stop-cluster.sh
 else
     echo "Packaged backbone installation does not exist. Run package_modules_and_configs for your platform first!"
