@@ -23,12 +23,14 @@ import com.fasterxml.jackson.databind.ObjectMapper
23
23
import com .fasterxml .jackson .module .scala .DefaultScalaModule
24
24
import com .google .common .io .ByteStreams
25
25
import okhttp3 .{RequestBody , ResponseBody }
26
+ import org .eclipse .jetty .server .Server
26
27
import org .scalatest .BeforeAndAfter
27
28
import org .scalatest .mock .MockitoSugar .mock
28
29
import retrofit2 .Call
29
30
30
31
import org .apache .spark .{SparkFunSuite , SSLOptions }
31
32
import org .apache .spark .deploy .kubernetes .SSLUtils
33
+ import org .apache .spark .internal .Logging
32
34
import org .apache .spark .util .Utils
33
35
34
36
/**
@@ -40,30 +42,37 @@ import org.apache.spark.util.Utils
40
42
* we've configured the Jetty server correctly and that the endpoints reached over HTTP can
41
43
* receive streamed uploads and can stream downloads.
42
44
*/
43
- class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
45
+ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter with Logging {
46
+
47
+ private val MAX_SERVER_START_ATTEMPTS = 5
44
48
private var serviceImpl : ResourceStagingService = _
45
49
private var stagedResourcesCleaner : StagedResourcesCleaner = _
46
- private var server : ResourceStagingServer = _
50
+ private var server : Option [ ResourceStagingServer ] = None
47
51
private val OBJECT_MAPPER = new ObjectMapper ().registerModule(new DefaultScalaModule )
48
52
49
- private val serverPort = new ServerSocket (0 ).getLocalPort
50
-
51
53
private val sslOptionsProvider = new SettableReferenceSslOptionsProvider ()
52
54
53
55
before {
54
56
stagedResourcesCleaner = mock[StagedResourcesCleaner ]
55
57
serviceImpl = new ResourceStagingServiceImpl (
56
58
new StagedResourcesStoreImpl (Utils .createTempDir()), stagedResourcesCleaner)
57
- server = new ResourceStagingServer (serverPort, serviceImpl, sslOptionsProvider)
58
59
}
59
60
60
61
after {
61
- server.stop()
62
+ server.foreach { s =>
63
+ try {
64
+ s.stop()
65
+ } catch {
66
+ case e : Throwable =>
67
+ log.warn(" Failed to stop the resource staging server." , e)
68
+ }
69
+ }
70
+ server = None
62
71
}
63
72
64
73
test(" Accept file and jar uploads and downloads" ) {
65
- server.start ()
66
- runUploadAndDownload(SSLOptions ())
74
+ val serverPort = startServer ()
75
+ runUploadAndDownload(SSLOptions (), serverPort )
67
76
}
68
77
69
78
test(" Enable SSL on the server" ) {
@@ -80,11 +89,11 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
80
89
trustStore = Some (keyStoreAndTrustStore.trustStore),
81
90
trustStorePassword = Some (" trustStore" ))
82
91
sslOptionsProvider.setOptions(sslOptions)
83
- server.start ()
84
- runUploadAndDownload(sslOptions)
92
+ val serverPort = startServer ()
93
+ runUploadAndDownload(sslOptions, serverPort )
85
94
}
86
95
87
- private def runUploadAndDownload (sslOptions : SSLOptions ): Unit = {
96
+ private def runUploadAndDownload (sslOptions : SSLOptions , serverPort : Int ): Unit = {
88
97
val scheme = if (sslOptions.enabled) " https" else " http"
89
98
val retrofitService = RetrofitClientFactoryImpl .createRetrofitClient(
90
99
s " $scheme://127.0.0.1: $serverPort/ " ,
@@ -125,6 +134,44 @@ class ResourceStagingServerSuite extends SparkFunSuite with BeforeAndAfter {
125
134
val downloadedBytes = ByteStreams .toByteArray(responseBody.byteStream())
126
135
assert(downloadedBytes.toSeq === bytes)
127
136
}
137
+
138
+ private def startServer (): Int = {
139
+ var currentAttempt = 0
140
+ var successfulStart = false
141
+ var latestServerPort = new ServerSocket (0 ).getLocalPort
142
+ while (currentAttempt < MAX_SERVER_START_ATTEMPTS && ! successfulStart) {
143
+ val newServer = new ResourceStagingServer (latestServerPort, serviceImpl, sslOptionsProvider)
144
+ try {
145
+ newServer.start()
146
+ successfulStart = true
147
+ server = Some (newServer)
148
+ } catch {
149
+ case e : Throwable =>
150
+ try {
151
+ newServer.stop()
152
+ } catch {
153
+ case e1 : Throwable =>
154
+ log.warn(" Failed to stop a resource staging server that failed to start." , e1)
155
+ }
156
+
157
+ if (Utils .isBindCollision(e)) {
158
+ currentAttempt += 1
159
+ latestServerPort = latestServerPort + 1
160
+ if (currentAttempt == MAX_SERVER_START_ATTEMPTS ) {
161
+ throw new RuntimeException (s " Failed to bind to a random port " +
162
+ s " $MAX_SERVER_START_ATTEMPTS times. Last attempted port: $latestServerPort" , e)
163
+ } else {
164
+ logWarning(s " Attempt $currentAttempt/ $MAX_SERVER_START_ATTEMPTS failed to start " +
165
+ s " server on port $latestServerPort. " , e)
166
+ }
167
+ } else {
168
+ throw e
169
+ }
170
+ }
171
+ }
172
+ logInfo(s " Started resource staging server on port $latestServerPort. " )
173
+ latestServerPort
174
+ }
128
175
}
129
176
130
177
private class SettableReferenceSslOptionsProvider extends ResourceStagingServerSslOptionsProvider {
0 commit comments