Skip to content

Conversation

@squiidz
Copy link
Contributor

@squiidz squiidz commented Jan 15, 2026

No description provided.

@david-yu
Copy link

Out of curiosity is there a reason why this can't be named amazon_dynamodb_cdc instead of amazon_dyanmodb_streams?

@squiidz
Copy link
Contributor Author

squiidz commented Jan 16, 2026

Out of curiosity is there a reason why this can't be named amazon_dynamodb_cdc instead of amazon_dyanmodb_streams?

We can rename it, but it's referring to how aws calls this feature, so It might be more consistent to call it like that. In the end I don't mind renaming it if you think it's better, just let me know.

@david-yu
Copy link

My personal opinion is that we are leveraging dynamo db streams as documented but amazon_dyanmodb_cdc will make it clear to folks that are looking for a CDC use case this is the right connector. Docs: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html. Since we name all of our cdc connectors with a CDC suffix my recommendation would be to make it consistent with RPCN and document in the connector that this leverages a Dynamo DB streams API.

*Type*: `string`



Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also add an example of using this connector?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I see in the other inputs, the closest from an example we have are the common and advanced config at the top of the files.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to include an example with working IAM auth? Postgres has an example pipeline like follows and I think any examples are always helpful: https://docs.redpanda.com/redpanda-connect/components/inputs/postgres_cdc/#example-pipeline

@squiidz squiidz force-pushed the DynamoDB-CDC-Input branch from c507c72 to 4aa10a6 Compare January 16, 2026 18:01
@josephwoodward
Copy link
Contributor

josephwoodward commented Jan 16, 2026

My personal opinion is that we are leveraging dynamo db streams as documented but amazon_dyanmodb_cdc will make it clear to folks that are looking for a CDC use case this is the right connector. Docs: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html. Since we name all of our cdc connectors with a CDC suffix my recommendation would be to make it consistent with RPCN and document in the connector that this leverages a Dynamo DB streams API.

Yeah, I agree.

We have a convention of having cdc in the suffix of CDC specific components to help discoverability. See the below screenshot from here.

Image

@squiidz squiidz force-pushed the DynamoDB-CDC-Input branch from 4aa10a6 to af77b90 Compare January 16, 2026 18:23
@josephwoodward
Copy link
Contributor

josephwoodward commented Jan 21, 2026

As per our call it'd be good to:

  • Add the table name to the message metadata and add docs highlighting what metadata exists (Operation codes etc, see the PostgreSQL component as an example)
  • Update the integration tests to use more data to exercise batching
  • Follow up PR of a /bench/ suite demonstrating the performance characteristics (similar to sql server connector and migrator)
    Otherwise LGTM, would be good to get eyes from @mmatczuk and/or @Jeffail before merging.

}

// Test shard reader structure
func TestShardReaderStructure(t *testing.T) {
Copy link
Contributor

@josephwoodward josephwoodward Jan 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought (non-blocking): Given this test is verifying we can add data to a map in a struct then read from it, it feels redundant (and no doubt covered by the integration tests) and can be removed. WDYT?


port := resource.GetPort("8000/tcp")

t.Run("ReadInsertEvents", func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be good to verify the number of records we receive as part of the test (ie, if there's 100 records in the table then we see 100 objects in the output.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

aws_cloudwatch ,metric ,aws_cloudwatch ,3.36.0 ,community ,n ,n ,n
aws_dynamodb ,cache ,AWS DynamoDB ,3.36.0 ,community ,n ,y ,y
aws_dynamodb ,output ,AWS DynamoDB ,3.36.0 ,community ,n ,y ,y
aws_dynamodb_cdc ,input ,aws_dynamodb_cdc ,0.0.0 ,enterprise ,n ,y ,n
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please update the version

Reads change data capture (CDC) events from DynamoDB Streams
Introduced in version 1.0.0.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to set version

go.mod Outdated
github.com/tigerbeetle/tigerbeetle-go v0.16.61
github.com/timeplus-io/proton-go-driver/v2 v2.1.2
github.com/tmc/langchaingo v0.1.13
github.com/tmc/langchaingo v0.1.14
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we update langchain?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had issues with 0.1.13 missing a dependency.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the problem is real please send a separate PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad it was on my side, I reverted it to 0.1.13

Comment on lines 90 to 122
table string
checkpointTable string
batchSize int32
pollInterval time.Duration
startFrom string
checkpointLimit int
maxTrackedShards int
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I like to embed config objects for clear separation

@mmatczuk
Copy link
Collaborator

Code review

Found 5 issues:

  1. pendingCount never decremented causing incorrect checkpointing (bug: pendingCount incremented but never decremented, only reset)

The pendingCount map in the batcher is incremented when messages are added but never decremented when messages are acknowledged. It's only reset to 0 after checkpointing occurs. This causes the checkpoint logic to malfunction - once the count reaches the checkpoint limit, every subsequent ack will trigger a checkpoint even if only 1 message was processed.

// Check if we should checkpoint BEFORE decrementing
// pendingCount tracks how many messages have been acked since last checkpoint
if b.pendingCount[shardID] >= checkpointer.checkpointLimit {
if err := checkpointer.Set(ctx, shardID, seq); err != nil {
return err
}
b.log.Debugf("Checkpointed shard %s at sequence %s", shardID, seq)
// Reset counter after successful checkpoint
b.pendingCount[shardID] = 0
}
}
return nil

  1. Nil pointer dereference in ack function after Close() (bug: race condition between Close() and pending acks)

The ack closure captures d.recordBatcher and d.checkpointer by reference through the receiver, but these are set to nil in Close(). If Close() is called while messages are pending acknowledgment, the ack function will panic with a nil pointer dereference when trying to call methods on these nil pointers.

ackFunc := func(ctx context.Context, err error) error {
if err != nil {
d.log.Warnf("Batch nacked: %w", err)
d.recordBatcher.RemoveMessages(allMessages)
return nil
}
// Mark messages as acked and checkpoint if needed
return d.recordBatcher.AckMessages(ctx, d.checkpointer, allMessages)
}

  1. Missing metrics/observability (AGENTS.md says "Observability: Metrics, logs (warnings/errors only during issues), tracing hooks" and CONTRIBUTING.md says "Exposes useful metrics for debugging that avoid excessive cardinality")

The implementation lacks metrics for monitoring component health and performance. The comparable Kinesis input has metrics like clientShardsMetric and shardsStolenMetric. This component should have similar metrics for shard count, checkpoint operations, and other key performance indicators.

checkpointer *dynamoDBCDCCheckpointer
recordBatcher *dynamoDBCDCRecordBatcher
shardReaders map[string]*dynamoDBShardReader
mu sync.Mutex
closed bool
log *service.Logger
}
type dynamoDBShardReader struct {
shardID string
iterator *string
exhausted bool
}
func newDynamoDBCDCInputFromConfig(conf *service.ParsedConfig, mgr *service.Resources) (*dynamoDBCDCInput, error) {
awsConf, err := GetSession(context.Background(), conf)
if err != nil {
return nil, err
}
table, err := conf.FieldString("table")

  1. Iterator not preserved on GetRecords error, violating Kinesis pattern (input_kinesis.go says "IMPORTANT TO NOTE: The returned shard iterator will always be the input iterator when the error parameter is nil" and "Do NOT modify this method without preserving this behaviour")

When GetRecords fails, the code marks the shard as exhausted without preserving the iterator. The Kinesis implementation explicitly documents that the iterator should be returned unchanged on error to allow retries from the same position. This can cause data loss on transient errors.

getRecords, err := d.streamsClient.GetRecords(ctx, &dynamodbstreams.GetRecordsInput{
ShardIterator: reader.iterator,
Limit: aws.Int32(d.batchSize),
})
if err != nil {
d.log.Errorf("Failed to get records from shard %s: %w", reader.shardID, err)
d.mu.Lock()
reader.exhausted = true
d.mu.Unlock()
continue

Reference pattern:

// IMPORTANT TO NOTE: The returned shard iterator (second return parameter) will
// always be the input iterator when the error parameter is nil, therefore
// replacing the current iterator with this return param should always be safe.
//
// Do NOT modify this method without preserving this behaviour.
func (k *kinesisReader) getRecords(info streamInfo, shardIter string) ([]types.Record, string, error) {
res, err := k.svc.GetRecords(k.ctx, &kinesis.GetRecordsInput{
StreamARN: &info.arn,
Limit: &awsKinesisDefaultLimit,
ShardIterator: &shardIter,
})
if err != nil {
return nil, shardIter, err
}
nextIter := ""
if res.NextShardIterator != nil {
nextIter = *res.NextShardIterator
}
return res.Records, nextIter, nil
}

  1. Missing example configuration with metadata documentation (PR reviewer requested: "Could we also add an example of using this connector?" and "Add the table name to the message metadata and add docs highlighting what metadata exists")

The documentation lacks a practical usage example and metadata documentation. CONTRIBUTING.md requires "examples, troubleshooting guidance, and known pitfalls" for certified connectors.

= aws_dynamodb_cdc
:type: input
:status: beta
:categories: ["Services","AWS"]
////
THIS FILE IS AUTOGENERATED!

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

recordBatcher *dynamoDBCDCRecordBatcher

shardReaders map[string]*dynamoDBShardReader
mu sync.Mutex
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth mentioning what it protects, the convention is to put mu on top of var group or use XMu for protection of X

Comment on lines 130 to 133
checkpointTable, err := conf.FieldString("checkpoint_table")
if err != nil {
return nil, err
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we try to put that into if like if X.checkpointTable, err = conf.FieldString("checkpoint_table"); err != nil

Comment on lines 120 to 123
awsConf, err := GetSession(context.Background(), conf)
if err != nil {
return nil, err
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe move that to Connect and remove the awsConf field

Comment on lines 247 to 248
var iteratorType types.ShardIteratorType
var sequenceNumber *string
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Someone should tell Claude to group vars

Comment on lines 231 to 232
d.mu.Lock()
defer d.mu.Unlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just lock d.shardReaders access not i/o with Amazon

Comment on lines 316 to 318
d.mu.Lock()
reader.exhausted = true
d.mu.Unlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We really need better / more targeted sync primitives unless we very explicitly explain that mu protects map and its contents. Then why iterator is not protected?

}

// Initialize checkpointer
d.checkpointer, err = newDynamoDBCDCCheckpointer(ctx, d.dynamoClient, d.checkpointTable, *d.streamArn, d.checkpointLimit, d.log)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not use ctx passed to Connect, search for shutdown.Signaller

"github.com/redpanda-data/benthos/v4/public/service"
)

// dynamoDBCDCRecordBatcher tracks messages and their checkpoints for DynamoDB CDC.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd appreciate if we could add more docs here about what it does and why.

type dynamoDBCDCRecordBatcher struct {
mu sync.Mutex
messageTracker map[*service.Message]*messageCheckpoint
log *service.Logger
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Log in the middle, use some grouping or move it down.

// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be treated as pkg doc - no separation

// Multiple goroutines should be able to read concurrently
done := make(chan bool, 3)

for i := 0; i < 3; i++ {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it's generally better to use range

@mmatczuk
Copy link
Collaborator

@squiidz I think we should work on locking granularity, usually we use a channel internally for sending batches, it also gives you data preloading.

@squiidz squiidz force-pushed the DynamoDB-CDC-Input branch from 94512e4 to f47e5a5 Compare January 22, 2026 01:59
@squiidz squiidz force-pushed the DynamoDB-CDC-Input branch from bbecf86 to 3d12324 Compare January 22, 2026 14:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants