Skip to content

Commit 7756836

Browse files
committed
CM-1630: Fixed by properly registering onComplete handlers.
1 parent 54d508e commit 7756836

File tree

1 file changed

+16
-17
lines changed

1 file changed

+16
-17
lines changed

comet-java-client/src/main/java/ml/comet/experiment/impl/BaseExperimentAsync.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,11 @@ void logAssetFolder(@NonNull File folder, boolean logFilePath, boolean recursive
313313
Observable<LogDataResponse> observable =
314314
Observable.fromStream(assets)
315315
.flatMap(asset -> Observable.fromSingle(
316-
sendAssetAsync(getRestApiClient()::logAsset, asset, onComplete)), true);
316+
sendAssetAsync(getRestApiClient()::logAsset, asset)), true);
317+
318+
if (onComplete.isPresent()) {
319+
observable = observable.doFinally(onComplete.get());
320+
}
317321

318322
// subscribe for processing results
319323
observable
@@ -450,7 +454,11 @@ void logAsset(@NonNull final Asset asset, Optional<Action> onComplete) {
450454
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
451455
private <T extends Asset> void logAsset(final BiFunction<T, String, Single<LogDataResponse>> func,
452456
@NonNull final T asset, Optional<Action> onComplete) {
453-
Single<LogDataResponse> single = this.sendAssetAsync(func, asset, onComplete);
457+
Single<LogDataResponse> single = this.sendAssetAsync(func, asset);
458+
459+
if (onComplete.isPresent()) {
460+
single = single.doFinally(onComplete.get());
461+
}
454462

455463
// subscribe to get operation completed
456464
single.subscribe(
@@ -467,30 +475,21 @@ private <T extends Asset> void logAsset(final BiFunction<T, String, Single<LogDa
467475
* Attempts to send given {@link Asset} or its subclass asynchronously.
468476
* This method will wrap send operation into {@link Single} and transparently log any errors that may happen.
469477
*
470-
* @param func the function to be invoked to send asset to the backend.
471-
* @param asset the {@link Asset} or subclass to be sent.
472-
* @param onComplete The optional action to be invoked when this operation
473-
* asynchronously completes. Can be {@code null} if not interested in completion signal.
474-
* @param <T> the {@link Asset} or its subclass.
478+
* @param func the function to be invoked to send asset to the backend.
479+
* @param asset the {@link Asset} or subclass to be sent.
480+
* @param <T> the {@link Asset} or its subclass.
475481
* @return the {@link Single} which can be used to subscribe for operation results.
476482
*/
477-
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
478483
private <T extends Asset> Single<LogDataResponse> sendAssetAsync(
479-
final BiFunction<T, String, Single<LogDataResponse>> func,
480-
@NonNull final T asset, Optional<Action> onComplete) {
484+
final BiFunction<T, String, Single<LogDataResponse>> func, @NonNull final T asset) {
481485

482-
Single<LogDataResponse> single = validateAndGetExperimentKey()
486+
return validateAndGetExperimentKey()
483487
.subscribeOn(Schedulers.io())
484-
.concatMap(experimentKey -> func.apply(asset, experimentKey))
488+
.concatMap(experimentKey1 -> func.apply(asset, experimentKey1))
485489
.doOnSuccess(logDataResponse ->
486490
AsyncDataResponseLogger.checkAndLog(logDataResponse, getLogger(), asset))
487491
.doOnError(throwable ->
488492
getLogger().error(getString(FAILED_TO_SEND_LOG_ASSET_REQUEST, asset), throwable));
489-
490-
if (onComplete.isPresent()) {
491-
return single.doFinally(onComplete.get());
492-
}
493-
return single;
494493
}
495494

496495
/**

0 commit comments

Comments
 (0)