|
| 1 | +## 작업과 실행 정책 간의 보이지 않는 연결 관계 |
| 2 | + |
| 3 | +Executor 프레임웍을 이용하여 실행정책을 설정 & 변경 시, |
| 4 | +일정한 조건을 갖춘 실행 정책이 필요한 작업이 있음을 유의해야 함 |
| 5 | + |
| 6 | +- **의존성이 있는 작업** |
| 7 | + - **독립적인 작업** : 다른 작업이 실행하는 데서 발생하는 부수적인 요건에 관계없이 동작하는 작업 → 설정 변경해도 성능 외 문제 X |
| 8 | + - **다른 작업에 의존성을 갖는 작업** : 실행 정책에 보이지 않는 조건 걸림, 활동성 문제 발생 가능 → 스레드 풀의 크기를 충분히 크게 잡기 |
| 9 | + |
| 10 | +- **스레드 한정 기법을 사용하는 작업** |
| 11 | + - Executor 프레임웍이 단일 스레드로 동작해야 한다는 조건 발생 → 여러 개의 스레드를 사용하는 풀로 변경하지 않도록 유의 |
| 12 | + - 반드시 순차적으로 실행돼야 함 |
| 13 | + |
| 14 | +- **응답 시간이 민감한 작업** |
| 15 | + - 응답 성능을 지켜야 하는 Executor에는 오래 걸리는 작업을 넣지 않도록 유의 |
| 16 | + - 화면 갱신 등 응답 민감한 작업은 가볍게 유지 |
| 17 | + |
| 18 | +- **ThreadLocal을 사용하는 작업** |
| 19 | + - ThreadLocal은 스레드가 재사용될 때 값도 함께 남아 있음. Executor처럼 스레드를 오래 재사용하는 환경에서는, 의도치 않게 이전 작업의 데이터가 이어질 수 있음 |
| 20 | + - ThreadLocal 사용 시, 현재 실행 중인 작업 종료 후 더 이상 사용하지 않을 값만 보관해야 함 → 작업 종료 후 반드시 정리하기 |
| 21 | + |
| 22 | +> 스레드 풀은 동일하고 서로 독립적인 다수의 작업 실행 시 효과적이지만, |
| 23 | +> 해당 작업이 가정하는 **실행 정책**을 명확히 문서화해야 함. |
| 24 | +
|
| 25 | +--- |
| 26 | + |
| 27 | +### 스레드 부족 데드락 |
| 28 | + |
| 29 | +- **스레드 부족 데드락**: 의존성 있는 작업처럼, 같은 풀 안에서 작업이 서로의 결과를 기다리기만 하며 진행이 멈추는 상황 |
| 30 | + - 작업 A가 같은 풀에 추가한 다른 작업 B의 결과를 기다림 |
| 31 | + - B도 A가 끝나길 기다림 → A, B 모두 대기 → 데드락 발생 |
| 32 | + |
| 33 | +- 풀의 크기가 충분히 크다면 데드락 없이 실행될 수 있지만, |
| 34 | + 완전히 독립적이지 않은 작업을 Executor에 등록할 때는 항상 주의 필요 |
| 35 | + |
| 36 | +- 운영 환경에서는 스레드 풀에서 필요한 자원이 제한되어 풀 크기가 예상보다 작게 설정될 수 있음 |
| 37 | + |
| 38 | +- **풀 크기, 설정 등 실행 정책을 명시해 두는 것이 중요** |
| 39 | + |
| 40 | +--- |
| 41 | + |
| 42 | +### 오래 실행되는 작업 |
| 43 | + |
| 44 | +- 스레드 풀의 크기가 상당히 작다면, 오래 걸리는 작업 때문에 응답 속도가 저하될 수 있음 |
| 45 | + - 일정 시간 동안만 대기하는 메소드 사용 |
| 46 | + - 시간 초과 시 해당 작업이 실행되지 못했음을 기록하고, 작업을 종료 후 재등록하는 대책 등 사용 |
| 47 | + |
| 48 | +--- |
| 49 | + |
| 50 | +## 스레드 풀 크기 조절 |
| 51 | + |
| 52 | +- 스레드 풀의 크기는 설정 파일이나 `Runtime.availableProcessors` 등의 메소드 결과 값에 따라 동적으로 지정 |
| 53 | + - 너무 크거나 작으면 자원 확보 경쟁을 하거나, 작업 처리 속도가 저하됨 |
| 54 | + |
| 55 | +### 스레드 풀 크기 산정 방법 |
| 56 | +1. **컴퓨터 환경 확인**: CPU 개수, 메모리, 파일 핸들, DB 커넥션 등 가용 자원 파악 & 해야 할 작업의 동작 과정 파악 |
| 57 | +2. **작업 성격 고려**: CPU를 많이 사용하는 작업의 경우, CPU 코어 수 + 1이 적정치로 알려져 있음 |
| 58 | +3. **비율 계산**: 실제 작업하는 시간 대비 대기 시간 비율을 대략 측정해 풀 크기를 조정 |
| 59 | + |
| 60 | +> 리틀의 법칙(Little's law) : `L = λ × W` |
| 61 | +> - **L**: 동시에 처리되는 요청의 개수 |
| 62 | +> - **λ (lambda)**: 시스템이 처리 가능한 평균 처리량 |
| 63 | +> - **W**: 평균 요청 처리 시간 |
| 64 | +> |
| 65 | +> 예) 평균 응답 시간 55ms, 스레드 풀 크기 22인 서비스 → `22 / 0.055 = 400` |
| 66 | +> 시스템이 1초당 처리할 수 있는 요청 개수는 **400개** |
| 67 | +
|
| 68 | +4. 풀 크기를 바꿔가며 계속 실행 → CPU의 활용도가 최적화되는 지점 찾기 |
| 69 | +5. 자원 고려: DB 연결 등 다른 자원과 풀 크기가 서로 영향을 미치므로 함께 조율해야 함 |
| 70 | + |
| 71 | +--- |
| 72 | + |
| 73 | +## ThreadPoolExecutor 설정 |
| 74 | + |
| 75 | +### 스레드 생성과 제거 |
| 76 | +- Executors 팩토리 메소드의 기본 정책이 맞지 않으면, |
| 77 | + **ThreadPoolExecutor 생성 메소드**를 직접 사용해 세부 설정 조정 가능 |
| 78 | + |
| 79 | +- 주요 설정 값 |
| 80 | + - **풀의 코어 크기** : 기본적으로 유지할 스레드 수 |
| 81 | + - **최대 크기** : 동시에 실행 가능한 스레드 수의 상한값 |
| 82 | + - **스레드 유지 시간** : 코어 크기를 초과하거나, 스레드가 일정 시간 이상 작업 없이 대기하면 제거 |
| 83 | + - 코어 크기 및 유지 시간을 조절하면 불필요한 스레드 자원 점유를 줄이고, 다른 작업에 자원을 활용 가능 |
| 84 | + |
| 85 | + ```java |
| 86 | + public ThreadPoolExecutor(int corePoolSize, |
| 87 | + int maximumPoolSize, |
| 88 | + long keepAliveTime, |
| 89 | + TimeUnit unit, |
| 90 | + BlockingQueue<Runnable> workQueue, |
| 91 | + ThreadFactory threadFactory, |
| 92 | + RejectedExecutionHandler handler) {...} |
| 93 | + ``` |
| 94 | + |
| 95 | + --- |
| 96 | + |
| 97 | +### 큐에 쌓인 작업 관리 |
| 98 | + |
| 99 | +스레드 풀도 처리할 수 있는 것보다 많은 작업이 들어오면 결국 한계에 도달함. |
| 100 | +큐(BlockingQueue)에 작업을 쌓아둘 수 있지만, 처리 속도보다 유입이 빠르면 **응답 지연 발생**. |
| 101 | + |
| 102 | +#### 큐에 적용할 수 있는 전략 |
| 103 | +1. **크기 제한 없음**: 작업을 끝없이 쌓음 |
| 104 | + - `LinkedBlockingQueue`, `newFixedThreadPool`, `newSingleThreadExecutor` 기본 설정 |
| 105 | +2. **크기 제한 있음**: 자원 사용량 제어 가능 |
| 106 | + - 단, 큐가 가득 차면 새로운 작업 처리 방안 필요 |
| 107 | + - 큐 크기와 스레드 수를 함께 튜닝해야 함 |
| 108 | +3. **직접 전달**: 큐 없이 작업을 스레드에 바로 전달 |
| 109 | + - `SynchronousQueue`, `newCachedThreadPool` 기본 설정 |
| 110 | + - 대기 스레드 없으면 새 스레드 생성, 최대 크기 도달 시 집중 대응 정책에 따라 작업 거부 |
| 111 | + |
| 112 | +#### 권장 전략 |
| 113 | +- 작업이 **서로 독립적일 때만** 스레드 수/작업 큐 크기 제한 가능 |
| 114 | + - 자원 관리 중요 & 동시 스레드 수 제한 필요 → **고정된 스레드 풀** |
| 115 | + - 작업 간 의존성 있음 → 고정 큐는 **스레드 부족 데드락 위험** → **무제한 풀 권장** |
| 116 | + - 작업 몰림·동적 확장 필요 → `newCachedThreadPool` + `SynchronousQueue` |
| 117 | + |
| 118 | +--- |
| 119 | + |
| 120 | +### 집중 대응 정책 |
| 121 | + |
| 122 | +- 큐가 가득 찼거나 이미 종료된 스레드 풀에 작업을 등록할 때 **집중 대응 정책** 동작 |
| 123 | + |
| 124 | +#### 설정 방법 |
| 125 | +- `setRejectedExecutionHandler()`로 원하는 정책 지정 가능 |
| 126 | + |
| 127 | +1. **중단 정책 (기본 정책)** : `RejectedExecutionException` 던짐 → 호출자가 직접 처리 |
| 128 | +2. **제거 정책** : 새로 추가하려던 작업을 조용히 제거 |
| 129 | +3. **오래된 항목 제거 정책** : 큐의 가장 오래된 작업 제거 후 새 작업 삽입 |
| 130 | +4. **호출자 실행 정책** : 작업을 제출한 스레드에서 직접 실행 → 속도 조절 가능 |
| 131 | + |
| 132 | +#### 추가 방법 |
| 133 | +- Executor 생성 시 집중 대응 정책 지정 가능 |
| 134 | +- `execute()` 호출 시 단순 대기 정책은 없음 |
| 135 | +- 필요 시 **Semaphore 기반 BoundedExecutor** 구현 → (스레드 수 + 큐 허용 크기) 만큼 세마포어 설정 |
| 136 | + |
| 137 | +--- |
| 138 | + |
| 139 | +### 스레드 팩토리 (ThreadFactory) |
| 140 | + |
| 141 | +- 스레드 풀은 새로운 스레드를 항상 **ThreadFactory**를 통해 생성 (`newThread()` 호출) |
| 142 | +- 기본 ThreadFactory는 **데몬이 아니고 특별한 설정 없는 스레드**를 반환 |
| 143 | + |
| 144 | +#### ThreadFactory의 특징 |
| 145 | +- `newThread(Runnable r)` 메서드 하나만 정의되어 있음 |
| 146 | +- 사용 예시 |
| 147 | + - 스레드 이름 지정 → 로그/덤프에서 식별 용이 |
| 148 | + - `UncaughtExceptionHandler` 지정 → 실행 중 예외 로깅 |
| 149 | + - 생성/종료 시 디버깅 메시지 출력 |
| 150 | + - 생성/제거된 스레드 수 통계 관리 |
| 151 | + |
| 152 | +#### 애플리케이션 보안 관련 |
| 153 | +- `Executors.privilegedThreadFactory()` 사용 가능 |
| 154 | + - 호출한 스레드와 동일한 권한, `AccessControlContext`, `contextClassLoader` 적용 |
| 155 | + - → 여러 클라이언트가 서로 다른 보안 정책을 가질 때 혼란 방지 |
| 156 | + |
| 157 | +--- |
| 158 | + |
| 159 | +### ThreadPoolExecutor 생성 이후 설정 변경 |
| 160 | + |
| 161 | +- `ThreadPoolExecutor`는 생성 후에도 `set` 메소드로 대부분 설정 값 변경 가능 |
| 162 | + - 예: `setCorePoolSize()`, `setMaximumPoolSize()` 등 |
| 163 | +- `Executor`를 `ThreadPoolExecutor`로 형변환 후 설정 가능 |
| 164 | + |
| 165 | + ```java |
| 166 | + public class MyAppThread extends Thread { |
| 167 | + public static final String DEFAULT_NAME = "MyAppThread"; |
| 168 | + private static volatile boolean debugLifecycle = false; |
| 169 | + private static final AtomicInteger created = new AtomicInteger(); |
| 170 | + private static final AtomicInteger alive = new AtomicInteger(); |
| 171 | + private static final Logger log = Logger.getAnonymousLogger(); |
| 172 | + |
| 173 | + public MyAppThread(Runnable r) { |
| 174 | + this(r, DEFAULT_NAME); |
| 175 | + } |
| 176 | + |
| 177 | + public MyAppThread(Runnable runnable, String name) { |
| 178 | + super(runnable, name + "-" + created.incrementAndGet()); |
| 179 | + setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { |
| 180 | + public void uncaughtException(Thread t, |
| 181 | + Throwable e) { |
| 182 | + log.log(Level.SEVERE, |
| 183 | + "UNCAUGHT in thread " + t.getName(), e); |
| 184 | + } |
| 185 | + }); |
| 186 | + } |
| 187 | + |
| 188 | + public void run() { |
| 189 | + // Copy debug flag to ensure consistent value throughout. |
| 190 | + boolean debug = debugLifecycle; |
| 191 | + if (debug) log.log(Level.FINE, "Created " + getName()); |
| 192 | + try { |
| 193 | + alive.incrementAndGet(); |
| 194 | + super.run(); |
| 195 | + } finally { |
| 196 | + alive.decrementAndGet(); |
| 197 | + if (debug) log.log(Level.FINE, "Exiting " + getName()); |
| 198 | + } |
| 199 | + } |
| 200 | + |
| 201 | + public static int getThreadsCreated() { |
| 202 | + return created.get(); |
| 203 | + } |
| 204 | + |
| 205 | + public static int getThreadsAlive() { |
| 206 | + return alive.get(); |
| 207 | + } |
| 208 | + |
| 209 | + public static boolean getDebug() { |
| 210 | + return debugLifecycle; |
| 211 | + } |
| 212 | + |
| 213 | + public static void setDebug(boolean b) { |
| 214 | + debugLifecycle = b; |
| 215 | + } |
| 216 | +}``` |
| 217 | + |
| 218 | +#### 설정 변경 차단하기 |
| 219 | +- `Executors.unconfigurableExecutorService(ExecutorService e)` |
| 220 | + - ExecutorService를 감싸서 외부에는 인터페이스만 노출 → 세부 설정 변경 차단 |
| 221 | +- 주의: `newSingleThreadExecutor()`는 내부적으로 ThreadPoolExecutor를 감싸 단일 스레드 정책 보장 |
| 222 | +- 외부 코드가 잘못 설정을 변경하지 못하도록 하려면 `unconfigurableExecutorService()` 사용 |
| 223 | + |
| 224 | +--- |
| 225 | + |
| 226 | +## ThreadPoolExecutor 상속 |
| 227 | + |
| 228 | +- `ThreadPoolExecutor`는 상속을 통해 기능 확장 가능 |
| 229 | +- 하위 클래스가 오버라이드할 수 있는 훅 메서드 제공 |
| 230 | + |
| 231 | +### 주요 훅 메서드 |
| 232 | +- **beforeExecute(Thread t, Runnable r)** |
| 233 | + - 작업 실행 전 호출 |
| 234 | + - 로그, 실행 시작 시점 기록, 모니터링/통계 활용 |
| 235 | + - 주의: 이 메서드에서 RuntimeException 발생 시 → 작업 실행 X, `afterExecute`도 호출되지 않음 |
| 236 | + |
| 237 | +- **afterExecute(Runnable r, Throwable t)** |
| 238 | + - 작업 실행 후 항상 호출됨 (정상 종료/예외와 무관) |
| 239 | + - 실행 결과 기록, 예외 처리, 실행 시간 측정 |
| 240 | + - `beforeExecute`에서 저장한 값을 ThreadLocal로 전달 가능 |
| 241 | + |
| 242 | +- **terminated()** |
| 243 | + - 모든 작업과 스레드가 종료된 뒤 한 번 호출 |
| 244 | + - 자원 반납, 로그 출력, 알람 발송, 최종 통계 수집 등에 활용 |
| 245 | + |
| 246 | +--- |
| 247 | + |
| 248 | +## 재귀 함수 병렬화 |
| 249 | + |
| 250 | +### 반복문 병렬화 |
| 251 | +- 반복문 내부에서 **복잡한 연산**이나 **블로킹 I/O** 수행 시, 각 반복 작업이 **독립적**이면 병렬화에 적합 |
| 252 | +- Executor 사용 시 순차 실행보다 빠르게 처리 가능 |
| 253 | +- `processInParallel()`은 작업 등록만 하고 바로 반환 → 빠른 처리 시작 |
| 254 | +- 모든 작업 종료까지 기다리려면 `ExecutorService.invokeAll()` 사용 |
| 255 | + |
| 256 | +### 재귀 함수 병렬화 |
| 257 | +- 재귀 함수도 각 단계 작업이 **이전 결과와 독립적**이면 병렬화 가능 |
| 258 | + |
| 259 | + ```java |
| 260 | + public <T> void sequentialRecursive(List<Node<T>> nodes, |
| 261 | + Collection<T> results) { |
| 262 | + for (Node<T> n : nodes) { |
| 263 | + results.add(n.compute()); |
| 264 | + sequentialRecursive(n.getChildren(), results); |
| 265 | + } |
| 266 | + } |
| 267 | + |
| 268 | + public <T> void parallelRecursive(final Executor exec, |
| 269 | + List<Node<T>> nodes, |
| 270 | + final Collection<T> results) { |
| 271 | + for (final Node<T> n : nodes) { |
| 272 | + exec.execute(new Runnable() { |
| 273 | + public void run() { |
| 274 | + results.add(n.compute()); |
| 275 | + } |
| 276 | + }); |
| 277 | + parallelRecursive(exec, n.getChildren(), results); |
| 278 | + } |
| 279 | + } |
| 280 | + |
| 281 | + public <T> Collection<T> getParallelResults(List<Node<T>> nodes) |
| 282 | + throws InterruptedException { |
| 283 | + ExecutorService exec = Executors.newCachedThreadPool(); |
| 284 | + Queue<T> resultQueue = new ConcurrentLinkedQueue<T>(); |
| 285 | + parallelRecursive(exec, nodes, resultQueue); |
| 286 | + exec.shutdown(); |
| 287 | + exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); |
| 288 | + return resultQueue; |
| 289 | + } |
| 290 | +``` |
| 291 | + |
| 292 | +- **퍼즐 프레임워크 예시** |
| 293 | + - 탐색 병렬 실행, 목표 상태 찾으면 즉시 중단 |
| 294 | + - `CountDownLatch`로 결과 한 번만 설정, 동시 접근은 락으로 보호 |
| 295 | + - 결과 없음 → 실행 중인 스레드 수 추적 → 더 이상 작업이 없으면 null 반환 |
| 296 | + |
| 297 | +- 추가 종료 조건 |
| 298 | + - 전체 실행 시간 제한 |
| 299 | + - 이동 횟수 제한 |
| 300 | + - 클라이언트 요청에 따른 중단 신호 처리 |
| 301 | + |
| 302 | + ```java |
| 303 | +public class PuzzleSolver <P,M> extends ConcurrentPuzzleSolver<P, M> { |
| 304 | + PuzzleSolver(Puzzle<P, M> puzzle) { |
| 305 | + super(puzzle); |
| 306 | + } |
| 307 | + |
| 308 | + private final AtomicInteger taskCount = new AtomicInteger(0); |
| 309 | + |
| 310 | + protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) { |
| 311 | + return new CountingSolverTask(p, m, n); |
| 312 | + } |
| 313 | + |
| 314 | + class CountingSolverTask extends SolverTask { |
| 315 | + CountingSolverTask(P pos, M move, PuzzleNode<P, M> prev) { |
| 316 | + super(pos, move, prev); |
| 317 | + taskCount.incrementAndGet(); |
| 318 | + } |
| 319 | + |
| 320 | + public void run() { |
| 321 | + try { |
| 322 | + super.run(); |
| 323 | + } finally { |
| 324 | + if (taskCount.decrementAndGet() == 0) |
| 325 | + solution.setValue(null); |
| 326 | + } |
| 327 | + } |
| 328 | + } |
| 329 | +} |
| 330 | +``` |
| 331 | + |
| 332 | + |
| 333 | + |
| 334 | + |
| 335 | + |
0 commit comments