Skip to content

Commit 0e0e6f4

Browse files
authored
Add API to get and cancel timed out Futures (Azure#36773)
Add API to get and cancel timed out Futures
1 parent e19f80a commit 0e0e6f4

File tree

7 files changed

+199
-186
lines changed

7 files changed

+199
-186
lines changed

eng/versioning/version_client.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,7 @@ com.azure.tools:azure-sdk-build-tool;1.0.0;1.1.0-beta.1
441441
# note: The unreleased dependencies will not be manipulated with the automatic PR creation code.
442442
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
443443
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
444+
unreleased_com.azure:azure-core;1.44.0-beta.1
444445

445446
# Released Beta dependencies: Copy the entry from above, prepend "beta_", remove the current
446447
# version and set the version to the released beta. Released beta dependencies are only valid

sdk/core/azure-core/src/main/java/com/azure/core/util/CoreUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@
2323
import java.util.Objects;
2424
import java.util.Properties;
2525
import java.util.UUID;
26+
import java.util.concurrent.CancellationException;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.Future;
2629
import java.util.concurrent.ThreadLocalRandom;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.TimeoutException;
2732
import java.util.function.BiFunction;
2833
import java.util.function.Function;
2934
import java.util.stream.Collectors;
@@ -531,4 +536,29 @@ static UUID randomUuid(long msb, long lsb) {
531536
// For environments using Reactor's BlockHound this will raise an exception if called in non-blocking threads.
532537
return new UUID(msb, lsb);
533538
}
539+
540+
/**
541+
* Calls {@link Future#get(long, TimeUnit)} and returns the value if the {@code future} completes before the timeout
542+
* is triggered. If the timeout is triggered, the {@code future} is {@link Future#cancel(boolean) cancelled}
543+
* interrupting the execution of the task that the {@link Future} represented.
544+
*
545+
* @param <T> The type of value returned by the {@code future}.
546+
* @param future The {@link Future} to get the value from.
547+
* @param timeout The timeout value.
548+
* @param unit The {@link TimeUnit} of the timeout value.
549+
* @return The value from the {@code future}.
550+
* @throws CancellationException If the computation was cancelled.
551+
* @throws ExecutionException If the computation threw an exception.
552+
* @throws InterruptedException If the current thread was interrupted while waiting.
553+
* @throws TimeoutException If the wait timed out.
554+
*/
555+
public static <T> T getFutureWithCancellation(Future<T> future, long timeout, TimeUnit unit)
556+
throws InterruptedException, ExecutionException, TimeoutException {
557+
try {
558+
return future.get(timeout, unit);
559+
} catch (TimeoutException e) {
560+
future.cancel(true);
561+
throw e;
562+
}
563+
}
534564
}

sdk/core/azure-core/src/test/java/com/azure/core/util/CoreUtilsTests.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import com.azure.core.http.HttpHeaders;
77
import com.azure.core.http.policy.HttpLogOptions;
88
import com.azure.core.util.logging.ClientLogger;
9+
import org.junit.jupiter.api.AfterAll;
910
import org.junit.jupiter.api.Assertions;
11+
import org.junit.jupiter.api.BeforeAll;
1012
import org.junit.jupiter.api.Test;
1113
import org.junit.jupiter.params.ParameterizedTest;
1214
import org.junit.jupiter.params.provider.Arguments;
@@ -25,7 +27,14 @@
2527
import java.util.List;
2628
import java.util.Map;
2729
import java.util.UUID;
30+
import java.util.concurrent.ExecutionException;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
import java.util.concurrent.Future;
2834
import java.util.concurrent.ThreadLocalRandom;
35+
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.TimeoutException;
37+
import java.util.concurrent.atomic.AtomicBoolean;
2938
import java.util.function.Function;
3039
import java.util.stream.Stream;
3140

@@ -36,6 +45,7 @@
3645
import static org.junit.jupiter.api.Assertions.assertNull;
3746
import static org.junit.jupiter.api.Assertions.assertThrows;
3847
import static org.junit.jupiter.api.Assertions.assertTrue;
48+
import static org.junit.jupiter.api.Assertions.fail;
3949

4050
public class CoreUtilsTests {
4151
private static final byte[] BYTES = "Hello world!".getBytes(StandardCharsets.UTF_8);
@@ -49,6 +59,18 @@ public class CoreUtilsTests {
4959
private static final String TIMEOUT_PROPERTY_NAME = "TIMEOUT_PROPERTY_NAME";
5060
private static final ConfigurationSource EMPTY_SOURCE = new TestConfigurationSource();
5161

62+
private static ExecutorService executorService;
63+
64+
@BeforeAll
65+
public static void setupClass() {
66+
executorService = Executors.newCachedThreadPool();
67+
}
68+
69+
@AfterAll
70+
public static void teardownClass() {
71+
executorService.shutdownNow();
72+
}
73+
5274
@Test
5375
public void findFirstOfTypeEmptyArgs() {
5476
assertNull(CoreUtils.findFirstOfType(null, Integer.class));
@@ -484,7 +506,7 @@ public void randomUuidIsCorrectlyType4() {
484506
bytes[6] &= 0x0f; /* clear version */
485507
bytes[6] |= 0x40; /* set to version 4 */
486508
bytes[8] &= 0x3f; /* clear variant */
487-
bytes[8] |= 0x80; /* set to IETF variant */
509+
bytes[8] |= (byte) 0x80; /* set to IETF variant */
488510
long msbForJava = 0;
489511
long lsbForJava = 0;
490512
for (int i = 0; i < 8; i++) {
@@ -496,4 +518,48 @@ public void randomUuidIsCorrectlyType4() {
496518

497519
assertEquals(new UUID(msbForJava, lsbForJava), CoreUtils.randomUuid(msb, lsb));
498520
}
521+
522+
@Test
523+
public void futureCompletesBeforeTimeout() {
524+
try {
525+
AtomicBoolean completed = new AtomicBoolean(false);
526+
Future<?> future = executorService.submit(() -> {
527+
Thread.sleep(10);
528+
completed.set(true);
529+
return null;
530+
});
531+
532+
future.get(1000, TimeUnit.MILLISECONDS);
533+
534+
assertTrue(completed.get());
535+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
536+
throw new RuntimeException(e);
537+
}
538+
}
539+
540+
@Test
541+
public void futureTimesOutAndIsCancelled() {
542+
try {
543+
AtomicBoolean completed = new AtomicBoolean(false);
544+
Future<?> future = executorService.submit(() -> {
545+
Thread.sleep(1000);
546+
completed.set(true);
547+
return null;
548+
});
549+
550+
try {
551+
CoreUtils.getFutureWithCancellation(future, 100, TimeUnit.MILLISECONDS);
552+
fail("Expected future to timout and be cancelled.");
553+
} catch (TimeoutException e) {
554+
// Expected.
555+
}
556+
557+
// Give time for the future to complete if cancellation didn't work.
558+
Thread.sleep(1000);
559+
560+
assertFalse(completed.get());
561+
} catch (InterruptedException | ExecutionException e) {
562+
throw new RuntimeException(e);
563+
}
564+
}
499565
}

sdk/tables/azure-data-tables/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ Licensed under the MIT License.
4646
<dependency>
4747
<groupId>com.azure</groupId>
4848
<artifactId>azure-core</artifactId>
49-
<version>1.43.0</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
49+
<version>1.44.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
5050
</dependency>
5151
<dependency>
5252
<groupId>com.azure</groupId>

0 commit comments

Comments
 (0)