Skip to content

Commit 5151d4b

Browse files
committed
feat:🆕 PR #9 from DP6/firestore-source
feat:🆕 Add Firestore source
2 parents 488dc58 + c50a5ac commit 5151d4b

File tree

3 files changed

+140
-3
lines changed

3 files changed

+140
-3
lines changed

megalist_dataflow/main.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from mappers.ads_ssd_hashing_mapper import AdsSSDHashingMapper
2222
from mappers.ads_user_list_pii_hashing_mapper import AdsUserListPIIHashingMapper
2323
from sources.spreadsheet_execution_source import SpreadsheetExecutionSource
24+
from sources.firestore_execution_source import FirestoreExecutionSource
2425
from sources.batches_from_executions import BatchesFromExecutions
2526
from uploaders.appsflyer.appsflyer_s2s_uploader_async import AppsFlyerS2SUploaderDoFn
2627
from uploaders.campaign_manager.campaign_manager_conversion_uploader import CampaignManagerConversionUploaderDoFn
@@ -185,11 +186,18 @@ def run(argv=None):
185186
dataflow_options.access_token,
186187
dataflow_options.refresh_token)
187188

188-
sheets_config = SheetsConfig(oauth_credentials)
189+
if dataflow_options.setup_sheet_id.is_accessible():
190+
sheets_config = SheetsConfig(oauth_credentials)
189191

190192
with beam.Pipeline(options=pipeline_options) as pipeline:
191-
executions = (pipeline | 'Load executions' >> beam.io.Read(
192-
SpreadsheetExecutionSource(sheets_config, dataflow_options.setup_sheet_id)))
193+
if dataflow_options.setup_sheet_id.is_accessible():
194+
executions = (pipeline | 'Load executions' >> beam.io.Read(
195+
SpreadsheetExecutionSource(sheets_config, dataflow_options.setup_sheet_id)))
196+
elif dataflow_options.setup_firestore_collection.is_accessible():
197+
executions = (pipeline | 'Load executions' >> beam.io.Read(
198+
FirestoreExecutionSource(dataflow_options.setup_firestore_collection)))
199+
else:
200+
raise Exception('No valid parameter source (setup_sheet_id/setup_firestore_collection) included in the arguments')
193201

194202
executions | GoogleAdsSSDStep(
195203
oauth_credentials, dataflow_options, AdsSSDHashingMapper())

megalist_dataflow/models/options.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ def _add_argparse_args(cls, parser):
3131
# Set up
3232
parser.add_value_provider_argument(
3333
'--setup_sheet_id', help='Id of Spreadsheet with execution info')
34+
parser.add_value_provider_argument(
35+
'--setup_firestore_collection', help='Name of Firestore collection with execution info')
3436
parser.add_value_provider_argument(
3537
'--bq_ops_dataset',
3638
help='Auxliary bigquery dataset used for Megalista operations')
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import distutils.util
15+
import logging
16+
17+
from apache_beam.options.value_provider import ValueProvider
18+
19+
from google.cloud import firestore
20+
from sources.base_bounded_source import BaseBoundedSource
21+
from models.execution import Destination, DestinationType
22+
from models.execution import Execution, AccountConfig
23+
from models.execution import Source, SourceType
24+
25+
26+
class FirestoreExecutionSource(BaseBoundedSource):
27+
"""
28+
Read Execution data from a Firestore collection. The collection name is set-up in the parameter "setup_firestore_collection"
29+
"""
30+
31+
def __init__(
32+
self,
33+
setup_firestore_collection: ValueProvider
34+
):
35+
super().__init__()
36+
self._setup_firestore_collection = setup_firestore_collection
37+
38+
def _do_count(self):
39+
# TODO: implement count
40+
return 3
41+
42+
def read(self, range_tracker):
43+
def document_to_dict(doc):
44+
if not doc.exists:
45+
return None
46+
doc_dict = doc.to_dict()
47+
doc_dict['id'] = doc.id
48+
return doc_dict
49+
50+
firestore_collection = self._setup_firestore_collection.get()
51+
logging.getLogger("megalista.FirestoreExecutionSource").info(f"Loading Firestore collection {firestore_collection}...")
52+
db = firestore.Client()
53+
entries = db.collection(self._setup_firestore_collection.get()).where('active', '==', 'yes').stream()
54+
entries = [document_to_dict(doc) for doc in entries]
55+
56+
account_data = document_to_dict(db.collection(self._setup_firestore_collection.get()).document('account_config').get())
57+
58+
if not account_data:
59+
raise Exception('Firestore collection is absent')
60+
google_ads_id = account_data.get('google_ads_id', 'empty')
61+
mcc_trix = account_data.get('mcc_trix', 'FALSE')
62+
mcc = False if mcc_trix is None else bool(distutils.util.strtobool(mcc_trix))
63+
app_id = account_data.get('app_id', 'empty')
64+
google_analytics_account_id = account_data.get('google_analytics_account_id', 'empty')
65+
campaign_manager_account_id = account_data.get('campaign_manager_account_id', 'empty')
66+
67+
account_config = AccountConfig(google_ads_id, mcc, google_analytics_account_id, campaign_manager_account_id, app_id)
68+
logging.getLogger("megalista.FirestoreExecutionSource").info(f"Loaded: {account_config}")
69+
70+
sources = self._read_sources(entries)
71+
destinations = self._read_destination(entries)
72+
if entries:
73+
for entry in entries:
74+
if entry['active'].upper() == 'YES':
75+
logging.getLogger("megalista.FirestoreExecutionSource").info(
76+
f"Executing step Source:{sources[entry['id'] + '_source'].source_name} -> Destination:{destinations[entry['id'] + '_destination'].destination_name}")
77+
yield Execution(account_config, sources[entry['id'] + '_source'], destinations[entry['id'] + '_destination'])
78+
else:
79+
logging.getLogger("megalista.FirestoreExecutionSource").warn("No schedules found!")
80+
81+
@staticmethod
82+
def _read_sources(entries):
83+
sources = {}
84+
if entries:
85+
for entry in entries:
86+
metadata = [entry['bq_dataset'], entry['bq_table']] #TODO: flexibilize for other source types
87+
source = Source(entry['id'] + '_source', SourceType[entry['source']], metadata)
88+
sources[source.source_name] = source
89+
else:
90+
logging.getLogger("megalista.FirestoreExecutionSource").warn("No sources found!")
91+
return sources
92+
93+
@staticmethod
94+
def _read_destination(entries):
95+
def create_metadata_list(entry):
96+
metadata_list = {
97+
'ADS_OFFLINE_CONVERSION': ['gads_conversion_name'],
98+
'ADS_SSD_UPLOAD': ['gads_conversion_name', 'gads_external_upload_id'],
99+
'ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD': ['gads_audience_name', 'gads_operation', 'gads_hash'],
100+
'ADS_CUSTOMER_MATCH_MOBILE_DEVICE_ID_UPLOAD': ['gads_audience_name', 'gads_operation'],
101+
'ADS_CUSTOMER_MATCH_USER_ID_UPLOAD': ['gads_audience_name', 'gads_operation'],
102+
'GA_MEASUREMENT_PROTOCOL': ['google_analytics_property_id', 'google_analytics_non_interaction'],
103+
'CM_OFFLINE_CONVERSION': ['campaign_manager_floodlight_activity_id', 'campaign_manager_floodlight_configuration_id'],
104+
'APPSFLYER_S2S_EVENTS': ['appsflyer_app_id'],
105+
}
106+
107+
entry_type = entry['type']
108+
metadata = metadata_list.get(entry_type, None)
109+
if not metadata:
110+
raise Exception(f'Upload type not implemented: {entry_type}')
111+
entry_metadata = []
112+
for m in metadata:
113+
if m in entry:
114+
entry_metadata.append(entry[m])
115+
else:
116+
raise Exception(f'Missing field in Firestore document for {entry_type}: {m}')
117+
return entry_metadata
118+
119+
120+
destinations = {}
121+
if entries:
122+
for entry in entries:
123+
destination = Destination(entry['id'] + '_destination', DestinationType[entry['type']], create_metadata_list(entry))
124+
destinations[destination.destination_name] = destination
125+
else:
126+
logging.getLogger("megalista.FirestoreExecutionSource").warn("No destinations found!")
127+
return destinations

0 commit comments

Comments
 (0)