Skip to content
Open
2 changes: 2 additions & 0 deletions aws-datastore/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ apply(from = rootProject.file("configuration/publishing.gradle"))
group = properties["POM_GROUP"].toString()

dependencies {
compileOnly(libs.rxlint)

implementation(project(":core"))
implementation(project(":aws-core"))
implementation(project(":aws-api-appsync"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.amplifyframework.datastore;

import android.annotation.SuppressLint;
import android.content.Context;
import androidx.annotation.NonNull;
import androidx.annotation.Nullable;
Expand Down Expand Up @@ -65,6 +66,8 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
Expand Down Expand Up @@ -98,6 +101,12 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {

private final ReachabilityMonitor reachabilityMonitor;

// Subscriptions that should be disposed when datastore is stopped
private final CompositeDisposable startedDisposables = new CompositeDisposable();

// Subscriptions that have the same lifetime as the plugin
private final CompositeDisposable pluginDisposables = new CompositeDisposable();

private AWSDataStorePlugin(
@NonNull ModelProvider modelProvider,
@NonNull SchemaRegistry schemaRegistry,
Expand Down Expand Up @@ -286,7 +295,11 @@ private void configure(Context context, DataStoreConfiguration configuration) {

reachabilityMonitor.configure(context);

waitForInitialization().subscribe(this::observeNetworkStatus);
Disposable subscription = waitForInitialization().subscribe(
this::observeNetworkStatus,
error -> LOG.error("Datastore did not initialize", error)
);
pluginDisposables.add(subscription);
}

private void publishNetworkStatusEvent(boolean active) {
Expand All @@ -295,16 +308,27 @@ private void publishNetworkStatusEvent(boolean active) {
}

private void observeNetworkStatus() {
reachabilityMonitor.getObservable()
.subscribe(this::publishNetworkStatusEvent);
Disposable subscription = reachabilityMonitor.getObservable()
.subscribe(
this::publishNetworkStatusEvent,
error -> LOG.warn("Unable to subscribe to network status events", error)
);
pluginDisposables.add(subscription);
}

@SuppressLint("CheckResult")
@WorkerThread
@Override
public void initialize(@NonNull Context context) throws AmplifyException {
try {
initializeStorageAdapter(context)
.blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
boolean initialized = initializeStorageAdapter(context)
.blockingAwait(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
if (!initialized) {
throw new DataStoreException(
"Storage adapter did not initialize within allotted timeout",
AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION
);
}
} catch (Throwable initError) {
throw new AmplifyException(
"Failed to initialize the local storage adapter for the DataStore plugin.",
Expand All @@ -327,8 +351,8 @@ private Completable initializeStorageAdapter(Context context) {
}

private Completable waitForInitialization() {
return Completable.fromAction(() -> categoryInitializationsPending.await())
.timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS)
return Completable.fromAction(categoryInitializationsPending::await)
.timeout(LIFECYCLE_TIMEOUT_MS, TimeUnit.MILLISECONDS, Schedulers.io())
.subscribeOn(Schedulers.io())
.doOnComplete(() -> LOG.info("DataStore plugin initialized."))
.doOnError(error -> LOG.error("DataStore initialization timed out.", error));
Expand All @@ -339,27 +363,30 @@ private Completable waitForInitialization() {
*/
@Override
public void start(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
waitForInitialization()
Disposable subscription = waitForInitialization()
.andThen(orchestrator.start())
.subscribeOn(Schedulers.io())
.subscribe(
onComplete::call,
error -> onError.accept(new DataStoreException("Failed to start DataStore.", error, "Retry."))
);
startedDisposables.add(subscription);
}

/**
* {@inheritDoc}
*/
@Override
public void stop(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
waitForInitialization()
startedDisposables.dispose();
Disposable subscription = waitForInitialization()
.andThen(orchestrator.stop())
.subscribeOn(Schedulers.io())
.subscribe(
onComplete::call,
error -> onError.accept(new DataStoreException("Failed to stop DataStore.", error, "Retry."))
);
startedDisposables.add(subscription);
}

/**
Expand All @@ -372,12 +399,19 @@ public void stop(@NonNull Action onComplete, @NonNull Consumer<DataStoreExceptio
*/
@Override
public void clear(@NonNull Action onComplete, @NonNull Consumer<DataStoreException> onError) {
stop(() -> Completable.create(emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError))
.subscribeOn(Schedulers.io())
.subscribe(onComplete::call,
throwable -> onError.accept(new DataStoreException("Clear operation failed",
throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION))),
onError);
stop(
() -> {
Disposable completable = Completable.create(
emitter -> sqliteStorageAdapter.clear(emitter::onComplete, emitter::onError)
)
.subscribeOn(Schedulers.io())
.subscribe(onComplete::call,
throwable -> onError.accept(new DataStoreException("Clear operation failed",
throwable, AmplifyException.REPORT_BUG_TO_AWS_SUGGESTION)));
pluginDisposables.add(completable);
},
onError
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,12 @@ private void onApiSyncFailure(Throwable exception) {
return;
}
LOG.warn("API sync failed - transitioning to LOCAL_ONLY.", exception);
Completable.fromAction(this::transitionToLocalOnly)
.doOnError(error -> LOG.warn("Transition to LOCAL_ONLY failed.", error))
.subscribe();
Disposable subscription = Completable.fromAction(this::transitionToLocalOnly)
.subscribe(
() -> { /* no-op */ },
error -> LOG.warn("Transition to LOCAL_ONLY failed.", error)
);
disposables.add(subscription);
}

private void disposeNetworkChanges() {
Expand All @@ -422,13 +425,16 @@ private void monitorNetworkChanges() {
monitorNetworkChangesDisposable = reachabilityMonitor.getObservable()
.skip(1) // We skip the current online state, we only care about transitions
.filter(ignore -> !State.STOPPED.equals(currentState.get()))
.subscribe(isOnline -> {
if (isOnline) {
transitionToApiSync();
} else {
transitionToLocalOnly();
}
});
.subscribe(
isOnline -> {
if (isOnline) {
transitionToApiSync();
} else {
transitionToLocalOnly();
}
},
error -> LOG.warn("Error observing network changes", error)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;

import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.schedulers.Schedulers;

/**
* Class that defines inner classes and interfaces related to retry strategies.
Expand Down Expand Up @@ -71,7 +72,11 @@ public boolean retryHandler(int attemptNumber, Throwable throwable) {
} else {
final long waitTimeSeconds = Double.valueOf(Math.pow(2, attemptNumber % maxExponent)).longValue();
LOG.debug("Waiting " + waitTimeSeconds + " seconds before retrying");
Completable.timer(TimeUnit.SECONDS.toMillis(waitTimeSeconds), TimeUnit.MILLISECONDS).blockingAwait();
Completable.timer(
TimeUnit.SECONDS.toMillis(waitTimeSeconds),
TimeUnit.MILLISECONDS,
Schedulers.io()
).blockingAwait();
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,10 @@ void startDrainingMutationBuffer() {
.flatMapCompletable(this::mergeEvent)
.doOnError(failure -> LOG.warn("Reading subscriptions buffer has failed.", failure))
.doOnComplete(() -> LOG.warn("Reading from subscriptions buffer is completed."))
.subscribe()
.subscribe(
() -> LOG.info("Subscription data buffer processing complete"),
error -> LOG.warn("Error draining subscription data buffer", error)
)
);
}

Expand Down
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ oauth2 = "0.26.0"
okhttp = "5.0.0-alpha.11"
robolectric = "4.7"
rxjava = "3.0.6"
rxlint = "1.7.8"
slf4j = "2.0.6"
sqlcipher = "4.5.4"
tensorflow = "2.0.0"
Expand Down Expand Up @@ -91,6 +92,7 @@ maplibre-sdk = { module = "org.maplibre.gl:android-sdk", version.ref = "maplibre
oauth2 = { module = "com.google.auth:google-auth-library-oauth2-http", version.ref = "oauth2" }
okhttp = { module = "com.squareup.okhttp3:okhttp", version.ref = "okhttp" }
rxjava = { module = "io.reactivex.rxjava3:rxjava", version.ref = "rxjava" }
rxlint = { module = "nl.littlerobots.rxlint:rxlint", version.ref = "rxlint" }
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j"}
sqlcipher= { module = "net.zetetic:android-database-sqlcipher", version.ref = "sqlcipher" }
tensorflow = { module = "org.tensorflow:tensorflow-lite", version.ref="tensorflow" }
Expand Down