Skip to content

Add actual kinesis datastreams client#3191

Merged
jvh-aws merged 16 commits intofeat/kinesisfrom
jv/add-kinesis-datastreams-client-actual-client
Feb 6, 2026
Merged

Add actual kinesis datastreams client#3191
jvh-aws merged 16 commits intofeat/kinesisfrom
jv/add-kinesis-datastreams-client-actual-client

Conversation

@jvh-aws
Copy link
Contributor

@jvh-aws jvh-aws commented Feb 3, 2026

  • PR title and description conform to Pull Request guidelines.

Issue #, if available:

Description of changes:

How did you test these changes?
(Please add a line here how the changes were tested)

Documentation update required?

  • No
  • Yes (Please include a PR link for the documentation update)

General Checklist

  • Added Unit Tests
  • Added Integration Tests
  • Security oriented best practices and standards are followed (e.g. using input sanitization, principle of least privilege, etc)
  • Ensure commit message has the appropriate scope (e.g fix(storage): message, feat(auth): message, chore(all): message)

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

Base automatically changed from jv/add-kinesis-datastreams-client-autoflush-and-recordclient to feat/kinesis February 5, 2026 15:10
@jvh-aws jvh-aws marked this pull request as ready for review February 5, 2026 15:18
@jvh-aws jvh-aws requested review from a team as code owners February 5, 2026 15:19
Copy link
Member

@mattcreaser mattcreaser left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Various thoughts below but none of them are blocking.


private const val DEFAULT_CACHE_SIZE_LIMIT_IN_BYTES = 500L * 1024 * 1024

data class KinesisDataStreamsOptions internal constructor(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need kdoc on all the public APIs. This can be done in a subsequent PR.

logger.info("Starting flush")
return logOp(
operation = { recordClient.flush() },
logSuccess = { data, timeMs -> logger.info("Flush completed successfully in ${timeMs}ms - ${data.recordsFlushed} records flushed") },
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For non-constant log messages please use the lazy evaluation versions (e.g. logger.info { "..." })

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, we'll rework logging in a follow up.

import com.amplifyframework.AmplifyException
import com.amplifyframework.recordcache.RecordCacheException

class KinesisException(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To discuss later: using sealed exception hierarchy for better discoverability

import com.amplifyframework.recordcache.SQLiteRecordStorage
import kotlin.system.measureTimeMillis

class KinesisDataStreams(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see this is what what proposed in the API review but let's revisit this class name prior to release to ensure it aligns with how we are thinking about things for vNext.

@jvh-aws jvh-aws merged commit a179c55 into feat/kinesis Feb 6, 2026
@jvh-aws jvh-aws deleted the jv/add-kinesis-datastreams-client-actual-client branch February 6, 2026 08:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants