
Commit 36f95dd

Add Atomic Barrier and PhaseLockingTestMixin (delta-io#2772)
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Added the `AtomicBarrier` and `PhaseLockingTestMixin`, the initial building blocks of the Concurrency Testing framework. They make way for a suite that tests the interaction of the `ConflictChecker` with the `RowTracking` feature, so that Row Tracking is well tested ahead of [enabling Row Tracking outside of testing in Delta](delta-io#2059).

## How was this patch tested?

Added unit tests.

## Does this PR introduce _any_ user-facing changes?

No.
1 parent cb77f54 commit 36f95dd

4 files changed: +292 -0 lines changed

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import scala.concurrent.duration._

object BusyWait {
  /**
   * Keep checking if `check` returns `true` until it's the case or `waitTime` expires.
   *
   * Return `true` when the `check` returned `true`, and `false` if `waitTime` expired.
   *
   * Note: This function is used as a helper function for the Concurrency Testing framework,
   * and should not be used in production code. Production code should not use polling
   * and should instead use signalling to coordinate.
   */
  def until(
      check: => Boolean,
      waitTime: FiniteDuration): Boolean = {
    val DEFAULT_SLEEP_TIME: Duration = 10.millis
    val deadline = waitTime.fromNow

    do {
      if (check) {
        return true
      }
      val sleepTimeMs = DEFAULT_SLEEP_TIME.min(deadline.timeLeft).toMillis
      Thread.sleep(sleepTimeMs)
    } while (deadline.hasTimeLeft())
    false
  }
}
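For readers who want to try the polling pattern outside the Delta test tree, the following is a minimal standalone sketch, not the committed code. The names `PollingSketch`, `pollUntil`, and `interval` are hypothetical. It differs from `BusyWait.until` in one respect: it checks the remaining time before sleeping, because `Deadline.timeLeft` can be negative once the deadline has passed and `Thread.sleep` rejects negative arguments with an `IllegalArgumentException`.

```scala
import scala.concurrent.duration._

// Hypothetical standalone variant of the polling helper; names are illustrative only.
object PollingSketch {
  /** Polls `check` until it returns true or `waitTime` elapses. */
  def pollUntil(
      check: => Boolean,
      waitTime: FiniteDuration,
      interval: FiniteDuration = 10.millis): Boolean = {
    val deadline = waitTime.fromNow
    while (true) {
      // `check` is by-name, so it is re-evaluated on every iteration.
      if (check) return true
      val remaining = deadline.timeLeft
      // Stop before sleeping if the deadline has already passed.
      if (remaining <= Duration.Zero) return false
      Thread.sleep(interval.min(remaining).toMillis)
    }
    false // not reached; keeps the method's result type Boolean
  }
}
```

A caller would typically pass a condition over shared state, for example `PollingSketch.pollUntil(flag.get(), 2.seconds)` with an `AtomicBoolean` named `flag` that another thread sets.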
Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.fuzzer

import java.util.concurrent.atomic.AtomicInteger

/**
 * An atomic barrier is similar to a countdown latch,
 * except that the content is a state transition system with semantic meaning
 * instead of a simple counter.
 *
 * It is designed with a single writer ("unblocker") thread and a single reader ("waiter") thread
 * in mind. It is concurrency safe with more writers and readers, but using more is likely to cause
 * race conditions for legal transitions. That is to say, trying to perform an otherwise
 * legal transition twice is illegal and may occur if there is more than one unblocker or
 * waiter thread.
 * Having additional passive state observers that only call [[load()]] is never an issue.
 *
 * Legal transitions are:
 * - BLOCKED -> UNBLOCKED
 * - BLOCKED -> REQUESTED
 * - REQUESTED -> UNBLOCKED
 * - UNBLOCKED -> PASSED
 */
class AtomicBarrier {

  import AtomicBarrier._

  private final val state: AtomicInteger = new AtomicInteger(State.Blocked.ordinal)

  /** Get the current state. */
  def load(): State = {
    val ordinal = state.get()
    // We should never be putting illegal state ordinals into `state`,
    // so this should always succeed.
    stateIndex(ordinal)
  }

  /** Transition to the Unblocked state. */
  def unblock(): Unit = {
    // Just hot-retry this, since it never needs to wait to make progress.
    var successful = false
    while(!successful) {
      val currentValue = state.get()
      if (currentValue == State.Blocked.ordinal || currentValue == State.Requested.ordinal) {
        this.synchronized {
          successful = state.compareAndSet(currentValue, State.Unblocked.ordinal)
          if (successful) {
            this.notifyAll()
          }
        }
      } else {
        // if it's in any other state we will never make progress
        throw new IllegalStateTransitionException(stateIndex(currentValue), State.Unblocked)
      }
    }
  }

  /** Wait until this barrier can be passed and then mark it as Passed. */
  def waitToPass(): Unit = {
    while (true) {
      val currentState = load()
      currentState match {
        case State.Unblocked =>
          val updated = state.compareAndSet(currentState.ordinal, State.Passed.ordinal)
          if (updated) {
            return
          }
        case State.Passed =>
          throw new IllegalStateTransitionException(State.Passed, State.Passed)
        case State.Requested =>
          this.synchronized {
            if (load().ordinal == State.Requested.ordinal) {
              this.wait()
            }
          }
        case State.Blocked =>
          this.synchronized {
            val updated = state.compareAndSet(currentState.ordinal, State.Requested.ordinal)
            if (updated) {
              this.wait()
            }
          } // else (if we didn't succeed) just hot-retry until we do
            // (or more likely pass, since unblocking is the only legal concurrent
            // update with a single concurrent "waiter")
      }
    }
  }

  override def toString: String = s"AtomicBarrier(state=${load()})"
}

object AtomicBarrier {

  sealed trait State {
    def ordinal: Int
  }

  object State {
    case object Blocked extends State {
      override final val ordinal = 0
    }
    case object Unblocked extends State {
      override final val ordinal = 1
    }
    case object Requested extends State {
      override final val ordinal = 2
    }
    case object Passed extends State {
      override final val ordinal = 3
    }
  }

  final val stateIndex: Map[Int, State] =
    List(State.Blocked, State.Unblocked, State.Requested, State.Passed)
      .map(state => state.ordinal -> state)
      .toMap
}

class IllegalStateTransitionException(fromState: AtomicBarrier.State, toState: AtomicBarrier.State)
  extends RuntimeException(s"State transition from $fromState to $toState is illegal.")
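The four legal transitions listed in the `AtomicBarrier` doc comment can also be read as a small lookup table. The sketch below is an illustration only and is not part of the commit: `BarrierTransitions`, `legal`, and `isLegal` are hypothetical names, and the states are redeclared locally so the snippet compiles on its own.

```scala
// Hypothetical, standalone model of the transition rules described in the doc comment.
object BarrierTransitions {
  sealed trait State
  case object Blocked extends State
  case object Requested extends State
  case object Unblocked extends State
  case object Passed extends State

  // Every (from, to) pair the doc comment lists as legal.
  val legal: Set[(State, State)] = Set[(State, State)](
    (Blocked, Unblocked),
    (Blocked, Requested),
    (Requested, Unblocked),
    (Unblocked, Passed))

  /** True if moving from `from` to `to` is one of the documented legal transitions. */
  def isLegal(from: State, to: State): Boolean = legal.contains((from, to))
}
```

Read this way, `unblock()` covers the two transitions that end in `Unblocked`, while `waitToPass()` covers `Blocked -> Requested` and `Unblocked -> Passed`. Repeating a transition, such as `Passed -> Passed`, is not in the table, which matches the exception thrown in the `State.Passed` case.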
Lines changed: 51 additions & 0 deletions
@@ -0,0 +1,51 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.concurrency

import scala.concurrent.duration._

import org.apache.spark.sql.delta.BusyWait
import org.apache.spark.sql.delta.fuzzer.AtomicBarrier

import org.apache.spark.SparkFunSuite

trait PhaseLockingTestMixin { self: SparkFunSuite =>
  /** Keep checking if `barrier` is in `state` until it's the case or `waitTime` expires. */
  def busyWaitForState(
      barrier: AtomicBarrier,
      state: AtomicBarrier.State,
      waitTime: FiniteDuration): Unit =
    busyWaitFor(
      barrier.load() == state,
      waitTime,
      s"Exceeded deadline waiting for $barrier to transition to state $state")

  /**
   * Keep checking if `check` returns `true` until it's the case or `timeout` expires.
   *
   * Optionally provide a custom error `message`.
   */
  def busyWaitFor(
      check: => Boolean,
      timeout: FiniteDuration,
      // lazy evaluate so closed over states are evaluated at time of failure not invocation
      message: => String = "Exceeded deadline waiting for check to become true."): Unit = {
    if (!BusyWait.until(check, timeout)) {
      fail(message)
    }
  }
}
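The comment on `message` notes that it is evaluated lazily, so that state captured in the string reflects the moment of failure rather than the moment of the call. A minimal sketch of that effect follows. It is illustrative only: `ByNameMessageDemo` and `pollOrReport` are hypothetical names, and a fixed attempt count stands in for the timeout so the example stays deterministic.

```scala
// Hypothetical demo of a by-name message parameter; not part of the commit.
object ByNameMessageDemo {
  /**
   * Evaluates `check` up to `attempts` times. Returns None on success,
   * or Some(message) on failure. `message` is only evaluated on failure.
   */
  def pollOrReport(check: => Boolean, attempts: Int, message: => String): Option[String] = {
    var remaining = attempts
    while (remaining > 0) {
      if (check) return None
      remaining -= 1
    }
    // The by-name argument is evaluated here, after all checks have run.
    Some(message)
  }
}
```

If `message` were a plain `String`, an interpolated value such as the barrier's state would be captured before any polling took place, and the failure text could describe a state the barrier had already left.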
Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.fuzzer

import scala.concurrent.duration._

import org.apache.spark.sql.delta.concurrency.PhaseLockingTestMixin

import org.apache.spark.SparkFunSuite

class AtomicBarrierSuite extends SparkFunSuite
  with PhaseLockingTestMixin {

  val timeout: FiniteDuration = 5000.millis

  test("Atomic Barrier - wait before unblock") {
    val barrier = new AtomicBarrier
    assert(AtomicBarrier.State.Blocked === barrier.load())
    val thread = new Thread(() => {
      barrier.waitToPass()
    })
    assert(AtomicBarrier.State.Blocked === barrier.load())
    thread.start()
    busyWaitForState(barrier, AtomicBarrier.State.Requested, timeout)
    assert(thread.isAlive) // should be stuck waiting for unblock
    barrier.unblock()
    busyWaitForState(barrier, AtomicBarrier.State.Passed, timeout)
    thread.join(timeout.toMillis) // shouldn't take long
    assert(!thread.isAlive) // should have passed the barrier and completed
  }

  test("Atomic Barrier - unblock before wait") {
    val barrier = new AtomicBarrier
    assert(AtomicBarrier.State.Blocked === barrier.load())
    val thread = new Thread(() => {
      barrier.waitToPass()
    })
    assert(AtomicBarrier.State.Blocked === barrier.load())
    barrier.unblock()
    assert(AtomicBarrier.State.Unblocked === barrier.load())
    thread.start()
    busyWaitForState(barrier, AtomicBarrier.State.Passed, timeout)
    thread.join(timeout.toMillis) // shouldn't take long
    assert(!thread.isAlive) // should have passed the barrier and completed
  }
}
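The "wait before unblock" test follows a handshake: the worker announces that it has reached the barrier, the driver observes that, confirms the worker is still parked, and only then lets it through. The sketch below reproduces that sequence with two `java.util.concurrent.CountDownLatch` instances as a stand-in for `AtomicBarrier`, so it can run without the Delta test classes. It is an approximation under that assumption, not the framework's mechanism, and `LatchHandshakeSketch` and `run` are hypothetical names.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the barrier handshake, built on standard-library latches.
object LatchHandshakeSketch {
  /** Runs one "wait before unblock" round; returns (workerWasParked, workerFinished). */
  def run(timeoutMs: Long): (Boolean, Boolean) = {
    val requested = new CountDownLatch(1) // worker has reached the barrier
    val unblocked = new CountDownLatch(1) // driver allows the worker to pass
    val passed = new AtomicBoolean(false)

    val worker = new Thread(() => {
      requested.countDown()
      // Parks here until the driver counts `unblocked` down or the timeout expires.
      if (unblocked.await(timeoutMs, TimeUnit.MILLISECONDS)) passed.set(true)
    })
    worker.start()

    // Driver side: wait for the request, then check the worker is still waiting.
    val sawRequest = requested.await(timeoutMs, TimeUnit.MILLISECONDS)
    val parked = sawRequest && worker.isAlive && !passed.get()
    unblocked.countDown()
    worker.join(timeoutMs)
    (parked, passed.get() && !worker.isAlive)
  }
}
```

Unlike the latch pair, `AtomicBarrier` keeps the whole handshake in a single observable state, which is what lets `busyWaitForState` assert on `Requested` and `Passed` directly.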
