Skip to content

Commit 446666c

Browse files
committed
[FLINK-37168][python] clean up TimerRegistrationAction in a thread-safe manner in unregisteredTimers list after timers are registered
1 parent f2460c8 commit 446666c

File tree

7 files changed

+49
-6
lines changed

7 files changed

+49
-6
lines changed

flink-python/pyflink/.idea/.gitignore

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flink-python/pyflink/.idea/inspectionProfiles/profiles_settings.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flink-python/pyflink/.idea/misc.xml

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flink-python/pyflink/.idea/modules.xml

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flink-python/pyflink/.idea/vcs.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/process/timer/TimerRegistrationAction.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
package org.apache.flink.streaming.api.operators.python.process.timer;
2020

21+
import java.util.List;
22+
23+
/** {@link TimerRegistrationAction} used to register Timer. */
2124
public class TimerRegistrationAction {
2225

2326
private final TimerRegistration timerRegistration;
@@ -26,16 +29,22 @@ public class TimerRegistrationAction {
2629

2730
private boolean isRegistered;
2831

32+
private final List<TimerRegistrationAction> containingList;
33+
2934
public TimerRegistrationAction(
30-
TimerRegistration timerRegistration, byte[] serializedTimerData) {
35+
TimerRegistration timerRegistration,
36+
byte[] serializedTimerData,
37+
List<TimerRegistrationAction> containingList) {
3138
this.timerRegistration = timerRegistration;
3239
this.serializedTimerData = serializedTimerData;
40+
this.containingList = containingList;
3341
this.isRegistered = false;
3442
}
3543

3644
public void run() {
3745
if (!isRegistered) {
3846
timerRegistration.setTimer(serializedTimerData);
47+
containingList.remove(this);
3948
isRegistered = true;
4049
}
4150
}

flink-python/src/main/java/org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
195195

196196
private transient Environment environment;
197197

198-
private transient List<TimerRegistrationAction> unregisteredTimers;
198+
private transient volatile List<TimerRegistrationAction> unregisteredTimers;
199199

200200
public BeamPythonFunctionRunner(
201201
Environment environment,
@@ -311,7 +311,7 @@ public void open(ReadableConfig config) throws Exception {
311311
ShutdownHookUtil.addShutdownHook(
312312
this, BeamPythonFunctionRunner.class.getSimpleName(), LOG);
313313

314-
unregisteredTimers = new LinkedList<>();
314+
unregisteredTimers = Collections.synchronizedList(new LinkedList<>());
315315
}
316316

317317
@Override
@@ -352,8 +352,10 @@ public void process(byte[] data) throws Exception {
352352

353353
@Override
354354
public void drainUnregisteredTimers() {
355-
for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) {
356-
timerRegistrationAction.run();
355+
synchronized (unregisteredTimers) {
356+
for (TimerRegistrationAction timerRegistrationAction : unregisteredTimers) {
357+
timerRegistrationAction.run();
358+
}
357359
}
358360
unregisteredTimers.clear();
359361
}
@@ -703,7 +705,9 @@ private TimerReceiverFactory createTimerReceiverFactory() {
703705
(timer, timerData) -> {
704706
TimerRegistrationAction timerRegistrationAction =
705707
new TimerRegistrationAction(
706-
timerRegistration, (byte[]) timer.getUserKey());
708+
timerRegistration,
709+
(byte[]) timer.getUserKey(),
710+
unregisteredTimers);
707711
unregisteredTimers.add(timerRegistrationAction);
708712
environment
709713
.getMainMailboxExecutor()

0 commit comments

Comments
 (0)