diff --git a/.github/CHANGELOG.md b/.github/CHANGELOG.md new file mode 100644 index 00000000..e69de29b diff --git a/.github/dependabot.yml b/.github/dependabot.yml index f7628b26..5108f1ea 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -4,10 +4,10 @@ updates: - package-ecosystem: 'github-actions' directory: '/' schedule: - interval: 'weekly' + interval: 'monthly' # Maintain dependencies for npm - package-ecosystem: 'pip' directory: '/megalist_dataflow' schedule: - interval: 'weekly' + interval: 'monthly' diff --git a/.github/workflows/codacy-analysis.yml b/.github/workflows/codacy-analysis.yml index 7e62010d..4eaa4f4e 100644 --- a/.github/workflows/codacy-analysis.yml +++ b/.github/workflows/codacy-analysis.yml @@ -6,7 +6,7 @@ # For more information on Codacy Analysis CLI in general, see # https://github.com/codacy/codacy-analysis-cli. -name: Codacy +name: Codacy Analysis on: ['push'] diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index 554bc341..ddda561d 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -1,17 +1,17 @@ # This workflow will install Python dependencies, run tests and lint with a single version of Python # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions -name: Python testing +name: Python on: push: branches: [ develop ] pull_request: - branches: [ main ] + branches: [ main, master ] jobs: unit_testing: - + name: Test runs-on: ubuntu-latest steps: @@ -27,3 +27,6 @@ jobs: - name: Run tests run: | ./run_tests.sh + - name: Upload coverage to Codacy + run: export CODACY_PROJECT_TOKEN=${{ secrets.CODACY_PROJECT_TOKEN }} && bash <(curl -Ls https://coverage.codacy.com/get.sh) report -r megalist_dataflow/* + continue-on-error: true diff --git a/.github/workflows/semantic-release.yml b/.github/workflows/semantic-release.yml index cbb0134e..7c0c9254 100644 --- a/.github/workflows/semantic-release.yml +++ b/.github/workflows/semantic-release.yml @@ -2,8 +2,7 @@ name: Semantic Release on: push: - branches: - - main + branches: [ main, master ] jobs: release: diff --git a/.github/workflows/terraform.yml b/.github/workflows/terraform.yml index 662f6652..2d364259 100644 --- a/.github/workflows/terraform.yml +++ b/.github/workflows/terraform.yml @@ -4,7 +4,7 @@ on: ['push'] jobs: terraform-actions: - name: Workflow + name: tf validate runs-on: ubuntu-latest defaults: run: @@ -14,7 +14,7 @@ jobs: uses: actions/checkout@master - name: HashiCorp - Setup Terraform - uses: hashicorp/setup-terraform@v1.2.1 + uses: hashicorp/setup-terraform@v1.3.2 with: terraform_version: 0.14.6 diff --git a/README.md b/README.md index 8844568f..ee7a30a4 100644 --- a/README.md +++ b/README.md @@ -125,8 +125,11 @@ python3 mds_dataflow/main.py \ ``` ### Deploying Pipeline -To deploy, use the following command: -`./deploy_cloud.sh project_id bucket_name region_name` +To deploy, use the following commands from the root folder: +``` +cd terraform +./scripts/deploy_cloud.sh project_id bucket_name region_name +``` #### Manually executing pipeline using Dataflow UI To execute the pipeline, use the following steps: diff --git a/megalist_dataflow/main.py b/megalist_dataflow/main.py index 7458a2ca..f0ff49e3 100644 --- a/megalist_dataflow/main.py +++ b/megalist_dataflow/main.py @@ -21,6 +21,7 @@ from mappers.ads_ssd_hashing_mapper import AdsSSDHashingMapper from mappers.ads_user_list_pii_hashing_mapper import AdsUserListPIIHashingMapper from sources.spreadsheet_execution_source import SpreadsheetExecutionSource +from sources.firestore_execution_source import FirestoreExecutionSource from sources.batches_from_executions import BatchesFromExecutions from uploaders.appsflyer.appsflyer_s2s_uploader_async import AppsFlyerS2SUploaderDoFn from uploaders.campaign_manager.campaign_manager_conversion_uploader import CampaignManagerConversionUploaderDoFn @@ -185,11 +186,18 @@ def run(argv=None): dataflow_options.access_token, dataflow_options.refresh_token) - sheets_config = SheetsConfig(oauth_credentials) + if dataflow_options.setup_sheet_id.is_accessible(): + sheets_config = SheetsConfig(oauth_credentials) with beam.Pipeline(options=pipeline_options) as pipeline: - executions = (pipeline | 'Load executions' >> beam.io.Read( - SpreadsheetExecutionSource(sheets_config, dataflow_options.setup_sheet_id))) + if dataflow_options.setup_sheet_id.is_accessible(): + executions = (pipeline | 'Load executions' >> beam.io.Read( + SpreadsheetExecutionSource(sheets_config, dataflow_options.setup_sheet_id))) + elif dataflow_options.setup_firestore_collection.is_accessible(): + executions = (pipeline | 'Load executions' >> beam.io.Read( + FirestoreExecutionSource(dataflow_options.setup_firestore_collection))) + else: + raise Exception('No valid parameter source (setup_sheet_id/setup_firestore_collection) included in the arguments') executions | GoogleAdsSSDStep( oauth_credentials, dataflow_options, AdsSSDHashingMapper()) diff --git a/megalist_dataflow/models/options.py b/megalist_dataflow/models/options.py index 8f0d3365..816afa7c 100644 --- a/megalist_dataflow/models/options.py +++ b/megalist_dataflow/models/options.py @@ -31,6 +31,8 @@ def _add_argparse_args(cls, parser): # Set up parser.add_value_provider_argument( '--setup_sheet_id', help='Id of Spreadsheet with execution info') + parser.add_value_provider_argument( + '--setup_firestore_collection', help='Name of Firestore collection with execution info') parser.add_value_provider_argument( '--bq_ops_dataset', help='Auxliary bigquery dataset used for Megalista operations') diff --git a/megalist_dataflow/requirements.txt b/megalist_dataflow/requirements.txt index 3c563904..df7b5eec 100644 --- a/megalist_dataflow/requirements.txt +++ b/megalist_dataflow/requirements.txt @@ -9,13 +9,13 @@ apache-beam==2.28.0 google-cloud-datastore==1.13.1 google-apitools==0.5.31 pytest==5.4.3 -pytest-cov==2.10.0 +pytest-cov==2.11.1 pytest-mock==3.2.0 requests-mock==1.8.0 -pytz==2020.1 +pytz==2021.1 wheel==0.34.2 -pyarrow==0.17.1 -aiohttp==3.6.2 +pyarrow==4.0.0 +aiohttp==3.7.4 bloom-filter==1.3 -six==1.13.0 +six==1.15.0 mypy==0.790 \ No newline at end of file diff --git a/megalist_dataflow/setup.py b/megalist_dataflow/setup.py index e8c1c59b..6725f203 100644 --- a/megalist_dataflow/setup.py +++ b/megalist_dataflow/setup.py @@ -23,6 +23,6 @@ url='https://github.com/DP6/marketing-data-sync', install_requires=['googleads==24.1.0', 'google-api-python-client==1.10.0', 'google-cloud-core==1.3.0', 'google-cloud-bigquery==1.26.0', - 'google-cloud-datastore==1.13.1', 'aiohttp==3.6.2'], + 'google-cloud-datastore==1.13.1', 'aiohttp==3.7.4'], packages=setuptools.find_packages(), ) diff --git a/megalist_dataflow/sources/firestore_execution_source.py b/megalist_dataflow/sources/firestore_execution_source.py new file mode 100644 index 00000000..1e17e1e3 --- /dev/null +++ b/megalist_dataflow/sources/firestore_execution_source.py @@ -0,0 +1,127 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import distutils.util +import logging + +from apache_beam.options.value_provider import ValueProvider + +from google.cloud import firestore +from sources.base_bounded_source import BaseBoundedSource +from models.execution import Destination, DestinationType +from models.execution import Execution, AccountConfig +from models.execution import Source, SourceType + + +class FirestoreExecutionSource(BaseBoundedSource): + """ + Read Execution data from a Firestore collection. The collection name is set-up in the parameter "setup_firestore_collection" + """ + + def __init__( + self, + setup_firestore_collection: ValueProvider + ): + super().__init__() + self._setup_firestore_collection = setup_firestore_collection + + def _do_count(self): + # TODO: implement count + return 3 + + def read(self, range_tracker): + def document_to_dict(doc): + if not doc.exists: + return None + doc_dict = doc.to_dict() + doc_dict['id'] = doc.id + return doc_dict + + firestore_collection = self._setup_firestore_collection.get() + logging.getLogger("megalista.FirestoreExecutionSource").info(f"Loading Firestore collection {firestore_collection}...") + db = firestore.Client() + entries = db.collection(self._setup_firestore_collection.get()).where('active', '==', 'yes').stream() + entries = [document_to_dict(doc) for doc in entries] + + account_data = document_to_dict(db.collection(self._setup_firestore_collection.get()).document('account_config').get()) + + if not account_data: + raise Exception('Firestore collection is absent') + google_ads_id = account_data.get('google_ads_id', 'empty') + mcc_trix = account_data.get('mcc_trix', 'FALSE') + mcc = False if mcc_trix is None else bool(distutils.util.strtobool(mcc_trix)) + app_id = account_data.get('app_id', 'empty') + google_analytics_account_id = account_data.get('google_analytics_account_id', 'empty') + campaign_manager_account_id = account_data.get('campaign_manager_account_id', 'empty') + + account_config = AccountConfig(google_ads_id, mcc, google_analytics_account_id, campaign_manager_account_id, app_id) + logging.getLogger("megalista.FirestoreExecutionSource").info(f"Loaded: {account_config}") + + sources = self._read_sources(entries) + destinations = self._read_destination(entries) + if entries: + for entry in entries: + if entry['active'].upper() == 'YES': + logging.getLogger("megalista.FirestoreExecutionSource").info( + f"Executing step Source:{sources[entry['id'] + '_source'].source_name} -> Destination:{destinations[entry['id'] + '_destination'].destination_name}") + yield Execution(account_config, sources[entry['id'] + '_source'], destinations[entry['id'] + '_destination']) + else: + logging.getLogger("megalista.FirestoreExecutionSource").warn("No schedules found!") + + @staticmethod + def _read_sources(entries): + sources = {} + if entries: + for entry in entries: + metadata = [entry['bq_dataset'], entry['bq_table']] #TODO: flexibilize for other source types + source = Source(entry['id'] + '_source', SourceType[entry['source']], metadata) + sources[source.source_name] = source + else: + logging.getLogger("megalista.FirestoreExecutionSource").warn("No sources found!") + return sources + + @staticmethod + def _read_destination(entries): + def create_metadata_list(entry): + metadata_list = { + 'ADS_OFFLINE_CONVERSION': ['gads_conversion_name'], + 'ADS_SSD_UPLOAD': ['gads_conversion_name', 'gads_external_upload_id'], + 'ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD': ['gads_audience_name', 'gads_operation', 'gads_hash'], + 'ADS_CUSTOMER_MATCH_MOBILE_DEVICE_ID_UPLOAD': ['gads_audience_name', 'gads_operation'], + 'ADS_CUSTOMER_MATCH_USER_ID_UPLOAD': ['gads_audience_name', 'gads_operation'], + 'GA_MEASUREMENT_PROTOCOL': ['google_analytics_property_id', 'google_analytics_non_interaction'], + 'CM_OFFLINE_CONVERSION': ['campaign_manager_floodlight_activity_id', 'campaign_manager_floodlight_configuration_id'], + 'APPSFLYER_S2S_EVENTS': ['appsflyer_app_id'], + } + + entry_type = entry['type'] + metadata = metadata_list.get(entry_type, None) + if not metadata: + raise Exception(f'Upload type not implemented: {entry_type}') + entry_metadata = [] + for m in metadata: + if m in entry: + entry_metadata.append(entry[m]) + else: + raise Exception(f'Missing field in Firestore document for {entry_type}: {m}') + return entry_metadata + + + destinations = {} + if entries: + for entry in entries: + destination = Destination(entry['id'] + '_destination', DestinationType[entry['type']], create_metadata_list(entry)) + destinations[destination.destination_name] = destination + else: + logging.getLogger("megalista.FirestoreExecutionSource").warn("No destinations found!") + return destinations diff --git a/megalist_dataflow/uploaders/utils.py b/megalist_dataflow/uploaders/utils.py index c458685a..a2a859a9 100644 --- a/megalist_dataflow/uploaders/utils.py +++ b/megalist_dataflow/uploaders/utils.py @@ -34,6 +34,7 @@ def get_ads_service(service_name, version, oauth_credentials, developer_token, oauth2_client, 'Mds Dataflow', client_customer_id=customer_id) + client.partial_failure = True return client.GetService(service_name, version=version) diff --git a/pyproject.toml b/pyproject.toml index 8f7fa7a6..0f6ff6fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,8 @@ [tool.semantic_release] upload_to_pypi = false +branch = 'master' version_variable = [ 'megalist_dataflow/setup.py:__version__' ] - +version_source = 'tag' +build_command = false diff --git a/terraform/scripts/deploy_cloud.sh b/terraform/scripts/deploy_cloud.sh index 37578e91..e238c78b 100755 --- a/terraform/scripts/deploy_cloud.sh +++ b/terraform/scripts/deploy_cloud.sh @@ -26,7 +26,7 @@ echo "Configuration GCP project in gcloud" gcloud config set project "$1" echo "Build Dataflow metadata" python3 -m pip install --user -q -r requirements.txt -python3 -m main --runner DataflowRunner --project "$1" --gcp_project_id "$1" --temp_location"gs://$2/tmp/" --region "$3" --setup_file ./setup.py --template_location "gs://$2/templates/megalista" --num_workers 1 --autoscaling_algorithm=NONE +python3 -m main --runner DataflowRunner --project "$1" --gcp_project_id "$1" --temp_location "gs://$2/tmp/" --region "$3" --setup_file ./setup.py --template_location "gs://$2/templates/megalista" --num_workers 1 --autoscaling_algorithm=NONE echo "Copy megalista_medata to bucket $2" gsutil cp megalist_metadata "gs://$2/templates/megalista_metadata" cd ..