Skip to content

Commit 4c1c116

Browse files
Merge pull request #37 from cloudsufi/snowflake-escape-char-conf
[PLUGIN-1816] Fixed the decimal conversion, which was missing a rounding mode, and made the escape character configurable.
2 parents 40e8302 + d3911d7 commit 4c1c116

File tree

7 files changed

+29
-9
lines changed

7 files changed

+29
-9
lines changed

src/main/java/io/cdap/plugin/snowflake/common/util/SchemaHelper.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import io.cdap.plugin.snowflake.common.client.SnowflakeFieldDescriptor;
2525
import io.cdap.plugin.snowflake.common.exception.SchemaParseException;
2626
import io.cdap.plugin.snowflake.source.batch.SnowflakeBatchSourceConfig;
27+
import io.cdap.plugin.snowflake.source.batch.SnowflakeInputFormatProvider;
2728
import io.cdap.plugin.snowflake.source.batch.SnowflakeSourceAccessor;
2829
import java.io.IOException;
2930
import java.sql.Types;
@@ -62,7 +63,8 @@ public static Schema getSchema(SnowflakeBatchSourceConfig config, FailureCollect
6263
return getParsedSchema(config.getSchema());
6364
}
6465

65-
SnowflakeSourceAccessor snowflakeSourceAccessor = new SnowflakeSourceAccessor(config);
66+
SnowflakeSourceAccessor snowflakeSourceAccessor =
67+
new SnowflakeSourceAccessor(config, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
6668
return getSchema(snowflakeSourceAccessor, config.getSchema(), collector, config.getImportQuery());
6769
}
6870

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeBatchSource.java

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.snowflake.source.batch;
1818

19+
import com.google.common.base.Strings;
1920
import io.cdap.cdap.api.annotation.Description;
2021
import io.cdap.cdap.api.annotation.Name;
2122
import io.cdap.cdap.api.annotation.Plugin;
@@ -33,6 +34,7 @@
3334
import io.cdap.plugin.snowflake.common.util.SchemaHelper;
3435
import org.apache.hadoop.io.NullWritable;
3536

37+
import java.util.HashMap;
3638
import java.util.Map;
3739
import java.util.stream.Collectors;
3840

@@ -68,7 +70,11 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
6870
public void prepareRun(BatchSourceContext context) {
6971
FailureCollector failureCollector = context.getFailureCollector();
7072
config.validate(failureCollector);
71-
73+
Map<String, String> arguments = new HashMap<>(context.getArguments().asMap());
74+
String escapeChar = arguments.containsKey(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR) &&
75+
!Strings.isNullOrEmpty(arguments.get(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR))
76+
? arguments.get(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR)
77+
: SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR;
7278
Schema schema = SchemaHelper.getSchema(config, failureCollector);
7379
failureCollector.getOrThrowException();
7480

@@ -81,7 +87,7 @@ public void prepareRun(BatchSourceContext context) {
8187
.collect(Collectors.toList()));
8288
}
8389

84-
context.setInput(Input.of(config.getReferenceName(), new SnowflakeInputFormatProvider(config)));
90+
context.setInput(Input.of(config.getReferenceName(), new SnowflakeInputFormatProvider(config, escapeChar)));
8591
}
8692

8793
@Override

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormat.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,6 +60,8 @@ private SnowflakeSourceAccessor getSnowflakeAccessor(Configuration configuration
6060
SnowflakeInputFormatProvider.PROPERTY_CONFIG_JSON);
6161
SnowflakeBatchSourceConfig config = GSON.fromJson(
6262
configJson, SnowflakeBatchSourceConfig.class);
63-
return new SnowflakeSourceAccessor(config);
63+
String escapeChar = configuration.get(SnowflakeInputFormatProvider.PROPERTY_ESCAPE_CHAR,
64+
SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
65+
return new SnowflakeSourceAccessor(config, escapeChar);
6466
}
6567
}

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeInputFormatProvider.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -29,13 +29,17 @@
2929
public class SnowflakeInputFormatProvider implements InputFormatProvider {
3030

3131
public static final String PROPERTY_CONFIG_JSON = "cdap.snowflake.source.config";
32+
public static final String PROPERTY_ESCAPE_CHAR = "cdap.snowflake.source.escape";
33+
34+
public static final String PROPERTY_DEFAULT_ESCAPE_CHAR = "\\";
3235

3336
private static final Gson GSON = new Gson();
3437
private final Map<String, String> conf;
3538

36-
public SnowflakeInputFormatProvider(SnowflakeBatchSourceConfig config) {
39+
public SnowflakeInputFormatProvider(SnowflakeBatchSourceConfig config, String escapeChar) {
3740
this.conf = new ImmutableMap.Builder<String, String>()
3841
.put(PROPERTY_CONFIG_JSON, GSON.toJson(config))
42+
.put(PROPERTY_ESCAPE_CHAR, escapeChar)
3943
.build();
4044
}
4145

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeMapToRecordTransformer.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@
2424
import org.slf4j.LoggerFactory;
2525

2626
import java.math.BigDecimal;
27+
import java.math.RoundingMode;
2728
import java.time.Instant;
2829
import java.time.LocalDate;
2930
import java.time.LocalTime;
@@ -83,7 +84,8 @@ private Object convertValue(String fieldName, String value, Schema fieldSchema)
8384
case TIME_MICROS:
8485
return TimeUnit.NANOSECONDS.toMicros(LocalTime.parse(value).toNanoOfDay());
8586
case DECIMAL:
86-
return new BigDecimal(value).setScale(fieldSchema.getScale()).unscaledValue().toByteArray();
87+
return new BigDecimal(value).setScale(fieldSchema.getScale(),
88+
RoundingMode.HALF_EVEN).unscaledValue().toByteArray();
8789
default:
8890
throw new IllegalArgumentException(
8991
String.format("Field '%s' is of unsupported type '%s'", fieldSchema.getDisplayName(),

src/main/java/io/cdap/plugin/snowflake/source/batch/SnowflakeSourceAccessor.java

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -60,10 +60,12 @@ public class SnowflakeSourceAccessor extends SnowflakeAccessor {
6060
"OVERWRITE=TRUE HEADER=TRUE SINGLE=FALSE";
6161
private static final String COMMAND_MAX_FILE_SIZE = " MAX_FILE_SIZE=%s";
6262
private final SnowflakeBatchSourceConfig config;
63+
private final char escapeChar;
6364

64-
public SnowflakeSourceAccessor(SnowflakeBatchSourceConfig config) {
65+
public SnowflakeSourceAccessor(SnowflakeBatchSourceConfig config, String escapeChar) {
6566
super(config);
6667
this.config = config;
68+
this.escapeChar = escapeChar.charAt(0);
6769
}
6870

6971
/**
@@ -116,7 +118,7 @@ public CSVReader buildCsvReader(String stageSplit) throws IOException {
116118
InputStream downloadStream = connection.unwrap(SnowflakeConnection.class)
117119
.downloadStream("@~", stageSplit, true);
118120
InputStreamReader inputStreamReader = new InputStreamReader(downloadStream);
119-
return new CSVReader(inputStreamReader);
121+
return new CSVReader(inputStreamReader, ',', '"', escapeChar);
120122
} catch (SQLException e) {
121123
throw new IOException(e);
122124
}

src/test/java/io/cdap/plugin/snowflake/common/client/SnowflakeAccessorTest.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818

1919
import io.cdap.plugin.snowflake.Constants;
2020
import io.cdap.plugin.snowflake.common.BaseSnowflakeTest;
21+
import io.cdap.plugin.snowflake.source.batch.SnowflakeInputFormatProvider;
2122
import io.cdap.plugin.snowflake.source.batch.SnowflakeSourceAccessor;
2223
import org.junit.Assert;
2324
import org.junit.Test;
@@ -44,7 +45,8 @@
4445
*/
4546
public class SnowflakeAccessorTest extends BaseSnowflakeTest {
4647

47-
private SnowflakeSourceAccessor snowflakeAccessor = new SnowflakeSourceAccessor(CONFIG);
48+
private SnowflakeSourceAccessor snowflakeAccessor =
49+
new SnowflakeSourceAccessor(CONFIG, SnowflakeInputFormatProvider.PROPERTY_DEFAULT_ESCAPE_CHAR);
4850

4951
@Test
5052
public void testDescribeQuery() throws Exception {

0 commit comments

Comments
 (0)