Skip to content

Kaligo/pipelinewise-target-redshift

 
 

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

130 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

pipelinewise-target-redshift

PyPI version PyPI - Python Version License: Apache2

Singer target that loads data into Amazon Redshift following the Singer spec.

This is a PipelineWise compatible target connector.

How to use it

The recommended method of running this target is to use it from PipelineWise. When running it from PipelineWise you don't need to configure this tap with JSON files and most of things are automated. Please check the related documentation at Target Redshift

If you want to run this Singer Target independently please read further.

Install

First, make sure Python 3 is installed on your system or follow these installation instructions for Mac or Ubuntu.

It's recommended to use a virtualenv:

  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install pipelinewise-target-redshift

or

  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .

To run

Like any other target that's following the singer specificiation:

some-singer-tap | target-redshift --config [config.json]

It's reading incoming messages from STDIN and using the properites in config.json to upload data into Amazon Redshift.

Note: To avoid version conflicts run tap and targets in separate virtual environments.

Configuration settings

Running the the target connector requires a config.json file. Example with the minimal settings:

{

  "host": "xxxxxx.redshift.amazonaws.com",
  "port": 5439,
  "user": "my_user",
  "password": "password",
  "dbname": "database_name",
  "aws_access_key_id": "secret",
  "aws_secret_access_key": "secret",
  "s3_bucket": "bucket_name",
  "default_target_schema": "my_target_schema"
}

Full list of options in config.json:

Property Type Required? Description
host String Yes Redshift Host
port Integer Yes Redshift Port
user String Yes Redshift User
password String Yes Redshift Password
dbname String Yes Redshift Database name
aws_profile String No AWS profile name for profile based authentication. If not provided, AWS_PROFILE environment variable will be used.
aws_access_key_id String No S3 Access Key Id. Used for S3 and Redshfit copy operations. If not provided, AWS_ACCESS_KEY_ID environment variable will be used.
aws_secret_access_key String No S3 Secret Access Key. Used for S3 and Redshfit copy operations. If not provided, AWS_SECRET_ACCESS_KEY environment variable will be used.
aws_session_token String No S3 AWS STS token for temporary credentials. If not provided, AWS_SESSION_TOKEN environment variable will be used.
aws_redshift_copy_role_arn String No AWS Role ARN to be used for the Redshift COPY operation. Used instead of the given AWS keys for the COPY operation if provided - the keys are still used for other S3 operations
s3_acl String No S3 Object ACL
s3_bucket String Yes S3 Bucket name
s3_key_prefix String (Default: None) A static prefix before the generated S3 key names. Using prefixes you can upload files into specific directories in the S3 bucket.
copy_options String (Default: EMPTYASNULL BLANKSASNULL TRIMBLANKS TRUNCATECOLUMNS TIMEFORMAT 'auto' COMPUPDATE OFF STATUPDATE OFF). Parameters to use in the COPY command when loading data to Redshift. Some basic file formatting parameters are fixed values and not recommended overriding them by custom values. They are like: CSV GZIP DELIMITER ',' REMOVEQUOTES ESCAPE
batch_size_rows Integer (Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Redshift.
flush_all_streams Boolean (Default: False) Flush and load every stream into Redshift when one batch is full. Warning: This may trigger the COPY command to use files with low number of records, and may cause performance problems.
parallelism Integer (Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max.
max_parallelism Integer (Default: 16) Max number of parallel threads to use when flushing tables.
default_target_schema String Name of the schema where the tables will be created. If schema_mapping is not defined then every stream sent by the tap is loaded into this schema.
default_target_schema_select_permissions String Grant USAGE privilege on newly created schemas and grant SELECT privilege on newly created tables to a specific list of users or groups. Example: {"users": ["user_1","user_2"], "groups": ["group_1", "group_2"]} If schema_mapping is not defined then every stream sent by the tap is granted accordingly.
schema_mapping Object Useful if you want to load multiple streams from one tap to multiple Redshift schemas.

If the tap sends the stream_id in <schema_name>-<table_name> format then this option overwrites the default_target_schema value. Note, that using schema_mapping you can overwrite the default_target_schema_select_permissions value to grant SELECT permissions to different groups per schemas or optionally you can create indices automatically for the replicated tables.

Note: This is an experimental feature and recommended to use via PipelineWise YAML files that will generate the object mapping in the right JSON format. For further info check a [PipelineWise YAML Example]
disable_table_cache Boolean (Default: False) By default the connector caches the available table structures in Redshift at startup. In this way it doesn't need to run additional queries when ingesting data to check if altering the target tables is required. With disable_table_cache option you can turn off this caching. You will always see the most recent table structures but will cause an extra query runtime.
add_metadata_columns Boolean (Default: False) Metadata columns add extra row level information about data ingestions, (i.e. when was the row read in source, when was inserted or deleted in redshift etc.) Metadata columns are creating automatically by adding extra columns to the tables with a column prefix _SDC_. The metadata columns are documented at https://transferwise.github.io/pipelinewise/data_structure/sdc-columns.html. Enabling metadata columns will flag the deleted rows by setting the _SDC_DELETED_AT metadata column. Without the add_metadata_columns option the deleted rows from singer taps will not be recongisable in Redshift.
hard_delete Boolean (Default: False) When hard_delete option is true then DELETE SQL commands will be performed in Redshift to delete rows in tables. It's achieved by continuously checking the _SDC_DELETED_AT metadata column sent by the singer tap. Due to deleting rows requires metadata columns, hard_delete option automatically enables the add_metadata_columns option as well.
data_flattening_max_level Integer (Default: 0) Object type RECORD items from taps can be loaded into VARIANT columns as JSON (default) or we can flatten the schema by creating columns automatically.

When value is 0 (default) then flattening functionality is turned off.
primary_key_required Boolean (Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined.
validate_records Boolean (Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Snowflake. Enabling this option will detect invalid records earlier but could cause performance degradation.
skip_updates Boolean No (Default: False) Do not update existing records when Primary Key is defined. Useful to improve performance when records are immutable, e.g. events
append_only Boolean No (Default: False) In fast sync mode, append all records from the stage table to the target table without performing updates or deletions. This is useful for immutable data like events where you only want to append new records. When enabled, all records from the stage table are inserted into the target table regardless of whether they already exist.
detect_deletions Boolean No (Default: False) Enable deletion detection in fast sync mode. When enabled, records that exist in the target table but not in the stage table will be marked as deleted by setting the _SDC_DELETED_AT timestamp. Requires add_metadata_columns to be enabled, tables to have primary keys defined, and FULL_TABLE replication method. This option only works with FULL_TABLE replication method and does not work with incremental updates or full refresh mode.
cleanup_s3_files Boolean No (Default: True) Fast sync mode only. Automatically clean up S3 files after they have been successfully loaded into Redshift. When set to False, S3 files will be retained after loading. This is useful for debugging or when you need to keep the exported files for other purposes. This setting only applies to fast sync operations and has no effect in regular sync mode.
compression String No The compression method to use when writing files to S3 and running Redshift COPY. The currently supported methods are gzip or bzip2. Defaults to none ("").
slices Integer No The number of slices to split files into prior to running COPY on Redshift. This should be set to the number of Redshift slices. The number of slices per node depends on the node size of the cluster - run SELECT COUNT(DISTINCT slice) slices FROM stv_slices to calculate this. Defaults to 1.
temp_dir String (Default: platform-dependent) Directory of temporary CSV files with RECORD messages.
metrics Object No (Optional) StatsD metrics configuration. See Metrics section for details.

Metrics

The target can optionally emit StatsD metrics. To enable this, add a metrics object to your config.json:

{
  "metrics": {
    "statsd_host": "localhost",
    "statsd_port": 8125,
    "statsd_namespace": "my_namespace",
    "statsd_enabled": true
  }
}
Property Type Required? Description
statsd_host String No StatsD host
statsd_port Integer No StatsD port
statsd_namespace String No StatsD namespace (added as a tag to all metrics)
statsd_enabled Boolean No (Default: False) Enable StatsD metrics

Fast Sync Support

This target supports Fast Sync mode when used with tap-postgres configured for fast sync. Fast sync enables high-performance data loading directly from S3 (exported by RDS PostgreSQL) to Redshift, bypassing the traditional Singer record-by-record processing.

How Fast Sync Works

  1. Source Side (tap-postgres): When fast sync is enabled, tap-postgres exports data directly from RDS PostgreSQL to S3 using aws_s3.query_export_to_s3 and embeds fast sync information in STATE messages under bookmarks[stream_id]['fast_sync_s3_info'].

  2. Target Side (target-redshift): This target automatically extracts fast sync operations from STATE message bookmarks and:

    • Queues fast sync operations for parallel processing
    • Processes all queued operations together at the end (following the same pattern as regular stream flushing)
    • For each operation:
      • Creates a temporary staging table
      • Loads data from S3 using Redshift's COPY command
      • Merges data into the target table (INSERT/UPDATE)
      • Detects deleted rows and sets _SDC_DELETED_AT timestamp
      • Cleans up temporary tables and S3 files

    Fast sync operations are processed in parallel using the same parallelism and max_parallelism configuration as regular stream flushing, allowing efficient processing of multiple streams simultaneously.

Fast Sync Requirements

  • Redshift IAM Role: The Redshift cluster must have an IAM role with:

    • s3:GetObject permission on the S3 bucket containing exported data
    • s3:ListBucket permission on the S3 bucket
    • s3:DeleteObject permission (for cleanup after load)
  • S3 Bucket Access: The S3 bucket containing exported data must be accessible from the Redshift cluster.

  • Primary Keys: For deletion detection to work, tables must have primary keys defined.

  • Metadata Columns: Enable add_metadata_columns in the target config to support deletion detection via _SDC_DELETED_AT.

  • Deletion Detection: Enable detect_deletions in the target config to automatically detect and mark deleted records during fast sync operations. This option is disabled by default.

Fast Sync Benefits

  • Performance: Direct COPY from S3 is much faster than record-by-record inserts
  • Scalability: Handles large tables efficiently
  • Parallel Processing: Multiple fast sync operations are queued and processed in parallel, following the same parallelism configuration as regular stream flushing
  • Deletion Detection: Automatically detects and marks deleted rows by comparing full table dumps
  • Reduced Network Traffic: Data flows directly from S3 to Redshift without intermediate processing

Example Configuration

No additional configuration is required in target-redshift for fast sync. The target automatically extracts and processes fast sync operations from STATE message bookmarks when they are received from the tap.

Fast sync operations use the same parallelism settings as regular stream flushing:

  • parallelism: Number of threads for processing fast sync operations (default: 0, auto-detect)
  • max_parallelism: Maximum number of parallel threads (default: 16)

However, ensure your Redshift cluster has the proper IAM role configured:

{
  "host": "xxxxxx.redshift.amazonaws.com",
  "port": 5439,
  "user": "my_user",
  "password": "password",
  "dbname": "database_name",
  "aws_redshift_copy_role_arn": "arn:aws:iam::account-id:role/redshift-role",
  "s3_bucket": "target-bucket-name",
  "default_target_schema": "my_target_schema",
  "add_metadata_columns": true,
  "detect_deletions": false
}

For more information about setting up fast sync on the tap side, see the tap-postgres Fast Sync documentation.

To run tests:

  1. Install python dependencies in a virtual env:
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install -e ".[test]"
  1. To run unit tests:
  coverage run -m pytest -vv --disable-pytest-warnings tests/unit && coverage report
  1. To run integration tests define environment variables first:
  export TARGET_REDSHIFT_HOST=<redshift-host>
  export TARGET_REDSHIFT_PORT=<redshift-port>
  export TARGET_REDSHIFT_USER=<redshift-user>
  export TARGET_REDSHIFT_PASSWORD=<redshift-password>
  export TARGET_REDSHIFT_DBNAME=<redshift-database-name>
  export TARGET_REDSHIFT_SCHEMA=<redshift-target-schema>
  export TARGET_REDSHIFT_AWS_ACCESS_KEY=<aws-access-key-id>
  export TARGET_REDSHIFT_AWS_SECRET_ACCESS_KEY=<aws-access-secret-access-key>
  export TARGET_REDSHIFT_S3_ACL=<s3-target-acl>
  export TARGET_REDSHIFT_S3_BUCKET=<s3-bucket>
  export TARGET_REDSHIFT_S3_KEY_PREFIX=<s3-bucket-directory>

  coverage run -m pytest -vv --disable-pytest-warnings tests/integration && coverage report

To run pylint:

  1. Install python dependencies and run python linter
  python3 -m venv venv
  . venv/bin/activate
  pip install --upgrade pip
  pip install .[test]
  pip install pylint
  pylint target_redshift -d C,W,unexpected-keyword-arg,duplicate-code

License

Apache License Version 2.0

See LICENSE to see the full text.

About

Singer.io Target for Amazon Redshift - PipelineWise compatible

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 100.0%