diff --git a/.github/actions/run_event_validation/action.yml b/.github/actions/run_event_validation/action.yml
index 965159f1..63633e1f 100644
--- a/.github/actions/run_event_validation/action.yml
+++ b/.github/actions/run_event_validation/action.yml
@@ -4,10 +4,19 @@ author: Tomasz Nazarewicz
 inputs:
   tag:
     description: "Version of the spec to check against"
-    required: true
-  release:
+    default: ""
+    required: false
+  release_tags:
+    description: "Versions of the spec to check against"
+    required: false
+  ol_release:
     description: "release to run the validation with"
-    required: true
+    default: ""
+    required: false
+  component_release:
+    description: "release of the component producing events"
+    default: ""
+    required: false
   target-path:
     description: "Path to save the report to"
     required: true
@@ -31,26 +40,47 @@ outputs:
 runs:
   using: "composite"
   steps:
-    - name: Set up Python 3.11
-      uses: actions/setup-python@v3
-      with:
-        python-version: "3.11"
-
-    - name: create dir for ol spec and report
-      id: dir_for_spec
+    - name: create necessary dirs
+      id: create_dirs
       shell: bash
       run: |
-        mkdir -p spec
+        mkdir -p specs
+        mkdir -p tmp
         mkdir -p report

-    - name: get latest OL spec
+    - name: get OpenlineageCode
       uses: actions/checkout@v4
       with:
         repository: OpenLineage/OpenLineage
-        ref: ${{ inputs.tag }}
-        path: spec
-        sparse-checkout: |
-          spec/
+        path: tmp
+
+    - name: Get spec for each tag
+      shell: bash
+      run: |
+        cd tmp
+        IFS=',' read -ra TAGS <<< "${{ inputs.release_tags }}"
+        for TAG in "${TAGS[@]}"; do
+          echo "Checking out tag: $TAG"
+          git fetch --tags --quiet
+          if git checkout --quiet "$TAG"; then
+            DEST_DIR="../specs/$TAG"
+            if [ -d "spec" ]; then
+              mkdir -p "../specs/$TAG"
+              find spec -path './website' -prune -o -type f \( -name '*Facet.json' -o -name 'OpenLineage.json' \) -exec cp {} "../specs/$TAG/" \;
+              echo "success"
+            else
+              echo "Spec directory not found in $TAG"
+            fi
+          else
+            echo "Tag $TAG not found!"
+          fi
+        done
+        cd ..
+
+    - name: Set up Python 3.11
+      uses: actions/setup-python@v3
+      with:
+        python-version: "3.11"

     - name: Validate OpenLineage events
       shell: bash
@@ -58,8 +88,9 @@ runs:
         pip install -r ./scripts/requirements.txt
         python scripts/validate_ol_events.py \
           --event_base_dir=${{ inputs.event-directory }} \
-          --spec_dirs=spec/spec/,spec/spec/facets/,spec/spec/registry/gcp/dataproc/facets,spec/spec/registry/gcp/lineage/facets \
+          --spec_base_dir=specs \
           --target=${{ inputs.target-path }} \
           --component="${{ inputs.component }}" \
           --producer_dir=${{ inputs.producer-dir }} \
-          --release=${{ inputs.release }}
\ No newline at end of file
+          --openlineage_version=${{ inputs.release_tags }} \
+          --component_version=${{ inputs.component_release }}
\ No newline at end of file
diff --git a/.github/workflows/check_scenarios.yml b/.github/workflows/check_scenarios.yml
index d9e6924a..3a42bba2 100644
--- a/.github/workflows/check_scenarios.yml
+++ b/.github/workflows/check_scenarios.yml
@@ -20,6 +20,28 @@ jobs:
       - name: Checkout code
         uses: actions/checkout@v4

+      - name: get scenarios' OL versions
+        id: get_versions
+        run: |
+          SCENARIO_DIR=./consumer/scenarios
+          if [ ! -d "$SCENARIO_DIR" ]; then
+            echo "Scenario directory '$SCENARIO_DIR' not found!"
+            exit 1
+          fi
+
+          VERSIONS=()
+          for scenario in "$SCENARIO_DIR"/*; do
+            if [ -d "$scenario" ] && [ -f "$scenario/config.json" ]; then
+              VERSION=$(jq -r '.openlineage_version' "$scenario/config.json" 2>/dev/null)
+              if [ -n "$VERSION" ] && [ "$VERSION" != "null" ]; then
+                VERSIONS+=("$VERSION")
+              fi
+            fi
+          done
+
+          UNIQUE_VERSIONS=($(printf "%s\n" "${VERSIONS[@]}" | sort -u))
+          echo "versions=$(IFS=,; echo "${UNIQUE_VERSIONS[*]}")" >> $GITHUB_OUTPUT
+
       - name: Create report.json directory
         run: mkdir -p reports/

@@ -27,8 +49,7 @@ jobs:
         uses: ./.github/actions/run_event_validation
         with:
           component: 'scenarios'
-          tag: ${{ inputs.get-latest-snapshots == 'true' && 'main' || inputs.release }}
-          release: ${{ inputs.release }}
+          release_tags: ${{ steps.get_versions.outputs.versions }}
           target-path: 'reports/scenarios-report.json'
           event-directory: './consumer/scenarios'
           producer-dir: './consumer'
diff --git a/.github/workflows/collect_and_compare_reports.yml b/.github/workflows/collect_and_compare_reports.yml
index 8c819d44..bbb07e22 100644
--- a/.github/workflows/collect_and_compare_reports.yml
+++ b/.github/workflows/collect_and_compare_reports.yml
@@ -13,8 +13,8 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
-        with:
-          ref: refs/heads/main
+#        with:
+#          ref: refs/heads/main

       - uses: actions/upload-artifact@v4
         with:
diff --git a/.github/workflows/consumer_dataplex.yml b/.github/workflows/consumer_dataplex.yml
index c28b7112..f5cdb4c5 100644
--- a/.github/workflows/consumer_dataplex.yml
+++ b/.github/workflows/consumer_dataplex.yml
@@ -42,7 +42,8 @@ jobs:
             --consumer_dir consumer/consumers/dataplex \
             --scenario_dir consumer/scenarios/ \
             --parent projects/gcp-open-lineage-testing/locations/us \
-            --release ${{ inputs.release }}
+            --release ${{ inputs.release }} \
+            --target dataplex-report.json

       - uses: actions/upload-artifact@v4
         with:
diff --git a/.github/workflows/main_new_release.yml b/.github/workflows/main_new_release.yml
index 8485a027..74866b7d 100644
--- a/.github/workflows/main_new_release.yml
+++ b/.github/workflows/main_new_release.yml
@@ -44,28 +44,7 @@ jobs:
       # also normally new release of OL should trigger all producer tests but for now they are run anyway so no need to trigger
       - name: Select components to run
         id: select-components
-        run: |
-          # assuming the version will not exceed 1000 this is the quickest way to get comparable values
-          version_sum() {
-            IFS='.' read -r var1 var2 var3 <<< "$1"
-            echo $(( var1 * 1000000 + var2 * 1000 ))
-          }
-
-          current_ol=$(cat generated-files/releases.json | jq -c '.[] | select(.name | contains("openlineage")) | .latest_version ' -r)
-          latest_ol=$(curl https://api.github.com/repos/OpenLineage/OpenLineage/releases/latest -s | jq .tag_name -r)
-
-          sum1=$(version_sum "$latest_ol")
-          sum2=$(version_sum "$current_ol")
-
-          if (( $(version_sum $latest_ol) > $(version_sum $current_ol) )); then
-            echo "ol_release=${latest_ol}" >> $GITHUB_OUTPUT
-            echo "releases_updated=true" >> $GITHUB_OUTPUT
-            jq --arg latest_ol "$latest_ol" 'map(if .name == "openlineage" then .latest_version = $latest_ol else .
end)' \ - generated-files/releases.json > generated-files/updated-releases.json - else - echo "ol_release=${current_ol}" >> $GITHUB_OUTPUT - fi - + run: ./scripts/select_components.sh - uses: actions/upload-artifact@v4 if: steps.select-components.outputs.releases_updated == 'true' with: diff --git a/.github/workflows/main_pr.yml b/.github/workflows/main_pr.yml index 5528e0e2..7ce35304 100644 --- a/.github/workflows/main_pr.yml +++ b/.github/workflows/main_pr.yml @@ -20,6 +20,7 @@ jobs: run_spark_dataproc: ${{ steps.get-changed.outputs.spark_dataproc_changed }} ol_release: ${{ steps.get-release.outputs.openlineage_release }} any_run: ${{ steps.get-changed.outputs.any_changed }} + test_matrix: ${{ steps.set-matrix-values.outputs.spark_dataproc_matrix }} steps: - name: Checkout code uses: actions/checkout@v4 @@ -64,7 +65,24 @@ jobs: run: | echo " any changed value is ${{ steps.get-changed.outputs.any_changed }}" openlineage_release=$(cat generated-files/releases.json | jq -c '.[] | select(.name | contains("openlineage")) | .latest_version ' -r) - echo "openlineage_release=${openlineage_release}" >> $GITHUB_OUTPUT + echo "openlineage_release=${openlineage_release}" >> $GITHUB_OUTPUT + - name: set-matrix-values + id: set-matrix-values + run: | + check_producer() { + local producer="$1" + local file="./producer/${producer}/versions.json" + + if [[ -f "$file" ]]; then + cat "$file" | jq -c + else + echo "Error: File '$file' does not exist." >&2 + return 1 + fi + } + echo "spark_dataproc_matrix=$(check_producer spark_dataproc)" >> $GITHUB_OUTPUT + + # echo "myoutput=$(jq -cn --argjson environments "$TARGETS" '{target: $environments}')" >> $GITHUB_OUTPUT ######## COMPONENT VALIDATION ######## @@ -87,17 +105,20 @@ jobs: with: release: ${{ needs.initialize_workflow.outputs.ol_release }} - spark_dataproc: - needs: initialize_workflow - if: ${{ needs.initialize_workflow.outputs.run_spark_dataproc == 'true' }} - uses: ./.github/workflows/producer_spark_dataproc.yml - secrets: - gcpKey: ${{ secrets.GCP_SA_KEY }} - postgresqlUser: ${{ secrets.POSTGRESQL_USER }} - postgresqlPassword: ${{ secrets.POSTGRESQL_PASSWORD }} - with: - release: ${{ needs.initialize_workflow.outputs.ol_release }} - get-latest-snapshots: 'false' +# spark_dataproc: +# needs: initialize_workflow +# if: ${{ needs.initialize_workflow.outputs.run_spark_dataproc == 'true' }} +# uses: ./.github/workflows/producer_spark_dataproc.yml +# strategy: +# matrix: ${{ fromJson(needs.initialize_workflow.outputs.test_matrix) }} +# secrets: +# gcpKey: ${{ secrets.GCP_SA_KEY }} +# postgresqlUser: ${{ secrets.POSTGRESQL_USER }} +# postgresqlPassword: ${{ secrets.POSTGRESQL_PASSWORD }} +# with: +# ol_release: ${{ matrix.openlineage_versions }} +# spark_release: ${{ matrix.component_version }} +# get-latest-snapshots: 'false' ######## COLLECTION OF REPORTS AND EXECUTE APPROPRIATE ACTIONS ######## @@ -106,7 +127,7 @@ jobs: - initialize_workflow - scenarios - dataplex - - spark_dataproc +# - spark_dataproc if: ${{ !failure() && needs.initialize_workflow.outputs.any_run == 'true'}} uses: ./.github/workflows/collect_and_compare_reports.yml with: diff --git a/.github/workflows/producer_spark_dataproc.yml b/.github/workflows/producer_spark_dataproc.yml index f826648b..c3cd69d4 100644 --- a/.github/workflows/producer_spark_dataproc.yml +++ b/.github/workflows/producer_spark_dataproc.yml @@ -10,7 +10,10 @@ on: postgresqlPassword: required: true inputs: - release: + spark_release: + description: "release of spark dataproc to use" + type: string + ol_release: 
description: "release tag of OpenLineage to use" type: string get-latest-snapshots: @@ -24,37 +27,80 @@ jobs: - name: Checkout code uses: actions/checkout@v4 + - name: initialize tests + id: init + run: | + scenarios=$(./scripts/get_valid_test_scenarios.sh "producer/spark_dataproc/scenarios/" ${{ inputs.spark_release }} ${{ inputs.ol_release }} ) + [[ "$scenarios" == "" ]] || echo "scenarios=$scenarios" >> $GITHUB_OUTPUT + echo "openlineage_cluster_suffix=$(echo '${{ inputs.ol_release }}' | sed 's/\.//g')" >> $GITHUB_OUTPUT + echo "component_cluster_suffix=$(echo '${{ inputs.spark_release }}' | sed 's/\.//g')" >> $GITHUB_OUTPUT + case "${{ inputs.spark_release }}" in + "3.5.1") + echo "spark_short=3.5" >> $GITHUB_OUTPUT + echo "dataproc_version=2.2-ubuntu22" >> $GITHUB_OUTPUT + ;; + "3.3.2") + echo "spark_short=3.3" >> $GITHUB_OUTPUT + echo "dataproc_version=2.1-ubuntu20" >> $GITHUB_OUTPUT + ;; + "3.1.3") + echo "spark_short=3.1" >> $GITHUB_OUTPUT + echo "dataproc_version=2.0-ubuntu18" >> $GITHUB_OUTPUT + ;; + *) + echo "spark version ${{ inputs.spark_release }} not supported" + exit 1 + ;; + esac + + - name: GCP authorization id: gcp-auth + if: ${{ steps.init.outputs.scenarios }} uses: 'google-github-actions/auth@v2' with: credentials_json: '${{ secrets.gcpKey }}' - name: Get OL artifacts id: get-ol-artifacts + if: ${{ steps.init.outputs.scenarios }} uses: ./.github/actions/get_openlineage_artifacts with: - version: ${{ inputs.release }} + version: ${{ inputs.ol_release }} get-latest-snapshots: ${{ inputs.get-latest-snapshots }} skip-flink: 'true' skip-s3: 'true' skip-gcp-lineage: 'true' skip-sql: 'true' - # This should be somehow automated and return the newest version -# - name: Download Big Query Spark Maven Artifact -# id: download-spark-bq-connector +# This should be somehow automated and return the newest version + - name: Download Big Query Spark Maven Artifact + id: download-spark-bq-connector + if: ${{ steps.init.outputs.scenarios }} + uses: clausnz/github-action-download-maven-artifact@master + with: + url: 'https://repo1.maven.org' + repository: 'maven2' + groupId: 'com.google.cloud.spark' + artifactId: 'spark-${{ steps.init.outputs.spark_short }}-bigquery' + version: '0.42.0' + extension: 'jar' + +# - name: Download Spark BigTable Maven Artifact +# id: download-spark-bigtable-connector +# if: ${{ steps.init.outputs.scenarios }} # uses: clausnz/github-action-download-maven-artifact@master # with: # url: 'https://repo1.maven.org' # repository: 'maven2' -# groupId: 'com.google.cloud.spark' -# artifactId: 'spark-3.5-bigquery' -# version: '0.42.0' +# groupId: 'com.google.cloud.spark.bigtable' +# artifactId: 'spark-bigtable_2.12' +# version: '0.4.0' # extension: 'jar' - name: Upload openlineage spark integration to GCS id: upload-spark-integration + if: ${{ steps.init.outputs.scenarios }} uses: ./.github/actions/upload_artifacts with: local-file-path: ${{ steps.get-ol-artifacts.outputs.spark }} @@ -63,28 +109,40 @@ jobs: - name: Upload openlineage GCS transport to GCS id: upload-gcs-transport + if: ${{ steps.init.outputs.scenarios }} uses: ./.github/actions/upload_artifacts with: local-file-path: ${{ steps.get-ol-artifacts.outputs.gcs }} gcs-path: "gs://open-lineage-e2e/jars" credentials: ${{ steps.gcp-auth.outputs.credentials_file_path }} -# - name: Upload Spark BigQuery connector to GCS -# id: upload-spark-bq-connector + - name: Upload Spark BigQuery connector to GCS + id: upload-spark-bq-connector + if: ${{ steps.init.outputs.scenarios }} + uses: 
./.github/actions/upload_artifacts + with: + local-file-path: ${{ steps.download-spark-bq-connector.outputs.file }} + gcs-path: "gs://open-lineage-e2e/jars" + credentials: ${{ steps.gcp-auth.outputs.credentials_file_path }} + +# - name: Upload Spark BigTable connector to GCS +# id: upload-spark-bigtable-connector +# if: ${{ steps.init.outputs.scenarios }} # uses: ./.github/actions/upload_artifacts # with: -# local-file-path: ${{ steps.download-spark-bq-connector.outputs.file }} +# local-file-path: ${{ steps.download-spark-bigtable-connector.outputs.file }} # gcs-path: "gs://open-lineage-e2e/jars" # credentials: ${{ steps.gcp-auth.outputs.credentials_file_path }} - name: Upload initialization actions to GCS id: upload-initialization-actions + if: ${{ steps.init.outputs.scenarios }} uses: ./.github/actions/upload_artifacts with: local-file-path: producer/spark_dataproc/runner/get_openlineage_jar.sh gcs-path: "gs://open-lineage-e2e/scripts" credentials: ${{ steps.gcp-auth.outputs.credentials_file_path }} - + - name: Upload CloudSQL init actions to GCS id: upload-cloud-sql-initialization-actions uses: ./.github/actions/upload_artifacts @@ -94,78 +152,126 @@ jobs: credentials: ${{ steps.gcp-auth.outputs.credentials_file_path }} - name: Set up Python 3.11 + if: ${{ steps.init.outputs.scenarios }} uses: actions/setup-python@v3 with: cache: "pip" python-version: "3.11" - name: Install dependencies + if: ${{ steps.init.outputs.scenarios }} run: | python -m pip install --upgrade pip pip install flake8 pytest if [ -f producer/spark_dataproc/runner/requirements.txt ]; then pip install -r producer/spark_dataproc/runner/requirements.txt; fi - name: Start producer + id: start-producer + if: ${{ steps.init.outputs.scenarios }} run: | + metadata='' + metadata+='SPARK_BQ_CONNECTOR_URL=${{steps.upload-spark-bq-connector.outputs.uploaded-file}}' + metadata+=',OPENLINEAGE_SPARK_URL=${{ steps.upload-spark-integration.outputs.uploaded-file }}' + metadata+=',SPARK_SPANNER_CONNECTOR_URL=gs://spark-lib/spanner/spark-3.1-spanner-1.1.0.jar' + metadata+=',SPARK_BIGTABLE_CONNECTOR_URL=gs://open-lineage-e2e/jars/spark-bigtable_2.12-0.5.0-SNAPSHOT.jar' + metadata+=',enable-cloud-sql-hive-metastore=false' + metadata+=',additional-cloud-sql-instances=gcp-open-lineage-testing:us-central1:open-lineage-e2e=tcp:3307' + python producer/spark_dataproc/runner/dataproc_workflow.py create-cluster \ --project-id gcp-open-lineage-testing \ --region us-west1 \ - --cluster-name dataproc-producer-test-${{ github.run_id }} \ + --dataproc-image-version ${{ steps.init.outputs.dataproc_version }} \ + --cluster-name "dataproc-producer-test-${{steps.init.outputs.component_cluster_suffix}}-${{ steps.init.outputs.openlineage_cluster_suffix }}-${{ github.run_id }}" \ --credentials-file ${{ steps.gcp-auth.outputs.credentials_file_path }} \ - --metadata 'SPARK_BQ_CONNECTOR_URL=gs://open-lineage-e2e/jars/spark-3.5-bigquery-0.0.1-SNAPSHOT.jar,OPENLINEAGE_SPARK_URL=${{ steps.upload-spark-integration.outputs.uploaded-file }},SPARK_SPANNER_CONNECTOR_URL=gs://open-lineage-e2e/jars/spark-3.1-spanner-1.1.0.jar,SPARK_BIGTABLE_CONNECTOR_URL=gs://open-lineage-e2e/jars/spark-bigtable_2.12-0.3.0.jar,enable-cloud-sql-hive-metastore=false,additional-cloud-sql-instances=gcp-open-lineage-testing:us-central1:open-lineage-e2e=tcp:3307' \ + --metadata "$metadata" \ --initialization-actions="${{ steps.upload-initialization-actions.outputs.uploaded-file }},${{ steps.upload-cloud-sql-initialization-actions.outputs.uploaded-file }}" -# --metadata 
"SPARK_BQ_CONNECTOR_URL=${{ steps.upload-spark-bq-connector.outputs.uploaded-file }},OPENLINEAGE_SPARK_URL=${{ steps.upload-spark-integration.outputs.uploaded-file }}" \ - name: Set producer output event dir + if: ${{ steps.init.outputs.scenarios }} id: set-producer-output run: | echo "event_dir=/tmp/producer-$(date +%s%3N)" >> $GITHUB_OUTPUT - name: Run producer jobs and create OL events + if: ${{ steps.init.outputs.scenarios }} id: run-producer + continue-on-error: true run: | - for scenario_path in producer/spark_dataproc/scenarios/* + set -e + IFS=';' read -ra scenarios <<< "${{ steps.init.outputs.scenarios }}" + + properties='' + properties+='spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener' + properties+=',spark.sql.warehouse.dir=/tmp/warehouse' + properties+=',spark.openlineage.transport.type=gcs' + properties+=',spark.driver.POSTGRESQL_USER=${{ secrets.postgresqlUser }}' + properties+=',spark.driver.POSTGRESQL_PASSWORD=${{ secrets.postgresqlPassword }}' + properties+=',spark.scenario.suffix=${{steps.init.outputs.component_cluster_suffix}}-${{steps.init.outputs.openlineage_cluster_suffix}}' + + for scenario in "${scenarios[@]}" do - scenario="${scenario_path##*/}" - run_script=$(find "$scenario_path/test/" -maxdepth 1 -type f -name "*.py" | head -n 1) + echo "Getting script for scenario $scenario" + run_script=$(find "producer/spark_dataproc/scenarios/$scenario/test/" -maxdepth 1 -type f -name "*.py" | head -n 1) echo "Running spark job for scenario: $scenario" - python producer/spark_dataproc/runner/dataproc_workflow.py run-job \ + if ! python producer/spark_dataproc/runner/dataproc_workflow.py run-job \ --project-id gcp-open-lineage-testing \ --region us-west1 \ - --cluster-name "dataproc-producer-test-${{ github.run_id }}" \ + --cluster-name "dataproc-producer-test-${{steps.init.outputs.component_cluster_suffix}}-${{steps.init.outputs.openlineage_cluster_suffix}}-${{ github.run_id }}" \ --gcs-bucket open-lineage-e2e \ --python-job "$run_script" \ --jars "${{ steps.upload-gcs-transport.outputs.uploaded-file }}" \ - --spark-properties 'spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener,spark.sql.warehouse.dir=/tmp/warehouse,spark.openlineage.transport.type=gcs,spark.driver.POSTGRESQL_USER=${{ secrets.postgresqlUser }},spark.driver.POSTGRESQL_PASSWORD=${{ secrets.postgresqlPassword }}' \ + --spark-properties "$properties" \ --output-directory "${{ steps.set-producer-output.outputs.event_dir }}/$scenario" \ --credentials-file "${{ steps.gcp-auth.outputs.credentials_file_path }}" \ - --dataproc-image-version 2.2-ubuntu22 \ - || { echo "Error: Spark job failed for scenario: $scenario"; } + --dataproc-image-version ${{ steps.init.outputs.dataproc_version }} + then + echo "Error: Spark job failed for scenario: $scenario" + exit 1 + fi echo "Finished running spark job for scenario: $scenario" done + + echo "Finished running all scenarios" - name: Terminate producer cluster + if: ${{ always() && steps.init.outputs.scenarios }} run: | - python producer/spark_dataproc/runner/dataproc_workflow.py terminate-cluster \ - --project-id gcp-open-lineage-testing \ - --region us-west1 \ - --cluster-name dataproc-producer-test-${{ github.run_id }} \ - --credentials-file ${{ steps.gcp-auth.outputs.credentials_file_path }} + if gcloud dataproc clusters list --region us-west1 | grep -q "dataproc-producer-test-${{steps.init.outputs.component_cluster_suffix}}-${{steps.init.outputs.openlineage_cluster_suffix}}-${{ github.run_id }}" + then + python 
producer/spark_dataproc/runner/dataproc_workflow.py terminate-cluster \ + --project-id gcp-open-lineage-testing \ + --region us-west1 \ + --cluster-name "dataproc-producer-test-${{steps.init.outputs.component_cluster_suffix}}-${{steps.init.outputs.openlineage_cluster_suffix}}-${{ github.run_id }}" \ + --credentials-file ${{ steps.gcp-auth.outputs.credentials_file_path }} + else + echo "Cluster does not exist" + fi + + - name: Fail if spark jobs failed + if: ${{ steps.init.outputs.scenarios && steps.run-producer.outcome == 'failure' }} + run: | + echo "step 'Run producer jobs and create OL events' has ended with failure but to terminate the cluster we needed to continue on error but fail the job after cluster termination." + echo "This will become redundant if starting the cluster is extracted to custom action with post actions defined." + echo "For now composite actions do not support post actions." + exit 1 - name: Validation + if: ${{ steps.init.outputs.scenarios }} uses: ./.github/actions/run_event_validation with: component: 'spark_dataproc' - tag: ${{ inputs.get-latest-snapshots == 'true' && 'main' || inputs.release }} - release: ${{ inputs.release }} + release_tags: ${{ inputs.get-latest-snapshots == 'true' && 'main' || inputs.ol_release }} + ol_release: ${{ inputs.ol_release }} + component_release: ${{ inputs.spark_release }} event-directory: ${{ steps.set-producer-output.outputs.event_dir }} - target-path: 'spark-dataproc-report.json' + target-path: 'spark-dataproc-${{inputs.spark_release}}-${{inputs.ol_release}}-report.json' - uses: actions/upload-artifact@v4 + if: ${{ steps.init.outputs.scenarios }} with: - name: spark-dataproc-report - path: spark-dataproc-report.json + name: spark-dataproc-${{inputs.spark_release}}-${{inputs.ol_release}}-report + path: spark-dataproc-${{inputs.spark_release}}-${{inputs.ol_release}}-report.json retention-days: 1 diff --git a/consumer/consumers/dataplex/run_dataplex_tests.sh b/consumer/consumers/dataplex/run_dataplex_tests.sh old mode 100644 new mode 100755 index f08d40e2..409811f8 --- a/consumer/consumers/dataplex/run_dataplex_tests.sh +++ b/consumer/consumers/dataplex/run_dataplex_tests.sh @@ -53,4 +53,5 @@ python consumer/consumers/dataplex/validator/validator.py \ --credentials "$GCP_CREDENTIALS_JSON_PATH" \ --consumer_dir "$CONSUMER_DIR" \ --scenario_dir "$SCENARIO_DIR" \ - --parent "$GCP_PARENT" ${DUMP_API_STATE} \ No newline at end of file + --parent "$GCP_PARENT" ${DUMP_API_STATE} \ + --target "dataplex_report.json" \ No newline at end of file diff --git a/consumer/consumers/dataplex/scenarios/simple_run_event/config.json b/consumer/consumers/dataplex/scenarios/simple_run_event/config.json index f10088c5..10558044 100644 --- a/consumer/consumers/dataplex/scenarios/simple_run_event/config.json +++ b/consumer/consumers/dataplex/scenarios/simple_run_event/config.json @@ -5,9 +5,7 @@ "path": "validation/processes/processes.json", "entity": "process", "tags": { - "facets": ["run_event"], - "min_version": "1.22.0", - "producer": "spark" + "facets": ["run_event"] } }, { @@ -15,9 +13,7 @@ "path": "validation/runs/runs.json", "entity": "run", "tags": { - "facets": ["run_event"], - "min_version": "1.22.0", - "producer": "spark" + "facets": ["run_event"] } }, { @@ -25,9 +21,7 @@ "path": "validation/lineage_events/lineage_events.json", "entity": "lineage_event", "tags": { - "facets": ["run_event"], - "min_version": "1.22.0", - "producer": "spark" + "facets": ["run_event"] } }, { @@ -35,9 +29,7 @@ "path": "validation/links/links.json", "entity": 
"link", "tags": { - "facets": ["run_event"], - "min_version": "1.22.0", - "producer": "spark" + "facets": ["run_event"] } } ] diff --git a/consumer/consumers/dataplex/scenarios/spark_dataproc_simple_producer_test/config.json b/consumer/consumers/dataplex/scenarios/spark_dataproc_simple_producer_test/config.json index 64603f2d..40059ebf 100644 --- a/consumer/consumers/dataplex/scenarios/spark_dataproc_simple_producer_test/config.json +++ b/consumer/consumers/dataplex/scenarios/spark_dataproc_simple_producer_test/config.json @@ -5,9 +5,7 @@ "path": "validation/processes/processes.json", "entity": "process", "tags": { - "facets": ["run_event"], - "min_version": "1.22.0", - "producer": "spark" + "facets": ["run_event"] } }, { @@ -15,9 +13,7 @@ "path": "validation/runs/runs.json", "entity": "run", "tags": { - "facets": ["run_event", "processing_engine"], - "min_version": "1.22.0", - "producer": "spark" + "facets": ["run_event", "processing_engine"] } }, { @@ -25,9 +21,7 @@ "path": "validation/lineage_events/lineage_events.json", "entity": "lineage_event", "tags": { - "facets": ["run_event"], - "min_version": "1.22.0", - "producer": "spark" + "facets": ["run_event"] } }, { @@ -35,9 +29,7 @@ "path": "validation/links/links.json", "entity": "link", "tags": { - "facets": ["run_event"], - "min_version": "1.22.0", - "producer": "airflow" + "facets": ["run_event"] } } ] diff --git a/consumer/consumers/dataplex/validator/validator.py b/consumer/consumers/dataplex/validator/validator.py index b85fd92d..cc7fb6a5 100644 --- a/consumer/consumers/dataplex/validator/validator.py +++ b/consumer/consumers/dataplex/validator/validator.py @@ -2,6 +2,7 @@ import json import os import time +from pathlib import Path from os.path import join from proto import Message from google.api_core.exceptions import InvalidArgument @@ -9,32 +10,50 @@ from google.cloud.datacatalog_lineage_v1 import LineageClient, SearchLinksRequest from google.protobuf.json_format import ParseDict from google.protobuf import struct_pb2 -from compare_releases import release_between from compare_events import diff +from report import Report, Component, Scenario, Test -class Validator: +class EntityHandler: + def __init__(self, client=None, consumer_dir=None, scenario_dir=None, parent=None, release=None): self.client = client - self.consumer_dir = consumer_dir - self.scenario_dir = scenario_dir + self.consumer_dir = Path(consumer_dir) + self.scenario_dir = Path(scenario_dir) self.parent = parent self.release = release def load_ol_events(self, scenario): - return [{'name': entry.name, 'payload': ParseDict(json.load(open(entry.path, 'r')), struct_pb2.Struct())} - for entry in os.scandir(f"{self.scenario_dir}/{scenario}/events") if entry.is_file()] + scenario_path = self.scenario_dir / scenario / "events" + entity = [{'name': entry.name, 'payload': self.get_payload(entry)} for entry in scenario_path.iterdir() if + entry.is_file()] + return sorted(entity, key=lambda d: d['name']) + + def get_payload(self, entry): + return ParseDict(json.loads(entry.read_text()), struct_pb2.Struct()) + + def send_ol_events(self, scenario): + events = self.load_ol_events(scenario) + report = [] + for e in events: + try: + response = self.client.process_open_lineage_run_event(parent=self.parent, open_lineage=e['payload']) + report.append((e['name'], [])) + time.sleep(0.1) + except InvalidArgument as exc: + report.append((e['name'], exc.args[0])) + time.sleep(2) + return report def load_validation_events(self, scenario, config): d = {} scenario_dir = 
join(self.consumer_dir, "scenarios", scenario) for e in config['tests']: - if release_between(self.release, e['tags'].get('min_version'), e['tags'].get('max_version')): - name = e['name'] - path = e['path'] - entity = e['entity'] - tags = e['tags'] - d[name] = {'body': json.load(open(join(scenario_dir, path), 'r')), 'entity': entity, 'tags': tags} + name = e['name'] + path = e['path'] + entity = e['entity'] + tags = e['tags'] + d[name] = {'body': json.load(open(join(scenario_dir, path), 'r')), 'entity': entity, 'tags': tags} processes = {k: v for k, v in d.items() if v['entity'] == "process"} runs = {k: v for k, v in d.items() if v['entity'] == "run"} @@ -43,61 +62,19 @@ def load_validation_events(self, scenario, config): return processes, runs, lineage_events, links def dump_api_state(self, scenario): - dump_dir = join(self.consumer_dir, "scenarios", scenario, "api_state") - processes_state, runs_state, events_state, links_state = self.get_api_state() - try: - os.mkdir(dump_dir) - except FileExistsError: - pass - except PermissionError: - print(f"Permission denied: Unable to create '{dump_dir}'.") - except Exception as e: - print(f"An error occurred: {e}") - - with open(join(dump_dir, "processes.json"), 'w') as f: - json.dump(processes_state, f, indent=2) - with open(join(dump_dir, "runs.json"), 'w') as f: - json.dump(runs_state, f, indent=2) - with open(join(dump_dir, "lineage_events.json"), 'w') as f: - json.dump(events_state, f, indent=2) - with open(join(dump_dir, "links.json"), 'w') as f: - json.dump(links_state, f, indent=2) + dump_directory = self.consumer_dir / "scenarios" / scenario / "api_state" + dump_directory.mkdir(parents=True, exist_ok=True) - def send_ol_events(self, scenario): - events = sorted(self.load_ol_events(scenario), key=lambda d: d['name']) - report = [] - for e in events: - try: - response = self.client.process_open_lineage_run_event(parent=self.parent, open_lineage=e['payload']) - report.append( - {"status": "SUCCESS", 'validation_type': 'syntax', 'name': e['name'], 'entity_type': 'openlineage', - 'tags': {}}) - time.sleep(1) - except InvalidArgument as exc: - report.append( - {"status": "FAILURE", 'validation_type': 'syntax', "details": exc.args[0], 'name': e['name'], - 'entity_type': 'openlineage', 'tags': {}}) - time.sleep(2) - return report + processes_state, runs_state, events_state, links_state = self.get_api_state() - def read_config(self, scenario): - with open(join(self.consumer_dir, 'scenarios', scenario, "config.json"), 'r') as f: - return json.load(f) + self.write_entity_to_file(processes_state, dump_directory, "processes.json") + self.write_entity_to_file(runs_state, dump_directory, "runs.json") + self.write_entity_to_file(events_state, dump_directory, "lineage_events.json") + self.write_entity_to_file(links_state, dump_directory, "links.json") - def validate(self, scenario, dump): - config = self.read_config(scenario) - self.clean_up() - report = self.send_ol_events(scenario) - if not any(r['status'] == "FAILURE" for r in report): - if dump: - self.dump_api_state(scenario) - else: - report.extend(self.validate_api_state(scenario, config)) - - self.clean_up() - return {"name": scenario, - "status": 'FAILURE' if any(r['status'] == "FAILURE" for r in report) else 'SUCCESS', - "tests": report} + def write_entity_to_file(self, processes_state, dump_directory, name): + with open(dump_directory / name, 'w') as f: + json.dump(processes_state, f, indent=2) def get_api_state(self): processes = [Message.to_dict(p) for p in 
self.client.list_processes(parent=self.parent)] @@ -112,23 +89,42 @@ def get_links(self, link): return self.client.search_links( request=SearchLinksRequest(source=link["source"], target=link["target"], parent=self.parent)) - def validate_api_state(self, scenario, config): - processes_expected, runs_expected, events_expected, links_expected = self.load_validation_events(scenario, - config) - processes_state, runs_state, events_state, links = self.get_api_state() - report = [] - report.extend(self.compare_by_name(processes_expected, processes_state, 'process')) - report.extend(self.compare_by_name(runs_expected, runs_state, 'run')) - report.extend(self.compare_by_start_time(events_expected, events_state, 'lineage_event')) - report.extend(self.compare_by_start_time(links_expected, links, 'link')) - - return report - def clean_up(self): processes = [x for x in self.client.list_processes(parent=self.parent)] for p in processes: self.client.delete_process(name=p.name) + def read_consumer_config(self, scenario_name): + return self.read_config(self.consumer_dir / 'scenarios', scenario_name) + + def read_scenario_config(self, scenario_name): + return self.read_config(self.scenario_dir, scenario_name) + + def read_config(self, directory, scenario): + path = directory / scenario / "config.json" + with open(path, 'r') as f: + return json.load(f) + + +class Validator: + @staticmethod + def validate_syntax(config, errors, scenario_name): + errors_ = [Test.simplified(name, 'syntax', 'openlineage', details, config) for name, details in + errors] + scenario = Scenario.simplified(scenario_name, errors_) + return scenario + + @staticmethod + def validate_api_state(scenario, api_state, validation_events): + processes_expected, runs_expected, events_expected, links_expected = validation_events + processes_state, runs_state, events_state, links = api_state + tests = [] + tests.extend(Validator.compare_by_name(processes_expected, processes_state, 'process')) + tests.extend(Validator.compare_by_name(runs_expected, runs_state, 'run')) + tests.extend(Validator.compare_by_start_time(events_expected, events_state, 'lineage_event')) + tests.extend(Validator.compare_by_start_time(links_expected, links, 'link')) + return Scenario.simplified(scenario, tests) + @staticmethod def compare_by_name(expected, result, entity_type): results = [] @@ -142,8 +138,7 @@ def compare_by_name(expected, result, entity_type): details.extend([f"{entity_type} {entity_name}, {r}" for r in res]) else: details.append(f"{entity_type} {entity_name}, no matching entity") - results.append({'entity_type': entity_type, 'status': 'SUCCESS' if len(details) == 0 else 'FAILURE', - 'details': details, 'validation_type': 'semantics', 'name': k, 'tags': v['tags']}) + results.append(Test.simplified(entity_type, 'semantics', entity_type, details, v['tags'])) return results @staticmethod @@ -159,14 +154,9 @@ def compare_by_start_time(expected, result, entity_type): else: diffs = [f"event {exp['start_time']}, no matching entity"] details.extend(diffs) - results.append({'entity_type': entity_type, 'status': 'SUCCESS' if len(details) == 0 else 'FAILURE', - 'details': details, 'validation_type': 'semantics', 'name': k, 'tags': v['tags']}) + results.append(Test.simplified(entity_type, 'semantics', entity_type, details, v['tags'])) return results - def __repr__(self): - return (f"MyClass(client={self.client}, consumer_dir={self.consumer_dir}, " - f"scenario_dir={self.scenario_dir}, parent={self.parent})") - def list_scenarios(consumer_dir): return [entry.name for 
entry in os.scandir(f"{consumer_dir}/scenarios") if entry.is_dir()] @@ -180,6 +170,7 @@ def get_arguments(): parser.add_argument('--parent', type=str, help="Parent identifier") parser.add_argument('--release', type=str, help="OpenLineage release used in generating events") parser.add_argument("--dump", action='store_true', help="dump api state") + parser.add_argument("--target", type=str, help="target location") args = parser.parse_args() @@ -190,18 +181,47 @@ def get_arguments(): parent = args.parent release = args.release dump = args.dump + target = args.target + + return consumer_dir, scenario_dir, parent, client, release, dump, target - return consumer_dir, scenario_dir, parent, client, release, dump + +def process_scenario(validator, handler, scenario_name, dump): + print(f"Processing scenario {scenario_name}") + handler.clean_up() + config = handler.read_scenario_config(scenario_name) + print(config) + errors = handler.send_ol_events(scenario_name) + print(f"Syntax validation for {scenario_name}") + scenario = validator.validate_syntax(config, errors, scenario_name) + + if scenario.status == "SUCCESS": + if dump: + print(f"Dumping api state of {scenario_name}") + handler.dump_api_state(scenario_name) + else: + consumer_config = handler.read_consumer_config(scenario_name) + api_state = handler.get_api_state() + validation_events = handler.load_validation_events(scenario_name, consumer_config) + print(f"Api state validation for {scenario_name}") + scenario.update(validator.validate_api_state(scenario_name, api_state, validation_events)) + + handler.clean_up() + return scenario def main(): - consumer_dir, scenario_dir, parent, client, release, dump = get_arguments() - validator = Validator(client, consumer_dir, scenario_dir, parent, release) - scenarios = list_scenarios(consumer_dir) - reports = [validator.validate(scenario, dump) for scenario in scenarios] - t = open('dataplex-report.json', 'w') - print(os.path.abspath(t.name)) - json.dump([{"name": "dataplex", "component_type": "consumer", "scenarios": reports}], t, indent=2) + consumer_dir, scenario_dir, parent, client, release, dump, target = get_arguments() + validator = Validator() + handler = EntityHandler(client, consumer_dir, scenario_dir, parent, release) + scenarios_names = list_scenarios(consumer_dir) + scenarios = [process_scenario(validator, handler, scenario, dump) for scenario in scenarios_names] + + component = Component('dataplex', 'consumer', {s.name: s for s in scenarios}, "", "") + report = Report.single_component_report(component) + + with Path(target).open('w') as t: + json.dump(report.to_dict(), t, indent=2) if __name__ == "__main__": diff --git a/consumer/scenarios/CLL/config.json b/consumer/scenarios/CLL/config.json index b57a1d6a..78327c66 100644 --- a/consumer/scenarios/CLL/config.json +++ b/consumer/scenarios/CLL/config.json @@ -1,7 +1,4 @@ { - "tags": { - "min_version": "1.22.0", - "max_version": "1.23.0", - "input": "spark" - } + "openlineage_version": "1.19.0", + "input": "spark" } diff --git a/consumer/scenarios/airflow/config.json b/consumer/scenarios/airflow/config.json index dd5e1b87..c08f1b9b 100644 --- a/consumer/scenarios/airflow/config.json +++ b/consumer/scenarios/airflow/config.json @@ -1,5 +1,4 @@ { - "tags": { - "input": "airflow" - } + "openlineage_version": "1.14.0", + "input": "airflow" } diff --git a/consumer/scenarios/simple_run_event/config.json b/consumer/scenarios/simple_run_event/config.json index 43d91ca4..e3c35aea 100644 --- a/consumer/scenarios/simple_run_event/config.json +++ 
b/consumer/scenarios/simple_run_event/config.json @@ -1,7 +1,3 @@ { - "tags": { - "min_version": "1.22.0", - "max_version": "1.23.0", - "input": "" - } + "openlineage_version": "1.15.0" } diff --git a/consumer/scenarios/spark_dataproc_bigquery_shakespare/config.json b/consumer/scenarios/spark_dataproc_bigquery_shakespare/config.json index b20f397f..dcfc315f 100644 --- a/consumer/scenarios/spark_dataproc_bigquery_shakespare/config.json +++ b/consumer/scenarios/spark_dataproc_bigquery_shakespare/config.json @@ -1,7 +1,4 @@ { - "tags": { - "min_version": "1.22.0", - "max_version": "1.23.0", - "input": "spark_dataproc" - } + "openlineage_version": "1.23.0", + "input": "spark_dataproc" } diff --git a/consumer/scenarios/spark_dataproc_simple_producer_test/config.json b/consumer/scenarios/spark_dataproc_simple_producer_test/config.json index b20f397f..dcfc315f 100644 --- a/consumer/scenarios/spark_dataproc_simple_producer_test/config.json +++ b/consumer/scenarios/spark_dataproc_simple_producer_test/config.json @@ -1,7 +1,4 @@ { - "tags": { - "min_version": "1.22.0", - "max_version": "1.23.0", - "input": "spark_dataproc" - } + "openlineage_version": "1.23.0", + "input": "spark_dataproc" } diff --git a/consumer/scenarios/spark_dataproc_simple_producer_test_complete/config.json b/consumer/scenarios/spark_dataproc_simple_producer_test_complete/config.json index b20f397f..dcfc315f 100644 --- a/consumer/scenarios/spark_dataproc_simple_producer_test_complete/config.json +++ b/consumer/scenarios/spark_dataproc_simple_producer_test_complete/config.json @@ -1,7 +1,4 @@ { - "tags": { - "min_version": "1.22.0", - "max_version": "1.23.0", - "input": "spark_dataproc" - } + "openlineage_version": "1.23.0", + "input": "spark_dataproc" } diff --git a/generated-files/report.json b/generated-files/report.json index 2dcbe1db..0637a088 100644 --- a/generated-files/report.json +++ b/generated-files/report.json @@ -1,1433 +1 @@ -[ - { - "name": "spark-mock", - "component_type": "producer", - "scenarios": [ - { - "name": "mock", - "status": "SUCCESS", - "tests": [ - { - "name": "mock", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event", - "parent", - "spark_properties", - "processing_engine", - "gcp_dataproc_spark", - "jobType", - "sql", - "gcp_lineage", - "dataSource", - "schema", - "symlinks", - "columnLineage" - ], - "lineage_level": { - "hive": [ - "dataset", - "column", - "transformation" - ], - "jdbc": [ - "dataset", - "column" - ] - } - } - } - ] - } - ] - }, - { - "name": "hive-mock", - "component_type": "producer", - "scenarios": [ - { - "name": "simple_scenario", - "status": "SUCCESS", - "tests": [ - { - "name": "mock", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "lineage_level": { - "hdfs": [ - "dataset" - ] - } - } - } - ] - } - ] - }, - { - "name": "marquez-mock", - "component_type": "consumer", - "scenarios": [ - { - "name": "mock", - "status": "SUCCESS", - "tests": [ - { - "name": "mock", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "process", - "details": [], - "tags": { - "facets": [], - "max_version": "1.23.0", - "min_version": "1.22.0", - "producer": "spark" - } - } - ] - } - ] - }, - { - "name": "dataplex", - "component_type": "consumer", - "scenarios": [ - { - "name": "simple_run_event", - "status": "SUCCESS", - "tests": [ - { - "name": "simple_run_event.json", - 
"status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "process", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "process", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "min_version": "1.22.0", - "producer": "spark" - } - }, - { - "name": "runs", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "run", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "min_version": "1.22.0", - "producer": "spark" - } - }, - { - "name": "lineage_events", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "event", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "min_version": "1.22.0", - "producer": "spark" - } - }, - { - "name": "links", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "link", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "min_version": "1.22.0", - "producer": "spark" - } - } - ] - }, - { - "name": "spark_dataproc_simple_producer_test", - "status": "SUCCESS", - "tests": [ - { - "name": "4.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "3.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "2.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "1.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "process", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "process", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "min_version": "1.22.0", - "producer": "spark" - } - }, - { - "name": "runs", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "run", - "details": [], - "tags": { - "facets": [ - "run_event", - "processing_engine" - ], - "min_version": "1.22.0", - "producer": "spark" - } - }, - { - "name": "lineage_events", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "event", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "min_version": "1.22.0", - "producer": "spark" - } - }, - { - "name": "links", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "link", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "min_version": "1.22.0", - "producer": "airflow" - } - } - ] - }, - { - "name": "airflow", - "status": "SUCCESS", - "tests": [ - { - "name": "line_01.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_02.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_03.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_04.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_05.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_06.json", - "status": "SUCCESS", - "validation_type": "syntax", 
- "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_07.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_08.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_09.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_10.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_11.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_12.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_13.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_14.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_15.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_16.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_17.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_18.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_19.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_20.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_21.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_22.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_23.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_24.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_25.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_26.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_27.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_28.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_29.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": 
"line_30.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_31.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "line_32.json", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "process", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "process", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "producer": "airflow" - } - }, - { - "name": "runs", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "run", - "details": [], - "tags": { - "facets": [ - "run_event", - "processing_engine" - ], - "producer": "airflow" - } - }, - { - "name": "lineage_events", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "lineage_event", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "producer": "airflow" - } - }, - { - "name": "links", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "link", - "details": [], - "tags": { - "facets": [ - "run_event" - ], - "producer": "airflow" - } - } - ] - } - ] - }, - { - "name": "spark_dataproc", - "component_type": "producer", - "scenarios": [ - { - "name": "bigquery_input", - "status": "SUCCESS", - "tests": [ - { - "name": "default:reading_from_big_query.adaptive_spark_plan.tmp_my_shakespeare_output:RUNNING", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:reading_from_big_query:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:reading_from_big_query.adaptive_spark_plan.tmp_my_shakespeare_output:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:reading_from_big_query.adaptive_spark_plan.tmp_my_shakespeare_output:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:reading_from_big_query:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "complete_event_1.23.0", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event", - "parent", - "spark_properties", - "processing_engine", - "gcp_dataproc_spark", - "jobType", - "gcp_lineage", - "dataSource", - "schema", - "columnLineage" - ], - "max_version": "1.23.0", - "lineage_level": { - "bigquery": [ - "dataset", - "column", - "transformation" - ] - } - } - }, - { - "name": "input_bigquery", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "dataSource", - "schema", - "columnLineage" - ], - "lineage_level": { - "bigquery": [ - "dataset", - "column", - "transformation" - ] - } - } - }, - { - "name": "complete_event", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event", - "parent", - "spark_properties", - "processing_engine", - "gcp_dataproc", - "jobType", - "gcp_lineage", - "dataSource", 
- "schema", - "columnLineage" - ], - "min_version": "1.24.0", - "lineage_level": { - "bigquery": [ - "dataset", - "column", - "transformation" - ] - } - } - } - ] - }, - { - "name": "bigquery_output", - "status": "SUCCESS", - "tests": [ - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_1732182497982_0002-1bc6564b-38fe-4c8a-9a0c-874ec37ea26f:RUNNING", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_1732182497982_0002-1bc6564b-38fe-4c8a-9a0c-874ec37ea26f:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.execute_save_into_data_source_command:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_1732182497982_0002-1bc6564b-38fe-4c8a-9a0c-874ec37ea26f:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.execute_save_into_data_source_command:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "complete_event_1.23.0", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event", - "parent", - "spark_properties", - "processing_engine", - "gcp_dataproc_spark", - "jobType", - "gcp_lineage", - "dataSource", - "schema", - "columnLineage" - ], - "max_version": "1.23.0", - "lineage_level": { - "bigquery": [ - "dataset", - "column", - "transformation" - ] - } - } - }, - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_1732282398466_0002-1a47efc8-e046-4d9a-bac7-49a8fc6b9eca:RUNNING", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_1732282398466_0002-1a47efc8-e046-4d9a-bac7-49a8fc6b9eca:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_1732282398466_0002-1a47efc8-e046-4d9a-bac7-49a8fc6b9eca:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "complete_event", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event", - "parent", - "spark_properties", - "processing_engine", - "gcp_dataproc_spark", - "jobType", - "gcp_lineage", - "dataSource", - "schema", - "columnLineage" - ], - "min_version": "1.24.0", - "lineage_level": { - 
"bigquery": [ - "dataset", - "column", - "transformation" - ] - } - } - } - ] - }, - { - "name": "simple_scenario", - "status": "SUCCESS", - "tests": [ - { - "name": "default:simple_test.drop_table:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t2:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t1:RUNNING", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_create_table_command.warehouse_t1:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t1:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_create_hive_table_as_select_command.default_t2:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.drop_table:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t1:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "complete_event", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event", - "parent", - "spark_properties", - "processing_engine", - "gcp_dataproc_spark", - "jobType", - "sql", - "gcp_lineage", - "dataSource", - "schema", - "symlinks", - "columnLineage" - ], - "min_version": "1.24.0", - "lineage_level": { - "hive": [ - "dataset", - "column", - "transformation" - ] - } - } - }, - { - "name": "input_hive", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "dataSource", - "schema", - "symlinks", - "columnLineage" - ], - "min_version": "1.22.0", - "lineage_level": { - "hive": [ - "dataset", - "column", - "transformation" - ] - } - } - }, - { - "name": "default:simple_test.execute_create_hive_table_as_select_command.default_t2:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - } - ] - }, - { - "name": "hive", - "status": "SUCCESS", - "tests": [ - { - "name": "default:simple_test.drop_table:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": 
{} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t1:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t1:RUNNING", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.drop_table:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_create_table_command.warehouse_t1:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t1:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_insert_into_hive_table.warehouse_t2:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_create_hive_table_as_select_command.default_t2:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test.execute_create_hive_table_as_select_command.default_t2:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:simple_test:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "run_event_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event" - ] - } - }, - { - "name": "parent_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "parent" - ] - } - }, - { - "name": "spark_properties_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "spark_properties" - ] - } - }, - { - "name": "processing_engine_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "processing_engine" - ] - } - }, - { - "name": "gcp_dataproc_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "gcp_dataproc" - ], - "min_version": "1.24.0" - } - }, - { - "name": "jobType_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "jobType" - ] - } - }, - { - "name": "gcp_lineage_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "gcp_lineage" - ] - } - }, - { - "name": "dataSource_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "dataSource" - ] - } - }, - { - "name": "schema_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - 
"details": [], - "tags": { - "facets": [ - "schema" - ], - "lineage_level": { - "bigquery": [ - "dataset" - ] - } - } - }, - { - "name": "columnLineage_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "columnLineage" - ], - "lineage_level": { - "bigquery": [ - "dataset", - "column", - "transformation" - ] - } - } - } - ] - }, - { - "name": "bigquery", - "status": "SUCCESS", - "tests": [ - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_:RUNNING", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.execute_save_into_data_source_command:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query.execute_save_into_data_source_command:START", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "default:writing_to_big_query:COMPLETE", - "status": "SUCCESS", - "validation_type": "syntax", - "entity_type": "openlineage", - "details": [], - "tags": {} - }, - { - "name": "run_event_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "run_event" - ] - } - }, - { - "name": "parent_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "parent" - ] - } - }, - { - "name": "spark_properties_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "spark_properties" - ] - } - }, - { - "name": "processing_engine_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "processing_engine" - ] - } - }, - { - "name": "gcp_dataproc_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "gcp_dataproc" - ], - "min_version": "1.24.0" - } - }, - { - "name": "jobType_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "jobType" - ] - } - }, - { - "name": "gcp_lineage_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "gcp_lineage" - ] - } - }, - { - "name": "dataSource_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "dataSource" - ] - } - }, - { - "name": "schema_test", - "status": 
"SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "schema" - ], - "lineage_level": { - "bigquery": [ - "dataset" - ] - } - } - }, - { - "name": "columnLineage_test", - "status": "SUCCESS", - "validation_type": "semantics", - "entity_type": "openlineage", - "details": [], - "tags": { - "facets": [ - "columnLineage" - ], - "lineage_level": { - "bigquery": [ - "dataset", - "column", - "transformation" - ] - } - } - } - ] - } - ] - } -] \ No newline at end of file +[] \ No newline at end of file diff --git a/producer/spark_dataproc/runner/dataproc_workflow.py b/producer/spark_dataproc/runner/dataproc_workflow.py index df73143b..1358f398 100644 --- a/producer/spark_dataproc/runner/dataproc_workflow.py +++ b/producer/spark_dataproc/runner/dataproc_workflow.py @@ -525,6 +525,7 @@ async def run_job_command(args): credentials=credentials, ) events_path = f"events/{test_id}/" + print(f"EVENT PATH: {events_path}") job_gcs_dir = f"gs://{args.gcs_bucket}/jobs" uploaded_job_file = upload_to_gcs( source_path=args.python_job, @@ -552,6 +553,7 @@ async def run_job_command(args): files_to_download = list_blobs_with_prefix( args.gcs_bucket, events_path, "json", credentials=credentials ) + print(f"FILES TO DOWNLOAD: {files_to_download}") if files_to_download: download_files_in_parallel( args.gcs_bucket, diff --git a/producer/spark_dataproc/runner/get_openlineage_jar.sh b/producer/spark_dataproc/runner/get_openlineage_jar.sh index 6e139f99..83ff86d3 100755 --- a/producer/spark_dataproc/runner/get_openlineage_jar.sh +++ b/producer/spark_dataproc/runner/get_openlineage_jar.sh @@ -15,11 +15,12 @@ if [[ -n "${OPENLINEAGE_SPARK_URL}" ]]; then bq_url="${SPARK_BQ_CONNECTOR_URL}" ol_url="${OPENLINEAGE_SPARK_URL}" spanner_url="${SPARK_SPANNER_CONNECTOR_URL}" + bigtable_url="${SPARK_BIGTABLE_CONNECTOR_URL}" else bq_url="gs://open-lineage-e2e/jars/spark-3.5-bigquery-0.0.1-SNAPSHOT.jar" ol_url="gs://open-lineage-e2e/jars/openlineage-spark_2.12-1.29.0-SNAPSHOT.jar" spanner_url="gs://open-lineage-e2e/jars/spark-3.1-spanner-1.1.0.jar" - bigtable_url="gs://open-lineage-e2e/jars/spark-bigtable_2.12-0.3.0.jar" + bigtable_url="gs://open-lineage-e2e/jars/spark-bigtable_2.12-0.5.0-SNAPSHOT.jar" fi postgresql_url="gs://open-lineage-e2e/jars/postgresql-42.5.6.jar" @@ -27,4 +28,5 @@ postgresql_url="gs://open-lineage-e2e/jars/postgresql-42.5.6.jar" gsutil cp -P "${bq_url}" "${VM_SPARK_JARS_DIR}/" gsutil cp -P "${ol_url}" "${VM_SPARK_JARS_DIR}/" gsutil cp -P "${spanner_url}" "${VM_SPARK_JARS_DIR}/" -gsutil cp -P "${postgresql_url}" "${VM_SPARK_JARS_DIR}/" \ No newline at end of file +gsutil cp -P "${postgresql_url}" "${VM_SPARK_JARS_DIR}/" +gsutil cp -P "${bigtable_url}" "${VM_SPARK_JARS_DIR}/" \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/bigquery/config.json b/producer/spark_dataproc/scenarios/bigquery/config.json index feae21e4..fa6b6328 100644 --- a/producer/spark_dataproc/scenarios/bigquery/config.json +++ b/producer/spark_dataproc/scenarios/bigquery/config.json @@ -1,4 +1,12 @@ { + "component_versions": { + "min": "3.5.0", + "max": "3.5.1" + }, + "openlineage_versions": { + "min": "1.0.0", + "max": "5.0.0" + }, "patterns": [ "writing_to_big_query.adaptive_spark_plan._spark-bigquery-application_" ], diff --git a/producer/spark_dataproc/scenarios/bigquery/events/processing_engine_test.json b/producer/spark_dataproc/scenarios/bigquery/events/processing_engine_test.json index a85356c3..7e9ba832 100644 --- 
a/producer/spark_dataproc/scenarios/bigquery/events/processing_engine_test.json +++ b/producer/spark_dataproc/scenarios/bigquery/events/processing_engine_test.json @@ -7,7 +7,7 @@ "run": { "facets": { "processing_engine": { - "version": "3.5.1", + "version": "3.5.3", "name": "spark", "openlineageAdapterVersion": "{{ any(result) }}" } diff --git a/producer/spark_dataproc/scenarios/bigtable/config.json b/producer/spark_dataproc/scenarios/bigtable/config.json index 756d1aee..12916067 100644 --- a/producer/spark_dataproc/scenarios/bigtable/config.json +++ b/producer/spark_dataproc/scenarios/bigtable/config.json @@ -1,4 +1,12 @@ { + "component_versions": { + "min": "3.5.0", + "max": "3.5.1" + }, + "openlineage_versions": { + "min": "1.0.0", + "max": "1.0.0" + }, "tests": [ { "name": "columnLineage_test", @@ -8,7 +16,7 @@ "columnLineage" ], "lineage_level": { - "cloudsql": [ + "bigtable": [ "dataset", "column", "transformation" @@ -49,7 +57,12 @@ "tags": { "facets": [ "schema" - ] + ], + "lineage_level": { + "bigtable": [ + "dataset" + ] + } } }, { diff --git a/producer/spark_dataproc/scenarios/bigtable/test/test.py b/producer/spark_dataproc/scenarios/bigtable/test/test.py index 519536b0..1999b3f7 100644 --- a/producer/spark_dataproc/scenarios/bigtable/test/test.py +++ b/producer/spark_dataproc/scenarios/bigtable/test/test.py @@ -15,6 +15,10 @@ def generate_test_row(number, start_range): doubleCol=float(number / 3.14) ) +# def drop_if_exists(instance, table_name): +# table = instance.table(table_id=table_name) +# if table.exists(): +# table.delete() def generate_table_id(test_name): return f"cbt-{test_name}-{uuid.uuid4().hex[:20]}" @@ -63,9 +67,10 @@ def delete_bigtable_table(table_name, admin_client): spark = SparkSession.builder.appName("BigtableExample").getOrCreate() -test_name = "test" -input_table = "input_table" -output_table = "output_table" +suffix = spark.conf.get('spark.scenario.suffix') +test_name = f"test_{suffix}" +input_table = f"input_table_{suffix}" +output_table = f"output_table_{suffix}" # Assuming admin_client is already set up # create_bigtable_table(input_table, admin_client) @@ -91,6 +96,12 @@ def delete_bigtable_table(table_name, admin_client): project_id = "gcp-open-lineage-testing" instance_id = "openlineage-test" +client = bigtable.Client(project=project_id, admin=True) +instance = client.instance(instance_id) + +# drop_if_exists(instance, input_table) +# drop_if_exists(instance, output_table) + write_dataframe_to_bigtable(test_df, raw_basic_catalog, project_id, instance_id, True) read_df = read_dataframe_from_bigtable(spark, raw_basic_catalog, project_id, instance_id) @@ -112,11 +123,7 @@ def delete_bigtable_table(table_name, admin_client): spark.stop() -#Cleanup after the run - -client = bigtable.Client(project=project_id, admin=True) - -instance = client.instance(instance_id) +# Cleanup after the run bt_table1 = instance.table(table_id=input_table) bt_table2 = instance.table(table_id=output_table) diff --git a/producer/spark_dataproc/scenarios/cloudsql/config.json b/producer/spark_dataproc/scenarios/cloudsql/config.json index 64a9fdb9..90b81613 100644 --- a/producer/spark_dataproc/scenarios/cloudsql/config.json +++ b/producer/spark_dataproc/scenarios/cloudsql/config.json @@ -1,4 +1,12 @@ { + "component_versions": { + "min": "3.5.0", + "max": "3.5.1" + }, + "openlineage_versions": { + "min": "1.0.0", + "max": "5.0.0" + }, "tests": [ { "name": "columnLineage_test", @@ -58,7 +66,12 @@ "tags": { "facets": [ "schema" - ] + ], + "lineage_level": { + "bigtable": [ + 
"dataset" + ] + } } }, { diff --git a/producer/spark_dataproc/scenarios/hive/config.json b/producer/spark_dataproc/scenarios/hive/config.json index 2a6e415f..ab646618 100644 --- a/producer/spark_dataproc/scenarios/hive/config.json +++ b/producer/spark_dataproc/scenarios/hive/config.json @@ -1,8 +1,19 @@ { + "component_versions": { + "min": "3.1.0", + "max": "3.5.1" + }, + "openlineage_versions": { + "min": "1.0.0", + "max": "5.0.0" + }, "tests": [ { "name": "run_event_test", "path": "events/run_event_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "run_event" @@ -12,6 +23,9 @@ { "name": "parent_test", "path": "events/parent_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "parent" @@ -21,6 +35,9 @@ { "name": "spark_properties_test", "path": "events/spark_properties_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "spark_properties" @@ -30,6 +47,9 @@ { "name": "processing_engine_test", "path": "events/processing_engine_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "processing_engine" @@ -39,6 +59,9 @@ { "name": "gcp_dataproc_spark_test", "path": "events/gcp_dataproc_spark_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "max_version": "1.23.0", "facets": [ @@ -49,6 +72,9 @@ { "name": "gcp_dataproc_test", "path": "events/gcp_dataproc_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "gcp_dataproc" @@ -59,6 +85,9 @@ { "name": "jobType_test", "path": "events/jobType_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "jobType" @@ -68,6 +97,9 @@ { "name": "gcp_lineage_test", "path": "events/gcp_lineage_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "gcp_lineage" @@ -77,6 +109,9 @@ { "name": "dataSource_test", "path": "events/dataSource_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "dataSource" @@ -86,6 +121,9 @@ { "name": "schema_test", "path": "events/schema_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "schema" @@ -100,6 +138,9 @@ { "name": "columnLineage_test", "path": "events/columnLineage_test.json", + "component_versions": { + "max": "3.3.2" + }, "tags": { "facets": [ "columnLineage" @@ -112,6 +153,156 @@ ] } } + }, + { + "name": "run_event_test_3.5.1", + "path": "events_3.5.1/run_event_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "run_event" + ] + } + }, + { + "name": "parent_test_3.5.1", + "path": "events_3.5.1/parent_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "parent" + ] + } + }, + { + "name": "spark_properties_test_3.5.1", + "path": "events_3.5.1/spark_properties_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "spark_properties" + ] + } + }, + { + "name": "processing_engine_test_3.5.1", + "path": "events_3.5.1/processing_engine_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "processing_engine" + ] + } + }, + { + "name": "gcp_dataproc_spark_test_3.5.1", + "path": "events_3.5.1/gcp_dataproc_spark_test.json", + "component_versions": { + "min": "3.5.1" + }, + "openlineage_versions": { + "max": "1.23.0" + }, + "tags": { + "facets": [ + "gcp_dataproc_spark" + ] + } + }, + { + "name": "gcp_dataproc_test_3.5.1", + "path": "events_3.5.1/gcp_dataproc_test.json", + "component_versions": { + "min": "3.5.1" + 
}, + "openlineage_versions": { + "min": "1.24.0" + }, + "tags": { + "facets": [ + "gcp_dataproc" + ] + } + }, + { + "name": "jobType_test_3.5.1", + "path": "events_3.5.1/jobType_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "jobType" + ] + } + }, + { + "name": "gcp_lineage_test_3.5.1", + "path": "events_3.5.1/gcp_lineage_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "gcp_lineage" + ] + } + }, + { + "name": "dataSource_test_3.5.1", + "path": "events_3.5.1/dataSource_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "dataSource" + ] + } + }, + { + "name": "schema_test_3.5.1", + "path": "events_3.5.1/schema_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "schema" + ], + "lineage_level": { + "hive": [ + "dataset" + ] + } + } + }, + { + "name": "columnLineage_test_3.5.1", + "path": "events_3.5.1/columnLineage_test.json", + "component_versions": { + "min": "3.5.1" + }, + "tags": { + "facets": [ + "columnLineage" + ], + "lineage_level": { + "hive": [ + "dataset", + "column", + "transformation" + ] + } + } } ] } diff --git a/producer/spark_dataproc/scenarios/hive/events/columnLineage_test.json b/producer/spark_dataproc/scenarios/hive/events/columnLineage_test.json index 8c9dc823..d947fab4 100644 --- a/producer/spark_dataproc/scenarios/hive/events/columnLineage_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/columnLineage_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "outputs": [ { diff --git a/producer/spark_dataproc/scenarios/hive/events/dataSource_test.json b/producer/spark_dataproc/scenarios/hive/events/dataSource_test.json index 0be0838b..45a89315 100644 --- a/producer/spark_dataproc/scenarios/hive/events/dataSource_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/dataSource_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "inputs": [ { diff --git a/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_spark_test.json b/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_spark_test.json index 62cab5e9..44755d01 100644 --- a/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_spark_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_spark_test.json @@ -2,12 +2,12 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "run": { "facets": { "gcp_dataproc_spark": { - "queryNodeName": "execute_insert_into_hive_table", + "queryNodeName": "execute_create_hive_table_as_select_command", "appName": "simple test", "appId": "{{ any(result) }}", "projectId": "gcp-open-lineage-testing" diff --git a/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_test.json b/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_test.json index 61901ec4..9e4ce965 100644 --- a/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/gcp_dataproc_test.json @@ -2,12 +2,12 @@ "eventType": "COMPLETE", "job": 
{ "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "run": { "facets": { "gcp_dataproc": { - "queryNodeName": "execute_insert_into_hive_table", + "queryNodeName": "execute_create_hive_table_as_select_command", "appName": "simple test", "appId": "{{ any(result) }}", "projectId": "gcp-open-lineage-testing" diff --git a/producer/spark_dataproc/scenarios/hive/events/gcp_lineage_test.json b/producer/spark_dataproc/scenarios/hive/events/gcp_lineage_test.json index eca4b171..33abcba2 100644 --- a/producer/spark_dataproc/scenarios/hive/events/gcp_lineage_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/gcp_lineage_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2", "facets": { "gcp_lineage": { "origin": { diff --git a/producer/spark_dataproc/scenarios/hive/events/jobType_test.json b/producer/spark_dataproc/scenarios/hive/events/jobType_test.json index 0b09e4c5..293c406f 100644 --- a/producer/spark_dataproc/scenarios/hive/events/jobType_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/jobType_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2", "facets": { "jobType": { "processingType": "BATCH", diff --git a/producer/spark_dataproc/scenarios/hive/events/parent_test.json b/producer/spark_dataproc/scenarios/hive/events/parent_test.json index d748c2a1..5bc41d3a 100644 --- a/producer/spark_dataproc/scenarios/hive/events/parent_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/parent_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "run": { "facets": { diff --git a/producer/spark_dataproc/scenarios/hive/events/processing_engine_test.json b/producer/spark_dataproc/scenarios/hive/events/processing_engine_test.json index dfa7e929..fa54bf35 100644 --- a/producer/spark_dataproc/scenarios/hive/events/processing_engine_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/processing_engine_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "run": { "facets": { diff --git a/producer/spark_dataproc/scenarios/hive/events/run_event_test.json b/producer/spark_dataproc/scenarios/hive/events/run_event_test.json index d2778ae4..3edd6558 100644 --- a/producer/spark_dataproc/scenarios/hive/events/run_event_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/run_event_test.json @@ -6,7 +6,7 @@ }, "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2", "facets": {} }, "inputs": [ diff --git a/producer/spark_dataproc/scenarios/hive/events/schema_test.json b/producer/spark_dataproc/scenarios/hive/events/schema_test.json index 3809d0e8..82ba1d9f 100644 --- 
a/producer/spark_dataproc/scenarios/hive/events/schema_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/schema_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "inputs": [ { diff --git a/producer/spark_dataproc/scenarios/hive/events/spark_properties_test.json b/producer/spark_dataproc/scenarios/hive/events/spark_properties_test.json index 9e54505f..1890f6c1 100644 --- a/producer/spark_dataproc/scenarios/hive/events/spark_properties_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/spark_properties_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "run": { "facets": { diff --git a/producer/spark_dataproc/scenarios/hive/events/sql_test.json b/producer/spark_dataproc/scenarios/hive/events/sql_test.json index f0740633..12052e5a 100644 --- a/producer/spark_dataproc/scenarios/hive/events/sql_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/sql_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2", "facets": { "sql": { "query": "CREATE TABLE IF NOT EXISTS t2 AS SELECT * FROM t1;" diff --git a/producer/spark_dataproc/scenarios/hive/events/symlinks_test.json b/producer/spark_dataproc/scenarios/hive/events/symlinks_test.json index cbc5c770..863037bd 100644 --- a/producer/spark_dataproc/scenarios/hive/events/symlinks_test.json +++ b/producer/spark_dataproc/scenarios/hive/events/symlinks_test.json @@ -2,7 +2,7 @@ "eventType": "COMPLETE", "job": { "namespace": "default", - "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + "name": "simple_test.execute_create_hive_table_as_select_command.default_t2" }, "inputs": [ { diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/columnLineage_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/columnLineage_test.json new file mode 100644 index 00000000..8c9dc823 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/columnLineage_test.json @@ -0,0 +1,53 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "outputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t2", + "facets": { + "columnLineage": { + "fields": { + "a": { + "inputFields": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t1", + "field": "a", + "transformations": [ + { + "type": "DIRECT", + "subtype": "IDENTITY", + "description": "", + "masking": false + } + ] + } + ] + }, + "b": { + "inputFields": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t1", + "field": "b", + "transformations": [ + { + "type": "DIRECT", + "subtype": "IDENTITY", + "description": "", + "masking": false + } + ] + } + ] + } + } + } + } + } + ] +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/dataSource_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/dataSource_test.json new file mode 100644 index 00000000..0be0838b --- /dev/null +++ 
b/producer/spark_dataproc/scenarios/hive/events_3.5.1/dataSource_test.json @@ -0,0 +1,31 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "inputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t1", + "facets": { + "dataSource": { + "name": "{{ any(result) }}", + "uri": "{{ any(result) }}" + } + } + } + ], + "outputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t2", + "facets": { + "dataSource": { + "name": "{{ any(result) }}", + "uri": "{{ any(result) }}" + } + } + } + ] +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_dataproc_spark_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_dataproc_spark_test.json new file mode 100644 index 00000000..62cab5e9 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_dataproc_spark_test.json @@ -0,0 +1,17 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "run": { + "facets": { + "gcp_dataproc_spark": { + "queryNodeName": "execute_insert_into_hive_table", + "appName": "simple test", + "appId": "{{ any(result) }}", + "projectId": "gcp-open-lineage-testing" + } + } + } +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_dataproc_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_dataproc_test.json new file mode 100644 index 00000000..61901ec4 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_dataproc_test.json @@ -0,0 +1,17 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "run": { + "facets": { + "gcp_dataproc": { + "queryNodeName": "execute_insert_into_hive_table", + "appName": "simple test", + "appId": "{{ any(result) }}", + "projectId": "gcp-open-lineage-testing" + } + } + } +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_lineage_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_lineage_test.json new file mode 100644 index 00000000..eca4b171 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/gcp_lineage_test.json @@ -0,0 +1,15 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "facets": { + "gcp_lineage": { + "origin": { + "sourceType": "DATAPROC", + "name": "projects/gcp-open-lineage-testing/regions/us-west1/unknown/" + } + } + } + } +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/jobType_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/jobType_test.json new file mode 100644 index 00000000..0b09e4c5 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/jobType_test.json @@ -0,0 +1,14 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "facets": { + "jobType": { + "processingType": "BATCH", + "integration": "SPARK", + "jobType": "SQL_JOB" + } + } + } +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/parent_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/parent_test.json new file mode 100644 index 00000000..d748c2a1 --- 
/dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/parent_test.json @@ -0,0 +1,20 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "run": { + "facets": { + "parent": { + "run": { + "runId": "{{ is_uuid(result) }}" + }, + "job": { + "namespace": "default", + "name": "simple_test" + } + } + } + } +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/processing_engine_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/processing_engine_test.json new file mode 100644 index 00000000..dfa7e929 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/processing_engine_test.json @@ -0,0 +1,16 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "run": { + "facets": { + "processing_engine": { + "version": "{{ any(result) }}", + "name": "spark", + "openlineageAdapterVersion": "{{ any(result) }}" + } + } + } +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/run_event_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/run_event_test.json new file mode 100644 index 00000000..d2778ae4 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/run_event_test.json @@ -0,0 +1,28 @@ +{ + "eventType": "COMPLETE", + "run": { + "runId": "{{ is_uuid(result) }}", + "facets": {} + }, + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "facets": {} + }, + "inputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t1", + "facets": {}, + "inputFacets": {} + } + ], + "outputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t2", + "facets": {}, + "outputFacets": {} + } + ] +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/schema_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/schema_test.json new file mode 100644 index 00000000..3809d0e8 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/schema_test.json @@ -0,0 +1,47 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "inputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t1", + "facets": { + "schema": { + "fields": [ + { + "name": "a", + "type": "integer" + }, + { + "name": "b", + "type": "string" + } + ] + } + } + } + ], + "outputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t2", + "facets": { + "schema": { + "fields": [ + { + "name": "a", + "type": "integer" + }, + { + "name": "b", + "type": "string" + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/spark_properties_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/spark_properties_test.json new file mode 100644 index 00000000..9e54505f --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/spark_properties_test.json @@ -0,0 +1,17 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "run": { + "facets": { + "spark_properties": { + "properties": { + "spark.master": "local", + "spark.app.name": "simple test" + } + } + } + } +} \ No newline at end 
of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/sql_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/sql_test.json new file mode 100644 index 00000000..f0740633 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/sql_test.json @@ -0,0 +1,12 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2", + "facets": { + "sql": { + "query": "CREATE TABLE IF NOT EXISTS t2 AS SELECT * FROM t1;" + } + } + } +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/hive/events_3.5.1/symlinks_test.json b/producer/spark_dataproc/scenarios/hive/events_3.5.1/symlinks_test.json new file mode 100644 index 00000000..cbc5c770 --- /dev/null +++ b/producer/spark_dataproc/scenarios/hive/events_3.5.1/symlinks_test.json @@ -0,0 +1,41 @@ +{ + "eventType": "COMPLETE", + "job": { + "namespace": "default", + "name": "simple_test.execute_insert_into_hive_table.warehouse_t2" + }, + "inputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t1", + "facets": { + "symlinks": { + "identifiers": [ + { + "namespace": "{{ any(result) }}", + "name": "default.t1", + "type": "TABLE" + } + ] + } + } + } + ], + "outputs": [ + { + "namespace": "{{ any(result) }}", + "name": "/user/hive/warehouse/t2", + "facets": { + "symlinks": { + "identifiers": [ + { + "namespace": "{{ any(result) }}", + "name": "default.t2", + "type": "TABLE" + } + ] + } + } + } + ] +} \ No newline at end of file diff --git a/producer/spark_dataproc/scenarios/spanner/config.json b/producer/spark_dataproc/scenarios/spanner/config.json index 002f1445..8df6cfb3 100644 --- a/producer/spark_dataproc/scenarios/spanner/config.json +++ b/producer/spark_dataproc/scenarios/spanner/config.json @@ -1,4 +1,12 @@ { + "component_versions": { + "min": "3.5.0", + "max": "3.5.1" + }, + "openlineage_versions": { + "min": "1.0.0", + "max": "5.0.0" + }, "tests": [ { "name": "columnLineage_test", diff --git a/producer/spark_dataproc/versions.json b/producer/spark_dataproc/versions.json new file mode 100644 index 00000000..6e2c3143 --- /dev/null +++ b/producer/spark_dataproc/versions.json @@ -0,0 +1,4 @@ +{ + "openlineage_versions": ["1.30.0"], + "component_version": ["3.5.1", "3.3.2", "3.1.3"] +} \ No newline at end of file diff --git a/scripts/generate_compatibility_tables.py b/scripts/generate_compatibility_tables.py index 04de329e..bd70777f 100644 --- a/scripts/generate_compatibility_tables.py +++ b/scripts/generate_compatibility_tables.py @@ -1,140 +1,253 @@ import argparse - -from report import Report import json - +from pathlib import Path +from report import Report from py_markdown_table.markdown_table import markdown_table -def get_version_status(min_version, max_version): - if min_version is None and max_version is None: - return '+' - elif min_version is not None and max_version is None: - return f'above {min_version}' - elif min_version is None and max_version is not None: - return f'below {max_version}' - else: - return f'above {min_version}, below {max_version}' +def get_arguments(): + parser = argparse.ArgumentParser(description="Generate Markdown reports from lineage summary.") + parser.add_argument('--report', type=str, required=True, help="Path to JSON report file") + parser.add_argument('--target', type=str, required=True, help="Output directory for markdown files") + args = parser.parse_args() + return Path(args.report), Path(args.target) + +def get_sorted_facets(summary): + f = [f 
for v in summary.component_versions.values() for vv in v.values() for f in vv.get('facets', [])] + facets = set(f) + desired_order = ["run_event", "jobType", "parent", "dataSource", "processing_engine", "sql", "symlinks", + "schema", "columnLineage", "gcp_dataproc_spark", "gcp_lineage", "spark_properties"] + sorted_in_order = sorted([item for item in facets if item in desired_order], key=desired_order.index) + sorted_out_of_order = sorted(item for item in facets if item not in desired_order) + return sorted_in_order + sorted_out_of_order -def generate_facets_table(data): - facets = get_sorted_facets(data) +def sanitize(name): + return name.replace(" ", "_").replace("/", "_").replace(".", "_") - table_data = [] - # Populate rows for each top-level key - for key, value in data.items(): - row = {'Name': key} - for facet in facets: - if 'facets' in value and facet in value['facets']: - facet_data = value['facets'][facet] - status = get_version_status(facet_data['min_version'], facet_data['max_version']) - row[facet] = status - else: - row[facet] = '-' - table_data.append(row) +def normalize_label(name: str) -> str: + return " ".join(word.capitalize() for word in name.replace("_", " ").split()) - table = markdown_table(table_data) - table.set_params(row_sep="markdown", quote=False) - return table.get_markdown() +def write_category_json(path: Path, label: str, position: int): + path.mkdir(parents=True, exist_ok=True) + content = { + "label": label, + "position": position + } + with open(path / "_category_.json", "w") as f: + json.dump(content, f, indent=2) -def get_sorted_facets(data): - facets = set() - for key, value in data.items(): - if 'facets' in value: - facets.update(value['facets'].keys()) - desired_order = ["run_event", "jobType", "parent", "dataSource", "processing_engine", "sql", "symlinks", "schema", - "columnLineage", "gcp_dataproc_spark", "gcp_lineage", "spark_properties"] - sorted_in_order = sorted( - [item for item in facets if item in desired_order], - key=lambda x: desired_order.index(x) - ) - sorted_out_of_order = sorted(item for item in facets if item not in desired_order) - final_sorted_list = sorted_in_order + sorted_out_of_order - return final_sorted_list - - -def generate_lineage_table(data): - producers = {} - - for key, value in data.items(): - if 'lineage_levels' in value: - table_data = [] - for datasource, levels in value['lineage_levels'].items(): - row = {'Datasource': datasource} - for level in ['dataset', 'column', 'transformation']: - if level in levels: - level_data = levels[level] - status = get_version_status(level_data['min_version'], level_data['max_version']) - row[level.capitalize()] = status - else: - row[level.capitalize()] = '-' - table_data.append(row) - table = markdown_table(table_data) - table.set_params(row_sep="markdown", quote=False) - producers[key] = table.get_markdown() - return producers - - -def generate_producers_table(data): - # Prepare table header and rows - consumers = {} - for key, value in data.items(): - if 'producers' in value: - table_data = [] - for producer, producer_data in value['producers'].items(): - status = get_version_status(producer_data['min_version'], producer_data['max_version']) - table_data.append({'Producer': producer, 'Version': status}) - table = markdown_table(table_data) - table.set_params(row_sep="markdown", quote=False) - consumers[key] = table.get_markdown() - return consumers +def write_markdown(path: Path, filename: str, label: str, content_parts, position: int = 1): + path.mkdir(parents=True, 
exist_ok=True) + full_file = path / f"{filename}.md" + with open(full_file, "w") as f: + f.write(f"---\nsidebar_position: {position}\ntitle: {label}\n---\n\n") + for section_title, content in content_parts: + f.write(f"## {section_title}\n{content}\n\n") + print(f"Written: {full_file}") -def get_arguments(): - parser = argparse.ArgumentParser(description="") - parser.add_argument('--report', type=str, help="path to report file") - parser.add_argument('--target', type=str, help="path to target file") - args = parser.parse_args() +def fill_facet_table(d, facets): + table_data = [] + for key in sorted(d.keys(), key=lambda x: tuple(map(int, x.split(".")))): + value = d[key] + row = {'openlineage version': key} + for facet in facets: + row[facet] = '+' if 'facets' in value and facet in value['facets'] else '-' + table_data.append(row) + table = markdown_table(table_data) + table.set_params(row_sep="markdown", quote=False) + return table.get_markdown() - report_path = args.report - target_path = args.target - return report_path, target_path +def fill_inputs_table(d): + table_data = [] + seen = set() + for value in d.values(): + for producer in value.get('inputs', []): + if producer: + normalized = normalize_label(producer) + if normalized not in seen: + table_data.append({'Producer': normalized, 'Status': '+'}) + seen.add(normalized) + table_data = sorted(table_data, key=lambda x: x['Producer']) + if table_data: + table = markdown_table(table_data) + table.set_params(row_sep="markdown", quote=False) + return table.get_markdown() + return "" + + + +def fill_lineage_level_table(d): + table_data = [] + for key in sorted(d.keys(), key=lambda x: tuple(map(int, x.split(".")))): + value = d[key] + for datasource, levels in value.get('lineage_levels', {}).items(): + row = {'Datasource': datasource} + for level in ['dataset', 'column', 'transformation']: + row[level.capitalize()] = '+' if level in levels else '-' + table_data.append(row) + if table_data: + table = markdown_table(table_data) + table.set_params(row_sep="markdown", quote=False) + return table.get_markdown() + return "" + + +def process_components(summaries, output_dir, is_producer, offset): + for idx, summary in enumerate(summaries): + label = normalize_label(summary.name) + component_id = sanitize(summary.name) + + is_unversioned = list(summary.component_versions.keys()) == [""] + + if is_unversioned: + # Flat structure for unversioned component + ol_versions = summary.component_versions[""] + facets_data = {} + inputs_data = {} + + for ol_version, data in ol_versions.items(): + if 'facets' in data: + facets_data[ol_version] = data + if 'inputs' in data: + inputs_data[ol_version] = data + + content_parts = [] + if facets_data: + sorted_facets = get_sorted_facets(summary) + content_parts.append(("Facets", fill_facet_table(facets_data, sorted_facets))) + if inputs_data: + content_parts.append(("Producer Inputs", fill_inputs_table(inputs_data))) + + if content_parts: + write_markdown( + path=output_dir, + filename=component_id, + label=label, + content_parts=content_parts, + position=idx+offset + ) + else: + # Nested structure with _category_.json for versioned components + base_path = output_dir / component_id + write_category_json(base_path, label, idx + offset) + comp_idx = 1 + for component_version in sorted(summary.component_versions.keys(), key=lambda x: tuple(map(int, x.split(".")))): + ol_versions = summary.component_versions[component_version] + + facets = {} + inputs = {} + lineage = {} + + for ol_version, data in 
ol_versions.items(): + if 'facets' in data: + facets[ol_version] = data + if not is_producer and 'inputs' in data: + inputs[ol_version] = data + if is_producer and 'lineage_levels' in data: + lineage[ol_version] = data + + content_parts = [] + if facets: + sorted_facets = get_sorted_facets(summary) + content_parts.append(("Facets", fill_facet_table(facets, sorted_facets))) + if not is_producer and inputs: + content_parts.append(("Producer Inputs", fill_inputs_table(inputs))) + if is_producer and lineage: + content_parts.append(("Lineage Levels", fill_lineage_level_table(lineage))) + + if content_parts: + write_markdown( + path=base_path, + filename=component_version, + label=component_version, + content_parts=content_parts, + position=comp_idx + ) + comp_idx += 1 + return offset + len(summaries) + + + +def generate_facets_by_openlineage_version_table(summaries): + data_by_version = {} + for summary in summaries: + component_base_name = normalize_label(str(summary.name) or "") + for comp_version, ol_versions in summary.component_versions.items(): + for ol_version, values in ol_versions.items(): + if 'facets' not in values: + continue + full_component_name = f"{component_base_name} ({comp_version})" if comp_version else component_base_name + data_by_version.setdefault(ol_version, {}).setdefault(full_component_name, set()).update( + values['facets']) + + all_facets = {facet for components in data_by_version.values() for facets in components.values() for facet in facets} + facet_list = sorted(all_facets) + + result = {} + for ol_version, components in data_by_version.items(): + table_data = [] + for component_name, component_facets in components.items(): + row = {"Component (Version)": component_name} + for facet in facet_list: + row[facet] = '+' if facet in component_facets else '-' + table_data.append(row) + table = markdown_table(table_data) + table.set_params(row_sep="markdown", quote=False) + result[ol_version] = table.get_markdown() + return result + + + +def write_ol_version_summaries(consumer_summary, producer_summary, output_dir): + consumer_by_version = generate_facets_by_openlineage_version_table(consumer_summary) + producer_by_version = generate_facets_by_openlineage_version_table(producer_summary) + + summary_base = output_dir / "openlineage_versions" + + all_versions = sorted(set(consumer_by_version.keys()) | set(producer_by_version.keys())) + + for idx, version in enumerate(all_versions, start=1): + version_path = summary_base / version + write_category_json(version_path, f"OpenLineage {version}", idx) + + if version in consumer_by_version: + write_markdown( + version_path, + filename="consumer_summary", + label=f"Consumer Summary", + content_parts=[("Facets", consumer_by_version[version])], + position=1 + ) + + if version in producer_by_version: + write_markdown( + version_path, + filename="producer_summary", + label=f"Producer Summary", + content_parts=[("Facets", producer_by_version[version])], + position=2 + ) def main(): report_path, target_path = get_arguments() - with open(report_path, 'r') as c: - report = json.load(c) - consumer_report = Report.from_dict([e for e in report if e['component_type'] == 'consumer']) - producer_report = Report.from_dict( - [e for e in report if e['component_type'] == 'producer' and e['name'] != 'scenarios']) - consumer_tag_summary = consumer_report.get_tag_summary() - producer_tag_summary = producer_report.get_tag_summary() - consumer_facets_table = generate_facets_table(consumer_tag_summary) - producers_tables = 
generate_producers_table(consumer_tag_summary)
-    producer_facets_table = generate_facets_table(producer_tag_summary)
-    lineage_level_tables = generate_lineage_table(producer_tag_summary)
-
-    # Output the tables
-    with open(target_path, "w") as file:
-        file.write("# Producers\n")
-        file.write("## Facets Compatibility\n")
-        file.write(producer_facets_table + "\n\n")
-
-        for k, v in lineage_level_tables.items():
-            file.write(f"## Lineage level support for {k}\n")
-            file.write(v + "\n\n")
-
-        file.write("# Consumers\n")
-        file.write("## Facets Compatibility\n")
-        file.write(consumer_facets_table + "\n\n")
-        for k, v in producers_tables.items():
-            file.write(f"## Producers support for {k}\n")
-            file.write(v + "\n")
+    target_path.mkdir(parents=True, exist_ok=True)
+
+    with open(report_path, 'r') as f:
+        report = Report.from_dict(json.load(f))
+
+    consumer_summary = report.get_consumer_tag_summary()
+    producer_summary = report.get_producer_tag_summary()
+    offset = 3
+    offset = process_components(consumer_summary, target_path, is_producer=False, offset = offset)
+    process_components(producer_summary, target_path, is_producer=True, offset = offset)
+    write_ol_version_summaries(consumer_summary, producer_summary, target_path)
 
 
 if __name__ == "__main__":
diff --git a/scripts/get_valid_test_scenarios.sh b/scripts/get_valid_test_scenarios.sh
new file mode 100755
index 00000000..f6fdda3f
--- /dev/null
+++ b/scripts/get_valid_test_scenarios.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+
+# Ensure arguments are provided
+if [[ $# -lt 3 ]]; then
+  echo "Usage: $0 <scenario_path> <component_version> <openlineage_version>"
+  exit 1
+fi
+
+scenario_path="$1"
+component_version="$2"
+openlineage_version="$3"
+
+version_check() {
+  local min max version
+  read -r min max < <(jq -r "$1.min // \"0.0.0\" , $1.max // \"999.999.999\"" "$2" | xargs)
+  IFS='.' read -r v1 v2 v3 <<< "$3"; version=$(( v1 * 1000000 + v2 * 1000 + v3 ))
+  IFS='.' read -r m1 m2 m3 <<< "$min"; min=$(( m1 * 1000000 + m2 * 1000 + m3 ))
+  IFS='.' read -r M1 M2 M3 <<< "$max"; max=$(( M1 * 1000000 + M2 * 1000 + M3 ))
+  [[ $version -ge $min && $version -le $max ]]
+}
+
+valid_scenarios=""
+for dir in "$scenario_path"/*; do
+  config_file="${dir}/config.json"
+  [[ !
-f "$config_file" ]] && echo "NO CONFIG FILE FOR SCENARIO ${dir##*/}" && exit 1 + + if version_check ".component_versions" "$config_file" "$component_version" && + version_check ".openlineage_versions" "$config_file" "$openlineage_version"; then + valid_scenarios+=";${dir##*/}" + fi +done + +echo "${valid_scenarios/#;/}" diff --git a/scripts/report.py b/scripts/report.py index a77b5b03..549116da 100644 --- a/scripts/report.py +++ b/scripts/report.py @@ -5,12 +5,33 @@ class Report: def __init__(self, components): self.components = components + def __str__(self): + return str(self.to_dict()) + @classmethod def from_dict(cls, d): - return cls({s['name']: Component.from_dict(s) for s in d}) + return cls( + {f"{c['name']}-{c['component_version']}-{c['openlineage_version']}": Component.from_dict(c) for c in d}) + + @classmethod + def single_component_report(cls, component): + return cls({component.name: component}) - def get_tag_summary(self): - return {k: v.get_tag_summary() for k, v in self.components.items()} + def get_producer_tag_summary(self): + + summaries = [v.get_producer_tag_summary() for k, v in self.components.items() if + v.component_type == 'producer' and v.name != 'scenarios'] + groupped_summaries = {} + for s in summaries: + groupped_summaries.setdefault(s.name, ProducerSummary(s.name)).update(s) + return [e for e in groupped_summaries.values()] + + def get_consumer_tag_summary(self): + summaries = [v.get_consumer_tag_summary() for k, v in self.components.items() if v.component_type == 'consumer'] + groupped_summaries = {} + for s in summaries: + groupped_summaries.setdefault(s.name, ConsumerSummary(s.name)).update(s) + return [e for e in groupped_summaries.values()] def get_new_failures(self, old): oc = old.components if old is not None and old.components is not None else {} @@ -33,43 +54,42 @@ def to_dict(self): class Component: - def __init__(self, name, component_type, scenarios): + def __init__(self, name, component_type, scenarios, component_version, openlineage_version): self.name = name self.component_type = component_type self.scenarios = scenarios + self.component_version = component_version + self.openlineage_version = openlineage_version + + def __str__(self): + return str(self.to_dict()) @classmethod def from_dict(cls, d): - return cls(d['name'], d['component_type'], {s['name']: Scenario.from_dict(s) for s in d['scenarios']}) + return cls(d['name'], d['component_type'], + {s['name']: Scenario.from_dict(s) for s in d['scenarios']}, d['component_version'], + d['openlineage_version']) - def get_tag_summary(self): - facets = {} - inputs = {} + def get_producer_tag_summary(self): + summary = ProducerSummary(name=self.name) for n, s in self.scenarios.items(): - ss = s.get_tag_summary(self.component_type) - for f, ver in ss['facets'].items(): - s.update_facet_versions(f, facets, ver['max_version'], ver['min_version']) - if self.component_type == 'producer': - for datasource, lineage_levels in ss['lineage_levels'].items(): - for ll, ver in lineage_levels.items(): - if inputs.get(datasource) is None: - inputs[datasource] = {} - s.update_facet_versions(ll, inputs[datasource], ver['max_version'], ver['min_version']) - else: - for i, ver in ss['producers'].items(): - s.update_facet_versions(i, inputs, ver['max_version'], ver['min_version']) - output = {'facets': facets} - if self.component_type == 'producer': - output["lineage_levels"] = inputs - else: - output["producers"] = inputs - return output + ss = s.get_producer_tag_summary(self.component_version, 
self.openlineage_version) + summary.update(ss) + return summary + + def get_consumer_tag_summary(self): + summary = ConsumerSummary(self.name) + for n, s in self.scenarios.items(): + ss = s.get_consumer_tag_summary(self.component_version) + summary.update(ss) + return summary def get_new_failures(self, old): os = old.scenarios if old is not None and old.scenarios is not None else {} nfs = {k: nfs for k, v in self.scenarios.items() if (nfs := v.get_new_failures(os.get(k))) is not None} - return Component(self.name, self.component_type, nfs) if any(nfs) else None + return Component(self.name, self.component_type, nfs, self.component_version, self.openlineage_version) if any( + nfs) else None def update(self, new): for k, v in new.scenarios.items(): @@ -79,11 +99,16 @@ def update(self, new): self.scenarios[k] = v def to_dict(self): - return {'name': self.name, 'component_type': self.component_type, + return {'name': self.name, 'component_type': self.component_type, 'component_version': self.component_version, + 'openlineage_version': self.openlineage_version, 'scenarios': [c.to_dict() for c in self.scenarios.values()]} class Scenario: + + def __str__(self): + return str(self.to_dict()) + def __init__(self, name, status, tests): self.name = name self.status = status @@ -91,41 +116,35 @@ def __init__(self, name, status, tests): @classmethod def simplified(cls, name, tests): - return cls(name, 'SUCCESS' if not any(t for n, t in tests.items() if t.status == 'FAILURE') else 'FAILURE', - tests) + tests_ = tests if isinstance(tests, dict) else {t.name: t for t in tests} + return cls(name, 'SUCCESS' if not any(t for n, t in tests_.items() if t.status == 'FAILURE') else 'FAILURE', + tests_) @classmethod def from_dict(cls, d): return cls(d['name'], d['status'], {t['name']: Test.from_dict(t) for t in d['tests']}) - def get_tag_summary(self, component_type): - facets = {} - inputs = {} + def get_producer_tag_summary(self, component_version, openlineage_version): + summary = ProducerSummary() for name, test in self.tests.items(): if test.status == 'SUCCESS' and len(test.tags) > 0: tags = test.tags - min_ver = tags.get('min_version') - max_ver = tags.get('max_version') - for f in tags['facets']: - self.update_facet_versions(f, facets, max_ver, min_ver) - - if component_type == 'producer': - lineage_levels = tags['lineage_level'].items() if 'lineage_level' in tags else {} - for datasource, lineage_levels in lineage_levels: - for ll in lineage_levels: - if inputs.get(datasource) is None: - inputs[datasource] = {} - self.update_facet_versions(ll, inputs[datasource], max_ver, min_ver) - if component_type == 'consumer': - # if inputs.get(tags['producer']) is None: - # inputs[tags['producer']] = {} - self.update_facet_versions(tags['producer'], inputs, max_ver, min_ver) - output = {'facets': facets} - if component_type == 'producer': - output["lineage_levels"] = inputs - else: - output["producers"] = inputs - return output + summary.add(tags.get('facets', []), tags.get('lineage_level', {}), component_version, openlineage_version) + return summary + + def get_consumer_tag_summary(self, component_version): + summary = ConsumerSummary() + # assume the OL version is consistent across tests in single scenario + openlineage_version = next((test.tags['openlineage_version'] for name, test in self.tests.items() + if test.tags.get('openlineage_version') is not None), None) + if openlineage_version is None: + raise Exception(f"version empty for consumer scenario {self.name}") + for name, test in self.tests.items(): + if 
 
     def update_facet_versions(self, f, entity, max_ver, min_ver):
         if entity.get(f) is None:
@@ -162,6 +181,9 @@ def __init__(self, name, status, validation_type, entity_type, details, tags):
         self.details = details
         self.tags = tags
 
+    def __str__(self):
+        return str(self.to_dict())
+
     @classmethod
     def simplified(cls, name, validation_type, entity_type, details, tags):
         return cls(name, 'SUCCESS' if len(details) == 0 else 'FAILURE', validation_type, entity_type, details, tags)
@@ -186,3 +208,60 @@ def update(self, new):
     def to_dict(self):
         return {"name": self.name, "status": self.status, "validation_type": self.validation_type,
                 "entity_type": self.entity_type, "details": self.details, "tags": self.tags}
+
+
+class ConsumerSummary:
+    def __init__(self, name=None):
+        self.name = name
+        self.component_versions = {}
+
+    def update(self, other):
+        for comp_version, ol_versions in other.component_versions.items():
+            for ol_version, data in ol_versions.items():
+                self_cv = self.component_versions.setdefault(comp_version, {})
+                self_olv = self_cv.setdefault(ol_version, {
+                    'facets': set(),
+                    'inputs': set()
+                })
+                self_olv['facets'].update(data.get('facets', set()))
+                self_olv['inputs'].update(data.get('inputs', set()))
+
+    def add(self, facet, input, component_version, openlineage_version):
+        component_versions = self.component_versions.setdefault(component_version, {})
+        openlineage_versions = component_versions.setdefault(openlineage_version, {
+            'facets': set(),
+            'inputs': set()
+        })
+
+        openlineage_versions['facets'].update(facet)
+        openlineage_versions['inputs'].add(input)
+
+
+class ProducerSummary:
+    def __init__(self, name=None):
+        self.name = name
+        self.component_versions = {}
+
+    def add(self, facet, lineage_level, component_version, openlineage_version):
+        component_versions = self.component_versions.setdefault(component_version, {})
+        openlineage_versions = component_versions.setdefault(openlineage_version, {
+            'facets': set(),
+            'lineage_levels': {}
+        })
+
+        openlineage_versions['facets'].update(facet)
+
+        for comp, levels in lineage_level.items():
+            openlineage_versions['lineage_levels'].setdefault(comp, set()).update(levels)
+
+    def update(self, other):
+        for comp_version, ol_versions in other.component_versions.items():
+            for ol_version, data in ol_versions.items():
+                self_cv = self.component_versions.setdefault(comp_version, {})
+                self_olv = self_cv.setdefault(ol_version, {
+                    'facets': set(),
+                    'lineage_levels': {}
+                })
+                self_olv['facets'].update(data.get('facets', set()))
+                for comp, levels in data.get('lineage_levels', {}).items():
+                    self_olv['lineage_levels'].setdefault(comp, set()).update(levels)
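
The two summary classes group everything by the (component version, OpenLineage version) pair. A minimal sketch of how a producer test's tags end up in a ProducerSummary, assuming scripts/ is on PYTHONPATH; the facet and lineage-level names are invented:

```python
# Illustrative only: facet and lineage-level names are made up, and the
# import assumes scripts/ is on PYTHONPATH so report.py can be imported.
from report import ProducerSummary

tags = {
    "facets": ["parent", "sql"],                            # hypothetical facet names
    "lineage_level": {"bigquery": ["dataset", "column"]},   # hypothetical datasource and levels
}

per_scenario = ProducerSummary(name="example_producer")
per_scenario.add(tags.get("facets", []), tags.get("lineage_level", {}),
                 component_version="2.3.0", openlineage_version="1.23.0")

component_total = ProducerSummary(name="example_producer")
component_total.update(per_scenario)   # merged per (component_version, openlineage_version)

print(component_total.component_versions["2.3.0"]["1.23.0"]["facets"])          # {'parent', 'sql'}
print(component_total.component_versions["2.3.0"]["1.23.0"]["lineage_levels"])  # {'bigquery': {'dataset', 'column'}}
```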
diff --git a/scripts/requirements.txt b/scripts/requirements.txt
index 87cf4da2..440d54cb 100644
--- a/scripts/requirements.txt
+++ b/scripts/requirements.txt
@@ -38,4 +38,5 @@ rsa==4.9
 six==1.16.0
 soupsieve==2.5
 uritemplate==4.1.1
-urllib3==2.2.2
\ No newline at end of file
+urllib3==2.2.2
+json-with-comments==1.2.10
\ No newline at end of file
diff --git a/scripts/select_components.sh b/scripts/select_components.sh
new file mode 100644
index 00000000..507f9887
--- /dev/null
+++ b/scripts/select_components.sh
@@ -0,0 +1,17 @@
+# assuming the version will not exceed 1000 this is the quickest way to get comparable values
+version_sum() {
+    IFS='.' read -r var1 var2 var3 <<< "$1"
+    echo $(( var1 * 1000000 + var2 * 1000 + var3 ))
+}
+
+current_ol=$(cat generated-files/releases.json | jq -c '.[] | select(.name | contains("openlineage")) | .latest_version ' -r)
+latest_ol=$(curl https://api.github.com/repos/OpenLineage/OpenLineage/releases/latest -s | jq .tag_name -r)
+
+if (( $(version_sum $latest_ol) > $(version_sum $current_ol) )); then
+    echo "ol_release=${latest_ol}" >> $GITHUB_OUTPUT
+    echo "releases_updated=true" >> $GITHUB_OUTPUT
+    jq --arg latest_ol "$latest_ol" 'map(if .name == "openlineage" then .latest_version = $latest_ol else . end)' \
+        generated-files/releases.json > generated-files/updated-releases.json
+else
+    echo "ol_release=${current_ol}" >> $GITHUB_OUTPUT
+fi
\ No newline at end of file
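
The version_sum helper packs a dotted release string into one comparable integer, which only stays order-preserving while every component is below 1000 (the assumption stated in the script's first comment). A Python sketch of the same arithmetic with a worked example:

```python
# Mirrors version_sum from scripts/select_components.sh for illustration:
# each dotted component is packed into a single integer so releases can be
# compared numerically. Valid only while every component is < 1000.
def version_sum(version: str) -> int:
    major, minor, patch = (int(part) for part in version.split("."))
    return major * 1_000_000 + minor * 1_000 + patch

assert version_sum("1.23.4") == 1_023_004
assert version_sum("1.24.0") > version_sum("1.23.4")   # the newer release compares higher
```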
diff --git a/scripts/update_website_files.sh b/scripts/update_website_files.sh
new file mode 100644
index 00000000..5a8eb26c
--- /dev/null
+++ b/scripts/update_website_files.sh
@@ -0,0 +1,191 @@
+#!/bin/bash
+
+set -e
+
+# Usage: ./deploy_docs.sh [output_dir] [--dry-run]
+OUT_DIR="${1:-./out}"
+DOCS_TARGET="docs/integrations/openlineage_compatibility"
+VERSIONS_ROOT="versioned_docs"
+DRY_RUN=false
+
+# Detect dry-run
+if [[ "$2" == "--dry-run" ]] || [[ "$1" == "--dry-run" ]]; then
+  DRY_RUN=true
+  echo "Running in DRY RUN mode — no files will be written."
+fi
+
+log() {
+  echo "$@"
+}
+
+copy_file() {
+  local src="$1"
+  local dest="$2"
+  if $DRY_RUN; then
+    log "Would copy: $src → $dest"
+  else
+    mkdir -p "$(dirname "$dest")"
+    cp "$src" "$dest"
+    log "Copied: $src → $dest"
+  fi
+}
+
+write_stub() {
+  local dest="$1"
+  local title="$2"
+  local position="$3"
+  if $DRY_RUN; then
+    log "Would create stub: $dest"
+  else
+    mkdir -p "$(dirname "$dest")"
+    cat > "$dest" < "$target_dir/_category_.json" < "$dest_file"
+      echo -e "\nimport Transports from '@site/docs/integrations/openlineage_compatibility/$comp/$version.md';\n\n\n" >> "$dest_file"
+      log "Proxy created for $comp/$version"
+    fi
+  done
+done
+done
+
+
+log ""
+log "Success."
\ No newline at end of file
diff --git a/scripts/validate_ol_events.py b/scripts/validate_ol_events.py
index 61ca693d..565511eb 100644
--- a/scripts/validate_ol_events.py
+++ b/scripts/validate_ol_events.py
@@ -1,5 +1,7 @@
 import argparse
-import json
+import traceback
+
+import jsonc
 import logging
 import os
 import re
@@ -11,52 +13,72 @@ from report import Test, Scenario, Component, Report
 from compare_releases import release_between
 from compare_events import diff
-
+from jsonschema import RefResolver
 
 class OLSyntaxValidator:
     def __init__(self, schema_validators):
         self.schema_validators = schema_validators
 
     @staticmethod
-    def is_custom_facet(facet):
+    def is_custom_facet(facet, schema_type):
         if facet.get('_schemaURL') is not None:
-            return any(facet.get('_schemaURL').__contains__(f'defs/{facet_type}Facet') for facet_type in
-                       ['Run', 'Job', 'Dataset', 'InputDataset', 'OutputDataset'])
+            is_custom = any(facet.get('_schemaURL').__contains__(f'defs/{facet_type}Facet') for facet_type in
+                            ['Run', 'Job', 'Dataset', 'InputDataset', 'OutputDataset'])
+            if is_custom:
+                print(f"facet {schema_type} seems to be a custom facet, validation skipped")
+            return is_custom
+        return False
+
+
     @classmethod
-    def load_schemas(cls, paths):
-        file_paths = [p for path in paths for file in listdir(path) if isfile((p := join(path, file)))]
+    def get_validators(cls, spec_path, tags):
+        return {tag: cls.get_validator(spec_path, tag) for tag in tags}
 
-        facet_schemas = [load_json(path) for path in file_paths if path.__contains__('Facet.json')]
-        spec_schema = next(load_json(path) for path in file_paths if path.__contains__('OpenLineage.json'))
+    @classmethod
+    def get_validator(cls, spec_path, tag):
+        file_paths = listdir(join(spec_path, tag))
+        facet_schemas = [load_json(join(spec_path, tag, path)) for path in file_paths if path.__contains__('Facet.json')]
+        spec_schema = next(load_json(join(spec_path, tag, path)) for path in file_paths if path.__contains__('OpenLineage.json'))
+        schema_validators = {}
+        for schema in facet_schemas:
+            name = next(iter(schema['properties']))
+            store = {
+                spec_schema['$id']: spec_schema,
+                schema['$id']: schema,
+            }
+            resolver = RefResolver(base_uri="", referrer=spec_schema, store=store)
+            schema_validators[name] = Draft202012Validator(schema, resolver=resolver)
 
-        schema_validators = {next(iter(schema['properties'])): Draft202012Validator(schema) for schema in facet_schemas}
         schema_validators['core'] = Draft202012Validator(spec_schema)
         return cls(schema_validators)
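
get_validator pre-loads both the core OpenLineage.json and each facet schema into a RefResolver store, so cross-file $refs resolve from memory rather than over the network. A minimal sketch of that wiring with stripped-down stand-in schemas (the $id values and facet contents are invented):

```python
# Minimal sketch of the resolver wiring used by get_validator(); the two
# schemas below are simplified stand-ins, not the real spec files.
from jsonschema import Draft202012Validator, RefResolver

core_spec = {  # stands in for OpenLineage.json
    "$id": "https://example.org/spec/OpenLineage.json",
    "$defs": {"RunFacet": {"type": "object"}},
}
facet_schema = {  # stands in for one *Facet.json file
    "$id": "https://example.org/spec/facets/ExampleFacet.json",
    "properties": {
        "example": {"$ref": "https://example.org/spec/OpenLineage.json#/$defs/RunFacet"}
    },
}

store = {core_spec["$id"]: core_spec, facet_schema["$id"]: facet_schema}
resolver = RefResolver(base_uri="", referrer=core_spec, store=store)
validator = Draft202012Validator(facet_schema, resolver=resolver)

# The cross-file $ref is served from the in-memory store, so validation
# needs no network access.
print(list(validator.iter_errors({"example": {}})))   # -> []
```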
 
-    def validate_entity(self, instance, schema_type):
-        schema_validator = self.schema_validators.get(schema_type)
-        if schema_validator is not None:
-            errors = [error for error in schema_validator.iter_errors(instance)]
-            if len(errors) == 0:
+    def validate_entity(self, instance, schema_type, name):
+        try:
+            schema_validator = self.schema_validators.get(schema_type)
+            if schema_validator is not None:
+                errors = [error for error in schema_validator.iter_errors(instance)]
+                if len(errors) == 0:
+                    return []
+                else:
+                    return [f"{(e := best_match([error], by_relevance())).json_path}: {e.message}" for error in errors]
+            elif self.is_custom_facet(instance.get(schema_type), schema_type):
+                # facet type may be a custom facet without an available schema json file (defined only as a class)
                 return []
             else:
-                return [f"{(e := best_match([error], by_relevance())).json_path}: {e.message}" for error in errors]
-        elif self.is_custom_facet(instance.get(schema_type)):
-            # facet type may be custom facet without available schema json file (defined only as class)
-            return []
-        else:
-            return [f"$.{schema_type} facet type {schema_type} not recognized"]
-
-    def validate(self, event):
+                return [f"$.{schema_type} facet type {schema_type} not recognized"]
+        except Exception:
+            print(f"when validating {schema_type}, for instance of {name} the following exception occurred \n {traceback.format_exc()}")
+            return []  # do not let an unexpected validator error abort the whole run
+
+    def validate(self, event, name):
         validation_result = []
-        run_validation = self.validate_entity(event, 'core')
-        run = self.validate_entity_map(event, 'run')
-        job = self.validate_entity_map(event, 'job')
-        inputs = self.validate_entity_array(event, 'inputs', 'facets')
-        input_ifs = self.validate_entity_array(event, 'inputs', 'inputFacets')
-        outputs = self.validate_entity_array(event, 'outputs', 'facets')
-        output_ofs = self.validate_entity_array(event, 'outputs', 'outputFacets')
+        run_validation = self.validate_entity(event, 'core', name)
+        run = self.validate_entity_map(event, 'run', name)
+        job = self.validate_entity_map(event, 'job', name)
+        inputs = self.validate_entity_array(event, 'inputs', 'facets', name)
+        input_ifs = self.validate_entity_array(event, 'inputs', 'inputFacets', name)
+        outputs = self.validate_entity_array(event, 'outputs', 'facets', name)
+        output_ofs = self.validate_entity_array(event, 'outputs', 'outputFacets', name)
 
         validation_result.extend(run_validation)
         validation_result.extend(run)
@@ -68,15 +90,15 @@ def validate(self, event):
 
         return validation_result
 
-    def validate_entity_array(self, data, entity, generic_facet_type):
+    def validate_entity_array(self, data, entity, generic_facet_type, name):
         return [e.replace('$', f'$.{entity}[{ind}]') for ind, i in enumerate(data[entity])
                 for k, v in (i.get(generic_facet_type).items() if generic_facet_type in i else {}.items())
-                for e in self.validate_entity({k: v}, k)]
+                for e in self.validate_entity({k: v}, k, name)]
 
-    def validate_entity_map(self, data, entity):
+    def validate_entity_map(self, data, entity, name):
         return [e.replace('$', f'$.{entity}') for k, v in data[entity]['facets'].items() for e in
-                self.validate_entity({k: v}, k)]
+                self.validate_entity({k: v}, k, name)]
 
 
 class OLSemanticValidator:
@@ -127,7 +149,7 @@ def fields_match(r, e) -> bool:
 
 def load_json(path):
     with open(path) as f:
-        return json.load(f)
+        return jsonc.load(f)
 
 
 def extract_pattern(identifier, patterns):
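
load_json and get_config now parse files with the jsonc module from the json-with-comments package added to requirements.txt, so spec and scenario config files may carry comments. A small sketch of the same call shape, with an invented config body:

```python
# Sketch only: the config content below is invented, and jsonc.load is
# assumed to accept any file-like object with read(), as it does for the
# open() handles used by load_json() above.
import io
import jsonc

config_file = io.StringIO("""
{
    // version gates consulted when selecting tests
    "component_versions": {"min": "2.1.0"},
    "openlineage_versions": {"min": "1.9.0", "max": "1.23.0"},
    "tests": []
}
""")

config = jsonc.load(config_file)
print(config["openlineage_versions"])   # {'min': '1.9.0', 'max': '1.23.0'}
```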
@@ -150,12 +172,10 @@ def all_tests_succeeded(syntax_tests):
     return not any(t.status == "FAILURE" for t in syntax_tests.values())
 
 
-def get_expected_events(producer_dir, component, scenario_name, config, release):
-    if component == 'scenarios':
-        return None
+def get_expected_events(producer_dir, component, scenario_name, config, component_version, openlineage_version):
     test_events = []
     for test in config['tests']:
-        if release_between(release, test['tags'].get('min_version'), test['tags'].get('max_version')):
+        if check_versions(component_version, openlineage_version, test):
             filepath = join(producer_dir, component, 'scenarios', scenario_name, test['path'])
             body = load_json(filepath)
             test_events.append((test['name'], body, test['tags']))
@@ -167,7 +187,7 @@ def validate_scenario_syntax(result_events, validator, config):
     for name, event in result_events.items():
         identification = get_event_identifier(event, name, config.get('patterns'))
         print(f"syntax validation for {identification}")
-        details = validator.validate(event)
+        details = validator.validate(event, name)
         syntax_tests[identification] = Test(identification, "FAILURE" if len(details) > 0 else "SUCCESS", 'syntax',
                                             'openlineage', details, {})
     return syntax_tests
@@ -179,16 +199,17 @@ def get_config(producer_dir, component, scenario_name):
     else:
         path = join(producer_dir, component, 'scenarios', scenario_name, 'config.json')
     with open(path) as f:
-        return json.load(f)
+        return jsonc.load(f)
 
 
 def get_arguments():
     parser = argparse.ArgumentParser(description="")
     parser.add_argument('--event_base_dir', type=str, help="directory containing the reports")
-    parser.add_argument('--spec_dirs', type=str, help="comma separated list of directories containing spec and facets")
+    parser.add_argument('--spec_base_dir', type=str, help="directory containing specs and facets")
     parser.add_argument('--producer_dir', type=str, help="directory storing producers")
     parser.add_argument('--component', type=str, help="component producing the validated events")
-    parser.add_argument('--release', type=str, help="OpenLineage release used in generating events")
+    parser.add_argument('--component_version', type=str, help="component release used in generating events")
+    parser.add_argument('--openlineage_version', type=str, help="comma-separated list of OpenLineage versions")
    parser.add_argument('--target', type=str, help="target file")
 
     args = parser.parse_args()
@@ -197,39 +218,53 @@ def get_arguments():
     producer_dir = args.producer_dir
     target = args.target
     component = args.component
-    release = args.release
-    spec_dirs = args.spec_dirs.split(',')
+    component_version = args.component_version
+    openlineage_version = args.openlineage_version
+    spec_base_dir = args.spec_base_dir
 
-    return event_base_dir, producer_dir, target, spec_dirs, component, release
+    return event_base_dir, producer_dir, target, spec_base_dir, component, component_version, openlineage_version
+
+
+def check_versions(component_version, openlineage_version, config):
+    component_versions = config.get("component_versions", {})
+    openlineage_versions = config.get("openlineage_versions", {})
+
+    return (release_between(component_version, component_versions.get("min"), component_versions.get("max")) and
+            release_between(openlineage_version, openlineage_versions.get("min"), openlineage_versions.get("max")))
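
check_versions combines two independent gates from a test's config: the producing component's release and the OpenLineage release used to generate the events. A hedged usage example of the function defined above, assuming release_between (imported from compare_releases) treats a missing bound as open-ended:

```python
# Illustrative config and versions; a test is selected only when BOTH the
# component release and the OpenLineage release fall inside their ranges.
test_config = {
    "component_versions": {"min": "2.1.0"},                         # no upper bound
    "openlineage_versions": {"min": "1.9.0", "max": "1.23.0"},
}

selected = check_versions(component_version="2.3.0", openlineage_version="1.12.0", config=test_config)
skipped = check_versions(component_version="2.0.0", openlineage_version="1.12.0", config=test_config)
```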
 
 
 def main():
-    base_dir, producer_dir, target, spec_dirs, component, release = get_arguments()
-    validator = OLSyntaxValidator.load_schemas(paths=spec_dirs)
+    base_dir, producer_dir, target, spec_dirs, component, component_version, openlineage_version = get_arguments()
 
     scenarios = {}
-    for scenario_name in listdir(base_dir):
-        scenario_path = get_path(base_dir, component, scenario_name)
-        if isdir(scenario_path):
-            config = get_config(producer_dir, component, scenario_name)
-            if component == 'scenarios':
-                if release_between(release, config['tags'].get('min_version'), config['tags'].get('max_version')):
-                    result_events = {file: load_json(path) for file in listdir(scenario_path) if
-                                     isfile(path := join(scenario_path, file))}
-                    tests = validate_scenario_syntax(result_events, validator, config)
-                    scenarios[scenario_name] = Scenario.simplified(scenario_name, tests)
-            else:
-                expected = get_expected_events(producer_dir, component, scenario_name, config, release)
+    if component == 'scenarios':
+        validators = OLSyntaxValidator.get_validators(spec_path=spec_dirs, tags=openlineage_version.split(','))
+        for scenario_name in listdir(base_dir):
+            scenario_path = get_path(base_dir, component, scenario_name)
+            if isdir(scenario_path):
+                config = get_config(producer_dir, component, scenario_name)
+                validator = validators.get(config.get('openlineage_version'))
+                print(f"for scenario {scenario_name} validation version is {config.get('openlineage_version')}")
                 result_events = {file: load_json(path) for file in listdir(scenario_path) if
                                  isfile(path := join(scenario_path, file))}
                 tests = validate_scenario_syntax(result_events, validator, config)
-
-                if all_tests_succeeded(tests) and expected is not None:
-                    for name, res in OLSemanticValidator(expected).validate(result_events).items():
-                        tests[name] = res
                 scenarios[scenario_name] = Scenario.simplified(scenario_name, tests)
-    report = Report({component: Component(component, 'producer', scenarios)})
+        report = Report({component: Component(component, 'producer', scenarios, "", "")})
+    else:
+        validator = OLSyntaxValidator.get_validator(spec_path=spec_dirs, tag=openlineage_version)
+        for scenario_name in listdir(base_dir):
+            config = get_config(producer_dir, component, scenario_name)
+            scenario_path = get_path(base_dir, component, scenario_name)
+            expected = get_expected_events(producer_dir, component, scenario_name, config, component_version, openlineage_version)
+            result_events = {file: load_json(path) for file in listdir(scenario_path) if
+                             isfile(path := join(scenario_path, file))}
+            tests = validate_scenario_syntax(result_events, validator, config)
+            if all_tests_succeeded(tests) and expected is not None:
+                for name, res in OLSemanticValidator(expected).validate(result_events).items():
+                    tests[name] = res
+            scenarios[scenario_name] = Scenario.simplified(scenario_name, tests)
+        report = Report({component: Component(component, 'producer', scenarios, component_version, openlineage_version)})
 
     with open(target, 'w') as f:
-        json.dump(report.to_dict(), f, indent=2)
+        jsonc.dump(report.to_dict(), f, indent=2)
 
 
 def get_path(base_dir, component, scenario_name):