# Conduit Connector S3

<!-- readmegen:description -->
The S3 connector is one of [Conduit](https://github.com/ConduitIO/conduit)
builtin plugins. It provides both a source and a destination S3 connector.
## Source
The S3 Source Connector connects to an S3 bucket with the provided
configuration, using `aws.bucket`, `aws.accessKeyId`, `aws.secretAccessKey` and
`aws.region`. If the bucket doesn't exist, or the permissions fail, an error
will occur. After that, the `Open` method is called to start the connection
from the provided position.

### Change Data Capture (CDC)

This connector implements CDC features for S3 by scanning the bucket for changes
every `pollingPeriod` and detecting any change that happened after a certain
timestamp. These changes (update, delete, create) are then inserted into a
buffer that is checked on each Read request.

* To capture "delete" and "update" actions, the S3 bucket versioning must be enabled.
* To capture "create" actions, the bucket versioning doesn't matter.

The connector goes through two modes.

* Snapshot mode: loops through the S3 bucket and returns the objects that are
  already in there. The _Position_ during this mode is the object key attached
  to an underscore, an "s" for snapshot, and the _maxLastModifiedDate_ found so
  far. As an example: "thisIsAKey_s12345", which lets the connector know what
  mode it is in and what object it last read. The _maxLastModifiedDate_ will be
  used when changing to CDC mode; the iterator will capture changes that
  happened after that.

* CDC mode: iterates through the S3 bucket every `pollingPeriod` and captures
  new actions made on the bucket. The _Position_ during this mode is the object
  key attached to an underscore, a "c" for CDC, and the object's
  _lastModifiedDate_ in seconds. As an example: "thisIsAKey_c1634049397". This
  position is used to return only the actions with a _lastModifiedDate_ higher
  than that of the last record returned, which ensures that no records are
  duplicated.
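
The position encoding described above can be illustrated with a small parser. `parsePosition` is a hypothetical helper for illustration, not the connector's actual code:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parsePosition splits a position string such as "thisIsAKey_s12345" into
// the object key, the mode marker ("s" for snapshot, "c" for CDC), and the
// timestamp in seconds. Illustrative only; not the connector's real parser.
func parsePosition(p string) (key, mode string, ts int64, err error) {
	// use the last underscore so object keys containing "_" still parse
	i := strings.LastIndex(p, "_")
	if i < 0 || i+2 >= len(p) {
		return "", "", 0, fmt.Errorf("invalid position %q", p)
	}
	key = p[:i]
	mode = string(p[i+1])
	ts, err = strconv.ParseInt(p[i+2:], 10, 64)
	return key, mode, ts, err
}

func main() {
	key, mode, ts, err := parsePosition("thisIsAKey_c1634049397")
	fmt.Println(key, mode, ts, err) // thisIsAKey c 1634049397 <nil>
}
```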

### Record Keys

The S3 object key uniquely identifies the objects in an Amazon S3 bucket, which
is why a record key is the key read from the S3 bucket.

## Destination

The S3 Destination Connector connects to an S3 bucket with the provided
configuration, using `aws.bucket`, `aws.accessKeyId`, `aws.secretAccessKey` and
`aws.region`. If the permissions fail, the connector will not be ready for
writing to S3.

### Writer

The S3 destination writer has a buffer with the size of `bufferSize`. Each time
`Write` is called, a new record is added to the buffer. When the buffer is
full, all the records in it are written to the S3 bucket, and an ack function
is called for each record after it is written.
<!-- /readmegen:description -->
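
The buffering behavior can be sketched as follows. `bufferedWriter` and its `flush`/`ack` callbacks are stand-ins for the real S3 upload and Conduit acknowledgment, not the connector's actual implementation:

```go
package main

import "fmt"

// bufferedWriter mimics the destination writer: records accumulate in a
// fixed-size buffer and are written out together, with an ack callback
// invoked per record after the batch is written. Illustrative only.
type bufferedWriter struct {
	bufferSize int
	buffer     []string
	flush      func(batch []string) // stands in for the S3 batch upload
	ack        func(record string)  // stands in for the per-record ack
}

func (w *bufferedWriter) Write(record string) {
	w.buffer = append(w.buffer, record)
	if len(w.buffer) >= w.bufferSize {
		w.flush(w.buffer)
		for _, r := range w.buffer {
			w.ack(r)
		}
		w.buffer = w.buffer[:0] // reuse the backing array for the next batch
	}
}

func main() {
	w := &bufferedWriter{
		bufferSize: 2,
		flush:      func(batch []string) { fmt.Println("flushed", len(batch), "records") },
		ack:        func(r string) { fmt.Println("acked", r) },
	}
	w.Write("a") // buffered, nothing written yet
	w.Write("b") // buffer full: flush, then ack each record
}
```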

## Source Configuration Parameters

<!-- readmegen:source.parameters.yaml -->
```yaml
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example
        plugin: "s3"
        settings:
          # AWS access key id.
          # Type: string
          aws.accessKeyId: ""
          # the AWS S3 bucket name.
          # Type: string
          aws.bucket: ""
          # the AWS S3 bucket region
          # Type: string
          aws.region: ""
          # AWS secret access key.
          # Type: string
          aws.secretAccessKey: ""
          # polling period for the CDC mode, formatted as a time.Duration
          # string.
          # Type: duration
          pollingPeriod: "1s"
          # the S3 key prefix.
          # Type: string
          prefix: ""
          # Maximum delay before an incomplete batch is read from the source.
          # Type: duration
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets read from the source.
          # Type: int
          sdk.batch.size: "0"
          # Specifies whether to use a schema context name. If set to false, no
          # schema context name will be used, and schemas will be saved with the
          # subject name specified in the connector (not safe because of name
          # conflicts).
          # Type: bool
          sdk.schema.context.enabled: "true"
          # Schema context name to be used. Used as a prefix for all schema
          # subject names. If empty, defaults to the connector ID.
          # Type: string
          sdk.schema.context.name: ""
          # Whether to extract and encode the record key with a schema.
          # Type: bool
          sdk.schema.extract.key.enabled: "false"
          # The subject of the key schema. If the record metadata contains the
          # field "opencdc.collection" it is prepended to the subject name and
          # separated with a dot.
          # Type: string
          sdk.schema.extract.key.subject: "key"
          # Whether to extract and encode the record payload with a schema.
          # Type: bool
          sdk.schema.extract.payload.enabled: "false"
          # The subject of the payload schema. If the record metadata contains
          # the field "opencdc.collection" it is prepended to the subject name
          # and separated with a dot.
          # Type: string
          sdk.schema.extract.payload.subject: "payload"
          # The type of the payload schema.
          # Type: string
          sdk.schema.extract.type: "avro"
```
<!-- /readmegen:source.parameters.yaml -->

## Destination Configuration Parameters

<!-- readmegen:destination.parameters.yaml -->
```yaml
version: 2.2
pipelines:
  - id: example
    status: running
    connectors:
      - id: example
        plugin: "s3"
        settings:
          # AWS access key id.
          # Type: string
          aws.accessKeyId: ""
          # the AWS S3 bucket name.
          # Type: string
          aws.bucket: ""
          # the AWS S3 bucket region
          # Type: string
          aws.region: ""
          # AWS secret access key.
          # Type: string
          aws.secretAccessKey: ""
          # the destination format, either "json" or "parquet".
          # Type: string
          format: ""
          # the S3 key prefix.
          # Type: string
          prefix: ""
          # Maximum delay before an incomplete batch is written to the
          # destination.
          # Type: duration
          sdk.batch.delay: "0"
          # Maximum size of batch before it gets written to the destination.
          # Type: int
          sdk.batch.size: "0"
          # Allow bursts of at most X records (0 or less means that bursts are
          # not limited). Only takes effect if a rate limit per second is set.
          # Note that if `sdk.batch.size` is bigger than `sdk.rate.burst`, the
          # effective batch size will be equal to `sdk.rate.burst`.
          # Type: int
          sdk.rate.burst: "0"
          # Maximum number of records written per second (0 means no rate
          # limit).
          # Type: float
          sdk.rate.perSecond: "0"
          # The format of the output record. See the Conduit documentation for a
          # full list of supported formats
          # (https://conduit.io/docs/using/connectors/configuration-parameters/output-format).
          # Type: string
          sdk.record.format: "opencdc/json"
          # Options to configure the chosen output record format. Options are
          # normally key=value pairs separated with comma (e.g.
          # opt1=val2,opt2=val2), except for the `template` record format, where
          # options are a Go template.
          # Type: string
          sdk.record.format.options: ""
          # Whether to extract and decode the record key with a schema.
          # Type: bool
          sdk.schema.extract.key.enabled: "true"
          # Whether to extract and decode the record payload with a schema.
          # Type: bool
          sdk.schema.extract.payload.enabled: "true"
```
<!-- /readmegen:destination.parameters.yaml -->

### How to build it

Run `make`.

### Testing

Run `make test` to run all the tests. You must set the environment variables
(`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_REGION`) before running
the tests. If not set, the tests that use these variables will be skipped.

### Known Limitations

* If a pipeline restarts during the snapshot, then the connector will start
  scanning the objects from the beginning of the bucket, which could result in
  duplications.