Skip to content

Commit 1ecd729

Browse files
authored
mgmt, add poller support for sync-stack (Azure#44031)
* add test pom * implementation * javadoc * private ctor * checkstyle, spotbugs * address comments * fix parameter name * share code * nit * use Mono.defer * add verification to initial 200 case * add no delay verification * directly getFinalResult * changelog * fix
1 parent 8bef03b commit 1ecd729

File tree

6 files changed

+686
-249
lines changed

6 files changed

+686
-249
lines changed

sdk/core/azure-core-management/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Added sync-stack support `SyncPollerFactory`, for Azure Resource Manager(ARM) long-running-operation(LRO).
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/core/azure-core-management/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@
9797
<version>5.11.2</version> <!-- {x-version-update;org.junit.jupiter:junit-jupiter-engine;external_dependency} -->
9898
<scope>test</scope>
9999
</dependency>
100+
<dependency>
101+
<groupId>org.junit.jupiter</groupId>
102+
<artifactId>junit-jupiter-params</artifactId>
103+
<version>5.11.2</version> <!-- {x-version-update;org.junit.jupiter:junit-jupiter-params;external_dependency} -->
104+
<scope>test</scope>
105+
</dependency>
100106
<dependency>
101107
<groupId>io.projectreactor</groupId>
102108
<artifactId>reactor-test</artifactId>

sdk/core/azure-core-management/src/main/java/com/azure/core/management/implementation/polling/PollOperation.java

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ public final class PollOperation {
5050
return pollingContext -> {
5151
PollingState pollingState = PollingState.from(serializerAdapter, pollingContext);
5252
if (pollingState.getOperationStatus().isComplete()) {
53-
return pollResponseMonoFromPollingState(serializerAdapter, pollResultType, pollingState);
53+
return Mono.defer(
54+
() -> Mono.just(pollResponseFromPollingState(serializerAdapter, pollResultType, pollingState)));
5455
} else {
5556
// InProgress|NonTerminal-Status
5657
return doSinglePoll(pipeline, pollingState, context).flatMap(updatedState -> {
5758
updatedState.store(pollingContext);
58-
return pollResponseMonoFromPollingState(serializerAdapter, pollResultType, updatedState);
59+
return Mono.defer(
60+
() -> Mono.just(pollResponseFromPollingState(serializerAdapter, pollResultType, updatedState)));
5961
});
6062
}
6163
};
@@ -116,37 +118,6 @@ public static <T, U> Function<PollingContext<PollResult<T>>, Mono<U>> fetchResul
116118
};
117119
}
118120

119-
/**
120-
* Create a PollResponse indicating service error.
121-
*
122-
* @param opStatus the long-running-operation errored status
123-
* @param error the error description
124-
* @param <T> the poll result type
125-
* @return PollResponse
126-
*/
127-
private static <T> Mono<PollResponse<PollResult<T>>> errorPollResponseMono(LongRunningOperationStatus opStatus,
128-
Error error) {
129-
PollResult<T> pollResult = new PollResult<>(new PollResult.Error(error.getMessage(),
130-
error.getResponseStatusCode(), new HttpHeaders(error.getResponseHeaders()), error.getResponseBody()));
131-
return Mono.just(new PollResponse<>(opStatus, pollResult));
132-
}
133-
134-
/**
135-
* Create a PollResponse indicating succeeded or in-progress LRO.
136-
*
137-
* @param serializerAdapter the serializer for any encoding and decoding
138-
* @param opStatus the long-running-operation succeeded or in-progress status
139-
* @param pollResponseBody the poll response body
140-
* @param pollResultType the poll result type
141-
* @param <T> the poll result type
142-
* @return PollResponse
143-
*/
144-
private static <T> Mono<PollResponse<PollResult<T>>> pollResponseMono(SerializerAdapter serializerAdapter,
145-
LongRunningOperationStatus opStatus, String pollResponseBody, Type pollResultType, Duration pollDelay) {
146-
T result = deserialize(serializerAdapter, pollResponseBody, pollResultType);
147-
return Mono.just(new PollResponse<>(opStatus, new PollResult<T>(result), pollDelay));
148-
}
149-
150121
/**
151122
* Do a poll to retrieve the LRO status.
152123
*
@@ -170,34 +141,72 @@ private static Mono<PollingState> doSinglePoll(HttpPipeline pipeline, PollingSta
170141
.fromSupplier(() -> pollingState.update(response.getStatusCode(), response.getHeaders(), null))));
171142
}
172143

173-
private static <T> Mono<PollResponse<PollResult<T>>> pollResponseMonoFromPollingState(
174-
SerializerAdapter serializerAdapter, Type pollResultType, PollingState pollingState) {
144+
/**
145+
* Gets the latest poll response from PollingState.
146+
*
147+
* @param serializerAdapter the serializer for any encoding and decoding
148+
* @param pollResultType the poll result type
149+
* @param pollingState the polling state
150+
* @param <T> the poll result type
151+
* @return the latest poll response
152+
*/
153+
static <T> PollResponse<PollResult<T>> pollResponseFromPollingState(SerializerAdapter serializerAdapter,
154+
Type pollResultType, PollingState pollingState) {
175155
if (pollingState.getOperationStatus().isComplete()) {
176156
if (pollingState.getOperationStatus() == LongRunningOperationStatus.FAILED
177157
|| pollingState.getOperationStatus() == LRO_CANCELLED) {
178158
// Failed|Cancelled
179159
Error lroInitError = pollingState.getSynchronouslyFailedLroError();
180160
if (lroInitError != null) {
181-
return errorPollResponseMono(pollingState.getOperationStatus(), lroInitError);
161+
return errorPollResponse(pollingState.getOperationStatus(), lroInitError);
182162
}
183163
Error pollError = pollingState.getPollError();
184164
if (pollError != null) {
185-
return errorPollResponseMono(pollingState.getOperationStatus(), pollError);
165+
return errorPollResponse(pollingState.getOperationStatus(), pollError);
186166
}
187167
throw new IllegalStateException(
188168
"Either LroError or PollError must" + "be set when OperationStatus is in Failed|Cancelled State.");
189169
} else {
190170
// Succeeded
191-
return pollResponseMono(serializerAdapter, pollingState.getOperationStatus(),
171+
return pollResponse(serializerAdapter, pollingState.getOperationStatus(),
192172
pollingState.getLastResponseBody(), pollResultType, pollingState.getPollDelay());
193173
}
194174
} else {
195175
// InProgress|NonTerminal-Status
196-
return pollResponseMono(serializerAdapter, pollingState.getOperationStatus(),
176+
return pollResponse(serializerAdapter, pollingState.getOperationStatus(),
197177
pollingState.getLastResponseBody(), pollResultType, pollingState.getPollDelay());
198178
}
199179
}
200180

181+
/**
182+
* Create a PollResponse indicating succeeded or in-progress LRO.
183+
*
184+
* @param serializerAdapter the serializer for any encoding and decoding
185+
* @param operationStatus the long-running-operation succeeded or in-progress status
186+
* @param pollResponseBody the poll response body
187+
* @param pollResultType the poll result type
188+
* @param <T> the poll result type
189+
* @return PollResponse
190+
*/
191+
static <T> PollResponse<PollResult<T>> pollResponse(SerializerAdapter serializerAdapter,
192+
LongRunningOperationStatus operationStatus, String pollResponseBody, Type pollResultType, Duration pollDelay) {
193+
T result = deserialize(serializerAdapter, pollResponseBody, pollResultType);
194+
return new PollResponse<>(operationStatus, new PollResult<>(result), pollDelay);
195+
}
196+
197+
/**
198+
* Create a PollResponse indicating service error.
199+
*
200+
* @param operationStatus the long-running-operation errored status
201+
* @param error the error description
202+
* @param <T> the poll result type
203+
* @return PollResponse
204+
*/
205+
static <T> PollResponse<PollResult<T>> errorPollResponse(LongRunningOperationStatus operationStatus, Error error) {
206+
return new PollResponse<>(operationStatus, new PollResult<>(new PollResult.Error(error.getMessage(),
207+
error.getResponseStatusCode(), new HttpHeaders(error.getResponseHeaders()), error.getResponseBody())));
208+
}
209+
201210
/**
202211
* Decorate the request.
203212
*
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.management.implementation.polling;
5+
6+
import com.azure.core.http.HttpMethod;
7+
import com.azure.core.http.HttpPipeline;
8+
import com.azure.core.http.HttpRequest;
9+
import com.azure.core.http.HttpResponse;
10+
import com.azure.core.http.rest.Response;
11+
import com.azure.core.management.polling.PollResult;
12+
import com.azure.core.util.BinaryData;
13+
import com.azure.core.util.Context;
14+
import com.azure.core.util.polling.PollResponse;
15+
import com.azure.core.util.polling.PollingContext;
16+
import com.azure.core.util.serializer.SerializerAdapter;
17+
18+
import java.lang.reflect.Type;
19+
import java.util.function.BiFunction;
20+
import java.util.function.Function;
21+
import java.util.function.Supplier;
22+
23+
/**
24+
* Synchronous poll operation for Azure resource manager (ARM) long-running-operation (LRO).
25+
*/
26+
public final class SyncPollOperation {
27+
/**
28+
* Gets a Function that starts the Azure resource manager(ARM) long-running-operation(LRO).
29+
*
30+
* @param serializerAdapter the serializer for any encoding and decoding
31+
* @param pollResultType the type of the poll result, if no result is expecting then this should be Void.class
32+
* @param lroInitialResponseSupplier Supplier of the activation operation to activate (start) the long-running operation
33+
* @param <T> the type of poll result
34+
* @return the ARM LRO activation Function
35+
*/
36+
public static <T> Function<PollingContext<PollResult<T>>, PollResponse<PollResult<T>>> activationFunction(
37+
SerializerAdapter serializerAdapter, Class<T> pollResultType,
38+
Supplier<Response<BinaryData>> lroInitialResponseSupplier) {
39+
return pollingContext -> {
40+
Response<BinaryData> response = lroInitialResponseSupplier.get();
41+
PollingState state = PollingState.create(serializerAdapter, response.getRequest(), response.getStatusCode(),
42+
response.getHeaders(), response.getValue().toString());
43+
state.store(pollingContext);
44+
T result = PollOperation.deserialize(serializerAdapter, response.getValue().toString(), pollResultType);
45+
return new PollResponse<>(state.getOperationStatus(), new PollResult<>(result), state.getPollDelay());
46+
};
47+
}
48+
49+
/**
50+
* Gets a Function that polls provisioning state of ARM resource.
51+
*
52+
* @param serializerAdapter the serializer for any encoding and decoding
53+
* @param httpPipeline the HttpPipeline for making poll request
54+
* @param pollResultType the type of the poll result
55+
* @param context the context
56+
* @param <T> the type of poll result type
57+
* @return the ARM poll function
58+
*/
59+
public static <T> Function<PollingContext<PollResult<T>>, PollResponse<PollResult<T>>> pollFunction(
60+
SerializerAdapter serializerAdapter, HttpPipeline httpPipeline, Class<T> pollResultType, Context context) {
61+
return pollingContext -> {
62+
PollingState state = PollingState.from(serializerAdapter, pollingContext);
63+
if (state.getOperationStatus().isComplete()) {
64+
return PollOperation.pollResponseFromPollingState(serializerAdapter, pollResultType, state);
65+
} else {
66+
try (HttpResponse response
67+
= httpPipeline.sendSync(new HttpRequest(HttpMethod.GET, state.getPollUrl()), context)) {
68+
String body = response.getBodyAsBinaryData().toString();
69+
state.update(response.getStatusCode(), response.getHeaders(), body);
70+
state.store(pollingContext);
71+
return PollOperation.pollResponseFromPollingState(serializerAdapter, pollResultType, state);
72+
}
73+
}
74+
};
75+
}
76+
77+
/**
78+
* Currently there is no option to cancel an ARM LRO in generic way, this is NOP.
79+
*
80+
* @param context the context
81+
* @param <T> the type of poll result type
82+
* @return cancel Function
83+
*/
84+
public static <T> BiFunction<PollingContext<PollResult<T>>, PollResponse<PollResult<T>>, PollResult<T>>
85+
cancelFunction(Context context) {
86+
return (pollingContext, pollResultPollResponse) -> null;
87+
}
88+
89+
/**
90+
* Gets a Function that retrieves final result of a LRO.
91+
*
92+
* @param serializerAdapter the serializer for any encoding and decoding
93+
* @param httpPipeline the HttpPipeline for fetching final result
94+
* @param finalResultType the final result type
95+
* @param context the context
96+
* @param <T> the final result type
97+
* @param <U> the poll result type
98+
* @return retrieve final LRO result Function
99+
*/
100+
public static <T, U> Function<PollingContext<T>, U> fetchResultFunction(SerializerAdapter serializerAdapter,
101+
HttpPipeline httpPipeline, Type finalResultType, Context context) {
102+
return pollingContext -> {
103+
PollingState state = PollingState.from(serializerAdapter, pollingContext);
104+
FinalResult finalResult = state.getFinalResult();
105+
if (finalResult == null) {
106+
return null;
107+
} else {
108+
String value = finalResult.getResult();
109+
U result;
110+
if (value != null) {
111+
result = PollOperation.deserialize(serializerAdapter, value, finalResultType);
112+
} else {
113+
try (HttpResponse response
114+
= httpPipeline.sendSync(new HttpRequest(HttpMethod.GET, finalResult.getResultUri()), context)) {
115+
result = PollOperation.deserialize(serializerAdapter, response.getBodyAsBinaryData().toString(),
116+
finalResultType);
117+
}
118+
}
119+
return result;
120+
}
121+
};
122+
}
123+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
package com.azure.core.management.polling;
5+
6+
import com.azure.core.http.HttpPipeline;
7+
import com.azure.core.http.rest.Response;
8+
import com.azure.core.management.implementation.polling.SyncPollOperation;
9+
import com.azure.core.util.BinaryData;
10+
import com.azure.core.util.Context;
11+
import com.azure.core.util.polling.SyncPoller;
12+
import com.azure.core.util.serializer.SerializerAdapter;
13+
14+
import java.time.Duration;
15+
import java.util.function.Supplier;
16+
17+
/**
18+
* Factory to create SyncPoller for Azure Resource Manager (ARM) long-running-operation (LRO).
19+
*/
20+
public final class SyncPollerFactory {
21+
22+
private SyncPollerFactory() {
23+
}
24+
25+
/**
26+
* Creates a SyncPoller with ARM LRO init operation.
27+
*
28+
* @param serializerAdapter the serializer for any encoding and decoding
29+
* @param httpPipeline the HttpPipeline for making any Http request (e.g. poll)
30+
* @param pollResultType the type of the poll result, if no result is expecting then this should be Void.class
31+
* @param finalResultType the type of the final result, if no result is expecting then this should be Void.class
32+
* @param defaultPollDuration the default poll interval to use if service does not return retry-after
33+
* @param lroInitialResponseSupplier Supplier of the activation operation to activate (start) the long-running operation. This operation
34+
* will be invoked at most once.
35+
* @param <T> the type of poll result
36+
* @param <U> the type of final result
37+
* @return SyncPoller
38+
*/
39+
public static <T, U> SyncPoller<PollResult<T>, U> create(SerializerAdapter serializerAdapter,
40+
HttpPipeline httpPipeline, Class<T> pollResultType, Class<U> finalResultType, Duration defaultPollDuration,
41+
Supplier<Response<BinaryData>> lroInitialResponseSupplier) {
42+
return create(serializerAdapter, httpPipeline, pollResultType, finalResultType, defaultPollDuration,
43+
lroInitialResponseSupplier, Context.NONE);
44+
}
45+
46+
/**
47+
* Creates a SyncPoller with ARM LRO init operation.
48+
*
49+
* @param serializerAdapter the serializer for any encoding and decoding
50+
* @param httpPipeline the HttpPipeline for making any Http request (e.g. poll)
51+
* @param pollResultType the type of the poll result, if no result is expecting then this should be Void.class
52+
* @param finalResultType the type of the final result, if no result is expecting then this should be Void.class
53+
* @param defaultPollDuration the default poll interval to use if service does not return retry-after
54+
* @param lroInitialResponseSupplier Supplier of the activation operation to activate (start) the long-running operation. This operation
55+
* will be invoked at most once.
56+
* @param context the context shared by all requests
57+
* @param <T> the type of poll result
58+
* @param <U> the type of final result
59+
* @return SyncPoller
60+
*/
61+
public static <T, U> SyncPoller<PollResult<T>, U> create(SerializerAdapter serializerAdapter,
62+
HttpPipeline httpPipeline, Class<T> pollResultType, Class<U> finalResultType, Duration defaultPollDuration,
63+
Supplier<Response<BinaryData>> lroInitialResponseSupplier, Context context) {
64+
return SyncPoller.createPoller(defaultPollDuration,
65+
SyncPollOperation.activationFunction(serializerAdapter, pollResultType, lroInitialResponseSupplier),
66+
SyncPollOperation.pollFunction(serializerAdapter, httpPipeline, pollResultType, context),
67+
SyncPollOperation.cancelFunction(context),
68+
SyncPollOperation.fetchResultFunction(serializerAdapter, httpPipeline, finalResultType, context));
69+
}
70+
}

0 commit comments

Comments
 (0)