Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws-kinesis/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/build
142 changes: 142 additions & 0 deletions aws-kinesis/api/aws-kinesis.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
public final class com/amplifyframework/kinesis/BuildConfig {
public static final field BUILD_TYPE Ljava/lang/String;
public static final field DEBUG Z
public static final field LIBRARY_PACKAGE_NAME Ljava/lang/String;
public static final field VERSION_NAME Ljava/lang/String;
public fun <init> ()V
}

public final class com/amplifyframework/kinesis/KinesisDataStreams {
public fun <init> (Landroid/content/Context;Ljava/lang/String;Lcom/amplifyframework/auth/AWSCredentialsProvider;Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions;)V
public synthetic fun <init> (Landroid/content/Context;Ljava/lang/String;Lcom/amplifyframework/auth/AWSCredentialsProvider;Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun clearCache-IoAF18A (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun disable ()V
public final fun enable ()V
public final fun flush-IoAF18A (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getContext ()Landroid/content/Context;
public final fun getCredentialsProvider ()Lcom/amplifyframework/auth/AWSCredentialsProvider;
public final fun getRegion ()Ljava/lang/String;
public final fun record-BWLJW6A ([BLjava/lang/String;Ljava/lang/String;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/amplifyframework/kinesis/KinesisDataStreamsOptions {
public static final field Companion Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions$Companion;
public static final fun builder ()Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions$Builder;
public final fun component1 ()J
public final fun component2 ()I
public final fun component3 ()I
public final fun component4 ()Lcom/amplifyframework/recordcache/FlushStrategy;
public static final fun defaults ()Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions;
public fun equals (Ljava/lang/Object;)Z
public final fun getCacheMaxBytes ()J
public final fun getFlushStrategy ()Lcom/amplifyframework/recordcache/FlushStrategy;
public final fun getMaxRecords ()I
public final fun getMaxRetries ()I
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/amplifyframework/kinesis/KinesisDataStreamsOptions$Builder {
public final fun build ()Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions;
public final fun cacheMaxBytes (J)Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions$Builder;
public final fun flushStrategy (Lcom/amplifyframework/recordcache/FlushStrategy;)Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions$Builder;
public final fun getCacheMaxBytes ()J
public final fun getFlushStrategy ()Lcom/amplifyframework/recordcache/FlushStrategy;
public final fun getMaxRecords ()I
public final fun getMaxRetries ()I
public final fun maxRecords (I)Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions$Builder;
public final fun maxRetries (I)Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions$Builder;
public final synthetic fun setCacheMaxBytes (J)V
public final synthetic fun setFlushStrategy (Lcom/amplifyframework/recordcache/FlushStrategy;)V
public final synthetic fun setMaxRecords (I)V
public final synthetic fun setMaxRetries (I)V
}

public final class com/amplifyframework/kinesis/KinesisDataStreamsOptions$Companion {
public final fun builder ()Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions$Builder;
public final fun defaults ()Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions;
public final synthetic fun invoke (Lkotlin/jvm/functions/Function1;)Lcom/amplifyframework/kinesis/KinesisDataStreamsOptions;
}

public final class com/amplifyframework/kinesis/KinesisException : com/amplifyframework/AmplifyException {
public fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class com/amplifyframework/recordcache/ClearCacheData {
public fun <init> ()V
public fun <init> (I)V
public synthetic fun <init> (IILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()I
public final fun copy (I)Lcom/amplifyframework/recordcache/ClearCacheData;
public static synthetic fun copy$default (Lcom/amplifyframework/recordcache/ClearCacheData;IILjava/lang/Object;)Lcom/amplifyframework/recordcache/ClearCacheData;
public fun equals (Ljava/lang/Object;)Z
public final fun getRecordsCleared ()I
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/amplifyframework/recordcache/FlushData {
public fun <init> ()V
public fun <init> (I)V
public synthetic fun <init> (IILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()I
public final fun copy (I)Lcom/amplifyframework/recordcache/FlushData;
public static synthetic fun copy$default (Lcom/amplifyframework/recordcache/FlushData;IILjava/lang/Object;)Lcom/amplifyframework/recordcache/FlushData;
public fun equals (Ljava/lang/Object;)Z
public final fun getRecordsFlushed ()I
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public abstract class com/amplifyframework/recordcache/FlushStrategy {
}

public final class com/amplifyframework/recordcache/FlushStrategy$Interval : com/amplifyframework/recordcache/FlushStrategy {
public synthetic fun <init> (JILkotlin/jvm/internal/DefaultConstructorMarker;)V
public synthetic fun <init> (JLkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1-UwyO8pc ()J
public final fun copy-LRDsOJo (J)Lcom/amplifyframework/recordcache/FlushStrategy$Interval;
public static synthetic fun copy-LRDsOJo$default (Lcom/amplifyframework/recordcache/FlushStrategy$Interval;JILjava/lang/Object;)Lcom/amplifyframework/recordcache/FlushStrategy$Interval;
public fun equals (Ljava/lang/Object;)Z
public final fun getInterval-UwyO8pc ()J
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/amplifyframework/recordcache/Record {
public fun <init> (JLjava/lang/String;Ljava/lang/String;[BIIJ)V
public final fun component1 ()J
public final fun component2 ()Ljava/lang/String;
public final fun component3 ()Ljava/lang/String;
public final fun component4 ()[B
public final fun component5 ()I
public final fun component6 ()I
public final fun component7 ()J
public final fun copy (JLjava/lang/String;Ljava/lang/String;[BIIJ)Lcom/amplifyframework/recordcache/Record;
public static synthetic fun copy$default (Lcom/amplifyframework/recordcache/Record;JLjava/lang/String;Ljava/lang/String;[BIIJILjava/lang/Object;)Lcom/amplifyframework/recordcache/Record;
public fun equals (Ljava/lang/Object;)Z
public final fun getCreatedAt ()J
public final fun getData ()[B
public final fun getDataSize ()I
public final fun getId ()J
public final fun getPartitionKey ()Ljava/lang/String;
public final fun getRetryCount ()I
public final fun getStreamName ()Ljava/lang/String;
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

public final class com/amplifyframework/recordcache/RecordData {
public fun <init> ()V
public fun <init> (Z)V
public synthetic fun <init> (ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun component1 ()Z
public final fun copy (Z)Lcom/amplifyframework/recordcache/RecordData;
public static synthetic fun copy$default (Lcom/amplifyframework/recordcache/RecordData;ZILjava/lang/Object;)Lcom/amplifyframework/recordcache/RecordData;
public fun equals (Ljava/lang/Object;)Z
public final fun getSuccess ()Z
public fun hashCode ()I
public fun toString ()Ljava/lang/String;
}

63 changes: 63 additions & 0 deletions aws-kinesis/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

plugins {
alias(libs.plugins.amplify.android.library)
alias(libs.plugins.kotlin.serialization)
alias(libs.plugins.amplify.publishing)
}

apply(from = rootProject.file("configuration/checkstyle.gradle"))

android {
namespace = "com.amplifyframework.kinesis"
}

dependencies {
implementation(project(":core"))
implementation(project(":aws-core"))

implementation(libs.androidx.appcompat)
implementation(libs.aws.kinesis)
implementation(libs.kotlin.serializationJson)
implementation(libs.androidx.sqlite)
implementation(libs.androidx.sqlite.bundled)
implementation(libs.androidx.workmanager)

testImplementation(libs.test.junit)
testImplementation(libs.test.mockk)
testImplementation(libs.test.mockito.core)
testImplementation(libs.test.mockito.inline)
testImplementation(libs.test.robolectric)
testImplementation(libs.test.androidx.junit)
testImplementation(libs.test.androidx.core)
testImplementation(libs.test.kotlin.coroutines)
testImplementation(libs.test.kotest.assertions)
testImplementation(libs.androidx.sqlite.bundled.jvm)
testImplementation(project(":testutils"))
testImplementation(project(":aws-kinesis"))

androidTestImplementation(project(":testutils"))
androidTestImplementation(libs.test.androidx.core)
androidTestImplementation(project(":aws-auth-cognito"))
androidTestImplementation(libs.test.androidx.runner)
androidTestImplementation(libs.test.kotlin.coroutines)
androidTestImplementation(libs.test.androidx.junit)
androidTestImplementation(libs.test.kotest.assertions)
androidTestImplementation(project(":aws-kinesis"))
androidTestImplementation(libs.androidx.sqlite)

androidTestUtil(libs.test.androidx.orchestrator)
}
4 changes: 4 additions & 0 deletions aws-kinesis/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
POM_ARTIFACT_ID=aws-kinesis
POM_NAME=Amplify Framework for Android - Kinesis Datastreams
POM_DESCRIPTION=Amplify Framework for Android - Kinesis Datastreams
POM_PACKAGING=aar
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.amplifyframework.kinesis

import android.content.Context
import com.amplifyframework.auth.AWSCredentials
import com.amplifyframework.auth.AWSCredentialsProvider
import com.amplifyframework.recordcache.AutoFlushScheduler
import com.amplifyframework.recordcache.ClearCacheResult
import com.amplifyframework.recordcache.FlushResult
import com.amplifyframework.recordcache.FlushStrategy
import com.amplifyframework.recordcache.FlushStrategy.Interval
import com.amplifyframework.core.Amplify
import com.amplifyframework.core.category.CategoryType
import com.amplifyframework.logging.Logger
import com.amplifyframework.recordcache.RecordClient
import com.amplifyframework.recordcache.RecordInput
import com.amplifyframework.recordcache.RecordResult
import com.amplifyframework.recordcache.SQLiteRecordStorage
import kotlin.system.measureTimeMillis

class KinesisDataStreams(
val context: Context,
val region: String,
val credentialsProvider: AWSCredentialsProvider<AWSCredentials>,
options: KinesisDataStreamsOptions = KinesisDataStreamsOptions.defaults()
) {
private val logger: Logger = Amplify.Logging.logger(CategoryType.ANALYTICS, "KinesisDataStreams")

private val recordClient: RecordClient<KinesisException> = RecordClient(
sender = KinesisRecordSender(
credentialsProvider = this@KinesisDataStreams.credentialsProvider,
region = this.region,
maxRetries = options.maxRetries
),
storage = SQLiteRecordStorage(
context = context,
identifier = region,
maxRecords = options.maxRecords,
maxBytes = options.cacheMaxBytes
),
exceptionMapper = { it.toKinesisException() }
)
private val scheduler: AutoFlushScheduler

init {
if (options.flushStrategy is FlushStrategy.Interval) {
scheduler = AutoFlushScheduler(
options.flushStrategy,
client = recordClient
)
} else {
throw IllegalArgumentException("Flush strategy must be interval")
}
}

/**
* Records data to the specified Kinesis stream.
*
* @param data The data to record as byte array
* @param partitionKey The partition key for the record
* @param streamName The name of the Kinesis stream
* @return Result.success(RecordData) on success, or Result.failure with:
* - KinesisException wrapping RecordCacheLimitExceededException (cache full)
* - KinesisException wrapping RecordCacheStorageException (database errors)
*/
suspend fun record(data: ByteArray, partitionKey: String, streamName: String): RecordResult {
logger.verbose("Recording to stream: $streamName")
return logOp(
operation = { recordClient.record(RecordInput(streamName, partitionKey, data)) },
logSuccess = { _, timeMs -> logger.debug("Record completed successfully in ${timeMs}ms") },
logFailure = { error, timeMs -> logger.warn("Record failed in ${timeMs}ms: ${error?.message}") }
)
}

/**
* Flushes all cached records to their respective Kinesis streams.
*
* @return Result.success(FlushData) on success, or Result.failure with:
* - KinesisException wrapping RecordCacheNetworkException (API/network failures)
* - KinesisException wrapping RecordCacheStorageException (database errors)
*/
suspend fun flush(): FlushResult {
logger.info("Starting flush")
return logOp(
operation = { recordClient.flush() },
logSuccess = { data, timeMs -> logger.info("Flush completed successfully in ${timeMs}ms - ${data.recordsFlushed} records flushed") },
logFailure = { error, timeMs -> logger.warn("Flush failed in ${timeMs}ms: ${error?.message}") }
)
}

/**
* Clears all cached records from local storage.
*
* @return Result.success(ClearCacheData) on success, or Result.failure with:
* - KinesisException wrapping RecordCacheStorageException (database errors)
*/
suspend fun clearCache(): ClearCacheResult {
logger.info("Clearing cache")
return logOp(
operation = { recordClient.clearCache() },
logSuccess = { data, timeMs -> logger.info("Clear cache completed successfully in ${timeMs}ms - ${data.recordsCleared} records cleared") },
logFailure = { error, timeMs -> logger.warn("Clear cache failed in ${timeMs}ms: ${error?.message}") }
)
}

/**
* Enables automatic flushing of cached records based on the configured interval.
*/
fun enable() = scheduler.start()

/**
* Disables automatic flushing of cached records.
*/
fun disable() = scheduler.disable()

private suspend inline fun <T> logOp(
operation: suspend () -> Result<T>,
logSuccess: (T, Long) -> Unit,
logFailure: (Throwable?, Long) -> Unit
): Result<T> {
val result: Result<T>
val timeMs = measureTimeMillis {
result = operation()
}
if (result.isSuccess) {
logSuccess(result.getOrThrow(), timeMs)
} else {
logFailure(result.exceptionOrNull(), timeMs)
}
return result
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.amplifyframework.kinesis

import com.amplifyframework.recordcache.FlushStrategy
import kotlin.time.Duration.Companion.seconds

private const val DEFAULT_CACHE_SIZE_LIMIT_IN_BYTES = 500L * 1024 * 1024

data class KinesisDataStreamsOptions internal constructor(
val cacheMaxBytes: Long,
val maxRecords: Int,
val maxRetries: Int,
val flushStrategy: FlushStrategy
) {
companion object {
@JvmStatic
fun builder() = Builder()

@JvmSynthetic
operator fun invoke(func: Builder.() -> Unit) = Builder().apply(func).build()

@JvmStatic
fun defaults() = builder().build()
}

class Builder internal constructor() {
var cacheMaxBytes: Long = DEFAULT_CACHE_SIZE_LIMIT_IN_BYTES
@JvmSynthetic set

var maxRecords: Int = 500
@JvmSynthetic set

var maxRetries: Int = 5
@JvmSynthetic set

var flushStrategy: FlushStrategy = FlushStrategy.Interval(30.seconds)
@JvmSynthetic set

fun cacheMaxBytes(value: Long) = apply { cacheMaxBytes = value }
fun maxRecords(value: Int) = apply { maxRecords = value }
fun maxRetries(value: Int) = apply { maxRetries = value }
fun flushStrategy(value: FlushStrategy) = apply { flushStrategy = value }

fun build() = KinesisDataStreamsOptions(cacheMaxBytes, maxRecords, maxRetries, flushStrategy)
}
}
Loading
Loading