54 changes: 2 additions & 52 deletions metrics_utility/library/dataframes/base_traditional.py
@@ -11,24 +11,6 @@
# a dataframe class with logic for merges based on lists of indexes and merge operations
# used by DataframeMainJobevent, DataframeMainHost and DataframeJobHostSummary
class BaseTraditional(BaseDataframe):
def cast_dataframe(self, df):
types = self.cast_types()
levels = []
if len(self.unique_index_columns()) == 1:
# Special behavior if the index is not composite, but only 1 column
# Casting index field to object
df.index = df.index.astype(object)
else:
# Composite index branch
# Casting index field to object
for index, _level in enumerate(df.index.levels):
casted_level = df.index.levels[index].astype(object)
levels.append(casted_level)

df.index = df.index.set_levels(levels)

return df.astype(types)

def dedup(self, dataframe, hostname_mapping=None, **kwargs):
if dataframe is None or dataframe.empty:
return self.empty()
@@ -45,41 +27,13 @@ def dedup(self, dataframe, hostname_mapping=None, **kwargs):
df_grouped = self.regroup(df)

# cast types to match the table
df_grouped = self.cast_dataframe(df_grouped)
return df_grouped.reset_index()
df_grouped = df_grouped.astype(self.cast_types())

def summarize_merged_dataframes(self, df, columns, operations={}):
for col in columns:
if operations.get(col) == 'min':
df[col] = df[[f'{col}_x', f'{col}_y']].min(axis=1)
elif operations.get(col) == 'max':
df[col] = df[[f'{col}_x', f'{col}_y']].max(axis=1)
elif operations.get(col) == 'combine_set':
df[col] = df.apply(lambda row: combine_set(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
elif operations.get(col) == 'combine_json':
df[col] = df.apply(lambda row: combine_json(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
elif operations.get(col) == 'combine_json_values':
df[col] = df.apply(lambda row: combine_json_values(row.get(f'{col}_x'), row.get(f'{col}_y')), axis=1)
else:
df[col] = df[[f'{col}_x', f'{col}_y']].sum(axis=1)
del df[f'{col}_x']
del df[f'{col}_y']
return df
return df_grouped.reset_index()

def empty(self):
return pd.DataFrame(columns=self.unique_index_columns() + self.data_columns())

# Multipart collection, merge the dataframes and sum counts
# used by BaseDataframe.add_rollup
def merge(self, rollup, new_group):
if rollup is None:
return new_group

rollup = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')
rollup = self.summarize_merged_dataframes(rollup, self.data_columns(), operations=self.operations())
rollup = self.cast_dataframe(rollup)
return rollup

@staticmethod
def cast_types():
pass
@@ -88,10 +42,6 @@
def data_columns():
pass

@staticmethod
def operations():
pass

@staticmethod
def unique_index_columns():
pass
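The per-class merge() methods added below all follow the same pandas pattern that the deleted summarize_merged_dataframes generalized: an outer merge on the unique index columns suffixes every overlapping data column with _x and _y, the two sides are combined column by column, and the suffixed columns are dropped. A minimal standalone sketch of that pattern, using hypothetical column names rather than anything from this repo:

import pandas as pd

left = pd.DataFrame({'host_name': ['a', 'b'], 'task_runs': [1, 2]})
right = pd.DataFrame({'host_name': ['b', 'c'], 'task_runs': [3, 4]})

# The outer merge keeps rows present on either side; the overlapping data
# column comes back as task_runs_x / task_runs_y, with NaN on the missing side.
merged = pd.merge(left, right, on=['host_name'], how='outer')

# Row-wise sum skips NaN by default, so one-sided rows keep their own count.
merged['task_runs'] = merged[['task_runs_x', 'task_runs_y']].sum(axis=1)
merged = merged.drop(columns=['task_runs_x', 'task_runs_y'])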
90 changes: 75 additions & 15 deletions metrics_utility/library/dataframes/job_host_summary.py
@@ -2,6 +2,8 @@

from metrics_utility.library.dataframes.base_traditional import (
BaseTraditional,
combine_json_values,
combine_set,
merge_arrays,
merge_json_sets,
merge_setdicts,
@@ -97,7 +99,17 @@ def group(self, dataframe):
facts=('facts', merge_json_sets),
host_names_before_dedup=('host_names_before_dedup', set),
)
return self.cast_dataframe(group)

return group.astype(
{
'task_runs': int,
'host_runs': int,
'managed_node_type': int,
'first_automation': 'datetime64[ns]',
'last_automation': 'datetime64[ns]',
'job_created': 'datetime64[ns]',
}
)

# Merge pre-aggregated
def regroup(self, dataframe):
@@ -115,6 +127,68 @@ def regroup(self, dataframe):
host_names_before_dedup=('host_names_before_dedup', merge_sets),
)

def merge(self, rollup, new_group):
if rollup is None:
return new_group

df = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')

# Apply aggregations directly
df['host_runs'] = df[['host_runs_x', 'host_runs_y']].sum(axis=1)
df['task_runs'] = df[['task_runs_x', 'task_runs_y']].sum(axis=1)
df['first_automation'] = df[['first_automation_x', 'first_automation_y']].min(axis=1)
df['last_automation'] = df[['last_automation_x', 'last_automation_y']].max(axis=1)
df['job_created'] = df[['job_created_x', 'job_created_y']].max(axis=1)
df['managed_node_type'] = df[['managed_node_type_x', 'managed_node_type_y']].min(axis=1)
df['managed_node_types_set'] = df.apply(
lambda row: combine_set(row.get('managed_node_types_set_x'), row.get('managed_node_types_set_y')), axis=1
)
df['events'] = df.apply(lambda row: combine_set(row.get('events_x'), row.get('events_y')), axis=1)
df['canonical_facts'] = df.apply(lambda row: combine_json_values(row.get('canonical_facts_x'), row.get('canonical_facts_y')), axis=1)
df['facts'] = df.apply(lambda row: combine_json_values(row.get('facts_x'), row.get('facts_y')), axis=1)
df['host_names_before_dedup'] = df.apply(
lambda row: combine_set(row.get('host_names_before_dedup_x'), row.get('host_names_before_dedup_y')), axis=1
)

# Drop the _x and _y columns
df = df.drop(
columns=[
'host_runs_x',
'host_runs_y',
'task_runs_x',
'task_runs_y',
'first_automation_x',
'first_automation_y',
'last_automation_x',
'last_automation_y',
'job_created_x',
'job_created_y',
'managed_node_type_x',
'managed_node_type_y',
'managed_node_types_set_x',
'managed_node_types_set_y',
'events_x',
'events_y',
'canonical_facts_x',
'canonical_facts_y',
'facts_x',
'facts_y',
'host_names_before_dedup_x',
'host_names_before_dedup_y',
]
)

return df.astype(
{
'task_runs': int,
'host_runs': int,
'managed_node_type': int,
'first_automation': 'datetime64[ns]',
'last_automation': 'datetime64[ns]',
'job_created': 'datetime64[ns]',
}
)

@staticmethod
def unique_index_columns():
return ['organization_name', 'job_template_name', 'host_name', 'original_host_name', 'install_uuid', 'job_remote_id']
@@ -146,20 +220,6 @@ def cast_types():
'job_created': 'datetime64[ns]',
}

@staticmethod
def operations():
return {
'first_automation': 'min',
'last_automation': 'max',
'job_created': 'max',
'managed_node_type': 'min',
'managed_node_types_set': 'combine_set',
'events': 'combine_set',
'canonical_facts': 'combine_json_values',
'facts': 'combine_json_values',
'host_names_before_dedup': 'combine_set',
}

def dedup(self, dataframe, hostname_mapping=None, scope_dataframe=None, deduplicator=None):
"""
Override dedup method to enrich canonical facts and facts from scope_dataframe
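JobHostSummary.merge leans on the combine_set and combine_json_values helpers now imported from base_traditional; their bodies are not part of this diff. A rough sketch of what a set-combining helper along the lines of combine_set presumably has to handle — the NaN an outer merge leaves on one-sided rows — offered purely as an illustration, not the repo's actual implementation:

import pandas as pd

def combine_set_sketch(left, right):
    # Hypothetical stand-in for combine_set: union two set-like values,
    # treating a missing (None/NaN) side from the outer merge as an empty set.
    left = set() if left is None or (isinstance(left, float) and pd.isna(left)) else set(left)
    right = set() if right is None or (isinstance(right, float) and pd.isna(right)) else set(right)
    return left | right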
64 changes: 50 additions & 14 deletions metrics_utility/library/dataframes/main_host.py
@@ -1,6 +1,14 @@
import pandas as pd

from metrics_utility.library.dataframes.base_traditional import BaseTraditional, merge_json_sets, merge_setdicts, merge_sets, parse_json
from metrics_utility.library.dataframes.base_traditional import (
BaseTraditional,
combine_json_values,
combine_set,
merge_json_sets,
merge_setdicts,
merge_sets,
parse_json,
)


def compute_serial(row):
@@ -52,7 +60,8 @@ def group(self, dataframe):
serials=('serial', set),
host_names_before_dedup=('host_names_before_dedup', set),
)
return self.cast_dataframe(group)

return group.astype({'last_automation': 'datetime64[ns]'})

# Merge pre-aggregated
def regroup(self, dataframe):
@@ -66,6 +75,45 @@ def regroup(self, dataframe):
host_names_before_dedup=('host_names_before_dedup', merge_sets),
)

def merge(self, rollup, new_group):
if rollup is None:
return new_group

df = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')

# Apply aggregations directly
df['last_automation'] = df[['last_automation_x', 'last_automation_y']].max(axis=1)
df['organizations'] = df.apply(lambda row: combine_set(row.get('organizations_x'), row.get('organizations_y')), axis=1)
df['inventories'] = df.apply(lambda row: combine_set(row.get('inventories_x'), row.get('inventories_y')), axis=1)
df['canonical_facts'] = df.apply(lambda row: combine_json_values(row.get('canonical_facts_x'), row.get('canonical_facts_y')), axis=1)
df['facts'] = df.apply(lambda row: combine_json_values(row.get('facts_x'), row.get('facts_y')), axis=1)
df['serials'] = df.apply(lambda row: combine_set(row.get('serials_x'), row.get('serials_y')), axis=1)
df['host_names_before_dedup'] = df.apply(
lambda row: combine_set(row.get('host_names_before_dedup_x'), row.get('host_names_before_dedup_y')), axis=1
)

# Drop the _x and _y columns
df = df.drop(
columns=[
'last_automation_x',
'last_automation_y',
'organizations_x',
'organizations_y',
'inventories_x',
'inventories_y',
'canonical_facts_x',
'canonical_facts_y',
'facts_x',
'facts_y',
'serials_x',
'serials_y',
'host_names_before_dedup_x',
'host_names_before_dedup_y',
]
)

return df.astype({'last_automation': 'datetime64[ns]'})

@staticmethod
def unique_index_columns():
return ['host_name', 'install_uuid']
@@ -77,15 +125,3 @@ def data_columns():
@staticmethod
def cast_types():
return {'last_automation': 'datetime64[ns]'}

@staticmethod
def operations():
return {
'last_automation': 'max',
'organizations': 'combine_set',
'inventories': 'combine_set',
'canonical_facts': 'combine_json_values',
'facts': 'combine_json_values',
'serials': 'combine_set',
'host_names_before_dedup': 'combine_set',
}
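One detail worth noting for the datetime columns these merge() methods aggregate (last_automation here, plus first_automation/last_automation/job_created in job_host_summary.py): with a recent pandas, DataFrame.min/max over axis=1 skip missing values by default, so a host present in only one of the two merged frames keeps its own timestamp rather than collapsing to NaT. A small self-contained illustration with made-up dates:

import pandas as pd

df = pd.DataFrame({
    'last_automation_x': pd.to_datetime(['2024-01-01', None]),
    'last_automation_y': pd.to_datetime([None, '2024-02-01']),
})

# skipna=True is the default, so each row keeps whichever timestamp it has.
df['last_automation'] = df[['last_automation_x', 'last_automation_y']].max(axis=1)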
24 changes: 18 additions & 6 deletions metrics_utility/library/dataframes/main_jobevent.py
@@ -1,5 +1,7 @@
import re

import pandas as pd

from metrics_utility.library.dataframes.base_traditional import BaseTraditional


@@ -54,8 +56,7 @@ def group(self, dataframe):
# Duration is null in older versions of Controller
group['duration'] = group.duration.fillna(0)

# Tweak types to match the table
return self.cast_dataframe(group)
return group.astype({'duration': 'float64', 'task_runs': 'int64'})

# Merge pre-aggregated
def regroup(self, dataframe):
@@ -64,6 +65,21 @@ def regroup(self, dataframe):
duration=('duration', 'sum'),
)

def merge(self, rollup, new_group):
if rollup is None:
return new_group

df = pd.merge(rollup.loc[:,], new_group.loc[:,], on=self.unique_index_columns(), how='outer')

# Apply aggregations directly (both use sum)
df['task_runs'] = df[['task_runs_x', 'task_runs_y']].sum(axis=1)
df['duration'] = df[['duration_x', 'duration_y']].sum(axis=1)

# Drop the _x and _y columns
df = df.drop(columns=['task_runs_x', 'task_runs_y', 'duration_x', 'duration_y'])

return df.astype({'duration': 'float64', 'task_runs': 'int64'})

@staticmethod
def extract_collection_name(x):
if x is None:
@@ -98,7 +114,3 @@ def data_columns():
@staticmethod
def cast_types():
return {'duration': 'float64', 'task_runs': 'int64'}

@staticmethod
def operations():
return {}
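The closing astype in each new merge() is doing real work: an outer merge introduces NaN for one-sided rows, which silently promotes integer columns such as task_runs to float64, and the row-wise sum stays float; the cast at the end restores the integer dtype the table expects. A quick self-contained illustration of the promotion, again with hypothetical data rather than anything from this repo:

import pandas as pd

left = pd.DataFrame({'key': ['a'], 'task_runs': [5]})
right = pd.DataFrame({'key': ['a', 'b'], 'task_runs': [1, 2]})

df = pd.merge(left, right, on=['key'], how='outer')
print(df.dtypes)  # task_runs_x is float64 because row 'b' has NaN on the left side

df['task_runs'] = df[['task_runs_x', 'task_runs_y']].sum(axis=1)  # float64
df = df.drop(columns=['task_runs_x', 'task_runs_y']).astype({'task_runs': 'int64'})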