| 
 | 1 | +package io.mincong.concurrency.completablefuture;  | 
 | 2 | + | 
 | 3 | +import java.time.LocalTime;  | 
 | 4 | +import java.util.concurrent.CompletableFuture;  | 
 | 5 | +import java.util.concurrent.Executors;  | 
 | 6 | +import java.util.stream.IntStream;  | 
 | 7 | + | 
 | 8 | +public class MultipleStageDemo {  | 
 | 9 | + | 
 | 10 | +  public static void main(String[] args) {  | 
 | 11 | +    var demo = new MultipleStageDemo();  | 
 | 12 | +    demo.scenario1();  | 
 | 13 | +    demo.scenario2();  | 
 | 14 | +  }  | 
 | 15 | + | 
 | 16 | +  /**  | 
 | 17 | +   * In scenario 1, resource creation and deletion are done in the same completion stage. Given a  | 
 | 18 | +   * fixed thread pool executor with 2 threads, the executor will complete the tasks of two  | 
 | 19 | +   * resources (e.g. 0 and 1) before starting the next stages. Its behavior can be illustrated as  | 
 | 20 | +   * follows:  | 
 | 21 | +   *  | 
 | 22 | +   * <pre>  | 
 | 23 | +   *          +-----------------------------------------------------------------> Time  | 
 | 24 | +   *          | +------------------------+ +------------------------+  | 
 | 25 | +   * Thread 1 | | Creation 0, Deletion 0 | | Creation 2, Deletion 2 |  | 
 | 26 | +   *          | +------------------------+ +------------------------+  | 
 | 27 | +   * Thread 2 | | Creation 1, Deletion 1 | | Creation 3, Deletion 3 |  | 
 | 28 | +   *          | +------------------------+ +------------------------+  | 
 | 29 | +   *          |  | 
 | 30 | +   * </pre>  | 
 | 31 | +   *  | 
 | 32 | +   * This is because these stages are queued in the following way:  | 
 | 33 | +   *  | 
 | 34 | +   * <pre>  | 
 | 35 | +   * 0. +------------------------+  | 
 | 36 | +   *    | Creation 0, Deletion 0 |  | 
 | 37 | +   *    +------------------------+  | 
 | 38 | +   * 1. +------------------------+  | 
 | 39 | +   *    | Creation 1, Deletion 1 |  | 
 | 40 | +   *    +------------------------+  | 
 | 41 | +   * 2. +------------------------+  | 
 | 42 | +   *    | Creation 2, Deletion 2 |  | 
 | 43 | +   *    +------------------------+  | 
 | 44 | +   * 3. +------------------------+  | 
 | 45 | +   *    | Creation 3, Deletion 3 |  | 
 | 46 | +   *    +------------------------+  | 
 | 47 | +   * </pre>  | 
 | 48 | +   *  | 
 | 49 | +   * Console output:  | 
 | 50 | +   *  | 
 | 51 | +   * <pre>  | 
 | 52 | +   * [15:41:18.010681][main] - Scenario 1 started  | 
 | 53 | +   * [15:41:18.103478][pool-1-thread-1] - [id=0] New resource: creating  | 
 | 54 | +   * [15:41:18.103482][pool-1-thread-2] - [id=1] New resource: creating  | 
 | 55 | +   * [15:41:19.107141][pool-1-thread-1] - [id=0] New resource: created  | 
 | 56 | +   * [15:41:19.107472][pool-1-thread-1] - [id=0] Old resource: creating  | 
 | 57 | +   * [15:41:19.108780][pool-1-thread-2] - [id=1] New resource: created  | 
 | 58 | +   * [15:41:19.108929][pool-1-thread-2] - [id=1] Old resource: creating  | 
 | 59 | +   * [15:41:20.112909][pool-1-thread-2] - [id=1] Old resource: created  | 
 | 60 | +   * [15:41:20.112909][pool-1-thread-1] - [id=0] Old resource: created  | 
 | 61 | +   * [15:41:20.113265][pool-1-thread-1] - [id=2] New resource: creating  | 
 | 62 | +   * [15:41:20.113452][pool-1-thread-2] - [id=3] New resource: creating  | 
 | 63 | +   * [15:41:21.114555][pool-1-thread-1] - [id=2] New resource: created  | 
 | 64 | +   * [15:41:21.114555][pool-1-thread-2] - [id=3] New resource: created  | 
 | 65 | +   * [15:41:21.114914][pool-1-thread-1] - [id=2] Old resource: creating  | 
 | 66 | +   * [15:41:21.115019][pool-1-thread-2] - [id=3] Old resource: creating  | 
 | 67 | +   * [15:41:22.119584][pool-1-thread-2] - [id=3] Old resource: created  | 
 | 68 | +   * [15:41:22.119584][pool-1-thread-1] - [id=2] Old resource: created  | 
 | 69 | +   * [15:41:22.120209][main] - Scenario 1 finished  | 
 | 70 | +   * </pre>  | 
 | 71 | +   */  | 
 | 72 | +  private void scenario1() {  | 
 | 73 | +    print("Scenario 1 started");  | 
 | 74 | +    var executor = Executors.newFixedThreadPool(2);  | 
 | 75 | +    var futures =  | 
 | 76 | +        IntStream.range(0, 4)  | 
 | 77 | +            .mapToObj(  | 
 | 78 | +                i ->  | 
 | 79 | +                    CompletableFuture.runAsync(  | 
 | 80 | +                        () -> {  | 
 | 81 | +                          createNewResource(i);  | 
 | 82 | +                          deleteOldResource(i);  | 
 | 83 | +                        },  | 
 | 84 | +                        executor))  | 
 | 85 | +            .toArray(CompletableFuture[]::new);  | 
 | 86 | +    try {  | 
 | 87 | +      CompletableFuture.allOf(futures).join();  | 
 | 88 | +    } finally {  | 
 | 89 | +      executor.shutdownNow();  | 
 | 90 | +    }  | 
 | 91 | +    print("Scenario 1 finished");  | 
 | 92 | +  }  | 
 | 93 | + | 
 | 94 | +  /**  | 
 | 95 | +   * In scenario 2, resource creation and deletion are done separately in two completion stages.  | 
 | 96 | +   * Given a fixed thread pool executor with 2 threads, the executor will complete the creation of  | 
 | 97 | +   * two resources (e.g. 0 and 1) then continue the creation of two other resources (e.g. 2 and 3).  | 
 | 98 | +   * Once done, it will start the deletions in the same way.  | 
 | 99 | +   *  | 
 | 100 | +   * <pre>  | 
 | 101 | +   *          +-----------------------------------------------------------------> Time  | 
 | 102 | +   *          | +------------+ +------------+ +------------+ +------------+  | 
 | 103 | +   * Thread 1 | | Creation 0 | | Creation 2 | | Deletion 0 | | Deletion 2 |  | 
 | 104 | +   *          | +------------+ +------------+ +------------+ +------------+  | 
 | 105 | +   * Thread 2 | | Creation 1 | | Creation 3 | | Deletion 1 | | Deletion 3 |  | 
 | 106 | +   *          | +------------+ +------------+ +------------+ +------------+  | 
 | 107 | +   *          |  | 
 | 108 | +   * </pre>  | 
 | 109 | +   *  | 
 | 110 | +   * This is because these stages are queued in the following way:  | 
 | 111 | +   *  | 
 | 112 | +   * <pre>  | 
 | 113 | +   * 0. +------------+  | 
 | 114 | +   *    | Creation 0 |  | 
 | 115 | +   *    +------------+  | 
 | 116 | +   * 1. +------------+  | 
 | 117 | +   *    | Creation 1 |  | 
 | 118 | +   *    +------------+  | 
 | 119 | +   * 2. +------------+  | 
 | 120 | +   *    | Creation 2 |  | 
 | 121 | +   *    +------------+  | 
 | 122 | +   * 3. +------------+  | 
 | 123 | +   *    | Creation 3 |  | 
 | 124 | +   *    +------------+  | 
 | 125 | +   * 4. +------------+  | 
 | 126 | +   *    | Deletion 0 |  | 
 | 127 | +   *    +------------+  | 
 | 128 | +   * 5. +------------+  | 
 | 129 | +   *    | Deletion 1 |  | 
 | 130 | +   *    +------------+  | 
 | 131 | +   * 6. +------------+  | 
 | 132 | +   *    | Deletion 2 |  | 
 | 133 | +   *    +------------+  | 
 | 134 | +   * 7. +------------+  | 
 | 135 | +   *    | Deletion 3 |  | 
 | 136 | +   *    +-------------+  | 
 | 137 | +   * </pre>  | 
 | 138 | +   *  | 
 | 139 | +   * Console output:  | 
 | 140 | +   *  | 
 | 141 | +   * <pre>  | 
 | 142 | +   * [15:41:22.120374][main] - Scenario 2 started  | 
 | 143 | +   * [15:41:22.123783][pool-2-thread-1] - [id=0] New resource: creating  | 
 | 144 | +   * [15:41:22.124870][pool-2-thread-2] - [id=1] New resource: creating  | 
 | 145 | +   * [15:41:23.126835][pool-2-thread-1] - [id=0] New resource: created  | 
 | 146 | +   * [15:41:23.126927][pool-2-thread-2] - [id=1] New resource: created  | 
 | 147 | +   * [15:41:23.127811][pool-2-thread-1] - [id=2] New resource: creating  | 
 | 148 | +   * [15:41:23.127868][pool-2-thread-2] - [id=3] New resource: creating  | 
 | 149 | +   * [15:41:24.132029][pool-2-thread-2] - [id=3] New resource: created  | 
 | 150 | +   * [15:41:24.132044][pool-2-thread-1] - [id=2] New resource: created  | 
 | 151 | +   * [15:41:24.132439][pool-2-thread-2] - [id=0] Old resource: creating  | 
 | 152 | +   * [15:41:24.132452][pool-2-thread-1] - [id=1] Old resource: creating  | 
 | 153 | +   * [15:41:25.137506][pool-2-thread-2] - [id=0] Old resource: created  | 
 | 154 | +   * [15:41:25.137506][pool-2-thread-1] - [id=1] Old resource: created  | 
 | 155 | +   * [15:41:25.137804][pool-2-thread-2] - [id=3] Old resource: creating  | 
 | 156 | +   * [15:41:25.138011][pool-2-thread-1] - [id=2] Old resource: creating  | 
 | 157 | +   * [15:41:26.138201][pool-2-thread-2] - [id=3] Old resource: created  | 
 | 158 | +   * [15:41:26.138340][pool-2-thread-1] - [id=2] Old resource: created  | 
 | 159 | +   * [15:41:26.139091][main] - Scenario 2 finished  | 
 | 160 | +   * </pre>  | 
 | 161 | +   */  | 
 | 162 | +  private void scenario2() {  | 
 | 163 | +    print("Scenario 2 started");  | 
 | 164 | +    var executor = Executors.newFixedThreadPool(2);  | 
 | 165 | +    var futures =  | 
 | 166 | +        IntStream.range(0, 4)  | 
 | 167 | +            .mapToObj(  | 
 | 168 | +                i ->  | 
 | 169 | +                    CompletableFuture.runAsync(() -> createNewResource(i), executor)  | 
 | 170 | +                        .thenRunAsync(() -> deleteOldResource(i), executor))  | 
 | 171 | +            .toArray(CompletableFuture[]::new);  | 
 | 172 | +    try {  | 
 | 173 | +      CompletableFuture.allOf(futures).join();  | 
 | 174 | +    } finally {  | 
 | 175 | +      executor.shutdownNow();  | 
 | 176 | +    }  | 
 | 177 | +    print("Scenario 2 finished");  | 
 | 178 | +  }  | 
 | 179 | + | 
 | 180 | +  private void createNewResource(int i) {  | 
 | 181 | +    print("[id=" + i + "] New resource: creating");  | 
 | 182 | +    try {  | 
 | 183 | +      Thread.sleep(1000);  | 
 | 184 | +    } catch (InterruptedException e) {  | 
 | 185 | +      // ignore  | 
 | 186 | +    }  | 
 | 187 | +    print("[id=" + i + "] New resource: created");  | 
 | 188 | +  }  | 
 | 189 | + | 
 | 190 | +  private void deleteOldResource(int i) {  | 
 | 191 | +    print("[id=" + i + "] Old resource: creating");  | 
 | 192 | +    try {  | 
 | 193 | +      Thread.sleep(1000);  | 
 | 194 | +    } catch (InterruptedException e) {  | 
 | 195 | +      // ignore  | 
 | 196 | +    }  | 
 | 197 | +    print("[id=" + i + "] Old resource: created");  | 
 | 198 | +  }  | 
 | 199 | + | 
 | 200 | +  private static void print(String message) {  | 
 | 201 | +    var thread = Thread.currentThread().getName();  | 
 | 202 | +    var time = LocalTime.now();  | 
 | 203 | +    System.out.println("[" + time + "][" + thread + "] - " + message);  | 
 | 204 | +  }  | 
 | 205 | +}  | 
0 commit comments