
feat: Add S3 content sync pipeline#2877

Open
feoh wants to merge 2 commits into master from cpatti/s3_sync_pipeline

Conversation


feoh (Contributor) commented Feb 6, 2026

Initial cut at implementation by GitHub Copilot.

mitodl/hq#8703

What are the relevant tickets?

Fixes #8703

Description (What does it do?)

When doing a database restore in ocw-studio, you will have WebsiteContent that references static resources that don't exist in your environment. We would therefore like to periodically sync all static resources from another instance of ocw-studio into the AWS S3 bucket denoted by the env variable AWS_STORAGE_BUCKET_NAME.

How can this be tested?

I have no earthly idea, and that's a problem. I'll talk to @gumaerc about this on Monday.

gemini-code-assist bot commented

Summary of Changes

Hello @feoh, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a robust solution for maintaining data consistency of static resources across environments, particularly after database restores. By implementing a new Concourse pipeline, the system can now automatically and periodically synchronize content from a designated S3 bucket to the application's primary storage bucket. This feature enhances the resilience and operational efficiency of the platform by ensuring that all referenced static assets are readily available.

Highlights

  • New S3 Content Sync Pipeline: Introduced a new Concourse pipeline designed to periodically synchronize static resources between two AWS S3 buckets. This addresses the issue of missing static content after database restores.
  • Management Command for Pipeline Upserts: Added a Django management command (upsert_s3_bucket_sync_pipeline) to create or update the S3 bucket sync Concourse pipeline, with options to unpause it or delete existing pipelines first (see the invocation sketch after this list).
  • Configurable S3 Sync Settings: New Django settings AWS_IMPORT_STORAGE_BUCKET_NAME and AWS_S3_SYNC_INTERVAL have been added to configure the source S3 bucket for synchronization and the frequency of the sync operation, respectively.
  • Celery Task for Asynchronous Pipeline Management: A new Celery task upsert_s3_bucket_sync_pipeline was added to handle the asynchronous management of the S3 content sync pipeline, ensuring non-blocking operations.
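
For reviewers who want to exercise the management command highlighted above, here is a minimal invocation sketch; the unpause option name is inferred from this summary, so verify it against the command's add_arguments.

from django.core.management import call_command

# Equivalent to: python manage.py upsert_s3_bucket_sync_pipeline --unpause
# (the option name is an assumption based on the highlights above)
call_command("upsert_s3_bucket_sync_pipeline", unpause=True)
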
Changelog
  • content_sync/api.py
    • Added get_s3_bucket_sync_pipeline function to retrieve the new S3 bucket synchronization pipeline.
  • content_sync/management/commands/upsert_s3_bucket_sync_pipeline.py
    • New file added: Implements a Django management command to upsert (create/update) the S3 bucket sync Concourse pipeline.
    • Includes options to unpause the pipeline and delete existing pipelines before upserting.
    • Utilizes a Celery task for asynchronous pipeline operations.
  • content_sync/pipelines/base.py
    • Added BaseS3BucketSyncPipeline as a new base class for S3 bucket syncing pipelines.
  • content_sync/pipelines/concourse.py
    • Imported BaseS3BucketSyncPipeline and S3BucketSyncPipelineDefinition.
    • Introduced S3BucketSyncPipeline class, extending BaseS3BucketSyncPipeline and GeneralPipeline, to define the Concourse-specific implementation.
    • Configured mandatory settings for the S3 sync pipeline, including AWS_STORAGE_BUCKET_NAME and AWS_IMPORT_STORAGE_BUCKET_NAME.
    • Implemented the upsert_pipeline method to generate and update the Concourse pipeline configuration.
  • content_sync/pipelines/definitions/concourse/s3_bucket_sync_pipeline.py
    • New file added: Defines the S3BucketSyncPipelineDefinition class, which outlines the structure of the Concourse pipeline for S3 bucket synchronization.
    • Configures a time-based trigger for periodic synchronization.
    • Specifies an AWS CLI s3 sync command to transfer content between the source and destination S3 buckets.
    • Includes conditional logic for handling AWS credentials in development environments.
  • content_sync/tasks.py
    • Added a new Celery task upsert_s3_bucket_sync_pipeline to asynchronously manage the S3 bucket sync pipeline, including upsertion and optional unpausing.
  • main/settings.py
    • Added AWS_IMPORT_STORAGE_BUCKET_NAME setting to specify the source S3 bucket for content synchronization.
    • Added AWS_S3_SYNC_INTERVAL setting to configure the frequency of the S3 bucket synchronization pipeline (e.g., '24h', '12h').
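
For a concrete picture of the two settings above, a simplified sketch follows; the PR's main/settings.py presumably uses the repository's own env helpers, and plain os.environ is used here only to keep the example self-contained.

import os

# Source bucket to copy from; unset by default so the sync stays opt-in
AWS_IMPORT_STORAGE_BUCKET_NAME = os.environ.get("AWS_IMPORT_STORAGE_BUCKET_NAME")
# How often the Concourse timer fires, e.g. "24h", "12h", "6h"
AWS_S3_SYNC_INTERVAL = os.environ.get("AWS_S3_SYNC_INTERVAL", "24h")
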
Activity
  • This pull request represents an initial implementation, with some parts generated by GitHub Copilot.
  • The author plans to discuss testing strategies with @gumaerc on Monday, indicating ongoing development and collaboration.

gemini-code-assist bot left a comment


Code Review

This pull request introduces a new pipeline to periodically synchronize content between two S3 buckets. The implementation includes a new Concourse pipeline definition, a Celery task to manage it, and a corresponding management command. The changes are generally well-structured, but I've found a critical bug in the pipeline definition's shell command that would cause it to fail, and a minor redundancy in the class initialization. My specific comments are provided below.

Comment on lines +68 to +70
        aws s3{CLI_ENDPOINT_URL} sync s3://{import_bucket}/ s3://{storage_bucket}/
        """


critical

The multiline f-string used for sync_commands includes leading whitespace and newlines. When passed to sh -exc, this will cause the command to fail because the shell will not find the aws executable. To fix this, the command should be defined on a single line without leading whitespace.

        sync_commands = f"aws s3{CLI_ENDPOINT_URL} sync s3://{import_bucket}/ s3://{storage_bucket}/"

Comment on lines +47 to +48
        base.__init__(**kwargs)


medium

This call to super().__init__() is redundant as it is called again on lines 98-100. The first call should be removed to avoid confusion and prevent kwargs from being processed twice.
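
A minimal sketch of the suggested fix, assuming the rest of the constructor body stays as-is: build the resources and job first, then initialize the base Pipeline exactly once at the end.

def __init__(
    self,
    import_bucket: str,
    storage_bucket: str,
    sync_interval: str = "24h",
    **kwargs,
):
    # ... build timer_resource, resources, get_timer_step, s3_sync_task, and
    # job exactly as in the current diff, then perform the single base
    # initialization; the earlier base.__init__(**kwargs) call is dropped so
    # kwargs are processed only once.
    super().__init__(
        resource_types=resource_types, resources=resources, jobs=[job], **kwargs
    )
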


Copilot AI left a comment


Pull request overview

Adds a new Concourse pipeline + supporting Celery/management-command wiring to periodically sync static assets between S3 buckets (to address missing referenced resources after DB restores).

Changes:

  • Introduces new settings for the source bucket and sync interval (AWS_IMPORT_STORAGE_BUCKET_NAME, AWS_S3_SYNC_INTERVAL).
  • Adds a new Concourse pipeline definition and pipeline class to run aws s3 sync on a timer.
  • Adds API/task/management command entrypoints to upsert (and optionally unpause) the pipeline.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.

Summary per file

  • main/settings.py: Adds env settings for source bucket + interval for the sync pipeline.
  • content_sync/api.py: Exposes get_s3_bucket_sync_pipeline() for backend-specific pipeline construction.
  • content_sync/tasks.py: Adds Celery task to upsert/unpause the new pipeline.
  • content_sync/pipelines/base.py: Adds a base pipeline type/name for the S3 sync pipeline.
  • content_sync/pipelines/concourse.py: Implements the Concourse S3BucketSyncPipeline that upserts the new definition.
  • content_sync/pipelines/definitions/concourse/s3_bucket_sync_pipeline.py: New pipeline definition with timer resource + AWS CLI sync task.
  • content_sync/management/commands/upsert_s3_bucket_sync_pipeline.py: Adds management command to upsert (and optionally delete/unpause) the pipeline.


Comment on lines +126 to +131
@is_publish_pipeline_enabled
def get_s3_bucket_sync_pipeline(api: object | None = None) -> BasePipeline:
    """Get the S3 bucket sync pipeline"""
    return import_string(
        f"content_sync.pipelines.{settings.CONTENT_SYNC_PIPELINE_BACKEND}.S3BucketSyncPipeline"
    )(api=api)

Copilot AI Feb 6, 2026


Add unit tests for this new API entrypoint (at minimum: verifies the correct import path is used based on CONTENT_SYNC_PIPELINE_BACKEND, and that the decorator returns None when pipeline backend is unset). Existing pipeline getters in this module are covered by api_test.py, but this one is not.
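
One possible shape for that coverage, modeled on the existing getter tests in api_test.py; the pytest-django settings fixture, mock target, and backend value are assumptions rather than the PR's actual tests.

from content_sync import api


def test_get_s3_bucket_sync_pipeline(settings, mocker):
    """The getter should instantiate the backend-specific pipeline class"""
    settings.CONTENT_SYNC_PIPELINE_BACKEND = "concourse"
    import_string_mock = mocker.patch("content_sync.api.import_string")
    api.get_s3_bucket_sync_pipeline(api=None)
    import_string_mock.assert_called_once_with(
        "content_sync.pipelines.concourse.S3BucketSyncPipeline"
    )


def test_get_s3_bucket_sync_pipeline_disabled(settings):
    """With no pipeline backend configured, the decorator should return None"""
    settings.CONTENT_SYNC_PIPELINE_BACKEND = None
    assert api.get_s3_bucket_sync_pipeline(api=None) is None
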

Comment on lines +257 to +266
@app.task(acks_late=True)
def upsert_s3_bucket_sync_pipeline(
    unpause=False,  # noqa: FBT002
) -> bool:
    """Upsert the S3 bucket sync pipeline"""
    pipeline = api.get_s3_bucket_sync_pipeline()
    pipeline.upsert_pipeline()
    if unpause:
        pipeline.unpause()
    return True

Copilot AI Feb 6, 2026


Add test coverage for this new Celery task (e.g., mock api.get_s3_bucket_sync_pipeline(), assert upsert_pipeline is always called, and assert unpause() is called only when unpause=True). Similar tasks in this file are covered in tasks_test.py, but this new task currently isn’t.
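
A sketch of what that test could look like, following the mock-the-API pattern used in tasks_test.py; the patch target is an assumption.

import pytest

from content_sync import tasks


@pytest.mark.parametrize("unpause", [True, False])
def test_upsert_s3_bucket_sync_pipeline(mocker, unpause):
    """The task should always upsert the pipeline and only unpause on request"""
    pipeline = mocker.Mock()
    mocker.patch(
        "content_sync.tasks.api.get_s3_bucket_sync_pipeline", return_value=pipeline
    )
    assert tasks.upsert_s3_bucket_sync_pipeline(unpause=unpause) is True
    pipeline.upsert_pipeline.assert_called_once()
    assert pipeline.unpause.call_count == (1 if unpause else 0)
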

Comment on lines +664 to +687
class S3BucketSyncPipeline(BaseS3BucketSyncPipeline, GeneralPipeline):
    """Concourse pipeline to sync S3 buckets periodically"""

    PIPELINE_NAME = BaseS3BucketSyncPipeline.PIPELINE_NAME

    def __init__(self, api: PipelineApi | None = None):
        """Initialize the pipeline instance"""
        self.MANDATORY_SETTINGS = [
            *MANDATORY_CONCOURSE_SETTINGS,
            "AWS_STORAGE_BUCKET_NAME",
            "AWS_IMPORT_STORAGE_BUCKET_NAME",
        ]
        super().__init__(api=api)

    def upsert_pipeline(self):
        """
        Create or update the concourse pipeline
        """
        pipeline_definition = S3BucketSyncPipelineDefinition(
            import_bucket=settings.AWS_IMPORT_STORAGE_BUCKET_NAME,
            storage_bucket=settings.AWS_STORAGE_BUCKET_NAME,
            sync_interval=settings.AWS_S3_SYNC_INTERVAL,
        )
        self.upsert_config(pipeline_definition.json(), self.PIPELINE_NAME)

Copilot AI Feb 6, 2026


Add tests for the new S3BucketSyncPipeline class (similar to existing ThemeAssetsPipeline/MassBuildSitesPipeline tests): verify mandatory settings are enforced and that upsert_pipeline uses S3BucketSyncPipelineDefinition with the expected buckets/interval, and upserts config under the expected pipeline name.
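
For illustration, a hedged sketch of such a test; it assumes the pytest-django settings fixture is available and that the test settings already satisfy MANDATORY_CONCOURSE_SETTINGS.

from content_sync.pipelines import concourse


def test_s3_bucket_sync_pipeline_upsert(settings, mocker):
    """upsert_pipeline should build a definition from settings and upsert it"""
    settings.AWS_STORAGE_BUCKET_NAME = "dest-bucket"
    settings.AWS_IMPORT_STORAGE_BUCKET_NAME = "source-bucket"
    settings.AWS_S3_SYNC_INTERVAL = "12h"
    definition_mock = mocker.patch(
        "content_sync.pipelines.concourse.S3BucketSyncPipelineDefinition"
    )
    upsert_config_mock = mocker.patch.object(
        concourse.S3BucketSyncPipeline, "upsert_config"
    )
    pipeline = concourse.S3BucketSyncPipeline(api=mocker.Mock())
    pipeline.upsert_pipeline()
    definition_mock.assert_called_once_with(
        import_bucket="source-bucket",
        storage_bucket="dest-bucket",
        sync_interval="12h",
    )
    upsert_config_mock.assert_called_once_with(
        definition_mock.return_value.json.return_value, pipeline.PIPELINE_NAME
    )
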

Comment on lines +26 to +98
class S3BucketSyncPipelineDefinition(Pipeline):
    """
    A Pipeline that syncs S3 buckets periodically:

    - Triggers on a configurable time interval
    - Uses AWS CLI to sync from AWS_IMPORT_STORAGE_BUCKET_NAME to AWS_STORAGE_BUCKET_NAME

    Args:
        import_bucket(str): The S3 bucket to sync from (source)
        storage_bucket(str): The S3 bucket to sync to (destination)
        sync_interval(str): The time interval for syncing (e.g., "24h", "12h", "6h")
    """

    def __init__(
        self,
        import_bucket: str,
        storage_bucket: str,
        sync_interval: str = "24h",
        **kwargs,
    ):
        base = super()
        base.__init__(**kwargs)

        # Time-based trigger resource
        timer_resource = Resource(
            name=s3_sync_timer_identifier,
            type="time",
            icon="clock-outline",
            source={"interval": sync_interval},
        )

        resources = [timer_resource]
        resource_types = []

        # Get step for time trigger
        get_timer_step = GetStep(
            get=s3_sync_timer_identifier,
            trigger=True,
        )

        # AWS S3 sync task
        sync_commands = f"""
        aws s3{CLI_ENDPOINT_URL} sync s3://{import_bucket}/ s3://{storage_bucket}/
        """

        s3_sync_task = TaskStep(
            task=s3_sync_task_identifier,
            timeout="2h",
            attempts=3,
            config=TaskConfig(
                platform="linux",
                image_resource=AWS_CLI_REGISTRY_IMAGE,
                params=(
                    {}
                    if not is_dev()
                    else {
                        "AWS_ACCESS_KEY_ID": settings.AWS_ACCESS_KEY_ID or "",
                        "AWS_SECRET_ACCESS_KEY": settings.AWS_SECRET_ACCESS_KEY or "",
                    }
                ),
                run=Command(
                    path="sh",
                    args=["-exc", sync_commands],
                ),
            ),
        )

        tasks = [get_timer_step, s3_sync_task]
        job = Job(name=s3_sync_job_identifier, serial=True, plan=tasks)

        base.__init__(
            resource_types=resource_types, resources=resources, jobs=[job], **kwargs
        )

Copilot AI Feb 6, 2026


Add unit tests for this new pipeline definition (consistent with other definitions in this directory): assert the generated JSON includes the time resource interval, the job name, and the expected aws s3 sync command (including the dev endpoint URL behavior).
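
A possible starting point, using only names visible in the diff; the JSON paths are assumptions about how the Pipeline model serializes, so adjust them to match the other definition tests in this directory.

import json

from content_sync.pipelines.definitions.concourse.s3_bucket_sync_pipeline import (
    S3BucketSyncPipelineDefinition,
    s3_sync_job_identifier,
)


def test_s3_bucket_sync_pipeline_definition():
    """The rendered pipeline should carry the interval, job name, and sync command"""
    definition = S3BucketSyncPipelineDefinition(
        import_bucket="source-bucket",
        storage_bucket="dest-bucket",
        sync_interval="6h",
    )
    rendered = json.loads(definition.json())
    assert rendered["resources"][0]["source"]["interval"] == "6h"
    assert rendered["jobs"][0]["name"] == s3_sync_job_identifier
    sync_command = rendered["jobs"][0]["plan"][1]["config"]["run"]["args"][1]
    assert "sync s3://source-bucket/ s3://dest-bucket/" in sync_command
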

Comment on lines +67 to +90
sync_commands = f"""
aws s3{CLI_ENDPOINT_URL} sync s3://{import_bucket}/ s3://{storage_bucket}/
"""

s3_sync_task = TaskStep(
task=s3_sync_task_identifier,
timeout="2h",
attempts=3,
config=TaskConfig(
platform="linux",
image_resource=AWS_CLI_REGISTRY_IMAGE,
params=(
{}
if not is_dev()
else {
"AWS_ACCESS_KEY_ID": settings.AWS_ACCESS_KEY_ID or "",
"AWS_SECRET_ACCESS_KEY": settings.AWS_SECRET_ACCESS_KEY or "",
}
),
run=Command(
path="sh",
args=["-exc", sync_commands],
),
),

Copilot AI Feb 6, 2026


This sync task doesn’t currently apply the repository’s AWS CLI concurrency tuning (AWS_MAX_CONCURRENT_CONNECTIONS). Other large s3 sync operations configure default.s3.max_concurrent_requests from this setting before running aws s3 sync, which can materially affect runtime for large buckets. Consider setting that config at the start of this task and passing AWS_MAX_CONCURRENT_CONNECTIONS as a param/env so the setting is honored here too.
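
One way the task command could honor that setting (folding in the single-line fix suggested earlier); AWS_MAX_CONCURRENT_CONNECTIONS is the existing setting named above, and the exact wiring here is a sketch, not the PR's implementation.

# Configure CLI concurrency, then run the sync in the same shell invocation
sync_commands = (
    "aws configure set default.s3.max_concurrent_requests "
    f"{settings.AWS_MAX_CONCURRENT_CONNECTIONS} && "
    f"aws s3{CLI_ENDPOINT_URL} sync s3://{import_bucket}/ s3://{storage_bucket}/"
)

Note that interpolating the value at definition time, as above, bakes it into the pipeline config; passing it as a task param, as this comment suggests, would let it change without re-upserting the pipeline.
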
