Skip to content

Commit 180d587

Browse files
authored
[FLINK-37164][Runtime] Speed up state v2 synchronous methods execution (#26005)
1 parent 82baf88 commit 180d587

File tree

13 files changed

+248
-132
lines changed

13 files changed

+248
-132
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ boolean tryOccupyKey(RecordContext<K> recordContext) {
305305
@Override
306306
public <IN, OUT> InternalStateFuture<OUT> handleRequest(
307307
@Nullable State state, StateRequestType type, @Nullable IN payload) {
308-
return handleRequest(state, type, payload, false);
308+
return handleRequest(state, type, false, payload, false);
309309
}
310310

311311
/**
@@ -314,19 +314,27 @@ public <IN, OUT> InternalStateFuture<OUT> handleRequest(
314314
* @param state the state to request. Could be {@code null} if the type is {@link
315315
* StateRequestType#SYNC_POINT}.
316316
* @param type the type of this request.
317+
* @param sync whether to trigger the request synchronously once it's ready.
317318
* @param payload the payload input for this request.
318319
* @param allowOverdraft whether to allow overdraft.
319320
* @return the state future.
320321
*/
321322
public <IN, OUT> InternalStateFuture<OUT> handleRequest(
322323
@Nullable State state,
323324
StateRequestType type,
325+
boolean sync,
324326
@Nullable IN payload,
325327
boolean allowOverdraft) {
326328
// Step 1: build state future & assign context.
327329
InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext);
328330
StateRequest<K, ?, IN, OUT> request =
329-
new StateRequest<>(state, type, payload, stateFuture, currentContext);
331+
new StateRequest<>(
332+
state,
333+
type,
334+
sync || type == StateRequestType.SYNC_POINT,
335+
payload,
336+
stateFuture,
337+
currentContext);
330338

331339
// Step 2: try to seize the capacity, if the current in-flight records exceeds the limit,
332340
// block the current state request from entering until some buffered requests are processed.
@@ -346,22 +354,25 @@ public <IN, OUT> InternalStateFuture<OUT> handleRequest(
346354
@Override
347355
public <IN, OUT> OUT handleRequestSync(
348356
State state, StateRequestType type, @Nullable IN payload) {
349-
InternalStateFuture<OUT> stateFuture = handleRequest(state, type, payload);
350-
// Trigger since we are waiting the result.
351-
triggerIfNeeded(true);
352-
try {
353-
while (!stateFuture.isDone()) {
354-
if (!mailboxExecutor.tryYield()) {
355-
// We force trigger the buffer if the executor is not fully loaded.
356-
if (!stateExecutor.fullyLoaded()) {
357-
triggerIfNeeded(true);
357+
InternalStateFuture<OUT> stateFuture = handleRequest(state, type, true, payload, false);
358+
if (!stateFuture.isDone()) {
359+
// Trigger since we are waiting the result.
360+
triggerIfNeeded(true);
361+
try {
362+
while (!stateFuture.isDone()) {
363+
if (!mailboxExecutor.tryYield()) {
364+
// We force trigger the buffer if the executor is not fully loaded.
365+
if (!stateExecutor.fullyLoaded()) {
366+
triggerIfNeeded(true);
367+
}
368+
waitForNewMails();
358369
}
359-
waitForNewMails();
360370
}
371+
} catch (InterruptedException ignored) {
372+
// Ignore the interrupted exception to avoid throwing a fatal error when the task
373+
// is cancelled
374+
// or exits.
361375
}
362-
} catch (InterruptedException ignored) {
363-
// ignore the interrupted exception to avoid throwing fatal error when the task cancel
364-
// or exit.
365376
}
366377
return stateFuture.get();
367378
}
@@ -373,7 +384,15 @@ public <N> void setCurrentNamespaceForState(
373384
}
374385

375386
<IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) {
376-
stateRequestsBuffer.enqueueToActive(request);
387+
if (request.isSync()) {
388+
if (request.getRequestType() == StateRequestType.SYNC_POINT) {
389+
request.getFuture().complete(null);
390+
} else {
391+
stateExecutor.executeRequestSync(request);
392+
}
393+
} else {
394+
stateRequestsBuffer.enqueueToActive(request);
395+
}
377396
}
378397

379398
<IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) {
@@ -441,7 +460,7 @@ private void seizeCapacity(boolean allowOverdraft) {
441460
*/
442461
public StateFuture<Void> syncPointRequestWithCallback(
443462
ThrowingRunnable<Exception> callback, boolean allowOverdraft) {
444-
return handleRequest(null, StateRequestType.SYNC_POINT, null, allowOverdraft)
463+
return handleRequest(null, StateRequestType.SYNC_POINT, true, null, allowOverdraft)
445464
.thenAccept(v -> callback.run());
446465
}
447466

@@ -464,7 +483,7 @@ public void drainInflightRecords(int targetNum) {
464483
* only drain in best efforts and return when no progress is made.
465484
*/
466485
private void drainInflightRecords(int targetNum, boolean forceToWait) {
467-
if (!forceToWait && drainDepth > 0) {
486+
if (!forceToWait && drainDepth > 5) {
468487
// We only allow a bounded depth of recursive drain calls if we are not forced to wait here.
469488
// This is to avoid stack overflow, since the yield will pick up another processing,
470489
// which may cause another drain.

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ public interface StateExecutor {
4545
*/
4646
StateRequestContainer createStateRequestContainer();
4747

48+
/**
49+
* Execute a single state request *synchronously*. This is for synchronous APIs.
50+
*
51+
* @param stateRequest the request to run.
52+
*/
53+
void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest);
54+
4855
/**
4956
* Check if this executor is fully loaded. Will be invoked to determine whether to give more
5057
* requests to run or wait for a while.

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ public class StateRequest<K, N, IN, OUT> implements Serializable {
4444
/** The type of this request. */
4545
private final StateRequestType type;
4646

47+
private final boolean sync;
48+
4749
/** The payload(input) of this request. */
4850
@Nullable private final IN payload;
4951

@@ -58,11 +60,13 @@ public class StateRequest<K, N, IN, OUT> implements Serializable {
5860
public StateRequest(
5961
@Nullable State state,
6062
StateRequestType type,
63+
boolean sync,
6164
@Nullable IN payload,
6265
InternalStateFuture<OUT> stateFuture,
6366
RecordContext<K> context) {
6467
this.state = state;
6568
this.type = type;
69+
this.sync = sync;
6670
this.payload = payload;
6771
this.stateFuture = stateFuture;
6872
this.context = context;
@@ -79,6 +83,10 @@ public IN getPayload() {
7983
return payload;
8084
}
8185

86+
public boolean isSync() {
87+
return sync;
88+
}
89+
8290
@Nullable
8391
public State getState() {
8492
return state;

flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,9 @@ boolean checkCurrentSeq(long seq) {
148148
}
149149

150150
void enqueueToActive(StateRequest<K, ?, ?, ?> request) {
151-
if (request.getRequestType() == StateRequestType.SYNC_POINT) {
152-
request.getFuture().complete(null);
153-
} else {
154-
activeQueue.add(request);
155-
if (bufferTimeout > 0 && seqAndTimeout == null) {
156-
seqAndTimeout =
157-
Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout);
158-
}
151+
activeQueue.add(request);
152+
if (bufferTimeout > 0 && seqAndTimeout == null) {
153+
seqAndTimeout = Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout);
159154
}
160155
}
161156

flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AbstractStateIteratorTest.java

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.state.v2.State;
2222
import org.apache.flink.api.common.state.v2.StateIterator;
23+
import org.apache.flink.core.state.InternalStateFuture;
2324
import org.apache.flink.core.state.StateFutureUtils;
2425
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
2526
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
@@ -196,40 +197,7 @@ public CompletableFuture<Void> executeBatchRequests(
196197
CompletableFuture<Void> future = new CompletableFuture<>();
197198
for (StateRequest request :
198199
((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) {
199-
if (request.getRequestType() == StateRequestType.MAP_ITER) {
200-
ArrayList<Integer> results = new ArrayList<>(step);
201-
for (int i = 0; current < limit && i < step; i++) {
202-
results.add(current++);
203-
}
204-
request.getFuture()
205-
.complete(
206-
new TestIterator(
207-
request.getState(),
208-
request.getRequestType(),
209-
aec,
210-
results,
211-
current,
212-
limit));
213-
} else if (request.getRequestType() == StateRequestType.ITERATOR_LOADING) {
214-
assertThat(request.getPayload()).isInstanceOf(TestIterator.class);
215-
assertThat(((TestIterator) request.getPayload()).current).isEqualTo(current);
216-
ArrayList<Integer> results = new ArrayList<>(step);
217-
for (int i = 0; current < limit && i < step; i++) {
218-
results.add(current++);
219-
}
220-
request.getFuture()
221-
.complete(
222-
new TestIterator(
223-
request.getState(),
224-
((TestIterator) request.getPayload()).getRequestType(),
225-
aec,
226-
results,
227-
current,
228-
limit));
229-
} else {
230-
fail("Unsupported request type " + request.getRequestType());
231-
}
232-
processedCount.incrementAndGet();
200+
executeRequestSync(request);
233201
}
234202
future.complete(null);
235203
return future;
@@ -240,6 +208,44 @@ public StateRequestContainer createStateRequestContainer() {
240208
return new MockStateRequestContainer();
241209
}
242210

211+
@Override
212+
public void executeRequestSync(StateRequest<?, ?, ?, ?> request) {
213+
if (request.getRequestType() == StateRequestType.MAP_ITER) {
214+
ArrayList<Integer> results = new ArrayList<>(step);
215+
for (int i = 0; current < limit && i < step; i++) {
216+
results.add(current++);
217+
}
218+
((InternalStateFuture<StateIterator<Integer>>) request.getFuture())
219+
.complete(
220+
new TestIterator(
221+
request.getState(),
222+
request.getRequestType(),
223+
aec,
224+
results,
225+
current,
226+
limit));
227+
} else if (request.getRequestType() == StateRequestType.ITERATOR_LOADING) {
228+
assertThat(request.getPayload()).isInstanceOf(TestIterator.class);
229+
assertThat(((TestIterator) request.getPayload()).current).isEqualTo(current);
230+
ArrayList<Integer> results = new ArrayList<>(step);
231+
for (int i = 0; current < limit && i < step; i++) {
232+
results.add(current++);
233+
}
234+
((InternalStateFuture<StateIterator<Integer>>) request.getFuture())
235+
.complete(
236+
new TestIterator(
237+
request.getState(),
238+
((TestIterator) request.getPayload()).getRequestType(),
239+
aec,
240+
results,
241+
current,
242+
limit));
243+
} else {
244+
fail("Unsupported request type " + request.getRequestType());
245+
}
246+
processedCount.incrementAndGet();
247+
}
248+
243249
@Override
244250
public boolean fullyLoaded() {
245251
return false;

flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.api.common.typeutils.base.IntSerializer;
2626
import org.apache.flink.api.java.tuple.Tuple2;
2727
import org.apache.flink.core.fs.CloseableRegistry;
28+
import org.apache.flink.core.state.InternalStateFuture;
2829
import org.apache.flink.core.state.StateFutureImpl.AsyncFrameworkExceptionHandler;
2930
import org.apache.flink.core.state.StateFutureUtils;
3031
import org.apache.flink.runtime.asyncprocessing.EpochManager.Epoch;
@@ -847,26 +848,7 @@ public CompletableFuture<Void> executeBatchRequests(
847848
CompletableFuture<Void> future = new CompletableFuture<>();
848849
for (StateRequest request :
849850
((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) {
850-
if (request.getRequestType() == StateRequestType.VALUE_GET) {
851-
Preconditions.checkState(request.getState() != null);
852-
TestValueState state = (TestValueState) request.getState();
853-
Integer val =
854-
state.underlyingState.get(
855-
(String) request.getRecordContext().getKey(),
856-
(String) request.getRecordContext().getNamespace(state));
857-
request.getFuture().complete(val);
858-
} else if (request.getRequestType() == StateRequestType.VALUE_UPDATE) {
859-
Preconditions.checkState(request.getState() != null);
860-
TestValueState state = (TestValueState) request.getState();
861-
862-
state.underlyingState.update(
863-
(String) request.getRecordContext().getKey(),
864-
(String) request.getRecordContext().getNamespace(state),
865-
(Integer) request.getPayload());
866-
request.getFuture().complete(null);
867-
} else {
868-
throw new UnsupportedOperationException("Unsupported request type");
869-
}
851+
executeRequestSync(request);
870852
}
871853
future.complete(null);
872854
return future;
@@ -877,6 +859,30 @@ public StateRequestContainer createStateRequestContainer() {
877859
return new MockStateRequestContainer();
878860
}
879861

862+
@Override
863+
public void executeRequestSync(StateRequest<?, ?, ?, ?> request) {
864+
if (request.getRequestType() == StateRequestType.VALUE_GET) {
865+
Preconditions.checkState(request.getState() != null);
866+
TestValueState state = (TestValueState) request.getState();
867+
Integer val =
868+
state.underlyingState.get(
869+
(String) request.getRecordContext().getKey(),
870+
(String) request.getRecordContext().getNamespace(state));
871+
((InternalStateFuture<Integer>) request.getFuture()).complete(val);
872+
} else if (request.getRequestType() == StateRequestType.VALUE_UPDATE) {
873+
Preconditions.checkState(request.getState() != null);
874+
TestValueState state = (TestValueState) request.getState();
875+
876+
state.underlyingState.update(
877+
(String) request.getRecordContext().getKey(),
878+
(String) request.getRecordContext().getNamespace(state),
879+
(Integer) request.getPayload());
880+
request.getFuture().complete(null);
881+
} else {
882+
throw new UnsupportedOperationException("Unsupported request type");
883+
}
884+
}
885+
880886
@Override
881887
public boolean fullyLoaded() {
882888
return false;

flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/MockStateExecutor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public CompletableFuture<Void> executeBatchRequests(
3131
Preconditions.checkArgument(stateRequestContainer instanceof MockStateRequestContainer);
3232
for (StateRequest<?, ?, ?, ?> request :
3333
((MockStateRequestContainer) stateRequestContainer).getStateRequestList()) {
34-
request.getFuture().complete(null);
34+
executeRequestSync(request);
3535
}
3636
return CompletableFuture.completedFuture(null);
3737
}
@@ -41,6 +41,11 @@ public StateRequestContainer createStateRequestContainer() {
4141
return new MockStateRequestContainer();
4242
}
4343

44+
@Override
45+
public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
46+
stateRequest.getFuture().complete(null);
47+
}
48+
4449
@Override
4550
public boolean fullyLoaded() {
4651
return false;

0 commit comments

Comments
 (0)