Skip to content

Commit 0c958e8

Browse files
AHeiserkhachatryan
authored andcommitted
[FLINK-38486] Harden shutdown of system UDFs
If a resource is lazily created in open, we can only close after checking for null. Otherwise a failure during initialization will trigger secondary failures.
1 parent 08e7b3b commit 0c958e8

File tree

9 files changed

+44
-18
lines changed

9 files changed

+44
-18
lines changed

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,9 @@ public void writeRecord(T record) {
210210
@Override
211211
public void close() throws IOException {
212212
try {
213-
writer.close();
213+
if (writer != null) {
214+
writer.close();
215+
}
214216
} catch (Exception e) {
215217
throw new TableException("Exception in close", e);
216218
}

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,10 @@ public void writeRecord(RowData record) throws IOException {
471471

472472
@Override
473473
public void close() throws IOException {
474-
this.output.flush();
475-
this.output.close();
474+
if (output != null) {
475+
this.output.flush();
476+
this.output.close();
477+
}
476478
}
477479
};
478480
}

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,9 @@ public void endInput() throws Exception {}
124124
public void close() throws Exception {
125125
try {
126126
staticPartitions.clear();
127-
writer.close();
127+
if (writer != null) {
128+
writer.close();
129+
}
128130
} catch (Exception e) {
129131
throw new TableException("Exception in close", e);
130132
}

flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/metrics/otel/OpenTelemetryMetricReporter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,11 @@ public void open(MetricConfig metricConfig) {
100100

101101
@Override
102102
public void close() {
103-
exporter.flush();
104-
lastResult.join(1, TimeUnit.MINUTES);
105-
exporter.close();
103+
if (exporter != null) {
104+
exporter.flush();
105+
lastResult.join(1, TimeUnit.MINUTES);
106+
exporter.close();
107+
}
106108
}
107109

108110
@Override

flink-metrics/flink-metrics-otel/src/main/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporter.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,14 @@ public void open(MetricConfig metricConfig) {
7070

7171
@Override
7272
public void close() {
73-
spanProcessor.forceFlush();
74-
spanProcessor.close();
75-
spanExporter.flush();
76-
spanExporter.close();
73+
if (spanProcessor != null) {
74+
spanProcessor.forceFlush();
75+
spanProcessor.close();
76+
}
77+
if (spanExporter != null) {
78+
spanExporter.flush();
79+
spanExporter.close();
80+
}
7781
}
7882

7983
private void notifyOfAddedSpanInternal(Span span, io.opentelemetry.api.trace.Span parent) {

flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,18 @@ public void write(RowData element) {
113113
}
114114

115115
public void close() throws Exception {
116-
arrowStreamWriter.end();
117-
arrowStreamReader.close();
118-
rootWriter.close();
119-
allocator.close();
116+
if (arrowStreamWriter != null) {
117+
arrowStreamWriter.end();
118+
}
119+
if (arrowStreamReader != null) {
120+
arrowStreamReader.close();
121+
}
122+
if (rootWriter != null) {
123+
rootWriter.close();
124+
}
125+
if (allocator != null) {
126+
allocator.close();
127+
}
120128
}
121129

122130
/** Creates an {@link ArrowWriter}. */

flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ public void open() throws Exception {
129129
@Override
130130
public void close() throws Exception {
131131
super.close();
132-
windowsGrouping.close();
132+
if (windowsGrouping != null) {
133+
windowsGrouping.close();
134+
}
133135
}
134136

135137
@Override

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ public MethodHandle open(FunctionContext context) {
7676
@Override
7777
public void close() {
7878
try {
79-
instance.close();
79+
if (instance != null) {
80+
instance.close();
81+
}
8082
} catch (Exception e) {
8183
throw new TableException(
8284
String.format(

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,8 @@ private void processCurrentData() throws Exception {
152152
@Override
153153
public void close() throws Exception {
154154
super.close();
155-
this.currentData.close();
155+
if (this.currentData != null) {
156+
this.currentData.close();
157+
}
156158
}
157159
}

0 commit comments

Comments
 (0)