[Feature] Support Context-Aware UDFs and Lifecycle Management in SQL Transform #10460

@davidzollo

Is your feature request related to a problem? Please describe

Currently, the SQL UDF mechanism (ZetaUDF) in SeaTunnel Transform is stateless and isolated. It handles data purely based on the input arguments passed to the function. However, in complex production scenarios, this design has several limitations:

  1. Lack of Context Access: UDFs cannot access row-level metadata such as the source database, table name, rowKind (INSERT/UPDATE/DELETE), or other fields in the row that are not explicitly passed as arguments.
  2. No Lifecycle Management: There are no open() or close() hooks. This makes it impossible to initialize expensive resources (e.g., KMS clients, database connections, cache loaders) once and reuse them. Users are forced to initialize resources per row or use static singletons, which are hard to manage and test.
  3. Limited Stateful Capabilities: Implementing logic that depends on the execution context (e.g., encrypting data using a key derived from the table name) is currently impossible.
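
To make limitation 2 concrete, the following is a minimal sketch of the static-singleton workaround users fall back on today. `SimpleUDF` is a simplified local stand-in for the real interface (only `evaluate` is shown), and `FakeKmsClient` is a hypothetical placeholder for any expensive resource.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Demo entry point: the client is built once, but nothing ever releases it.
class StaticSingletonWorkaround {
    public static void main(String[] args) {
        LegacyEncryptUDF udf = new LegacyEncryptUDF();
        System.out.println(udf.evaluate(List.of("a")));
        System.out.println(udf.evaluate(List.of("b")));
        System.out.println("clients created: " + FakeKmsClient.CREATED.get());
    }
}

// Hypothetical expensive resource; counts how many instances were built.
class FakeKmsClient {
    static final AtomicInteger CREATED = new AtomicInteger();

    FakeKmsClient() {
        CREATED.incrementAndGet();
    }

    String encrypt(Object value) {
        return "enc(" + value + ")";
    }
}

// Simplified stand-in for the current UDF contract (assumption: only evaluate is shown).
interface SimpleUDF {
    Object evaluate(List<Object> args);
}

class LegacyEncryptUDF implements SimpleUDF {
    // Static state shared by every instance in the JVM; there is no hook to close it.
    private static volatile FakeKmsClient client;

    // Double-checked locking so the client is created lazily and only once.
    private static FakeKmsClient client() {
        FakeKmsClient c = client;
        if (c == null) {
            synchronized (LegacyEncryptUDF.class) {
                c = client;
                if (c == null) {
                    c = new FakeKmsClient();
                    client = c;
                }
            }
        }
        return c;
    }

    @Override
    public Object evaluate(List<Object> args) {
        return client().encrypt(args.get(0));
    }
}
```

The lazy-initialization boilerplate and the JVM-wide static field are exactly what open() and close() hooks would make unnecessary.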

Describe the solution you'd like

I propose enhancing the ZetaUDF interface and the SQL execution engine to support Context-Aware UDFs and Lifecycle Management, while maintaining 100% backward compatibility with existing UDFs.

1. ZetaUDF Interface Extension

Introduce new default methods to the ZetaUDF interface. Existing UDFs do not need to change.

public interface ZetaUDF extends Serializable {
    // ... existing methods ...

    /**
     * Indicate whether this UDF requires row-level context.
     * Default returns false for backward compatibility.
     */
    default boolean requiresContext() {
        return false;
    }

    /**
     * Evaluate the function with context.
     * Default implementation falls back to evaluate(args).
     */
    default Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
        return evaluate(args);
    }

    /**
     * Initialize resources. Called once during engine startup.
     */
    default void open() throws Exception {}

    /**
     * Release resources. Called once during engine shutdown.
     */
    default void close() {}
}
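
The backward-compatibility claim can be checked with a small sketch. The interface below is a simplified local copy of the proposal (it omits the existing methods other than `evaluate` and does not extend `Serializable`), `ZetaUDFContext` is an empty placeholder, and `UpperUDF` is a hypothetical legacy UDF that knows nothing about the new methods.

```java
import java.util.List;

class BackwardCompatSketch {
    public static void main(String[] args) {
        ZetaUDF legacy = new UpperUDF();
        // The legacy UDF inherits every new method from the defaults.
        System.out.println(legacy.requiresContext()); // prints "false"
        System.out.println(
                legacy.evaluateWithContext(List.of("abc"), new ZetaUDFContext())); // prints "ABC"
    }
}

// Simplified stand-in for the proposed interface (assumption: other existing methods omitted).
interface ZetaUDF {
    Object evaluate(List<Object> args);

    default boolean requiresContext() {
        return false;
    }

    default Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
        return evaluate(args);
    }

    default void open() throws Exception {}

    default void close() {}
}

// Empty placeholder; the real context would expose row metadata.
class ZetaUDFContext {}

// Hypothetical UDF written against the old contract: it overrides evaluate only.
class UpperUDF implements ZetaUDF {
    @Override
    public Object evaluate(List<Object> args) {
        return String.valueOf(args.get(0)).toUpperCase();
    }
}
```

Because `UpperUDF` compiles and runs without overriding any of the new methods, an engine that always calls `evaluateWithContext` would still reach the original `evaluate` logic.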

2. Introduce ZetaUDFContext

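A lightweight context class that gives UDFs access to row-level runtime information. The instance is mutable so that the engine can update it in place for each row; UDFs only read from it.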

  • Metadata Access: getRawTableId(), getDatabase(), getTable(), getRowKind().
  • Data Access: getAllFields() (read-only access to the full row).
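
One possible shape for this class is sketched below. Everything beyond the getter names is an assumption: the `update(...)` signature, the local `RowKind` enum (a stand-in for SeaTunnel's own type), and the parsing of the raw table ID as dot-separated segments with the database first and the table last.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ContextSketch {
    public static void main(String[] args) {
        ZetaUDFContext ctx = new ZetaUDFContext();
        ctx.update("shop.orders", RowKind.INSERT, new Object[] {1, "alice"});
        // prints "shop / orders / INSERT"
        System.out.println(ctx.getDatabase() + " / " + ctx.getTable() + " / " + ctx.getRowKind());
        // prints "[1, alice]"
        System.out.println(ctx.getAllFields());
    }
}

// Local stand-in for the engine's row kind type (assumption).
enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

class ZetaUDFContext {
    private String rawTableId;
    private RowKind rowKind;
    private Object[] fields;

    // Called by the engine before each row; only reference assignments, no copying.
    void update(String rawTableId, RowKind rowKind, Object[] fields) {
        this.rawTableId = rawTableId;
        this.rowKind = rowKind;
        this.fields = fields;
    }

    String getRawTableId() {
        return rawTableId;
    }

    RowKind getRowKind() {
        return rowKind;
    }

    // Assumption: the database is the first dot-separated segment of the table ID.
    String getDatabase() {
        if (rawTableId == null) {
            return null;
        }
        int dot = rawTableId.indexOf('.');
        return dot < 0 ? null : rawTableId.substring(0, dot);
    }

    // Assumption: the table is the last dot-separated segment of the table ID.
    String getTable() {
        if (rawTableId == null) {
            return null;
        }
        return rawTableId.substring(rawTableId.lastIndexOf('.') + 1);
    }

    // Read-only view over the current row; the wrapper is only built when a UDF asks for it.
    List<Object> getAllFields() {
        if (fields == null) {
            return Collections.emptyList();
        }
        return Collections.unmodifiableList(Arrays.asList(fields));
    }
}
```

Returning an unmodifiable view keeps UDFs from mutating the row, at the cost of a small wrapper allocation on each `getAllFields()` call; UDFs that never call it pay nothing.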

3. Engine Enhancements

  • Lifecycle Integration: The ZetaSQLEngine will call open() on all registered UDFs during preparation and close() during shutdown.
  • Context Injection: The engine will maintain a shared ZetaUDFContext instance and update it for each row (using a zero-allocation, mutable pattern) before executing UDFs.
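
The two engine changes could fit together roughly as follows. This is a sketch under stated assumptions, not the actual ZetaSQLEngine code: `EngineSketch.run`, the simplified `ZetaUDF` and `ZetaUDFContext` stand-ins, the two-argument `update(...)`, and `TaggingUDF` are all hypothetical, and each row's first field is used as the single UDF argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class EngineSketch {
    // Runs one UDF over all rows: open once, reuse one context, close once.
    static List<Object> run(ZetaUDF udf, String tableId, List<Object[]> rows) {
        ZetaUDFContext context = new ZetaUDFContext(); // created once, reused for every row
        List<Object> results = new ArrayList<>();
        try {
            udf.open();
        } catch (Exception e) {
            throw new IllegalStateException("UDF open() failed", e);
        }
        try {
            for (Object[] fields : rows) {
                context.update(tableId, fields); // reference assignments only
                List<Object> args = Collections.singletonList(fields[0]);
                results.add(udf.requiresContext()
                        ? udf.evaluateWithContext(args, context)
                        : udf.evaluate(args));
            }
        } finally {
            udf.close(); // runs even if a row fails
        }
        return results;
    }

    public static void main(String[] args) {
        TaggingUDF udf = new TaggingUDF();
        List<Object[]> rows = Arrays.asList(new Object[] {1}, new Object[] {2});
        System.out.println(run(udf, "shop.orders", rows)); // prints "[shop.orders:1, shop.orders:2]"
        System.out.println("opened=" + udf.opened + " closed=" + udf.closed); // prints "opened=1 closed=1"
    }
}

// Simplified stand-in for the proposed interface (assumption).
interface ZetaUDF {
    Object evaluate(List<Object> args);
    default boolean requiresContext() { return false; }
    default Object evaluateWithContext(List<Object> args, ZetaUDFContext context) { return evaluate(args); }
    default void open() throws Exception {}
    default void close() {}
}

// Minimal context stand-in carrying only the table ID and the row fields (assumption).
class ZetaUDFContext {
    private String rawTableId;
    private Object[] fields;

    void update(String rawTableId, Object[] fields) {
        this.rawTableId = rawTableId;
        this.fields = fields;
    }

    String getRawTableId() { return rawTableId; }
}

// Hypothetical context-aware UDF that prefixes its argument with the table ID.
class TaggingUDF implements ZetaUDF {
    int opened;
    int closed;

    @Override public void open() { opened++; }
    @Override public void close() { closed++; }
    @Override public boolean requiresContext() { return true; }
    @Override public Object evaluate(List<Object> args) { return args.get(0); }

    @Override
    public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
        return context.getRawTableId() + ":" + args.get(0);
    }
}
```

Checking `requiresContext()` per call keeps the sketch short; a real engine could evaluate it once per UDF during preparation and cache the result.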

Describe alternatives you've considered

  • ThreadLocal Context: Considered using ThreadLocal to pass context implicitly.
    • Rejected: Hard to manage lifecycle in different execution environments (Spark/Flink/SeaTunnel Engine), potential for memory leaks, and less explicit API contract.
  • Refactoring evaluate Signature: Changing evaluate(List<Object> args) to accept context.
    • Rejected: This would break all existing Custom UDFs. The proposed default method approach ensures seamless backward compatibility.
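
For comparison, the rejected ThreadLocal alternative would look roughly like the sketch below. `UdfContextHolder` is a hypothetical name, and only the table ID is carried to keep the example short.

```java
class ThreadLocalAlternativeSketch {
    public static void main(String[] args) {
        UdfContextHolder.set("shop.orders");
        System.out.println(UdfContextHolder.get()); // prints "shop.orders"

        // If the engine forgets to clear, the next task on this thread sees stale context.
        System.out.println(UdfContextHolder.get()); // still prints "shop.orders"

        UdfContextHolder.clear();
        System.out.println(UdfContextHolder.get()); // prints "null"
    }
}

// Hypothetical holder that passes context implicitly through the current thread.
final class UdfContextHolder {
    private static final ThreadLocal<String> TABLE_ID = new ThreadLocal<>();

    private UdfContextHolder() {}

    // The engine would call this before each row.
    static void set(String tableId) {
        TABLE_ID.set(tableId);
    }

    // A UDF would call this from inside evaluate(args); nothing in the signature reveals it.
    static String get() {
        return TABLE_ID.get();
    }

    // Must be called on the same thread; a missed call leaves the value attached to the thread.
    static void clear() {
        TABLE_ID.remove();
    }
}
```

The UDF's dependency on context is invisible in its signature, and correctness depends on every execution environment calling `clear()` on the right thread, which is the lifecycle problem described above.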

Additional context

Performance Considerations

The implementation focuses on Zero-Copy and Object Reuse:

  • The ZetaUDFContext instance is created once and reused.
  • Updating the context for each row involves only reference assignments (context.update(fields, row)).
  • No new objects are allocated per processed row.
  • Expected performance impact is negligible (< 2%) for UDFs that do not use the context features.

Example Usage (Encryption)

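import java.util.List;

public class EncryptUDF implements ZetaUDF {
    // CryptoClient stands for any expensive-to-create client (e.g., a KMS client).
    private CryptoClient client;

    // Other existing ZetaUDF methods are omitted for brevity.

    @Override
    public void open() {
        this.client = new CryptoClient(); // Initialized once, reused for every row
    }

    @Override
    public boolean requiresContext() { return true; }

    @Override
    public Object evaluate(List<Object> args) {
        // This UDF cannot derive a key without the table ID, so the context-free path is unsupported.
        throw new UnsupportedOperationException("EncryptUDF requires a ZetaUDFContext");
    }

    @Override
    public Object evaluateWithContext(List<Object> args, ZetaUDFContext context) {
        String tableId = context.getRawTableId();
        // Use the table ID as part of the encryption key derivation
        return client.encrypt(args.get(0), tableId);
    }
}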

Status

Doing
