diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cca91a35..e8960aeea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,8 @@ ## 0.7.0dev -- Removed direct AWX dependencies from collectors module -- Replaced AWX license functions with direct database queries +- Drop psycopg2 support - no longer compatible with awx older than 22.4.0 +- Removed direct AWX imports from config collector, replaced with db queries - added `metrics_utility.library` - TODO diff --git a/metrics_utility/automation_controller_billing/collectors.py b/metrics_utility/automation_controller_billing/collectors.py index c24808b22..436f8ac30 100644 --- a/metrics_utility/automation_controller_billing/collectors.py +++ b/metrics_utility/automation_controller_billing/collectors.py @@ -13,6 +13,7 @@ from django.db.utils import ProgrammingError from django.utils.timezone import now, timedelta from django.utils.translation import gettext_lazy as _ +from psycopg.errors import UndefinedTable from metrics_utility.automation_controller_billing.helpers import ( get_config_and_settings_from_db, @@ -29,14 +30,6 @@ from .prometheus_client import PrometheusClient -try: - from psycopg.errors import UndefinedTable -except ImportError: - - class UndefinedTable(Exception): - pass - - """ This module is used to define metrics collected by gather_automation_controller_billing_data command. Each function is @@ -165,27 +158,15 @@ def _copy_table(table, query, path, prepend_query=None): if prepend_query: cursor.execute(prepend_query) - if hasattr(cursor, 'copy_expert') and callable(cursor.copy_expert): - _copy_table_aap_2_4_and_below(cursor, query, file) - else: - _copy_table_aap_2_5_and_above(cursor, query, file) + # Use psycopg (v3) cursor.copy() method + with cursor.copy(query) as copy: + while data := copy.read(): + byte_data = bytes(data) + file.write(byte_data.decode()) return file.file_list(keep_empty=True) -def _copy_table_aap_2_4_and_below(cursor, query, file): - # Automation Controller 4.4 and below use psycopg2 with .copy_expert() method - cursor.copy_expert(query, file) - - -def _copy_table_aap_2_5_and_above(cursor, query, file): - # Automation Controller 4.5 and above use psycopg3 with .copy() method - with cursor.copy(query) as copy: - while data := copy.read(): - byte_data = bytes(data) - file.write(byte_data.decode()) - - def yaml_and_json_parsing_functions(): query = """ -- Define function for parsing field out of yaml encoded as text diff --git a/metrics_utility/library/collectors/controller/config.py b/metrics_utility/library/collectors/controller/config.py index 0b723b6b0..f2b4c832f 100644 --- a/metrics_utility/library/collectors/controller/config.py +++ b/metrics_utility/library/collectors/controller/config.py @@ -7,6 +7,7 @@ import distro from django.utils.dateparse import parse_datetime +from psycopg import sql from ..util import collector @@ -99,9 +100,10 @@ def _get_install_type(): def _get_controller_settings(db, keys): settings = {} with db.cursor() as cursor: - # FIXME: psycopg.sql ? - in_sql = "'" + "', '".join(keys) + "'" - cursor.execute(f'SELECT key, value FROM conf_setting WHERE key IN ({in_sql})') + # Build safe SQL query with parameter placeholders + placeholders = sql.SQL(', ').join(sql.Placeholder() * len(keys)) + query = sql.SQL('SELECT key, value FROM conf_setting WHERE key IN ({})').format(placeholders) + cursor.execute(query, keys) for key, value in cursor.fetchall(): if value: settings[key] = json.loads(value, object_hook=_datetime_hook) diff --git a/metrics_utility/library/collectors/controller/job_host_summary_service.py b/metrics_utility/library/collectors/controller/job_host_summary_service.py index af7d9cc86..5ea206d12 100644 --- a/metrics_utility/library/collectors/controller/job_host_summary_service.py +++ b/metrics_utility/library/collectors/controller/job_host_summary_service.py @@ -1,16 +1,18 @@ +from psycopg import sql + from ..util import collector, copy_table @collector def job_host_summary_service(*, db=None, since=None, until=None, output_dir=None): - where = ' AND '.join( - [ - f"mu.finished >= '{since.isoformat()}'", - f"mu.finished < '{until.isoformat()}'", - ] - ) + # Build WHERE clause using parameterized query + where_parts = [ + sql.SQL('mu.finished >= %(since)s'), + sql.SQL('mu.finished < %(until)s'), + ] + where_clause = sql.SQL(' AND ').join(where_parts) - query = f""" + query_template = sql.SQL(""" WITH -- First: restrict to jobs that FINISHED in the window (uses index on main_unifiedjob.finished if present) filtered_jobs AS ( @@ -78,6 +80,17 @@ def job_host_summary_service(*, db=None, since=None, until=None, output_dir=None LEFT JOIN main_organization mo ON mo.id = mu.organization_id LEFT JOIN hosts_variables hv ON hv.host_id = mjs.host_id ORDER BY mu.finished ASC - """ + """).format(where=where_clause) - return copy_table(db=db, table='main_jobhostsummary', query=query, prepend_query=True, output_dir=output_dir) + # Convert to string and pass params (no context needed, uses default encoding) + query = query_template.as_string() + params = {'since': since, 'until': until} + + return copy_table( + db=db, + table='main_jobhostsummary', + query=query, + params=params, + prepend_query=True, + output_dir=output_dir, + ) diff --git a/metrics_utility/library/collectors/controller/main_host.py b/metrics_utility/library/collectors/controller/main_host.py index e68e3f26c..018a3516d 100644 --- a/metrics_utility/library/collectors/controller/main_host.py +++ b/metrics_utility/library/collectors/controller/main_host.py @@ -1,8 +1,19 @@ +from psycopg import sql + from ..util import collector, copy_table, date_where def _main_host_query(where): - return f""" + """ + Build main_host query with dynamic WHERE clause. + + Args: + where: Either a string (for simple WHERE clauses) or sql.SQL object + + Returns: + sql.SQL query or string + """ + query_template = """ SELECT main_host.name as host_name, main_host.id AS host_id, @@ -85,6 +96,13 @@ def _main_host_query(where): ORDER BY main_host.id ASC """ + # If where is a string, return a simple f-string formatted query + if isinstance(where, str): + return query_template.format(where=where) + + # Otherwise, it's an sql.SQL object, use sql.SQL formatting + return sql.SQL(query_template).format(where=where) + @collector def main_host(*, db=None, output_dir=None): @@ -96,10 +114,37 @@ def main_host(*, db=None, output_dir=None): def main_host_daily(*, db=None, since=None, until=None, output_dir=None): # prefer running with until=False, to not skip hosts that keep being modified - where = f""" + # Build WHERE clause using date_where + created_where, created_params = date_where('main_host.created', since, until) + modified_where, modified_params = date_where('main_host.modified', since, until) + + # Rename params to avoid conflicts + params_created = {f'created_{k}': v for k, v in created_params.items()} + params_modified = {f'modified_{k}': v for k, v in modified_params.items()} + all_params = {**params_created, **params_modified} + + # Update the SQL queries to use renamed params + created_where_str = created_where.as_string().replace('%(since)s', '%(created_since)s').replace('%(until)s', '%(created_until)s') + modified_where_str = modified_where.as_string().replace('%(since)s', '%(modified_since)s').replace('%(until)s', '%(modified_until)s') + + where = sql.SQL(""" enabled='t' - AND ({date_where('main_host.created', since, until)} - OR {date_where('main_host.modified', since, until)}) - """ - query = _main_host_query(where) - return copy_table(db=db, table='main_host_daily', query=query, prepend_query=True, output_dir=output_dir) + AND ({created} + OR {modified}) + """).format( + created=sql.SQL(created_where_str), + modified=sql.SQL(modified_where_str), + ) + + query_obj = _main_host_query(where) + # Convert to string (no context needed, uses default encoding) + query = query_obj.as_string() + + return copy_table( + db=db, + table='main_host_daily', + query=query, + params=all_params, + prepend_query=True, + output_dir=output_dir, + ) diff --git a/metrics_utility/library/collectors/controller/main_jobevent_service.py b/metrics_utility/library/collectors/controller/main_jobevent_service.py index 5d04f2d2b..62d3b8c74 100644 --- a/metrics_utility/library/collectors/controller/main_jobevent_service.py +++ b/metrics_utility/library/collectors/controller/main_jobevent_service.py @@ -1,5 +1,7 @@ from datetime import timedelta +from psycopg import sql + from ..util import collector, copy_table @@ -31,7 +33,7 @@ def main_jobevent_service(*, db=None, since=None, until=None, output_dir=None): # We are loading the finished jobs then we are filtering # for the job_created, this cannot be done by simple joins because # job_created is partitioned and partitions pruning dont work with joins - job_ids_set = set(job_id for job_id, _ in jobs) + job_ids_list = [job_id for job_id, _ in jobs] # Extract unique hour boundaries from job_created timestamps # This reduces potentially 100K timestamps down to ~100-1000 hourly ranges @@ -67,28 +69,43 @@ def main_jobevent_service(*, db=None, since=None, until=None, output_dir=None): # Build WHERE clause with consolidated ranges for partition pruning # PostgreSQL can see these literal timestamps and prune partitions accordingly + # NOTE: We use literal timestamps here (not params) for partition pruning optimization or_clauses = [] for range_start, range_end in ranges: - or_clauses.append(f"(e.job_created >= '{range_start.isoformat()}'::timestamptz AND e.job_created < '{range_end.isoformat()}'::timestamptz)") + or_clauses.append( + sql.SQL('(e.job_created >= {start}::timestamptz AND e.job_created < {end}::timestamptz)').format( + start=sql.Literal(range_start.isoformat()), + end=sql.Literal(range_end.isoformat()), + ) + ) # Handle edge case: if no ranges, use FALSE to return empty result set # This maintains valid SQL structure while returning 0 rows - timestamp_where_clause = ' OR '.join(or_clauses) if or_clauses else 'FALSE' + if or_clauses: + timestamp_where_clause = sql.SQL(' OR ').join(or_clauses) + else: + timestamp_where_clause = sql.SQL('FALSE') - # Build job_id IN clause + # Build job_id IN clause using params # Handle edge case: if no jobs, use FALSE to return empty result set with proper schema - if job_ids_set: - job_ids_str = ','.join(str(job_id) for job_id in job_ids_set) - job_id_where_clause = f'e.job_id IN ({job_ids_str})' + if job_ids_list: + # Build parameterized IN clause + job_id_placeholders = sql.SQL(', ').join(sql.Placeholder() * len(job_ids_list)) + job_id_where_clause = sql.SQL('e.job_id IN ({})').format(job_id_placeholders) + job_params = job_ids_list else: - job_id_where_clause = 'FALSE' + job_id_where_clause = sql.SQL('FALSE') + job_params = [] # Combine both WHERE conditions - where_clause = f'({timestamp_where_clause}) AND ({job_id_where_clause})' + where_clause = sql.SQL('({timestamp}) AND ({job_ids})').format( + timestamp=timestamp_where_clause, + job_ids=job_id_where_clause, + ) # Final event query # - WHERE clause filters by job_id and enables partition pruning via literal hour boundaries - query = f""" + query_template = sql.SQL(""" SELECT e.id, e.created, @@ -137,7 +154,10 @@ def main_jobevent_service(*, db=None, since=None, until=None, output_dir=None): SELECT replace(e.event_data, '\\u', '\\u005cu')::jsonb AS event_data ) AS ed LEFT JOIN main_unifiedjob uj ON uj.id = e.job_id - WHERE {where_clause} - """ + WHERE {where} + """).format(where=where_clause) + + # Convert to string for copy_table (no context needed, uses default encoding) + query = query_template.as_string() - return copy_table(db=db, table='main_jobevent', query=query, output_dir=output_dir) + return copy_table(db=db, table='main_jobevent', query=query, params=job_params, output_dir=output_dir) diff --git a/metrics_utility/library/collectors/util.py b/metrics_utility/library/collectors/util.py index 588f6b788..85a371a2f 100644 --- a/metrics_utility/library/collectors/util.py +++ b/metrics_utility/library/collectors/util.py @@ -2,21 +2,40 @@ import pathlib import tempfile +from psycopg import sql + from ..csv_file_splitter import CsvFileSplitter -# FIXME: psycopg.sql +def _sql_identifier(name): + return sql.Identifier(*name.split('.')) + + def date_where(field, since, until): + """ + field: field name (dots allowed) + since: Optional datetime - include records >= since + until: Optional datetime - include records < until + + Returns: + A tuple of (sql.SQL object, dict of params) + """ if since and until: - return f'( "{field}" >= \'{since.isoformat()}\' AND "{field}" < \'{until.isoformat()}\' )' + query = sql.SQL('( {field} >= %(since)s AND {field} < %(until)s )').format(field=_sql_identifier(field)) + params = {'since': since, 'until': until} + return query, params if since: - return f'( "{field}" >= \'{since.isoformat()}\' )' + query = sql.SQL('( {field} >= %(since)s )').format(field=_sql_identifier(field)) + params = {'since': since} + return query, params if until: - return f'( "{field}" < \'{until.isoformat()}\' )' + query = sql.SQL('( {field} < %(until)s )').format(field=_sql_identifier(field)) + params = {'until': until} + return query, params - return 'true' + return sql.SQL('true'), {} def collector(func): @@ -59,38 +78,17 @@ def copy_table(db, table, query, params=None, prepend_query=False, output_file=N copy_query = f'COPY ({query}) TO STDOUT WITH CSV HEADER' - # FIXME: remove once 2.4 is no longer supported - if hasattr(cursor, 'copy_expert') and callable(cursor.copy_expert): - _copy_table_aap_2_4_and_below(cursor, copy_query, params, file) - else: - _copy_table_aap_2_5_and_above(cursor, copy_query, params, file) + # Use psycopg (v3) cursor.copy() method + with cursor.copy(copy_query, params) as copy: + while data := copy.read(): + byte_data = bytes(data) + file.write(byte_data.decode()) if output_file: return [output_file.name] return file.file_list(keep_empty=True) -def _copy_table_aap_2_4_and_below(cursor, query, params, file): - if params: - # copy_expert doesn't support params, make do (but no escaping) - for p in params: - if f'%({p})s' in query: - query = query.replace(f'%({p})s', f'"{params[p]}"') - if f'%({p})d' in query: - query = query.replace(f'%({p})d', str(int(params[p]))) - - # Automation Controller 4.4 and below use psycopg2 with .copy_expert() method - cursor.copy_expert(query, file) - - -def _copy_table_aap_2_5_and_above(cursor, query, params, file): - # Automation Controller 4.5 and above use psycopg3 with .copy() method - with cursor.copy(query, params) as copy: - while data := copy.read(): - byte_data = bytes(data) - file.write(byte_data.decode()) - - def _yaml_json_functions(): return """ -- Define function for parsing field out of yaml encoded as text diff --git a/metrics_utility/library/storage/__init__.py b/metrics_utility/library/storage/__init__.py index 0e551ab9d..764cb9290 100644 --- a/metrics_utility/library/storage/__init__.py +++ b/metrics_utility/library/storage/__init__.py @@ -1,5 +1,7 @@ from .crc import StorageCRC, StorageCRCMutual from .directory import StorageDirectory +from .helpers import load_csv, load_json, load_parquet, save_csv, save_json, save_parquet +from .postgres import StoragePostgres, create_storage_table from .s3 import StorageS3 from .segment import StorageSegment @@ -8,6 +10,14 @@ 'StorageCRC', 'StorageCRCMutual', 'StorageDirectory', + 'StoragePostgres', 'StorageS3', 'StorageSegment', + 'create_storage_table', + 'load_csv', + 'load_json', + 'load_parquet', + 'save_csv', + 'save_json', + 'save_parquet', ] diff --git a/metrics_utility/library/storage/crc.py b/metrics_utility/library/storage/crc.py index 7307ddf15..7a27b95e7 100644 --- a/metrics_utility/library/storage/crc.py +++ b/metrics_utility/library/storage/crc.py @@ -49,6 +49,14 @@ def _put(self, file_tuple): if response.status_code >= 300: raise Exception(f'{self.__class__.__name__}: Upload failed with status {response.status_code}: {response.text}') + def get(self, remote): + """CRC storage is write-only and does not support get().""" + raise NotImplementedError(f'{self.__class__.__name__} is write-only and does not support get()') + + def get_data(self, remote, format='auto'): + """CRC storage is write-only and does not support get_data().""" + raise NotImplementedError(f'{self.__class__.__name__} is write-only and does not support get_data()') + class StorageCRC(Base): def __init__(self, **settings): diff --git a/metrics_utility/library/storage/directory.py b/metrics_utility/library/storage/directory.py index 216f02f65..9cad52808 100644 --- a/metrics_utility/library/storage/directory.py +++ b/metrics_utility/library/storage/directory.py @@ -4,6 +4,7 @@ from contextlib import contextmanager +from .helpers import load_csv, load_json, load_parquet from .util import date_filter, dict_to_json_file @@ -38,6 +39,50 @@ def glob(self, pattern, since=None, until=None): def get(self, remote): yield self._path(remote) + def get_data(self, remote, format='auto'): + """ + Retrieve data from storage and return it parsed. + + Args: + remote: Path to the file in storage + format: Format of the data - 'auto' (detect from extension), + 'json', 'csv', or 'parquet' + + Returns: + For JSON: dict or list + For CSV: list of dicts + For Parquet: pandas DataFrame + + Raises: + ValueError: If format is unsupported or cannot be auto-detected + FileNotFoundError: If the file doesn't exist + """ + if not self.exists(remote): + raise FileNotFoundError(f'File not found in storage: {remote}') + + # Auto-detect format from file extension + if format == 'auto': + remote_lower = remote.lower() + if remote_lower.endswith('.json'): + format = 'json' + elif remote_lower.endswith('.csv'): + format = 'csv' + elif remote_lower.endswith('.parquet'): + format = 'parquet' + else: + raise ValueError(f"Cannot auto-detect format for '{remote}'. Please specify format explicitly: 'json', 'csv', or 'parquet'") + + # Load the data using the appropriate helper + with self.get(remote) as filename: + if format == 'json': + return load_json(filename) + elif format == 'csv': + return load_csv(filename) + elif format == 'parquet': + return load_parquet(filename) + else: + raise ValueError(f"Unsupported format: '{format}'. Supported formats: 'auto', 'json', 'csv', 'parquet'") + def put(self, remote, *, filename=None, fileobj=None, dict=None): full_path = self._path(remote) os.makedirs(os.path.dirname(full_path), exist_ok=True) diff --git a/metrics_utility/library/storage/helpers.py b/metrics_utility/library/storage/helpers.py new file mode 100644 index 000000000..7f27192d2 --- /dev/null +++ b/metrics_utility/library/storage/helpers.py @@ -0,0 +1,143 @@ +import csv +import json + +import pandas as pd + + +# Load functions - accept either a filename (str/Path) or a file-like object + + +def load_csv(source): + """ + Load a CSV file and return a list of dictionaries. + + Args: + source: Either a filename (str/Path) or a file-like object + + Returns: + List of dictionaries, one per row + """ + # pandas read_csv handles both filenames and file-like objects + df = pd.read_csv(source, encoding='utf-8') + return df.to_dict('records') + + +def load_json(source): + """ + Load a JSON file and return the parsed data (list or dict). + + Args: + source: Either a filename (str/Path) or a file-like object + + Returns: + Parsed JSON data (list or dict) + """ + if hasattr(source, 'read'): + return json.load(source) + else: + with open(source, 'r', encoding='utf-8') as f: + return json.load(f) + + +def load_parquet(source): + """ + Load a Parquet file and return a pandas DataFrame. + + Args: + source: Either a filename (str/Path) or a file-like object + + Returns: + pandas DataFrame + """ + return pd.read_parquet(source) + + +# Save functions - follow storage put convention with filename= and fileobj= parameters + + +def save_csv(data, *, filename=None, fileobj=None): + """ + Save data as CSV to a file. + + Args: + data: Either a list of dictionaries or a pandas DataFrame + filename: Path to save the file (mutually exclusive with fileobj) + fileobj: File-like object to write to (mutually exclusive with filename) + + Note: + Exactly one of filename or fileobj must be provided. + """ + if (filename is None) == (fileobj is None): + raise ValueError('Exactly one of filename or fileobj must be provided') + + if isinstance(data, pd.DataFrame): + # Use pandas to_csv for DataFrames + if filename: + data.to_csv(filename, index=False, encoding='utf-8') + else: + data.to_csv(fileobj, index=False, encoding='utf-8') + elif isinstance(data, list): + # Use csv.DictWriter for list of dicts + if not data: + # Handle empty list + if filename: + with open(filename, 'w', newline='', encoding='utf-8') as f: + pass + return + + fieldnames = list(data[0].keys()) + + if filename: + with open(filename, 'w', newline='', encoding='utf-8') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(data) + else: + writer = csv.DictWriter(fileobj, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(data) + else: + raise TypeError(f'data must be a DataFrame or list of dicts, got {type(data).__name__}') + + +def save_json(data, *, filename=None, fileobj=None): + """ + Save data as JSON to a file. + + Args: + data: A list or dictionary to save + filename: Path to save the file (mutually exclusive with fileobj) + fileobj: File-like object to write to (mutually exclusive with filename) + + Note: + Exactly one of filename or fileobj must be provided. + """ + if (filename is None) == (fileobj is None): + raise ValueError('Exactly one of filename or fileobj must be provided') + + if filename: + with open(filename, 'w', encoding='utf-8') as f: + json.dump(data, f) + else: + json.dump(data, fileobj) + + +def save_parquet(df, *, filename=None, fileobj=None): + """ + Save a DataFrame as Parquet to a file. + + Args: + df: pandas DataFrame to save + filename: Path to save the file (mutually exclusive with fileobj) + fileobj: File-like object to write to (mutually exclusive with filename) + + Note: + Exactly one of filename or fileobj must be provided. + """ + if (filename is None) == (fileobj is None): + raise ValueError('Exactly one of filename or fileobj must be provided') + + if filename: + df.to_parquet(filename) + else: + df.to_parquet(fileobj) diff --git a/metrics_utility/library/storage/postgres.py b/metrics_utility/library/storage/postgres.py new file mode 100644 index 000000000..d3d1e7ee4 --- /dev/null +++ b/metrics_utility/library/storage/postgres.py @@ -0,0 +1,316 @@ +import datetime +import fnmatch +import json + +from contextlib import contextmanager + +from psycopg import sql + +from .helpers import load_csv, load_json +from .util import dict_to_json_file + + +def _sql_identifier(name): + return sql.Identifier(*name.split('.')) + + +class StoragePostgres: + """ + StoragePostgres(db=connection, table='daily', [key_field='key', value_field='value', timestamp_field=None]) + stores & reads data as a json value in a key-value-style table + supports dict, csv files, json files + """ + + def __init__(self, **settings): + self.db = settings.get('db') + self.table = settings.get('table') + self.key_field = settings.get('key_field', 'key') + self.value_field = settings.get('value_field', 'value') + self.timestamp_field = settings.get('timestamp_field', None) + + if not self.db: + raise Exception('StoragePostgres: db connection is required') + + if not self.table: + raise Exception('StoragePostgres: table is required') + + @contextmanager + def get(self, key): + """ + Get value from the database as a temporary JSON file. + + Args: + key: The key to look up + + Yields: + Path to a temporary JSON file containing the data + + Raises: + KeyError: If the key doesn't exist in the database + """ + data = self.get_data(key) + if data is None: + raise KeyError(f'Key not found in storage: {key}') + + # Use the util function to create a temporary JSON file + with dict_to_json_file(data) as filename: + yield filename + + def get_data(self, key): + """ + Get value from the database by key and return it parsed. + + Args: + key: The key to look up + + Returns: + The parsed JSON value (dict or list) or None if not found + """ + with self.db.cursor() as cursor: + query = sql.SQL('SELECT {value_field} FROM {table} WHERE {key_field} = %s').format( + value_field=_sql_identifier(self.value_field), + table=_sql_identifier(self.table), + key_field=_sql_identifier(self.key_field), + ) + cursor.execute(query, (key,)) + + row = cursor.fetchone() + if row is None: + return None + + # The value is stored as JSON/JSONB in PostgreSQL + value = row[0] + + # If it's already a dict/list (JSONB type), return it + if isinstance(value, (dict, list)): + return value + + # Otherwise parse it as JSON string + return json.loads(value) if value else None + + def put(self, key, *, filename=None, fileobj=None, dict=None, update_timestamp=True): + """ + Store value in the database. + + Args: + key: The key to store under + filename: Path to a CSV or JSON file to load + fileobj: File-like object containing CSV or JSON data + dict: Dictionary or list to store directly + update_timestamp: If True (default), update timestamp to now. If False, don't update timestamp field. + + Note: + Exactly one of filename, fileobj, or dict must be provided. + For CSV files, the loaded list of dicts will be stored. + For JSON files, the loaded data (dict or list) will be stored. + """ + if sum([filename is not None, fileobj is not None, dict is not None]) != 1: + raise ValueError('Exactly one of filename, fileobj, or dict must be provided') + + # Determine the value to store + value = None + + if dict is not None: + value = dict + elif filename is not None: + # Determine file type from extension + if filename.endswith('.csv'): + value = load_csv(filename) + elif filename.endswith('.json'): + value = load_json(filename) + else: + raise ValueError(f'Unsupported file type for filename: {filename}. Only .csv and .json are supported.') + elif fileobj is not None: + # Try to detect file type from fileobj name attribute if available + fileobj_name = getattr(fileobj, 'name', '') + if fileobj_name.endswith('.csv') or 'csv' in fileobj_name.lower(): + value = load_csv(fileobj) + elif fileobj_name.endswith('.json') or 'json' in fileobj_name.lower(): + value = load_json(fileobj) + else: + # Default to JSON for file-like objects without clear extension + value = load_json(fileobj) + + # Convert value to JSON string + json_value = json.dumps(value) + + # Build and execute the upsert query + with self.db.cursor() as cursor: + if self.timestamp_field and update_timestamp: + # Include timestamp in the upsert + query = sql.SQL( + """INSERT INTO {table} ({key_field}, {value_field}, {timestamp_field}) + VALUES (%s, %s, %s) + ON CONFLICT ({key_field}) + DO UPDATE SET {value_field} = EXCLUDED.{value_field}, + {timestamp_field} = EXCLUDED.{timestamp_field}""" + ).format( + table=_sql_identifier(self.table), + key_field=_sql_identifier(self.key_field), + value_field=_sql_identifier(self.value_field), + timestamp_field=_sql_identifier(self.timestamp_field), + ) + cursor.execute(query, (key, json_value, datetime.datetime.now(datetime.timezone.utc))) + else: + # No timestamp or don't update it + query = sql.SQL( + """INSERT INTO {table} ({key_field}, {value_field}) + VALUES (%s, %s) + ON CONFLICT ({key_field}) + DO UPDATE SET {value_field} = EXCLUDED.{value_field}""" + ).format( + table=_sql_identifier(self.table), + key_field=_sql_identifier(self.key_field), + value_field=_sql_identifier(self.value_field), + ) + cursor.execute(query, (key, json_value)) + + # Commit the transaction + self.db.commit() + + def glob(self, pattern, since=None, until=None): + """ + List keys matching a glob pattern, optionally filtered by timestamp. + + Args: + pattern: Glob pattern to match against keys (e.g., "data-*") + since: Optional datetime - include only records with timestamp >= since + until: Optional datetime - include only records with timestamp < until + + Returns: + List of matching keys + """ + with self.db.cursor() as cursor: + # Build the base query + if self.timestamp_field and (since or until): + query = sql.SQL('SELECT {key_field}, {timestamp_field} FROM {table}').format( + key_field=_sql_identifier(self.key_field), + timestamp_field=_sql_identifier(self.timestamp_field), + table=_sql_identifier(self.table), + ) + else: + query = sql.SQL('SELECT {key_field} FROM {table}').format( + key_field=_sql_identifier(self.key_field), + table=_sql_identifier(self.table), + ) + cursor.execute(query) + + rows = cursor.fetchall() + + # Filter by pattern and timestamp + matching_keys = [] + for row in rows: + key = row[0] + + # Check glob pattern + if not fnmatch.fnmatch(key, pattern): + continue + + # Check timestamp if applicable + if self.timestamp_field and (since or until): + timestamp = row[1] + if since and timestamp < since: + continue + if until and timestamp >= until: + continue + + matching_keys.append(key) + + return matching_keys + + def exists(self, key): + """ + Check if a key exists in the database. + + Args: + key: The key to check + + Returns: + True if the key exists, False otherwise + """ + with self.db.cursor() as cursor: + query = sql.SQL('SELECT 1 FROM {table} WHERE {key_field} = %s').format( + table=_sql_identifier(self.table), + key_field=_sql_identifier(self.key_field), + ) + cursor.execute(query, (key,)) + + return cursor.fetchone() is not None + + def remove(self, key): + """ + Remove a key from the database. + + Args: + key: The key to remove + """ + with self.db.cursor() as cursor: + query = sql.SQL('DELETE FROM {table} WHERE {key_field} = %s').format( + table=_sql_identifier(self.table), + key_field=_sql_identifier(self.key_field), + ) + cursor.execute(query, (key,)) + + self.db.commit() + + +def create_storage_table( + db, + table='storage', + key_field='key', + value_field='value', + timestamp_field='updated_at', +): + """ + Create a storage table if it doesn't exist. + + Args: + db: Database connection + table: Table name + key_field: Name of the key field (will be PRIMARY KEY) + value_field: Name of the value field (will be JSONB) + timestamp_field: Name of the timestamp field (will auto-update, can be None to skip) + + Example: + create_storage_table(db, table='my_storage') + storage = StoragePostgres(db=db, table='my_storage') + """ + with db.cursor() as cursor: + # Build the CREATE TABLE statement + if timestamp_field: + query = sql.SQL( + """CREATE TABLE IF NOT EXISTS {table} ( + {key_field} TEXT PRIMARY KEY, + {value_field} JSONB NOT NULL, + {timestamp_field} TIMESTAMPTZ NOT NULL DEFAULT NOW() + )""" + ).format( + table=_sql_identifier(table), + key_field=_sql_identifier(key_field), + value_field=_sql_identifier(value_field), + timestamp_field=_sql_identifier(timestamp_field), + ) + else: + query = sql.SQL( + """CREATE TABLE IF NOT EXISTS {table} ( + {key_field} TEXT PRIMARY KEY, + {value_field} JSONB NOT NULL + )""" + ).format( + table=_sql_identifier(table), + key_field=_sql_identifier(key_field), + value_field=_sql_identifier(value_field), + ) + cursor.execute(query) + + # Create an index on the timestamp field if it exists + if timestamp_field: + index_name = f'{table}_{timestamp_field}_idx' + query = sql.SQL('CREATE INDEX IF NOT EXISTS {index_name} ON {table} ({timestamp_field})').format( + index_name=_sql_identifier(index_name), + table=_sql_identifier(table), + timestamp_field=_sql_identifier(timestamp_field), + ) + cursor.execute(query) + + db.commit() diff --git a/metrics_utility/library/storage/s3.py b/metrics_utility/library/storage/s3.py index 2b5675022..fae026c58 100644 --- a/metrics_utility/library/storage/s3.py +++ b/metrics_utility/library/storage/s3.py @@ -6,6 +6,7 @@ import boto3 +from .helpers import load_csv, load_json, load_parquet from .util import date_filter, dict_to_json_file @@ -58,6 +59,47 @@ def get(self, remote): self.client.download_file(Bucket=self.bucket, Key=remote, Filename=local_filename) yield local_filename + def get_data(self, remote, format='auto'): + """ + Retrieve data from S3 and return it parsed. + + Args: + remote: Path to the object in S3 + format: Format of the data - 'auto' (detect from extension), + 'json', 'csv', or 'parquet' + + Returns: + For JSON: dict or list + For CSV: list of dicts + For Parquet: pandas DataFrame + + Raises: + ValueError: If format is unsupported or cannot be auto-detected + Exception: If the S3 object doesn't exist or download fails + """ + # Auto-detect format from file extension + if format == 'auto': + remote_lower = remote.lower() + if remote_lower.endswith('.json'): + format = 'json' + elif remote_lower.endswith('.csv'): + format = 'csv' + elif remote_lower.endswith('.parquet'): + format = 'parquet' + else: + raise ValueError(f"Cannot auto-detect format for '{remote}'. Please specify format explicitly: 'json', 'csv', or 'parquet'") + + # Load the data using the appropriate helper + with self.get(remote) as filename: + if format == 'json': + return load_json(filename) + elif format == 'csv': + return load_csv(filename) + elif format == 'parquet': + return load_parquet(filename) + else: + raise ValueError(f"Unsupported format: '{format}'. Supported formats: 'auto', 'json', 'csv', 'parquet'") + def put(self, remote, *, filename=None, fileobj=None, dict=None): if filename: self.client.upload_file(Filename=filename, Bucket=self.bucket, Key=remote) diff --git a/metrics_utility/library/storage/segment.py b/metrics_utility/library/storage/segment.py index 0a35d46f9..857604b81 100644 --- a/metrics_utility/library/storage/segment.py +++ b/metrics_utility/library/storage/segment.py @@ -165,3 +165,11 @@ def put(self, artifact_name, *, filename=None, fileobj=None, dict=None, event_na analytics.flush() return chunks + + def get(self, remote): + """StorageSegment is write-only and does not support get().""" + raise NotImplementedError('StorageSegment is write-only and does not support get()') + + def get_data(self, remote, format='auto'): + """StorageSegment is write-only and does not support get_data().""" + raise NotImplementedError('StorageSegment is write-only and does not support get_data()') diff --git a/metrics_utility/test/library/test_collectors_job_host_summary_service.py b/metrics_utility/test/library/test_collectors_job_host_summary_service.py index b81617dfb..36e52c560 100644 --- a/metrics_utility/test/library/test_collectors_job_host_summary_service.py +++ b/metrics_utility/test/library/test_collectors_job_host_summary_service.py @@ -66,13 +66,18 @@ def test_job_host_summary_service_query_contains_time_range(mock_copy_table): call_args = mock_copy_table.call_args query = call_args[1]['query'] + params = call_args[1]['params'] - # Query should contain time boundaries (uses mu.finished) - assert '2024-03-01' in query - assert '2024-03-15' in query + # Query should contain parameterized placeholders (uses mu.finished) + assert '%(since)s' in query + assert '%(until)s' in query assert 'mu.finished >=' in query assert 'mu.finished <' in query + # Actual datetime values should be in params + assert params['since'] == since + assert params['until'] == until + @patch('metrics_utility.library.collectors.controller.job_host_summary_service.copy_table') def test_job_host_summary_service_query_structure(mock_copy_table): @@ -160,7 +165,7 @@ def test_job_host_summary_service_orders_by_finished(mock_copy_table): @patch('metrics_utility.library.collectors.controller.job_host_summary_service.copy_table') def test_job_host_summary_service_isoformat(mock_copy_table): - """Test that datetime objects are converted to isoformat in query.""" + """Test that datetime objects are passed as params, not embedded in query.""" mock_db = MagicMock() since = datetime.datetime(2024, 7, 20, 8, 15, 30, tzinfo=datetime.timezone.utc) until = datetime.datetime(2024, 7, 21, 16, 45, 0, tzinfo=datetime.timezone.utc) @@ -170,8 +175,8 @@ def test_job_host_summary_service_isoformat(mock_copy_table): instance.gather() call_args = mock_copy_table.call_args - query = call_args[1]['query'] + params = call_args[1]['params'] - # Should contain isoformat timestamps - assert '2024-07-20T08:15:30+00:00' in query - assert '2024-07-21T16:45:00+00:00' in query + # Datetime objects should be passed as params (not embedded in query as strings) + assert params['since'] == since + assert params['until'] == until diff --git a/metrics_utility/test/library/test_collectors_main_jobevent_service.py b/metrics_utility/test/library/test_collectors_main_jobevent_service.py index a101e9b88..de65db9c4 100644 --- a/metrics_utility/test/library/test_collectors_main_jobevent_service.py +++ b/metrics_utility/test/library/test_collectors_main_jobevent_service.py @@ -67,8 +67,6 @@ def test_main_jobevent_service_with_jobs_calls_copy_table(mock_copy_table): """Test that collector calls copy_table when jobs are found.""" mock_db = MagicMock() mock_cursor = MagicMock() - # Configure mock cursor to simulate psycopg3 (no copy_expert method) - del mock_cursor.copy_expert mock_db.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) mock_db.cursor.return_value.__exit__ = MagicMock(return_value=False) @@ -100,8 +98,6 @@ def test_main_jobevent_service_query_structure(mock_copy_table): """Test that the SQL query has expected structure.""" mock_db = MagicMock() mock_cursor = MagicMock() - # Configure mock cursor to simulate psycopg3 (no copy_expert method) - del mock_cursor.copy_expert mock_db.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) mock_db.cursor.return_value.__exit__ = MagicMock(return_value=False) @@ -152,10 +148,13 @@ def test_main_jobevent_service_builds_temp_table_and_hourly_ranges(mock_copy_tab call_args = mock_copy_table.call_args query = call_args[1]['query'] + params = call_args[1].get('params', []) # Should use direct job_id IN clause (no temp table for read-only replica compatibility) assert 'e.job_id IN (' in query - assert '100' in query or '200' in query # Should contain job IDs + + # Job IDs should be in params (parameterized query) + assert 100 in params and 200 in params # Should have hourly timestamp ranges (truncated to hour boundaries) # Job 1 at 10:30:45 -> hour range 10:00:00 to 11:00:00 @@ -208,8 +207,6 @@ def test_main_jobevent_service_playbook_stats_handling(mock_copy_table): """Test that query handles playbook_on_stats event specially.""" mock_db = MagicMock() mock_cursor = MagicMock() - # Configure mock cursor to simulate psycopg3 (no copy_expert method) - del mock_cursor.copy_expert mock_db.cursor.return_value.__enter__ = MagicMock(return_value=mock_cursor) mock_db.cursor.return_value.__exit__ = MagicMock(return_value=False) diff --git a/metrics_utility/test/library/test_storage_directory.py b/metrics_utility/test/library/test_storage_directory.py index aed98b7d3..696156750 100644 --- a/metrics_utility/test/library/test_storage_directory.py +++ b/metrics_utility/test/library/test_storage_directory.py @@ -113,3 +113,48 @@ def test_remove(): storage = StorageDirectory(base_path=base_path) storage.remove(target_filename) assert storage.exists(target_filename) is False + + +def test_get_data_json(): + storage = StorageDirectory(base_path=base_path) + + # Recreate the file since test_remove() removed it + obj = {'foo': 5, 'bar': 'baz'} + storage.put(target_filename, dict=obj) + + # Get JSON data directly (need to specify format since filename has no extension) + data = storage.get_data(target_filename, format='json') + assert isinstance(data, dict) + assert data == {'foo': 5, 'bar': 'baz'} + + +def test_get_data_json_explicit_format(): + storage = StorageDirectory(base_path=base_path) + + # File should still exist from previous test + # Explicitly specify format + data = storage.get_data(target_filename, format='json') + assert isinstance(data, dict) + assert data == {'foo': 5, 'bar': 'baz'} + + +def test_get_data_not_found(): + storage = StorageDirectory(base_path=base_path) + + with pytest.raises(FileNotFoundError, match='File not found in storage'): + storage.get_data('nonexistent.json') + + +def test_get_data_unsupported_format(): + storage = StorageDirectory(base_path=base_path) + + # Create a file with unsupported extension + unsupported_file = f'test-unsupported-{os.getpid()}.txt' + storage.put(unsupported_file, filename='/dev/null') + + try: + with pytest.raises(ValueError, match='Cannot auto-detect format'): + storage.get_data(unsupported_file) + finally: + if storage.exists(unsupported_file): + storage.remove(unsupported_file) diff --git a/metrics_utility/test/library/test_storage_helpers.py b/metrics_utility/test/library/test_storage_helpers.py new file mode 100644 index 000000000..f30b2ad71 --- /dev/null +++ b/metrics_utility/test/library/test_storage_helpers.py @@ -0,0 +1,208 @@ +import io +import os +import tempfile + +import pandas as pd +import pytest + +from metrics_utility.library.storage import load_csv, load_json, load_parquet, save_csv, save_json, save_parquet + + +# Test data +test_dict_list = [ + {'name': 'Alice', 'age': 30, 'city': 'NYC'}, + {'name': 'Bob', 'age': 25, 'city': 'LA'}, +] + +test_dict_list_with_unicode = [ + {'name': 'Müller', 'age': 30, 'city': 'München'}, + {'name': 'José', 'age': 25, 'city': 'São Paulo'}, +] + +test_json_data = {'users': test_dict_list, 'count': 2} +test_json_data_with_unicode = {'message': 'Hello 世界 🌍', 'emoji': '🎉'} + + +def test_csv_save_load_list_of_dicts_filename(): + """Test save_csv and load_csv with list of dicts using filename.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: + csv_path = f.name + + try: + save_csv(test_dict_list, filename=csv_path) + loaded = load_csv(csv_path) + assert loaded == test_dict_list + finally: + os.unlink(csv_path) + + +def test_csv_save_load_dataframe_filename(): + """Test save_csv and load_csv with DataFrame using filename.""" + test_df = pd.DataFrame(test_dict_list) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: + csv_path = f.name + + try: + save_csv(test_df, filename=csv_path) + loaded = load_csv(csv_path) + assert loaded == test_dict_list + finally: + os.unlink(csv_path) + + +def test_csv_save_load_fileobj(): + """Test save_csv and load_csv with file objects.""" + csv_buffer = io.StringIO() + save_csv(test_dict_list, fileobj=csv_buffer) + + csv_buffer.seek(0) + loaded = load_csv(csv_buffer) + assert loaded == test_dict_list + + +def test_csv_utf8_encoding(): + """Test that CSV files handle UTF-8 characters correctly.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: + csv_path = f.name + + try: + save_csv(test_dict_list_with_unicode, filename=csv_path) + loaded = load_csv(csv_path) + assert loaded == test_dict_list_with_unicode + assert loaded[0]['name'] == 'Müller' + assert loaded[0]['city'] == 'München' + finally: + os.unlink(csv_path) + + +def test_csv_empty_list(): + """Test save_csv with an empty list.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: + csv_path = f.name + + try: + save_csv([], filename=csv_path) + # File should exist but be empty + assert os.path.exists(csv_path) + assert os.path.getsize(csv_path) == 0 + finally: + os.unlink(csv_path) + + +def test_json_save_load_filename(): + """Test save_json and load_json using filename.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json_path = f.name + + try: + save_json(test_json_data, filename=json_path) + loaded = load_json(json_path) + assert loaded == test_json_data + finally: + os.unlink(json_path) + + +def test_json_save_load_fileobj(): + """Test save_json and load_json with file objects.""" + json_buffer = io.StringIO() + save_json(test_json_data, fileobj=json_buffer) + + json_buffer.seek(0) + loaded = load_json(json_buffer) + assert loaded == test_json_data + + +def test_json_utf8_encoding(): + """Test that JSON files handle UTF-8 characters correctly.""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + json_path = f.name + + try: + save_json(test_json_data_with_unicode, filename=json_path) + loaded = load_json(json_path) + assert loaded == test_json_data_with_unicode + assert loaded['message'] == 'Hello 世界 🌍' + assert loaded['emoji'] == '🎉' + finally: + os.unlink(json_path) + + +def test_parquet_save_load_filename(): + """Test save_parquet and load_parquet using filename.""" + try: + import pyarrow # noqa: F401 + except ImportError: + pytest.skip('pyarrow not available') + + test_df = pd.DataFrame(test_dict_list) + + with tempfile.NamedTemporaryFile(mode='w', suffix='.parquet', delete=False) as f: + parquet_path = f.name + + try: + save_parquet(test_df, filename=parquet_path) + loaded = load_parquet(parquet_path) + assert test_df.equals(loaded) + finally: + os.unlink(parquet_path) + + +def test_parquet_save_load_fileobj(): + """Test save_parquet and load_parquet with file objects.""" + try: + import pyarrow # noqa: F401 + except ImportError: + pytest.skip('pyarrow not available') + + test_df = pd.DataFrame(test_dict_list) + parquet_buffer = io.BytesIO() + + save_parquet(test_df, fileobj=parquet_buffer) + parquet_buffer.seek(0) + loaded = load_parquet(parquet_buffer) + assert test_df.equals(loaded) + + +def test_save_csv_error_no_args(): + """Test that save_csv raises ValueError when neither filename nor fileobj is provided.""" + with pytest.raises(ValueError, match='Exactly one of filename or fileobj must be provided'): + save_csv(test_dict_list) + + +def test_save_csv_error_both_args(): + """Test that save_csv raises ValueError when both filename and fileobj are provided.""" + with pytest.raises(ValueError, match='Exactly one of filename or fileobj must be provided'): + save_csv(test_dict_list, filename='test.csv', fileobj=io.StringIO()) + + +def test_save_csv_error_invalid_type(): + """Test that save_csv raises TypeError for invalid data type.""" + with pytest.raises(TypeError, match='data must be a DataFrame or list of dicts'): + save_csv('invalid data', filename='test.csv') + + +def test_save_json_error_no_args(): + """Test that save_json raises ValueError when neither filename nor fileobj is provided.""" + with pytest.raises(ValueError, match='Exactly one of filename or fileobj must be provided'): + save_json(test_json_data) + + +def test_save_json_error_both_args(): + """Test that save_json raises ValueError when both filename and fileobj are provided.""" + with pytest.raises(ValueError, match='Exactly one of filename or fileobj must be provided'): + save_json(test_json_data, filename='test.json', fileobj=io.StringIO()) + + +def test_save_parquet_error_no_args(): + """Test that save_parquet raises ValueError when neither filename nor fileobj is provided.""" + test_df = pd.DataFrame(test_dict_list) + with pytest.raises(ValueError, match='Exactly one of filename or fileobj must be provided'): + save_parquet(test_df) + + +def test_save_parquet_error_both_args(): + """Test that save_parquet raises ValueError when both filename and fileobj are provided.""" + test_df = pd.DataFrame(test_dict_list) + with pytest.raises(ValueError, match='Exactly one of filename or fileobj must be provided'): + save_parquet(test_df, filename='test.parquet', fileobj=io.BytesIO()) diff --git a/metrics_utility/test/library/test_storage_postgres.py b/metrics_utility/test/library/test_storage_postgres.py new file mode 100644 index 000000000..b77fb25b6 --- /dev/null +++ b/metrics_utility/test/library/test_storage_postgres.py @@ -0,0 +1,411 @@ +import datetime +import os +import tempfile + +from unittest.mock import MagicMock + +import pytest + +from metrics_utility.library.storage import StoragePostgres, create_storage_table + + +# Note: These tests use mocked database connections for unit testing. +# For integration tests with a real PostgreSQL database, use the mock DB from `make compose`. + + +def test_storage_postgres_requires_db(): + """Test that StoragePostgres requires a db connection.""" + with pytest.raises(Exception, match='db connection is required'): + StoragePostgres(table='test_table') + + +def test_storage_postgres_requires_table(): + """Test that StoragePostgres requires a table name.""" + mock_db = MagicMock() + with pytest.raises(Exception, match='table is required'): + StoragePostgres(db=mock_db) + + +def test_storage_postgres_init(): + """Test StoragePostgres initialization with all parameters.""" + mock_db = MagicMock() + storage = StoragePostgres( + db=mock_db, + table='my_table', + key_field='my_key', + value_field='my_value', + timestamp_field='my_timestamp', + ) + + assert storage.db == mock_db + assert storage.table == 'my_table' + assert storage.key_field == 'my_key' + assert storage.value_field == 'my_value' + assert storage.timestamp_field == 'my_timestamp' + + +def test_storage_postgres_init_defaults(): + """Test StoragePostgres initialization with default field names.""" + mock_db = MagicMock() + storage = StoragePostgres(db=mock_db, table='my_table') + + assert storage.key_field == 'key' + assert storage.value_field == 'value' + assert storage.timestamp_field is None + + +def test_get_dict_value(): + """Test get_data method returns dict value from database.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + # Simulate JSONB column returning a dict directly + mock_cursor.fetchone.return_value = ({'foo': 'bar', 'count': 42},) + + storage = StoragePostgres(db=mock_db, table='test_table') + result = storage.get_data('test_key') + + assert result == {'foo': 'bar', 'count': 42} + mock_cursor.execute.assert_called_once() + assert 'SELECT' in str(mock_cursor.execute.call_args) + assert 'test_key' in str(mock_cursor.execute.call_args) + + +def test_get_list_value(): + """Test get_data method returns list value from database.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + # Simulate JSONB column returning a list directly + mock_cursor.fetchone.return_value = ([{'name': 'Alice'}, {'name': 'Bob'}],) + + storage = StoragePostgres(db=mock_db, table='test_table') + result = storage.get_data('test_key') + + assert result == [{'name': 'Alice'}, {'name': 'Bob'}] + + +def test_get_not_found(): + """Test get_data method returns None when key doesn't exist.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + mock_cursor.fetchone.return_value = None + + storage = StoragePostgres(db=mock_db, table='test_table') + result = storage.get_data('nonexistent_key') + + assert result is None + + +def test_put_dict(): + """Test put method stores a dict.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + storage = StoragePostgres(db=mock_db, table='test_table', timestamp_field='updated_at') + test_data = {'foo': 'bar', 'count': 42} + + storage.put('test_key', dict=test_data) + + mock_cursor.execute.assert_called_once() + call_args = mock_cursor.execute.call_args + + # Check that INSERT query was called + assert 'INSERT' in str(call_args) + assert 'test_key' in str(call_args) + + # Check that commit was called + mock_db.commit.assert_called_once() + + +def test_put_list(): + """Test put method stores a list.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + storage = StoragePostgres(db=mock_db, table='test_table') + test_data = [{'name': 'Alice'}, {'name': 'Bob'}] + + storage.put('test_key', dict=test_data) + + mock_cursor.execute.assert_called_once() + mock_db.commit.assert_called_once() + + +def test_put_csv_filename(): + """Test put method loads and stores CSV file.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + storage = StoragePostgres(db=mock_db, table='test_table') + + # Create a temporary CSV file + with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f: + f.write('name,age\n') + f.write('Alice,30\n') + f.write('Bob,25\n') + csv_path = f.name + + try: + storage.put('test_key', filename=csv_path) + + mock_cursor.execute.assert_called_once() + mock_db.commit.assert_called_once() + + # Verify the data was converted from CSV to list of dicts + call_args = str(mock_cursor.execute.call_args) + assert 'Alice' in call_args or 'test_key' in call_args + finally: + os.unlink(csv_path) + + +def test_put_json_filename(): + """Test put method loads and stores JSON file.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + storage = StoragePostgres(db=mock_db, table='test_table') + + # Create a temporary JSON file + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + f.write('{"users": [{"name": "Alice"}], "count": 1}') + json_path = f.name + + try: + storage.put('test_key', filename=json_path) + + mock_cursor.execute.assert_called_once() + mock_db.commit.assert_called_once() + finally: + os.unlink(json_path) + + +def test_put_unsupported_file_type(): + """Test put method raises error for unsupported file types.""" + mock_db = MagicMock() + storage = StoragePostgres(db=mock_db, table='test_table') + + with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f: + f.write('some text') + txt_path = f.name + + try: + with pytest.raises(ValueError, match='Unsupported file type'): + storage.put('test_key', filename=txt_path) + finally: + os.unlink(txt_path) + + +def test_put_requires_exactly_one_param(): + """Test put method requires exactly one of filename, fileobj, or dict.""" + mock_db = MagicMock() + storage = StoragePostgres(db=mock_db, table='test_table') + + # No parameters + with pytest.raises(ValueError, match='Exactly one'): + storage.put('test_key') + + # Multiple parameters + with pytest.raises(ValueError, match='Exactly one'): + storage.put('test_key', dict={'foo': 'bar'}, filename='test.json') + + +def test_put_update_timestamp_false(): + """Test put method with update_timestamp=False.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + storage = StoragePostgres(db=mock_db, table='test_table', timestamp_field='updated_at') + + storage.put('test_key', dict={'foo': 'bar'}, update_timestamp=False) + + # Verify the INSERT query doesn't include timestamp + # When update_timestamp=False, we should not be passing a timestamp value + # The query should only have 2 parameters (key, value) + assert mock_cursor.execute.called + + +def test_glob_basic(): + """Test glob method returns matching keys.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + # Simulate database returning some keys + mock_cursor.fetchall.return_value = [ + ('data-2025-01-01',), + ('data-2025-01-02',), + ('config',), + ] + + storage = StoragePostgres(db=mock_db, table='test_table') + result = storage.glob('data-*') + + assert result == ['data-2025-01-01', 'data-2025-01-02'] + + +def test_glob_with_timestamp_filter(): + """Test glob method with timestamp filtering.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + since = datetime.datetime(2025, 1, 2, tzinfo=datetime.timezone.utc) + until = datetime.datetime(2025, 1, 4, tzinfo=datetime.timezone.utc) + + # Simulate database returning keys with timestamps + mock_cursor.fetchall.return_value = [ + ('data-2025-01-01', datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)), + ('data-2025-01-02', datetime.datetime(2025, 1, 2, tzinfo=datetime.timezone.utc)), + ('data-2025-01-03', datetime.datetime(2025, 1, 3, tzinfo=datetime.timezone.utc)), + ('data-2025-01-04', datetime.datetime(2025, 1, 4, tzinfo=datetime.timezone.utc)), + ('data-2025-01-05', datetime.datetime(2025, 1, 5, tzinfo=datetime.timezone.utc)), + ] + + storage = StoragePostgres(db=mock_db, table='test_table', timestamp_field='updated_at') + result = storage.glob('data-*', since=since, until=until) + + # Should include 01-02 and 01-03 (since <= timestamp < until) + assert result == ['data-2025-01-02', 'data-2025-01-03'] + + +def test_exists_true(): + """Test exists method returns True when key exists.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + mock_cursor.fetchone.return_value = (1,) + + storage = StoragePostgres(db=mock_db, table='test_table') + result = storage.exists('test_key') + + assert result is True + + +def test_exists_false(): + """Test exists method returns False when key doesn't exist.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + mock_cursor.fetchone.return_value = None + + storage = StoragePostgres(db=mock_db, table='test_table') + result = storage.exists('nonexistent_key') + + assert result is False + + +def test_remove(): + """Test remove method deletes a key.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + storage = StoragePostgres(db=mock_db, table='test_table') + storage.remove('test_key') + + mock_cursor.execute.assert_called_once() + assert 'DELETE' in str(mock_cursor.execute.call_args) + assert 'test_key' in str(mock_cursor.execute.call_args) + mock_db.commit.assert_called_once() + + +def test_create_storage_table_with_timestamp(): + """Test create_storage_table creates table with timestamp field.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + create_storage_table( + db=mock_db, + table='my_storage', + key_field='my_key', + value_field='my_value', + timestamp_field='my_timestamp', + ) + + # Should execute CREATE TABLE and CREATE INDEX + assert mock_cursor.execute.call_count == 2 + + # First call should be CREATE TABLE + first_call = str(mock_cursor.execute.call_args_list[0]) + assert 'CREATE TABLE' in first_call + + # Second call should be CREATE INDEX + second_call = str(mock_cursor.execute.call_args_list[1]) + assert 'CREATE INDEX' in second_call + + mock_db.commit.assert_called_once() + + +def test_create_storage_table_without_timestamp(): + """Test create_storage_table creates table without timestamp field.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + create_storage_table( + db=mock_db, + table='my_storage', + timestamp_field=None, + ) + + # Should execute only CREATE TABLE (no index since no timestamp) + assert mock_cursor.execute.call_count == 1 + + call = str(mock_cursor.execute.call_args) + assert 'CREATE TABLE' in call + + mock_db.commit.assert_called_once() + + +def test_get_as_context_manager(): + """Test get() context manager creates temp JSON file.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + # Simulate JSONB column returning a dict + mock_cursor.fetchone.return_value = ({'foo': 'bar', 'count': 42},) + + storage = StoragePostgres(db=mock_db, table='test_table') + + with storage.get('test_key') as filename: + # Verify it's a file path + assert isinstance(filename, str) + # Verify the file exists and contains the data as JSON + assert os.path.exists(filename) + with open(filename, 'r') as f: + import json + + data = json.load(f) + assert data == {'foo': 'bar', 'count': 42} + + # After exiting context, temp file should be cleaned up + assert not os.path.exists(filename) + + +def test_get_as_context_manager_key_not_found(): + """Test get() context manager raises KeyError when key doesn't exist.""" + mock_db = MagicMock() + mock_cursor = MagicMock() + mock_db.cursor.return_value.__enter__.return_value = mock_cursor + + mock_cursor.fetchone.return_value = None + + storage = StoragePostgres(db=mock_db, table='test_table') + + with pytest.raises(KeyError, match='Key not found'): + with storage.get('nonexistent_key') as _filename: + pass diff --git a/metrics_utility/test/library/test_storage_s3.py b/metrics_utility/test/library/test_storage_s3.py index e1000c9d2..d49dbf125 100644 --- a/metrics_utility/test/library/test_storage_s3.py +++ b/metrics_utility/test/library/test_storage_s3.py @@ -87,3 +87,56 @@ def test_remove(): storage = StorageS3(**s3_settings) storage.remove(s3_object_name) assert storage.exists(s3_object_name) is False + + +def test_get_data_json(): + storage = StorageS3(**s3_settings) + + # Upload test data with .json extension for auto-detection + json_object_name = f'temporary-object-x{os.getpid()}y.json' + obj = {'foo': 5, 'bar': 'baz'} + storage.put(json_object_name, dict=obj) + + try: + # Get JSON data directly with auto-detection + data = storage.get_data(json_object_name) + assert isinstance(data, dict) + assert data == {'foo': 5, 'bar': 'baz'} + finally: + if storage.exists(json_object_name): + storage.remove(json_object_name) + + +def test_get_data_json_explicit_format(): + storage = StorageS3(**s3_settings) + + # Upload test data with unknown extension to test format parameter + test_object_name = f'temporary-object-x{os.getpid()}y.unknown' + obj = {'foo': 5, 'bar': 'baz'} + storage.put(test_object_name, dict=obj) + + try: + # Specify format parameter + data = storage.get_data(test_object_name, format='json') + assert isinstance(data, dict) + assert data == {'foo': 5, 'bar': 'baz'} + finally: + if storage.exists(test_object_name): + storage.remove(test_object_name) + + +def test_get_data_unsupported_format(): + import pytest + + storage = StorageS3(**s3_settings) + + # Create a file with unsupported extension + unsupported_file = f'test-unsupported-{os.getpid()}.txt' + storage.put(unsupported_file, filename='/dev/null') + + try: + with pytest.raises(ValueError, match='Cannot auto-detect format'): + storage.get_data(unsupported_file) + finally: + if storage.exists(unsupported_file): + storage.remove(unsupported_file) diff --git a/workers/0-gather-controller-crc.py b/workers/0-gather-controller-crc.py index 68c12d3a6..7ef7ead73 100644 --- a/workers/0-gather-controller-crc.py +++ b/workers/0-gather-controller-crc.py @@ -9,8 +9,8 @@ worker_key = 'gather-controller-crc' # assume exceptions are logged & saved in task results by what's running the worker -controller_db = ConnectionHandler(settings.controller_db) -metrics_db = ConnectionHandler(settings.metrics_db) +controller_db = ConnectionHandler({'default': settings.controller_db})['default'] +metrics_db = ConnectionHandler({'default': settings.metrics_db})['default'] crc_storage = library.storage.StorageCRCMutual(settings.crc_storage) # wrappers around datetime, timedelta, timezone - always a datetime with timezone diff --git a/workers/1-gather-controller-full.py b/workers/1-gather-controller-full.py index fdb91f93a..6ebfe7bab 100644 --- a/workers/1-gather-controller-full.py +++ b/workers/1-gather-controller-full.py @@ -8,8 +8,8 @@ worker_key = 'gather-controller-full' -controller_db = ConnectionHandler(settings.controller_db) -metrics_db = ConnectionHandler(settings.metrics_db) +controller_db = ConnectionHandler({'default': settings.controller_db})['default'] +metrics_db = ConnectionHandler({'default': settings.metrics_db})['default'] s3_storage = library.storage.StorageS3(settings.s3_storage) since = library.instants.last_day() diff --git a/workers/5-report-renewal.py b/workers/5-report-renewal.py index e527d818c..a86652e28 100644 --- a/workers/5-report-renewal.py +++ b/workers/5-report-renewal.py @@ -8,8 +8,8 @@ worker_key = 'report-renewal' -controller_db = ConnectionHandler(settings.controller_db) -metrics_db = ConnectionHandler(settings.metrics_db) +controller_db = ConnectionHandler({'default': settings.controller_db})['default'] +metrics_db = ConnectionHandler({'default': settings.metrics_db})['default'] s3_storage = library.storage.StorageS3(settings.s3_storage) # our db, no lock needed diff --git a/workers/6-gather-postgres.py b/workers/6-gather-postgres.py new file mode 100644 index 000000000..403aedf81 --- /dev/null +++ b/workers/6-gather-postgres.py @@ -0,0 +1,53 @@ +# Gather config and job_host_summary collectors, upload to PostgreSQL storage + +from django.db.utils import ConnectionHandler +from settings import settings + +from metrics_utility import library + + +worker_key = 'gather-postgres' + +# Setup database connections +controller_db = ConnectionHandler({'default': settings.controller_db})['default'] +metrics_db = ConnectionHandler({'default': settings.metrics_db})['default'] + +# Create storage table if it doesn't exist +storage_config = { + 'db': metrics_db, + 'table': 'metrics_data', + 'key_field': 'key', + 'value_field': 'value', + 'timestamp_field': 'updated_at', +} + +library.storage.postgres.create_storage_table(**storage_config) + +# Setup PostgreSQL storage using metrics_db +postgres_storage = library.storage.StoragePostgres(**storage_config) + +# Time range for collectors +since = library.instants.last_day() +until = library.instants.this_day() + +# Setup collectors +c = library.collectors.controller +config_collector = c.config(db=controller_db) +job_host_summary_collector = c.job_host_summary(db=controller_db, since=since, until=until) + +# Gather and upload with database lock +with library.lock(db=metrics_db, key=worker_key): + # Gather config (returns dict) + config_data = config_collector.gather() + + # Upload config to PostgreSQL storage + config_key = f'config-{since.date()}-{until.date()}' + postgres_storage.put(config_key, dict=config_data) + + # Gather job_host_summary (returns list of CSV files) + job_host_summary_files = job_host_summary_collector.gather() + + # Upload each CSV file to PostgreSQL storage + for i, csv_file in enumerate(job_host_summary_files): + jhs_key = f'job_host_summary-{since.date()}-{until.date()}-{i:06}' + postgres_storage.put(jhs_key, filename=csv_file) diff --git a/workers/settings.py b/workers/settings.py index 6e3899bf6..94575760a 100644 --- a/workers/settings.py +++ b/workers/settings.py @@ -1,5 +1,12 @@ +import os + from dataclasses import dataclass, field +from metrics_utility import prepare + + +prepare() + @dataclass class Settings: @@ -12,4 +19,21 @@ class Settings: retention: dict = field(default_factory=dict) -settings = Settings() +settings = Settings( + controller_db={ + 'ENGINE': 'django.db.backends.postgresql', + 'NAME': 'awx', + 'USER': 'myuser', + 'PASSWORD': 'mypassword', + 'HOST': os.getenv('METRICS_UTILITY_DB_HOST', 'localhost'), + 'PORT': '5432', + }, + metrics_db={ + 'ENGINE': 'django.db.backends.postgresql', + 'NAME': 'metrics_service', + 'USER': 'metrics_service', + 'PASSWORD': 'metrics_service', + 'HOST': os.getenv('METRICS_UTILITY_DB_HOST', 'localhost'), + 'PORT': '5432', + }, +)