Skip to content

Commit 3e56089

Browse files
authored
Centralise more locks (#8389)
Refactor: Centralize lock management This change centralizes the management of locks and conditions by introducing a centralised Locks registry. This allows for easier debugging and potential future improvements to lock usage. The `withLock` and `newLockCondition` functions are now invoked through the `Locks` object, providing a central point of access.
1 parent 02d69bf commit 3e56089

File tree

9 files changed

+75
-40
lines changed

9 files changed

+75
-40
lines changed

okhttp-testing-support/src/main/kotlin/okhttp3/OkHttpClientTestRule.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ import java.util.logging.Level
2525
import java.util.logging.LogManager
2626
import java.util.logging.LogRecord
2727
import java.util.logging.Logger
28-
import kotlin.concurrent.withLock
2928
import okhttp3.internal.buildConnectionPool
3029
import okhttp3.internal.concurrent.TaskRunner
30+
import okhttp3.internal.connection.Locks.withLock
3131
import okhttp3.internal.connection.RealConnectionPool
3232
import okhttp3.internal.http2.Http2
3333
import okhttp3.internal.taskRunnerInternal
@@ -234,7 +234,7 @@ class OkHttpClientTestRule : BeforeEachCallback, AfterEachCallback {
234234
// a test timeout failure.
235235
val waitTime = (entryTime + 1_000_000_000L - System.nanoTime())
236236
if (!queue.idleLatch().await(waitTime, TimeUnit.NANOSECONDS)) {
237-
TaskRunner.INSTANCE.lock.withLock {
237+
TaskRunner.INSTANCE.withLock {
238238
TaskRunner.INSTANCE.cancelAll()
239239
}
240240
fail<Unit>("Queue still active after 1000 ms")

okhttp-testing-support/src/main/kotlin/okhttp3/internal/concurrent/TaskFaker.kt

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
@file:Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
17+
1618
package okhttp3.internal.concurrent
1719

1820
import assertk.assertThat
@@ -23,9 +25,9 @@ import java.util.concurrent.BlockingQueue
2325
import java.util.concurrent.Executors
2426
import java.util.concurrent.TimeUnit
2527
import java.util.logging.Logger
26-
import kotlin.concurrent.withLock
2728
import okhttp3.OkHttpClient
2829
import okhttp3.TestUtil.threadFactory
30+
import okhttp3.internal.connection.Locks.withLock
2931

3032
/**
3133
* Runs a [TaskRunner] in a controlled environment so that everything is sequential and
@@ -170,7 +172,7 @@ class TaskFaker : Closeable {
170172
fun advanceUntil(newTime: Long) {
171173
taskRunner.assertThreadDoesntHoldLock()
172174

173-
taskRunner.lock.withLock {
175+
taskRunner.withLock {
174176
check(currentTask == TestThreadSerialTask)
175177
nanoTime = newTime
176178
yieldUntil(ResumePriority.AfterOtherTasks)
@@ -181,7 +183,7 @@ class TaskFaker : Closeable {
181183
fun assertNoMoreTasks() {
182184
taskRunner.assertThreadDoesntHoldLock()
183185

184-
taskRunner.lock.withLock {
186+
taskRunner.withLock {
185187
assertThat(activeThreads).isEqualTo(0)
186188
}
187189
}
@@ -211,7 +213,7 @@ class TaskFaker : Closeable {
211213
fun runNextTask() {
212214
taskRunner.assertThreadDoesntHoldLock()
213215

214-
taskRunner.lock.withLock {
216+
taskRunner.withLock {
215217
val contextSwitchCountBefore = contextSwitchCount
216218
yieldUntil(ResumePriority.BeforeOtherTasks) {
217219
contextSwitchCount > contextSwitchCountBefore
@@ -221,7 +223,7 @@ class TaskFaker : Closeable {
221223

222224
/** Sleep until [durationNanos] elapses. For use by the task threads. */
223225
fun sleep(durationNanos: Long) {
224-
taskRunner.lock.withLock {
226+
taskRunner.withLock {
225227
val sleepUntil = nanoTime + durationNanos
226228
yieldUntil { nanoTime >= sleepUntil }
227229
}
@@ -233,7 +235,7 @@ class TaskFaker : Closeable {
233235
*/
234236
fun yield() {
235237
taskRunner.assertThreadDoesntHoldLock()
236-
taskRunner.lock.withLock {
238+
taskRunner.withLock {
237239
yieldUntil()
238240
}
239241
}
@@ -332,7 +334,7 @@ class TaskFaker : Closeable {
332334
runnable.run()
333335
require(currentTask == this) { "unexpected current task: $currentTask" }
334336
} finally {
335-
taskRunner.lock.withLock {
337+
taskRunner.withLock {
336338
activeThreads--
337339
startNextTask()
338340
}
@@ -358,7 +360,7 @@ class TaskFaker : Closeable {
358360
timeout: Long,
359361
unit: TimeUnit,
360362
): T? {
361-
taskRunner.lock.withLock {
363+
taskRunner.withLock {
362364
val waitUntil = nanoTime + unit.toNanos(timeout)
363365
while (true) {
364366
val result = poll()
@@ -371,7 +373,7 @@ class TaskFaker : Closeable {
371373
}
372374

373375
override fun put(element: T) {
374-
taskRunner.lock.withLock {
376+
taskRunner.withLock {
375377
delegate.put(element)
376378
editCount++
377379
}

okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskQueue.kt

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package okhttp3.internal.concurrent
1818
import java.util.concurrent.CountDownLatch
1919
import java.util.concurrent.RejectedExecutionException
2020
import java.util.concurrent.locks.ReentrantLock
21-
import kotlin.concurrent.withLock
2221
import okhttp3.internal.assertNotHeld
22+
import okhttp3.internal.connection.Locks.withLock
2323
import okhttp3.internal.okHttpName
2424

2525
/**
@@ -32,7 +32,7 @@ class TaskQueue internal constructor(
3232
internal val taskRunner: TaskRunner,
3333
internal val name: String,
3434
) {
35-
val lock: ReentrantLock = ReentrantLock()
35+
internal val lock: ReentrantLock = ReentrantLock()
3636

3737
internal var shutdown = false
3838

@@ -50,7 +50,7 @@ class TaskQueue internal constructor(
5050
* currently-executing task unless it is also scheduled for future execution.
5151
*/
5252
val scheduledTasks: List<Task>
53-
get() = taskRunner.lock.withLock { futureTasks.toList() }
53+
get() = taskRunner.withLock { futureTasks.toList() }
5454

5555
/**
5656
* Schedules [task] for execution in [delayNanos]. A task may only have one future execution
@@ -66,7 +66,7 @@ class TaskQueue internal constructor(
6666
task: Task,
6767
delayNanos: Long = 0L,
6868
) {
69-
taskRunner.lock.withLock {
69+
taskRunner.withLock {
7070
if (shutdown) {
7171
if (task.cancelable) {
7272
taskRunner.logger.taskLog(task, this) { "schedule canceled (queue is shutdown)" }
@@ -126,7 +126,7 @@ class TaskQueue internal constructor(
126126

127127
/** Returns a latch that reaches 0 when the queue is next idle. */
128128
fun idleLatch(): CountDownLatch {
129-
taskRunner.lock.withLock {
129+
taskRunner.withLock {
130130
// If the queue is already idle, that's easy.
131131
if (activeTask == null && futureTasks.isEmpty()) {
132132
return CountDownLatch(0)
@@ -208,7 +208,7 @@ class TaskQueue internal constructor(
208208
fun cancelAll() {
209209
lock.assertNotHeld()
210210

211-
taskRunner.lock.withLock {
211+
taskRunner.withLock {
212212
if (cancelAllAndDecide()) {
213213
taskRunner.kickCoordinator(this)
214214
}
@@ -218,7 +218,7 @@ class TaskQueue internal constructor(
218218
fun shutdown() {
219219
lock.assertNotHeld()
220220

221-
taskRunner.lock.withLock {
221+
taskRunner.withLock {
222222
shutdown = true
223223
if (cancelAllAndDecide()) {
224224
taskRunner.kickCoordinator(this)

okhttp/src/main/kotlin/okhttp3/internal/concurrent/TaskRunner.kt

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,11 @@ import java.util.concurrent.TimeUnit
2323
import java.util.concurrent.locks.Condition
2424
import java.util.concurrent.locks.ReentrantLock
2525
import java.util.logging.Logger
26-
import kotlin.concurrent.withLock
2726
import okhttp3.internal.addIfAbsent
2827
import okhttp3.internal.assertHeld
2928
import okhttp3.internal.concurrent.TaskRunner.Companion.INSTANCE
29+
import okhttp3.internal.connection.Locks.newLockCondition
30+
import okhttp3.internal.connection.Locks.withLock
3031
import okhttp3.internal.okHttpName
3132
import okhttp3.internal.threadFactory
3233

@@ -45,8 +46,8 @@ class TaskRunner(
4546
val backend: Backend,
4647
internal val logger: Logger = TaskRunner.logger,
4748
) {
48-
val lock: ReentrantLock = ReentrantLock()
49-
val condition: Condition = lock.newCondition()
49+
internal val lock: ReentrantLock = ReentrantLock()
50+
val condition: Condition = lock.newLockCondition()
5051

5152
private var nextQueueName = 10000
5253
private var coordinatorWaiting = false
@@ -75,7 +76,7 @@ class TaskRunner(
7576
var incrementedRunCallCount = false
7677
while (true) {
7778
val task =
78-
this@TaskRunner.lock.withLock {
79+
this@TaskRunner.withLock {
7980
if (!incrementedRunCallCount) {
8081
incrementedRunCallCount = true
8182
runCallCount++
@@ -91,7 +92,7 @@ class TaskRunner(
9192
} finally {
9293
// If the task is crashing start another thread to service the queues.
9394
if (!completedNormally) {
94-
lock.withLock {
95+
this@TaskRunner.withLock {
9596
startAnotherThread()
9697
}
9798
}
@@ -139,7 +140,7 @@ class TaskRunner(
139140
try {
140141
delayNanos = task.runOnce()
141142
} finally {
142-
lock.withLock {
143+
this.withLock {
143144
afterRun(task, delayNanos)
144145
}
145146
currentThread.name = oldName
@@ -264,7 +265,7 @@ class TaskRunner(
264265
}
265266

266267
fun newQueue(): TaskQueue {
267-
val name = lock.withLock { nextQueueName++ }
268+
val name = this.withLock { nextQueueName++ }
268269
return TaskQueue(this, "Q$name")
269270
}
270271

@@ -273,7 +274,7 @@ class TaskRunner(
273274
* necessarily track queues that have no tasks scheduled.
274275
*/
275276
fun activeQueues(): List<TaskQueue> {
276-
lock.withLock {
277+
this.withLock {
277278
return busyQueues + readyQueues
278279
}
279280
}

okhttp/src/main/kotlin/okhttp3/internal/connection/ConnectPlan.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import java.security.cert.X509Certificate
2626
import java.util.concurrent.TimeUnit
2727
import javax.net.ssl.SSLPeerUnverifiedException
2828
import javax.net.ssl.SSLSocket
29-
import kotlin.concurrent.withLock
3029
import okhttp3.CertificatePinner
3130
import okhttp3.ConnectionSpec
3231
import okhttp3.Handshake

okhttp/src/main/kotlin/okhttp3/internal/connection/Locks.kt

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717

1818
package okhttp3.internal.connection
1919

20+
import java.util.concurrent.locks.Condition
21+
import java.util.concurrent.locks.ReentrantLock
2022
import kotlin.concurrent.withLock
2123
import kotlin.contracts.ExperimentalContracts
2224
import kotlin.contracts.InvocationKind
2325
import kotlin.contracts.contract
2426
import okhttp3.Dispatcher
27+
import okhttp3.internal.concurrent.TaskQueue
28+
import okhttp3.internal.concurrent.TaskRunner
2529
import okhttp3.internal.http2.Http2Connection
2630
import okhttp3.internal.http2.Http2Stream
2731
import okhttp3.internal.http2.Http2Writer
@@ -32,34 +36,62 @@ import okhttp3.internal.http2.Http2Writer
3236
internal object Locks {
3337
inline fun <T> Dispatcher.withLock(action: () -> T): T {
3438
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
35-
return lock.withLock(action)
39+
return lock.runWithLock(action)
3640
}
3741

3842
inline fun <T> RealConnection.withLock(action: () -> T): T {
3943
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
40-
return lock.withLock(action)
44+
return lock.runWithLock(action)
4145
}
4246

4347
inline fun <T> RealCall.withLock(action: () -> T): T {
4448
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
45-
return lock.withLock(action)
49+
return lock.runWithLock(action)
4650
}
4751

4852
inline fun <T> Http2Connection.withLock(action: () -> T): T {
4953
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
50-
return lock.withLock(action)
54+
return lock.runWithLock(action)
5155
}
5256

5357
inline fun <T> Http2Stream.withLock(action: () -> T): T {
5458
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
55-
return lock.withLock(action)
59+
return lock.runWithLock(action)
5660
}
5761

58-
inline fun <T> Http2Writer.withLock(action: () -> T): T {
62+
inline fun <T> TaskRunner.withLock(action: () -> T): T {
63+
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
64+
return lock.runWithLock(action)
65+
}
66+
67+
inline fun <T> TaskQueue.withLock(action: () -> T): T {
5968
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
69+
return lock.runWithLock(action)
70+
}
6071

72+
inline fun <T> Http2Writer.withLock(action: () -> T): T {
6173
// TODO can we assert we don't have the connection lock?
6274

63-
return lock.withLock(action)
75+
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
76+
return lock.runWithLock(action)
77+
}
78+
79+
/**
80+
* A no cost (inlined) alias to [ReentrantLock#newCondition] for an OkHttp Lock.
81+
* No function on its own but places a central place that all conditions go through to allow
82+
* temporary debugging.
83+
*/
84+
internal fun ReentrantLock.newLockCondition(): Condition {
85+
return this.newCondition()
86+
}
87+
88+
/**
89+
* A no cost (inlined) alias to [ReentrantLock#withLock] for an OkHttp Lock.
90+
* No function on its own but places a central place that all locks go through to allow
91+
* temporary debugging.
92+
*/
93+
inline fun <T> ReentrantLock.runWithLock(action: () -> T): T {
94+
contract { callsInPlace(action, InvocationKind.EXACTLY_ONCE) }
95+
return withLock(action)
6496
}
6597
}

okhttp/src/main/kotlin/okhttp3/internal/connection/RealConnection.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
2626
import java.util.concurrent.locks.ReentrantLock
2727
import javax.net.ssl.SSLPeerUnverifiedException
2828
import javax.net.ssl.SSLSocket
29-
import kotlin.concurrent.withLock
3029
import okhttp3.Address
3130
import okhttp3.Connection
3231
import okhttp3.ConnectionListener
@@ -335,7 +334,7 @@ class RealConnection(
335334
return http2Connection.isHealthy(nowNs)
336335
}
337336

338-
val idleDurationNs = lock.withLock { nowNs - idleAtNs }
337+
val idleDurationNs = this.withLock { nowNs - idleAtNs }
339338
if (idleDurationNs >= IDLE_CONNECTION_HEALTHY_NS && doExtensiveChecks) {
340339
return socket.isHealthy(source)
341340
}
@@ -354,7 +353,7 @@ class RealConnection(
354353
connection: Http2Connection,
355354
settings: Settings,
356355
) {
357-
lock.withLock {
356+
this.withLock {
358357
val oldLimit = allocationLimit
359358
allocationLimit = settings.getMaxConcurrentStreams()
360359

@@ -398,7 +397,7 @@ class RealConnection(
398397
e: IOException?,
399398
) {
400399
var noNewExchangesEvent = false
401-
lock.withLock {
400+
this.withLock {
402401
if (e is StreamResetException) {
403402
when {
404403
e.errorCode == ErrorCode.REFUSED_STREAM -> {

okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Connection.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import okhttp3.internal.EMPTY_HEADERS
2727
import okhttp3.internal.assertThreadDoesntHoldLock
2828
import okhttp3.internal.closeQuietly
2929
import okhttp3.internal.concurrent.TaskRunner
30+
import okhttp3.internal.connection.Locks.newLockCondition
3031
import okhttp3.internal.connection.Locks.withLock
3132
import okhttp3.internal.http2.ErrorCode.REFUSED_STREAM
3233
import okhttp3.internal.http2.Settings.Companion.DEFAULT_INITIAL_WINDOW_SIZE
@@ -56,7 +57,7 @@ import okio.source
5657
@Suppress("NAME_SHADOWING")
5758
class Http2Connection internal constructor(builder: Builder) : Closeable {
5859
internal val lock: ReentrantLock = ReentrantLock()
59-
internal val condition: Condition = lock.newCondition()
60+
internal val condition: Condition = lock.newLockCondition()
6061

6162
// Internal state of this connection is guarded by 'lock'. No blocking operations may be
6263
// performed while holding this lock!

0 commit comments

Comments
 (0)