Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

## 0.7.0dev

- Removed direct AWX dependencies from collectors module
- Replaced AWX license functions with direct database queries
- Drop psycopg2 support - no longer compatible with AWX older than 22.4.0
- Removed direct AWX imports from config collector, replaced with db queries
- added `metrics_utility.library`
- TODO

Expand Down
31 changes: 6 additions & 25 deletions metrics_utility/automation_controller_billing/collectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django.db.utils import ProgrammingError
from django.utils.timezone import now, timedelta
from django.utils.translation import gettext_lazy as _
from psycopg.errors import UndefinedTable

from metrics_utility.automation_controller_billing.helpers import (
get_config_and_settings_from_db,
Expand All @@ -29,14 +30,6 @@
from .prometheus_client import PrometheusClient


try:
from psycopg.errors import UndefinedTable
except ImportError:

class UndefinedTable(Exception):
pass


"""
This module is used to define metrics collected by
gather_automation_controller_billing_data command. Each function is
Expand Down Expand Up @@ -165,27 +158,15 @@ def _copy_table(table, query, path, prepend_query=None):
if prepend_query:
cursor.execute(prepend_query)

if hasattr(cursor, 'copy_expert') and callable(cursor.copy_expert):
_copy_table_aap_2_4_and_below(cursor, query, file)
else:
_copy_table_aap_2_5_and_above(cursor, query, file)
# Use psycopg (v3) cursor.copy() method
with cursor.copy(query) as copy:
while data := copy.read():
byte_data = bytes(data)
file.write(byte_data.decode())

return file.file_list(keep_empty=True)


def _copy_table_aap_2_4_and_below(cursor, query, file):
# Automation Controller 4.4 and below use psycopg2 with .copy_expert() method
cursor.copy_expert(query, file)


def _copy_table_aap_2_5_and_above(cursor, query, file):
# Automation Controller 4.5 and above use psycopg3 with .copy() method
with cursor.copy(query) as copy:
while data := copy.read():
byte_data = bytes(data)
file.write(byte_data.decode())


def yaml_and_json_parsing_functions():
query = """
-- Define function for parsing field out of yaml encoded as text
Expand Down
8 changes: 5 additions & 3 deletions metrics_utility/library/collectors/controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import distro

from django.utils.dateparse import parse_datetime
from psycopg import sql

from ..util import collector

Expand Down Expand Up @@ -99,9 +100,10 @@ def _get_install_type():
def _get_controller_settings(db, keys):
settings = {}
with db.cursor() as cursor:
# FIXME: psycopg.sql ?
in_sql = "'" + "', '".join(keys) + "'"
cursor.execute(f'SELECT key, value FROM conf_setting WHERE key IN ({in_sql})')
# Build safe SQL query with parameter placeholders
placeholders = sql.SQL(', ').join(sql.Placeholder() * len(keys))
query = sql.SQL('SELECT key, value FROM conf_setting WHERE key IN ({})').format(placeholders)
cursor.execute(query, keys)
for key, value in cursor.fetchall():
if value:
settings[key] = json.loads(value, object_hook=_datetime_hook)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
from psycopg import sql

from ..util import collector, copy_table


@collector
def job_host_summary_service(*, db=None, since=None, until=None, output_dir=None):
where = ' AND '.join(
[
f"mu.finished >= '{since.isoformat()}'",
f"mu.finished < '{until.isoformat()}'",
]
)
# Build WHERE clause using parameterized query
where_parts = [
sql.SQL('mu.finished >= %(since)s'),
sql.SQL('mu.finished < %(until)s'),
]
where_clause = sql.SQL(' AND ').join(where_parts)

query = f"""
query_template = sql.SQL("""
WITH
-- First: restrict to jobs that FINISHED in the window (uses index on main_unifiedjob.finished if present)
filtered_jobs AS (
Expand Down Expand Up @@ -78,6 +80,17 @@ def job_host_summary_service(*, db=None, since=None, until=None, output_dir=None
LEFT JOIN main_organization mo ON mo.id = mu.organization_id
LEFT JOIN hosts_variables hv ON hv.host_id = mjs.host_id
ORDER BY mu.finished ASC
"""
""").format(where=where_clause)

return copy_table(db=db, table='main_jobhostsummary', query=query, prepend_query=True, output_dir=output_dir)
# Convert to string and pass params (no context needed, uses default encoding)
query = query_template.as_string()
params = {'since': since, 'until': until}

return copy_table(
db=db,
table='main_jobhostsummary',
query=query,
params=params,
prepend_query=True,
output_dir=output_dir,
)
59 changes: 52 additions & 7 deletions metrics_utility/library/collectors/controller/main_host.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
from psycopg import sql

from ..util import collector, copy_table, date_where


def _main_host_query(where):
return f"""
"""
Build main_host query with dynamic WHERE clause.

Args:
where: Either a string (for simple WHERE clauses) or sql.SQL object

Returns:
sql.SQL query or string
"""
query_template = """
SELECT
main_host.name as host_name,
main_host.id AS host_id,
Expand Down Expand Up @@ -85,6 +96,13 @@ def _main_host_query(where):
ORDER BY main_host.id ASC
"""

# If where is a string, return a simple f-string formatted query
if isinstance(where, str):
return query_template.format(where=where)

# Otherwise, it's an sql.SQL object, use sql.SQL formatting
return sql.SQL(query_template).format(where=where)


@collector
def main_host(*, db=None, output_dir=None):
Expand All @@ -96,10 +114,37 @@ def main_host(*, db=None, output_dir=None):
def main_host_daily(*, db=None, since=None, until=None, output_dir=None):
# prefer running with until=False, to not skip hosts that keep being modified

where = f"""
# Build WHERE clause using date_where
created_where, created_params = date_where('main_host.created', since, until)
modified_where, modified_params = date_where('main_host.modified', since, until)

# Rename params to avoid conflicts
params_created = {f'created_{k}': v for k, v in created_params.items()}
params_modified = {f'modified_{k}': v for k, v in modified_params.items()}
all_params = {**params_created, **params_modified}

# Update the SQL queries to use renamed params
created_where_str = created_where.as_string(db).replace('%(since)s', '%(created_since)s').replace('%(until)s', '%(created_until)s')
modified_where_str = modified_where.as_string(db).replace('%(since)s', '%(modified_since)s').replace('%(until)s', '%(modified_until)s')

where = sql.SQL("""
enabled='t'
AND ({date_where('main_host.created', since, until)}
OR {date_where('main_host.modified', since, until)})
"""
query = _main_host_query(where)
return copy_table(db=db, table='main_host_daily', query=query, prepend_query=True, output_dir=output_dir)
AND ({created}
OR {modified})
""").format(
created=sql.SQL(created_where_str),
modified=sql.SQL(modified_where_str),
)

query_obj = _main_host_query(where)
# Convert to string (no context needed, uses default encoding)
query = query_obj.as_string()

return copy_table(
db=db,
table='main_host_daily',
query=query,
params=all_params,
prepend_query=True,
output_dir=output_dir,
)
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from datetime import timedelta

from psycopg import sql

from ..util import collector, copy_table


Expand Down Expand Up @@ -31,7 +33,7 @@ def main_jobevent_service(*, db=None, since=None, until=None, output_dir=None):
# We load the finished jobs first and then filter by job_created;
# this cannot be done with simple joins because the table is
# partitioned on job_created and partition pruning doesn't work with joins
job_ids_set = set(job_id for job_id, _ in jobs)
job_ids_list = [job_id for job_id, _ in jobs]

# Extract unique hour boundaries from job_created timestamps
# This reduces potentially 100K timestamps down to ~100-1000 hourly ranges
Expand Down Expand Up @@ -67,28 +69,43 @@ def main_jobevent_service(*, db=None, since=None, until=None, output_dir=None):

# Build WHERE clause with consolidated ranges for partition pruning
# PostgreSQL can see these literal timestamps and prune partitions accordingly
# NOTE: We use literal timestamps here (not params) for partition pruning optimization
or_clauses = []
for range_start, range_end in ranges:
or_clauses.append(f"(e.job_created >= '{range_start.isoformat()}'::timestamptz AND e.job_created < '{range_end.isoformat()}'::timestamptz)")
or_clauses.append(
sql.SQL('(e.job_created >= {start}::timestamptz AND e.job_created < {end}::timestamptz)').format(
start=sql.Literal(range_start.isoformat()),
end=sql.Literal(range_end.isoformat()),
)
)

# Handle edge case: if no ranges, use FALSE to return empty result set
# This maintains valid SQL structure while returning 0 rows
timestamp_where_clause = ' OR '.join(or_clauses) if or_clauses else 'FALSE'
if or_clauses:
timestamp_where_clause = sql.SQL(' OR ').join(or_clauses)
else:
timestamp_where_clause = sql.SQL('FALSE')

# Build job_id IN clause
# Build job_id IN clause using params
# Handle edge case: if no jobs, use FALSE to return empty result set with proper schema
if job_ids_set:
job_ids_str = ','.join(str(job_id) for job_id in job_ids_set)
job_id_where_clause = f'e.job_id IN ({job_ids_str})'
if job_ids_list:
# Build parameterized IN clause
job_id_placeholders = sql.SQL(', ').join(sql.Placeholder() * len(job_ids_list))
job_id_where_clause = sql.SQL('e.job_id IN ({})').format(job_id_placeholders)
job_params = job_ids_list
else:
job_id_where_clause = 'FALSE'
job_id_where_clause = sql.SQL('FALSE')
job_params = []

# Combine both WHERE conditions
where_clause = f'({timestamp_where_clause}) AND ({job_id_where_clause})'
where_clause = sql.SQL('({timestamp}) AND ({job_ids})').format(
timestamp=timestamp_where_clause,
job_ids=job_id_where_clause,
)

# Final event query
# - WHERE clause filters by job_id and enables partition pruning via literal hour boundaries
query = f"""
query_template = sql.SQL("""
SELECT
e.id,
e.created,
Expand Down Expand Up @@ -137,7 +154,10 @@ def main_jobevent_service(*, db=None, since=None, until=None, output_dir=None):
SELECT replace(e.event_data, '\\u', '\\u005cu')::jsonb AS event_data
) AS ed
LEFT JOIN main_unifiedjob uj ON uj.id = e.job_id
WHERE {where_clause}
"""
WHERE {where}
""").format(where=where_clause)

# Convert to string for copy_table (no context needed, uses default encoding)
query = query_template.as_string()

return copy_table(db=db, table='main_jobevent', query=query, output_dir=output_dir)
return copy_table(db=db, table='main_jobevent', query=query, params=job_params, output_dir=output_dir)
59 changes: 28 additions & 31 deletions metrics_utility/library/collectors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,39 @@
import pathlib
import tempfile

from psycopg import sql

from ..csv_file_splitter import CsvFileSplitter


# FIXME: psycopg.sql
def date_where(field, since, until):
    """
    Build a WHERE clause for date filtering using psycopg.sql for safe query building.

    Args:
        field: Field name, optionally qualified (e.g. 'main_host.created').
            Each dot-separated part is escaped as a separate identifier, so a
            qualified name renders as "main_host"."created" rather than being
            quoted as one single (nonexistent) column name.
        since: Optional datetime - include records >= since
        until: Optional datetime - include records < until

    Returns:
        A tuple of (sql.SQL object, dict of params). With neither bound given,
        returns (sql.SQL('true'), {}) so callers can splice it in unconditionally.
    """
    # Split qualified names so 'table.column' becomes Identifier('table', 'column');
    # passing the dotted string whole would produce the invalid "table.column".
    ident = sql.Identifier(*field.split('.'))

    if since and until:
        query = sql.SQL('( {field} >= %(since)s AND {field} < %(until)s )').format(field=ident)
        params = {'since': since, 'until': until}
        return query, params

    if since:
        query = sql.SQL('( {field} >= %(since)s )').format(field=ident)
        params = {'since': since}
        return query, params

    if until:
        query = sql.SQL('( {field} < %(until)s )').format(field=ident)
        params = {'until': until}
        return query, params

    return sql.SQL('true'), {}


def collector(func):
Expand Down Expand Up @@ -59,38 +77,17 @@ def copy_table(db, table, query, params=None, prepend_query=False, output_file=N

copy_query = f'COPY ({query}) TO STDOUT WITH CSV HEADER'

# FIXME: remove once 2.4 is no longer supported
if hasattr(cursor, 'copy_expert') and callable(cursor.copy_expert):
_copy_table_aap_2_4_and_below(cursor, copy_query, params, file)
else:
_copy_table_aap_2_5_and_above(cursor, copy_query, params, file)
# Use psycopg (v3) cursor.copy() method
with cursor.copy(copy_query, params) as copy:
while data := copy.read():
byte_data = bytes(data)
file.write(byte_data.decode())

if output_file:
return [output_file.name]
return file.file_list(keep_empty=True)


def _copy_table_aap_2_4_and_below(cursor, query, params, file):
if params:
# copy_expert doesn't support params, make do (but no escaping)
for p in params:
if f'%({p})s' in query:
query = query.replace(f'%({p})s', f'"{params[p]}"')
if f'%({p})d' in query:
query = query.replace(f'%({p})d', str(int(params[p])))

# Automation Controller 4.4 and below use psycopg2 with .copy_expert() method
cursor.copy_expert(query, file)


def _copy_table_aap_2_5_and_above(cursor, query, params, file):
# Automation Controller 4.5 and above use psycopg3 with .copy() method
with cursor.copy(query, params) as copy:
while data := copy.read():
byte_data = bytes(data)
file.write(byte_data.decode())


def _yaml_json_functions():
return """
-- Define function for parsing field out of yaml encoded as text
Expand Down
Loading