
Commit 476357b

Merge pull request #13 from data-integrations/feature_release/CDAP-14481-widget-improvements
CDAP-14481 documentation and widget improvements
2 parents d62b372 + cc38265 commit 476357b

5 files changed: +274 −225 lines changed

docs/S3-batchsink.md

Lines changed: 17 additions & 56 deletions
@@ -3,74 +3,35 @@
 
 Description
 -----------
-A batch sink for writing to Amazon S3 in various formats.
-
-
-Use Case
---------
-This sink is used whenever you need to write to Amazon S3 in Avro format. For example,
+This sink is used whenever you need to write to Amazon S3 in various formats. For example,
 you might want to create daily snapshots of a database by reading the entire contents of a
 table, writing to this sink, and then other programs can analyze the contents of the
-specified file. The output of the run will be stored in a directory with suffix
-'yyyy-MM-dd-HH-mm' from the base path provided.
+specified file.
 
 
 Properties
 ----------
-**referenceName:** This will be used to uniquely identify this sink for lineage, annotating metadata, etc.
-
-**authenticationMethod:** Authentication method to access S3. Defaults to Access Credentials.
-User need to have AWS environment only to use IAM role based authentication.
-URI scheme should be s3a:// for S3AFileSystem or s3n:// for S3NativeFileSystem. (Macro-enabled)
-
-**accessID:** Access ID of the Amazon S3 instance to connect to. (Macro-enabled)
+**Reference Name:** Name used to uniquely identify this sink for lineage, annotating metadata, etc.
 
-**accessKey:** Access Key of the Amazon S3 instance to connect to. (Macro-enabled)
+**Path:** Path to write to. For example, s3a://<bucket>/path/to/output
 
-**basePath:** The S3 path where the data is stored. Example: 's3a://logs'. (Macro-enabled)
+**Path Suffix:** Time format for the output directory that will be appended to the path.
+For example, the format 'yyyy-MM-dd-HH-mm' will result in a directory of the form '2015-01-01-20-42'.
+If not specified, nothing will be appended to the path."
 
-**enableEncryption:** Server side encryption. Defaults to True. Sole supported algorithm is AES256. (Macro-enabled)
-
-**fileSystemProperties:** A JSON string representing a map of properties needed for the
-distributed file system. The property names needed for S3 (*accessID* and *accessKey*)
-will be included as ``'fs.s3a.access.key'`` and ``'fs.s3a.secret.key'`` for S3AFileSystem.
-For S3NativeFileSystem ``'fs.s3n.awsSecretAccessKey'`` and ``'fs.s3n.awsAccessKeyId'`` will be used. (Macro-enabled)
+**Format:** Format to write the records in.
+The format must be one of 'json', 'avro', 'parquet', 'csv', 'tsv', or 'delimited'.
 
-**Directory Time Format:** The format for the path that will be suffixed to the basePath; for
-example: the format ``'yyyy-MM-dd-HH-mm'`` will create a file path ending in
-``'2015-01-01-20-42'``. Default format used is ``'yyyy-MM-dd-HH-mm'``. (Macro-enabled)
+**Delimiter:** Delimiter to use if the format is 'delimited'.
+The delimiter will be ignored if the format is anything other than 'delimited'.
 
-**Format:** Format to write the records in.
-The format must be one of 'json', 'avro', 'parquet', 'csv', 'tsv', or 'delimited'. (Macro-enabled)
+**Authentication Method:** Authentication method to access S3. The default value is Access Credentials.
+IAM can only be used if the plugin is run in an AWS environment, such as on EMR.
 
-**schema:** The Schema of the record being written to the sink as a JSON object. (Macro-enabled)
+**Access ID:** Amazon access ID required for authentication.
 
+**Access Key:** Amazon access key required for authentication.
 
-Example
--------
-This example will use Access Credentials authentication and write to an S3 output located at ``s3a://logs``. It will write data in
-Avro format compressed using Snappy format and using the given schema. Every time the pipeline
-runs, a new output directory from the base path (``s3a://logs``) will be created which
-will have the directory name corresponding to the start time in ``yyyy-MM-dd-HH-mm`` format:
+**Enable Encryption:** Whether to enable server side encryption. The sole supported algorithm is AES256.
 
-    {
-        "name": "S3Avro",
-        "type": "batchsink",
-        "properties": {
-            "authenticationMethod": "Access Credentials",
-            "accessKey": "key",
-            "accessID": "ID",
-            "basePath": "s3a://logs",
-            "pathFormat": "yyyy-MM-dd-HH-mm",
-            "compressionCodec": "Snappy",
-            "schema": "{
-                \"type\":\"record\",
-                \"name\":\"user\",
-                \"fields\":[
-                    {\"name\":\"id\",\"type\":\"long\"},
-                    {\"name\":\"name\",\"type\":\"string\"},
-                    {\"name\":\"birthyear\",\"type\":\"int\"}
-                ]
-            }"
-        }
-    }
+**File System Properties:** Additional properties to use with the OutputFormat when reading the data.
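
For reference, a minimal pipeline configuration for this sink using the renamed properties might look like the sketch below. The property keys referenceName, path, suffix, authenticationMethod, accessID, and accessKey appear in the widget file in this commit; the format key is assumed to follow the same pattern, and the reference name, bucket, credentials, and format value are placeholders rather than part of the change:

    {
        "name": "S3",
        "type": "batchsink",
        "properties": {
            "referenceName": "S3Sink",
            "path": "s3a://my-bucket/path/to/output",
            "suffix": "yyyy-MM-dd-HH-mm",
            "format": "csv",
            "authenticationMethod": "Access Credentials",
            "accessID": "ID",
            "accessKey": "key"
        }
    }

Per the documentation above, a delimiter property would only be relevant when the format is set to 'delimited'.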

docs/S3-batchsource.md

Lines changed: 23 additions & 46 deletions
@@ -3,68 +3,45 @@
 
 Description
 -----------
-Batch source to use Amazon S3 as a Source.
-
-
-Use Case
---------
 This source is used whenever you need to read from Amazon S3.
 For example, you may want to read in log files from S3 every hour and then store
 the logs in a TimePartitionedFileSet.
 
-
 Properties
 ----------
-**referenceName:** This will be used to uniquely identify this source for lineage, annotating metadata, etc.
-
-**authenticationMethod:** Authentication method to access S3. Defaults to Access Credentials.
-User need to have AWS environment only to use IAM role based authentication. URI scheme should be s3a:// for S3AFileSystem or s3n:// for S3NativeFileSystem. (Macro-enabled)
-
-**accessID:** Access ID of the Amazon S3 instance to connect to. Mandatory if authentication method is Access credentials. (Macro-enabled)
+**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc.
 
-**accessKey:** Access Key of the Amazon S3 instance to connect to. Mandatory if authentication method is Access credentials. (Macro-enabled)
-
-**path:** Path to file(s) to be read. If a directory is specified,
-terminate the path name with a '/'. The path uses filename expansion (globbing) to read files. (Macro-enabled)
+**Path:** Path to read from. For example, s3a://<bucket>/path/to/input
 
 **Format:** Format of the data to read.
 The format must be one of 'avro', 'blob', 'csv', 'delimited', 'json', 'parquet', 'text', or 'tsv'.
 If the format is 'blob', every input file will be read into a separate record.
 The 'blob' format also requires a schema that contains a field named 'body' of type 'bytes'.
-If the format is 'text', the schema must contain a field named 'body' of type 'string'. (Macro-enabled)
+If the format is 'text', the schema must contain a field named 'body' of type 'string'.
+
+**Delimiter:** Delimiter to use when the format is 'delimited'. This will be ignored for other formats.
+
+**Authentication Method:** Authentication method to access S3. The default value is Access Credentials.
+IAM can only be used if the plugin is run in an AWS environment, such as on EMR.
+
+**Access ID:** Amazon access ID required for authentication.
 
-**fileRegex:** Regex to filter out files in the path. It accepts regular expression which is applied to the complete
-path and returns the list of files that match the specified pattern.
+**Access Key:** Amazon access key required for authentication.
 
-**maxSplitSize:** Maximum split-size for each mapper in the MapReduce Job. Defaults to 128MB. (Macro-enabled)
+**Maximum Split Size:** Maximum size in bytes for each input partition.
+Smaller partitions will increase the level of parallelism, but will require more resources and overhead.
+The default value is 128MB.
 
-**ignoreNonExistingFolders:** Identify if path needs to be ignored or not, for case when directory or file does not
-exists. If set to true it will treat the not present folder as 0 input and log a warning. Default is false.
+**Path Field:** Output field to place the path of the file that the record was read from.
+If not specified, the file path will not be included in output records.
+If specified, the field must exist in the output schema as a string.
 
-**recursive:** Boolean value to determine if files are to be read recursively from the path. Default is false.
+**Path Filename Only:** Whether to only use the filename instead of the URI of the file path when a path field is given.
+The default value is false.
 
+**Read Files Recursively:** Whether files are to be read recursively from the path. The default value is false.
 
-Example
--------
-This example connects to Amazon S3 using Access Credentials and reads in files found in the specified directory while
-using the stateful ``timefilter``, which ensures that each file is read only once. The ``timefilter``
-requires that files be named with either the convention "yy-MM-dd-HH..." (S3) or "...'.'yy-MM-dd-HH..."
-(Cloudfront). The stateful metadata is stored in a table named 'timeTable'. With the maxSplitSize
-set to 1MB, if the total size of the files being read is larger than 1MB, CDAP will
-configure Hadoop to use one mapper per MB:
+**Allow Empty Input:** Whether to allow an input path that contains no data. When set to false, the plugin
+will error when there is no data to read. When set to true, no error will be thrown and zero records will be read.
 
-    {
-        "name": "S3",
-        "type": "batchsource",
-        "properties": {
-            "authenticationMethod": "Access Credentials",
-            "accessKey": "key",
-            "accessID": "ID",
-            "path": "s3a://path/to/logs/",
-            "fileRegex": "timefilter",
-            "timeTable": "timeTable",
-            "maxSplitSize": "1048576",
-            "ignoreNonExistingFolders": "false",
-            "recursive": "false"
-        }
-    }
+**File System Properties:** Additional properties to use with the InputFormat when reading the data.
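
Along the same lines, a minimal source configuration using the documented property set might look like the sketch below. The keys path, format, authenticationMethod, accessID, and accessKey carry over from the existing plugin properties; the reference name, bucket, and credentials are placeholder values:

    {
        "name": "S3",
        "type": "batchsource",
        "properties": {
            "referenceName": "S3Source",
            "path": "s3a://my-bucket/path/to/input",
            "format": "text",
            "authenticationMethod": "Access Credentials",
            "accessID": "ID",
            "accessKey": "key"
        }
    }

As noted above, the 'text' format additionally requires an output schema containing a 'body' field of type string.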

src/main/java/co/cask/aws/s3/source/S3BatchSource.java

Lines changed: 22 additions & 1 deletion
@@ -20,9 +20,12 @@
 import co.cask.cdap.api.annotation.Macro;
 import co.cask.cdap.api.annotation.Name;
 import co.cask.cdap.api.annotation.Plugin;
+import co.cask.cdap.api.data.schema.Schema;
+import co.cask.cdap.api.plugin.EndpointPluginContext;
 import co.cask.cdap.etl.api.batch.BatchSource;
 import co.cask.cdap.etl.api.batch.BatchSourceContext;
 import co.cask.hydrator.common.LineageRecorder;
+import co.cask.hydrator.format.FileFormat;
 import co.cask.hydrator.format.input.PathTrackingInputFormat;
 import co.cask.hydrator.format.plugin.AbstractFileSource;
 import co.cask.hydrator.format.plugin.AbstractFileSourceConfig;
@@ -35,6 +38,7 @@
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import javax.ws.rs.Path;
 
 /**
  * A {@link BatchSource} that reads from Amazon S3.
@@ -81,6 +85,23 @@ protected void recordLineage(LineageRecorder lineageRecorder, List<String> outputFields)
     lineageRecorder.recordRead("Read", "Read from S3.", outputFields);
   }
 
+  /**
+   * Endpoint method to get the output schema of a source.
+   *
+   * @param config configuration for the source
+   * @param pluginContext context to create plugins
+   * @return schema of fields
+   */
+  @Path("getSchema")
+  public Schema getSchema(S3BatchConfig config, EndpointPluginContext pluginContext) {
+    FileFormat fileFormat = config.getFormat();
+    if (fileFormat == null) {
+      return config.getSchema();
+    }
+    Schema schema = fileFormat.getSchema(config.getPathField());
+    return schema == null ? config.getSchema() : schema;
+  }
+
   /**
    * Config class that contains properties needed for the S3 source.
    */
@@ -108,7 +129,7 @@ public static class S3BatchConfig extends AbstractFileSourceConfig {
     @Nullable
     @Description("Authentication method to access S3. " +
       "Defaults to Access Credentials. URI scheme should be s3a:// for S3AFileSystem or s3n:// for " +
-      "S3NativeFileSystem. (Macro-enabled)")
+      "S3NativeFileSystem.")
     private String authenticationMethod;
 
     @Macro
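
The new getSchema endpoint lets a caller (presumably the pipeline UI's schema lookup) ask the plugin for its output schema before deployment: when a format is configured, the schema comes from the FileFormat, with the configured path field appended; otherwise the user-supplied schema is returned. As an illustrative sketch only, for the 'text' format with a path field named 'file', the returned schema would be an Avro-style record containing at least the documented 'body' string field plus the path field as a string; the record name and any additional fields depend on the FileFormat implementation and are assumptions here:

    {
        "type": "record",
        "name": "output",
        "fields": [
            { "name": "body", "type": "string" },
            { "name": "file", "type": "string" }
        ]
    }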

widgets/S3-batchsink.json

Lines changed: 75 additions & 37 deletions
@@ -6,44 +6,28 @@
   "display-name": "Amazon S3",
   "configuration-groups": [
     {
-      "label": "S3 Avro Batch Sink",
+      "label": "Basic",
       "properties": [
         {
           "widget-type": "textbox",
           "label": "Reference Name",
           "name": "referenceName"
         },
-        {
-          "widget-type": "select",
-          "label": "Authentication Method",
-          "name": "authenticationMethod",
-          "widget-attributes": {
-            "values": [
-              "Access Credentials",
-              "IAM"
-            ],
-            "default": "Access Credentials"
-          }
-        },
-        {
-          "widget-type": "textbox",
-          "label": "Access ID",
-          "name": "accessID"
-        },
-        {
-          "widget-type": "textbox",
-          "label": "Access Key",
-          "name": "accessKey"
-        },
         {
           "widget-type": "textbox",
           "label": "Path",
-          "name": "path"
+          "name": "path",
+          "widget-attributes": {
+            "placeholder": "s3a://<bucket>/path/to/output"
+          }
         },
         {
           "widget-type": "textbox",
-          "label": "Directory Time Format",
-          "name": "suffix"
+          "label": "Path Suffix",
+          "name": "suffix",
+          "widget-attributes" : {
+            "default": "yyyy-MM-dd-HH-mm"
+          }
         },
         {
           "widget-type": "select",
@@ -64,24 +48,79 @@
         {
           "widget-type": "textbox",
           "label": "Delimiter",
-          "name": "delimiter"
+          "name": "delimiter",
+          "widget-attributes": {
+            "placeholder": "Delimiter if the format is 'delimited'"
+          }
+        }
+      ]
+    },
+    {
+      "label": "Credentials",
+      "properties": [
+        {
+          "widget-type": "radio-group",
+          "label": "Authentication Method",
+          "name": "authenticationMethod",
+          "widget-attributes": {
+            "layout": "inline",
+            "default": "Access Credentials",
+            "options": [
+              {
+                "id": "Access Credentials",
+                "label": "Access Credentials"
+              },
+              {
+                "id": "IAM",
+                "label": "IAM"
+              }
+            ]
+          }
         },
         {
-          "widget-type": "select",
-          "label": "Server Side Encryption",
-          "name": "enableEncryption",
+          "widget-type": "textbox",
+          "label": "Access ID",
+          "name": "accessID",
           "widget-attributes": {
-            "values": [
-              "True",
-              "False"
-            ],
-            "default": "True"
+            "placeholder": "Amazon Access ID"
           }
         },
+        {
+          "widget-type": "password",
+          "label": "Access Key",
+          "name": "accessKey",
+          "widget-attributes": {
+            "placeholder": "Amazon Access Key"
+          }
+        }
+      ]
+    },
+    {
+      "label": "Advanced",
+      "properties": [
         {
           "widget-type": "json-editor",
           "label": "File System Properties",
           "name": "fileSystemProperties"
+        },
+        {
+          "widget-type": "select",
+          "label": "Enable Encryption",
+          "name": "enableEncryption",
+          "widget-attributes": {
+            "layout": "inline",
+            "default": "true",
+            "options": [
+              {
+                "id": "true",
+                "label": "True"
+              },
+              {
+                "id": "false",
+                "label": "False"
+              }
            ]
          }
         }
       ]
     }
@@ -98,8 +137,7 @@
       "float",
       "double",
       "bytes",
-      "string",
-      "map<string, string>"
+      "string"
     ],
     "schema-default-type": "string"
   }
