Skip to content

Commit b1fd0b8

Browse files
[FLINK-37724] Adds AsyncTableFunction as a fully supported UDF type (apache#26567)
[FLINK-37724] Adds AsyncTableFunction as a fully supported UDF type.
1 parent 7309fe8 commit b1fd0b8

File tree

67 files changed

+3948
-238
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+3948
-238
lines changed

docs/layouts/shortcodes/generated/execution_config_configuration.html

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
<td><h5>table.exec.async-scalar.buffer-capacity</h5><br> <span class="label label-primary">Streaming</span></td>
5656
<td style="word-wrap: break-word;">10</td>
5757
<td>Integer</td>
58-
<td>The max number of async i/o operation that the async lookup join can trigger.</td>
58+
<td>The max number of async i/o operation that the async scalar function can trigger.</td>
5959
</tr>
6060
<tr>
6161
<td><h5>table.exec.async-scalar.max-attempts</h5><br> <span class="label label-primary">Streaming</span></td>
@@ -87,6 +87,36 @@
8787
<td>Boolean</td>
8888
<td>Set whether to use the SQL/Table operators based on the asynchronous state api. Default value is false.</td>
8989
</tr>
90+
<tr>
91+
<td><h5>table.exec.async-table.max-concurrent-operations</h5><br> <span class="label label-primary">Streaming</span></td>
92+
<td style="word-wrap: break-word;">10</td>
93+
<td>Integer</td>
94+
<td>The max number of concurrent async i/o operations that the async table function can trigger.</td>
95+
</tr>
96+
<tr>
97+
<td><h5>table.exec.async-table.max-retries</h5><br> <span class="label label-primary">Streaming</span></td>
98+
<td style="word-wrap: break-word;">3</td>
99+
<td>Integer</td>
100+
<td>The max number of async retry attempts to make before task execution is failed.</td>
101+
</tr>
102+
<tr>
103+
<td><h5>table.exec.async-table.retry-delay</h5><br> <span class="label label-primary">Streaming</span></td>
104+
<td style="word-wrap: break-word;">100 ms</td>
105+
<td>Duration</td>
106+
<td>The delay to wait before trying again.</td>
107+
</tr>
108+
<tr>
109+
<td><h5>table.exec.async-table.retry-strategy</h5><br> <span class="label label-primary">Streaming</span></td>
110+
<td style="word-wrap: break-word;">FIXED_DELAY</td>
111+
<td><p>Enum</p></td>
112+
<td>Restart strategy which will be used, FIXED_DELAY by default.<br /><br />Possible values:<ul><li>"NO_RETRY"</li><li>"FIXED_DELAY"</li></ul></td>
113+
</tr>
114+
<tr>
115+
<td><h5>table.exec.async-table.timeout</h5><br> <span class="label label-primary">Streaming</span></td>
116+
<td style="word-wrap: break-word;">3 min</td>
117+
<td>Duration</td>
118+
<td>The async timeout for the asynchronous operation to complete, including any retries which may occur.</td>
119+
</tr>
90120
<tr>
91121
<td><h5>table.exec.deduplicate.insert-update-after-sensitive-enabled</h5><br> <span class="label label-primary">Streaming</span></td>
92122
<td style="word-wrap: break-word;">true</td>

flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.ArrayList;
3535
import java.util.Collections;
3636
import java.util.List;
37+
import java.util.Optional;
3738

3839
import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getConstructorDescriptor;
3940
import static org.apache.flink.shaded.asm9.org.objectweb.asm.Type.getMethodDescriptor;
@@ -373,4 +374,28 @@ public static void validateLambdaType(Class<?> baseClass, Type t) {
373374
+ "Otherwise the type has to be specified explicitly using type information.");
374375
}
375376
}
377+
378+
/**
379+
* Returns true if the given type is clazz itself or a parameterized type whose raw type is clazz.
380+
*
381+
* @param clazz The generic class to check against
382+
* @param type The type to be checked
383+
*/
384+
public static boolean isGenericOfClass(Class<?> clazz, Type type) {
385+
Optional<ParameterizedType> parameterized = getParameterizedType(type);
386+
return clazz.equals(type)
387+
|| parameterized.isPresent() && clazz.equals(parameterized.get().getRawType());
388+
}
389+
390+
/**
391+
* Returns the given type as a ParameterizedType if it is one, otherwise an empty optional.
392+
*
393+
* @param type The type to check
394+
* @return optional which is present if the type is a ParameterizedType
395+
*/
396+
public static Optional<ParameterizedType> getParameterizedType(Type type) {
397+
return Optional.of(type)
398+
.filter(p -> p instanceof ParameterizedType)
399+
.map(ParameterizedType.class::cast);
400+
}
376401
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.java.typeutils;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import java.lang.reflect.Method;
24+
import java.lang.reflect.ParameterizedType;
25+
import java.lang.reflect.Type;
26+
import java.util.List;
27+
import java.util.Optional;
28+
29+
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
30+
import static org.assertj.core.api.Assertions.assertThat;
31+
32+
/** Tests for {@link TypeExtractionUtils}. */
33+
@SuppressWarnings("rawtypes")
34+
public class TypeExtractionUtilsTest {
35+
36+
@Test
37+
void testIsGeneric() throws Exception {
38+
Method method = getMethod(IsGeneric.class, "m1");
39+
Type firstParam = method.getGenericParameterTypes()[0];
40+
assertThat(TypeExtractionUtils.isGenericOfClass(List.class, firstParam)).isTrue();
41+
42+
method = getMethod(IsGeneric.class, "m2");
43+
firstParam = method.getGenericParameterTypes()[0];
44+
assertThat(TypeExtractionUtils.isGenericOfClass(List.class, firstParam)).isTrue();
45+
}
46+
47+
@Test
48+
void testGetParameterizedType() throws Exception {
49+
Method method = getMethod(IsGeneric.class, "m1");
50+
Type firstParam = method.getGenericParameterTypes()[0];
51+
Optional<ParameterizedType> parameterizedType =
52+
TypeExtractionUtils.getParameterizedType(firstParam);
53+
assertThat(parameterizedType).isPresent();
54+
assertThat(parameterizedType.get().getRawType()).isEqualTo(List.class);
55+
assertThat(parameterizedType.get().getActualTypeArguments()[0]).isEqualTo(Integer.class);
56+
57+
method = getMethod(IsGeneric.class, "m2");
58+
firstParam = method.getGenericParameterTypes()[0];
59+
assertThat(TypeExtractionUtils.getParameterizedType(firstParam)).isEmpty();
60+
}
61+
62+
private Method getMethod(Class<?> clazz, String name) throws Exception {
63+
return getAllDeclaredMethods(clazz).stream()
64+
.filter(m -> m.getName().equals(name))
65+
.findFirst()
66+
.orElseThrow();
67+
}
68+
69+
public static class IsGeneric {
70+
public void m1(List<Integer> list) {}
71+
72+
public void m2(List list) {}
73+
}
74+
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ public class ExecutionConfigOptions {
422422
.intType()
423423
.defaultValue(10)
424424
.withDescription(
425-
"The max number of async i/o operation that the async lookup join can trigger.");
425+
"The max number of async i/o operation that the async scalar function can trigger.");
426426

427427
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
428428
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_SCALAR_TIMEOUT =
@@ -456,6 +456,49 @@ public class ExecutionConfigOptions {
456456
"The max number of async retry attempts to make before task "
457457
+ "execution is failed.");
458458

459+
// ------------------------------------------------------------------------
460+
// Async Table Function
461+
// ------------------------------------------------------------------------
462+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
463+
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_MAX_CONCURRENT_OPERATIONS =
464+
key("table.exec.async-table.max-concurrent-operations")
465+
.intType()
466+
.defaultValue(10)
467+
.withDescription(
468+
"The max number of concurrent async i/o operations that the async table function can trigger.");
469+
470+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
471+
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_TIMEOUT =
472+
key("table.exec.async-table.timeout")
473+
.durationType()
474+
.defaultValue(Duration.ofMinutes(3))
475+
.withDescription(
476+
"The async timeout for the asynchronous operation to complete, including any retries which may occur.");
477+
478+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
479+
public static final ConfigOption<RetryStrategy> TABLE_EXEC_ASYNC_TABLE_RETRY_STRATEGY =
480+
key("table.exec.async-table.retry-strategy")
481+
.enumType(RetryStrategy.class)
482+
.defaultValue(RetryStrategy.FIXED_DELAY)
483+
.withDescription(
484+
"Restart strategy which will be used, FIXED_DELAY by default.");
485+
486+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
487+
public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_TABLE_RETRY_DELAY =
488+
key("table.exec.async-table.retry-delay")
489+
.durationType()
490+
.defaultValue(Duration.ofMillis(100))
491+
.withDescription("The delay to wait before trying again.");
492+
493+
@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
494+
public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_TABLE_MAX_RETRIES =
495+
key("table.exec.async-table.max-retries")
496+
.intType()
497+
.defaultValue(3)
498+
.withDescription(
499+
"The max number of async retry attempts to make before task "
500+
+ "execution is failed.");
501+
459502
// ------------------------------------------------------------------------
460503
// Async ML_PREDICT Options
461504
// ------------------------------------------------------------------------

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/UserDefinedFunctionHelper.java

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,17 @@
4848
import java.lang.reflect.ParameterizedType;
4949
import java.lang.reflect.Type;
5050
import java.util.Arrays;
51+
import java.util.Collection;
5152
import java.util.HashSet;
5253
import java.util.List;
54+
import java.util.Optional;
5355
import java.util.Set;
5456
import java.util.concurrent.CompletableFuture;
5557
import java.util.stream.Collectors;
5658

5759
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
60+
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getParameterizedType;
61+
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isGenericOfClass;
5862
import static org.apache.flink.util.Preconditions.checkState;
5963

6064
/**
@@ -477,11 +481,12 @@ private static void validateImplementationMethods(
477481
validateImplementationMethod(functionClass, false, false, SCALAR_EVAL);
478482
} else if (AsyncScalarFunction.class.isAssignableFrom(functionClass)) {
479483
validateImplementationMethod(functionClass, false, false, ASYNC_SCALAR_EVAL);
480-
validateAsyncImplementationMethod(functionClass, ASYNC_SCALAR_EVAL);
484+
validateAsyncImplementationMethod(functionClass, false, ASYNC_SCALAR_EVAL);
481485
} else if (TableFunction.class.isAssignableFrom(functionClass)) {
482486
validateImplementationMethod(functionClass, true, false, TABLE_EVAL);
483487
} else if (AsyncTableFunction.class.isAssignableFrom(functionClass)) {
484488
validateImplementationMethod(functionClass, true, false, ASYNC_TABLE_EVAL);
489+
validateAsyncImplementationMethod(functionClass, true, ASYNC_TABLE_EVAL);
485490
} else if (AggregateFunction.class.isAssignableFrom(functionClass)) {
486491
validateImplementationMethod(functionClass, true, false, AGGREGATE_ACCUMULATE);
487492
validateImplementationMethod(functionClass, true, true, AGGREGATE_RETRACT);
@@ -541,7 +546,9 @@ private static void validateImplementationMethod(
541546
}
542547

543548
private static void validateAsyncImplementationMethod(
544-
Class<? extends UserDefinedFunction> clazz, String... methodNameOptions) {
549+
Class<? extends UserDefinedFunction> clazz,
550+
boolean verifyFutureContainsCollection,
551+
String... methodNameOptions) {
545552
final Set<String> nameSet = new HashSet<>(Arrays.asList(methodNameOptions));
546553
final List<Method> methods = getAllDeclaredMethods(clazz);
547554
for (Method method : methods) {
@@ -558,18 +565,31 @@ private static void validateAsyncImplementationMethod(
558565
if (method.getParameterCount() >= 1) {
559566
Type firstParam = method.getGenericParameterTypes()[0];
560567
firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam);
561-
if (CompletableFuture.class.equals(firstParam)
562-
|| firstParam instanceof ParameterizedType
563-
&& CompletableFuture.class.equals(
564-
((ParameterizedType) firstParam).getRawType())) {
565-
foundParam = true;
568+
if (isGenericOfClass(CompletableFuture.class, firstParam)) {
569+
Optional<ParameterizedType> parameterized = getParameterizedType(firstParam);
570+
if (!verifyFutureContainsCollection) {
571+
foundParam = true;
572+
} else if (parameterized.isPresent()
573+
&& parameterized.get().getActualTypeArguments().length > 0) {
574+
Type firstTypeArgument = parameterized.get().getActualTypeArguments()[0];
575+
if (isGenericOfClass(Collection.class, firstTypeArgument)) {
576+
foundParam = true;
577+
}
578+
}
566579
}
567580
}
568581
if (!foundParam) {
569-
throw new ValidationException(
570-
String.format(
571-
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
572-
method.getName(), clazz.getName()));
582+
if (!verifyFutureContainsCollection) {
583+
throw new ValidationException(
584+
String.format(
585+
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.",
586+
method.getName(), clazz.getName()));
587+
} else {
588+
throw new ValidationException(
589+
String.format(
590+
"Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture<java.util.Collection>.",
591+
method.getName(), clazz.getName()));
592+
}
573593
}
574594
}
575595
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionMappingExtractor.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.lang.reflect.ParameterizedType;
3636
import java.lang.reflect.Type;
3737
import java.util.Arrays;
38+
import java.util.Collection;
3839
import java.util.HashSet;
3940
import java.util.List;
4041
import java.util.Map;
@@ -43,6 +44,7 @@
4344
import java.util.concurrent.CompletableFuture;
4445
import java.util.stream.Stream;
4546

47+
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getParameterizedType;
4648
import static org.apache.flink.table.types.extraction.ExtractionUtils.collectAnnotationsOfClass;
4749
import static org.apache.flink.table.types.extraction.ExtractionUtils.collectAnnotationsOfMethod;
4850
import static org.apache.flink.table.types.extraction.ExtractionUtils.extractionError;
@@ -208,7 +210,8 @@ static MethodVerification createParameterVerification(boolean requireAccumulator
208210
* Verification that checks a method by parameters (arguments only) with mandatory {@link
209211
* CompletableFuture}.
210212
*/
211-
static MethodVerification createParameterAndCompletableFutureVerification(Class<?> baseClass) {
213+
static MethodVerification createParameterAndCompletableFutureVerification(
214+
Class<?> baseClass, boolean verifyFutureContainsCollection) {
212215
return (method, state, arguments, result) -> {
213216
checkNoState(state);
214217
checkScalarArgumentsOnly(arguments);
@@ -220,12 +223,29 @@ static MethodVerification createParameterAndCompletableFutureVerification(Class<
220223
final Class<?> resultClass = result.toClass();
221224
Type genericType = method.getGenericParameterTypes()[0];
222225
genericType = resolveVariableWithClassContext(baseClass, genericType);
223-
if (!(genericType instanceof ParameterizedType)) {
226+
Optional<ParameterizedType> parameterized = getParameterizedType(genericType);
227+
if (!parameterized.isPresent()) {
224228
throw extractionError(
225229
"The method '%s' needs generic parameters for the CompletableFuture at position %d.",
226230
method.getName(), 0);
227231
}
228-
final Type returnType = ((ParameterizedType) genericType).getActualTypeArguments()[0];
232+
// When verifyFutureContainsCollection is set, the future's type argument must be a
233+
// parameterized Collection; the Collection's element type becomes the return type.
234+
final Type returnType;
235+
if (verifyFutureContainsCollection) {
236+
Type nestedGenericType = parameterized.get().getActualTypeArguments()[0];
237+
Optional<ParameterizedType> nestedParameterized =
238+
getParameterizedType(nestedGenericType);
239+
if (!nestedParameterized.isPresent()
240+
|| !nestedParameterized.get().getRawType().equals(Collection.class)) {
241+
throw extractionError(
242+
"The method '%s' expects nested generic type CompletableFuture<Collection> for the %d arg.",
243+
method.getName(), 0);
244+
}
245+
returnType = nestedParameterized.get().getActualTypeArguments()[0];
246+
} else {
247+
returnType = parameterized.get().getActualTypeArguments()[0];
248+
}
229249
Class<?> returnTypeClass = getClassFromType(returnType);
230250
// Parameters should be validated using strict autoboxing.
231251
// For return types, we can be more flexible as the UDF should know what it declared.

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public static TypeInference forAsyncScalarFunction(
106106
null,
107107
null,
108108
createOutputFromGenericInMethod(0, 0, true),
109-
createParameterAndCompletableFutureVerification(function));
109+
createParameterAndCompletableFutureVerification(function, false));
110110
return extractTypeInference(mappingExtractor, false);
111111
}
112112

@@ -172,7 +172,7 @@ public static TypeInference forAsyncTableFunction(
172172
null,
173173
null,
174174
createOutputFromGenericInClass(AsyncTableFunction.class, 0, true),
175-
createParameterAndCompletableFutureVerification(function));
175+
createParameterAndCompletableFutureVerification(function, true));
176176
return extractTypeInference(mappingExtractor, false);
177177
}
178178

0 commit comments

Comments
 (0)