1111import io .opentelemetry .opamp .client .internal .request .Request ;
1212import io .opentelemetry .opamp .client .internal .request .delay .AcceptsDelaySuggestion ;
1313import io .opentelemetry .opamp .client .internal .request .delay .PeriodicDelay ;
14- import io .opentelemetry .opamp .client .internal .request . delay . PeriodicTaskExecutor ;
14+ import io .opentelemetry .opamp .client .internal .response . OpampServerResponseError ;
1515import io .opentelemetry .opamp .client .internal .response .Response ;
1616import java .io .IOException ;
1717import java .time .Duration ;
1818import java .util .Objects ;
1919import java .util .Optional ;
2020import java .util .concurrent .CompletableFuture ;
2121import java .util .concurrent .ExecutionException ;
22+ import java .util .concurrent .Executors ;
23+ import java .util .concurrent .ScheduledExecutorService ;
24+ import java .util .concurrent .ScheduledFuture ;
25+ import java .util .concurrent .ThreadFactory ;
2226import java .util .concurrent .TimeUnit ;
2327import java .util .concurrent .TimeoutException ;
2428import java .util .concurrent .atomic .AtomicBoolean ;
29+ import java .util .concurrent .atomic .AtomicReference ;
2530import java .util .function .Supplier ;
31+ import javax .annotation .Nonnull ;
2632import javax .annotation .Nullable ;
2733import opamp .proto .AgentToServer ;
2834import opamp .proto .ServerErrorResponse ;
2935import opamp .proto .ServerErrorResponseType ;
3036import opamp .proto .ServerToAgent ;
3137
32- public final class HttpRequestService implements RequestService , Runnable {
38+ public final class HttpRequestService implements RequestService {
3339 private final HttpSender requestSender ;
34- private final PeriodicTaskExecutor executor ;
40+ private final ScheduledExecutorService executorService ;
3541 private final PeriodicDelay periodicRequestDelay ;
3642 private final PeriodicDelay periodicRetryDelay ;
37- private final AtomicBoolean retryModeEnabled = new AtomicBoolean (false );
3843 private final AtomicBoolean isRunning = new AtomicBoolean (false );
3944 private final AtomicBoolean hasStopped = new AtomicBoolean (false );
45+ private final AtomicReference <PeriodicDelay > currentDelay ;
46+ private final AtomicReference <ScheduledFuture <?>> currentTask = new AtomicReference <>();
4047 private final RetryAfterParser retryAfterParser ;
4148 @ Nullable private Callback callback ;
4249 @ Nullable private Supplier <Request > requestSupplier ;
@@ -65,23 +72,24 @@ public static HttpRequestService create(
6572 PeriodicDelay periodicRetryDelay ) {
6673 return new HttpRequestService (
6774 requestSender ,
68- PeriodicTaskExecutor . create ( periodicRequestDelay ),
75+ Executors . newSingleThreadScheduledExecutor ( new DaemonThreadFactory () ),
6976 periodicRequestDelay ,
7077 periodicRetryDelay ,
7178 RetryAfterParser .getInstance ());
7279 }
7380
7481 HttpRequestService (
7582 HttpSender requestSender ,
76- PeriodicTaskExecutor executor ,
83+ ScheduledExecutorService executorService ,
7784 PeriodicDelay periodicRequestDelay ,
7885 PeriodicDelay periodicRetryDelay ,
7986 RetryAfterParser retryAfterParser ) {
8087 this .requestSender = requestSender ;
81- this .executor = executor ;
88+ this .executorService = executorService ;
8289 this .periodicRequestDelay = periodicRequestDelay ;
8390 this .periodicRetryDelay = periodicRetryDelay ;
8491 this .retryAfterParser = retryAfterParser ;
92+ currentDelay = new AtomicReference <>(periodicRequestDelay );
8593 }
8694
8795 @ Override
@@ -92,47 +100,45 @@ public void start(Callback callback, Supplier<Request> requestSupplier) {
92100 if (isRunning .compareAndSet (false , true )) {
93101 this .callback = callback ;
94102 this .requestSupplier = requestSupplier ;
95- executor .start (this );
103+ currentTask .set (
104+ executorService .schedule (
105+ this ::periodicSend , getNextDelay ().toNanos (), TimeUnit .NANOSECONDS ));
96106 } else {
97107 throw new IllegalStateException ("HttpRequestService is already running" );
98108 }
99109 }
100110
101- @ Override
102- public void stop () {
103- if ( isRunning . compareAndSet ( true , false )) {
104- hasStopped .set (true );
105- executor . stop ();
106- }
111+ private void periodicSend () {
112+ doSendRequest ();
113+ // schedule the next execution
114+ currentTask .set (
115+ executorService . schedule (
116+ this :: periodicSend , getNextDelay (). toNanos (), TimeUnit . NANOSECONDS ));
107117 }
108118
109- private void enableRetryMode (@ Nullable Duration suggestedDelay ) {
110- if (retryModeEnabled .compareAndSet (false , true )) {
111- periodicRetryDelay .reset ();
112- if (suggestedDelay != null && periodicRetryDelay instanceof AcceptsDelaySuggestion ) {
113- ((AcceptsDelaySuggestion ) periodicRetryDelay ).suggestDelay (suggestedDelay );
114- }
115- executor .setPeriodicDelay (periodicRetryDelay );
116- }
119+ private void sendOnce () {
120+ executorService .execute (this ::doSendRequest );
117121 }
118122
119- private void disableRetryMode () {
120- if (retryModeEnabled .compareAndSet (true , false )) {
121- periodicRequestDelay .reset ();
122- executor .setPeriodicDelay (periodicRequestDelay );
123- }
123+ private Duration getNextDelay () {
124+ return Objects .requireNonNull (currentDelay .get ()).getNextDelay ();
124125 }
125126
126127 @ Override
127- public void sendRequest () {
128- if (!retryModeEnabled .get ()) {
129- executor .executeNow ();
128+ public void stop () {
129+ if (isRunning .compareAndSet (true , false )) {
130+ hasStopped .set (true );
131+ executorService .shutdown ();
130132 }
131133 }
132134
133135 @ Override
134- public void run () {
135- doSendRequest ();
136+ public void sendRequest () {
137+ if (!isRunning .get ()) {
138+ throw new IllegalStateException ("HttpRequestService is not running" );
139+ }
140+
141+ sendOnce ();
136142 }
137143
138144 private void doSendRequest () {
@@ -173,7 +179,7 @@ private void handleHttpError(HttpSender.Response response) {
173179 retryAfter = duration .get ();
174180 }
175181 }
176- enableRetryMode (retryAfter );
182+ useRetryDelay (retryAfter );
177183 }
178184 }
179185
@@ -182,16 +188,14 @@ private static boolean isSuccessful(HttpSender.Response response) {
182188 }
183189
184190 private void handleSuccessResponse (Response response ) {
185- if (retryModeEnabled .get ()) {
186- disableRetryMode ();
187- }
191+ useRegularDelay ();
188192 ServerToAgent serverToAgent = response .getServerToAgent ();
189193
190194 if (serverToAgent .error_response != null ) {
191195 handleErrorResponse (serverToAgent .error_response );
196+ } else {
197+ getCallback ().onRequestSuccess (response );
192198 }
193-
194- getCallback ().onRequestSuccess (response );
195199 }
196200
197201 private void handleErrorResponse (ServerErrorResponse errorResponse ) {
@@ -200,11 +204,51 @@ private void handleErrorResponse(ServerErrorResponse errorResponse) {
200204 if (errorResponse .retry_info != null ) {
201205 retryAfter = Duration .ofNanos (errorResponse .retry_info .retry_after_nanoseconds );
202206 }
203- enableRetryMode (retryAfter );
207+ useRetryDelay (retryAfter );
208+ }
209+ getCallback ().onRequestFailed (new OpampServerResponseError (errorResponse .error_message ));
210+ }
211+
212+ private void useRegularDelay () {
213+ if (currentDelay .compareAndSet (periodicRetryDelay , periodicRequestDelay )) {
214+ cancelCurrentTask ();
215+ periodicRequestDelay .reset ();
216+ }
217+ }
218+
219+ private void useRetryDelay (@ Nullable Duration retryAfter ) {
220+ if (currentDelay .compareAndSet (periodicRequestDelay , periodicRetryDelay )) {
221+ cancelCurrentTask ();
222+ periodicRetryDelay .reset ();
223+ if (retryAfter != null && periodicRetryDelay instanceof AcceptsDelaySuggestion ) {
224+ ((AcceptsDelaySuggestion ) periodicRetryDelay ).suggestDelay (retryAfter );
225+ }
226+ }
227+ }
228+
229+ private void cancelCurrentTask () {
230+ ScheduledFuture <?> future = currentTask .get ();
231+ if (future != null ) {
232+ future .cancel (false );
204233 }
205234 }
206235
207236 private Callback getCallback () {
208237 return Objects .requireNonNull (callback );
209238 }
239+
240+ private static class DaemonThreadFactory implements ThreadFactory {
241+ private final ThreadFactory delegate = Executors .defaultThreadFactory ();
242+
243+ @ Override
244+ public Thread newThread (@ Nonnull Runnable r ) {
245+ Thread t = delegate .newThread (r );
246+ try {
247+ t .setDaemon (true );
248+ } catch (SecurityException e ) {
249+ // Well, we tried.
250+ }
251+ return t ;
252+ }
253+ }
210254}
0 commit comments