diff --git a/CHANGELOG.md b/CHANGELOG.md index 3b38495c48dd..2c845b4caec5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,6 +40,7 @@ - [New `Profile` API for timing code execution.][14827] - [Expanded S3 API with versions and signed uri][14831] - [Support for reading JSON data from database connections.][14872] +- [Add Redshift bulk loading][14860] [14522]: https://github.com/enso-org/enso/pull/14522 [14476]: https://github.com/enso-org/enso/pull/14476 @@ -56,6 +57,7 @@ [14827]: https://github.com/enso-org/enso/pull/14827 [14831]: https://github.com/enso-org/enso/pull/14831 [14872]: https://github.com/enso-org/enso/pull/14872 +[14860]: https://github.com/enso-org/enso/pull/14860 #### Enso Language & Runtime diff --git a/distribution/lib/Standard/AWS/0.0.0-dev/docs/api/Database/Redshift/Redshift_Connection.md b/distribution/lib/Standard/AWS/0.0.0-dev/docs/api/Database/Redshift/Redshift_Connection.md index 0f5956cde9e9..916147e83d86 100644 --- a/distribution/lib/Standard/AWS/0.0.0-dev/docs/api/Database/Redshift/Redshift_Connection.md +++ b/distribution/lib/Standard/AWS/0.0.0-dev/docs/api/Database/Redshift/Redshift_Connection.md @@ -2,6 +2,7 @@ ## module Standard.AWS.Database.Redshift.Redshift_Connection - type Redshift_Connection - base_connection self -> Standard.Base.Any.Any + - bulk_load self table:Standard.Table.Table.Table= table_name:Standard.Base.Data.Text.Text= staging_bucket_credentials:(Standard.Database.Connection.Credentials.Credentials|Standard.AWS.AWS_Credential.AWS_Credential)= staging_bucket:Standard.Base.Data.Text.Text= iam_role:Standard.Base.Data.Text.Text= if_exists:Standard.Database.Bulk_Load_Exists.Bulk_Load_Exists= temporary:Standard.Base.Data.Boolean.Boolean= -> Standard.Base.Any.Any - close self -> Standard.Base.Any.Any - create url:Standard.Base.Any.Any properties:Standard.Base.Any.Any make_new:Standard.Base.Any.Any -> Standard.Base.Any.Any - create_literal_table self source:Standard.Table.Table.Table alias:Standard.Base.Data.Text.Text -> 
(Standard.Table.Table.Table&Standard.Database.DB_Table.DB_Table&Standard.Base.Any.Any) diff --git a/distribution/lib/Standard/AWS/0.0.0-dev/src/Database/Redshift/Redshift_Connection.enso b/distribution/lib/Standard/AWS/0.0.0-dev/src/Database/Redshift/Redshift_Connection.enso index c3829fbbb622..8e022d3357e6 100644 --- a/distribution/lib/Standard/AWS/0.0.0-dev/src/Database/Redshift/Redshift_Connection.enso +++ b/distribution/lib/Standard/AWS/0.0.0-dev/src/Database/Redshift/Redshift_Connection.enso @@ -1,14 +1,20 @@ from Standard.Base import all +import Standard.Base.Errors.Illegal_Argument.Illegal_Argument import Standard.Base.Metadata.File_Type +import Standard.Base.Runtime.Context import Standard.Base.Visualization.Table_Viz_Data.Table_Viz_Data from Standard.Base.Metadata.Choice import Option from Standard.Base.Metadata.Widget import File_Browse, Single_Choice, Text_Input +from Standard.Table import Table, Data_Formatter +import Standard.Table.In_Memory_Table.In_Memory_Table +import Standard.Table.Internal.In_Memory_Helpers import Standard.Table.Rows_To_Read.Rows_To_Read -from Standard.Table import Table +import Standard.Database.Bulk_Load_Exists.Bulk_Load_Exists import Standard.Database.Column_Description.Column_Description import Standard.Database.Connection.Connection.Connection +import Standard.Database.Connection.Credentials.Credentials import Standard.Database.DB_Table as DB_Table_Module import Standard.Database.DB_Table.DB_Table import Standard.Database.Dialects.Dialect.Dialect @@ -27,9 +33,14 @@ from Standard.Database.Internal.Upload.Operations.Create import create_table_imp import project.Database.Redshift.Internal.Redshift_Dialect import project.Database.Redshift.Internal.Redshift_Type_Mapping.Redshift_Type_Mapping +import project.AWS_Credential.AWS_Credential +import project.S3.S3 +import project.S3.S3_File.S3_File +polyglot java import java.lang.ArithmeticException polyglot java import org.enso.database.JDBCDriverTypes polyglot java import 
org.enso.database.JDBCProxy +polyglot java import org.enso.table.data.column.storage.type.BigDecimalType type Redshift_Connection ## --- @@ -360,6 +371,89 @@ type Redshift_Connection execute self query = self.connection.execute query + ## --- + icon: data_upload + --- + Bulk load a Table into Redshift using the COPY command via S3 staging. + + This method uploads the table as a CSV file to S3, then uses Redshift's + COPY command to efficiently load the data into the database. The staging + file is automatically deleted after the load completes. + + ## Arguments: + - `table`: The input table to upload. Must be an in-memory table; + DB_Tables are not supported. + - `table_name`: The name of the table to create in Redshift. + - `staging_bucket`: The S3 bucket path to use for staging the CSV file + (e.g., "s3://my-bucket/staging"). A unique filename will be generated + automatically. + - `staging_bucket_credentials`: AWS credentials to use for S3 access. Defaults to + `AWS_Credential.Profile` which uses the default AWS profile. + - `iam_role`: The IAM role ARN that Redshift should assume to access the + S3 staging bucket (e.g., "arn:aws:iam::123456789012:role/MyRedshiftRole"). + - `if_exists`: What action to take if the table already exists. Defaults to + `Raise_Error`. + - `Raise_Error` - a `Table_Already_Exists` error will be raised. + - `Drop_Table` - the existing table will be dropped before creating the new one. + - `Truncate_Table` - the existing table will be truncated before loading data into it. + - `Append_To_Table` - data will be appended to the existing table. + - `temporary`: If set to `True`, the created table will be temporary. + Defaults to `False`. + + ## Returns + A `DB_Table` representing the created or updated table. + + ## Errors + - `Illegal_Argument` if the table contains unsupported column types (Null, + Mixed, Binary, or Unsupported_Data_Type), or if a `DB_Table` is provided + instead of an in-memory table. 
+ - `Table_Already_Exists` if the table exists and `if_exists` is set to + `Raise_Error`. + @staging_bucket _make_staging_bucket_widget + bulk_load self table:Table=(Missing_Argument.throw "table") table_name:Text=(Missing_Argument.throw "table_name") staging_bucket_credentials:Credentials|AWS_Credential=..Profile staging_bucket:Text=(Missing_Argument.throw "staging_bucket") iam_role:Text=(Missing_Argument.throw "iam_role") if_exists:Bulk_Load_Exists=..Raise_Error temporary:Boolean=False = case table of + _ : DB_Table -> + Error.throw (Illegal_Argument.Error "Cannot bulk load from a DB_Table, use `read` to materialize or use `select_into_database_table` if on same server.") + _ : In_Memory_Table -> + ## Check for illegal column types + removed = table.remove_columns [(..By_Type ..Null), (..By_Type ..Mixed), (..By_Type ..Unsupported_Data_Type), (..By_Type ..Binary)] + if removed.column_count != table.column_count then Error.throw (Illegal_Argument.Error "The table contains columns with unsupported types (Null, Mixed, Binary, or Unsupported_Data_Type) that cannot be uploaded to Redshift. Please remove or convert these columns before uploading.") + + ## Check if table exists + exists = self.query (..Table_Name table_name) . is_error . not + if exists && if_exists==Bulk_Load_Exists.Raise_Error then Error.throw (Table_Already_Exists.Error table_name) + + ## Check Execution Context + Context.Output.if_enabled disabled_message="As writing is disabled, cannot load data. Press the Write button ▶ to perform the operation." 
panic=False <| + Nothing + + normalized_staging_bucket = if staging_bucket.starts_with "s3://" then staging_bucket else "s3://" + staging_bucket + "/" + normalized_staging_bucket2 = if normalized_staging_bucket.ends_with "/" then normalized_staging_bucket else normalized_staging_bucket + "/" + s3_file_path = normalized_staging_bucket2 + table_name + "_" + Random.uuid + ".csv" + s3_file = S3_File.new s3_file_path staging_bucket_credentials + if s3_file.is_error then Error.throw s3_file + written_s3_file = table.write s3_file format=(..Delimited value_formatter=(Data_Formatter.Value datetime_formats=['yyyy-MM-dd HH:mm:ssz'])) + Error.return_if_error written_s3_file + created_table = if exists.not then self.create_table table_name table primary_key=[] temporary=temporary else case if_exists of + Bulk_Load_Exists.Drop_Table -> + self.drop_table table_name if_exists=True + self.create_table table_name table primary_key=[] temporary=temporary + Bulk_Load_Exists.Truncate_Table -> + self.truncate_table table_name + self.query (..Table_Name table_name) + Bulk_Load_Exists.Append_To_Table -> + self.query (..Table_Name table_name) + + ## Assuming we managed to create the table, proceed with copy + created_table.if_not_error <| + result = self.execute ("COPY " + table_name + " FROM '" + s3_file_path + "'" + " IAM_ROLE '" + iam_role + "'" + " FORMAT AS CSV" + " IGNOREHEADER 1;") + if result.is_error then + s3_file.delete + Error.throw result + + s3_file.delete + created_table + ## --- private: true --- @@ -442,6 +536,12 @@ type Redshift_Connection to_js_object self = JS_Object.from_pairs <| [["type", "Redshift_Connection"], ["links", self.connection.tables.at "Name" . 
to_vector]] +private _make_staging_bucket_widget connection cache=Nothing = + _ = connection + cached_credentials = cache.if_not_nothing <| cache "staging_bucket_credentials" + credentials = if cached_credentials.is_nothing then AWS_Credential.Default else cached_credentials + Single_Choice display=..Always values=((S3.list_buckets credentials).map c-> (Option c ("'"+c+"'"))) + ## --- private: true --- diff --git a/distribution/lib/Standard/DuckDB/0.0.0-dev/src/DuckDB_Connection.enso b/distribution/lib/Standard/DuckDB/0.0.0-dev/src/DuckDB_Connection.enso index 50c792d32cd7..e2b50e22bf00 100644 --- a/distribution/lib/Standard/DuckDB/0.0.0-dev/src/DuckDB_Connection.enso +++ b/distribution/lib/Standard/DuckDB/0.0.0-dev/src/DuckDB_Connection.enso @@ -571,7 +571,7 @@ type DuckDB_Connection if exists && if_exists==Bulk_Load_Exists.Raise_Error then Error.throw (Table_Already_Exists.Error table_name) ## Check Execution Context - if self.is_in_memory.not then Context.Output.if_enabled disabled_message="As writing is disabled, cannot create a new index. Press the Write button ▶ to perform the operation." panic=False <| + if self.is_in_memory.not then Context.Output.if_enabled disabled_message="As writing is disabled, cannot load data. Press the Write button ▶ to perform the operation." panic=False <| Nothing Context.Output.with_enabled <|