Skip to content

Commit 3a13f8d

Browse files
eemariozhuzhurk
authored andcommitted
[FLINK-38978][runtime] Expose JM config for applications
1 parent c0f2273 commit 3a13f8d

File tree

5 files changed

+295
-0
lines changed

5 files changed

+295
-0
lines changed

flink-runtime-web/src/test/resources/rest_api_v1.snapshot

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,38 @@
161161
"type" : "object",
162162
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
163163
}
164+
}, {
165+
"url" : "/applications/:applicationid/jobmanager/config",
166+
"method" : "GET",
167+
"status-code" : "200 OK",
168+
"file-upload" : false,
169+
"path-parameters" : {
170+
"pathParameters" : [ {
171+
"key" : "applicationid"
172+
} ]
173+
},
174+
"query-parameters" : {
175+
"queryParameters" : [ ]
176+
},
177+
"request" : {
178+
"type" : "object",
179+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
180+
},
181+
"response" : {
182+
"type" : "array",
183+
"items" : {
184+
"type" : "object",
185+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ConfigurationInfoEntry",
186+
"properties" : {
187+
"key" : {
188+
"type" : "string"
189+
},
190+
"value" : {
191+
"type" : "string"
192+
}
193+
}
194+
}
195+
}
164196
}, {
165197
"url" : "/cluster",
166198
"method" : "DELETE",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.handler.application;
20+
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.runtime.application.ArchivedApplication;
23+
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
24+
import org.apache.flink.runtime.rest.handler.HandlerRequest;
25+
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
26+
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
27+
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
28+
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
29+
import org.apache.flink.runtime.rest.messages.MessageHeaders;
30+
import org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
31+
import org.apache.flink.runtime.webmonitor.RestfulGateway;
32+
import org.apache.flink.runtime.webmonitor.history.ApplicationJsonArchivist;
33+
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
34+
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
35+
import org.apache.flink.util.Preconditions;
36+
37+
import javax.annotation.Nonnull;
38+
39+
import java.io.IOException;
40+
import java.time.Duration;
41+
import java.util.Collection;
42+
import java.util.Collections;
43+
import java.util.Map;
44+
import java.util.concurrent.CompletableFuture;
45+
46+
/** Handler which serves the jobmanager's configuration of a specific application. */
47+
public class JobManagerApplicationConfigurationHandler
48+
extends AbstractRestHandler<
49+
RestfulGateway, EmptyRequestBody, ConfigurationInfo, ApplicationMessageParameters>
50+
implements ApplicationJsonArchivist {
51+
52+
private final ConfigurationInfo jobConfig;
53+
54+
public JobManagerApplicationConfigurationHandler(
55+
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
56+
Duration timeout,
57+
Map<String, String> responseHeaders,
58+
MessageHeaders<EmptyRequestBody, ConfigurationInfo, ApplicationMessageParameters>
59+
messageHeaders,
60+
Configuration configuration) {
61+
super(leaderRetriever, timeout, responseHeaders, messageHeaders);
62+
63+
Preconditions.checkNotNull(configuration);
64+
this.jobConfig = ConfigurationInfo.from(configuration);
65+
}
66+
67+
@Override
68+
protected CompletableFuture<ConfigurationInfo> handleRequest(
69+
@Nonnull HandlerRequest<EmptyRequestBody> request, @Nonnull RestfulGateway gateway) {
70+
return CompletableFuture.completedFuture(jobConfig);
71+
}
72+
73+
@Override
74+
public Collection<ArchivedJson> archiveApplicationWithPath(
75+
ArchivedApplication archivedApplication) throws IOException {
76+
return Collections.singletonList(
77+
new ArchivedJson(
78+
JobManagerApplicationConfigurationHeaders.getInstance()
79+
.getTargetRestEndpointURL()
80+
.replace(
81+
':' + ApplicationIDPathParameter.KEY,
82+
archivedApplication.getApplicationId().toHexString()),
83+
jobConfig));
84+
}
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.messages.application;
20+
21+
import org.apache.flink.runtime.rest.HttpMethodWrapper;
22+
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
23+
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
24+
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
25+
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
26+
import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
27+
28+
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
29+
30+
/**
31+
* Message headers for the {@link
32+
* org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler}.
33+
*/
34+
public class JobManagerApplicationConfigurationHeaders
35+
implements RuntimeMessageHeaders<
36+
EmptyRequestBody, ConfigurationInfo, ApplicationMessageParameters> {
37+
38+
private static final JobManagerApplicationConfigurationHeaders INSTANCE =
39+
new JobManagerApplicationConfigurationHeaders();
40+
41+
public static final String JOBMANAGER_APPLICATION_CONFIG_REST_PATH =
42+
"/applications/:" + ApplicationIDPathParameter.KEY + "/jobmanager/config";
43+
44+
private JobManagerApplicationConfigurationHeaders() {}
45+
46+
@Override
47+
public Class<EmptyRequestBody> getRequestClass() {
48+
return EmptyRequestBody.class;
49+
}
50+
51+
@Override
52+
public HttpMethodWrapper getHttpMethod() {
53+
return HttpMethodWrapper.GET;
54+
}
55+
56+
@Override
57+
public String getTargetRestEndpointURL() {
58+
return JOBMANAGER_APPLICATION_CONFIG_REST_PATH;
59+
}
60+
61+
@Override
62+
public Class<ConfigurationInfo> getResponseClass() {
63+
return ConfigurationInfo.class;
64+
}
65+
66+
@Override
67+
public HttpResponseStatus getResponseStatusCode() {
68+
return HttpResponseStatus.OK;
69+
}
70+
71+
@Override
72+
public ApplicationMessageParameters getUnresolvedMessageParameters() {
73+
return new ApplicationMessageParameters();
74+
}
75+
76+
public static JobManagerApplicationConfigurationHeaders getInstance() {
77+
return INSTANCE;
78+
}
79+
80+
@Override
81+
public String getDescription() {
82+
return "Returns the jobmanager's configuration of a specific application.";
83+
}
84+
}

flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.flink.runtime.rest.handler.application.ApplicationCancellationHandler;
3838
import org.apache.flink.runtime.rest.handler.application.ApplicationDetailsHandler;
3939
import org.apache.flink.runtime.rest.handler.application.ApplicationsOverviewHandler;
40+
import org.apache.flink.runtime.rest.handler.application.JobManagerApplicationConfigurationHandler;
4041
import org.apache.flink.runtime.rest.handler.cluster.ClusterConfigHandler;
4142
import org.apache.flink.runtime.rest.handler.cluster.ClusterOverviewHandler;
4243
import org.apache.flink.runtime.rest.handler.cluster.DashboardConfigHandler;
@@ -137,6 +138,7 @@
137138
import org.apache.flink.runtime.rest.messages.YarnCancelJobTerminationHeaders;
138139
import org.apache.flink.runtime.rest.messages.YarnStopJobTerminationHeaders;
139140
import org.apache.flink.runtime.rest.messages.application.ApplicationDetailsHeaders;
141+
import org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
140142
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
141143
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
142144
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
@@ -506,6 +508,14 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
506508
responseHeaders,
507509
ApplicationDetailsHeaders.getInstance());
508510

511+
JobManagerApplicationConfigurationHandler jobManagerApplicationConfigurationHandler =
512+
new JobManagerApplicationConfigurationHandler(
513+
leaderRetriever,
514+
timeout,
515+
responseHeaders,
516+
JobManagerApplicationConfigurationHeaders.getInstance(),
517+
clusterConfiguration);
518+
509519
JobAccumulatorsHandler jobAccumulatorsHandler =
510520
new JobAccumulatorsHandler(
511521
leaderRetriever,
@@ -818,6 +828,10 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
818828
handlers.add(
819829
Tuple2.of(
820830
applicationDetailsHandler.getMessageHeaders(), applicationDetailsHandler));
831+
handlers.add(
832+
Tuple2.of(
833+
jobManagerApplicationConfigurationHandler.getMessageHeaders(),
834+
jobManagerApplicationConfigurationHandler));
821835
handlers.add(Tuple2.of(jobAccumulatorsHandler.getMessageHeaders(), jobAccumulatorsHandler));
822836
handlers.add(Tuple2.of(taskManagersHandler.getMessageHeaders(), taskManagersHandler));
823837
handlers.add(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.handler.application;
20+
21+
import org.apache.flink.api.common.ApplicationID;
22+
import org.apache.flink.configuration.Configuration;
23+
import org.apache.flink.configuration.JobManagerOptions;
24+
import org.apache.flink.runtime.rest.handler.HandlerRequest;
25+
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
26+
import org.apache.flink.runtime.rest.messages.ApplicationIDPathParameter;
27+
import org.apache.flink.runtime.rest.messages.ApplicationMessageParameters;
28+
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
29+
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
30+
import org.apache.flink.runtime.rest.messages.application.JobManagerApplicationConfigurationHeaders;
31+
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
32+
import org.apache.flink.testutils.TestingUtils;
33+
34+
import org.junit.jupiter.api.Test;
35+
36+
import java.util.Collections;
37+
import java.util.HashMap;
38+
import java.util.Map;
39+
40+
import static org.assertj.core.api.Assertions.assertThat;
41+
42+
/** Test for the {@link JobManagerApplicationConfigurationHandler}. */
43+
class JobManagerApplicationConfigurationHandlerTest {
44+
45+
private static HandlerRequest<EmptyRequestBody> createRequest(ApplicationID applicationId)
46+
throws HandlerRequestException {
47+
Map<String, String> pathParameters = new HashMap<>();
48+
pathParameters.put(ApplicationIDPathParameter.KEY, applicationId.toString());
49+
return HandlerRequest.resolveParametersAndCreate(
50+
EmptyRequestBody.getInstance(),
51+
new ApplicationMessageParameters(),
52+
pathParameters,
53+
Collections.emptyMap(),
54+
Collections.emptyList());
55+
}
56+
57+
@Test
58+
void testRequestConfiguration() throws Exception {
59+
final Configuration configuration = new Configuration();
60+
configuration.set(JobManagerOptions.ADDRESS, "address");
61+
62+
final JobManagerApplicationConfigurationHandler handler =
63+
new JobManagerApplicationConfigurationHandler(
64+
() -> null,
65+
TestingUtils.TIMEOUT,
66+
Collections.emptyMap(),
67+
JobManagerApplicationConfigurationHeaders.getInstance(),
68+
configuration);
69+
70+
final ApplicationID applicationId = ApplicationID.generate();
71+
final HandlerRequest<EmptyRequestBody> handlerRequest = createRequest(applicationId);
72+
73+
final ConfigurationInfo configurationInfo =
74+
handler.handleRequest(handlerRequest, new TestingRestfulGateway.Builder().build())
75+
.get();
76+
77+
assertThat(configurationInfo.get(0).getKey()).isEqualTo(JobManagerOptions.ADDRESS.key());
78+
assertThat(configurationInfo.get(0).getValue()).isEqualTo("address");
79+
}
80+
}

0 commit comments

Comments
 (0)