|
| 1 | +# Copyright 2021 Google LLC |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | +import distutils.util |
| 15 | +import logging |
| 16 | + |
| 17 | +from apache_beam.options.value_provider import ValueProvider |
| 18 | + |
| 19 | +from google.cloud import firestore |
| 20 | +from sources.base_bounded_source import BaseBoundedSource |
| 21 | +from models.execution import Destination, DestinationType |
| 22 | +from models.execution import Execution, AccountConfig |
| 23 | +from models.execution import Source, SourceType |
| 24 | + |
| 25 | + |
class FirestoreExecutionSource(BaseBoundedSource):
    """
    Read Execution data from a Firestore collection.

    The collection name is supplied through the "setup_firestore_collection"
    parameter. The collection is expected to hold one document named
    "account_config" with the account-wide settings, plus one document per
    source -> destination step. Only step documents whose "active" field
    equals 'yes' are loaded.
    """

    _LOGGER_NAME = "megalista.FirestoreExecutionSource"
    _ACCOUNT_CONFIG_DOCUMENT = 'account_config'

    # Firestore document fields that make up the destination metadata for
    # each upload type, in the order they are passed to Destination.
    _DESTINATION_METADATA_FIELDS = {
        'ADS_OFFLINE_CONVERSION': ['gads_conversion_name'],
        'ADS_SSD_UPLOAD': ['gads_conversion_name', 'gads_external_upload_id'],
        'ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD': ['gads_audience_name', 'gads_operation', 'gads_hash'],
        'ADS_CUSTOMER_MATCH_MOBILE_DEVICE_ID_UPLOAD': ['gads_audience_name', 'gads_operation'],
        'ADS_CUSTOMER_MATCH_USER_ID_UPLOAD': ['gads_audience_name', 'gads_operation'],
        'GA_MEASUREMENT_PROTOCOL': ['google_analytics_property_id', 'google_analytics_non_interaction'],
        'CM_OFFLINE_CONVERSION': ['campaign_manager_floodlight_activity_id', 'campaign_manager_floodlight_configuration_id'],
        'APPSFLYER_S2S_EVENTS': ['appsflyer_app_id'],
    }

    # Same vocabulary that distutils.util.strtobool accepted.
    _TRUE_STRINGS = frozenset(('y', 'yes', 't', 'true', 'on', '1'))
    _FALSE_STRINGS = frozenset(('n', 'no', 'f', 'false', 'off', '0'))

    def __init__(
        self,
        setup_firestore_collection: ValueProvider
    ):
        """Store the (lazily resolved) name of the Firestore collection to read."""
        super().__init__()
        self._setup_firestore_collection = setup_firestore_collection

    def _do_count(self):
        """Return a placeholder element count (always 3)."""
        # TODO: implement count
        # NOTE(review): presumably consumed by BaseBoundedSource for size
        # estimation -- confirm against the base class before relying on it.
        return 3

    def read(self, range_tracker):
        """
        Yield one Execution per active step document in the collection.

        Args:
            range_tracker: not used by this implementation; the whole
                collection is always read.

        Yields:
            Execution objects combining the account configuration with the
            source and destination built from each active document.

        Raises:
            Exception: if the "account_config" document does not exist, if a
                document has an unsupported upload type, or if it lacks a
                field required by its upload type.
            KeyError: if a document lacks a mandatory key or names an unknown
                source/destination type.
            ValueError: if "mcc_trix" is not a recognisable truth value.
        """
        logger = self._logger()
        # Resolve the ValueProvider once and reuse the result.
        firestore_collection = self._setup_firestore_collection.get()
        logger.info(f"Loading Firestore collection {firestore_collection}...")
        db = firestore.Client()
        collection = db.collection(firestore_collection)

        # Materialise the query before reading the account configuration and
        # drop anything that could not be converted to a dict.
        documents = collection.where('active', '==', 'yes').stream()
        entries = [
            entry
            for entry in (self._document_to_dict(doc) for doc in documents)
            if entry is not None
        ]

        account_data = self._document_to_dict(
            collection.document(self._ACCOUNT_CONFIG_DOCUMENT).get())
        if not account_data:
            raise Exception('Firestore collection is absent')

        google_ads_id = account_data.get('google_ads_id', 'empty')
        mcc = self._parse_mcc_flag(account_data.get('mcc_trix', 'FALSE'))
        app_id = account_data.get('app_id', 'empty')
        google_analytics_account_id = account_data.get('google_analytics_account_id', 'empty')
        campaign_manager_account_id = account_data.get('campaign_manager_account_id', 'empty')

        account_config = AccountConfig(
            google_ads_id, mcc, google_analytics_account_id,
            campaign_manager_account_id, app_id)
        logger.info(f"Loaded: {account_config}")

        # Both readers run even for an empty list so that their own
        # "nothing found" warnings are still emitted.
        sources = self._read_sources(entries)
        destinations = self._read_destination(entries)
        if not entries:
            logger.warning("No schedules found!")
            return

        for entry in entries:
            if entry['active'].upper() != 'YES':
                continue
            source = sources[entry['id'] + '_source']
            destination = destinations[entry['id'] + '_destination']
            logger.info(
                f"Executing step Source:{source.source_name} -> Destination:{destination.destination_name}")
            yield Execution(account_config, source, destination)

    @staticmethod
    def _logger():
        """Return the logger shared by every method of this source."""
        return logging.getLogger(FirestoreExecutionSource._LOGGER_NAME)

    @staticmethod
    def _document_to_dict(doc):
        """Convert a Firestore snapshot to a dict with its id under 'id'; None if it does not exist."""
        if not doc.exists:
            return None
        doc_dict = doc.to_dict()
        doc_dict['id'] = doc.id
        return doc_dict

    @staticmethod
    def _parse_mcc_flag(value):
        """
        Interpret the "mcc_trix" setting as a boolean.

        Accepts native booleans as well as the textual truth values formerly
        handled by distutils.util.strtobool (distutils is no longer part of
        the standard library from Python 3.12 on). None means False.

        Raises:
            ValueError: if the value is not a recognised truth value.
        """
        if value is None:
            return False
        if isinstance(value, bool):
            return value
        text = str(value).lower()
        if text in FirestoreExecutionSource._TRUE_STRINGS:
            return True
        if text in FirestoreExecutionSource._FALSE_STRINGS:
            return False
        raise ValueError(f"invalid truth value {value!r}")

    @staticmethod
    def _read_sources(entries):
        """
        Build the Source of every entry.

        Args:
            entries: list of document dicts; each needs the keys 'id',
                'source', 'bq_dataset' and 'bq_table'.

        Returns:
            Dict mapping each Source's source_name to the Source.
        """
        sources = {}
        if not entries:
            FirestoreExecutionSource._logger().warning("No sources found!")
            return sources
        for entry in entries:
            metadata = [entry['bq_dataset'], entry['bq_table']]  # TODO: flexibilize for other source types
            source = Source(entry['id'] + '_source', SourceType[entry['source']], metadata)
            sources[source.source_name] = source
        return sources

    @staticmethod
    def _create_metadata_list(entry):
        """
        Collect, in order, the metadata values required by the entry's upload type.

        Raises:
            Exception: if the upload type is not supported or a required
                field is missing from the entry.
        """
        entry_type = entry['type']
        fields = FirestoreExecutionSource._DESTINATION_METADATA_FIELDS.get(entry_type)
        if not fields:
            raise Exception(f'Upload type not implemented: {entry_type}')
        entry_metadata = []
        for field in fields:
            if field not in entry:
                raise Exception(f'Missing field in Firestore document for {entry_type}: {field}')
            entry_metadata.append(entry[field])
        return entry_metadata

    @staticmethod
    def _read_destination(entries):
        """
        Build the Destination of every entry.

        Args:
            entries: list of document dicts; each needs the keys 'id' and
                'type' plus the fields required by that upload type.

        Returns:
            Dict mapping each Destination's destination_name to the Destination.

        Raises:
            KeyError: if 'type' is not a member of DestinationType.
            Exception: if the upload type is not supported or a required
                field is missing.
        """
        destinations = {}
        if not entries:
            FirestoreExecutionSource._logger().warning("No destinations found!")
            return destinations
        for entry in entries:
            # Resolve the enum member first so that an unknown type keeps
            # surfacing as KeyError, as it did before.
            destination_type = DestinationType[entry['type']]
            metadata = FirestoreExecutionSource._create_metadata_list(entry)
            destination = Destination(entry['id'] + '_destination', destination_type, metadata)
            destinations[destination.destination_name] = destination
        return destinations
0 commit comments