|
| 1 | +--- |
| 2 | +title: "Asynchronous call model" |
| 3 | +date: 2019-02-23T17:20:00-05:00 |
| 4 | +draft: false |
| 5 | +weight: 7 |
| 6 | +--- |
| 7 | + |
| 8 | + |
| 9 | +We expect customers to task the operator with managing hundreds of WebLogic domains across dozens of Kubernetes namespaces. Therefore, we have designed the operator around an efficient user-level threads pattern, and we've used that pattern to implement an asynchronous call model for Kubernetes API requests. This call model has built-in support for timeouts, retries with exponential backoff, and lists that exceed the requested maximum size, which are retrieved using the Kubernetes `continue` functionality. |
| 10 | + |
| 11 | +#### User-level thread pattern |
| 12 | + |
| 13 | +The user-level thread pattern is implemented by the classes in the `oracle.kubernetes.operator.work` package. |
| 14 | + |
| 15 | +* `Engine`: The executor service and factory for `Fibers`. |
| 16 | +* `Fiber`: The user-level thread. `Fibers` represent the execution of a single processing flow through a series of `Steps`. `Fibers` may be suspended and later resumed, and do not consume a `Thread` while suspended. |
| 17 | +* `Step`: Individual CPU-bound activity in a processing flow. |
| 18 | +* `Packet`: Context of the processing flow. |
| 19 | +* `NextAction`: Used by a `Step` when it returns control to the `Fiber` to indicate what should happen next. Common 'next actions' are to execute another `Step` or to suspend the `Fiber`. |
| 20 | +* `Component`: Provider of SPIs that may be useful to the processing flow. |
| 21 | +* `Container`: Represents the containing environment and is a `Component`. |
| 22 | + |
| 23 | +Each `Step` has a reference to the next `Step` in the processing flow; however, a `Step` is not required to ask the `Fiber` to invoke that next `Step` when it returns a `NextAction`. This leads to common use cases where a `Fiber` invokes a series of `Steps` that are linked by the 'is-next' relationship and, just as commonly, use cases where the `Fiber` invokes sets of `Steps` along a detour before returning to the normal flow. |
| 24 | + |
| 25 | +In this sample, the caller creates an `Engine`, `Fiber`, linked set of `Step` instances, and `Packet`. The `Fiber` is then started. The `Engine` would typically be a singleton, since it's backed by a `ScheduledExecutorService`. The `Packet` would also typically be pre-loaded with values that the `Steps` would use in their `apply()` methods. |
| 26 | + |
| 27 | +```java |
| 28 | +static class SomeClass { |
| 29 | + public static void main(String[] args) { |
| 30 | + Engine engine = new Engine("worker-pool"); |
| 31 | + |
| 32 | + Fiber fiber = engine.createFiber(); |
| 33 | + |
| 34 | + Step step = new StepOne(new StepTwo(new StepThree(null))); |
| 35 | + Packet packet = new Packet(); |
| 36 | + |
| 37 | + fiber.start( |
| 38 | + step, |
| 39 | + packet, |
| 40 | + new CompletionCallback() { |
| 41 | + @Override |
| 42 | + public void onCompletion(Packet packet) { |
| 43 | + // Fiber has completed successfully |
| 44 | + } |
| 45 | + |
| 46 | + @Override |
| 47 | + public void onThrowable(Packet packet, Throwable throwable) { |
| 48 | + // Fiber processing was terminated with an exception |
| 49 | + } |
| 50 | + }); |
| 51 | + } |
| 52 | +} |
| 53 | +``` |
| 54 | + |
| 55 | +`Steps` must not invoke sleep or blocking calls from within `apply()`, because doing so prevents the worker threads from serving other `Fibers`. Instead, use asynchronous calls and the `Fiber` suspend/resume pattern. `Step` provides a method, `doDelay()`, which creates a `NextAction` that drives `Fiber` suspend/resume. This is a better option than sleep precisely because the worker thread can serve other `Fibers` during the delay. For asynchronous IO or similar patterns, suspend the `Fiber` and initiate the asynchronous call in the callback that runs as the `Fiber` suspends. Finally, when the call completes, resume the `Fiber`. The suspend/resume functionality handles the case where the `Fiber` is resumed before the suspending callback completes. |
| 56 | + |
| 57 | +In this sample, the step uses asynchronous file IO and the suspend/resume `Fiber` pattern. |
| 58 | + |
| 59 | +```java |
| 60 | + static class StepTwo extends Step { |
| 61 | + public StepTwo(Step next) { |
| 62 | + super(next); |
| 63 | + } |
| 64 | + |
| 65 | + @Override |
| 66 | + public NextAction apply(Packet packet) { |
| 67 | + return doSuspend((fiber) -> { |
| 68 | + // The Fiber is now suspended |
| 69 | + // Start the asynchronous call |
| 70 | + try { |
| 71 | + Path path = Paths.get(URI.create(this.getClass().getResource("/somefile.dat").toString())); |
| 72 | + AsynchronousFileChannel fileChannel = |
| 73 | + AsynchronousFileChannel.open(path, StandardOpenOption.READ); |
| 74 | + |
| 75 | + ByteBuffer buffer = ByteBuffer.allocate(1024); |
| 76 | + fileChannel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() { |
| 77 | + @Override |
| 78 | + public void completed(Integer result, ByteBuffer attachment) { |
| 79 | + // Store data in Packet and resume Fiber |
| 80 | + packet.put("DATA_SIZE_READ", result); |
| 81 | + packet.put("DATA_FROM_SOMEFILE", attachment); |
| 82 | + fiber.resume(packet); |
| 83 | + } |
| 84 | + |
| 85 | + @Override |
| 86 | + public void failed(Throwable exc, ByteBuffer attachment) { |
| 87 | + // log exc |
| 88 | + completed(0, null); |
| 89 | + } |
| 90 | + }); |
| 91 | + } catch (IOException e) { |
| 92 | + // log exception |
| 93 | + fiber.resume(packet); // Without this resume, the Fiber would never be resumed |
| 94 | + } |
| 95 | + }); |
| 96 | + } |
| 97 | + } |
| 98 | +``` |
| 99 | + |
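The early-resume case mentioned above can be made concrete with a small, self-contained sketch. `MiniFiber` and its state names are hypothetical and are not the operator's `Fiber` implementation; the sketch only assumes that a resume may arrive either while the suspending callback is still running or after it has returned, and shows one way to run the continuation exactly once in both cases.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for a fiber; not the operator's Fiber class. */
final class MiniFiber {
  private enum State { RUNNING, SUSPENDING, SUSPENDED, RESUME_REQUESTED }

  private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
  private final Runnable continuation;

  MiniFiber(Runnable continuation) {
    this.continuation = continuation;
  }

  /** Suspends, then runs the callback that is expected to start the asynchronous work. */
  void suspend(Consumer<MiniFiber> onSuspend) {
    state.set(State.SUSPENDING);
    onSuspend.accept(this); // resume() may be called before this returns
    if (!state.compareAndSet(State.SUSPENDING, State.SUSPENDED)) {
      // resume() arrived while the callback was still running: continue right away
      state.set(State.RUNNING);
      continuation.run();
    }
  }

  /** Resumes the fiber; safe to call before or after suspend() has finished. */
  void resume() {
    if (state.compareAndSet(State.SUSPENDING, State.RESUME_REQUESTED)) {
      return; // early resume: suspend() will run the continuation
    }
    if (state.compareAndSet(State.SUSPENDED, State.RUNNING)) {
      continuation.run(); // normal resume after the suspend completed
    }
  }
}
```

Calling `new MiniFiber(work).suspend(f -> f.resume())` runs `work` once even though the resume happens inside the callback, which is the situation an asynchronous call that completes immediately would create.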
| 100 | +#### Call builder pattern |
| 101 | + |
| 102 | +The asynchronous call model is implemented by classes in the `oracle.kubernetes.operator.helpers` package, including `CallBuilder` and `ResponseStep`. The model is based on the `Fiber` suspend/resume pattern described above. `CallBuilder` provides many methods having names ending with "Async", such as `listPodAsync()` or `deleteServiceAsync()`. These methods return a `Step` that can be returned as part of a `NextAction`. When creating these `Steps`, the developer must provide a `ResponseStep`. Only `ResponseStep.onSuccess()` must be implemented; however, it is often useful to override `onFailure()` as Kubernetes treats `404 (Not Found)` as a failure. |
| 103 | + |
| 104 | +In this sample, the developer is using the pattern to list pods from the default namespace that are labeled as part of `cluster-1`. |
| 105 | + |
| 106 | +```java |
| 107 | + static class StepOne extends Step { |
| 108 | + public StepOne(Step next) { |
| 109 | + super(next); |
| 110 | + } |
| 111 | + |
| 112 | + @Override |
| 113 | + public NextAction apply(Packet packet) { |
| 114 | + String namespace = "default"; |
| 115 | + Step step = CallBuilder.create().with($ -> { |
| 116 | + $.labelSelector = "weblogic.clusterName=cluster-1"; |
| 117 | + $.limit = 50; |
| 118 | + $.timeoutSeconds = 30; |
| 119 | + }).listPodAsync(namespace, new ResponseStep<V1PodList>(next) { |
| 120 | + @Override |
| 121 | + public NextAction onFailure(Packet packet, ApiException e, int statusCode, |
| 122 | + Map<String, List<String>> responseHeaders) { |
| 123 | + if (statusCode == CallBuilder.NOT_FOUND) { |
| 124 | + return onSuccess(packet, null, statusCode, responseHeaders); |
| 125 | + } |
| 126 | + return super.onFailure(packet, e, statusCode, responseHeaders); |
| 127 | + } |
| 128 | + |
| 129 | + @Override |
| 130 | + public NextAction onSuccess(Packet packet, V1PodList result, int statusCode, |
| 131 | + Map<String, List<String>> responseHeaders) { |
| 132 | + // do something with the resulting pod list, if not null |
| 133 | + return doNext(packet); |
| 134 | + } |
| 135 | + }); |
| 136 | + |
| 137 | + return doNext(step, packet); |
| 138 | + } |
| 139 | + } |
| 140 | +``` |
| 141 | + |
| 142 | +Notice that the required parameters, such as `namespace`, are method arguments, but optional parameters are specified through a simplified builder pattern using `with()` and a lambda. |
| 143 | + |
| 144 | +The default behavior of `onFailure()` is to retry the request, with an exponential backoff, on status codes `429 (TooManyRequests)`, `500 (InternalServerError)`, `503 (ServiceUnavailable)`, and `504 (ServerTimeout)`, or on a simple timeout with no response from the server. |
| 145 | + |
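As a rough illustration of the retry policy described above, the following sketch computes an exponential backoff delay and checks whether a status code is one of those listed as retried by default. `BackoffSketch`, the base delay, and the cap are assumptions made for this example; the operator's actual values and class names may differ, and the timeout-without-response case is not modeled.

```java
import java.util.Set;

/** Hypothetical helper for illustration; not part of the operator code base. */
final class BackoffSketch {
  // Status codes that the text lists as retried by default.
  static final Set<Integer> RETRYABLE = Set.of(429, 500, 503, 504);

  static final long BASE_DELAY_MILLIS = 100;    // assumed value
  static final long MAX_DELAY_MILLIS = 30_000;  // assumed value

  static boolean isRetryable(int statusCode) {
    return RETRYABLE.contains(statusCode);
  }

  /** Delay before retry number {@code attempt} (0-based): base * 2^attempt, capped. */
  static long delayMillis(int attempt) {
    int shift = Math.min(Math.max(attempt, 0), 20); // bound the shift to avoid overflow
    return Math.min(MAX_DELAY_MILLIS, BASE_DELAY_MILLIS << shift);
  }
}
```

With these assumed constants, the delays grow as 100 ms, 200 ms, 400 ms, and so on until they reach the 30 second cap. Inside a `Step`, such a delay would be applied through the `Fiber` suspend/resume mechanism (for example, `doDelay()`) rather than by sleeping on the worker thread.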
| 146 | +If the server responds with status code `409 (Conflict)`, then this indicates an optimistic locking failure. A common case is that the code read a Kubernetes object in one asynchronous step, modified the object, and attempted to replace the object in another asynchronous step; however, another activity replaced that same object in the interim. In this case, retrying the same request would give the same result. Therefore, developers may provide an "on conflict" step when calling `super.onFailure()`. The conflict step will be invoked after an exponential backoff delay. In the case described here, that conflict step should be the step that reads the existing Kubernetes object. |
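The read-modify-replace cycle and the reason the conflict step should be the read step can be sketched with an in-memory stand-in for the API server. Everything here (`ConflictRetrySketch`, `Store`, `Versioned`, and the `afterRead` hook that simulates a competing writer) is hypothetical; the version number plays the role that `resourceVersion` plays for Kubernetes objects.

```java
import java.util.function.UnaryOperator;

/** Hypothetical illustration of optimistic locking; not operator code. */
final class ConflictRetrySketch {
  /** A value plus the version it was read at. */
  static final class Versioned {
    final long version;
    final String value;

    Versioned(long version, String value) {
      this.version = version;
      this.value = value;
    }
  }

  /** In-memory stand-in for a server that rejects replaces based on a stale version. */
  static final class Store {
    private Versioned current = new Versioned(1, "initial");

    synchronized Versioned read() {
      return current;
    }

    /** Returns 200 on success, or 409 if {@code basedOn} is no longer the current version. */
    synchronized int replace(Versioned basedOn, String newValue) {
      if (basedOn.version != current.version) {
        return 409;
      }
      current = new Versioned(current.version + 1, newValue);
      return 200;
    }
  }

  /** Read, modify, replace; on 409 go back to the read. Returns the number of attempts used. */
  static int update(Store store, UnaryOperator<String> change, Runnable afterRead, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Versioned seen = store.read();   // the "on conflict" step: read the object again
      afterRead.run();                 // hook that lets a competing writer interfere
      int status = store.replace(seen, change.apply(seen.value));
      if (status != 409) {
        return attempt;
      }
      // Real code would wait for a backoff delay here without blocking the thread.
    }
    throw new IllegalStateException("still conflicting after " + maxAttempts + " attempts");
  }
}
```

Retrying only the replace with the stale copy would return `409` every time, which is why the loop restarts from the read.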