Skip to content

Commit 8cc2e46

Browse files
cloud-fanzhengruifeng
authored andcommitted
[SPARK-54856][SQL] Refactor scalar function lookup and resolution
### What changes were proposed in this pull request? This PR adds `loadPersistentScalarFunction` method to `SessionCatalog`, to unify the scalar function lookup and resolution between v1 and v2 catalogs. The key difference between v1 and v2 catalogs about function is: - v1 catalog provides different APIs for describing function and invoking function to do function lookup: `lookupPersistentFunction` and `resolvePersistentFunction`. - v2 catalog only has one single lookup API: `loadFunction`. This API returns `UnboundFunction`, for invocation, it needs to enter the second phase and call `UnboundFunction.bind`. The newly added `loadPersistentScalarFunction` method follows the v2 catalog style and handles function invocation (load resources and create function builder) in a second phase. ### Why are the changes needed? Code cleanup, also also avoid redundant function lookup from catalog introduced by #53531 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? existing tests ### Was this patch authored or co-authored using generative AI tooling? cursor 2.2.44 Closes #53627 from cloud-fan/func. Authored-by: Wenchen Fan <[email protected]> Signed-off-by: Ruifeng Zheng <[email protected]>
1 parent df604f0 commit 8cc2e46

File tree

9 files changed

+187
-137
lines changed

9 files changed

+187
-137
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2102,7 +2102,7 @@ class Analyzer(
21022102
throw QueryCompilationErrors.expectPersistentFuncError(
21032103
nameParts.head, cmd, mismatchHint, u)
21042104
} else {
2105-
ResolvedNonPersistentFunc(nameParts.head, V1Function(info))
2105+
ResolvedNonPersistentFunc(nameParts.head, V1Function.metadataOnly(info))
21062106
}
21072107
}.getOrElse {
21082108
val CatalogAndIdentifier(catalog, ident) =

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,16 @@ trait FunctionRegistryBase[T] {
9090
/* List all of the registered function names. */
9191
def listFunction(): Seq[FunctionIdentifier]
9292

93-
/* Get the class of the registered function by specified name. */
94-
def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo]
93+
/* Get both ExpressionInfo and FunctionBuilder in a single lookup. */
94+
def lookupFunctionEntry(name: FunctionIdentifier): Option[(ExpressionInfo, FunctionBuilder)]
95+
96+
/* Get the ExpressionInfo of the registered function by specified name. */
97+
def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] =
98+
lookupFunctionEntry(name).map(_._1)
9599

96100
/* Get the builder of the registered function by specified name. */
97-
def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder]
101+
def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] =
102+
lookupFunctionEntry(name).map(_._2)
98103

99104
/** Drop a function and return whether the function existed. */
100105
def dropFunction(name: FunctionIdentifier): Boolean
@@ -245,13 +250,9 @@ trait SimpleFunctionRegistryBase[T] extends FunctionRegistryBase[T] with Logging
245250
functionBuilders.iterator.map(_._1).toList
246251
}
247252

248-
override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = synchronized {
249-
functionBuilders.get(normalizeFuncName(name)).map(_._1)
250-
}
251-
252-
override def lookupFunctionBuilder(
253-
name: FunctionIdentifier): Option[FunctionBuilder] = synchronized {
254-
functionBuilders.get(normalizeFuncName(name)).map(_._2)
253+
override def lookupFunctionEntry(
254+
name: FunctionIdentifier): Option[(ExpressionInfo, FunctionBuilder)] = synchronized {
255+
functionBuilders.get(normalizeFuncName(name))
255256
}
256257

257258
override def dropFunction(name: FunctionIdentifier): Boolean = synchronized {
@@ -281,11 +282,8 @@ trait EmptyFunctionRegistryBase[T] extends FunctionRegistryBase[T] {
281282
throw SparkUnsupportedOperationException()
282283
}
283284

284-
override def lookupFunction(name: FunctionIdentifier): Option[ExpressionInfo] = {
285-
throw SparkUnsupportedOperationException()
286-
}
287-
288-
override def lookupFunctionBuilder(name: FunctionIdentifier): Option[FunctionBuilder] = {
285+
override def lookupFunctionEntry(
286+
name: FunctionIdentifier): Option[(ExpressionInfo, FunctionBuilder)] = {
289287
throw SparkUnsupportedOperationException()
290288
}
291289

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ class FunctionResolution(
5252
val CatalogAndIdentifier(catalog, ident) =
5353
relationResolution.expandIdentifier(u.nameParts)
5454
catalog.asFunctionCatalog.loadFunction(ident) match {
55-
case V1Function(_) =>
56-
// this triggers the second time v1 function resolution but should be cheap
57-
// (no RPC to external catalog), since the metadata has been already cached
58-
// in FunctionRegistry during the above `catalog.loadFunction` call.
59-
resolveV1Function(ident.asFunctionIdentifier, u.arguments, u)
55+
case v1Func: V1Function =>
56+
// V1Function has a lazy builder - invoke() triggers resource loading
57+
// and builder creation only on first invocation
58+
val func = v1Func.invoke(u.arguments)
59+
validateFunction(func, u.arguments.length, u)
6060
case unboundV2Func =>
6161
resolveV2Function(unboundV2Func, u.arguments, u)
6262
}
@@ -118,14 +118,6 @@ class FunctionResolution(
118118
}
119119
}
120120

121-
private def resolveV1Function(
122-
ident: FunctionIdentifier,
123-
arguments: Seq[Expression],
124-
u: UnresolvedFunction): Expression = {
125-
val func = v1SessionCatalog.resolvePersistentFunction(ident, arguments)
126-
validateFunction(func, arguments.length, u)
127-
}
128-
129121
private def validateFunction(
130122
func: Expression,
131123
numArgs: Int,

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala

Lines changed: 95 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path
3232

3333
import org.apache.spark.{SparkException, SparkThrowable}
3434
import org.apache.spark.internal.Logging
35-
import org.apache.spark.sql.AnalysisException
3635
import org.apache.spark.sql.catalyst._
3736
import org.apache.spark.sql.catalyst.analysis._
3837
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
@@ -49,6 +48,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
4948
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
5049
import org.apache.spark.sql.internal.SQLConf
5150
import org.apache.spark.sql.internal.StaticSQLConf.GLOBAL_TEMP_DATABASE
51+
import org.apache.spark.sql.internal.connector.V1Function
5252
import org.apache.spark.sql.metricview.util.MetricViewPlanner
5353
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
5454
import org.apache.spark.sql.util.{CaseInsensitiveStringMap, PartitioningUtils}
@@ -2153,122 +2153,121 @@ class SessionCatalog(
21532153
}
21542154

21552155
/**
2156-
* Look up the `ExpressionInfo` of the given function by name if it's a persistent function.
2157-
* This supports both scalar and table functions.
2156+
* Look up a persistent function's ExpressionInfo by name (for DESCRIBE FUNCTION).
2157+
* This only fetches metadata without loading resources or creating builders.
21582158
*/
21592159
def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
21602160
val qualifiedIdent = qualifyIdentifier(name)
2161-
val db = qualifiedIdent.database.get
2162-
val funcName = qualifiedIdent.funcName
2161+
// Check if already cached in either registry
21632162
functionRegistry.lookupFunction(qualifiedIdent)
21642163
.orElse(tableFunctionRegistry.lookupFunction(qualifiedIdent))
21652164
.getOrElse {
2166-
requireDbExists(db)
2167-
if (externalCatalog.functionExists(db, funcName)) {
2168-
val metadata = externalCatalog.getFunction(db, funcName)
2169-
if (metadata.isUserDefinedFunction) {
2170-
UserDefinedFunction.fromCatalogFunction(metadata, parser).toExpressionInfo
2171-
} else {
2172-
makeExprInfoForHiveFunction(metadata.copy(identifier = qualifiedIdent))
2173-
}
2165+
val funcMetadata = fetchCatalogFunction(qualifiedIdent)
2166+
if (funcMetadata.isUserDefinedFunction) {
2167+
UserDefinedFunction.fromCatalogFunction(funcMetadata, parser).toExpressionInfo
21742168
} else {
2175-
failFunctionLookup(name)
2169+
makeExprInfoForHiveFunction(funcMetadata)
21762170
}
21772171
}
21782172
}
21792173

21802174
/**
2181-
* Look up a persistent scalar function by name and resolves it to an Expression.
2175+
* Load a persistent scalar function by name.
2176+
* Returns V1Function with:
2177+
* - Eager info (from cache or catalog fetch, no resource loading)
2178+
* - Lazy builder (resource loading only on first invoke)
2179+
*
2180+
* This matches V1 behavior where DESCRIBE doesn't load resources.
21822181
*/
2183-
def resolvePersistentFunction(
2184-
name: FunctionIdentifier, arguments: Seq[Expression]): Expression = {
2185-
resolvePersistentFunctionInternal[Expression](
2186-
name,
2187-
arguments,
2188-
functionRegistry,
2189-
registerHiveFunc = func =>
2190-
registerFunction(
2191-
func,
2192-
overrideIfExists = false,
2193-
registry = functionRegistry,
2194-
functionBuilder = makeFunctionBuilder(func)
2195-
),
2196-
registerUserDefinedFunc = function => {
2197-
val builder = makeUserDefinedScalarFuncBuilder(function)
2198-
registerUserDefinedFunction[Expression](
2199-
function = function,
2200-
overrideIfExists = false,
2201-
registry = functionRegistry,
2202-
functionBuilder = builder)
2203-
}
2204-
)
2182+
def loadPersistentScalarFunction(name: FunctionIdentifier): V1Function = {
2183+
val qualifiedIdent = qualifyIdentifier(name)
2184+
2185+
// Check cache first (no synchronization needed for reads)
2186+
functionRegistry.lookupFunctionEntry(qualifiedIdent) match {
2187+
case Some((cachedInfo, cachedBuilder)) =>
2188+
// Already cached - return with eager builder
2189+
V1Function(cachedInfo, cachedBuilder)
2190+
2191+
case None =>
2192+
// Fetch metadata eagerly (no resource loading yet)
2193+
val funcMetadata = fetchCatalogFunction(qualifiedIdent)
2194+
val info = if (funcMetadata.isUserDefinedFunction) {
2195+
UserDefinedFunction.fromCatalogFunction(funcMetadata, parser).toExpressionInfo
2196+
} else {
2197+
makeExprInfoForHiveFunction(funcMetadata)
2198+
}
2199+
2200+
// Builder factory - loads resources only on first invoke()
2201+
val builderFactory: () => FunctionBuilder = () => synchronized {
2202+
// Re-check cache (another thread may have loaded it)
2203+
functionRegistry.lookupFunctionBuilder(qualifiedIdent).getOrElse {
2204+
if (funcMetadata.isUserDefinedFunction) {
2205+
val udf = UserDefinedFunction.fromCatalogFunction(funcMetadata, parser)
2206+
registerUserDefinedFunction[Expression](
2207+
udf,
2208+
overrideIfExists = false,
2209+
functionRegistry,
2210+
makeUserDefinedScalarFuncBuilder(udf))
2211+
} else {
2212+
loadFunctionResources(funcMetadata.resources)
2213+
registerFunction(
2214+
funcMetadata,
2215+
overrideIfExists = false,
2216+
functionRegistry,
2217+
makeFunctionBuilder(funcMetadata))
2218+
}
2219+
functionRegistry.lookupFunctionBuilder(qualifiedIdent).get
2220+
}
2221+
}
2222+
2223+
V1Function(info, builderFactory)
2224+
}
22052225
}
22062226

22072227
/**
22082228
* Look up a persistent table function by name and resolves it to a LogicalPlan.
22092229
*/
22102230
def resolvePersistentTableFunction(
22112231
name: FunctionIdentifier,
2212-
arguments: Seq[Expression]): LogicalPlan = {
2213-
resolvePersistentFunctionInternal[LogicalPlan](
2214-
name,
2215-
arguments,
2216-
tableFunctionRegistry,
2217-
// We don't support persistent Hive table functions yet.
2218-
registerHiveFunc = (func: CatalogFunction) => failFunctionLookup(name),
2219-
registerUserDefinedFunc = function => {
2220-
val builder = makeUserDefinedTableFuncBuilder(function)
2221-
registerUserDefinedFunction[LogicalPlan](
2222-
function = function,
2223-
overrideIfExists = false,
2224-
registry = tableFunctionRegistry,
2225-
functionBuilder = builder)
2232+
arguments: Seq[Expression]): LogicalPlan = synchronized {
2233+
val qualifiedIdent = qualifyIdentifier(name)
2234+
if (tableFunctionRegistry.functionExists(qualifiedIdent)) {
2235+
// Already cached
2236+
tableFunctionRegistry.lookupFunction(qualifiedIdent, arguments)
2237+
} else {
2238+
// Load from catalog
2239+
val funcMetadata = fetchCatalogFunction(qualifiedIdent)
2240+
if (!funcMetadata.isUserDefinedFunction) {
2241+
// Hive table functions are not supported
2242+
failFunctionLookup(qualifiedIdent)
22262243
}
2227-
)
2244+
val udf = UserDefinedFunction.fromCatalogFunction(funcMetadata, parser)
2245+
registerUserDefinedFunction[LogicalPlan](
2246+
udf,
2247+
overrideIfExists = false,
2248+
tableFunctionRegistry,
2249+
makeUserDefinedTableFuncBuilder(udf))
2250+
tableFunctionRegistry.lookupFunction(qualifiedIdent, arguments)
2251+
}
22282252
}
22292253

2230-
private def resolvePersistentFunctionInternal[T](
2231-
name: FunctionIdentifier,
2232-
arguments: Seq[Expression],
2233-
registry: FunctionRegistryBase[T],
2234-
registerHiveFunc: CatalogFunction => Unit,
2235-
registerUserDefinedFunc: UserDefinedFunction => Unit): T = {
2236-
// `synchronized` is used to prevent multiple threads from concurrently resolving the
2237-
// same function that has not yet been loaded into the function registry. This is needed
2238-
// because calling `registerFunction` twice with `overrideIfExists = false` can lead to
2239-
// a FunctionAlreadyExistsException.
2240-
synchronized {
2241-
val qualifiedIdent = qualifyIdentifier(name)
2242-
val db = qualifiedIdent.database.get
2243-
val funcName = qualifiedIdent.funcName
2244-
if (registry.functionExists(qualifiedIdent)) {
2245-
// This function has been already loaded into the function registry.
2246-
registry.lookupFunction(qualifiedIdent, arguments)
2247-
} else {
2248-
// The function has not been loaded to the function registry, which means
2249-
// that the function is a persistent function (if it actually has been registered
2250-
// in the metastore). We need to first put the function in the function registry.
2251-
val catalogFunction = try {
2252-
externalCatalog.getFunction(db, funcName)
2253-
} catch {
2254-
case _: AnalysisException => failFunctionLookup(qualifiedIdent)
2255-
}
2256-
// Please note that qualifiedName is provided by the user. However,
2257-
// catalogFunction.identifier.unquotedString is returned by the underlying
2258-
// catalog. So, it is possible that qualifiedName is not exactly the same as
2259-
// catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
2260-
// At here, we preserve the input from the user.
2261-
val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
2262-
if (!catalogFunction.isUserDefinedFunction) {
2263-
loadFunctionResources(catalogFunction.resources)
2264-
registerHiveFunc(funcMetadata)
2265-
} else {
2266-
val function = UserDefinedFunction.fromCatalogFunction(funcMetadata, parser)
2267-
registerUserDefinedFunc(function)
2268-
}
2269-
// Now, we need to create the Expression.
2270-
registry.lookupFunction(qualifiedIdent, arguments)
2271-
}
2254+
/**
2255+
* Fetch a catalog function from the external catalog.
2256+
*/
2257+
private def fetchCatalogFunction(qualifiedIdent: FunctionIdentifier): CatalogFunction = {
2258+
val db = qualifiedIdent.database.get
2259+
val funcName = qualifiedIdent.funcName
2260+
requireDbExists(db)
2261+
try {
2262+
// Please note that qualifiedIdent is provided by the user. However,
2263+
// CatalogFunction.identifier is returned by the underlying catalog.
2264+
// So, it is possible that qualifiedIdent is not exactly the same as
2265+
// catalogFunction.identifier (difference is on case-sensitivity).
2266+
// At here, we preserve the input from the user.
2267+
externalCatalog.getFunction(db, funcName).copy(identifier = qualifiedIdent)
2268+
} catch {
2269+
case _: NoSuchPermanentFunctionException | _: NoSuchFunctionException =>
2270+
failFunctionLookup(qualifiedIdent)
22722271
}
22732272
}
22742273

@@ -2290,9 +2289,9 @@ class SessionCatalog(
22902289
def lookupFunction(name: FunctionIdentifier, children: Seq[Expression]): Expression = {
22912290
if (name.database.isEmpty) {
22922291
resolveBuiltinOrTempFunction(name.funcName, children)
2293-
.getOrElse(resolvePersistentFunction(name, children))
2292+
.getOrElse(loadPersistentScalarFunction(name).invoke(children))
22942293
} else {
2295-
resolvePersistentFunction(name, children)
2294+
loadPersistentScalarFunction(name).invoke(children)
22962295
}
22972296
}
22982297

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, DefaultVa
3838
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
3939
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
4040
import org.apache.spark.sql.internal.SQLConf
41-
import org.apache.spark.sql.internal.connector.V1Function
4241
import org.apache.spark.sql.types._
4342
import org.apache.spark.sql.util.CaseInsensitiveStringMap
4443
import org.apache.spark.util.ArrayImplicits._
@@ -671,7 +670,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
671670
throw SparkUnsupportedOperationException()
672671
}
673672
override def loadFunction(ident: Identifier): UnboundFunction = {
674-
V1Function(v1Catalog.lookupPersistentFunction(ident.asFunctionIdentifier))
673+
v1Catalog.loadPersistentScalarFunction(ident.asFunctionIdentifier)
675674
}
676675
override def functionExists(ident: Identifier): Boolean = {
677676
v1Catalog.isPersistentFunction(ident.asFunctionIdentifier)

0 commit comments

Comments
 (0)