|
7 | 7 | import java.util.concurrent.ForkJoinPool; |
8 | 8 | import java.util.concurrent.ForkJoinTask; |
9 | 9 | import java.util.concurrent.ScheduledExecutorService; |
| 10 | +import java.util.concurrent.ThreadLocalRandom; |
10 | 11 | import java.util.concurrent.TimeUnit; |
| 12 | +import java.util.stream.Collectors; |
| 13 | +import java.util.stream.IntStream; |
11 | 14 |
|
12 | 15 | /** |
13 | 16 | * 测试 栈的默认 LIFO 策略 |
| 17 | + * 初步测试,没有LIFO特性,普通排队等待,队列中大量任务等待时,后续的小任务都要等很久,偶现又很快 |
14 | 18 | * |
15 | 19 | * @author Kuangcp |
16 | 20 | * 2024-12-05 13:43 |
@@ -163,15 +167,71 @@ public void testLIFO4() throws Exception { |
163 | 167 | sche.scheduleAtFixedRate(() -> { |
164 | 168 | int parallelism = pool.getParallelism(); |
165 | 169 | long queuedTaskCount = pool.getQueuedSubmissionCount(); |
166 | | - log.info("con={} wait={}", parallelism, queuedTaskCount); |
| 170 | +// log.info("con={} wait={}", parallelism, queuedTaskCount); |
167 | 171 | }, 1, 1, TimeUnit.SECONDS); |
168 | 172 |
|
169 | 173 | Thread.currentThread().join(); |
170 | 174 | } |
171 | 175 |
|
172 | | - |
| 176 | + /** |
| 177 | + * 模拟使用并行流时,高并发小请求 伴随定时的大请求,小请求很快就响应,但是大请求积压时间越来越长,提交线程有时也会消费任务。 |
| 178 | + * <p> |
| 179 | + * 应该能解释当系统打开页面,批量发起请求时,排队中的就会积压等待,但是为什么部分请求耗时不受影响。 |
| 180 | + * <p> |
| 181 | + * https://blog.csdn.net/weixin_38308374/article/details/112735120 |
| 182 | + * |
| 183 | + * @see ForEachOps.ForEachTask#compute() rightSplit.trySplit()可以对数据源的数据进行拆分,将数据一分为二 如果剩余的数据量不足以进行再次拆分,则直接使用当前线程处理 |
| 184 | + */ |
173 | 185 | @Test |
174 | | - public void testSSSS() throws Exception { |
175 | | - System.out.println("xxxxx"); |
| 186 | + public void testLIFO5() throws Exception { |
| 187 | + ScheduledExecutorService sche = Executors.newScheduledThreadPool(4); |
| 188 | + sche.scheduleAtFixedRate(() -> { |
| 189 | + int parallelism = ForkJoinPool.commonPool().getParallelism(); |
| 190 | + long queuedTaskCount = ForkJoinPool.commonPool().getQueuedSubmissionCount(); |
| 191 | + log.info("con={} wait={}", parallelism, queuedTaskCount); |
| 192 | + }, 1, 1, TimeUnit.SECONDS); |
| 193 | + |
| 194 | + // 模拟线程池提交重请求 |
| 195 | + sche.scheduleAtFixedRate(() -> { |
| 196 | + long start = System.currentTimeMillis(); |
| 197 | + |
| 198 | + IntStream.range(1, 10).parallel().mapToObj(v -> { |
| 199 | + try { |
| 200 | + TimeUnit.MILLISECONDS.sleep(2000); |
| 201 | + } catch (Exception e) { |
| 202 | + log.error("", e); |
| 203 | + } |
| 204 | + long val = System.currentTimeMillis(); |
| 205 | + long all = val - start; |
| 206 | + if (all > 110) { |
| 207 | + log.error("X long wait {}", all); |
| 208 | + } |
| 209 | + return val; |
| 210 | + }).collect(Collectors.toList()); |
| 211 | + }, 5, 6, TimeUnit.SECONDS); |
| 212 | + |
| 213 | + // 模拟小请求 |
| 214 | + for (int i = 0; i < 1000; i++) { |
| 215 | + TimeUnit.MILLISECONDS.sleep(300); |
| 216 | + new Thread(() -> { |
| 217 | + long start = System.currentTimeMillis(); |
| 218 | + IntStream.range(1, 10).parallel().mapToObj(v -> { |
| 219 | + try { |
| 220 | + int xr = ThreadLocalRandom.current().nextInt(100); |
| 221 | + TimeUnit.MILLISECONDS.sleep(200 + xr); |
| 222 | + } catch (Exception e) { |
| 223 | + log.error("", e); |
| 224 | + } |
| 225 | + long val = System.currentTimeMillis(); |
| 226 | + long all = val - start; |
| 227 | + if (all > 110) { |
| 228 | + log.error("long wait {}", all); |
| 229 | + } |
| 230 | + return val; |
| 231 | + }).collect(Collectors.toList()); |
| 232 | + }).start(); |
| 233 | + } |
| 234 | + |
| 235 | + Thread.currentThread().join(); |
176 | 236 | } |
177 | 237 | } |
0 commit comments