1 change: 1 addition & 0 deletions .gitignore
@@ -10,4 +10,5 @@ release_notes.md
.task
.vscode
.op
.gomodcache
__pycache__
329 changes: 329 additions & 0 deletions docs/modules/components/pages/inputs/aws_dynamodb_cdc.adoc
@@ -0,0 +1,329 @@
= aws_dynamodb_cdc
:type: input
:status: beta
:categories: ["Services","AWS"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.


component_type_dropdown::[]


Reads change data capture (CDC) events from DynamoDB Streams.

Introduced in version 1.0.0.
Collaborator

We need to set the version.



[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
input:
  label: ""
  aws_dynamodb_cdc:
    table: "" # No default (required)
    checkpoint_table: redpanda_dynamodb_checkpoints
    start_from: trim_horizon
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
input:
  label: ""
  aws_dynamodb_cdc:
    table: "" # No default (required)
    checkpoint_table: redpanda_dynamodb_checkpoints
    batch_size: 1000
    poll_interval: 1s
    start_from: trim_horizon
    checkpoint_limit: 1000
    max_tracked_shards: 10000
    region: "" # No default (optional)
    endpoint: "" # No default (optional)
    tcp:
      connect_timeout: 0s
      keep_alive:
        idle: 15s
        interval: 15s
        count: 9
      tcp_user_timeout: 0s
    credentials:
      profile: "" # No default (optional)
      id: "" # No default (optional)
      secret: "" # No default (optional)
      token: "" # No default (optional)
      from_ec2_role: false # No default (optional)
      role: "" # No default (optional)
      role_external_id: "" # No default (optional)
```

--
======

Consumes records from DynamoDB Streams with automatic checkpointing and shard management.

DynamoDB Streams capture item-level changes in DynamoDB tables. This input supports:
- Automatic shard discovery and management
- Checkpoint-based resumption after crashes
- Multiple shard processing

For better performance and longer retention, consider using Kinesis Data Streams for DynamoDB
with the `aws_kinesis` input instead.
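
As a minimal sketch of a complete pipeline, the config below reads every change from one table and prints it. It assumes a table named `orders` (a hypothetical name) with DynamoDB Streams enabled, and AWS credentials supplied by the environment. The `stdout` output is a standard Redpanda Connect output; all other fields keep their defaults.

```yml
input:
  aws_dynamodb_cdc:
    table: orders # hypothetical table name
    start_from: trim_horizon # used only when no checkpoint exists yet

output:
  stdout: {}
```

On a restart, the input is expected to resume from the position stored in `checkpoint_table` rather than from `start_from`.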

== Metadata

This input adds the following metadata fields to each message:

- `dynamodb_shard_id` - The shard ID from which the record was read
- `dynamodb_sequence_number` - The sequence number of the record in the stream
- `dynamodb_event_name` - The type of change: INSERT, MODIFY, or REMOVE
- `dynamodb_table` - The name of the DynamoDB table
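
One way to use these fields is to filter on the event type in a `mapping` processor. The sketch below drops `REMOVE` events and passes all other messages through unchanged. It relies only on the `dynamodb_event_name` metadata field listed above and makes no assumption about the shape of the message payload.

```yml
pipeline:
  processors:
    - mapping: |
        # Delete the message when the change is a REMOVE;
        # INSERT and MODIFY events are left untouched.
        root = if @dynamodb_event_name == "REMOVE" { deleted() }
```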

== Metrics

This input exposes the following metrics:

- `dynamodb_cdc_shards_tracked` - Total number of shards being tracked (gauge)
- `dynamodb_cdc_shards_active` - Number of active shards currently being read from (gauge)
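
To scrape these gauges, one option is the `prometheus` metrics exporter, a standard Redpanda Connect metrics type. The fragment below is a sketch that assumes the default HTTP server is enabled. The exact exported names, including any prefix or labels, are not confirmed here.

```yml
metrics:
  prometheus: {}
```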


== Fields

=== `table`

The name of the DynamoDB table to read streams from.


*Type*: `string`


=== `checkpoint_table`

DynamoDB table name for storing checkpoints. Will be created if it doesn't exist.


*Type*: `string`

*Default*: `"redpanda_dynamodb_checkpoints"`

=== `batch_size`

Maximum number of records to read in a single batch.


*Type*: `int`

*Default*: `1000`

=== `poll_interval`

Time to wait between polling attempts when no records are available.


*Type*: `string`

*Default*: `"1s"`

=== `start_from`

Where to start reading when no checkpoint exists. `trim_horizon` starts from the oldest record still available in the stream; `latest` skips existing records and reads only new ones.


*Type*: `string`

*Default*: `"trim_horizon"`

Options: `trim_horizon`, `latest`.
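
For example, a consumer that should ignore the existing backlog on its first run could be sketched as follows. The table name `orders` is hypothetical.

```yml
input:
  aws_dynamodb_cdc:
    table: orders # hypothetical table name
    start_from: latest # applies only when no checkpoint exists
```

Once a checkpoint has been written, this setting should no longer affect where reading begins.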

=== `checkpoint_limit`

Maximum number of messages to process before updating the checkpoint.


*Type*: `int`

*Default*: `1000`
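
The fragment below sketches a more conservative setup with illustrative values. A lower limit presumably means fewer messages are replayed after a restart, at the cost of more writes to the checkpoint table. This trade-off is inferred from the field description, not from measured behavior.

```yml
input:
  aws_dynamodb_cdc:
    table: orders # hypothetical table name
    batch_size: 100 # read at most 100 records per batch
    checkpoint_limit: 100 # update the checkpoint after at most 100 messages
```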

=== `max_tracked_shards`

Maximum number of shards to track simultaneously. Prevents memory issues with extremely large tables.


*Type*: `int`

*Default*: `10000`

=== `region`

The AWS region to target.


*Type*: `string`


=== `endpoint`

Allows you to specify a custom endpoint for the AWS API.


*Type*: `string`
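
A custom endpoint is mainly useful for local testing. The sketch below assumes a DynamoDB-compatible emulator listening on `http://localhost:8000`; that address, the table name, and the placeholder credentials are all hypothetical. It also assumes that the table, stream, and checkpoint requests all honor this endpoint, which is not confirmed here.

```yml
input:
  aws_dynamodb_cdc:
    table: orders # hypothetical table name
    region: us-east-1 # illustrative region
    endpoint: http://localhost:8000 # assumed emulator address
    credentials:
      id: local # placeholder, not a real access key
      secret: local # placeholder, not a real secret
```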


=== `tcp`

TCP socket configuration.


*Type*: `object`


=== `tcp.connect_timeout`

Maximum amount of time a dial will wait for a connect to complete. Zero disables.


*Type*: `string`

*Default*: `"0s"`

=== `tcp.keep_alive`

TCP keep-alive probe configuration.


*Type*: `object`


=== `tcp.keep_alive.idle`

Duration the connection must be idle before sending the first keep-alive probe. Zero defaults to 15s. Negative values disable keep-alive probes.


*Type*: `string`

*Default*: `"15s"`

=== `tcp.keep_alive.interval`

Duration between keep-alive probes. Zero defaults to 15s.


*Type*: `string`

*Default*: `"15s"`

=== `tcp.keep_alive.count`

Maximum unanswered keep-alive probes before dropping the connection. Zero defaults to 9.


*Type*: `int`

*Default*: `9`

=== `tcp.tcp_user_timeout`

Maximum time to wait for acknowledgment of transmitted data before killing the connection. Linux-only (kernel 2.6.37+), ignored on other platforms. When enabled, keep_alive.idle must be greater than this value per RFC 5482. Zero disables.


*Type*: `string`

*Default*: `"0s"`

=== `credentials`

Optional manual configuration of AWS credentials to use. More information can be found in xref:guides:cloud/aws.adoc[].


*Type*: `object`


=== `credentials.profile`

A profile from `~/.aws/credentials` to use.


*Type*: `string`


=== `credentials.id`

The ID of credentials to use.


*Type*: `string`


=== `credentials.secret`

The secret for the credentials being used.
[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly. For more information, see the xref:configuration:secrets.adoc[secrets page].
====



*Type*: `string`


=== `credentials.token`

The token for the credentials being used, required when using short term credentials.


*Type*: `string`


=== `credentials.from_ec2_role`

Use the credentials of a host EC2 machine configured to assume https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_use_switch-role-ec2.html[an IAM role associated with the instance^].


*Type*: `bool`

Requires version 4.2.0 or newer

=== `credentials.role`

A role ARN to assume.


*Type*: `string`


=== `credentials.role_external_id`

An external ID to provide when assuming a role.


*Type*: `string`
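
As a sketch of IAM role-based authentication, the config below assumes a role before reading the stream. The role ARN, the table name, and the `ROLE_EXTERNAL_ID` environment variable are hypothetical. The IAM permissions the role needs are not listed on this page.

```yml
input:
  aws_dynamodb_cdc:
    table: orders # hypothetical table name
    region: us-east-1 # illustrative region
    credentials:
      role: arn:aws:iam::123456789012:role/example-cdc-reader # hypothetical role ARN
      role_external_id: "${ROLE_EXTERNAL_ID}" # hypothetical environment variable

output:
  stdout: {}
```

Reading the external ID from an environment variable keeps it out of the config file, in line with the caution on `credentials.secret`.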



Could we also add an example of using this connector?

Contributor Author

From what I see in the other inputs, the closest thing we have to an example is the common and advanced configs at the top of each file.

Is it possible to include an example with working IAM auth? Postgres has an example pipeline, linked below, and I think examples are always helpful: https://docs.redpanda.com/redpanda-connect/components/inputs/postgres_cdc/#example-pipeline

2 changes: 1 addition & 1 deletion go.mod
@@ -338,7 +338,7 @@ require (
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.9 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.9 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.31.0 // indirect
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.31.0
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.9 // indirect