@@ -6,6 +6,7 @@ import spark.jobserver.CommonMessages.JobResult
6
6
import spark .jobserver .io .JobDAOActor
7
7
8
8
class NamedObjectsJobSpec extends JobSpecBase (JobManagerSpec .getNewSystem) {
9
+ import scala .concurrent .duration ._
9
10
10
11
override def beforeAll () {
11
12
dao = new InMemoryDAO
@@ -16,12 +17,12 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
16
17
supervisor = TestProbe ().ref
17
18
18
19
manager ! JobManagerActor .Initialize (None )
19
-
20
- expectMsgClass(classOf [JobManagerActor .Initialized ])
20
+
21
+ expectMsgClass(10 .seconds, classOf [JobManagerActor .Initialized ])
21
22
22
23
uploadTestJar()
23
24
}
24
-
25
+
25
26
val jobName = " spark.jobserver.NamedObjectsTestJob"
26
27
27
28
private def getCreateConfig (createDF : Boolean , createRDD : Boolean , createBroadcast : Boolean = false ) : Config = {
@@ -30,12 +31,12 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
30
31
NamedObjectsTestJobConfig .CREATE_RDD + " = " + createRDD + " , " +
31
32
NamedObjectsTestJobConfig .CREATE_BROADCAST + " = " + createBroadcast)
32
33
}
33
-
34
+
34
35
private def getDeleteConfig (names : List [String ]) : Config = {
35
- ConfigFactory .parseString(" spark.jobserver.named-object-creation-timeout = 60 s, " +
36
+ ConfigFactory .parseString(" spark.jobserver.named-object-creation-timeout = 60 s, " +
36
37
NamedObjectsTestJobConfig .DELETE + " = [" + names.mkString(" , " ) + " ]" )
37
38
}
38
-
39
+
39
40
describe(" NamedObjects (RDD)" ) {
40
41
it(" should survive from one job to another one" ) {
41
42
manager ! JobManagerActor .StartJob (" demo" , jobName, getCreateConfig(false , true ), errorEvents ++ syncEvents)
@@ -60,15 +61,16 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
60
61
describe(" NamedObjects (DataFrame)" ) {
61
62
it(" should survive from one job to another one" ) {
62
63
manager ! JobManagerActor .StartJob (" demo" , jobName, getCreateConfig(true , false ), errorEvents ++ syncEvents)
63
- val JobResult (_, names : Array [String ]) = expectMsgClass(classOf [JobResult ])
64
+ // for some reason, this just needs some more time to finish occasinally
65
+ val JobResult (_, names : Array [String ]) = expectMsgClass(10 .seconds, classOf [JobResult ])
64
66
65
67
names should contain(" df1" )
66
68
67
69
manager ! JobManagerActor .StartJob (" demo" , jobName, getCreateConfig(false , false ), errorEvents ++ syncEvents)
68
70
val JobResult (_, names2 : Array [String ]) = expectMsgClass(classOf [JobResult ])
69
71
70
72
names2 should equal(names)
71
-
73
+
72
74
// clean-up
73
75
manager ! JobManagerActor .StartJob (" demo" , jobName, getDeleteConfig(List (" df1" )), errorEvents ++ syncEvents)
74
76
expectMsgClass(classOf [JobResult ])
@@ -79,15 +81,15 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
79
81
it(" should survive from one job to another one" ) {
80
82
manager ! JobManagerActor .StartJob (" demo" , jobName, getCreateConfig(true , true ), errorEvents ++ syncEvents)
81
83
val JobResult (_, names : Array [String ]) = expectMsgClass(classOf [JobResult ])
82
-
84
+
83
85
names should contain(" rdd1" )
84
86
names should contain(" df1" )
85
87
86
88
manager ! JobManagerActor .StartJob (" demo" , jobName, getCreateConfig(false , false ), errorEvents ++ syncEvents)
87
89
val JobResult (_, names2 : Array [String ]) = expectMsgClass(classOf [JobResult ])
88
90
89
91
names2 should equal(names)
90
-
92
+
91
93
// clean-up
92
94
manager ! JobManagerActor .StartJob (" demo" , jobName, getDeleteConfig(List (" rdd1" , " df1" )), errorEvents ++ syncEvents)
93
95
expectMsgClass(classOf [JobResult ])
@@ -99,7 +101,7 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
99
101
100
102
manager ! JobManagerActor .StartJob (" demo" , jobName, getCreateConfig(true , true , true ), errorEvents ++ syncEvents)
101
103
val JobResult (_, names : Array [String ]) = expectMsgClass(classOf [JobResult ])
102
-
104
+
103
105
names should contain(" rdd1" )
104
106
names should contain(" df1" )
105
107
names should contain(" broadcast1" )
@@ -108,7 +110,7 @@ class NamedObjectsJobSpec extends JobSpecBase(JobManagerSpec.getNewSystem) {
108
110
val JobResult (_, names2 : Array [String ]) = expectMsgClass(classOf [JobResult ])
109
111
110
112
names2 should equal(names)
111
-
113
+
112
114
// clean-up
113
115
manager ! JobManagerActor .StartJob (" demo" , jobName, getDeleteConfig(List (" rdd1" , " df1" , " broadcast1" ))
114
116
, errorEvents ++ syncEvents)
0 commit comments