|
17 | 17 | package examples; |
18 | 18 |
|
19 | 19 | // [START managedkafka_create_cluster] |
| 20 | + |
20 | 21 | import com.google.api.gax.longrunning.OperationFuture; |
| 22 | +import com.google.api.gax.longrunning.OperationSnapshot; |
| 23 | +import com.google.api.gax.longrunning.OperationTimedPollAlgorithm; |
| 24 | +import com.google.api.gax.retrying.RetrySettings; |
| 25 | +import com.google.api.gax.retrying.RetryingFuture; |
| 26 | +import com.google.api.gax.retrying.TimedRetryAlgorithm; |
21 | 27 | import com.google.cloud.managedkafka.v1.AccessConfig; |
22 | 28 | import com.google.cloud.managedkafka.v1.CapacityConfig; |
23 | 29 | import com.google.cloud.managedkafka.v1.Cluster; |
24 | 30 | import com.google.cloud.managedkafka.v1.CreateClusterRequest; |
25 | 31 | import com.google.cloud.managedkafka.v1.GcpConfig; |
26 | 32 | import com.google.cloud.managedkafka.v1.LocationName; |
27 | 33 | import com.google.cloud.managedkafka.v1.ManagedKafkaClient; |
| 34 | +import com.google.cloud.managedkafka.v1.ManagedKafkaSettings; |
28 | 35 | import com.google.cloud.managedkafka.v1.NetworkConfig; |
29 | 36 | import com.google.cloud.managedkafka.v1.OperationMetadata; |
30 | 37 | import com.google.cloud.managedkafka.v1.RebalanceConfig; |
| 38 | +import java.time.Duration; |
31 | 39 | import java.util.concurrent.ExecutionException; |
32 | 40 |
|
33 | 41 | public class CreateCluster { |
@@ -64,17 +72,47 @@ public static void createCluster( |
64 | 72 | .setRebalanceConfig(rebalanceConfig) |
65 | 73 | .build(); |
66 | 74 |
|
67 | | - try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) { |
| 75 | + // Create the settings to configure the timeout for polling operations |
| 76 | + ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder(); |
| 77 | + TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create( |
| 78 | + RetrySettings.newBuilder() |
| 79 | + .setTotalTimeoutDuration(Duration.ofHours(1L)) |
| 80 | + .build()); |
| 81 | + settingsBuilder.createClusterOperationSettings() |
| 82 | + .setPollingAlgorithm(timedRetryAlgorithm); |
| 83 | + |
| 84 | + try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create( |
| 85 | + settingsBuilder.build())) { |
| 86 | + |
68 | 87 | CreateClusterRequest request = |
69 | 88 | CreateClusterRequest.newBuilder() |
70 | 89 | .setParent(LocationName.of(projectId, region).toString()) |
71 | 90 | .setClusterId(clusterId) |
72 | 91 | .setCluster(cluster) |
73 | 92 | .build(); |
| 93 | + |
74 | 94 | // The duration of this operation can vary considerably, typically taking between 10-40 |
75 | 95 | // minutes. |
76 | 96 | OperationFuture<Cluster, OperationMetadata> future = |
77 | 97 | managedKafkaClient.createClusterOperationCallable().futureCall(request); |
| 98 | + |
| 99 | + // Get the initial LRO and print details. |
| 100 | + OperationSnapshot operation = future.getInitialFuture().get(); |
| 101 | + System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n", |
| 102 | + operation.getName(), |
| 103 | + operation.isDone(), |
| 104 | + future.getMetadata().get().toString()); |
| 105 | + |
| 106 | + while (!future.isDone()) { |
| 107 | + // The pollingFuture gives us the most recent status of the operation |
| 108 | + RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture(); |
| 109 | + OperationSnapshot currentOp = pollingFuture.getAttemptResult().get(); |
| 110 | + System.out.printf("Polling Operation:\nName: %s\n Done: %s\n", |
| 111 | + currentOp.getName(), |
| 112 | + currentOp.isDone()); |
| 113 | + } |
| 114 | + |
| 115 | + // NOTE: future.get() blocks completion until the operation is complete (isDone = True) |
78 | 116 | Cluster response = future.get(); |
79 | 117 | System.out.printf("Created cluster: %s\n", response.getName()); |
80 | 118 | } catch (ExecutionException e) { |
|
0 commit comments