Changes from all commits (23 commits)
378c018  Merge pull request #10 from DP6/main (caiotomazelli, Apr 8, 2021)
045e76a  Add Firestore source (nivaldoh, Apr 27, 2021)
0a3d5b9  Fixes documentation (caiotomazelli, Apr 30, 2021)
d6acb45  Fixing deploy_cloud.sh typo. (caiotomazelli, Apr 30, 2021)
f115c7e  Merge pull request #13 from google/fixing-new-deploy-location (caiotomazelli, Apr 30, 2021)
08f89e1  refactor: Reorganize parameters (nivaldoh, Apr 30, 2021)
373a9f3  refactor: Remove placeholders (nivaldoh, Apr 30, 2021)
c50a5ac  refactor: Reset indentation (nivaldoh, Apr 30, 2021)
a78f5b2  docs: update README to MDS (joaquimsn, Apr 13, 2021)
5cf834b  docs: :lipstick: Added theme dp6.github.io (joaquimsn, Apr 13, 2021)
8ae461c  ci: :construction_worker: Added Codacy and change docs (joaquimsn, Apr 23, 2021)
4591f92  Merge branch 'master' of github.com:DP6/megalista (joaquimsn, May 2, 2021)
4e6653e  ci: :green_heart: Fixing semantic-release config (joaquimsn, May 2, 2021)
271a410  ci: Fixes gitaction workflows (joaquimsn, May 2, 2021)
cf12ace  Merge pull request #19 from DP6/release (joaquimsn, May 2, 2021)
a82a6e7  perf: :arrow_up: Update dependencies (joaquimsn, May 2, 2021)
a7d1239  chore: fixe pyproject.toml (#20) (joaquimsn, May 3, 2021)
488dc58  perf: ⚡ Add partial failure support for Google Ads (#10) (joaquimsn, May 3, 2021)
a1ebf76  Merge pull request #9 from DP6/firestore-source (joaquimsn, May 3, 2021)
5151d4b  feat: 🆕 PR #9 from DP6/firestore-source (joaquimsn, May 3, 2021)
1058063  Merge branch 'master' of github.com:DP6/megalista (joaquimsn, May 3, 2021)
e65fd66  Merge pull request #21 from DP6/update-dependencies (joaquimsn, May 3, 2021)
5583a21  Bump pyarrow from 0.17.1 to 4.0.0 in /megalist_dataflow (dependabot[bot], May 3, 2021)
Empty file added .github/CHANGELOG.md
Empty file.
4 changes: 2 additions & 2 deletions .github/dependabot.yml

```diff
@@ -4,10 +4,10 @@ updates:
   - package-ecosystem: 'github-actions'
     directory: '/'
     schedule:
-      interval: 'weekly'
+      interval: 'monthly'

   # Maintain dependencies for npm
   - package-ecosystem: 'pip'
     directory: '/megalist_dataflow'
     schedule:
-      interval: 'weekly'
+      interval: 'monthly'
```
2 changes: 1 addition & 1 deletion .github/workflows/codacy-analysis.yml

```diff
@@ -6,7 +6,7 @@
 # For more information on Codacy Analysis CLI in general, see
 # https://github.com/codacy/codacy-analysis-cli.

-name: Codacy
+name: Codacy Analysis

 on: ['push']
```
9 changes: 6 additions & 3 deletions .github/workflows/python-app.yml

```diff
@@ -1,17 +1,17 @@
 # This workflow will install Python dependencies, run tests and lint with a single version of Python
 # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions

-name: Python testing
+name: Python

 on:
   push:
     branches: [ develop ]
   pull_request:
-    branches: [ main ]
+    branches: [ main, master ]

 jobs:
   unit_testing:
-
+    name: Test
     runs-on: ubuntu-latest

     steps:
@@ -27,3 +27,6 @@ jobs:
     - name: Run tests
       run: |
         ./run_tests.sh
+    - name: Upload coverage to Codacy
+      run: export CODACY_PROJECT_TOKEN=${{ secrets.CODACY_PROJECT_TOKEN }} && bash <(curl -Ls https://coverage.codacy.com/get.sh) report -r megalist_dataflow/*
+      continue-on-error: true
```
3 changes: 1 addition & 2 deletions .github/workflows/semantic-release.yml

```diff
@@ -2,8 +2,7 @@ name: Semantic Release

 on:
   push:
-    branches:
-      - main
+    branches: [ main, master ]

 jobs:
   release:
```
4 changes: 2 additions & 2 deletions .github/workflows/terraform.yml

```diff
@@ -4,7 +4,7 @@ on: ['push']

 jobs:
   terraform-actions:
-    name: Workflow
+    name: tf validate
     runs-on: ubuntu-latest
     defaults:
       run:
@@ -14,7 +14,7 @@ jobs:
       uses: actions/checkout@master

     - name: HashiCorp - Setup Terraform
-      uses: hashicorp/setup-terraform@v1.2.1
+      uses: hashicorp/setup-terraform@v1.3.2
       with:
         terraform_version: 0.14.6
```
7 changes: 5 additions & 2 deletions README.md

````diff
@@ -125,8 +125,11 @@ python3 mds_dataflow/main.py \
 ```

 ### Deploying Pipeline
-To deploy, use the following command:
-`./deploy_cloud.sh project_id bucket_name region_name`
+To deploy, use the following commands from the root folder:
+```
+cd terraform
+./scripts/deploy_cloud.sh project_id bucket_name region_name
+```

 #### Manually executing pipeline using Dataflow UI
 To execute the pipeline, use the following steps:
````
14 changes: 11 additions & 3 deletions megalist_dataflow/main.py

```diff
@@ -21,6 +21,7 @@
 from mappers.ads_ssd_hashing_mapper import AdsSSDHashingMapper
 from mappers.ads_user_list_pii_hashing_mapper import AdsUserListPIIHashingMapper
 from sources.spreadsheet_execution_source import SpreadsheetExecutionSource
+from sources.firestore_execution_source import FirestoreExecutionSource
 from sources.batches_from_executions import BatchesFromExecutions
 from uploaders.appsflyer.appsflyer_s2s_uploader_async import AppsFlyerS2SUploaderDoFn
 from uploaders.campaign_manager.campaign_manager_conversion_uploader import CampaignManagerConversionUploaderDoFn
@@ -185,11 +186,18 @@ def run(argv=None):
       dataflow_options.access_token,
       dataflow_options.refresh_token)

-  sheets_config = SheetsConfig(oauth_credentials)
+  if dataflow_options.setup_sheet_id.is_accessible():
+    sheets_config = SheetsConfig(oauth_credentials)

   with beam.Pipeline(options=pipeline_options) as pipeline:
-    executions = (pipeline | 'Load executions' >> beam.io.Read(
-        SpreadsheetExecutionSource(sheets_config, dataflow_options.setup_sheet_id)))
+    if dataflow_options.setup_sheet_id.is_accessible():
+      executions = (pipeline | 'Load executions' >> beam.io.Read(
+          SpreadsheetExecutionSource(sheets_config, dataflow_options.setup_sheet_id)))
+    elif dataflow_options.setup_firestore_collection.is_accessible():
+      executions = (pipeline | 'Load executions' >> beam.io.Read(
+          FirestoreExecutionSource(dataflow_options.setup_firestore_collection)))
+    else:
+      raise Exception('No valid parameter source (setup_sheet_id/setup_firestore_collection) included in the arguments')

     executions | GoogleAdsSSDStep(
         oauth_credentials, dataflow_options, AdsSSDHashingMapper())
```
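The source-selection logic this hunk adds can be sketched without Beam: the spreadsheet source wins when its option was supplied, Firestore is the fallback, and the pipeline fails fast when neither is present. A minimal sketch, where `FakeOption` is a hypothetical stand-in for a Beam `ValueProvider` option and `is_accessible()` is modeled simply as "a value was supplied":

```python
# Sketch of the execution-source selection added in main.py (no Beam).
# FakeOption is hypothetical; the real objects are ValueProviders whose
# is_accessible() also depends on template-time vs. runtime resolution.

class FakeOption:
    def __init__(self, value=None):
        self._value = value

    def is_accessible(self):
        # "Accessible" here just means a value was supplied.
        return self._value is not None

    def get(self):
        return self._value


def choose_source(setup_sheet_id, setup_firestore_collection):
    # Mirrors the if/elif/else added to run().
    if setup_sheet_id.is_accessible():
        return ('spreadsheet', setup_sheet_id.get())
    elif setup_firestore_collection.is_accessible():
        return ('firestore', setup_firestore_collection.get())
    raise Exception('No valid parameter source '
                    '(setup_sheet_id/setup_firestore_collection) '
                    'included in the arguments')


print(choose_source(FakeOption(), FakeOption('megalista_executions')))
# → ('firestore', 'megalista_executions')
```

Guarding `SheetsConfig` construction behind the same check also means no Sheets credentials are touched when the Firestore path is used.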
2 changes: 2 additions & 0 deletions megalist_dataflow/models/options.py

```diff
@@ -31,6 +31,8 @@ def _add_argparse_args(cls, parser):
     # Set up
     parser.add_value_provider_argument(
         '--setup_sheet_id', help='Id of Spreadsheet with execution info')
+    parser.add_value_provider_argument(
+        '--setup_firestore_collection', help='Name of Firestore collection with execution info')
     parser.add_value_provider_argument(
         '--bq_ops_dataset',
         help='Auxliary bigquery dataset used for Megalista operations')
```
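As a rough analog of how the two alternative setup flags parse, here is a plain-`argparse` sketch (hypothetical; the real code registers them with Beam's `add_value_provider_argument` so the values can also be supplied when a Dataflow template is launched, and the collection name `megalista_executions` below is made up):

```python
import argparse

# Plain-argparse analog of the two mutually alternative setup options.
parser = argparse.ArgumentParser()
parser.add_argument('--setup_sheet_id',
                    help='Id of Spreadsheet with execution info')
parser.add_argument('--setup_firestore_collection',
                    help='Name of Firestore collection with execution info')

# Supplying only the Firestore flag leaves the sheet id unset,
# which is what steers main.py toward FirestoreExecutionSource.
args = parser.parse_args(['--setup_firestore_collection', 'megalista_executions'])
print(args.setup_firestore_collection)
# → megalista_executions
```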
10 changes: 5 additions & 5 deletions megalist_dataflow/requirements.txt

```diff
@@ -9,13 +9,13 @@ apache-beam==2.28.0
 google-cloud-datastore==1.13.1
 google-apitools==0.5.31
 pytest==5.4.3
-pytest-cov==2.10.0
+pytest-cov==2.11.1
 pytest-mock==3.2.0
 requests-mock==1.8.0
-pytz==2020.1
+pytz==2021.1
 wheel==0.34.2
-pyarrow==0.17.1
-aiohttp==3.6.2
+pyarrow==4.0.0
+aiohttp==3.7.4
 bloom-filter==1.3
-six==1.13.0
+six==1.15.0
 mypy==0.790
```
2 changes: 1 addition & 1 deletion megalist_dataflow/setup.py

```diff
@@ -23,6 +23,6 @@
     url='https://github.com/DP6/marketing-data-sync',
     install_requires=['googleads==24.1.0', 'google-api-python-client==1.10.0',
                       'google-cloud-core==1.3.0', 'google-cloud-bigquery==1.26.0',
-                      'google-cloud-datastore==1.13.1', 'aiohttp==3.6.2'],
+                      'google-cloud-datastore==1.13.1', 'aiohttp==3.7.4'],
     packages=setuptools.find_packages(),
 )
```
127 changes: 127 additions & 0 deletions megalist_dataflow/sources/firestore_execution_source.py

```python
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import distutils.util
import logging

from apache_beam.options.value_provider import ValueProvider

from google.cloud import firestore
from sources.base_bounded_source import BaseBoundedSource
from models.execution import Destination, DestinationType
from models.execution import Execution, AccountConfig
from models.execution import Source, SourceType


class FirestoreExecutionSource(BaseBoundedSource):
    """
    Read Execution data from a Firestore collection. The collection name is set-up in the parameter "setup_firestore_collection"
    """

    def __init__(
        self,
        setup_firestore_collection: ValueProvider
    ):
        super().__init__()
        self._setup_firestore_collection = setup_firestore_collection

    def _do_count(self):
        # TODO: implement count
        return 3

    def read(self, range_tracker):
        def document_to_dict(doc):
            if not doc.exists:
                return None
            doc_dict = doc.to_dict()
            doc_dict['id'] = doc.id
            return doc_dict

        firestore_collection = self._setup_firestore_collection.get()
        logging.getLogger("megalista.FirestoreExecutionSource").info(f"Loading Firestore collection {firestore_collection}...")
        db = firestore.Client()
        entries = db.collection(self._setup_firestore_collection.get()).where('active', '==', 'yes').stream()
        entries = [document_to_dict(doc) for doc in entries]

        account_data = document_to_dict(db.collection(self._setup_firestore_collection.get()).document('account_config').get())

        if not account_data:
            raise Exception('Firestore collection is absent')
        google_ads_id = account_data.get('google_ads_id', 'empty')
        mcc_trix = account_data.get('mcc_trix', 'FALSE')
        mcc = False if mcc_trix is None else bool(distutils.util.strtobool(mcc_trix))
        app_id = account_data.get('app_id', 'empty')
        google_analytics_account_id = account_data.get('google_analytics_account_id', 'empty')
        campaign_manager_account_id = account_data.get('campaign_manager_account_id', 'empty')

        account_config = AccountConfig(google_ads_id, mcc, google_analytics_account_id, campaign_manager_account_id, app_id)
        logging.getLogger("megalista.FirestoreExecutionSource").info(f"Loaded: {account_config}")

        sources = self._read_sources(entries)
        destinations = self._read_destination(entries)
        if entries:
            for entry in entries:
                if entry['active'].upper() == 'YES':
                    logging.getLogger("megalista.FirestoreExecutionSource").info(
                        f"Executing step Source:{sources[entry['id'] + '_source'].source_name} -> Destination:{destinations[entry['id'] + '_destination'].destination_name}")
                    yield Execution(account_config, sources[entry['id'] + '_source'], destinations[entry['id'] + '_destination'])
        else:
            logging.getLogger("megalista.FirestoreExecutionSource").warn("No schedules found!")

    @staticmethod
    def _read_sources(entries):
        sources = {}
        if entries:
            for entry in entries:
                metadata = [entry['bq_dataset'], entry['bq_table']]  # TODO: flexibilize for other source types
                source = Source(entry['id'] + '_source', SourceType[entry['source']], metadata)
                sources[source.source_name] = source
        else:
            logging.getLogger("megalista.FirestoreExecutionSource").warn("No sources found!")
        return sources

    @staticmethod
    def _read_destination(entries):
        def create_metadata_list(entry):
            metadata_list = {
                'ADS_OFFLINE_CONVERSION': ['gads_conversion_name'],
                'ADS_SSD_UPLOAD': ['gads_conversion_name', 'gads_external_upload_id'],
                'ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD': ['gads_audience_name', 'gads_operation', 'gads_hash'],
                'ADS_CUSTOMER_MATCH_MOBILE_DEVICE_ID_UPLOAD': ['gads_audience_name', 'gads_operation'],
                'ADS_CUSTOMER_MATCH_USER_ID_UPLOAD': ['gads_audience_name', 'gads_operation'],
                'GA_MEASUREMENT_PROTOCOL': ['google_analytics_property_id', 'google_analytics_non_interaction'],
                'CM_OFFLINE_CONVERSION': ['campaign_manager_floodlight_activity_id', 'campaign_manager_floodlight_configuration_id'],
                'APPSFLYER_S2S_EVENTS': ['appsflyer_app_id'],
            }

            entry_type = entry['type']
            metadata = metadata_list.get(entry_type, None)
            if not metadata:
                raise Exception(f'Upload type not implemented: {entry_type}')
            entry_metadata = []
            for m in metadata:
                if m in entry:
                    entry_metadata.append(entry[m])
                else:
                    raise Exception(f'Missing field in Firestore document for {entry_type}: {m}')
            return entry_metadata

        destinations = {}
        if entries:
            for entry in entries:
                destination = Destination(entry['id'] + '_destination', DestinationType[entry['type']], create_metadata_list(entry))
                destinations[destination.destination_name] = destination
        else:
            logging.getLogger("megalista.FirestoreExecutionSource").warn("No destinations found!")
        return destinations
```
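From this new source, the collection is expected to hold an `account_config` document plus one document per scheduled execution with `active: 'yes'`. A sketch of the assumed document shapes (field names are taken from the code above, all field *values* are hypothetical) together with a pure-Python mirror of the metadata extraction that `_read_destination` performs:

```python
# Hypothetical Firestore documents for the new source. Field values
# ('my_audience', the dataset names, etc.) are illustrative only.

account_config = {
    'google_ads_id': '123-456-7890',
    'mcc_trix': 'FALSE',
    'app_id': 'com.example.app',
    'google_analytics_account_id': 'UA-00000000-1',
    'campaign_manager_account_id': '12345',
}

schedule_entry = {
    'active': 'yes',
    'source': 'BIG_QUERY',
    'bq_dataset': 'megalista',
    'bq_table': 'customer_match',
    'type': 'ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD',
    'gads_audience_name': 'my_audience',
    'gads_operation': 'ADD',
    'gads_hash': 'TRUE',
}

# Mirror of create_metadata_list(): pick the required fields for the
# destination type, failing loudly when one is missing.
METADATA_FIELDS = {
    'ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD':
        ['gads_audience_name', 'gads_operation', 'gads_hash'],
}

def metadata_for(entry):
    fields = METADATA_FIELDS.get(entry['type'])
    if fields is None:
        raise Exception(f"Upload type not implemented: {entry['type']}")
    for f in fields:
        if f not in entry:
            raise Exception(f"Missing field in Firestore document: {f}")
    return [entry[f] for f in fields]

print(metadata_for(schedule_entry))
# → ['my_audience', 'ADD', 'TRUE']
```

Note that `_do_count` is still a placeholder returning 3, so the source's size estimate does not reflect the actual number of scheduled executions.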
1 change: 1 addition & 0 deletions megalist_dataflow/uploaders/utils.py

```diff
@@ -34,6 +34,7 @@ def get_ads_service(service_name, version, oauth_credentials, developer_token,
       oauth2_client,
       'Mds Dataflow',
       client_customer_id=customer_id)
+  client.partial_failure = True
   return client.GetService(service_name, version=version)
```
4 changes: 3 additions & 1 deletion pyproject.toml

Resulting file:

```toml
[tool.semantic_release]
upload_to_pypi = false
branch = 'master'
version_variable = [
    'megalist_dataflow/setup.py:__version__'
]

version_source = 'tag'
build_command = false
```
2 changes: 1 addition & 1 deletion terraform/scripts/deploy_cloud.sh

```diff
@@ -26,7 +26,7 @@ echo "Configuration GCP project in gcloud"
 gcloud config set project "$1"
 echo "Build Dataflow metadata"
 python3 -m pip install --user -q -r requirements.txt
-python3 -m main --runner DataflowRunner --project "$1" --gcp_project_id "$1" --temp_location"gs://$2/tmp/" --region "$3" --setup_file ./setup.py --template_location "gs://$2/templates/megalista" --num_workers 1 --autoscaling_algorithm=NONE
+python3 -m main --runner DataflowRunner --project "$1" --gcp_project_id "$1" --temp_location "gs://$2/tmp/" --region "$3" --setup_file ./setup.py --template_location "gs://$2/templates/megalista" --num_workers 1 --autoscaling_algorithm=NONE
 echo "Copy megalista_medata to bucket $2"
 gsutil cp megalist_metadata "gs://$2/templates/megalista_metadata"
 cd ..
```