|
| 1 | +# 동기화 클래스 구현 |
| 2 | +> 💡상태 의존적인 클래스란? |
| 3 | +> - 상태 기반 선행 조건을 가진 클래스 |
| 4 | +> - 상태 의존적인 클래스를 새로 구현하는 가장 간단한 방법: 기존의 동기화 클래스를 활용해 필요 기능 구현 |
| 5 | +> - ex) Semaphore, BlockingQueue |
| 6 | +
|
| 7 | +## 1. 상태 종속성 관리 |
| 8 | +### 상태 기반 조건이 만족되지 않을 떄 |
| 9 | +- 단일 스레드 프로그래밍은 앞으로도 상태 기반 조건이 만족될 가능성이 없으므로 반드시 오류 발생 |
| 10 | +- 병렬 프로그래밍은 다른 스레드가 상태 기반 조건을 변경할 수 있으므로 오류가 발생이 적고, 상태 기반 조건이 만족될 때까지 기다리는 경우가 많음 |
| 11 | + |
| 12 | +### 상태 종속적인 작업의 동기화 구조 |
| 13 | +- 상태 변수를 확인할 때는 락을 확보 |
| 14 | +- 선행 조건을 만족하지 않았다면 다른 스레드에서 상태 변수를 변경할 수 있도록 락을 풀고 기다림 |
| 15 | + ``` |
| 16 | + void blockingAction() thows InterruptedException { |
| 17 | + 상태 변수에 대한 락 확보 |
| 18 | + while (선행 조건이 만족하지 않음) { |
| 19 | + 확보했던 락을 풀어줌 |
| 20 | + 선행 조건이 만족할만한 시간만큼 대기 |
| 21 | + 인터럽트에 걸리거나 타임아웃이 걸리면 멈춤 |
| 22 | + 락을 다시 확보 |
| 23 | + } |
| 24 | + 작업 실행 |
| 25 | + 락 해제 |
| 26 | + } |
| 27 | + ``` |
| 28 | + ```java |
| 29 | + public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable { |
| 30 | + public void put(E e) throws InterruptedException { |
| 31 | + Objects.requireNonNull(e); |
| 32 | + ReentrantLock lock = this.lock; |
| 33 | + lock.lockInterruptibly(); |
| 34 | + |
| 35 | + try { |
| 36 | + while(this.count == this.items.length) { |
| 37 | + this.notFull.await(); |
| 38 | + } |
| 39 | + |
| 40 | + this.enqueue(e); |
| 41 | + } finally { |
| 42 | + lock.unlock(); |
| 43 | + } |
| 44 | + } |
| 45 | + } |
| 46 | + |
| 47 | +### 선행 조건을 만족하지 않은 경우 |
| 48 | +#### 1) 예외를 발생시키거나 또는 오류 값 반환 |
| 49 | +- 호출자는 호출할 때마다 예외 처리를 해줘야함 |
| 50 | +#### 2) 스핀 대기 방법 |
| 51 | + - 호출자가 선행 조건이 만족될 때까지 반복해서 확인 |
| 52 | + - 대기 시간 없이 다시 호출 -> CPU 자원 소모 |
| 53 | + - 짧은 시간 동안 대기 후 다시 호출 -> CPU 자원 소모 줄이지만 응답 시간 길어짐 (과다 대기 가능성) |
| 54 | +#### 3) 상태 의존 메서드 내부에서 재시도 반복(폴링/대기) |
| 55 | + - 호출자는 예외 처리나 재시도 코드 작성 필요 없음 |
| 56 | + - 대기 시간에 따라 응답 시간이 달라질 수 있음 |
| 57 | + - 인터럽트에 걸리거나 타임아웃이 걸리면 멈출 수 있도록 구현하는 것이 좋음 |
| 58 | +#### 4) 조건 큐 |
| 59 | + - 조건이 맞지 않으면 스레드를 멈추고, 원하는 조건에 도달하면 스레드를 깨움 |
| 60 | + - 조건 큐에는 스레드가 값으로 들어감 |
| 61 | + - 여러 스레드를 한 덩어리(wait set)로 관리. 특정 조건이 만족할 때까지 한꺼번에 대기 가능 |
| 62 | + - 모든 객체는 스스로를 조건 큐로 사용할 수 있음 |
| 63 | + - wait(), notify(), notifyAll() 메서드 제공 |
| 64 | + - wait(): 현재 스레드가 락을 반납하고 해당 객체의 조건 큐에서 대기 상태로 들어감 |
| 65 | + - notify(): 조건 큐에 있는 스레드 중 하나를 깨움 (랜덤) |
| 66 | + - notifyAll(): 조건 큐에 있는 모든 스레드를 깨움 |
| 67 | + - 자바 객체의 암묵적인 락과 조건 큐는 굉장히 밀접히 관련되어 있음 |
| 68 | + - 객체의 암묵적인 락을 확보한 상태에서만 wait(), notify(), notifyAll() 메서드 호출 가능 |
| 69 | + - 락 객체와 조건 큐 객체는 반드시 같아야 함 |
| 70 | + - 객체 내부의 상태를 확인하기 전에는 조건이 만족할 때까지 대기할 수 없고 |
| 71 | + - 객체 내부의 상태를 변경하지 못하는 한 해당 객체의 조건 큐에 있는 스레드를 깨울 수 없음 |
| 72 | + - 폴링/대기 방법과 작동하는 모습은 똑같지만 CPU 자원, 컨텍스트 스위치 비용, 응답 속도 측면에서 훨씬 효율적 |
| 73 | + - 타임아웃이 걸리면 멈출 수 있도록 구현하는 것이 좋음 |
| 74 | +```java |
| 75 | +@ThreadSafe |
| 76 | +public class BoundedBuffer<V> extends BaseBoundedBuffer<V> { |
| 77 | + public synchronized void put(V v) throws InterruptedException { |
| 78 | + while (isFull()) { |
| 79 | + wait(); |
| 80 | + } |
| 81 | + doPut(v); |
| 82 | + notifyAll(); |
| 83 | + } |
| 84 | +} |
| 85 | +``` |
| 86 | + |
| 87 | +## 2. 조건 큐 활용 |
| 88 | +### 조건 서술어 |
| 89 | +- 특정 기능이 상태 종속적이 되도록 만드는 선행 조건을 의미함 |
| 90 | +- 조건 서술어는 상태 변수를 기반으로 하고 있고, 상태 변수는 락으로 동기화되어 있으니 조건 서술어를 확인하거나 변경할 때는 반드시 락을 확보해야 함 |
| 91 | +- wait 메서드는 먼저 락을 해제하고 현재 스레드를 대기 상태로 둠 |
| 92 | +- 타임아웃 발생/인터럽트 발생/notify()/notifyAll() 메서드 호출을 통해 알림을 받을 때까지 대기 |
| 93 | +- 스레드가 대기 상태에서 깨어나면 다시 락을 확보하고 wait 메서드 호출 직후의 코드부터 실행 재개 |
| 94 | + - 락 확보함에 있어 우선 순위는 갖지 않음 |
| 95 | + |
| 96 | +### wait 메서드 |
| 97 | +- 대기 상태 스레드가 깨어났다고 해서 조건 서술어가 만족된다는 보장은 없음 |
| 98 | + - wait 메서드가 알아서 리턴되는 경우가 있음 |
| 99 | + - notify/notifyAll 메서드가 호출되었지만 다른 스레드가 먼저 락을 확보해 조건 서술어를 변경했을 수도 있음 |
| 100 | + - 동일한 조건 큐를 대상으로 하는 다른 조건 서술어가 만족돼 호출한 것일 수 있음 |
| 101 | +- 하나의 조건 큐에 여러 조건 서술어가 걸려 있을 수 있음 |
| 102 | + |
| 103 | +#### 조건부 wait 메서드 사용 시 주의 사항 |
| 104 | +- 항상 조건 서술어를 명시해야 한다. |
| 105 | +- wait 메서드 호출 전과 리턴된 이후 모두 조건 서술어를 확인해야 한다. |
| 106 | +- 조건 서술어를 확인하는 데 관련된 모든 상태 변수는 해당 조건 큐 락에 의해 동기화되어야 한다. |
| 107 | +- wait, notify, notifyAll 메서드는 반드시 해당 조건 큐 락을 확보한 상태에서 호출해야 한다. |
| 108 | +- 조건 서술어를 확인한 후 실제 작업을 실행해 작업이 끝날 때까지 락을 확보해야 한다. |
| 109 | + |
| 110 | +> 주의 사항을 지키지 않은 경우, 놓친 신호(특정 스레드가 이미 만족한 조건 서술어를 확인하지 못해 대기 상태에 들어가는 상황) 발생 |
| 111 | +
|
| 112 | +### notify/notifyAll 메서드 |
| 113 | +- 여러 개의 스레드가 하나의 조건 큐를 놓고 대기 상태에 들어가 있을 수 있으므로 notify 메서드보단 notifyAll 메서드를 사용하는 것이 더 안전 |
| 114 | +- notify 메서드는 조건 서술어가 여러 개 걸려 있을 때 조건이 만족되지 않는 스레드를 깨울 수 있음 |
| 115 | +- notify 메서드를 사용해도 되는 경우 |
| 116 | + - 단일 조건에 따른 대기 상태에서 깨우는 경우 |
| 117 | + - 한 번에 하나씩 처리하는 경우: 하나의 스레드만 실행시킬 수 있는 경우 |
| 118 | +- 대부분의 경우 위 두 가지 조건을 만족하지 못해 notifyAll 메서드를 사용하는 것이 좋음 |
| 119 | +- 조건부 알림 방법 |
| 120 | + - notifyAll 메서드를 호출하여 모든 스레드를 깨우는 건 비효율적일 수 있음 |
| 121 | + - 대기 상태에서 빠져나올 수 있는 상태를 만들어주는 경우에만 알림 메서드를 호출 |
| 122 | + |
| 123 | +### 하위 클래스 안정성 문제 |
| 124 | +- 상태 기반으로 동작하는 클래스는 하위 클래스에게 대기와 알림 구조를 완전하게 공개하고 그 구조를 문서로 남기거나 |
| 125 | +- 하위 클래스에서 대기와 알림 구조에 전혀 접근할 수 없도록 제한해야 함 |
| 126 | +- 일반적으로 조건 큐를 클래스 내부 캡슐화해서 외부에서는 조건 큐를 사용할 수 없도록 막는 게 좋음 |
| 127 | + |
| 128 | +## 3. 명시적인 조건 객체 |
| 129 | +- 암묵적인 락을 일반화한 형태가 Lock 클래스인 것처럼 암묵적인 조건 큐를 일반화한 형태는 Condition 클래스 |
| 130 | + - Condition 클래스에선 wait, notify, notifyAll 메서드 대신 await, signal, signalAll 메서드 사용 |
| 131 | + - 주의할 점: Condition 객체도 Object 클래스를 상속받아 wait, notify, notifyAll 메서드를 가지고 있으므로 사용하지 않도록 주의 |
| 132 | +- 암묵적인 락 하나는 조건 큐를 하나만 가질 수 있지만, Condition 클래스는 Lock 하나에 여러 개의 조건으로 대기할 수 있음 |
| 133 | + - Condition 객체를 활용하면 단일 알림 조건을 만족시킬 수 있음 (signalAll 메서드 대신 signal 메서드 사용 가능) |
| 134 | +- Condition 객체는 Lock 객체의 공정성을 그대로 물려받음 |
| 135 | +- Condition 객체 & ReentrantLock 클래스 조합 vs 암묵적인 조건 큐 & synchronized 키워드 조합 |
| 136 | + - 공정한 큐 관리 방법이나 하나의 락에서 여러 개의 조건 큐가 필요한 경우 Condition 객체 & ReentrantLock 클래스 조합이 더 나음 |
| 137 | + - 그 외에는 암묵적인 조건 큐 & synchronized 키워드 조합이 더 나음 |
| 138 | +```java |
| 139 | +@ThreadSafe |
| 140 | +public class ConditionBoundedBuffer<V> { |
| 141 | + protected final Lock lock = new ReentrantLock(); |
| 142 | + private final Condition notFull = lock.newCondition(); |
| 143 | + private final Condition notEmpty = lock.newCondition(); |
| 144 | + private final V[] items = (V[]) new Object[100]; |
| 145 | + private int tail, head, count; |
| 146 | + |
| 147 | + public void put(V v) throws InterruptedException { |
| 148 | + lock.lock(); |
| 149 | + try { |
| 150 | + while (count == items.length) { |
| 151 | + notFull.await(); |
| 152 | + } |
| 153 | + items[tail] = v; |
| 154 | + if (++tail == items.length) { |
| 155 | + tail = 0; |
| 156 | + } |
| 157 | + ++count; |
| 158 | + notEmpty.signal(); |
| 159 | + } finally { |
| 160 | + lock.unlock(); |
| 161 | + } |
| 162 | + } |
| 163 | +} |
| 164 | +``` |
| 165 | + |
| 166 | +## 4. 동기화 클래스 내부 구조 |
| 167 | +- AQS(AbstractQueuedSynchronizer) 클래스는 락이나 동기화 클래스를 구현할 때 유용한 기능을 제공 |
| 168 | +- AQS 기반 동기화 클래스는 대기 상태에 들어갈 수 있는 지점이 단 한군데여서 컨텍스트 스위치 비용이 적음 |
| 169 | +- java.util.concurrent 패키지의 동기화 클래스 모두 AQS 클래스를 활용해 구현됨 |
| 170 | + |
| 171 | +## 5. AbstractQueuedSynchronizer |
| 172 | +- AQS 기반 동기화 클래스가 담당하는 가장 기본 연산은 확보와 해체 |
| 173 | + - 확보 연산: 상태 기반으로 동작하며 리소스에 대한 접근을 얻는 과정 (대기 상태에 들어갈 수 있음) |
| 174 | + - 해제 연산: 접근이 끝나 리소스를 반납하는 과정 (확보 연산에서 대기 중인 스레드를 깨움) |
| 175 | +- AQS 클래스는 상태 변수를 int 타입으로 관리 |
| 176 | + - ReentrantLock 클래스는 소속된 스레드에서 락을 몇 번 확보했는지 세기 위해 상태 변수를 사용 |
| 177 | + - Semaphore 클래스는 사용 가능한 퍼밋 수를 세기 위해 상태 변수를 사용 |
| 178 | +- 배타적 확보 연산 vs 배타적이지 않은 확보 연산 |
| 179 | + - 배타적(Exclusive) 확보 연산 동기화 클래스 |
| 180 | + - 한 번에 단 하나의 스레드만 리소스를 확보할 수 있음 |
| 181 | + - 예시: ReentrantLock |
| 182 | + - 비배타적(Shared) 확보 연산 동기화 클래스 |
| 183 | + - 여러 스레드가 동시에 리소스를 확보할 수 있음 |
| 184 | + - 예시: Semaphore(퍼밋 수 > 1), CountDownLatch, ReentrantReadWriteLock의 읽기 락 |
| 185 | + |
| 186 | +## 6. java.util.concurrent 패키지의 동기화 클래스에서 AQS 활용 모습 |
| 187 | +### ReentrantLock |
| 188 | +- 배타적인 확보 연산만 제공하는 동기화 클래스 |
| 189 | +- 동기화 상태 값을 확보된 락의 개수를 세는 데 사용 |
| 190 | +- owner 변수를 통해 현재 락을 확보한 스레드를 추적 |
| 191 | + - tryRelease 메서드에서 unlock 메서드 호출 전 owner 변수로 현재 스레드인지 확인 |
| 192 | + - tryAcquire 메서드에서 재진입 시도인지 최초 진입 시도인지 확인하기 위해 owner 변수 업데이트 |
| 193 | +```java |
| 194 | +public class ReentrantLock implements Lock, Serializable { |
| 195 | + abstract static class Sync extends AbstractQueuedSynchronizer { |
| 196 | + @ReservedStackAccess |
| 197 | + protected final boolean tryRelease(int releases) { |
| 198 | + int c = this.getState() - releases; |
| 199 | + if (this.getExclusiveOwnerThread() != Thread.currentThread()) { |
| 200 | + throw new IllegalMonitorStateException(); |
| 201 | + } else { |
| 202 | + boolean free = c == 0; |
| 203 | + if (free) { |
| 204 | + this.setExclusiveOwnerThread((Thread)null); |
| 205 | + } |
| 206 | + |
| 207 | + this.setState(c); |
| 208 | + return free; |
| 209 | + } |
| 210 | + } |
| 211 | + } |
| 212 | + static final class NonfairSync extends Sync { |
| 213 | + protected final boolean tryAcquire(int acquires) { |
| 214 | + if (this.getState() == 0 && this.compareAndSetState(0, acquires)) { |
| 215 | + this.setExclusiveOwnerThread(Thread.currentThread()); |
| 216 | + return true; |
| 217 | + } else { |
| 218 | + return false; |
| 219 | + } |
| 220 | + } |
| 221 | + } |
| 222 | +} |
| 223 | +``` |
| 224 | + |
| 225 | +### Semaphore |
| 226 | +- 동기화 상태 값을 현재 남아 있는 퍼밋 수로 사용 |
| 227 | +- compareAndSetState 메서드를 활용해 퍼밋 수를 원자적으로 증가/감소 |
| 228 | +```java |
| 229 | +public class Semaphore implements Serializable { |
| 230 | + static final class FairSync extends Sync { |
| 231 | + protected int tryAcquireShared(int acquires) { |
| 232 | + int available; |
| 233 | + int remaining; |
| 234 | + do { |
| 235 | + if (this.hasQueuedPredecessors()) { // 공정성 보장을 위해 대기열에 먼저 온 스레드가 있는지 확인 |
| 236 | + return -1; |
| 237 | + } |
| 238 | + |
| 239 | + available = this.getState(); |
| 240 | + remaining = available - acquires; |
| 241 | + } while(remaining >= 0 && !this.compareAndSetState(available, remaining)); |
| 242 | + |
| 243 | + return remaining; |
| 244 | + } |
| 245 | + } |
| 246 | +} |
| 247 | +``` |
| 248 | + |
| 249 | +### ReentrantReadWriteLock |
| 250 | +- 상태 변수를 읽기 락과 쓰기 락의 확보 상태를 모두 나타내는 데 사용 |
| 251 | + - 상태 변수의 상위 16비트: 읽기 락의 확보 횟수 |
| 252 | + - 상태 변수의 하위 16비트: 쓰기 락의 확보 횟수 |
| 253 | +- 읽기 락은 비배타적 확보/해제 연산, 쓰기 락은 배타적 확보/해제 연산 |
| 254 | +```java |
| 255 | +public class ReentrantReadWriteLock implements ReadWriteLock, Serializable { |
| 256 | + abstract static class Sync extends AbstractQueuedSynchronizer { |
| 257 | + static int sharedCount(int c) { return c >>> 16; } // 상위 16bit |
| 258 | + static int exclusiveCount(int c) { return c & 0xFFFF; } // 하위 16bit |
| 259 | + |
| 260 | + @ReservedStackAccess |
| 261 | + final boolean tryWriteLock() { |
| 262 | + Thread current = Thread.currentThread(); |
| 263 | + int c = this.getState(); |
| 264 | + if (c != 0) { |
| 265 | + int w = exclusiveCount(c); |
| 266 | + if (w == 0 || current != this.getExclusiveOwnerThread()) { |
| 267 | + return false; |
| 268 | + } |
| 269 | + |
| 270 | + if (w == 65535) { |
| 271 | + throw new Error("Maximum lock count exceeded"); |
| 272 | + } |
| 273 | + } |
| 274 | + |
| 275 | + if (!this.compareAndSetState(c, c + 1)) { |
| 276 | + return false; |
| 277 | + } else { |
| 278 | + this.setExclusiveOwnerThread(current); |
| 279 | + return true; |
| 280 | + } |
| 281 | + } |
| 282 | + |
| 283 | + @ReservedStackAccess |
| 284 | + final boolean tryReadLock() { |
| 285 | + Thread current = Thread.currentThread(); |
| 286 | + |
| 287 | + int c; |
| 288 | + int r; |
| 289 | + do { |
| 290 | + c = this.getState(); |
| 291 | + if (exclusiveCount(c) != 0 && this.getExclusiveOwnerThread() != current) { |
| 292 | + return false; |
| 293 | + } |
| 294 | + |
| 295 | + r = sharedCount(c); |
| 296 | + if (r == 65535) { |
| 297 | + throw new Error("Maximum lock count exceeded"); |
| 298 | + } |
| 299 | + } while(!this.compareAndSetState(c, c + 65536)); |
| 300 | + |
| 301 | + if (r == 0) { |
| 302 | + this.firstReader = current; |
| 303 | + this.firstReaderHoldCount = 1; |
| 304 | + } else if (this.firstReader == current) { |
| 305 | + ++this.firstReaderHoldCount; |
| 306 | + } else { |
| 307 | + HoldCounter rh = this.cachedHoldCounter; |
| 308 | + if (rh != null && rh.tid == LockSupport.getThreadId(current)) { |
| 309 | + if (rh.count == 0) { |
| 310 | + this.readHolds.set(rh); |
| 311 | + } |
| 312 | + } else { |
| 313 | + this.cachedHoldCounter = rh = (HoldCounter)this.readHolds.get(); |
| 314 | + } |
| 315 | + |
| 316 | + ++rh.count; |
| 317 | + } |
| 318 | + |
| 319 | + return true; |
| 320 | + } |
| 321 | + } |
| 322 | +} |
| 323 | +``` |
0 commit comments