A Spark Java application that reads Cost and Usage Report (CUR) files from Google Cloud Storage (GCS) and ingests them into BigQuery tables with advanced tagging capabilities. The application is designed to run both locally and on Kubernetes using the Spark Operator.
This application provides a scalable solution for processing and loading CUR data into BigQuery for further analysis. It supports various file formats (CSV, JSON, Parquet) and includes data transformation capabilities.
- Read CUR files from Google Cloud Storage
- Support for multiple file formats (CSV, JSON, Parquet)
- Data transformation and cleaning
- Data quality checks
- Advanced tagging capabilities with multiple implementation options
- Configurable GCP authentication
- BigQuery table partitioning
- Error handling and logging
- Kubernetes deployment support via Spark Operator
Requirements:

- Java 11+
- Apache Spark 3.x
- Google Cloud SDK
- Maven 3.6+
- Kubernetes cluster (for K8s deployment)
The application provides multiple tagging approaches to categorize and track AWS resources in your CUR data:
- Simple Tagging (default)
  - Adds a `tags` array column to each CUR row based on matching rules
  - Tags are stored directly in the CUR data table
  - No historical tracking of tag changes
- SCD Type-2 Tagging
  - Implements the Slowly Changing Dimension Type-2 pattern to track tag history
  - Adds columns for effective dates and current status
  - Creates new versions of records when tags change
  - All history is stored in the CUR data table
- Direct Tagging with History
  - Embeds tags directly in the CUR data for efficient querying
  - Maintains a separate `cur_resource_tags` table for historical tracking
  - Uses resource signatures to minimize storage requirements
  - Optimized for both query performance and historical analysis
- None
  - Skips tagging entirely
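The SCD Type-2 behavior above (close the current record, open a new version when a tag changes) can be sketched in plain Java, without Spark. Class, method, and field names here are illustrative and not taken from the application's source:

```java
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch of the SCD Type-2 pattern: each tag change closes the
 * current version (sets its end date) and appends a new current version.
 */
public class ScdType2Sketch {

    /** One version of a resource's tag assignment. */
    static final class TagVersion {
        final String resourceId;
        final String tag;
        final LocalDate effectiveFrom;
        LocalDate effectiveTo;   // null while the version is current
        boolean isCurrent;

        TagVersion(String resourceId, String tag, LocalDate from) {
            this.resourceId = resourceId;
            this.tag = tag;
            this.effectiveFrom = from;
            this.isCurrent = true;
        }
    }

    private final List<TagVersion> history = new ArrayList<>();

    /** Apply a tag: close the current version (if any) and open a new one. */
    void applyTag(String resourceId, String tag, LocalDate asOf) {
        for (TagVersion v : history) {
            if (v.isCurrent && v.resourceId.equals(resourceId)) {
                if (v.tag.equals(tag)) {
                    return; // tag unchanged: no new version needed
                }
                v.isCurrent = false;
                v.effectiveTo = asOf; // close the superseded version
            }
        }
        history.add(new TagVersion(resourceId, tag, asOf));
    }

    List<TagVersion> history() {
        return history;
    }
}
```

In the real application the same close-and-append step runs as DataFrame operations over the CUR table rather than an in-memory list.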
Tagging rules are defined in a properties file with the following format:
```properties
rule.1.name=EC2Resources
rule.1.field=line_item_product_code
rule.1.operator===
rule.1.value=AmazonEC2
rule.1.tag=Compute

rule.2.name=S3Resources
rule.2.field=line_item_product_code
rule.2.operator===
rule.2.value=AmazonS3
rule.2.tag=Storage
```

Supported operators include:

- `==` (equals)
- `!=` (not equals)
- `>` (greater than)
- `<` (less than)
- `>=` (greater than or equal)
- `<=` (less than or equal)
- `contains`
- `startsWith`
- `endsWith`
- `isNull`
- `isNotNull`
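One way such a rule can be translated into a Spark SQL condition is sketched below. This is an illustrative helper, not the application's actual parser, and the `LIKE`-based handling of `contains`/`startsWith`/`endsWith` is an assumption:

```java
import java.util.Properties;

/**
 * Sketch: build a Spark SQL condition string from one numbered rule entry
 * in the tagging-rules properties file.
 */
public class RuleConditionSketch {

    static String toCondition(Properties props, int n) {
        String field = props.getProperty("rule." + n + ".field");
        String op = props.getProperty("rule." + n + ".operator");
        String value = props.getProperty("rule." + n + ".value");
        switch (op) {
            case "==":         return field + " = '" + value + "'";
            case "!=":         return field + " != '" + value + "'";
            case ">": case "<": case ">=": case "<=":
                               return field + " " + op + " " + value;
            case "contains":   return field + " LIKE '%" + value + "%'";
            case "startsWith": return field + " LIKE '" + value + "%'";
            case "endsWith":   return field + " LIKE '%" + value + "'";
            case "isNull":     return field + " IS NULL";
            case "isNotNull":  return field + " IS NOT NULL";
            default:
                throw new IllegalArgumentException("Unknown operator: " + op);
        }
    }
}
```

For the `rule.1` example above, this yields `line_item_product_code = 'AmazonEC2'`, which can be passed to Spark's `expr(...)` or `filter(...)`.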
At a high level, the application proceeds through the following stages:

1. Initialization
   - Parse command-line arguments
   - Load configuration
   - Create a Spark session with GCP configuration
2. Data Ingestion
   - Read the CUR file from GCS
   - Detect the file format (CSV, JSON, Parquet)
   - Apply a schema if needed
3. Data Transformation
   - Clean and normalize data
   - Apply data quality checks
   - Filter invalid records
4. Tagging
   - Apply tagging based on the selected mode:
     - Simple: Add the tags column directly
     - SCD: Add tags with effective dates and current status
     - Direct: Add tags and maintain a separate history table
     - None: Skip tagging
5. BigQuery Writing
   - Add an ingestion timestamp
   - Write to the BigQuery table
   - Write resource tag history to a separate table (if using Direct tagging)
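The format-detection step in the Data Ingestion stage can be sketched as a simple extension check. This is illustrative only (names are hypothetical, and the real application may detect formats differently, e.g. by content):

```java
/**
 * Sketch: pick a reader format from the file extension of a GCS path,
 * tolerating a trailing .gz compression suffix.
 */
public class FormatDetectionSketch {

    enum FileFormat { CSV, JSON, PARQUET }

    static FileFormat detect(String path) {
        String lower = path.toLowerCase();
        // Strip a trailing compression suffix such as .gz before matching
        if (lower.endsWith(".gz")) {
            lower = lower.substring(0, lower.length() - 3);
        }
        if (lower.endsWith(".csv")) return FileFormat.CSV;
        if (lower.endsWith(".json")) return FileFormat.JSON;
        if (lower.endsWith(".parquet")) return FileFormat.PARQUET;
        throw new IllegalArgumentException("Unsupported file format: " + path);
    }
}
```

The detected value then selects the corresponding `spark.read().format(...)` call.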
The tagging implementation is built from four components:

- Rule model
  - Represents a single tagging rule
  - Parses rule definitions from properties
  - Generates Spark SQL expressions for rule conditions
- Simple tagging processor
  - Loads rules from the configuration file
  - Applies rules to CUR data
  - Generates the tags array column
- SCD Type-2 tagging processor
  - Extends tagging with SCD Type-2 functionality
  - Adds effective dates and current status
  - Handles tag history tracking
- Direct tagging processor
  - Implements optimized tagging with a separate history table
  - Uses resource signatures to minimize storage
  - Maintains history in a separate table
  - Provides methods for updating tags over time
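The "resource signature" idea used by the direct tagging processor can be sketched as hashing a resource's identifying fields into a fixed-size key, so the history table stores one short signature instead of repeating the full field values. The field choice and the SHA-256 algorithm here are assumptions for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Sketch: derive a deterministic, fixed-length signature for a resource
 * from its identifying fields.
 */
public class ResourceSignatureSketch {

    static String signature(String productCode, String resourceId) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] hash = md.digest(
                (productCode + "|" + resourceId).getBytes(StandardCharsets.UTF_8));
            // Render the 32-byte digest as 64 lowercase hex characters
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the signature is deterministic, the history table only needs a new row when a resource's tags actually change.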
Run the application with:
```bash
spark-submit --class com.example.CURIngestionApp \
  cur-java-spark.jar \
  <gcs-bucket> \
  <cur-file-path> \
  <project-id> \
  <dataset.table> \
  [config-file] \
  [rules-file] \
  [tagging-mode]
```

Where:

- `gcs-bucket`: GCS bucket containing CUR files
- `cur-file-path`: Path to the CUR file within the bucket
- `project-id`: GCP project ID
- `dataset.table`: BigQuery destination table
- `config-file`: (Optional) GCP configuration file (default: `gcp-config.properties`)
- `rules-file`: (Optional) Tagging rules file (default: `tagging-rules.properties`)
- `tagging-mode`: (Optional) One of `none`, `simple`, `scd`, `direct` (default: `simple`)
The application includes comprehensive unit tests for all components:
- `CURDataTransformerTest`: Tests data transformation and quality checks
- `CURIngestionAppTest`: Tests file reading and configuration
- `CURFileFormatTest`: Tests handling of different file formats
- `CURTaggingTest`: Tests simple tagging functionality
- `SCDTaggingTest`: Tests SCD Type-2 tagging
- `DirectTaggingTest`: Tests direct tagging with history
Run tests with:
```bash
mvn test
```