@@ -0,0 +1,35 @@
# Step 1: Use an official OpenJDK base image, as Spark requires Java
FROM openjdk:11-jre-slim

# Step 2: Set environment variables for Spark and Python
ENV SPARK_VERSION=3.5.0
ENV HADOOP_VERSION=3
ENV SPARK_HOME=/opt/spark
ENV PATH=$SPARK_HOME/bin:$PATH
ENV PYTHONUNBUFFERED=1

# Step 3: Install Python, pip, and other necessary tools
RUN apt-get update && \
apt-get install -y python3 python3-pip curl && \
rm -rf /var/lib/apt/lists/*

# Step 4: Download and install Spark
RUN curl -fSL "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" -o /tmp/spark.tgz && \
tar -xvf /tmp/spark.tgz -C /opt/ && \
mv /opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} ${SPARK_HOME} && \
rm /tmp/spark.tgz

# Step 5: Set up the application directory
WORKDIR /app

# Step 6: Copy and install Python dependencies
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt

# Step 7: Copy your application source code
COPY src ./src
COPY config.json .
COPY pyspark_job.py .

# Step 8: Define the entry point for running the PySpark job
ENTRYPOINT ["spark-submit", "pyspark_job.py"]
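# Note: arguments supplied to `docker run` are appended after `pyspark_job.py`
# and reach the job as application arguments (standard exec-form ENTRYPOINT behavior).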
@@ -0,0 +1,93 @@
# AWS Glue to Google Cloud Dataplex Connector

This connector extracts metadata from AWS Glue and transforms it into a format that can be imported into Google Cloud Dataplex. It captures database, table, and lineage information from AWS Glue and prepares it for ingestion into Dataplex, allowing you to catalog your AWS data assets within Google Cloud.

This connector is designed to be run from a Python virtual environment.

***

## Prerequisites

Before using this connector, you need to have the following set up:

1. **AWS Credentials**: You will need an AWS access key ID and a secret access key with permissions to access AWS Glue.
2. **Google Cloud Project**: A Google Cloud project is required to run the script and store the output.
3. **GCP Secret Manager**: The AWS credentials must be stored in a secret in Google Cloud Secret Manager (a sketch of the `gcloud` commands for creating the secret follows this list). The secret payload must be a **JSON object** with the following format:
```json
{
"access_key_id": "YOUR_AWS_ACCESS_KEY_ID",
"secret_access_key": "YOUR_AWS_SECRET_ACCESS_KEY"
}
```
4. **Python 3** and **pip** installed.
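
A minimal sketch of creating the secret with `gcloud`, assuming a secret ID of `aws-glue-credentials` (any ID works as long as it matches `gcp_secret_id` in `config.json`) and a local payload file named `aws_credentials.json` (both names are placeholders):

```bash
# Write the credential payload to a local file (hypothetical file name).
cat > aws_credentials.json <<'EOF'
{
  "access_key_id": "YOUR_AWS_ACCESS_KEY_ID",
  "secret_access_key": "YOUR_AWS_SECRET_ACCESS_KEY"
}
EOF

# Create the secret and add the payload as a new version.
gcloud secrets create aws-glue-credentials --replication-policy="automatic"
gcloud secrets versions add aws-glue-credentials --data-file=aws_credentials.json
```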

***

## Configuration

The connector is configured using the `config.json` file. Ensure this file is present in the same directory as `main.py`. The parameters are described below, followed by a template you can copy:

| Parameter | Description |
| :--- | :--- |
| **`aws_region`** | The AWS region where your Glue Data Catalog is located (e.g., "eu-north-1"). |
| **`project_id`** | Your Google Cloud Project ID. |
| **`location_id`** | The Google Cloud region where you want to run the script (e.g., "us-central1"). |
| **`entry_group_id`** | The Dataplex entry group ID where the metadata will be imported. |
| **`gcs_bucket`** | The Google Cloud Storage bucket where the output metadata file will be stored. |
| **`aws_account_id`** | Your AWS account ID. |
| **`output_folder`** | The folder within the GCS bucket where the output file will be stored. |
| **`gcp_secret_id`** | The ID of the secret in GCP Secret Manager that contains your AWS credentials. |
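
An example `config.json` template (replace each placeholder with your own values):

```json
{
    "aws_region": "<YOUR_AWS_REGION>",
    "project_id": "<GCP_PROJECT>",
    "location_id": "<GCP_REGION>",
    "entry_group_id": "<DATAPLEX_ENTRY_GROUP>",
    "gcs_bucket": "<GCS_BUCKET>",
    "aws_account_id": "<AWS_ACCOUNT_ID>",
    "output_folder": "<GCS_FOLDER_NAME>",
    "gcp_secret_id": "<GCP_SECRET_ID>"
}
```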

***

## Running the Connector

You can run the connector from your local machine using a Python virtual environment.

### Setup and Execution

1. **Create a virtual environment:**
```bash
python3 -m venv venv
source venv/bin/activate
```
2. **Install the required dependencies:**
```bash
pip install -r requirements.txt
```
3. **Run the connector:**
Execute the `main.py` script. It reads settings from `config.json` in the current directory and authenticates to Google Cloud with your Application Default Credentials (see the note after this list).
```bash
python3 main.py
```
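
Note: the connector calls Secret Manager and Cloud Storage with your Application Default Credentials, so authenticate first if you have not already:
```bash
gcloud auth application-default login
```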

***

## Output

The connector generates a JSONL file in the specified GCS bucket and folder. This file contains the extracted metadata in a format that can be imported into Dataplex.

***

## Importing Metadata into Dataplex

Once the metadata file has been generated, you can import it into Dataplex using a metadata import job.

1. **Prepare the Request File:**
Open the `request.json` file and replace the following placeholders with your actual values:
* `<YOUR_GCS_BUCKET>`: The bucket where the output file was saved.
* `<YOUR_OUTPUT_FOLDER>`: The folder where the output file was saved.
* `<YOUR_PROJECT_ID>`: Your Google Cloud Project ID.
* `<YOUR_LOCATION>`: Your Google Cloud Location (e.g., `us-central1`).
* `<YOUR_ENTRY_GROUP_ID>`: The Dataplex Entry Group ID.

2. **Run the Import Command:**
Use the following `curl` command to initiate the import. Make sure to replace `{project-id}`, `{location}`, and `{job-id}` in the URL with your actual project ID, location, and a unique job ID.

```bash
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs?metadataJobId={job-id}"
```
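
To monitor the import, you can poll the metadata job that the command created (the `metadataJobs.get` method of the Dataplex API), using the same IDs as above:

```bash
curl -X GET \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://dataplex.googleapis.com/v1/projects/{project-id}/locations/{location}/metadataJobs/{job-id}"
```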
@@ -0,0 +1,95 @@
#!/bin/bash
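
# Builds the connector's Docker image and runs the PySpark job in a container.
# Usage (flag values are placeholders; the script name depends on how this file is saved):
#   ./<script>.sh -p <gcp-project-id> [-r <image-repo>] [-i <image-name>]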

# Terminate script on error
set -e

# --- Read script arguments ---
POSITIONAL=()
while [[ $# -gt 0 ]]
do
key="$1"

case $key in
-p|--project_id)
PROJECT_ID="$2"
shift # past argument
shift # past value
;;
-r|--repo)
REPO="$2"
shift # past argument
shift # past value
;;
-i|--image_name)
IMAGE_NAME="$2"
shift # past argument
shift # past value
;;
*) # unknown option
POSITIONAL+=("$1") # save it in an array for later
shift # past argument
;;
esac
done
set -- "${POSITIONAL[@]}" # restore positional parameters

# --- Validate arguments ---
if [ -z "$PROJECT_ID" ]; then
echo "Project ID not provided. Please provide project ID with the -p flag."
exit 1
fi

if [ -z "$REPO" ]; then
# Default to gcr.io/[PROJECT_ID] if no repo is provided
REPO="gcr.io/${PROJECT_ID}"
echo "Repository not provided, defaulting to: ${REPO}"
fi

if [ -z "$IMAGE_NAME" ]; then
IMAGE_NAME="aws-glue-to-dataplex-pyspark"
echo "Image name not provided, defaulting to: ${IMAGE_NAME}"
fi

IMAGE_TAG="latest"
IMAGE_URI="${REPO}/${IMAGE_NAME}:${IMAGE_TAG}"

# --- Build the Docker Image ---
echo "Building Docker image: ${IMAGE_URI}..."
# Use the Dockerfile for PySpark
docker build -t "${IMAGE_URI}" -f Dockerfile .

if [ $? -ne 0 ]; then
echo "Docker build failed."
exit 1
fi
echo "Docker build successful."

# --- Run the Docker Container ---
echo "Running the PySpark job in a Docker container..."
echo "Using local gcloud credentials for authentication."

# We mount the local gcloud config directory into the container.
# This allows the container to use your Application Default Credentials.
# Make sure you have run 'gcloud auth application-default login' on your machine.
docker run --rm \
-v ~/.config/gcloud:/root/.config/gcloud \
"${IMAGE_URI}"

if [ $? -ne 0 ]; then
echo "Docker run failed."
exit 1
fi

echo "PySpark job completed successfully."

# --- Optional: Push to Google Container Registry ---
read -p "Do you want to push the image to ${REPO}? (y/n) " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]
then
echo "Pushing image to ${REPO}..."
gcloud auth configure-docker
docker push "${IMAGE_URI}"
echo "Image pushed successfully."
fi

@@ -0,0 +1,10 @@
{
"aws_region": "<YOUR_AWS_REGION>",
"project_id": "<GCP_PROJECT>",
"location_id": "<GCP_REGION>",
"entry_group_id": "<DATAPLEX_ENTRY_GROUP>",
"gcs_bucket": "<GCS_BUCKET>",
"aws_account_id": "<AWS_ACCOUNT_ID>",
"output_folder": "<GCS_FOLDER_NAME>",
"gcp_secret_id": "<GCP_SECRET_ID>"
}
@@ -0,0 +1,8 @@
import sys
from src import bootstrap

# Allow shared files to be found when running from command line
sys.path.insert(1, '../src/shared')

if __name__ == '__main__':
bootstrap.run()
@@ -0,0 +1,74 @@
import json
from pyspark.sql import SparkSession
from src.aws_glue_connector import AWSGlueConnector
from src.entry_builder import build_database_entry, build_dataset_entry
from src.gcs_uploader import GCSUploader
from src.secret_manager import SecretManager
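
# Entry point for running the connector as a PySpark job; inside the Docker image
# it is launched via `spark-submit pyspark_job.py` (see the Dockerfile ENTRYPOINT).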

def main():
"""
Main function to run the AWS Glue to Dataplex metadata connector as a PySpark job.
"""
# Initialize Spark Session
spark = SparkSession.builder.appName("AWSGlueToDataplexConnector").getOrCreate()

# Load configuration from a local file
# In a real cluster environment, this might be passed differently
with open('config.json', 'r') as f:
config = json.load(f)

print("Configuration loaded.")

# Fetch AWS credentials from Secret Manager
print("Fetching AWS credentials from GCP Secret Manager...")
aws_access_key_id, aws_secret_access_key = SecretManager.get_aws_credentials(
project_id=config["project_id"],
secret_id=config["gcp_secret_id"]
)
print("Credentials fetched successfully.")

# Initialize AWS Glue Connector
glue_connector = AWSGlueConnector(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_region=config['aws_region']
)

# Fetch metadata and lineage
print("Fetching metadata from AWS Glue...")
metadata = glue_connector.get_databases()
print(f"Found {len(metadata)} databases.")

print("Fetching lineage info from AWS Glue jobs...")
lineage_info = glue_connector.get_lineage_info()
print(f"Found {len(lineage_info)} lineage relationships.")

# Prepare entries for Dataplex
dataplex_entries = []
for db_name, tables in metadata.items():
dataplex_entries.append(build_database_entry(config, db_name))
for table in tables:
dataplex_entries.append(build_dataset_entry(config, db_name, table, lineage_info))

print(f"Prepared {len(dataplex_entries)} entries for Dataplex.")

# Initialize GCSUploader
gcs_uploader = GCSUploader(
project_id=config['project_id'],
bucket_name=config['gcs_bucket']
)

# Upload to GCS
print(f"Uploading entries to GCS bucket: {config['gcs_bucket']}/{config['output_folder']}...")
gcs_uploader.upload_entries(
entries=dataplex_entries,
aws_region=config['aws_region'],
output_folder=config['output_folder']
)
print("Upload complete.")

# Stop the Spark Session
spark.stop()

if __name__ == '__main__':
main()
@@ -0,0 +1,24 @@
{
"type": "IMPORT",
"import_spec": {
"source_storage_uri": "gs://<YOUR_GCS_BUCKET>/<YOUR_OUTPUT_FOLDER>/",
"entry_sync_mode": "FULL",
"aspect_sync_mode": "INCREMENTAL",
"log_level": "DEBUG",
"scope": {
"entry_groups": ["projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/entryGroups/<YOUR_ENTRY_GROUP_ID>"],
"entry_types": [
"projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/entryTypes/aws-glue-database",
"projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/entryTypes/aws-glue-table",
"projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/entryTypes/aws-glue-view"
],
"aspect_types": [
"projects/dataplex-types/locations/global/aspectTypes/schema",
"projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/aspectTypes/aws-glue-database",
"projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/aspectTypes/aws-glue-table",
"projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/aspectTypes/aws-glue-view",
"projects/<YOUR_PROJECT_ID>/locations/<YOUR_LOCATION>/aspectTypes/aws-lineage-aspect"
]
}
}
}
@@ -0,0 +1,4 @@
google-cloud-dataplex>=2.4.0
boto3
google-cloud-secret-manager
google-cloud-storage
@@ -0,0 +1,37 @@
#!/bin/bash

# Set variables
PROJECT_ID="your-project-id" # Replace with your Google Cloud project ID
SERVICE_ACCOUNT_EMAIL="your-service-account@your-project-id.iam.gserviceaccount.com" # Replace with your service account email

# Roles to be granted for running the Dataplex metadata extraction as a Dataproc Serverless job
ROLES=(
"roles/dataplex.catalogEditor"
"roles/dataplex.entryGroupOwner"
"roles/dataplex.metadataJobOwner"
"roles/dataproc.admin"
"roles/dataproc.editor"
"roles/dataproc.worker"
"roles/iam.serviceAccountUser"
"roles/logging.logWriter"
"roles/secretmanager.secretAccessor"
"roles/workflows.invoker"
)

# Loop through the roles and grant each one
for ROLE in "${ROLES[@]}"; do
echo "Granting role: $ROLE to service account: $SERVICE_ACCOUNT_EMAIL"

gcloud projects add-iam-policy-binding "$PROJECT_ID" \
--member="serviceAccount:$SERVICE_ACCOUNT_EMAIL" \
--role="$ROLE"

if [[ $? -eq 0 ]]; then
echo "Successfully granted $ROLE"
else
echo "Error granting $ROLE. Check the gcloud command above for details."
exit 1 # Exit script with error if any role grant fails.
fi
done

echo "Finished granting roles."