Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,24 @@ public class CloudRecordingClientImpl extends CloudRecordingClient {

private final MixScenario mixScenario;

private final static Integer MAX_ATTEMPTS = 3;

private final static String pathPrefix = "/v1/apps/%s/cloud_recording";

protected CloudRecordingClientImpl(Context context) {
this.acquireResourceAPI = new AcquireResourceAPI(context);
this.queryResourceAPI = new QueryResourceAPI(context);
this.startResourceAPI = new StartResourceAPI(context);
this.stopResourceAPI = new StopResourceAPI(context);
this.updateResourceAPI = new UpdateResourceAPI(context);
this.acquireResourceAPI = new AcquireResourceAPI(context, pathPrefix, MAX_ATTEMPTS);
this.queryResourceAPI = new QueryResourceAPI(context, pathPrefix, MAX_ATTEMPTS);
this.startResourceAPI = new StartResourceAPI(context, pathPrefix, MAX_ATTEMPTS);
this.stopResourceAPI = new StopResourceAPI(context, pathPrefix, MAX_ATTEMPTS);
this.updateResourceAPI = new UpdateResourceAPI(context, pathPrefix, MAX_ATTEMPTS);

this.individualScenario = new IndividualScenarioImpl(acquireResourceAPI, queryResourceAPI, startResourceAPI,
updateResourceAPI, stopResourceAPI);
this.webScenario = new WebScenarioImpl(acquireResourceAPI, queryResourceAPI, startResourceAPI, updateResourceAPI,
this.webScenario = new WebScenarioImpl(acquireResourceAPI, queryResourceAPI, startResourceAPI,
updateResourceAPI,
stopResourceAPI);
this.mixScenario = new MixScenarioImpl(acquireResourceAPI, queryResourceAPI, startResourceAPI, updateResourceAPI,
this.mixScenario = new MixScenarioImpl(acquireResourceAPI, queryResourceAPI, startResourceAPI,
updateResourceAPI,
stopResourceAPI);
}

Expand All @@ -59,17 +65,17 @@ public Mono<QueryResourceRes> query(String resourceId, String sid, CloudRecordin
}

public Mono<StopResourceRes> stop(String resourceId, String sid, CloudRecordingModeEnum mode,
StopResourceReq request) {
StopResourceReq request) {
return stopResourceAPI.handle(resourceId, sid, mode, request);
}

public Mono<UpdateResourceRes> update(String resourceId, String sid, CloudRecordingModeEnum mode,
UpdateResourceReq request) {
UpdateResourceReq request) {
return updateResourceAPI.handle(resourceId, sid, mode, request);
}

public Mono<UpdateLayoutResourceRes> updateLayout(String resourceId, String sid, CloudRecordingModeEnum mode,
UpdateLayoutResourceReq request) {
UpdateLayoutResourceReq request) {
return updateResourceAPI.handleLayout(resourceId, sid, mode, request);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package io.agora.rest.services.cloudrecording.api;

import io.agora.rest.core.Context;
import io.agora.rest.exception.AgoraNeedRetryException;
import io.agora.rest.services.cloudrecording.api.req.AcquireResourceReq;
import io.agora.rest.services.cloudrecording.api.res.AcquireResourceRes;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;

public class AcquireResourceAPI {
public class AcquireResourceAPI extends BaseAPI {

private final Context context;

public AcquireResourceAPI(Context context) {
this.context = context;
public AcquireResourceAPI(Context context, String pathPrefix, Integer maxAttempts) {
super(context, pathPrefix, maxAttempts);
}

public Mono<AcquireResourceRes> handle(AcquireResourceReq request) {
String path = String.format("/v1/apps/%s/cloud_recording/acquire",
this.context.getAgoraConfig().getAppId());
return this.context.sendRequest(path, HttpMethod.POST, request, AcquireResourceRes.class);
String path = String.format("%s/acquire",
this.pathPrefix);
return this.context.sendRequest(path, HttpMethod.POST, request, AcquireResourceRes.class)
.retryWhen(customRetry(e -> e instanceof AgoraNeedRetryException));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.agora.rest.services.cloudrecording.api;

import java.time.Duration;
import java.util.function.Predicate;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.agora.rest.core.Context;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

public abstract class BaseAPI {
protected final Context context;

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected final Integer maxAttempts;

protected final String pathPrefix;

public BaseAPI(Context context, String pathPrefix, Integer maxAttempts) {
this.context = context;
this.pathPrefix = pathPrefix;
this.maxAttempts = maxAttempts;
}

protected RetryBackoffSpec customRetry(Predicate<Throwable> retryPredicate) {
return Retry.backoff(maxAttempts, Duration.ofSeconds(1)) // Maximum 5 retry attempts, initial 1-second interval
.maxBackoff(Duration.ofSeconds(5)) // Maximum 5-second interval
.filter(retryPredicate) // Retry condition
.doBeforeRetry(retrySignal -> {
long retryCount = retrySignal.totalRetries() + 1; // Current retry attempt number
Duration nextBackoff = Duration.ofSeconds(retryCount); // Next retry interval
logger.warn("Retry attempt: {}, next backoff: {}", retryCount, nextBackoff);
}).onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
logger.error("Retry exhausted: {}", retrySignal.totalRetries());
return retrySignal.failure();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,23 @@

import io.agora.rest.core.Context;
import io.agora.rest.exception.AgoraJsonException;
import io.agora.rest.exception.AgoraNeedRetryException;
import io.agora.rest.services.cloudrecording.api.res.QueryResourceRes;
import io.agora.rest.services.cloudrecording.enums.CloudRecordingModeEnum;
import io.netty.handler.codec.http.HttpMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class QueryResourceAPI {
public class QueryResourceAPI extends BaseAPI {

private static final Logger logger = LoggerFactory.getLogger(QueryResourceAPI.class);

private final Context context;

public QueryResourceAPI(Context context) {
this.context = context;
public QueryResourceAPI(Context context, String pathPrefix, Integer maxAttempts) {
super(context, pathPrefix, maxAttempts);
}

public Mono<QueryResourceRes> handle(String resourceId, String sid, CloudRecordingModeEnum mode) {
String path = String.format("/v1/apps/%s/cloud_recording/resourceid/%s/sid/%s/mode/%s/query",
this.context.getAgoraConfig().getAppId(), resourceId, sid, mode.getMode());
String path = String.format("%s/resourceid/%s/sid/%s/mode/%s/query",
this.pathPrefix, resourceId, sid, mode.getMode());
return this.context.sendRequest(path, HttpMethod.GET, null, QueryResourceRes.class)
.retryWhen(customRetry(e -> e instanceof AgoraNeedRetryException))
.handle((resp, sink) -> {
try {
resp.setServerResponse(mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,20 @@
import io.agora.rest.services.cloudrecording.api.res.StartResourceRes;
import io.agora.rest.services.cloudrecording.enums.CloudRecordingModeEnum;
import io.netty.handler.codec.http.HttpMethod;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

import java.time.Duration;
import java.util.function.Predicate;
public class StartResourceAPI extends BaseAPI {


public class StartResourceAPI {

private static final Logger logger = LoggerFactory.getLogger(StartResourceAPI.class);

private final Context context;

private final static int MAX_ATTEMPTS = 3;

public StartResourceAPI(Context context) {
this.context = context;
public StartResourceAPI(Context context, String pathPrefix, Integer maxAttempts) {
super(context, pathPrefix, maxAttempts);
}

public Mono<StartResourceRes> handle(String resourceId, CloudRecordingModeEnum mode, StartResourceReq request) {
String path = String.format("/v1/apps/%s/cloud_recording/resourceid/%s/mode/%s/start",
this.context.getAgoraConfig().getAppId(), resourceId, mode.getMode());
String path = String.format("%s/resourceid/%s/mode/%s/start",
this.pathPrefix, resourceId, mode.getMode());

return this.context.sendRequest(path, HttpMethod.POST, request, StartResourceRes.class)
.retryWhen(customRetry(MAX_ATTEMPTS, e -> e instanceof AgoraNeedRetryException));
}


private RetryBackoffSpec customRetry(int maxAttempts, Predicate<Throwable> retryPredicate) {
return Retry.backoff(maxAttempts, Duration.ofSeconds(1)) // Maximum 5 retry attempts, initial 1-second interval
.maxBackoff(Duration.ofSeconds(5)) // Maximum 5-second interval
.filter(retryPredicate) // Retry condition
.doBeforeRetry(retrySignal -> {
long retryCount = retrySignal.totalRetries() + 1; // Current retry attempt number
Duration nextBackoff = Duration.ofSeconds(retryCount); // Next retry interval
logger.warn("Retry attempt: {}, next backoff: {}", retryCount, nextBackoff);
}).onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
logger.error("Retry exhausted: {}", retrySignal.totalRetries());
return retrySignal.failure();
});
.retryWhen(customRetry(e -> e instanceof AgoraNeedRetryException));
}

}
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package io.agora.rest.services.cloudrecording.api;

import io.agora.rest.core.Context;
import io.agora.rest.exception.AgoraNeedRetryException;
import io.agora.rest.services.cloudrecording.api.req.StopResourceReq;
import io.agora.rest.services.cloudrecording.api.res.StopResourceRes;
import io.agora.rest.services.cloudrecording.enums.CloudRecordingModeEnum;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;

public class StopResourceAPI {
public class StopResourceAPI extends BaseAPI {

private final Context context;

public StopResourceAPI(Context context) {
this.context = context;
public StopResourceAPI(Context context, String pathPrefix, Integer maxAttempts) {
super(context, pathPrefix, maxAttempts);
}

public Mono<StopResourceRes> handle(String resourceId, String sid, CloudRecordingModeEnum mode,
StopResourceReq request) {
String path = String.format("/v1/apps/%s/cloud_recording/resourceid/%s/sid/%s/mode/%s/stop",
this.context.getAgoraConfig().getAppId(), resourceId, sid, mode.getMode());
return this.context.sendRequest(path, HttpMethod.POST, request, StopResourceRes.class);
StopResourceReq request) {
String path = String.format("%s/resourceid/%s/sid/%s/mode/%s/stop",
this.pathPrefix, resourceId, sid, mode.getMode());
return this.context.sendRequest(path, HttpMethod.POST, request, StopResourceRes.class)
.retryWhen(customRetry(e -> e instanceof AgoraNeedRetryException));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.agora.rest.services.cloudrecording.api;

import io.agora.rest.core.Context;
import io.agora.rest.exception.AgoraNeedRetryException;
import io.agora.rest.services.cloudrecording.api.req.UpdateLayoutResourceReq;
import io.agora.rest.services.cloudrecording.api.req.UpdateResourceReq;
import io.agora.rest.services.cloudrecording.api.res.UpdateLayoutResourceRes;
Expand All @@ -9,31 +10,31 @@
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;

public class UpdateResourceAPI {
public class UpdateResourceAPI extends BaseAPI {

private final Context context;

public UpdateResourceAPI(Context context) {
this.context = context;
public UpdateResourceAPI(Context context, String pathPrefix, Integer maxAttempts) {
super(context, pathPrefix, maxAttempts);
}

public Mono<UpdateResourceRes> handle(String resourceId, String sid, CloudRecordingModeEnum mode,
UpdateResourceReq request) {
String path = String.format("/v1/apps/%s/cloud_recording/resourceid/%s/sid/%s/mode/%s/update",
this.context.getAgoraConfig().getAppId(),
UpdateResourceReq request) {
String path = String.format("%s/resourceid/%s/sid/%s/mode/%s/update",
this.pathPrefix,
resourceId,
sid,
mode.getMode());
return this.context.sendRequest(path, HttpMethod.POST, request, UpdateResourceRes.class);
return this.context.sendRequest(path, HttpMethod.POST, request, UpdateResourceRes.class)
.retryWhen(customRetry(e -> e instanceof AgoraNeedRetryException));
}

public Mono<UpdateLayoutResourceRes> handleLayout(String resourceId, String sid, CloudRecordingModeEnum mode,
UpdateLayoutResourceReq request) {
String path = String.format("/v1/apps/%s/cloud_recording/resourceid/%s/sid/%s/mode/%s/updateLayout",
this.context.getAgoraConfig().getAppId(),
UpdateLayoutResourceReq request) {
String path = String.format("%s/resourceid/%s/sid/%s/mode/%s/updateLayout",
this.pathPrefix,
resourceId,
sid,
mode.getMode());
return this.context.sendRequest(path, HttpMethod.POST, request, UpdateLayoutResourceRes.class);
return this.context.sendRequest(path, HttpMethod.POST, request, UpdateLayoutResourceRes.class)
.retryWhen(customRetry(e -> e instanceof AgoraNeedRetryException));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,18 @@ public class ConvoAIClientImpl extends ConvoAIClient {

private final static String globalPrefixTpl = "/api/conversational-ai-agent/v2/projects/%s";

private final static Integer MAX_ATTEMPTS = 3;

protected ConvoAIClientImpl(Context context, ConvoAIServiceRegionEnum serviceRegionEnum) {
String pathPrefix = getPathPrefix(context, serviceRegionEnum);
joinConvoAIAPI = new JoinConvoAIAPI(context, pathPrefix);
leaveConvoAIAPI = new LeaveConvoAIAPI(context, pathPrefix);
listConvoAIAPI = new ListConvoAIAPI(context, pathPrefix);
queryConvoAIAPI = new QueryConvoAIAPI(context, pathPrefix);
updateConvoAIAPI = new UpdateConvoAIAPI(context, pathPrefix);
historyConvoAIAPI = new HistoryConvoAIAPI(context, pathPrefix);
interruptConvoAIAPI = new InterruptConvoAIAPI(context, pathPrefix);
speakConvoAIAPI = new SpeakConvoAIAPI(context, pathPrefix);
joinConvoAIAPI = new JoinConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
leaveConvoAIAPI = new LeaveConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
listConvoAIAPI = new ListConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
queryConvoAIAPI = new QueryConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
updateConvoAIAPI = new UpdateConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
historyConvoAIAPI = new HistoryConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
interruptConvoAIAPI = new InterruptConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
speakConvoAIAPI = new SpeakConvoAIAPI(context, pathPrefix, MAX_ATTEMPTS);
}

private String getPathPrefix(Context context, ConvoAIServiceRegionEnum serviceRegionEnum) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.agora.rest.services.convoai.api;

import io.agora.rest.core.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;
import java.time.Duration;
import java.util.function.Predicate;

public abstract class BaseAPI {
protected final Context context;

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected final Integer maxAttempts;

protected final String pathPrefix;

public BaseAPI(Context context, String pathPrefix, Integer maxAttempts) {
this.context = context;
this.pathPrefix = pathPrefix;
this.maxAttempts = maxAttempts;
}

protected RetryBackoffSpec customRetry(Predicate<Throwable> retryPredicate) {
return Retry.backoff(maxAttempts, Duration.ofSeconds(1)) // Maximum 5 retry attempts, initial 1-second interval
.maxBackoff(Duration.ofSeconds(5)) // Maximum 5-second interval
.filter(retryPredicate) // Retry condition
.doBeforeRetry(retrySignal -> {
long retryCount = retrySignal.totalRetries() + 1; // Current retry attempt number
Duration nextBackoff = Duration.ofSeconds(retryCount); // Next retry interval
logger.warn("Retry attempt: {}, next backoff: {}", retryCount, nextBackoff);
}).onRetryExhaustedThrow((retryBackoffSpec, retrySignal) -> {
logger.error("Retry exhausted: {}", retrySignal.totalRetries());
return retrySignal.failure();
});
}
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
package io.agora.rest.services.convoai.api;

import io.agora.rest.core.Context;
import io.agora.rest.exception.AgoraNeedRetryException;
import io.agora.rest.services.convoai.res.HistoryConvoAIRes;
import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;

public class HistoryConvoAIAPI {
private final Context context;
public class HistoryConvoAIAPI extends BaseAPI {

private final String pathPrefix;

public HistoryConvoAIAPI(Context context, String pathPrefix) {
this.context = context;
this.pathPrefix = pathPrefix;
public HistoryConvoAIAPI(Context context, String pathPrefix, Integer maxAttempts) {
super(context, pathPrefix, maxAttempts);
}

public Mono<HistoryConvoAIRes> handle(String agentId) {
String path = String.format("%s/agents/%s/history", pathPrefix, agentId);
return this.context.sendRequest(path, HttpMethod.GET, null, HistoryConvoAIRes.class);
return this.context.sendRequest(path, HttpMethod.GET, null, HistoryConvoAIRes.class)
.retryWhen(customRetry(e -> e instanceof AgoraNeedRetryException));
}
}
Loading