Skip to content

Commit 3e48a15

Browse files
committed
feat: s3 support example
1 parent 5e51305 commit 3e48a15

File tree

4 files changed

+27
-3
lines changed

4 files changed

+27
-3
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ repositories {
2323
dependencies {
2424
testCompile group: 'junit', name: 'junit', version: '4.12'
2525
compile files('beam-sdks-java-io-snowflake-2.22.0-SNAPSHOT.jar')
26+
compile 'org.apache.beam:beam-sdks-java-io-amazon-web-services:2.22.0'
2627
compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version: '2.22.0'
2728
compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version: '2.22.0'
2829
compile group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: '2.22.0'

src/main/java/batching/SnowflakeWordCountOptions.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@
44
import org.apache.beam.sdk.options.Default;
55
import org.apache.beam.sdk.options.Description;
66
import org.apache.beam.sdk.options.Validation.Required;
7+
import org.apache.beam.sdk.io.aws.options.S3Options;
78

89
/**
910
* Supported PipelineOptions used in provided examples.
1011
*/
11-
public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions {
12+
public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions, S3Options {
1213

1314
@Description("Path of the file to read from")
1415
@Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
@@ -21,4 +22,16 @@ public interface SnowflakeWordCountOptions extends SnowflakePipelineOptions {
2122
String getOutput();
2223

2324
void setOutput(String value);
25+
26+
@Description("AWS Access Key")
27+
@Required
28+
String getAwsAccessKey();
29+
30+
void setAwsAccessKey(String awsAccessKey);
31+
32+
@Description("AWS secret key")
33+
@Required
34+
String getAwsSecretKey();
35+
36+
void setAwsSecretKey(String awsSecretKey);
2437
}

src/main/java/batching/WordCountExample.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import org.apache.beam.sdk.values.PBegin;
2424
import org.apache.beam.sdk.values.PCollection;
2525
import org.apache.beam.sdk.values.PDone;
26+
import com.amazonaws.auth.AWSCredentials;
27+
import com.amazonaws.auth.AWSStaticCredentialsProvider;
28+
import com.amazonaws.auth.BasicAWSCredentials;
2629

2730
/**
2831
* An example that contains batch writing and reading from Snowflake. Inspired by Apache Beam/WordCount-example(https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java)
29-
*
32+
* <p>
3033
* Check main README for more information.
3134
*/
3235
public class WordCountExample {
@@ -35,10 +38,18 @@ public static void main(String[] args) {
3538
SnowflakeWordCountOptions options =
3639
PipelineOptionsFactory.fromArgs(args).withValidation().as(SnowflakeWordCountOptions.class);
3740

41+
options = parseAwsOptions(options);
42+
3843
runWritingToSnowflake(options);
3944
runReadingFromSnowflake(options);
4045
}
4146

47+
private static SnowflakeWordCountOptions parseAwsOptions(SnowflakeWordCountOptions options) {
48+
AWSCredentials awsCredentials = new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey());
49+
options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials));
50+
51+
return options;
52+
}
4253

4354
private static void runWritingToSnowflake(SnowflakeWordCountOptions options) {
4455
Pipeline p = Pipeline.create(options);

src/main/java/streaming/TaxiRidesExample.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public static void main(String[] args) {
4444
.withSnowPipe(options.getSnowPipe())
4545
.withFileNameTemplate(UUID.randomUUID().toString())
4646
.withFlushTimeLimit(Duration.millis(3000))
47-
.withDebugMode(SnowflakeIO.StreamingLogLevel.INFO)
4847
.withFlushRowLimit(100)
4948
.withQuotationMark("")
5049
.withShardsNumber(1));

0 commit comments

Comments
 (0)