Skip to content

Commit 5954b5b

Browse files
eemariozhuzhurk
authored andcommitted
[FLINK-38757][runtime] Introduce the base class for application
1 parent 9d2daa8 commit 5954b5b

File tree

4 files changed

+624
-0
lines changed

4 files changed

+624
-0
lines changed
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.util.AbstractID;
23+
import org.apache.flink.util.StringUtils;
24+
25+
import java.nio.ByteBuffer;
26+
27+
/** Unique (at least statistically unique) identifier for a Flink Application. */
28+
@PublicEvolving
29+
public final class ApplicationID extends AbstractID {
30+
31+
private static final long serialVersionUID = 1L;
32+
33+
/** Creates a new (statistically) random ApplicationID. */
34+
public ApplicationID() {
35+
super();
36+
}
37+
38+
/**
39+
* Creates a new ApplicationID, using the given lower and upper parts.
40+
*
41+
* @param lowerPart The lower 8 bytes of the ID.
42+
* @param upperPart The upper 8 bytes of the ID.
43+
*/
44+
public ApplicationID(long lowerPart, long upperPart) {
45+
super(lowerPart, upperPart);
46+
}
47+
48+
/**
49+
* Creates a new ApplicationID from the given byte sequence. The byte sequence must be exactly
50+
* 16 bytes long. The first eight bytes make up the lower part of the ID, while the next 8 bytes
51+
* make up the upper part of the ID.
52+
*
53+
* @param bytes The byte sequence.
54+
*/
55+
public ApplicationID(byte[] bytes) {
56+
super(bytes);
57+
}
58+
59+
// ------------------------------------------------------------------------
60+
// Static factory methods
61+
// ------------------------------------------------------------------------
62+
63+
/**
64+
* Creates a new (statistically) random ApplicationID.
65+
*
66+
* @return A new random ApplicationID.
67+
*/
68+
public static ApplicationID generate() {
69+
return new ApplicationID();
70+
}
71+
72+
/**
73+
* Creates a new ApplicationID from the given byte sequence. The byte sequence must be exactly
74+
* 16 bytes long. The first eight bytes make up the lower part of the ID, while the next 8 bytes
75+
* make up the upper part of the ID.
76+
*
77+
* @param bytes The byte sequence.
78+
* @return A new ApplicationID corresponding to the ID encoded in the bytes.
79+
*/
80+
public static ApplicationID fromByteArray(byte[] bytes) {
81+
return new ApplicationID(bytes);
82+
}
83+
84+
public static ApplicationID fromByteBuffer(ByteBuffer buf) {
85+
long lower = buf.getLong();
86+
long upper = buf.getLong();
87+
return new ApplicationID(lower, upper);
88+
}
89+
90+
/**
91+
* Parses an ApplicationID from the given string.
92+
*
93+
* @param hexString string representation of an ApplicationID
94+
* @return Parsed ApplicationID
95+
* @throws IllegalArgumentException if the ApplicationID could not be parsed from the given
96+
* string
97+
*/
98+
public static ApplicationID fromHexString(String hexString) {
99+
try {
100+
return new ApplicationID(StringUtils.hexStringToByte(hexString));
101+
} catch (Exception e) {
102+
throw new IllegalArgumentException(
103+
"Cannot parse ApplicationID from \""
104+
+ hexString
105+
+ "\". The expected format is "
106+
+ "[0-9a-fA-F]{32}, e.g. fd72014d4c864993a2e5a9287b4a9c5d.",
107+
e);
108+
}
109+
}
110+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.common;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.configuration.DeploymentOptions;
23+
24+
/** Possible states of an application. */
25+
@PublicEvolving
26+
public enum ApplicationState {
27+
28+
/** The application is newly created and has not started running. */
29+
CREATED(false),
30+
31+
/** The application has started running. */
32+
RUNNING(false),
33+
34+
/** The application has encountered a failure and is waiting for the cleanup to complete. */
35+
FAILING(false),
36+
37+
/** The application has failed due to an exception. */
38+
FAILED(true),
39+
40+
/** The application is being canceled. */
41+
CANCELING(false),
42+
43+
/** The application has been canceled. */
44+
CANCELED(true),
45+
46+
/**
47+
* All jobs in the application have completed, See {@link
48+
* DeploymentOptions#TERMINATE_APPLICATION_ON_ANY_JOB_EXCEPTION} for more information.
49+
*/
50+
FINISHED(true);
51+
52+
// --------------------------------------------------------------------------------------------
53+
54+
private final boolean terminalState;
55+
56+
ApplicationState(boolean terminalState) {
57+
this.terminalState = terminalState;
58+
}
59+
60+
public boolean isTerminalState() {
61+
return terminalState;
62+
}
63+
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.application;
20+
21+
import org.apache.flink.api.common.ApplicationID;
22+
import org.apache.flink.api.common.ApplicationState;
23+
import org.apache.flink.api.common.JobID;
24+
import org.apache.flink.runtime.dispatcher.Dispatcher;
25+
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
26+
import org.apache.flink.runtime.messages.Acknowledge;
27+
import org.apache.flink.runtime.rpc.FatalErrorHandler;
28+
import org.apache.flink.util.concurrent.ScheduledExecutor;
29+
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
32+
33+
import java.io.Serializable;
34+
import java.util.Arrays;
35+
import java.util.Collections;
36+
import java.util.EnumMap;
37+
import java.util.HashSet;
38+
import java.util.Map;
39+
import java.util.Set;
40+
import java.util.concurrent.CompletableFuture;
41+
import java.util.concurrent.Executor;
42+
43+
/** Base class for all applications. */
44+
public abstract class AbstractApplication implements Serializable {
45+
46+
private static final Logger LOG = LoggerFactory.getLogger(AbstractApplication.class);
47+
48+
private static final long serialVersionUID = 1L;
49+
50+
private final ApplicationID applicationId;
51+
52+
private ApplicationState applicationState;
53+
54+
/**
55+
* Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()}) when the
56+
* application transitioned into a certain status. The index into this array is the ordinal of
57+
* the enum value, i.e. the timestamp when the application went into state "RUNNING" is at
58+
* {@code timestamps[RUNNING.ordinal()]}.
59+
*/
60+
private final long[] statusTimestamps;
61+
62+
private final Set<JobID> jobs = new HashSet<>();
63+
64+
public AbstractApplication(ApplicationID applicationId) {
65+
this.applicationId = applicationId;
66+
this.statusTimestamps = new long[ApplicationState.values().length];
67+
this.applicationState = ApplicationState.CREATED;
68+
this.statusTimestamps[ApplicationState.CREATED.ordinal()] = System.currentTimeMillis();
69+
}
70+
71+
/**
72+
* Entry method to run the application asynchronously.
73+
*
74+
* <p>The returned CompletableFuture indicates that the execution request has been accepted and
75+
* the application transitions to RUNNING state.
76+
*
77+
* <p><b>Note:</b> This method must be called in the main thread of the {@link Dispatcher}.
78+
*
79+
* @param dispatcherGateway the dispatcher of the cluster to run the application.
80+
* @param scheduledExecutor the executor to run the user logic.
81+
* @param mainThreadExecutor the executor bound to the main thread.
82+
* @param errorHandler the handler for fatal errors.
83+
* @return a future indicating that the execution request has been accepted.
84+
*/
85+
public abstract CompletableFuture<Acknowledge> execute(
86+
final DispatcherGateway dispatcherGateway,
87+
final ScheduledExecutor scheduledExecutor,
88+
final Executor mainThreadExecutor,
89+
final FatalErrorHandler errorHandler);
90+
91+
/**
92+
* Cancels the application execution.
93+
*
94+
* <p>This method is responsible for initiating the cancellation process and handling the
95+
* appropriate state transitions of the application.
96+
*
97+
* <p><b>Note:</b> This method must be called in the main thread of the {@link Dispatcher}.
98+
*/
99+
public abstract void cancel();
100+
101+
/**
102+
* Cleans up execution associated with the application.
103+
*
104+
* <p>This method is typically invoked when the cluster is shutting down.
105+
*/
106+
public abstract void dispose();
107+
108+
public abstract String getName();
109+
110+
public ApplicationID getApplicationId() {
111+
return applicationId;
112+
}
113+
114+
public Set<JobID> getJobs() {
115+
return Collections.unmodifiableSet(jobs);
116+
}
117+
118+
/**
119+
* Adds a job ID to the jobs set.
120+
*
121+
* <p><b>Note:</b>This method must be called in the main thread of the {@link Dispatcher}.
122+
*/
123+
public boolean addJob(JobID jobId) {
124+
return jobs.add(jobId);
125+
}
126+
127+
public ApplicationState getApplicationStatus() {
128+
return applicationState;
129+
}
130+
131+
// ------------------------------------------------------------------------
132+
// State Transitions
133+
// ------------------------------------------------------------------------
134+
135+
private static final Map<ApplicationState, Set<ApplicationState>> ALLOWED_TRANSITIONS;
136+
137+
static {
138+
ALLOWED_TRANSITIONS = new EnumMap<>(ApplicationState.class);
139+
ALLOWED_TRANSITIONS.put(
140+
ApplicationState.CREATED,
141+
new HashSet<>(Arrays.asList(ApplicationState.RUNNING, ApplicationState.CANCELING)));
142+
ALLOWED_TRANSITIONS.put(
143+
ApplicationState.RUNNING,
144+
new HashSet<>(
145+
Arrays.asList(
146+
ApplicationState.FINISHED,
147+
ApplicationState.FAILING,
148+
ApplicationState.CANCELING)));
149+
ALLOWED_TRANSITIONS.put(
150+
ApplicationState.FAILING,
151+
new HashSet<>(Collections.singletonList(ApplicationState.FAILED)));
152+
ALLOWED_TRANSITIONS.put(
153+
ApplicationState.CANCELING,
154+
new HashSet<>(Collections.singletonList(ApplicationState.CANCELED)));
155+
}
156+
157+
/** All state transition methods must be called in the main thread. */
158+
public void transitionToRunning() {
159+
transitionState(ApplicationState.RUNNING);
160+
}
161+
162+
/** All state transition methods must be called in the main thread. */
163+
public void transitionToCanceling() {
164+
transitionState(ApplicationState.CANCELING);
165+
}
166+
167+
/** All state transition methods must be called in the main thread. */
168+
public void transitionToFailing() {
169+
transitionState(ApplicationState.FAILING);
170+
}
171+
172+
/** All state transition methods must be called in the main thread. */
173+
public void transitionToFailed() {
174+
transitionState(ApplicationState.FAILED);
175+
}
176+
177+
/** All state transition methods must be called in the main thread. */
178+
public void transitionToFinished() {
179+
transitionState(ApplicationState.FINISHED);
180+
}
181+
182+
/** All state transition methods must be called in the main thread. */
183+
public void transitionToCanceled() {
184+
transitionState(ApplicationState.CANCELED);
185+
}
186+
187+
void transitionState(ApplicationState targetState) {
188+
validateTransition(targetState);
189+
LOG.info(
190+
"Application {} ({}) switched from state {} to {}.",
191+
getName(),
192+
getApplicationId(),
193+
applicationState,
194+
targetState);
195+
this.statusTimestamps[targetState.ordinal()] = System.currentTimeMillis();
196+
this.applicationState = targetState;
197+
}
198+
199+
private void validateTransition(ApplicationState targetState) {
200+
Set<ApplicationState> allowedTransitions = ALLOWED_TRANSITIONS.get(applicationState);
201+
if (allowedTransitions == null || !allowedTransitions.contains(targetState)) {
202+
throw new IllegalStateException(
203+
String.format(
204+
"Invalid transition from %s to %s", applicationState, targetState));
205+
}
206+
}
207+
208+
public long getStatusTimestamp(ApplicationState status) {
209+
return this.statusTimestamps[status.ordinal()];
210+
}
211+
}

0 commit comments

Comments
 (0)