diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/config.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/config.json
new file mode 100644
index 00000000..dcd6e2bb
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/config.json
@@ -0,0 +1,119 @@
+{
+  "patterns": [
+    "writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_"
+  ],
+  "tests": [
+    {
+      "name": "run_event_test",
+      "path": "events/run_event_test.json",
+      "tags": {
+        "facets": [
+          "run_event"
+        ]
+      }
+    },
+    {
+      "name": "parent_test",
+      "path": "events/parent_test.json",
+      "tags": {
+        "facets": [
+          "parent"
+        ]
+      }
+    },
+    {
+      "name": "spark_properties_test",
+      "path": "events/spark_properties_test.json",
+      "tags": {
+        "facets": [
+          "spark_properties"
+        ]
+      }
+    },
+    {
+      "name": "processing_engine_test",
+      "path": "events/processing_engine_test.json",
+      "tags": {
+        "facets": [
+          "processing_engine"
+        ]
+      }
+    },
+    {
+      "name": "gcp_dataproc_test",
+      "path": "events/gcp_dataproc_test.json",
+      "tags": {
+        "facets": [
+          "gcp_dataproc"
+        ],
+        "min_version": "1.24.0"
+      }
+    },
+    {
+      "name": "jobType_test",
+      "path": "events/jobType_test.json",
+      "tags": {
+        "facets": [
+          "jobType"
+        ]
+      }
+    },
+    {
+      "name": "gcp_lineage_test",
+      "path": "events/gcp_lineage_test.json",
+      "tags": {
+        "facets": [
+          "gcp_lineage"
+        ]
+      }
+    },
+    {
+      "name": "dataSource_test",
+      "path": "events/dataSource_test.json",
+      "tags": {
+        "facets": [
+          "dataSource"
+        ]
+      }
+    },
+    {
+      "name": "schema_test",
+      "path": "events/schema_test.json",
+      "tags": {
+        "facets": [
+          "schema"
+        ],
+        "lineage_level": {
+          "bigquery": [
+            "dataset"
+          ]
+        }
+      }
+    },
+    {
+      "name": "columnLineage_test",
+      "path": "events/columnLineage_test.json",
+      "tags": {
+        "facets": [
+          "columnLineage"
+        ],
+        "lineage_level": {
+          "bigquery": [
+            "dataset",
+            "column",
+            "transformation"
+          ]
+        }
+      }
+    },
+    {
+      "name": "storage_test",
+      "path": "events/storage_test.json",
+      "tags": {
+        "facets": [
+          "storage"
+        ]
+      }
+    }
+  ]
+}
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/columnLineage_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/columnLineage_test.json
new file mode 100644
index 00000000..daee39e6
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/columnLineage_test.json
@@ -0,0 +1,73 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "outputs": [
+    {
+      "namespace": "gs://open-lineage-e2e",
+      "name": "data/bigquery_metastore/e2e_dataset.db/e2e_another_table",
+      "facets": {
+        "columnLineage": {
+          "fields": {
+            "word": {
+              "inputFields": [
+                {
+                  "namespace": "bigquery",
+                  "name": "gcp-open-lineage-testing.e2e_dataset.iceberg_biglake",
+                  "field": "word",
+                  "transformations": [
+                    {
+                      "type": "DIRECT",
+                      "subtype": "IDENTITY",
+                      "description": "",
+
+                      "masking": false
+                    },
+                    {
+                      "type": "INDIRECT",
+                      "subtype": "GROUP_BY",
+                      "description": "",
+                      "masking": false
+                    }
+                  ]
+                }
+              ]
+            },
+            "word_count": {
+              "inputFields": [
+                {
+                  "namespace": "bigquery",
+                  "name": "gcp-open-lineage-testing.e2e_dataset.iceberg_biglake",
+                  "field": "word",
+                  "transformations": [
+                    {
+                      "type": "INDIRECT",
+                      "subtype": "GROUP_BY",
+                      "description": "",
+                      "masking": false
+                    }
+                  ]
+                },
+                {
+                  "namespace": "bigquery",
+                  "name": "gcp-open-lineage-testing.e2e_dataset.iceberg_biglake",
+                  "field": "word_count",
+                  "transformations": [
+                    {
+                      "type": "DIRECT",
+                      "subtype": "AGGREGATION",
+                      "description": "",
+                      "masking": false
+                    }
+                  ]
+                }
+              ]
+            }
+          }
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/dataSource_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/dataSource_test.json
new file mode 100644
index 00000000..d89688aa
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/dataSource_test.json
@@ -0,0 +1,31 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "inputs": [
+    {
+      "namespace": "bigquery",
+      "name": "gcp-open-lineage-testing.e2e_dataset.iceberg_biglake",
+      "facets": {
+        "dataSource": {
+          "name": "bigquery",
+          "uri": "bigquery"
+        }
+      }
+    }
+  ],
+  "outputs": [
+    {
+      "namespace": "gs://open-lineage-e2e",
+      "name": "data/bigquery_metastore/e2e_dataset.db/e2e_another_table",
+      "facets": {
+        "dataSource": {
+          "name": "gs://open-lineage-e2e",
+          "uri": "gs://open-lineage-e2e"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/gcp_dataproc_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/gcp_dataproc_test.json
new file mode 100644
index 00000000..c82c265d
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/gcp_dataproc_test.json
@@ -0,0 +1,21 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "run": {
+    "facets": {
+      "gcp_dataproc": {
+        "clusterUuid": "{{ is_uuid(result) }}",
+        "jobId": "{{ any(result) }}",
+        "jobUuid": "{{ is_uuid(result) }}",
+        "queryNodeName": "append_data",
+        "appName": "BigQuery to Iceberg with BigQueryMetastoreCatalog",
+        "clusterName": "{{ match(result, 'dataproc-producer-test-.+') }}",
+        "appId": "{{ match(result, 'application_.+') }}",
+        "projectId": "gcp-open-lineage-testing"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/gcp_lineage_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/gcp_lineage_test.json
new file mode 100644
index 00000000..5879abd7
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/gcp_lineage_test.json
@@ -0,0 +1,15 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table",
+    "facets": {
+      "gcp_lineage": {
+        "origin": {
+          "sourceType": "DATAPROC",
+          "name": "{{ match(result, 'projects/gcp-open-lineage-testing/regions/us-west1/clusters/dataproc-producer-test-.*') }}"
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/jobType_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/jobType_test.json
new file mode 100644
index 00000000..3940144e
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/jobType_test.json
@@ -0,0 +1,14 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table",
+    "facets": {
+      "jobType": {
+        "processingType": "BATCH",
+        "integration": "SPARK",
+        "jobType": "SQL_JOB"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/outputStatistics_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/outputStatistics_test.json
new file mode 100644
index 00000000..0ae6a26f
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/outputStatistics_test.json
@@ -0,0 +1,20 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "outputs": [
+    {
+      "namespace": "gs://open-lineage-e2e",
+      "name": "data/bigquery_metastore/e2e_dataset.db/e2e_another_table",
+      "outputFacets": {
+        "outputStatistics": {
+          "rowCount": "{{ any(result) }}",
+          "size": "{{ any(result) }}",
+          "fileCount": "{{ any(result) }}"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/parent_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/parent_test.json
new file mode 100644
index 00000000..68f4bd71
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/parent_test.json
@@ -0,0 +1,20 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "run": {
+    "facets": {
+      "parent": {
+        "run": {
+          "runId": "{{ is_uuid(result) }}"
+        },
+        "job": {
+          "namespace": "default",
+          "name": "big_lake_to_iceberg"
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/processing_engine_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/processing_engine_test.json
new file mode 100644
index 00000000..63580707
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/processing_engine_test.json
@@ -0,0 +1,16 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "run": {
+    "facets": {
+      "processing_engine": {
+        "version": "3.5.3",
+        "name": "spark",
+        "openlineageAdapterVersion": "{{ any(result) }}"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/run_event_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/run_event_test.json
new file mode 100644
index 00000000..0c3f651b
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/run_event_test.json
@@ -0,0 +1,29 @@
+{
+  "eventTime": "{{ is_datetime(result) }}",
+  "eventType": "COMPLETE",
+  "run": {
+    "runId": "{{ is_uuid(result) }}",
+    "facets": {}
+  },
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table",
+    "facets": {}
+  },
+  "inputs": [
+    {
+      "namespace": "bigquery",
+      "name": "gcp-open-lineage-testing.e2e_dataset.iceberg_biglake",
+      "facets": {},
+      "inputFacets": {}
+    }
+  ],
+  "outputs": [
+    {
+      "namespace": "gs://open-lineage-e2e",
+      "name": "data/bigquery_metastore/e2e_dataset.db/e2e_another_table",
+      "facets": {},
+      "outputFacets": {}
+    }
+  ]
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/schema_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/schema_test.json
new file mode 100644
index 00000000..d8d3db09
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/schema_test.json
@@ -0,0 +1,59 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "inputs": [
+    {
+      "namespace": "bigquery",
+      "name": "gcp-open-lineage-testing.e2e_dataset.iceberg_biglake",
+      "facets": {
+        "schema": {
+          "fields": [
+            {
+              "name": "word",
+              "type": "string",
+              "description": "A single unique word (where whitespace is the delimiter) extracted from a corpus."
+            },
+            {
+              "name": "word_count",
+              "type": "long",
+              "description": "The number of times this word appears in this corpus."
+            },
+            {
+              "name": "corpus",
+              "type": "string",
+              "description": "The work from which this word was extracted."
+            },
+            {
+              "name": "corpus_date",
+              "type": "long",
+              "description": "The year in which this corpus was published."
+            }
+          ]
+        }
+      }
+    }
+  ],
+  "outputs": [
+    {
+      "namespace": "gs://open-lineage-e2e",
+      "name": "data/bigquery_metastore/e2e_dataset.db/e2e_another_table",
+      "facets": {
+        "schema": {
+          "fields": [
+            {
+              "name": "word",
+              "type": "string"
+            },
+            {
+              "name": "word_count",
+              "type": "long"
+            }
+          ]
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/spark_properties_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/spark_properties_test.json
new file mode 100644
index 00000000..35e93fe5
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/spark_properties_test.json
@@ -0,0 +1,17 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "run": {
+    "facets": {
+      "spark_properties": {
+        "properties": {
+          "spark.master": "yarn",
+          "spark.app.name": "BigQuery to Iceberg with BigQueryMetastoreCatalog"
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/sql_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/sql_test.json
new file mode 100644
index 00000000..2973dc2d
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/sql_test.json
@@ -0,0 +1,12 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table",
+    "facets": {
+      "sql": {
+        "query": "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word"
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/storage_test.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/storage_test.json
new file mode 100644
index 00000000..44438499
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/events/storage_test.json
@@ -0,0 +1,19 @@
+{
+  "eventType": "COMPLETE",
+  "job": {
+    "namespace": "default",
+    "name": "big_lake_to_iceberg.atomic_replace_table_as_select.e2e_dataset_e2e_another_table"
+  },
+  "outputs": [
+    {
+      "namespace": "gs://open-lineage-e2e",
+      "name": "data/bigquery_metastore/e2e_dataset.db/e2e_another_table",
+      "facets": {
+        "storage": {
+          "storageLayer": "iceberg",
+          "fileFormat": "parquet"
+        }
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/maintainers.json b/producer/spark_dataproc/scenarios/biglake_to_iceberg/maintainers.json
new file mode 100644
index 00000000..d68b35be
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/maintainers.json
@@ -0,0 +1,14 @@
+[
+  {
+    "type": "maintainer",
+    "github-name": "ddebowczyk92",
+    "email": "dominik.debowczyk@getindata.com",
+    "link": ""
+  },
+  {
+    "type": "maintainer",
+    "github-name": "tnazarew",
+    "email": "tomasz.nazarewicz@getindata.com",
+    "link": ""
+  }
+]
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/scenario.md b/producer/spark_dataproc/scenarios/biglake_to_iceberg/scenario.md
new file mode 100644
index 00000000..ad7c4d97
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/scenario.md
@@ -0,0 +1,25 @@
+# Description
+
+The scenario involves a Spark job that reads data from a BigQuery BigLake Iceberg table, runs an aggregation query over it, and writes the results to an Iceberg table.
+
+# Entities
+
+The input entity is a BigQuery BigLake Iceberg table:
+
+`input_table`
+
+The output entity is an Iceberg table:
+
+`output_table`
+
+# Facets
+
+Facets present in the events:
+
+- ColumnLineageDatasetFacet
+- DatasourceDatasetFacet
+- JobTypeJobFacet
+- LifecycleStateChangeDatasetFacet
+- ParentRunFacet
+- SQLJobFacet
+- SchemaDatasetFacet
\ No newline at end of file
diff --git a/producer/spark_dataproc/scenarios/biglake_to_iceberg/test/test.py b/producer/spark_dataproc/scenarios/biglake_to_iceberg/test/test.py
new file mode 100644
index 00000000..61434a59
--- /dev/null
+++ b/producer/spark_dataproc/scenarios/biglake_to_iceberg/test/test.py
@@ -0,0 +1,30 @@
+from pyspark.sql import SparkSession
+
+spark = (
+    SparkSession.builder
+    .appName("BigLake to Iceberg")
+    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+    .config("spark.sql.catalog.gcp_iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog")
+    .config("spark.sql.catalog.gcp_iceberg_catalog.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog")
+    .config("spark.sql.catalog.gcp_iceberg_catalog.gcp_project", "gcp-open-lineage-testing")
+    .config("spark.sql.catalog.gcp_iceberg_catalog.gcp_location", "us-west1")
+    .config("spark.sql.catalog.gcp_iceberg_catalog.blms_catalog", "e2e_blms_catalog")
+    .config("spark.sql.catalog.gcp_iceberg_catalog.warehouse", "gs://open-lineage-e2e/data/bigquery_metastore/")
+    .getOrCreate()
+)
+
+spark.catalog.setCurrentCatalog("gcp_iceberg_catalog")
+
+spark.sql("CREATE NAMESPACE IF NOT EXISTS e2e_dataset")
+
+# Read the source BigLake Iceberg table from BigQuery.
+words = spark.read.format('bigquery') \
+    .option('table', 'e2e_dataset.iceberg_biglake') \
+    .load()
+
+words.createOrReplaceTempView('words')
+
+word_count = spark.sql(
+    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
+
+word_count.write.format("iceberg").mode("overwrite").saveAsTable("e2e_dataset.e2e_another_table")
diff --git a/producer/spark_dataproc/scenarios/bigquery_to_iceberg/test/test.py b/producer/spark_dataproc/scenarios/bigquery_to_iceberg/test/test.py
index b34b9468..67e8a656 100644
--- a/producer/spark_dataproc/scenarios/bigquery_to_iceberg/test/test.py
+++ b/producer/spark_dataproc/scenarios/bigquery_to_iceberg/test/test.py
@@ -26,5 +26,4 @@
 word_count = spark.sql(
     'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
 
-# Write as Delta format to GCS
 word_count.write.format("iceberg").mode("overwrite").saveAsTable("e2e_dataset.e2e_table")