Skip to content

Commit 826b58f

Browse files
committed
Sync with upstream metrics emit in modin-project/modin#7550
1 parent 559c2bb commit 826b58f

File tree

3 files changed

+41
-75
lines changed

3 files changed

+41
-75
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
*PANDAS_REQUIREMENTS,
4545
### For private customer releases
4646
#"modin @ git+https://github.com/sfc-gh-joshi/modin.git@joshi/hybrid-do-not-release", # TODO point at main
47-
"modin @ git+https://github.com/modin-project/modin.git@hybrid-client-alpha",
47+
"modin @ git+https://github.com/modin-project/modin.git@main",
4848
"tqdm",
4949
]
5050
DEVELOPMENT_REQUIREMENTS = [

src/snowflake/snowpark/modin/plugin/_internal/telemetry.py

Lines changed: 30 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -625,10 +625,8 @@ def snowpark_pandas_api_watcher(api_name: str, _time: Union[int, float]) -> None
625625
)
626626

627627
@cached(cache={})
628-
def get_user_source_location(mode, group) -> str:
629-
628+
def get_user_source_location(group) -> str:
630629
import inspect
631-
632630
stack = inspect.stack()
633631
frame_before_snowpandas = None
634632
location = "<unknown>"
@@ -644,81 +642,43 @@ def get_user_source_location(mode, group) -> str:
644642
and frame_before_snowpandas.code_context is not None
645643
):
646644
location = frame_before_snowpandas.code_context[0].replace("\n", "")
647-
return {'mode': mode, 'group': group, 'location': location }
645+
return { 'group': group, 'source': location }
646+
647+
648648

649649
def get_hybrid_switch_log():
650650
global hybrid_switch_log
651651
return hybrid_switch_log.copy()
652-
653-
def add_to_hybrid_switch_log(metrics: dict):
654-
global hybrid_switch_log
655-
try:
656-
mode = metrics['mode']
657-
source = get_user_source_location(mode, metrics['group'])['location']
658-
if len(source) > 40:
659-
source = source[0:17] + "..." + source[-20:-1] + source[-1]
660-
hybrid_switch_log = native_pd.concat([hybrid_switch_log,
661-
native_pd.DataFrame({'source': [source],
662-
'mode': [metrics['mode']],
663-
'group': [metrics['group']],
664-
'metric': [metrics['metric']],
665-
'submetric': [metrics['submetric'] or None],
666-
'value': [metrics['value']],
667-
'from': [metrics['from'] if 'from' in metrics else None],
668-
'to': [metrics['to'] if 'to' in metrics else None],
669-
})])
670-
except Exception as e:
671-
print(f"Exception: {type(e).__name__} - {e}")
672652

673653

674-
def hybrid_metrics_watcher(metric_name: str, value: Union[int, float]) -> None:
654+
def hybrid_metrics_watcher(metric_name: str, metric_value: Union[int, float]) -> None:
655+
global hybrid_switch_log
656+
mode = None
675657
if metric_name.startswith("modin.hybrid.auto"):
676-
tokens = metric_name.split(".")
677-
from_engine = None
678-
to_engine = None
679-
metric = None
680-
group = None
681-
submetric = None
682-
if len(tokens) >= 9:
683-
from_engine = tokens[4]
684-
to_engine = tokens[6]
685-
metric = tokens[7]
686-
if len(tokens) == 9:
687-
group = tokens[8]
688-
if len(tokens) == 10:
689-
submetric = tokens[8]
690-
group = tokens[9]
691-
add_to_hybrid_switch_log({'mode': 'single',
692-
'from': from_engine,
693-
'to': to_engine,
694-
'metric': metric,
695-
'submetric': submetric,
696-
'group': group,
697-
'value': value})
698-
if metric_name.startswith("modin.hybrid.cast"):
699-
tokens = metric_name.split(".")
700-
to_engine = None
701-
metric = None
702-
submetric = None
703-
group = None
704-
if len(tokens) == 7 and tokens[3] == 'to' and tokens[5] == 'cost':
705-
to_engine = tokens[4]
706-
group = tokens[6]
707-
metric = 'cost'
708-
if len(tokens) == 6 and tokens[3] == 'decision':
709-
submetric = tokens[4]
710-
group = tokens[5]
711-
metric = 'decision'
712-
add_to_hybrid_switch_log({'mode': 'merge',
713-
'to': to_engine,
714-
'metric': metric,
715-
'submetric': submetric,
716-
'group': group,
717-
'value': value})
718-
658+
mode = "auto"
659+
elif metric_name.startswith("modin.hybrid.merge"):
660+
mode = "merge"
661+
else:
662+
return
663+
tokens = metric_name.split(".")[3:]
664+
entry = {'mode': mode}
665+
while len(tokens) >= 2:
666+
key = tokens.pop(0)
667+
if key == 'api':
668+
value = tokens.pop(0) + '.' + tokens.pop(0)
669+
else:
670+
value = tokens.pop(0)
671+
entry[key] = value
719672

720-
721-
673+
if len(tokens) == 1:
674+
key = tokens.pop(0)
675+
entry[key] = metric_value
676+
677+
source = get_user_source_location(entry['group'])
678+
entry['source'] = source['source']
679+
new_row = native_pd.DataFrame(entry, index=[0])
680+
hybrid_switch_log = native_pd.concat([hybrid_switch_log, new_row])
681+
722682
def connect_modin_telemetry() -> None:
723683
MetricsMode.enable()
724684
add_metric_handler(snowpark_pandas_api_watcher)

src/snowflake/snowpark/modin/plugin/extensions/pd_extensions.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,16 @@ def _snowpark_pandas_obj_check(obj: Union[DataFrame, Series]):
3939

4040
@register_pd_accessor("explain")
4141
def explain(last=20) -> native_pd.DataFrame:
42-
stats = get_hybrid_switch_log().tail(last)
43-
stats = stats.drop_duplicates().fillna(value=' ')
44-
stats2 = stats.reset_index()
45-
return stats2.set_index([ 'source', 'mode', 'from', 'to', 'metric']).drop(columns=['index', 'group'])
42+
stats = get_hybrid_switch_log()
43+
stats = stats.reset_index(drop=True).sort_index().reset_index()
44+
stats['decision'] = stats.groupby(['group']).bfill().ffill()['decision']
45+
stats['api'] = stats.groupby(['group']).bfill().ffill()['api']
46+
stats = stats.groupby('group', sort=False).apply(include_groups=False,func=lambda x: x.melt(ignore_index=False, id_vars=['source', 'api', 'decision', 'candidate', 'index', 'mode'], var_name='metric', value_vars=['stay_cost', 'move_to_cost', 'other_execute_cost', 'cost', 'delta', 'rows', 'cols'])).dropna()
47+
stats = stats.set_index(['source', 'decision', 'api', 'index'])
48+
stats = stats.sort_index(level='index')
49+
stats['value'] = stats['value'].astype(int)
50+
stats.reset_index().drop(columns='index').set_index(['source', 'decision', 'api'])
51+
return stats.tail(last)
4652

4753
@register_pd_accessor("read_snowflake")
4854
def read_snowflake(

0 commit comments

Comments
 (0)