Skip to content

Commit 26d6b77

Browse files
committed
wip
Signed-off-by: wind57 <[email protected]>
1 parent 622b392 commit 26d6b77

File tree

2 files changed

+140
-0
lines changed

2 files changed

+140
-0
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2013-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.kubernetes.commons;
18+
19+
import jakarta.annotation.Nonnull;
20+
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.ScheduledFuture;
23+
import java.util.concurrent.ScheduledThreadPoolExecutor;
24+
import java.util.concurrent.ThreadFactory;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.locks.ReentrantLock;
27+
28+
/**
29+
* This is taken from fabric8 with some minor changes (we need it, so it could be placed
30+
* in the common package).
31+
*
32+
* @author wind57
33+
*/
34+
public final class CachedSingleThreadScheduler {
35+
36+
private final ReentrantLock lock = new ReentrantLock();
37+
38+
private final long ttlMillis;
39+
40+
private ScheduledThreadPoolExecutor executor;
41+
42+
public CachedSingleThreadScheduler(long ttlMillis) {
43+
this.ttlMillis = ttlMillis;
44+
}
45+
46+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
47+
try {
48+
lock.lock();
49+
this.startExecutor();
50+
return this.executor.scheduleWithFixedDelay(command, initialDelay, delay, unit);
51+
}
52+
finally {
53+
lock.unlock();
54+
}
55+
}
56+
57+
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
58+
try {
59+
lock.lock();
60+
this.startExecutor();
61+
return this.executor.schedule(command, delay, unit);
62+
}
63+
finally {
64+
lock.unlock();
65+
}
66+
}
67+
68+
private void startExecutor() {
69+
if (this.executor == null) {
70+
this.executor = new ScheduledThreadPoolExecutor(1, threadFactory());
71+
this.executor.setRemoveOnCancelPolicy(true);
72+
this.executor.scheduleWithFixedDelay(this::shutdownCheck, this.ttlMillis, this.ttlMillis,
73+
TimeUnit.MILLISECONDS);
74+
}
75+
76+
}
77+
78+
private void shutdownCheck() {
79+
try {
80+
lock.lock();
81+
if (this.executor.getQueue().isEmpty()) {
82+
this.executor.shutdownNow();
83+
this.executor = null;
84+
}
85+
}
86+
finally {
87+
lock.unlock();
88+
}
89+
90+
}
91+
92+
private ThreadFactory threadFactory() {
93+
return new ThreadFactory() {
94+
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
95+
96+
@Override
97+
public Thread newThread(@Nonnull Runnable runnable) {
98+
Thread thread = threadFactory.newThread(runnable);
99+
thread.setName("fabric8-leader-election" + "-" + thread.getName());
100+
thread.setDaemon(true);
101+
return thread;
102+
}
103+
};
104+
}
105+
106+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package org.springframework.cloud.kubernetes.commons;
2+
3+
import java.util.concurrent.ScheduledFuture;
4+
import java.util.concurrent.ScheduledThreadPoolExecutor;
5+
import java.util.concurrent.TimeUnit;
6+
7+
public class DeleteMeSandbox {
8+
9+
public static void main(String[] args) throws InterruptedException {
10+
ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
11+
Runnable task = () -> {
12+
System.out.println("running");
13+
};
14+
ScheduledFuture<?> future = pool.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS);
15+
16+
17+
Runnable outputTask = () -> {
18+
while (true) {
19+
System.out.println("pool size : " + pool.getQueue().size());
20+
try {
21+
Thread.sleep(500);
22+
} catch (InterruptedException e) {
23+
throw new RuntimeException(e);
24+
}
25+
}
26+
};
27+
Thread thread = new Thread(outputTask);
28+
thread.start();
29+
30+
Thread.sleep(3_000);
31+
future.cancel(true);
32+
}
33+
34+
}

0 commit comments

Comments
 (0)