From 9b2afae3126c0c1a33a9d67d973e1880a20ffb53 Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Mon, 1 Apr 2019 15:16:09 +0200
Subject: [PATCH 1/9] add option to specify a dimension filter

---
 hooks/google_analytics_hook.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hooks/google_analytics_hook.py b/hooks/google_analytics_hook.py
index 99f4154..65f2d3a 100644
--- a/hooks/google_analytics_hook.py
+++ b/hooks/google_analytics_hook.py
@@ -104,7 +104,8 @@ def get_analytics_report(self,
                              dimensions,
                              metrics,
                              page_size,
-                             include_empty_rows):
+                             include_empty_rows,
+                             dimension_filter_clauses=None):
 
         analytics = self.get_service_object(name='reporting')
 
@@ -115,7 +116,8 @@ def get_analytics_report(self,
             'dimensions': dimensions,
             'metrics': metrics,
             'pageSize': page_size or 1000,
-            'includeEmptyRows': include_empty_rows or False
+            'includeEmptyRows': include_empty_rows or False,
+            'dimension_filter_clauses': dimension_filter_clauses
         }
 
         response = (analytics
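
Note (not part of the patch): a minimal sketch of how the new argument could be passed. The filter structure follows the Analytics Reporting API v4 dimensionFilterClauses format; the hook instance, variables and filter values below are illustrative assumptions. As of this patch the request key is still the snake_case 'dimension_filter_clauses'; it is corrected to the camelCase name the API expects later in this series (PATCH 4/9).

    # Illustrative only: restrict the report to page paths under /blog/.
    # 'hook' is assumed to be an existing GoogleAnalyticsHook instance, and the
    # other arguments (view_id, since, until, ...) are assumed to be defined.
    dimension_filter_clauses = [{
        'operator': 'AND',
        'filters': [{
            'dimensionName': 'ga:pagePath',
            'operator': 'BEGINS_WITH',
            'expressions': ['/blog/']
        }]
    }]

    report = hook.get_analytics_report(view_id, since, until, sampling_level,
                                       dimensions, metrics, page_size,
                                       include_empty_rows,
                                       dimension_filter_clauses=dimension_filter_clauses)
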
From a05c0236096c647a5f3dc7a673cdf7da0c6e9ba0 Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Mon, 1 Apr 2019 15:17:10 +0200
Subject: [PATCH 2/9] add GoogleAnalyticsReportingToPostgresOperator

---
 ...nalytics_reporting_to_postgres_operator.py | 171 ++++++++++++++++++
 1 file changed, 171 insertions(+)
 create mode 100644 operators/google_analytics_reporting_to_postgres_operator.py

diff --git a/operators/google_analytics_reporting_to_postgres_operator.py b/operators/google_analytics_reporting_to_postgres_operator.py
new file mode 100644
index 0000000..71788ec
--- /dev/null
+++ b/operators/google_analytics_reporting_to_postgres_operator.py
@@ -0,0 +1,171 @@
+from datetime import datetime
+
+from airflow.hooks.postgres_hook import PostgresHook
+from airflow.models import BaseOperator
+import pandas as pd
+
+from google_analytics_plugin.hooks.google_analytics_hook import GoogleAnalyticsHook
+
+
+class GoogleAnalyticsReportingToPostgresOperator(BaseOperator):
+    """
+    Google Analytics Reporting To Postgres Operator
+
+    :param google_analytics_conn_id: The Google Analytics connection id.
+    :type google_analytics_conn_id: string
+    :param view_id: The view id for associated report.
+    :type view_id: string/array
+    :param since: The date from which to pull GA data.
+                  This can either be a string in the format
+                  of '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'
+                  but in either case it will be
+                  passed to GA as '%Y-%m-%d'.
+    :type since: string
+    :param until: The date up to which to pull GA data.
+                  This can either be a string in the format
+                  of '%Y-%m-%d %H:%M:%S' or '%Y-%m-%d'
+                  but in either case it will be
+                  passed to GA as '%Y-%m-%d'.
+    :type until: string
+
+
+    :type postgres_conn_id: string
+    :param postgres_conn_id: The Postgres connection id
+
+    :type destination_table: string
+    :param destination_table: Table to be created in the database
+
+    :type destination_table_dtypes: dict
+    :param destination_table_dtypes: Dictionary containing column/sqlalchemy type mapping
+
+    :type if_exists: string
+    :param if_exists: What to do if the table exists. Options: fail, replace, append.
+                      See the pandas documentation for to_sql for more
+
+    :type destination_schema: string
+    :param destination_schema: Database schema where to create the table
+    """
+
+    template_fields = ('since',
+                       'until')
+
+    def __init__(self,
+                 google_analytics_conn_id,
+                 view_id,
+                 since,
+                 until,
+                 dimensions,
+                 metrics,
+                 postgres_conn_id,
+                 destination_table,
+                 destination_schema=None,
+                 if_exists='fail',
+                 destination_table_dtypes=None,
+                 page_size=1000,
+                 include_empty_rows=True,
+                 sampling_level=None,
+                 dimension_filter_clauses=None,
+                 *args,
+                 **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self.google_analytics_conn_id = google_analytics_conn_id
+        self.view_id = view_id
+        self.since = since
+        self.until = until
+        self.sampling_level = sampling_level
+        self.dimensions = dimensions
+        self.metrics = metrics
+        self.page_size = page_size
+        self.include_empty_rows = include_empty_rows
+        self.postgres_conn_id = postgres_conn_id
+        self.destination_schema = destination_schema
+        self.destination_table = destination_table
+        self.destination_table_dtypes = destination_table_dtypes
+        self.if_exists = if_exists
+        self.dimension_filter_clauses = dimension_filter_clauses
+
+        self.metricMap = {
+            'METRIC_TYPE_UNSPECIFIED': 'varchar(255)',
+            'CURRENCY': 'decimal(20,5)',
+            'INTEGER': 'int(11)',
+            'FLOAT': 'decimal(20,5)',
+            'PERCENT': 'decimal(20,5)',
+            'TIME': 'time'
+        }
+
+        if self.page_size > 10000:
+            raise Exception('Please specify a page size equal to or lower than 10000.')
+
+        if not isinstance(self.include_empty_rows, bool):
+            raise Exception('Please specify "include_empty_rows" as a boolean.')
+
+    def execute(self, context):
+        ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id, key_file='ga_key.json')
+        try:
+            since_formatted = datetime.strptime(self.since, '%Y-%m-%d %H:%M:%S').strftime(
+                '%Y-%m-%d')
+        except:
+            since_formatted = str(self.since)
+        try:
+            until_formatted = datetime.strptime(self.until, '%Y-%m-%d %H:%M:%S').strftime(
+                '%Y-%m-%d')
+        except:
+            until_formatted = str(self.until)
+        report = ga_conn.get_analytics_report(self.view_id,
+                                              since_formatted,
+                                              until_formatted,
+                                              self.sampling_level,
+                                              self.dimensions,
+                                              self.metrics,
+                                              self.page_size,
+                                              self.include_empty_rows,
+                                              self.dimension_filter_clauses
+                                              )
+
+        columnHeader = report.get('columnHeader', {})
+        # Right now all dimensions are hardcoded to varchar(255), will need a map if any non-varchar dimensions are used in the future
+        # Unfortunately the API does not send back types for Dimensions like it does for Metrics (yet..)
+        dimensionHeaders = [
+            {'name': header.replace('ga:', ''), 'type': 'varchar(255)'}
+            for header
+            in columnHeader.get('dimensions', [])
+        ]
+        metricHeaders = [
+            {'name': entry.get('name').replace('ga:', ''),
+             'type': self.metricMap.get(entry.get('type'), 'varchar(255)')}
+            for entry
+            in columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
+        ]
+
+        rows = report.get('data', {}).get('rows', [])
+        all_data = []
+        for row_counter, row in enumerate(rows):
+            root_data_obj = {}
+            dimensions = row.get('dimensions', [])
+            metrics = row.get('metrics', [])
+
+            for index, dimension in enumerate(dimensions):
+                header = dimensionHeaders[index].get('name').lower()
+                root_data_obj[header] = dimension
+            for metric in metrics:
+                data = {}
+                data.update(root_data_obj)
+
+                for index, value in enumerate(metric.get('values', [])):
+                    header = metricHeaders[index].get('name').lower()
+                    data[header] = value
+
+                data['viewid'] = self.view_id
+                data['timestamp'] = self.since
+
+                all_data.append(data)
+
+        df_google_data = pd.DataFrame(all_data)
+        postgres_hook = PostgresHook(self.postgres_conn_id)
+        df_google_data.to_sql(name=self.destination_table,
+                              con=postgres_hook.get_sqlalchemy_engine(),
+                              dtype=self.destination_table_dtypes,
+                              if_exists=self.if_exists,
+                              schema=self.destination_schema
+                              )
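
Note (not part of the patch): a hedged sketch of how the new operator might be wired into a DAG. The connection ids, view id, dimensions, metrics and table names are placeholder assumptions; at this point in the series the hook key file is still hardcoded to 'ga_key.json' in execute().

    # Illustrative DAG task for the operator added above; all ids and names are placeholders.
    from datetime import datetime

    from airflow import DAG
    from google_analytics_plugin.operators.google_analytics_reporting_to_postgres_operator import GoogleAnalyticsReportingToPostgresOperator

    with DAG('ga_to_postgres_example',
             start_date=datetime(2019, 1, 1),
             schedule_interval='@daily') as dag:
        ga_to_postgres = GoogleAnalyticsReportingToPostgresOperator(
            task_id='ga_report_to_postgres',
            google_analytics_conn_id='google_analytics_default',
            view_id='123456789',
            since='{{ ds }}',
            until='{{ ds }}',
            dimensions=[{'name': 'ga:date'}, {'name': 'ga:pagePath'}],
            metrics=[{'expression': 'ga:pageviews'}],
            postgres_conn_id='postgres_default',
            destination_table='ga_pageviews',
            destination_schema='public',
            if_exists='append',
        )
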
From b088f50fcc33fb0ba47929aa8043862269fca572 Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Mon, 1 Apr 2019 15:17:49 +0200
Subject: [PATCH 3/9] add new operator to plugin

---
 __init__.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/__init__.py b/__init__.py
index 75c617e..1791ce9 100644
--- a/__init__.py
+++ b/__init__.py
@@ -2,13 +2,14 @@
 from google_analytics_plugin.hooks.google_analytics_hook import GoogleAnalyticsHook
 from google_analytics_plugin.operators.google_analytics_reporting_to_s3_operator import GoogleAnalyticsReportingToS3Operator
 from google_analytics_plugin.operators.google_analytics_account_summaries_to_s3_operator import GoogleAnalyticsAccountSummariesToS3Operator
-
+from google_analytics_plugin.operators.google_analytics_reporting_to_postgres_operator import GoogleAnalyticsReportingToPostgresOperator
 
 class GoogleAnalyticsPlugin(AirflowPlugin):
     name = "google_analytics_plugin"
     hooks = [GoogleAnalyticsHook]
     operators = [GoogleAnalyticsReportingToS3Operator,
-                 GoogleAnalyticsAccountSummariesToS3Operator]
+                 GoogleAnalyticsAccountSummariesToS3Operator,
+                 GoogleAnalyticsReportingToPostgresOperator]
     executors = []
     macros = []
     admin_views = []

From 0b7f17a0cff6e32fa3f012098d5dfc414cda0049 Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Mon, 1 Apr 2019 15:21:51 +0200
Subject: [PATCH 4/9] fix key name

---
 hooks/google_analytics_hook.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hooks/google_analytics_hook.py b/hooks/google_analytics_hook.py
index 65f2d3a..57973ce 100644
--- a/hooks/google_analytics_hook.py
+++ b/hooks/google_analytics_hook.py
@@ -117,7 +117,7 @@ def get_analytics_report(self,
             'metrics': metrics,
             'pageSize': page_size or 1000,
             'includeEmptyRows': include_empty_rows or False,
-            'dimension_filter_clauses': dimension_filter_clauses
+            'dimensionFilterClauses': dimension_filter_clauses
         }
 
         response = (analytics
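
Note (not part of the patches): once the operator is registered on the plugin, Airflow's 1.x plugin manager should also expose it under the plugin's operator namespace, so a DAG could import it either directly from the package (as in the sketch after PATCH 2/9) or, depending on the Airflow version, along these lines:

    # Alternative import via the Airflow 1.x plugin mechanism
    # (the exact namespace depends on the Airflow version; illustrative only).
    from airflow.operators.google_analytics_plugin import GoogleAnalyticsReportingToPostgresOperator
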
From 6852535cb8ad8ae6c4bd8de5d3a303192e7ceddc Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Mon, 1 Apr 2019 15:25:37 +0200
Subject: [PATCH 5/9] dont hardcode key file name

---
 operators/google_analytics_reporting_to_postgres_operator.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/operators/google_analytics_reporting_to_postgres_operator.py b/operators/google_analytics_reporting_to_postgres_operator.py
index 71788ec..793b2f5 100644
--- a/operators/google_analytics_reporting_to_postgres_operator.py
+++ b/operators/google_analytics_reporting_to_postgres_operator.py
@@ -65,6 +65,7 @@ def __init__(self,
                  include_empty_rows=True,
                  sampling_level=None,
                  dimension_filter_clauses=None,
+                 key_file=None,
                  *args,
                  **kwargs):
         super().__init__(*args, **kwargs)
@@ -84,6 +85,7 @@ def __init__(self,
         self.destination_table_dtypes = destination_table_dtypes
         self.if_exists = if_exists
         self.dimension_filter_clauses = dimension_filter_clauses
+        self.key_file = key_file
 
         self.metricMap = {
             'METRIC_TYPE_UNSPECIFIED': 'varchar(255)',
@@ -101,7 +103,7 @@ def __init__(self,
             raise Exception('Please specify "include_empty_rows" as a boolean.')
 
     def execute(self, context):
-        ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id, key_file='ga_key.json')
+        ga_conn = GoogleAnalyticsHook(self.google_analytics_conn_id, key_file=self.key_file)
         try:
            since_formatted = datetime.strptime(self.since, '%Y-%m-%d %H:%M:%S').strftime(
                 '%Y-%m-%d')

From be66b635cd49cef002e17b4b918ca5c5b7446954 Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Fri, 26 Apr 2019 11:45:27 +0200
Subject: [PATCH 6/9] template destination_table

---
 operators/google_analytics_reporting_to_postgres_operator.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/operators/google_analytics_reporting_to_postgres_operator.py b/operators/google_analytics_reporting_to_postgres_operator.py
index 793b2f5..98c5a21 100644
--- a/operators/google_analytics_reporting_to_postgres_operator.py
+++ b/operators/google_analytics_reporting_to_postgres_operator.py
@@ -47,7 +47,8 @@ class GoogleAnalyticsReportingToPostgresOperator(BaseOperator):
     """
 
     template_fields = ('since',
-                       'until')
+                       'until',
+                       'destination_table')
 
     def __init__(self,
                  google_analytics_conn_id,
@@ -122,7 +123,7 @@ def execute(self, context):
                                               self.metrics,
                                               self.page_size,
                                               self.include_empty_rows,
-                                              self.dimension_filter_clauses
+                                              dimension_filter_clauses=self.dimension_filter_clauses
                                               )
 
         columnHeader = report.get('columnHeader', {})

From 904a09a08aaee59355349df153c218f7e110cc0f Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Wed, 28 Aug 2019 17:09:56 +0200
Subject: [PATCH 7/9] add log

---
 operators/google_analytics_reporting_to_postgres_operator.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/operators/google_analytics_reporting_to_postgres_operator.py b/operators/google_analytics_reporting_to_postgres_operator.py
index 98c5a21..426366d 100644
--- a/operators/google_analytics_reporting_to_postgres_operator.py
+++ b/operators/google_analytics_reporting_to_postgres_operator.py
@@ -141,6 +141,8 @@ def execute(self, context):
             in columnHeader.get('metricHeader', {}).get('metricHeaderEntries', [])
         ]
 
+        print('samples read', report.get('data', {}).get('samplesReadCounts', 0))
+
         rows = report.get('data', {}).get('rows', [])
         all_data = []
         for row_counter, row in enumerate(rows):
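
Note (not part of the patches): with 'destination_table' now templated alongside 'since' and 'until', and the key file configurable, a task could write each run into a date-stamped table. This is an illustrative sketch only, reusing the imports from the DAG example after PATCH 2/9; the ids, macros and names are placeholder assumptions and 'dag' is assumed to already exist.

    # Illustrative: templated date range and month-stamped destination table (placeholders throughout).
    ga_monthly = GoogleAnalyticsReportingToPostgresOperator(
        task_id='ga_monthly_report',
        google_analytics_conn_id='google_analytics_default',
        view_id='123456789',
        since='{{ ds }}',
        until='{{ next_ds }}',
        dimensions=[{'name': 'ga:date'}],
        metrics=[{'expression': 'ga:sessions'}],
        postgres_conn_id='postgres_default',
        destination_table='ga_sessions_{{ execution_date.strftime("%Y%m") }}',
        if_exists='replace',
        key_file='ga_key.json',
        dag=dag,
    )
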
From 03dcc7da5fa949b3c480111aed296e031a775963 Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Wed, 28 Aug 2019 17:30:25 +0200
Subject: [PATCH 8/9] add more debug info

---
 operators/google_analytics_reporting_to_postgres_operator.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/operators/google_analytics_reporting_to_postgres_operator.py b/operators/google_analytics_reporting_to_postgres_operator.py
index 426366d..fa29664 100644
--- a/operators/google_analytics_reporting_to_postgres_operator.py
+++ b/operators/google_analytics_reporting_to_postgres_operator.py
@@ -142,6 +142,8 @@ def execute(self, context):
         ]
 
         print('samples read', report.get('data', {}).get('samplesReadCounts', 0))
+        print('total read', report.get('data', {}).get('totals', 0))
+        print('row count', report.get('data', {}).get('rowCount', 0))
 
         rows = report.get('data', {}).get('rows', [])
         all_data = []

From eb89999f5c99574e3bb38faa76849bcc0bc477be Mon Sep 17 00:00:00 2001
From: Christian Smorra
Date: Mon, 6 Jan 2020 16:20:36 +0100
Subject: [PATCH 9/9] use settings.AIRFLOW_HOME (support newer airflow version)

---
 hooks/google_analytics_hook.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hooks/google_analytics_hook.py b/hooks/google_analytics_hook.py
index 57973ce..ee1b08a 100644
--- a/hooks/google_analytics_hook.py
+++ b/hooks/google_analytics_hook.py
@@ -32,6 +32,7 @@
 import time
 import os
 
+from airflow import settings
 from airflow.hooks.base_hook import BaseHook
 from airflow import configuration as conf
 from apiclient.discovery import build
@@ -52,7 +53,7 @@ class GoogleAnalyticsHook(BaseHook):
                      version='v3',
                      scopes=['https://www.googleapis.com/auth/analytics'])
     }
-    _key_folder = os.path.join(conf.get('core', 'airflow_home'), 'keys')
+    _key_folder = os.path.join(settings.AIRFLOW_HOME, 'keys')
 
     def __init__(self, google_analytics_conn_id='google_analytics_default', key_file=None):
         self.google_analytics_conn_id = google_analytics_conn_id
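
Note (not part of the patch): after PATCH 9/9 the hook derives its key folder from settings.AIRFLOW_HOME instead of the deprecated 'core' / 'airflow_home' config option, so the service-account file passed as key_file is presumably expected under $AIRFLOW_HOME/keys. A minimal sketch of the assumed path resolution; the file name is illustrative and the exact join happens inside the hook, which is not fully shown in this series.

    # Presumed location of the Google Analytics key file after this patch (illustrative).
    import os
    from airflow import settings

    key_path = os.path.join(settings.AIRFLOW_HOME, 'keys', 'ga_key.json')
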