Skip to content

Commit 9f8bfe1

Browse files
committed
[Bug] RestOptions.AWAIT_LEADER_TIMEOUT type compatibility improvement
1 parent 9c80377 commit 9f8bfe1

File tree

6 files changed

+46
-50
lines changed

6 files changed

+46
-50
lines changed

streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,33 @@
1717

1818
package org.apache.streampark.flink.client.tool
1919

20-
import org.apache.streampark.common.util.Logger
21-
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
22-
2320
import org.apache.flink.client.deployment.application.ApplicationConfiguration
2421
import org.apache.flink.configuration.{Configuration, CoreOptions}
2522
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
2623
import org.apache.hc.client5.http.entity.mime.MultipartEntityBuilder
2724
import org.apache.hc.client5.http.fluent.Request
2825
import org.apache.hc.core5.http.ContentType
2926
import org.apache.hc.core5.http.io.entity.StringEntity
27+
import org.apache.hc.core5.util.Timeout
28+
import org.apache.streampark.common.util.Logger
3029
import org.json4s.DefaultFormats
3130
import org.json4s.jackson.JsonMethods._
3231
import org.json4s.jackson.Serialization
3332

3433
import java.io.File
3534
import java.nio.charset.StandardCharsets
36-
35+
import java.time.Duration
3736
import scala.collection.JavaConversions._
3837
import scala.util.{Failure, Success, Try}
3938

4039
object FlinkSessionSubmitHelper extends Logger {
4140

41+
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
42+
private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
43+
44+
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
45+
private lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L)
46+
4247
@transient
4348
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
4449

@@ -59,8 +64,8 @@ object FlinkSessionSubmitHelper extends Logger {
5964
// upload flink-job jar
6065
val uploadResult = Request
6166
.post(s"$jmRestUrl/jars/upload")
62-
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
63-
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
67+
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
68+
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
6469
.body(
6570
MultipartEntityBuilder
6671
.create()
@@ -90,8 +95,8 @@ object FlinkSessionSubmitHelper extends Logger {
9095
// refer to https://ci.apache.org/projects/flink/flink-docs-stable/docs/ops/rest_api/#jars-upload
9196
val resp = Request
9297
.post(s"$jmRestUrl/jars/${jarUploadResponse.jarId}/run")
93-
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
94-
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
98+
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
99+
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
95100
.body(new StringEntity(Serialization.write(new JarRunRequest(flinkConfig))))
96101
.execute
97102
.returnContent()

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,36 +17,25 @@
1717

1818
package org.apache.streampark.flink.kubernetes
1919

20-
import org.apache.streampark.common.util.{DateUtils, Logger, Utils}
21-
import org.apache.streampark.common.util.Utils.using
22-
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
23-
import org.apache.streampark.flink.kubernetes.ingress.IngressController
24-
import org.apache.streampark.flink.kubernetes.model.ClusterKey
25-
2620
import io.fabric8.kubernetes.client.{DefaultKubernetesClient, KubernetesClient, KubernetesClientException}
2721
import org.apache.flink.client.cli.ClientOptions
2822
import org.apache.flink.client.deployment.{ClusterClientFactory, DefaultClusterClientServiceLoader}
2923
import org.apache.flink.client.program.ClusterClient
3024
import org.apache.flink.configuration.{Configuration, DeploymentOptions, RestOptions}
3125
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
3226
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
33-
import org.apache.hc.core5.util.Timeout
27+
import org.apache.streampark.common.util.Utils.using
28+
import org.apache.streampark.common.util.{Logger, Utils}
29+
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
30+
import org.apache.streampark.flink.kubernetes.ingress.IngressController
31+
import org.apache.streampark.flink.kubernetes.model.ClusterKey
3432

3533
import javax.annotation.Nullable
36-
3734
import scala.collection.JavaConverters._
3835
import scala.util.{Failure, Success, Try}
3936

4037
object KubernetesRetriever extends Logger {
4138

42-
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
43-
val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
44-
Timeout.ofMilliseconds(ClientOptions.CLIENT_TIMEOUT.defaultValue().toMillis)
45-
46-
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
47-
val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
48-
Timeout.ofMilliseconds(RestOptions.AWAIT_LEADER_TIMEOUT.defaultValue())
49-
5039
private val DEPLOYMENT_LOST_TIME = collection.mutable.Map[String, Long]()
5140

5241
/** get new KubernetesClient */

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,20 @@
1717

1818
package org.apache.streampark.flink.kubernetes.watcher
1919

20+
import org.apache.hc.client5.http.fluent.Request
2021
import org.apache.streampark.common.util.Logger
21-
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
2222
import org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent
2323
import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey, TrackId}
24-
25-
import org.apache.hc.client5.http.fluent.Request
26-
import org.json4s.{DefaultFormats, JNull}
24+
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, MetricWatcherConfig}
2725
import org.json4s.JsonAST.JNothing
2826
import org.json4s.jackson.JsonMethods.parse
29-
30-
import javax.annotation.concurrent.ThreadSafe
27+
import org.json4s.{DefaultFormats, JNull}
3128

3229
import java.nio.charset.StandardCharsets
3330
import java.util.concurrent.{ScheduledFuture, TimeUnit}
34-
35-
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
31+
import javax.annotation.concurrent.ThreadSafe
3632
import scala.concurrent.duration.DurationLong
33+
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
3734
import scala.language.postfixOps
3835
import scala.util.{Failure, Success, Try}
3936

@@ -119,8 +116,8 @@ class FlinkCheckpointWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.def
119116
Checkpoint.as(
120117
Request
121118
.get(s"$flinkJmRestUrl/jobs/${trackId.jobId}/checkpoints")
122-
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
123-
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
119+
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
120+
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
124121
.execute
125122
.returnContent
126123
.asString(StandardCharsets.UTF_8)) match {

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,8 +291,8 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
291291
JobDetails.as(
292292
Request
293293
.get(s"$restUrl/jobs/overview")
294-
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
295-
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
294+
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
295+
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
296296
.execute
297297
.returnContent()
298298
.asString(StandardCharsets.UTF_8)

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,20 @@
1717

1818
package org.apache.streampark.flink.kubernetes.watcher
1919

20+
import org.apache.flink.configuration.{JobManagerOptions, MemorySize, TaskManagerOptions}
21+
import org.apache.hc.client5.http.fluent.Request
2022
import org.apache.streampark.common.util.Logger
21-
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
2223
import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
2324
import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, TrackId}
24-
25-
import org.apache.flink.configuration.{JobManagerOptions, MemorySize, TaskManagerOptions}
26-
import org.apache.hc.client5.http.fluent.Request
27-
import org.json4s.{DefaultFormats, JArray}
25+
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, MetricWatcherConfig}
2826
import org.json4s.jackson.JsonMethods.parse
29-
30-
import javax.annotation.concurrent.ThreadSafe
27+
import org.json4s.{DefaultFormats, JArray}
3128

3229
import java.nio.charset.StandardCharsets
3330
import java.util.concurrent.{ScheduledFuture, TimeUnit}
34-
35-
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
31+
import javax.annotation.concurrent.ThreadSafe
3632
import scala.concurrent.duration.DurationLong
33+
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
3734
import scala.language.postfixOps
3835
import scala.util.{Failure, Success, Try}
3936

@@ -131,8 +128,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
131128
.as(
132129
Request
133130
.get(s"$flinkJmRestUrl/overview")
134-
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
135-
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
131+
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
132+
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
136133
.execute
137134
.returnContent
138135
.asString(StandardCharsets.UTF_8))
@@ -144,8 +141,8 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default
144141
.as(
145142
Request
146143
.get(s"$flinkJmRestUrl/jobmanager/config")
147-
.connectTimeout(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC)
148-
.responseTimeout(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC)
144+
.connectTimeout(FLINK_REST_AWAIT_TIMEOUT_SEC)
145+
.responseTimeout(FLINK_CLIENT_TIMEOUT_SEC)
149146
.execute
150147
.returnContent
151148
.asString(StandardCharsets.UTF_8))

streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,21 @@
1717

1818
package org.apache.streampark.flink.kubernetes.watcher
1919

20+
import org.apache.hc.core5.util.Timeout
21+
22+
import java.time.Duration
2023
import java.util.concurrent.ScheduledThreadPoolExecutor
2124
import java.util.concurrent.atomic.AtomicBoolean
22-
2325
import scala.language.implicitConversions
2426

2527
trait FlinkWatcher extends AutoCloseable {
2628

29+
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
30+
lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
31+
32+
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
33+
lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = Timeout.ofMilliseconds(30000L)
34+
2735
private[this] val started: AtomicBoolean = new AtomicBoolean(false)
2836

2937
private val CPU_NUM = Math.max(4, Runtime.getRuntime.availableProcessors * 2)

0 commit comments

Comments
 (0)