Skip to content

Commit a190efe

Browse files
committed
Adds Enhanced Conversions Uploader
Change-Id: I816f77b42facc58f26fe362352ca4a3b9ad5baab
1 parent 8427840 commit a190efe

File tree

8 files changed

+185
-11
lines changed

8 files changed

+185
-11
lines changed

megalist_dataflow/main.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from uploaders.google_ads.customer_match.user_id_uploader import GoogleAdsCustomerMatchUserIdUploaderDoFn
3030
from uploaders.google_ads.conversions.google_ads_offline_conversions_uploader import GoogleAdsOfflineUploaderDoFn
3131
from uploaders.google_ads.conversions.google_ads_ssd_uploader import GoogleAdsSSDUploaderDoFn
32+
from uploaders.google_ads.conversions.google_ads_enhanced_conversions_uploader import GoogleAdsEnhancedConversionsUploaderDoFn
3233
from uploaders.google_analytics.google_analytics_data_import_uploader import GoogleAnalyticsDataImportUploaderDoFn
3334
from uploaders.google_analytics.google_analytics_measurement_protocol import GoogleAnalyticsMeasurementProtocolUploaderDoFn
3435
from uploaders.google_analytics.google_analytics_user_list_uploader import GoogleAnalyticsUserListUploaderDoFn
@@ -58,6 +59,14 @@ def __init__(self, oauth_credentials, dataflow_options=None, hasher=None):
5859
def expand(self, executions):
5960
pass
6061

62+
class GoogleAdsEnhancedConversionsStep(MegalistaStep):
    """Pipeline step that loads, hashes and uploads Enhanced Conversions."""

    def expand(self, executions):
        """Chain the load, hash and upload transforms over `executions`.

        Batches are built for the ADS_ENHANCED_CONVERSION destination,
        passed through the configured hasher's `hash_users`, and handed to
        the Enhanced Conversions uploader DoFn.
        """
        batches = (
            executions
            | 'Load Data - Google Ads Enhanced Conversions' >> BatchesFromExecutions(
                DestinationType.ADS_ENHANCED_CONVERSION)
        )
        hashed_batches = (
            batches
            | 'Hash Users - Google Ads Enhanced Conversions' >> beam.Map(
                self._hasher.hash_users)
        )
        return (
            hashed_batches
            | 'Upload - Google Ads Enhanced Conversions' >> beam.ParDo(
                GoogleAdsEnhancedConversionsUploaderDoFn(self._oauth_credentials))
        )
6170

6271
class GoogleAdsSSDStep(MegalistaStep):
6372
def expand(self, executions):
@@ -181,7 +190,7 @@ def run(argv=None):
181190
oauth_credentials = OAuthCredentials(
182191
dataflow_options.client_id,
183192
dataflow_options.client_secret,
184-
dataflow_options.developer_token,
193+
dataflow_options.access_token,
185194
dataflow_options.refresh_token)
186195

187196
sheets_config = SheetsConfig(oauth_credentials)
@@ -208,6 +217,7 @@ def run(argv=None):
208217
oauth_credentials, dataflow_options)
209218
executions | CampaignManagerConversionStep(oauth_credentials, dataflow_options)
210219
executions | AppsFlyerEventsStep(oauth_credentials, dataflow_options)
220+
executions | GoogleAdsEnhancedConversionsStep(oauth_credentials, dataflow_options, AdsUserListPIIHashingMapper())
211221

212222
# todo: update trix at the end
213223

megalist_dataflow/mappers/ads_user_list_pii_hashing_mapper.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ def __init__(self):
3838

3939
def _hash_user(self, user, hasher):
4040

41-
hashed = dict()
41+
hashed = user.copy()
4242

4343
try:
4444
if 'email' in user:
4545
hashed['hashedEmail'] = hasher.hash_field(user['email'])
46+
del hashed['email']
4647
except:
4748
self.logger.error("Error hashing email for user: %s" % user)
4849

@@ -54,25 +55,32 @@ def _hash_user(self, user, hasher):
5455
'countryCode': user['mailing_address_country'],
5556
'zipCode': user['mailing_address_zip']
5657
}
58+
del hashed['mailing_address_first_name']
59+
del hashed['mailing_address_last_name']
60+
del hashed['mailing_address_country']
61+
del hashed['mailing_address_zip']
5762
except:
5863
self.logger.error("Error hashing address for user: %s" % user)
5964

6065
try:
6166
if 'phone' in user:
6267
hashed['hashedPhoneNumber'] = hasher.hash_field(user['phone'])
68+
del hashed['phone']
6369
except:
6470
self.logger.error("Error hashing phone for user: %s" % user)
6571

6672
try:
6773
if 'mobile_device_id' in user:
6874
hashed['mobileId'] = user['mobile_device_id']
75+
del hashed['mobile_device_id']
6976
except:
7077
self.logger.error(
7178
"Error hashing mobile_device_id for user: %s" % user)
7279

7380
try:
7481
if 'user_id' in user:
7582
hashed['userId'] = user['user_id']
83+
del hashed['user_id']
7684
except:
7785
self.logger.error("Error hashing user_id for user: %s" % user)
7886

megalist_dataflow/models/execution.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from enum import Enum
16-
from typing import Dict, List
16+
from typing import Dict, List, Union
1717

1818
OK_STATUS = 'OK'
1919

@@ -22,14 +22,15 @@ class DestinationType(Enum):
2222
CM_OFFLINE_CONVERSION, \
2323
ADS_OFFLINE_CONVERSION, \
2424
ADS_SSD_UPLOAD, \
25+
ADS_ENHANCED_CONVERSION, \
2526
ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD, \
2627
ADS_CUSTOMER_MATCH_MOBILE_DEVICE_ID_UPLOAD, \
2728
ADS_CUSTOMER_MATCH_USER_ID_UPLOAD, \
2829
GA_USER_LIST_UPLOAD, \
2930
APPSFLYER_S2S_EVENTS, \
3031
GA_MEASUREMENT_PROTOCOL, \
3132
GA_DATA_IMPORT, \
32-
GA_4_MEASUREMENT_PROTOCOL = range(11)
33+
GA_4_MEASUREMENT_PROTOCOL = range(12)
3334

3435
def __eq__(self, other):
3536
if other is None:
@@ -202,7 +203,7 @@ class Batch:
202203
def __init__(
203204
self,
204205
execution: Execution,
205-
elements: List[Dict[str, str]]
206+
elements: List[Dict[str, Union[str, Dict[str, str]]]]
206207
):
207208
self._execution = execution
208209
self._elements = elements
@@ -212,7 +213,7 @@ def execution(self) -> Execution:
212213
return self._execution
213214

214215
@property
215-
def elements(self) -> List[Dict[str, str]]:
216+
def elements(self) -> List[Dict[str, Union[str, Dict[str, str]]]]:
216217
return self._elements
217218

218219
def __str__(self):

megalist_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader_test.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,5 @@ def test_error_on_api_call(mocker, uploader, caplog):
147147

148148
uploader._do_process(Batch(execution, [{'gclid': '123'}]), time.time())
149149

150-
print (caplog.records)
151-
152150
assert 'Error(s) inserting conversions:' in caplog.text
153151
assert '[123]: error_returned' in caplog.text
megalist_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_uploader.py

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
# Copyright 2021 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
16+
import logging
17+
from typing import Dict, Any, Union, List
18+
from urllib.parse import quote
19+
20+
import apache_beam as beam
21+
import requests
22+
import json
23+
24+
from uploaders import utils
25+
from models.execution import DestinationType, Batch
26+
from models.oauth_credentials import OAuthCredentials
27+
28+
29+
class GoogleAdsEnhancedConversionsUploaderDoFn(beam.DoFn):
    """Beam DoFn that uploads Google Ads Enhanced Conversions.

    Every row of an incoming batch is sent as one HTTP POST to the
    Enhanced Conversions endpoint: conversion attributes go in the query
    string and the hashed PII goes in the JSON body.
    """

    # Seconds to wait on a single HTTP call. `requests` applies no timeout
    # by default, so a stalled connection would otherwise block forever.
    _REQUEST_TIMEOUT_SECONDS = 60

    def __init__(self, oauth_credentials: OAuthCredentials):
        """Store the OAuth credentials used to obtain access tokens."""
        super().__init__()
        self._api_url = "https://www.google.com/ads/event/api/v1"
        self._ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36"
        self._oauth_credentials = oauth_credentials

    def _format_query_params(self, payload: Dict[str, Any]) -> str:
        """Render `payload` as a query string, skipping `None` values.

        Values are stringified and percent-encoded; keys are emitted as-is.
        """
        return "&".join(
            key + "=" + quote(str(value))
            for key, value in payload.items()
            if value is not None
        )

    def _get_access_token(self):
        """Exchange the stored refresh token for a fresh access token.

        Returns:
            The `access_token` field of the token endpoint's JSON response.
        """
        payload = {
            'client_id': self._oauth_credentials.get_client_id(),
            'client_secret': self._oauth_credentials.get_client_secret(),
            'refresh_token': self._oauth_credentials.get_refresh_token(),
            'grant_type': 'refresh_token'
        }
        response = requests.post(
            url='https://oauth2.googleapis.com/token',
            data=payload,
            timeout=self._REQUEST_TIMEOUT_SECONDS)
        return response.json()['access_token']

    @staticmethod
    def _error_details(response):
        """Return the decoded JSON error body, or the raw text if not JSON.

        Error responses are not guaranteed to carry JSON (e.g. an HTML
        error page); decoding blindly would raise from inside the logging
        path and abort the remaining rows of the batch.
        """
        try:
            return response.json()
        except ValueError:
            return response.text

    @utils.safe_process(logger=logging.getLogger("megalista.GoogleAdsEnhancedConversionsUploaderDoFn"))
    def process(self, batch: Batch, **kwargs):
        """Upload every row of `batch` as an Enhanced Conversion.

        Destination metadata is read positionally: index 0 is the
        conversion label, index 1 the conversion tracking id and index 2
        the currency code. Each row must provide `gclid`,
        `conversion_time`, `oid` and `value`; `addressInfo`, `hashedEmail`
        and `hashedPhoneNumber` are optional.

        A non-200 response is logged and the upload moves on to the next
        row.

        Yields:
            The input batch, once all rows have been sent.
        """
        logger = logging.getLogger('megalista.GoogleAdsEnhancedConversionsUploaderDoFn')
        execution = batch.execution
        rows = batch.elements
        # A single access token is fetched and reused for the whole batch.
        headers = {
            "Authorization": f"Bearer {self._get_access_token()}"
        }

        for row in rows:
            payload: Dict[str, Any] = {
                'pii_data': {},
                'user_agent': self._ua
            }
            if 'addressInfo' in row and isinstance(row['addressInfo'], dict):
                address_info: Dict[str, str] = row['addressInfo']
                payload['pii_data']['address'] = [{
                    'hashed_first_name': address_info['hashedFirstName'],
                    'hashed_last_name': address_info['hashedLastName'],
                    'country': address_info['countryCode'],
                    'postcode': address_info['zipCode']
                }]
            if 'hashedEmail' in row:
                payload['pii_data']['hashed_email'] = row['hashedEmail']
            if 'hashedPhoneNumber' in row:
                payload['pii_data']['hashed_phone_number'] = row['hashedPhoneNumber']

            query_params = {
                'gclid': row['gclid'],
                'conversion_time': row['conversion_time'],
                'conversion_tracking_id': execution.destination.destination_metadata[1],
                'label': execution.destination.destination_metadata[0],
                'oid': row['oid'],
                'value': row['value'],
                'currency_code': execution.destination.destination_metadata[2]
            }

            url = f"{self._api_url}?{self._format_query_params(query_params)}"

            response = requests.post(
                url=url,
                json=payload,
                headers=headers,
                timeout=self._REQUEST_TIMEOUT_SECONDS)
            if response.status_code != 200:
                logger.error(
                    f"Error uploading Enhanced Conversion {response.status_code}: {self._error_details(response)}")

        # NOTE(review): the original indentation is not visible in this
        # view; the batch is assumed to be yielded once, after the loop.
        yield batch
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""Tests for megalist_dataflow.uploaders.google_ads.conversions.google_ads_enhanced_conversions_uploader."""
2+
3+
from uploaders.google_ads.conversions import google_ads_enhanced_conversions_uploader
4+
from models.oauth_credentials import OAuthCredentials
5+
from apache_beam.options.value_provider import StaticValueProvider
6+
from models.execution import Batch, Execution, AccountConfig, Source, Destination, SourceType, DestinationType
7+
8+
import requests_mock
9+
import pytest
10+
import logging
11+
12+
13+
@pytest.fixture
def uploader():
    """Provide an uploader DoFn wired to static placeholder OAuth values."""
    client_id, client_secret, access_token, refresh_token = (
        StaticValueProvider(str, value)
        for value in ('id', 'secret', 'access', 'refresh')
    )
    oauth = OAuthCredentials(
        client_id, client_secret, access_token, refresh_token)
    return google_ads_enhanced_conversions_uploader.GoogleAdsEnhancedConversionsUploaderDoFn(oauth)
21+
22+
23+
def test_request_with_all_info(uploader, mocker):
    """One fully populated row yields one POST with the expected query and body."""
    # Token retrieval is stubbed so no OAuth call is attempted.
    mocker.patch.object(uploader, '_get_access_token')

    hashed_email = 'b83c49e9841ea158035dc4ac2587c512c26e5a3dbc636d68389133b5e8a6c3ca'
    hashed_first_name = '318b22d6258b32b559d9abf7b60a8db37937f68e5c5994abb5ba3cb9610405a4'
    hashed_last_name = 'bd442dfc1a89a054a557a516ccec91b1430f13dc9157252e0bc683f75d82cd81'

    # The query string is compared in lower case, as asserted by the
    # original test against the mocked request.
    expected_query = "gclid=cj0kcqjwj7v0brdoarisagh37ipeiqtu82p7-vkmss1gu-jvatfaar0-hswzlv4kiz9eplpzpjwhuaarenealw_wcb&conversion_time=1615230348456000&conversion_tracking_id=id&label=label&oid=1&value=15000000&currency_code=brl"
    expected_body = {
        'pii_data': {
            'address': [
                {
                    'hashed_first_name': hashed_first_name,
                    'hashed_last_name': hashed_last_name,
                    'country': 'Brasil', 'postcode': '04304032'
                }
            ],
            'hashed_email': hashed_email
        },
        'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
    }

    with requests_mock.Mocker() as m:
        m.post(requests_mock.ANY, status_code=200)

        execution = Execution(
            AccountConfig("123-456-7890", False, "UA-123", "CM-123", "com.app"),
            Source('ecs', SourceType.BIG_QUERY, ['megalista', 'ecs']),
            Destination('ecd', DestinationType.ADS_ENHANCED_CONVERSION, [
                'label', 'id', 'BRL']),
        )

        row = {
            'gclid': 'Cj0KCQjwj7v0BRDOARIsAGh37ipEIqtU82p7-VKMsS1Gu-jVAtfAAr0-hsWzLv4kiZ9EpLpZpJWhUaArEnEALw_wcB',
            'conversion_time': '1615230348456000',
            'oid': '1',
            'value': '15000000',
            'hashedEmail': hashed_email,
            'addressInfo': {
                'hashedFirstName': hashed_first_name,
                'hashedLastName': hashed_last_name,
                'countryCode': 'Brasil',
                'zipCode': '04304032'
            }
        }

        # `process` is a generator: advancing it once runs the upload.
        next(uploader.process(Batch(execution, [row])))

        assert m.call_count == 1
        assert m.last_request.query == expected_query
        assert m.last_request.json() == expected_body

megalist_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from googleapiclient.http import MediaInMemoryUpload
2323

2424
from uploaders import utils
25-
from models.execution import DestinationType, Batch
25+
from models.execution import DestinationType, Batch, Union
2626

2727

2828
class GoogleAnalyticsDataImportUploaderDoFn(beam.DoFn):
@@ -85,7 +85,7 @@ def process(self, batch: Batch, **kwargs):
8585
ga_account_id, batch.elements)
8686

8787
def _do_upload_data(self, web_property_id, data_import_name, ga_account_id,
88-
rows: List[Dict[str, str]]):
88+
rows: List[Dict[str, Union[str, Dict[str, str]]]]):
8989
analytics = self._get_analytics_service()
9090
data_sources = analytics.management().customDataSources().list(
9191
accountId=ga_account_id,

megalist_dataflow/uploaders/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def inner(*args, **kwargs):
5555
return
5656
logger.info(f'Uploading {len(batch.elements)} rows...')
5757
try:
58-
return func(*args, *kwargs)
58+
return func(*args, **kwargs)
5959
except Exception as e:
6060
logger.error(f'Error uploading data for :{batch.elements}')
6161
logger.error(e, exc_info=True)

0 commit comments

Comments
 (0)