Skip to content

Commit 16a9d2f

Browse files
AHeiserkhachatryan
authored andcommitted
[FLINK-38486] Harden shutdown of system UDFs
If a resource is lazily created in open, we can only close after checking for null. Otherwise a failure during initialization will trigger secondary failures.
1 parent 9ecbcad commit 16a9d2f

File tree

7 files changed

+31
-11
lines changed

7 files changed

+31
-11
lines changed

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemOutputFormat.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,9 @@ public void writeRecord(T record) {
210210
@Override
211211
public void close() throws IOException {
212212
try {
213-
writer.close();
213+
if (writer != null) {
214+
writer.close();
215+
}
214216
} catch (Exception e) {
215217
throw new TableException("Exception in close", e);
216218
}

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,10 @@ public void writeRecord(RowData record) throws IOException {
471471

472472
@Override
473473
public void close() throws IOException {
474-
this.output.flush();
475-
this.output.close();
474+
if (output != null) {
475+
this.output.flush();
476+
this.output.close();
477+
}
476478
}
477479
};
478480
}

flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/batch/compact/BatchFileWriter.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ public void endInput() throws Exception {}
126126
public void close() throws Exception {
127127
try {
128128
staticPartitions.clear();
129-
writer.close();
129+
if (writer != null) {
130+
writer.close();
131+
}
130132
} catch (Exception e) {
131133
throw new TableException("Exception in close", e);
132134
}

flink-python/src/main/java/org/apache/flink/table/runtime/arrow/serializers/ArrowSerializer.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,10 +113,18 @@ public void write(RowData element) {
113113
}
114114

115115
public void close() throws Exception {
116-
arrowStreamWriter.end();
117-
arrowStreamReader.close();
118-
rootWriter.close();
119-
allocator.close();
116+
if (arrowStreamWriter != null) {
117+
arrowStreamWriter.end();
118+
}
119+
if (arrowStreamReader != null) {
120+
arrowStreamReader.close();
121+
}
122+
if (rootWriter != null) {
123+
rootWriter.close();
124+
}
125+
if (allocator != null) {
126+
allocator.close();
127+
}
120128
}
121129

122130
/** Creates an {@link ArrowWriter}. */

flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/batch/BatchArrowPythonGroupWindowAggregateFunctionOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ public void open() throws Exception {
129129
@Override
130130
public void close() throws Exception {
131131
super.close();
132-
windowsGrouping.close();
132+
if (windowsGrouping != null) {
133+
windowsGrouping.close();
134+
}
133135
}
134136

135137
@Override

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/DefaultExpressionEvaluator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,9 @@ public MethodHandle open(FunctionContext context) {
7676
@Override
7777
public void close() {
7878
try {
79-
instance.close();
79+
if (instance != null) {
80+
instance.close();
81+
}
8082
} catch (Exception e) {
8183
throw new TableException(
8284
String.format(

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/BufferDataOverWindowOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ private void processCurrentData() throws Exception {
138138
@Override
139139
public void close() throws Exception {
140140
super.close();
141-
this.currentData.close();
141+
if (this.currentData != null) {
142+
this.currentData.close();
143+
}
142144
}
143145
}

0 commit comments

Comments
 (0)