Skip to content

Commit 5e6990d

Browse files
authored
Add more logs on execution info of operands (#2940)
1 parent e713c5e commit 5e6990d

File tree

8 files changed

+45
-13
lines changed

8 files changed

+45
-13
lines changed

mars/core/entity/executable.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,11 @@ def cb(_, sess=ref(session)):
9494
# if decref in isolation, means that this tileable
9595
# is not required for main thread, thus we do not need
9696
# to wait for decref, otherwise, wait a bit
97-
fut.result(0.5)
97+
try:
98+
fut.result(0.5)
99+
except concurrent.futures.TimeoutError:
100+
# ignore timeout
101+
pass
98102

99103
self.tileable = ref(tileable, cb)
100104

mars/dataframe/base/bloom_filter.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def tile(cls, op: "DataFrameBloomFilter"):
7272

7373
# union all chunk filters
7474
combine_size = options.combine_size
75-
while len(chunks) > 4:
75+
while len(chunks) > combine_size:
7676
new_chunks = []
7777
for i in range(0, len(chunks), combine_size):
7878
chks = chunks[i : i + combine_size]

mars/dataframe/groupby/aggregation.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import functools
1616
import itertools
17+
import logging
1718
import typing
1819
import uuid
1920
from typing import List
@@ -59,6 +60,8 @@
5960
cp = lazy_import("cupy", globals=globals(), rename="cp")
6061
cudf = lazy_import("cudf", globals=globals())
6162

63+
logger = logging.getLogger(__name__)
64+
6265
_support_get_group_without_as_index = pd_release_version[:2] > (1, 0)
6366

6467

@@ -617,9 +620,11 @@ def _tile_auto(
617620
if cls._choose_tree_method(
618621
raw_sizes, agg_sizes, len(chunks), len(in_df.chunks), op.chunk_store_limit
619622
):
623+
logger.debug("Choose tree method for groupby operand %s", op)
620624
return cls._combine_tree(op, chunks + left_chunks, out_df, func_infos)
621625
else:
622626
# otherwise, use shuffle
627+
logger.debug("Choose shuffle method for groupby operand %s", op)
623628
return cls._perform_shuffle(
624629
op, chunks + left_chunks, in_df, out_df, func_infos
625630
)

mars/dataframe/merge/merge.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,12 @@ def tile(cls, op: "DataFrameMerge"):
627627
if op.how == "inner" and op.bloom_filter:
628628
if has_unknown_shape(left, right):
629629
yield left.chunks + right.chunks
630+
small_one = right if len(left.chunks) > len(right.chunks) else left
631+
logger.debug(
632+
"Apply bloom filter for operand %s, use DataFrame %s to build bloom filter.",
633+
op,
634+
small_one,
635+
)
630636
left, right = yield from recursive_tile(
631637
*cls._apply_bloom_filter(left, right, left_on, right_on, op)
632638
)
@@ -638,6 +644,7 @@ def tile(cls, op: "DataFrameMerge"):
638644
if op.method == "auto":
639645
# if method is auto, select new method after auto merge
640646
method = cls._choose_merge_method(op, left, right)
647+
logger.debug("Choose %s method for merge operand %s", method, op)
641648
if method == MergeMethod.one_chunk:
642649
ret = cls._tile_one_chunk(op, left, right)
643650
elif method == MergeMethod.broadcast:

mars/serialization/core.pyx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import asyncio
1717
import datetime
18+
import hashlib
1819
import inspect
1920
import sys
2021
from cpython cimport PyObject
@@ -27,7 +28,6 @@ import numpy as np
2728
import pandas as pd
2829

2930
from .._utils cimport TypeDispatcher
30-
from ..utils import tokenize_int
3131

3232
import cloudpickle
3333

@@ -142,7 +142,9 @@ cdef class Serializer:
142142

143143
@classmethod
144144
def calc_default_serializer_id(cls):
145-
return tokenize_int(f"{cls.__module__}.{cls.__qualname__}") % _SERIALIZER_ID_PRIME
145+
s = f"{cls.__module__}.{cls.__qualname__}"
146+
h = hashlib.md5(s.encode())
147+
return int(h.hexdigest(), 16) % _SERIALIZER_ID_PRIME
146148

147149
@classmethod
148150
def register(cls, obj_type):

mars/services/scheduling/worker/execution.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,16 +111,18 @@ def _fill_subtask_result_with_exception(
111111
_, exc, tb = sys.exc_info()
112112
if isinstance(exc, asyncio.CancelledError):
113113
status = SubtaskStatus.cancelled
114-
log_str = "Cancel"
114+
logger.exception(
115+
"Cancel run subtask %s on band %s",
116+
subtask.subtask_id,
117+
subtask_info.band_name,
118+
)
115119
else:
116120
status = SubtaskStatus.errored
117-
log_str = "Failed to"
118-
logger.exception(
119-
"%s run subtask %s on band %s",
120-
log_str,
121-
subtask.subtask_id,
122-
subtask_info.band_name,
123-
)
121+
logger.exception(
122+
"Failed to run subtask %s on band %s",
123+
subtask.subtask_id,
124+
subtask_info.band_name,
125+
)
124126
subtask_info.result.status = status
125127
subtask_info.result.progress = 1.0
126128
subtask_info.result.error = exc

mars/services/session/supervisor/core.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import asyncio
1616
import functools
17+
import logging
18+
import os
1719
import time
1820
from typing import Dict, List, Optional
1921

@@ -23,6 +25,8 @@
2325
from ...core import NodeRole, create_service_session, destroy_service_session
2426
from ..core import SessionInfo
2527

28+
logger = logging.getLogger(__name__)
29+
2630

2731
class SessionManagerActor(mo.Actor):
2832
def __init__(self, service_config: Optional[Dict] = None):
@@ -164,6 +168,11 @@ async def __post_create__(self):
164168
address=self.address,
165169
uid=CustomLogMetaActor.gen_uid(self._session_id),
166170
)
171+
logger.debug(
172+
"Session %s actor created on pid: %s",
173+
self._session_id,
174+
os.getpid(),
175+
)
167176

168177
async def remove(self):
169178
await destroy_service_session(

setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,13 @@
7272
cythonize_kw["compiler_directives"] = {"linetrace": True}
7373

7474
if "MSC" in sys.version:
75-
extra_compile_args = ["/Ot", "/I" + os.path.join(repo_root, "misc")]
75+
extra_compile_args = ["/std:c11", "/Ot", "/I" + os.path.join(repo_root, "misc")]
7676
cy_extension_kw["extra_compile_args"] = extra_compile_args
7777
else:
7878
extra_compile_args = ["-O3"]
79+
if sys.platform != "darwin":
80+
# for macOS, we assume that C++ 11 is enabled by default
81+
extra_compile_args.append("-std=c++0x")
7982
cy_extension_kw["extra_compile_args"] = extra_compile_args
8083

8184

0 commit comments

Comments
 (0)