Support for UNLOAD functionality for Data Lake connectors #19981

@findinpath

Introduction

Amazon Athena exposes an UNLOAD statement, which saves the output of a query to a specified storage location.
AWS Athena documentation for UNLOAD: https://docs.aws.amazon.com/athena/latest/ug/unload.html
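
For reference, a minimal sketch of the Athena statement. The table name `orders` and the bucket path are placeholders, not values taken from a real setup:

```sql
-- Athena: write the query result as Parquet files under the given S3 prefix.
-- `orders` and the S3 location are illustrative placeholders.
UNLOAD (SELECT * FROM orders)
TO 's3://bucket/path/'
WITH (format = 'PARQUET')
```

Note that no table is registered in the catalog; only files are written to the target location.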

Such functionality would be beneficial to implement in Trino as well.

Implementation roadmap

End state:

    CALL unload(path => …, format => ..., input => TABLE(...))

How do we get there:

First phase

SELECT * FROM TABLE(unload(path => …, input => TABLE (...)))
SELECT * FROM TABLE(hive.system.unload(
  input => TABLE( <subquery> ),
  location => 's3://bucket/path',
  format => 'parquet'))

What would the statement return?

A table function today runs either partitioned or not.
We need to partition the writes, and therefore the output is partitioned as well, so the statement produces multiple rows.
The statement would therefore potentially return the names of all the files written.

A table function should be side-effect free.
However, calling this function twice would probably write the data twice, and could overwrite and corrupt content in the specified location (in the case of Iceberg or Delta Lake tables).
How would it behave in the context of fault-tolerant execution (FTE)?

Limitations:

  • No idempotency guarantees
  • Limited support for controlling partitioning scheme (e.g., exact number of files written)
  • The unload table function produces one output row for each file/partitioning element written; these rows are displayed to the user as the result of the query
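
As a possible workaround for the last limitation, the per-file rows could be collapsed by the enclosing query. This is only a sketch: `hive.system.unload` is the function proposed in this issue and does not exist yet, and it assumes the function emits exactly one row per file written.

```sql
-- Hypothetical: hive.system.unload is the proposed table function.
-- Assumes one output row per written file, so count(*) yields the file count.
SELECT count(*) AS files_written
FROM TABLE(hive.system.unload(
  input => TABLE(SELECT * FROM tpch.sf1.nation),
  location => 's3://bucket/path',
  format => 'parquet'))
```

The write still happens as a side effect of evaluating the function, so this changes only what is shown to the user, not the idempotency concerns listed above.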

Second phase

SELECT * FROM TABLE(unload(path => …, input => TABLE (...)))

Aspects that can be improved independently from each other:

  • Resolve idempotency protocol / commit protocol
  • Control for partitioning scheme and scheduling
  • Support for partial/final style execution for table functions, so that the function can return a single summary value (e.g., number of rows written, or files written, etc.)

End state:

    CALL unload(path => …, format => ... , input => TABLE(...))

Use CALL statement

Initial suggestion from @pettyjamesm

CALL unload(
      location => 's3://bucket/path',
      format => 'parquet',
      input => TABLE (SELECT * FROM orders))

This kind of call would fit the SQL specification. Yes, CALL can take a table as a parameter (either a table or a query).
Input from @martint

This would probably be ideal from a syntactic point of view.

CALL is today like a TASK in the coordinator - it is not a "proper" query.
To support this, CALL would need to be changed so that it is invoked as a "query".
This could be sugared behind the scenes as

SELECT * FROM TABLE(system.unload(
  input => TABLE( <subquery> ),
  location => 's3://bucket/path',
  format => 'parquet'))

unload would return a status value, e.g. how many rows were "affected".
The commit protocol should be taken into account.

Other potential ways to implement this functionality

Add support for an UNLOAD statement in the SQL syntax

It looks attractive, but it doesn’t fit in the Trino model.
The engine wouldn't know exactly which connector UNLOAD should be routed to.
Moreover, there could be scenarios where different connectors would need to be used depending on the specified location.

Add an extra table parameter to CTAS statement to avoid registering the table

CREATE TABLE hive.myschema.mytable
WITH (
    location = '....',
    register_table = false)
AS SELECT * FROM tpch.sf1.nation;

The table parameter register_table would signal to the connector that the output table of this statement shouldn't be registered to the metastore.
In the example above, however, myschema and mytable are both synthetic - they are not used anywhere.

Create a special connector whose sole purpose is doing CTAS without registering the table

CREATE TABLE hive.myschema.mytable
WITH (
    location = '....')
AS SELECT * FROM tpch.sf1.nation;

As with the previous option, the connector would completely disregard myschema and mytable.

Create a special connector which interprets the schema name and table name differently

CREATE TABLE hive.bucket."table/location"
WITH (....)
AS SELECT * FROM tpch.sf1.nation;

The connector handling the above query would interpret the schema name as the bucket name and the table name as the path in the bucket to which we'd like to save the table.

Use a reserved schema name in a general purpose data lake connector

CREATE TABLE hive.raw."bucket/some/path"
WITH (....)
AS SELECT * FROM tpch.sf1.nation;

raw would be a reserved schema name in the connector.
This pragmatic choice has the side effect of preventing users from using the reserved name for a schema of their own.
From a UX perspective it feels rather awkward.

Insert into a table function the output of a SELECT

INSERT INTO TABLE(hive.system.unload(
  location => 's3://bucket/path',
  format => 'parquet'))

The syntax as listed doesn't fit the standard.
This could be misleading for users - how should they distinguish between read-only and writable table functions?

Labels: delta-lake, enhancement, hive, hudi, iceberg
