Skip to content

Commit 05ae747

Browse files
Efim Poberezkin authored and tdas committed
[SPARK-23747][STRUCTURED STREAMING] Add EpochCoordinator unit tests
## What changes were proposed in this pull request? Unit tests for EpochCoordinator that test correct sequencing of committed epochs. Several tests are ignored since they test functionality implemented in SPARK-23503 which is not yet merged, otherwise they fail. Author: Efim Poberezkin <[email protected]> Closes apache#20983 from efimpoberezkin/pr/EpochCoordinator-tests.
1 parent 1cc66a0 commit 05ae747

File tree

1 file changed

+224
-0
lines changed

1 file changed

+224
-0
lines changed
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17+
18+
package org.apache.spark.sql.streaming.continuous
19+
20+
import org.mockito.InOrder
21+
import org.mockito.Matchers.{any, eq => eqTo}
22+
import org.mockito.Mockito._
23+
import org.scalatest.BeforeAndAfterEach
24+
import org.scalatest.mockito.MockitoSugar
25+
26+
import org.apache.spark._
27+
import org.apache.spark.rpc.RpcEndpointRef
28+
import org.apache.spark.sql.LocalSparkSession
29+
import org.apache.spark.sql.execution.streaming.continuous._
30+
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
31+
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
32+
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
33+
import org.apache.spark.sql.test.TestSparkSession
34+
35+
class EpochCoordinatorSuite
  extends SparkFunSuite
    with LocalSparkSession
    with MockitoSugar
    with BeforeAndAfterEach {

  // RPC handle to the EpochCoordinator under test; rebuilt before every test case.
  private var coordinator: RpcEndpointRef = _

  // Mocks the coordinator drives on a successful epoch; `commitOrder` checks that
  // the writer commit always happens before the query commit, per epoch, in order.
  private var writerMock: StreamWriter = _
  private var queryMock: ContinuousExecution = _
  private var commitOrder: InOrder = _

  override def beforeEach(): Unit = {
    spark = new TestSparkSession()

    val readerMock = mock[ContinuousReader]
    writerMock = mock[StreamWriter]
    queryMock = mock[ContinuousExecution]
    commitOrder = inOrder(writerMock, queryMock)

    coordinator =
      EpochCoordinatorRef.create(writerMock, readerMock, queryMock, "test", 1, spark, SparkEnv.get)
  }

  test("single epoch") {
    declareWriterPartitions(3)
    declareReaderPartitions(2)

    (0 to 2).foreach(sendEpochCommit(_, 1))
    (0 to 1).foreach(sendOffsetReport(_, 1))

    // Here and in all tests below: a synchronous round trip to the coordinator
    // guarantees the one-way messages above have been handled before we verify.
    flushCoordinator()

    expectCommit(1)
  }

  test("single epoch, all but one writer partition has committed") {
    declareWriterPartitions(3)
    declareReaderPartitions(2)

    // Writer partition 2 never commits, so the epoch must not be committed.
    sendEpochCommit(0, 1)
    sendEpochCommit(1, 1)
    sendOffsetReport(0, 1)
    sendOffsetReport(1, 1)

    flushCoordinator()

    expectNoCommit(1)
  }

  test("single epoch, all but one reader partition has reported an offset") {
    declareWriterPartitions(3)
    declareReaderPartitions(2)

    // Reader partition 1 never reports an offset, so the epoch must not be committed.
    (0 to 2).foreach(sendEpochCommit(_, 1))
    sendOffsetReport(0, 1)

    flushCoordinator()

    expectNoCommit(1)
  }

  test("consequent epochs, messages for epoch (k + 1) arrive after messages for epoch k") {
    declareWriterPartitions(2)
    declareReaderPartitions(2)

    // All of epoch 1's messages, then all of epoch 2's, in arrival order.
    for (epoch <- 1 to 2) {
      sendEpochCommit(0, epoch)
      sendEpochCommit(1, epoch)
      sendOffsetReport(0, epoch)
      sendOffsetReport(1, epoch)
    }

    flushCoordinator()

    expectCommitSequence(List(1, 2))
  }

  ignore("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)") {
    declareWriterPartitions(2)
    declareReaderPartitions(2)

    sendEpochCommit(0, 1)
    sendEpochCommit(1, 1)
    sendOffsetReport(0, 1)

    sendEpochCommit(0, 2)
    sendEpochCommit(1, 2)
    sendOffsetReport(0, 2)
    sendOffsetReport(1, 2)

    // Message that arrives late
    sendOffsetReport(1, 1)

    flushCoordinator()

    expectCommitSequence(List(1, 2))
  }

  ignore("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2") {
    declareWriterPartitions(1)
    declareReaderPartitions(1)

    Seq(1, 3, 4, 2).foreach { epoch =>
      sendEpochCommit(0, epoch)
      sendOffsetReport(0, epoch)
    }

    flushCoordinator()

    expectCommitSequence(List(1, 2, 3, 4))
  }

  ignore("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2") {
    declareWriterPartitions(1)
    declareReaderPartitions(1)

    Seq(1, 3, 5, 4, 2).foreach { epoch =>
      sendEpochCommit(0, epoch)
      sendOffsetReport(0, epoch)
    }

    flushCoordinator()

    expectCommitSequence(List(1, 2, 3, 4, 5))
  }

  // Tell the coordinator how many writer partitions participate in each epoch.
  private def declareWriterPartitions(count: Int): Unit = {
    coordinator.askSync[Unit](SetWriterPartitions(count))
  }

  // Tell the coordinator how many reader partitions participate in each epoch.
  private def declareReaderPartitions(count: Int): Unit = {
    coordinator.askSync[Unit](SetReaderPartitions(count))
  }

  // One-way message: the given writer partition has committed the given epoch.
  private def sendEpochCommit(partitionId: Int, epoch: Long): Unit = {
    coordinator.send(CommitPartitionEpoch(partitionId, epoch, mock[WriterCommitMessage]))
  }

  // One-way message: the given reader partition has reported an offset for the epoch.
  private def sendOffsetReport(partitionId: Int, epoch: Long): Unit = {
    coordinator.send(ReportPartitionOffset(partitionId, epoch, mock[PartitionOffset]))
  }

  // Synchronous call whose only purpose is to ensure the coordinator has acted on
  // every previously sent message before the test verifies the mocks.
  private def flushCoordinator(): Unit = {
    coordinator.askSync[Long](GetCurrentEpoch)
  }

  // Verify the epoch was committed: writer first, then the query, in order.
  private def expectCommit(epoch: Long): Unit = {
    commitOrder.verify(writerMock).commit(eqTo(epoch), any())
    commitOrder.verify(queryMock).commit(epoch)
  }

  // Verify neither the writer nor the query ever committed the epoch.
  private def expectNoCommit(epoch: Long): Unit = {
    verify(writerMock, never()).commit(eqTo(epoch), any())
    verify(queryMock, never()).commit(epoch)
  }

  // Verify the given epochs were committed in exactly this order.
  private def expectCommitSequence(epochs: Seq[Long]): Unit = {
    for (epoch <- epochs) {
      expectCommit(epoch)
    }
  }
}

0 commit comments

Comments
 (0)