Skip to content

Commit 47478ed

Browse files
committed
[FLINK-38486] Harden shutdown of system UDFs
If a resource is lazily created in open, we can only close after checking for null. Otherwise a failure during initialization will trigger secondary failures.
1 parent 13618dd commit 47478ed

File tree

10 files changed

+52
-22
lines changed

10 files changed

+52
-22
lines changed

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,9 @@ public void writeRecord(T record) {
210210
@Override
211211
public void close() throws IOException {
212212
try {
213-
writer.close();
213+
if (writer != null) {
214+
writer.close();
215+
}
214216
} catch (Exception e) {
215217
throw new TableException("Exception in close", e);
216218
}

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,10 @@ public void writeRecord(RowData record) throws IOException {
471471

472472
@Override
473473
public void close() throws IOException {
474-
this.output.flush();
475-
this.output.close();
474+
if (output != null) {
475+
this.output.flush();
476+
this.output.close();
477+
}
476478
}
477479
};
478480
}

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ public void endInput() throws Exception {}
124124
public void close() throws Exception {
125125
try {
126126
staticPartitions.clear();
127-
writer.close();
127+
if (writer != null) {
128+
writer.close();
129+
}
128130
} catch (Exception e) {
129131
throw new TableException("Exception in close", e);
130132
}

flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/events/otel/OpenTelemetryEventReporter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,14 @@ public void open(MetricConfig metricConfig) {
100100

101101
@Override
102102
public void close() {
103-
logRecordProcessor.forceFlush();
104-
logRecordProcessor.close();
105-
logRecordExporter.flush();
106-
logRecordExporter.close();
103+
if (logRecordProcessor != null) {
104+
logRecordProcessor.forceFlush();
105+
logRecordProcessor.close();
106+
}
107+
if (logRecordExporter != null) {
108+
logRecordExporter.flush();
109+
logRecordExporter.close();
110+
}
107111
}
108112

109113
@Override

flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,11 @@ public void open(MetricConfig metricConfig) {
124124

125125
@Override
126126
public void close() {
127-
exporter.flush();
128-
waitForLastReportToComplete();
129-
exporter.close();
127+
if (exporter != null) {
128+
exporter.flush();
129+
waitForLastReportToComplete();
130+
exporter.close();
131+
}
130132
}
131133

132134
@Override

flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,14 @@ public void open(MetricConfig metricConfig) {
9797

9898
@Override
9999
public void close() {
100-
spanProcessor.forceFlush();
101-
spanProcessor.close();
102-
spanExporter.flush();
103-
spanExporter.close();
100+
if (spanProcessor != null) {
101+
spanProcessor.forceFlush();
102+
spanProcessor.close();
103+
}
104+
if (spanExporter != null) {
105+
spanExporter.flush();
106+
spanExporter.close();
107+
}
104108
}
105109

106110
private void notifyOfAddedSpanInternal(Span span, io.opentelemetry.api.trace.Span parent) {

flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,18 @@ public void write(RowData element) {
113113
}
114114

115115
public void close() throws Exception {
116-
arrowStreamWriter.end();
117-
arrowStreamReader.close();
118-
rootWriter.close();
119-
allocator.close();
116+
if (arrowStreamWriter != null) {
117+
arrowStreamWriter.end();
118+
}
119+
if (arrowStreamReader != null) {
120+
arrowStreamReader.close();
121+
}
122+
if (rootWriter != null) {
123+
rootWriter.close();
124+
}
125+
if (allocator != null) {
126+
allocator.close();
127+
}
120128
}
121129

122130
/** Creates an {@link ArrowWriter}. */

flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ public void open() throws Exception {
129129
@Override
130130
public void close() throws Exception {
131131
super.close();
132-
windowsGrouping.close();
132+
if (windowsGrouping != null) {
133+
windowsGrouping.close();
134+
}
133135
}
134136

135137
@Override

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ public MethodHandle open(FunctionContext context) {
7676
@Override
7777
public void close() {
7878
try {
79-
instance.close();
79+
if (instance != null) {
80+
instance.close();
81+
}
8082
} catch (Exception e) {
8183
throw new TableException(
8284
String.format(

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ private void processCurrentData() throws Exception {
152152
@Override
153153
public void close() throws Exception {
154154
super.close();
155-
this.currentData.close();
155+
if (this.currentData != null) {
156+
this.currentData.close();
157+
}
156158
}
157159
}

0 commit comments

Comments
 (0)