|
11 | 11 | import oracle.kubernetes.operator.work.Fiber.CompletionCallback;
|
12 | 12 | import oracle.kubernetes.operator.work.Fiber.ExitCallback;
|
13 | 13 |
|
| 14 | +/** |
| 15 | + * Allows at most one running Fiber per key value. However, rather than queue later arriving Fibers this class cancels |
| 16 | + * the earlier arriving Fibers. For the operator, this makes sense as domain presence Fibers that come later will always complete |
| 17 | + * or correct work that may have been in-flight. |
| 18 | + */ |
14 | 19 | public class FiberGate {
|
15 | 20 | private final Engine engine;
|
16 | 21 | private final ConcurrentMap<String, Fiber> gateMap = new ConcurrentHashMap<String, Fiber>();
|
17 | 22 |
|
| 23 | + /** |
| 24 | + * Constructor taking Engine for running Fibers |
| 25 | + * @param engine Engine |
| 26 | + */ |
18 | 27 | public FiberGate(Engine engine) {
|
19 | 28 | this.engine = engine;
|
20 | 29 | }
|
21 | 30 |
|
| 31 | + /** |
| 32 | + * Starts Fiber that cancels any earlier running Fibers with the same key. Fiber map is not updated if no Fiber |
| 33 | + * is started. |
| 34 | + * @param key Key |
| 35 | + * @param strategy Step for Fiber to begin with |
| 36 | + * @param packet Packet |
| 37 | + * @param callback Completion callback |
| 38 | + * @return started Fiber |
| 39 | + */ |
22 | 40 | public Fiber startFiber(String key, Step strategy, Packet packet, CompletionCallback callback) {
|
23 |
| - return replaceAndStartFiber(key, null, strategy, packet, callback); |
| 41 | + return startFiberIfLastFiberMatches(key, null, strategy, packet, callback); |
24 | 42 | }
|
25 | 43 |
|
26 |
| - public Fiber replaceAndStartFiber(String key, Fiber old, Step strategy, Packet packet, CompletionCallback callback) { |
| 44 | + /** |
| 45 | + * Starts Fiber only if the last started Fiber matches the given old Fiber. |
| 46 | + * @param key Key |
| 47 | + * @param old Expected last Fiber |
| 48 | + * @param strategy Step for Fiber to begin with |
| 49 | + * @param packet Packet |
| 50 | + * @param callback Completion callback |
| 51 | + * @return started Fiber, or null, if no Fiber started |
| 52 | + */ |
| 53 | + public synchronized Fiber startFiberIfLastFiberMatches(String key, Fiber old, Step strategy, Packet packet, CompletionCallback callback) { |
27 | 54 | Fiber f = engine.createFiber();
|
28 | 55 | WaitForOldFiberStep wfofs;
|
29 |
| - synchronized (this) { |
30 |
| - if (old != null) { |
31 |
| - if (!gateMap.replace(key, old, f)) { |
32 |
| - return null; |
33 |
| - } |
34 |
| - } else { |
35 |
| - old = gateMap.put(key, f); |
| 56 | + if (old != null) { |
| 57 | + if (!gateMap.replace(key, old, f)) { |
| 58 | + return null; |
36 | 59 | }
|
37 |
| - wfofs = new WaitForOldFiberStep(old, strategy); |
38 |
| - f.getComponents().put(ProcessingConstants.FIBER_COMPONENT_NAME, Component.createFor(wfofs)); |
| 60 | + } else { |
| 61 | + old = gateMap.put(key, f); |
39 | 62 | }
|
| 63 | + wfofs = new WaitForOldFiberStep(old, strategy); |
| 64 | + f.getComponents().put(ProcessingConstants.FIBER_COMPONENT_NAME, Component.createFor(wfofs)); |
40 | 65 | f.start(wfofs, packet, new CompletionCallback() {
|
41 | 66 | @Override
|
42 | 67 | public void onCompletion(Packet packet) {
|
@@ -87,5 +112,4 @@ public void onExit() {
|
87 | 112 | });
|
88 | 113 | }
|
89 | 114 | }
|
90 |
| - |
91 | 115 | }
|
0 commit comments