Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [New `Profile` API for timing code execution.][14827]
- [Expanded S3 API with versions and signed uri][14831]
- [Support for reading JSON data from database connections.][14872]
- [Add Redshift bulk loading][14860]

[14522]: https://github.com/enso-org/enso/pull/14522
[14476]: https://github.com/enso-org/enso/pull/14476
Expand All @@ -56,6 +57,7 @@
[14827]: https://github.com/enso-org/enso/pull/14827
[14831]: https://github.com/enso-org/enso/pull/14831
[14872]: https://github.com/enso-org/enso/pull/14872
[14860]: https://github.com/enso-org/enso/pull/14860

#### Enso Language & Runtime

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
## module Standard.AWS.Database.Redshift.Redshift_Connection
- type Redshift_Connection
- base_connection self -> Standard.Base.Any.Any
- bulk_load self table:Standard.Table.Table.Table= table_name:Standard.Base.Data.Text.Text= staging_bucket_credentials:(Standard.Database.Connection.Credentials.Credentials|Standard.AWS.AWS_Credential.AWS_Credential)= staging_bucket:Standard.Base.Data.Text.Text= iam_role:Standard.Base.Data.Text.Text= if_exists:Standard.Database.Bulk_Load_Exists.Bulk_Load_Exists= temporary:Standard.Base.Data.Boolean.Boolean= -> Standard.Base.Any.Any
- close self -> Standard.Base.Any.Any
- create url:Standard.Base.Any.Any properties:Standard.Base.Any.Any make_new:Standard.Base.Any.Any -> Standard.Base.Any.Any
- create_literal_table self source:Standard.Table.Table.Table alias:Standard.Base.Data.Text.Text -> (Standard.Table.Table.Table&Standard.Database.DB_Table.DB_Table&Standard.Base.Any.Any)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from Standard.Base import all
import Standard.Base.Errors.Illegal_Argument.Illegal_Argument
import Standard.Base.Metadata.File_Type
import Standard.Base.Runtime.Context
import Standard.Base.Visualization.Table_Viz_Data.Table_Viz_Data
from Standard.Base.Metadata.Choice import Option
from Standard.Base.Metadata.Widget import File_Browse, Single_Choice, Text_Input

from Standard.Table import Table, Data_Formatter
import Standard.Table.In_Memory_Table.In_Memory_Table
import Standard.Table.Internal.In_Memory_Helpers
import Standard.Table.Rows_To_Read.Rows_To_Read
from Standard.Table import Table

import Standard.Database.Bulk_Load_Exists.Bulk_Load_Exists
import Standard.Database.Column_Description.Column_Description
import Standard.Database.Connection.Connection.Connection
import Standard.Database.Connection.Credentials.Credentials
import Standard.Database.DB_Table as DB_Table_Module
import Standard.Database.DB_Table.DB_Table
import Standard.Database.Dialects.Dialect.Dialect
Expand All @@ -27,9 +33,14 @@ from Standard.Database.Internal.Upload.Operations.Create import create_table_imp

import project.Database.Redshift.Internal.Redshift_Dialect
import project.Database.Redshift.Internal.Redshift_Type_Mapping.Redshift_Type_Mapping
import project.AWS_Credential.AWS_Credential
import project.S3.S3
import project.S3.S3_File.S3_File

polyglot java import java.lang.ArithmeticException
polyglot java import org.enso.database.JDBCDriverTypes
polyglot java import org.enso.database.JDBCProxy
polyglot java import org.enso.table.data.column.storage.type.BigDecimalType

type Redshift_Connection
## ---
Expand Down Expand Up @@ -360,6 +371,89 @@ type Redshift_Connection
    ## Executes a raw query/statement by delegating to the wrapped underlying
       JDBC connection (`self.connection`). Returns whatever the underlying
       `execute` returns.
    execute self query =
        self.connection.execute query

## ---
icon: data_upload
---
Bulk load a Table into Redshift using the COPY command via S3 staging.

This method uploads the table as a CSV file to S3, then uses Redshift's
COPY command to efficiently load the data into the database. The staging
file is automatically deleted after the load completes.

## Arguments:
- `table`: The input table to upload. Must be an in-memory table;
DB_Tables are not supported.
- `table_name`: The name of the table to create in Redshift.
- `staging_bucket`: The S3 bucket path to use for staging the CSV file
(e.g., "s3://my-bucket/staging"). A unique filename will be generated
automatically.
- `credentials`: AWS credentials to use for S3 access. Defaults to
`AWS_Credential.Profile` which uses the default AWS profile.
- `iam_role`: The IAM role ARN that Redshift should assume to access the
S3 staging bucket (e.g., "arn:aws:iam::123456789012:role/MyRedshiftRole").
- `if_exists`: What action to take if the table already exists. Defaults to
`Raise_Error`.
- `Raise_Error` - a `Table_Already_Exists` error will be raised.
- `Drop_Table` - the existing table will be dropped before creating the new one.
- `Truncate_Table` - the existing table will be truncated before loading data into it.
- `Append_To_Table` - data will be appended to the existing table.
- `temporary`: If set to `True`, the created table will be temporary.
Defaults to `False`.

## Returns
A `DB_Table` representing the created or updated table.

## Errors
- `Illegal_Argument` if the table contains unsupported column types (Null,
Mixed, Binary, or Unsupported_Data_Type), or if a `DB_Table` is provided
instead of an in-memory table.
- `Table_Already_Exists` if the table exists and `if_exists` is set to
`Raise_Error`.
@staging_bucket _make_staging_bucket_widget
bulk_load self table:Table=(Missing_Argument.throw "table") table_name:Text=(Missing_Argument.throw "table_name") staging_bucket_credentials:Credentials|AWS_Credential=..Profile staging_bucket:Text=(Missing_Argument.throw "staging_bucket") iam_role:Text=(Missing_Argument.throw "iam_role") if_exists:Bulk_Load_Exists=..Raise_Error temporary:Boolean=False = case table of
_ : DB_Table ->
Error.throw (Illegal_Argument.Error "Cannot bulk load from a DB_Table, use `read` to materialize or use `select_into_database_table` if on same server.")
_ : In_Memory_Table ->
## Check for illegal column types
removed = table.remove_columns [(..By_Type ..Null), (..By_Type ..Mixed), (..By_Type ..Unsupported_Data_Type), (..By_Type ..Binary)]
if removed.column_count != table.column_count then Error.throw (Illegal_Argument.Error "The table contains columns with unsupported types (Null, Mixed, Binary, or Unsupported_Data_Type) that cannot be uploaded to Redshift. Please remove or convert these columns before uploading.")

## Check if table exists
exists = self.query (..Table_Name table_name) . is_error . not
if exists && if_exists==Bulk_Load_Exists.Raise_Error then Error.throw (Table_Already_Exists.Error table_name)

## Check Execution Context
Context.Output.if_enabled disabled_message="As writing is disabled, cannot load data. Press the Write button ▶ to perform the operation." panic=False <|
Nothing

normalized_staging_bucket = if staging_bucket.starts_with "s3://" then staging_bucket else "s3://" + staging_bucket + "/"
normalized_staging_bucket2 = if normalized_staging_bucket.ends_with "/" then normalized_staging_bucket else normalized_staging_bucket + "/"
s3_file_path = normalized_staging_bucket2 + table_name + "_" + Random.uuid + ".csv"
s3_file = S3_File.new s3_file_path staging_bucket_credentials
if s3_file.is_error then Error.throw s3_file
written_s3_file = table.write s3_file format=(..Delimited value_formatter=(Data_Formatter.Value datetime_formats=['yyyy-MM-dd HH:mm:ssz']))
Error.return_if_error written_s3_file
Error.return_if_error written_s3_file
created_table = if exists.not then self.create_table table_name table primary_key=[] temporary=temporary else case if_exists of
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably for a later ticket but wonder if we should take the primary key as an argument?

Bulk_Load_Exists.Drop_Table ->
self.drop_table table_name if_exists=True
self.create_table table_name table primary_key=[] temporary=temporary
Bulk_Load_Exists.Truncate_Table ->
self.truncate_table table_name
self.query (..Table_Name table_name)
Bulk_Load_Exists.Append_To_Table ->
self.query (..Table_Name table_name)

## Assuming we managed to create the table, proceed with copy
created_table.if_not_error <|
result = self.execute ("COPY " + table_name + " FROM '" + s3_file_path + "'" + " IAM_ROLE '" + iam_role + "'" + " FORMAT AS CSV" + " IGNOREHEADER 1;")
if result.is_error then
s3_file.delete
Error.throw result

s3_file.delete
created_table

## ---
private: true
---
Expand Down Expand Up @@ -442,6 +536,12 @@ type Redshift_Connection
to_js_object self =
JS_Object.from_pairs <| [["type", "Redshift_Connection"], ["links", self.connection.tables.at "Name" . to_vector]]

## Builds the dropdown widget for the `staging_bucket` argument of
   `bulk_load`: lists the S3 buckets visible to the credentials the user has
   already selected.
private _make_staging_bucket_widget connection cache=Nothing =
    _ = connection
    # Read the value the user picked for `staging_bucket_credentials` from the
    # widget cache (when available) so the bucket list matches those
    # credentials; otherwise fall back to the default AWS credentials.
    cached_credentials = cache.if_not_nothing <| cache "staging_bucket_credentials"
    credentials = if cached_credentials.is_nothing then AWS_Credential.Default else cached_credentials
    # Each bucket is offered as a quoted Text literal option.
    Single_Choice display=..Always values=((S3.list_buckets credentials).map c-> (Option c ("'"+c+"'")))

## ---
private: true
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ type DuckDB_Connection
if exists && if_exists==Bulk_Load_Exists.Raise_Error then Error.throw (Table_Already_Exists.Error table_name)

## Check Execution Context
if self.is_in_memory.not then Context.Output.if_enabled disabled_message="As writing is disabled, cannot create a new index. Press the Write button ▶ to perform the operation." panic=False <|
if self.is_in_memory.not then Context.Output.if_enabled disabled_message="As writing is disabled, cannot load data. Press the Write button ▶ to perform the operation." panic=False <|
Nothing

Context.Output.with_enabled <|
Expand Down
Loading