Skip to content

Commit c96f66d

Browse files
committed
Add AsyncCallback.readWriteMutex
Closes #816
1 parent 4ed9fa0 commit c96f66d

File tree

3 files changed

+143
-6
lines changed

3 files changed

+143
-6
lines changed

core/src/main/scala/japgolly/scalajs/react/AsyncCallback.scala

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,88 @@ object AsyncCallback {
365365
/** Creates a new (non-reentrant) mutex. */
366366
def mutex: CallbackTo[Mutex] =
367367
CallbackTo(new Mutex)
368+
369+
// ===================================================================================================================
370+
371+
final class ReadWriteMutex private[AsyncCallback]() {
372+
373+
// Whether it's a read or write mutex is determined by readers being > 0 or not
374+
private var mutex: Option[AsyncCallback.Barrier] =
375+
None
376+
377+
private var readers =
378+
0
379+
380+
private val releaseMutex: Callback =
381+
CallbackTo {
382+
if (readers == 0) {
383+
val old = mutex
384+
mutex = None
385+
old
386+
} else
387+
None
388+
}.flatMap(Callback.traverseOption(_)(_.complete))
389+
390+
private val releaseReader: Callback =
391+
CallbackTo {
392+
readers -= 1
393+
} >> releaseMutex
394+
395+
/** Wrap a [[AsyncCallback]] so that it executes in the write-mutex.
396+
* There can only be one writer active at one time.
397+
*
398+
* Note: THIS IS NOT RE-ENTRANT. Calling this from within the read or write mutex will block.
399+
*/
400+
def write[A](ac: AsyncCallback[A]): AsyncCallback[A] =
401+
AsyncCallback.byName {
402+
403+
mutex match {
404+
case None =>
405+
// Mutex empty
406+
val b = AsyncCallback.barrier.runNow()
407+
mutex = Some(b)
408+
ac.finallyRunSync(releaseMutex)
409+
410+
case Some(b) =>
411+
// Mutex in use
412+
b.await >> write(ac)
413+
}
414+
}
415+
416+
/** Wrap a [[AsyncCallback]] so that it executes in the read-mutex.
417+
* There can be many readers active at one time.
418+
*
419+
* Note: Calling this from within the write-mutex will block.
420+
*/
421+
def read[A](ac: AsyncCallback[A]): AsyncCallback[A] =
422+
AsyncCallback.byName {
423+
424+
mutex match {
425+
case None =>
426+
// Mutex empty
427+
val b = AsyncCallback.barrier.runNow()
428+
mutex = Some(b)
429+
assert(readers == 0)
430+
readers = 1
431+
ac.finallyRunSync(releaseReader)
432+
433+
case Some(b) =>
434+
if (readers > 0) {
435+
// Read-mutex in use
436+
readers += 1
437+
ac.finallyRunSync(releaseReader)
438+
439+
} else {
440+
// Write-mutex in use
441+
b.await >> read(ac)
442+
}
443+
}
444+
}
445+
}
446+
447+
/** Creates a new (non-reentrant) read/write mutex. */
448+
def readWriteMutex: CallbackTo[ReadWriteMutex] =
449+
CallbackTo(new ReadWriteMutex)
368450
}
369451

370452
// █████████████████████████████████████████████████████████████████████████████████████████████████████████████████████

doc/changelog/1.7.7.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,27 @@ This entire release is focused on `AsyncCallback`.
4848
}
4949
```
5050

51+
* Added `readWriteMutex` which returns a `AsyncCallback.ReadWriteMutex`:
52+
53+
```scala
54+
AsyncCallback.ReadWriteMutex {
55+
56+
/** Wrap a AsyncCallback so that it executes in the write-mutex.
57+
* There can only be one writer active at one time.
58+
*
59+
* Note: THIS IS NOT RE-ENTRANT. Calling this from within the read or write mutex will block.
60+
*/
61+
def write[A](ac: AsyncCallback[A]): AsyncCallback[A]
62+
63+
/** Wrap a AsyncCallback so that it executes in the read-mutex.
64+
* There can be many readers active at one time.
65+
*
66+
* Note: Calling this from within the write-mutex will block.
67+
*/
68+
def read[A](ac: AsyncCallback[A]): AsyncCallback[A]
69+
}
70+
```
71+
5172
* You can now add a `_` suffix to the following to return an `AsyncCallback[Unit]` and be more efficient under-the-hood:
5273

5374
* `traverse`

test/src/test/scala/japgolly/scalajs/react/core/AsyncCallbackTest.scala

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import japgolly.scalajs.react.{AsyncCallback, Callback}
66
import scala.concurrent.ExecutionContext.Implicits.global
77
import scala.concurrent.Future
88
import scala.util.Success
9+
import sourcecode.Line
910
import utest._
1011

1112
object AsyncCallbackTest extends TestSuite {
@@ -26,6 +27,22 @@ object AsyncCallbackTest extends TestSuite {
2627
}.unsafeToFuture()
2728
}
2829

30+
private def assertMutexRejection(mutex: AsyncCallback[Unit] => AsyncCallback[Unit])(implicit l: Line): AsyncCallback[Unit] =
31+
mutex(AsyncCallback.unit).timeoutMs(500).delayMs(1).map(o => assertEq(o.isDefined, false))
32+
33+
private def testInMutex(mutex: AsyncCallback[Unit] => AsyncCallback[Unit])
34+
(test: AsyncCallback[Any]): AsyncCallback[Unit] =
35+
for {
36+
b <- AsyncCallback.barrier.asAsyncCallback
37+
_ <- mutex(b.await).fork_.asAsyncCallback // mutex start
38+
_ <- test
39+
_ <- b.complete.asAsyncCallback // mutex end
40+
_ <- mutex(AsyncCallback.unit)
41+
} yield ()
42+
43+
private def testWriteMutex(mutex: AsyncCallback[Unit] => AsyncCallback[Unit]): AsyncCallback[Unit] =
44+
testInMutex(mutex)(assertMutexRejection(mutex))
45+
2946
override def tests = Tests {
3047

3148
"fromCallback" - {
@@ -239,12 +256,29 @@ object AsyncCallbackTest extends TestSuite {
239256
"mutex" - asyncTest {
240257
for {
241258
mutex <- AsyncCallback.mutex.asAsyncCallback
242-
b <- AsyncCallback.barrier.asAsyncCallback
243-
_ <- mutex(b.await).fork_.asAsyncCallback // mutex start
244-
t <- mutex(AsyncCallback.unit).timeoutMs(500).delayMs(1)
245-
_ <- b.complete.asAsyncCallback // mutex end
246-
_ <- mutex(AsyncCallback.unit)
247-
} yield assert(t.isEmpty)
259+
_ <- testWriteMutex(mutex(_))
260+
} yield ()
261+
}
262+
263+
"readWriteMutex" - asyncTest {
264+
for {
265+
mutex <- AsyncCallback.readWriteMutex.asAsyncCallback
266+
267+
_ <- testWriteMutex(mutex.write) // confirm write blocks writes
268+
269+
b1 <- AsyncCallback.barrier.asAsyncCallback
270+
b2 <- AsyncCallback.barrier.asAsyncCallback
271+
l <- AsyncCallback.countDownLatch(2).asAsyncCallback
272+
_ <- mutex.read(l.countDown.asAsyncCallback >> b1.await).fork_.asAsyncCallback // read mutex enter
273+
_ <- mutex.read(l.countDown.asAsyncCallback >> b2.await).fork_.asAsyncCallback // read mutex enter
274+
_ <- l.await // confirm we've got two readers in the mutex at the same time
275+
_ <- assertMutexRejection(mutex.write) // confirm we can't use the write mutex
276+
_ <- b1.complete.asAsyncCallback // read mutex exit
277+
_ <- b2.complete.asAsyncCallback // read mutex exit
278+
279+
_ <- testInMutex(mutex.write)(assertMutexRejection(mutex.read)) // confirm write blocks reads
280+
281+
} yield ()
248282
}
249283

250284
}

0 commit comments

Comments
 (0)