Skip to content

Commit 56acab7

Browse files
authored
Reduce time cost of cpu_percent() calls (#2567)
1 parent 90ffab1 commit 56acab7

File tree

9 files changed

+60
-26
lines changed

9 files changed

+60
-26
lines changed

azure-pipelines.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ jobs:
8383
8484
if [[ "$(mars.test.module)" == "learn" ]]; then
8585
# remove version limit when blue-yonder/tsfresh#897 is fixed.
86-
pip install xgboost lightgbm tensorflow faiss-cpu torch torchvision \
86+
# remove the keras version pin once https://github.com/tensorflow/tensorflow/issues/52922 is fixed.
87+
pip install xgboost lightgbm keras==2.6.0 tensorflow faiss-cpu torch torchvision \
8788
statsmodels\<0.13.0 tsfresh
8889
fi
8990
fi

docs/source/locale/zh_CN/LC_MESSAGES/development/operand.po

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ msgid ""
5555
"which is useful for serialization. If the type is uncertain, ``AnyField``"
5656
" will work."
5757
msgstr ""
58-
"对于 ``SimpleReadCSV`` 算子,类持有的 ``path`` 属性记录 csv 的文件地址,这里使用``StringField`` "
58+
"对于 ``SimpleReadCSV`` 算子,类持有的 ``path`` 属性记录 csv 的文件地址,这里使用 ``StringField`` "
5959
"表示该属性的类型是字符串,指定类型主要是为了方便序列化算子,如果某个属性的类型是不确定的,可以用 ``AnyField`` 表示。"
6060

6161
#: ../../source/development/operand.rst:52
@@ -93,7 +93,7 @@ msgid ""
9393
" at delimiter boundaries."
9494
msgstr ""
9595
"当拆分好的子任务被分发到执行器时,Mars 会调用算子的 ``execute`` 方法来做计算,对于 ``read_csv`` "
96-
"的子任务,在函数里需要根据 ``offset`` 和 ``length``读取对应的数据块,但是这两个值只是一个粗略的值,因为 csv "
96+
"的子任务,在函数里需要根据 ``offset`` 和 ``length`` 读取对应的数据块,但是这两个值只是一个粗略的值,因为 csv "
9797
"文件不能从一行的中间读取,所以每次执行的时候需要计算出分隔符所在的起始位置。"
9898

9999
#: ../../source/development/operand.rst:190
@@ -117,7 +117,7 @@ msgid ""
117117
"taken to infer some meta information of Mars DataFrame, such as dtypes, "
118118
"columns, index, etc."
119119
msgstr ""
120-
"最后,需要定义一个暴露给用户的函数接口 ``read_csv``。在这个函数里,我们需要创建``SimpleReadCSV`` "
120+
"最后,需要定义一个暴露给用户的函数接口 ``read_csv``。在这个函数里,我们需要创建 ``SimpleReadCSV`` "
121121
"算子,并且需要读取一小块采样数据,推断出输出的 DataFrame 的 dtypes, columns, index 等元信息。"
122122

123123
#: ../../source/development/operand.rst:223

mars/deploy/oscar/session.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,8 +1206,6 @@ async def _init(
12061206
if new:
12071207
# create new session
12081208
await session_api.create_session(session_id)
1209-
else:
1210-
await session_api.get_session_address(session_id)
12111209
lifecycle_api = WebLifecycleAPI(session_id, address)
12121210
meta_api = WebMetaAPI(session_id, address)
12131211
task_api = WebTaskAPI(session_id, address)
@@ -1838,6 +1836,7 @@ def new_session(
18381836
session_id: str = None,
18391837
backend: str = "oscar",
18401838
default: bool = True,
1839+
new: bool = True,
18411840
**kwargs,
18421841
) -> AbstractSession:
18431842
ensure_isolation_created(kwargs)
@@ -1851,7 +1850,7 @@ def new_session(
18511850
session_id = _new_session_id()
18521851

18531852
session = SyncSession.init(
1854-
address, session_id=session_id, backend=backend, new=True, **kwargs
1853+
address, session_id=session_id, backend=backend, new=new, **kwargs
18551854
)
18561855
if default:
18571856
session.as_default()

mars/oscar/backends/communication/dummy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ async def send(self, message: Any):
6464
if self._closed.is_set(): # pragma: no cover
6565
raise ChannelClosed("Channel already closed, cannot send message")
6666
# put message directly into queue
67-
await self._out_queue.put(message)
67+
self._out_queue.put_nowait(message)
6868

6969
@implements(Channel.recv)
7070
async def recv(self):

mars/oscar/backends/message.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
import os
1615
from abc import ABC, abstractmethod
1716
from dataclasses import dataclass
1817
from enum import Enum
@@ -25,6 +24,14 @@
2524
from ...utils import classproperty, dataslots, implements
2625
from ..core import ActorRef
2726

27+
try:
    # ``random.randbytes`` is only available on Python 3.9+.
    from random import randbytes
except ImportError:
    from random import getrandbits

    def randbytes(n: int) -> bytes:
        """Return ``n`` pseudo-random bytes (fallback for ``random.randbytes``).

        Used only when the standard library does not provide
        ``random.randbytes``. The bytes come from the ``random`` module's
        generator, so they are not suitable for security-sensitive purposes.

        Parameters
        ----------
        n : int
            Number of bytes to generate.

        Returns
        -------
        bytes
            A ``bytes`` object of length ``n``.
        """
        if n == 0:
            # ``getrandbits(0)`` raises ValueError on the interpreters that
            # reach this fallback, whereas the stdlib ``randbytes(0)``
            # returns an empty bytes object.
            return b""
        return getrandbits(n * 8).to_bytes(n, "little")
34+
2835

2936
# make sure traceback can be pickled
3037
pickling_support.install()
@@ -368,4 +375,4 @@ def _get_slots(message_cls: Type[_MessageBase]):
368375

369376

370377
def new_message_id():
371-
return os.urandom(32)
378+
return randbytes(32)

mars/resource.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,9 @@ def cpu_count():
151151

152152
_last_cgroup_cpu_measure = None
153153
_last_proc_cpu_measure = None
154+
_last_psutil_measure = None
154155
_last_cpu_percent = None
156+
_cpu_percent_interval = 0.1
155157

156158

157159
def _take_process_cpu_snapshot():
@@ -167,7 +169,7 @@ def _take_process_cpu_snapshot():
167169

168170

169171
def cpu_percent():
170-
global _last_cgroup_cpu_measure, _last_proc_cpu_measure, _last_cpu_percent
172+
global _last_cgroup_cpu_measure, _last_proc_cpu_measure, _last_cpu_percent, _last_psutil_measure
171173
if _cpu_use_cgroup_stat:
172174
# see https://www.kernel.org/doc/Documentation/cgroup-v1/cpuacct.txt
173175
with open(CGROUP_CPU_STAT_FILE, "r") as cgroup_file:
@@ -179,15 +181,15 @@ def cpu_percent():
179181

180182
last_cpu_acct, last_sample_time = _last_cgroup_cpu_measure
181183
time_delta = sample_time - last_sample_time
182-
if time_delta < 1e-2:
183-
return _last_cpu_percent
184+
if time_delta < _cpu_percent_interval:
185+
return _last_cpu_percent or 0
184186

185187
_last_cgroup_cpu_measure = (cpu_acct, sample_time)
186188
# nanoseconds / seconds * 100, we shall divide 1e7.
187189
_last_cpu_percent = round(
188190
(cpu_acct - last_cpu_acct) / (sample_time - last_sample_time) / 1e7, 1
189191
)
190-
return _last_cpu_percent
192+
return _last_cpu_percent or 0
191193
elif _cpu_use_process_stat:
192194
pts, sts = _take_process_cpu_snapshot()
193195

@@ -206,14 +208,22 @@ def cpu_percent():
206208
delta_proc = (pt2.user - pt1.user) + (pt2.system - pt1.system)
207209
time_delta = sts[pid] - old_sts[pid]
208210

209-
if time_delta < 1e-2:
211+
if time_delta < _cpu_percent_interval:
210212
return _last_cpu_percent or 0
211213
percents.append((delta_proc / time_delta) * 100)
212214
_last_proc_cpu_measure = (pts, sts)
213215
_last_cpu_percent = round(sum(percents), 1)
214-
return _last_cpu_percent
216+
return _last_cpu_percent or 0
215217
else:
216-
return sum(psutil.cpu_percent(percpu=True))
218+
measure_time = time.time()
219+
if (
220+
_last_psutil_measure is not None
221+
and measure_time - _last_psutil_measure < _cpu_percent_interval
222+
):
223+
return _last_cpu_percent or 0
224+
_last_psutil_measure = measure_time
225+
_last_cpu_percent = psutil.cpu_percent() * _cpu_total
226+
return _last_cpu_percent or 0
217227

218228

219229
def disk_usage(d):

mars/services/cluster/uploader.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ def __init__(self, role=None, interval=None, band_to_slots=None, use_gpu=True):
4444
self._interval = interval or DEFAULT_INFO_UPLOAD_INTERVAL
4545
self._upload_task = None
4646
self._upload_enabled = False
47+
self._uploaded_future = asyncio.Future()
4748
self._node_ready_event = asyncio.Event()
4849

4950
self._use_gpu = use_gpu
@@ -54,7 +55,8 @@ def __init__(self, role=None, interval=None, band_to_slots=None, use_gpu=True):
5455
self._disk_infos = []
5556

5657
async def __post_create__(self):
57-
await self.upload_node_info()
58+
self._upload_task = asyncio.create_task(self._periodical_upload_node_info())
59+
await self._uploaded_future
5860

5961
async def __pre_destroy__(self):
6062
self._upload_task.cancel()
@@ -77,10 +79,27 @@ async def _get_node_info_ref(self):
7779
NodeInfoCollectorActor.default_uid(), address=supervisor_addr
7880
)
7981

82+
async def _periodical_upload_node_info(self):
    """Upload node info in a loop, sleeping ``self._interval`` between rounds.

    The outcome of the first attempt is published through
    ``self._uploaded_future``: it is resolved with ``None`` on success or
    with the raised exception on failure. Later rounds leave an already
    resolved future untouched. Upload failures are logged and the loop
    keeps running.

    The loop ends when the task is cancelled. If the future is still
    pending at that point it is cancelled too, so that a coroutine awaiting
    it is not left waiting forever.
    """
    try:
        while True:
            try:
                await self.upload_node_info()
                if not self._uploaded_future.done():
                    self._uploaded_future.set_result(None)
            except asyncio.CancelledError:
                # Never treat cancellation as an upload failure; let the
                # outer handler deal with it.
                raise
            except Exception as ex:  # pragma: no cover
                logger.error(f"Failed to upload node info: {ex}")
                if not self._uploaded_future.done():
                    self._uploaded_future.set_exception(ex)
            await asyncio.sleep(self._interval)
    except asyncio.CancelledError:  # pragma: no cover
        # Swallow the cancellation so the task finishes quietly, but
        # release anyone still waiting on the first-upload future.
        if not self._uploaded_future.done():
            self._uploaded_future.cancel()
98+
8099
async def mark_node_ready(self):
81100
self._upload_enabled = True
82101
# upload info in time to reduce latency
83-
await self.upload_node_info(call_next=False, status=NodeStatus.READY)
102+
await self.upload_node_info(status=NodeStatus.READY)
84103
self._node_ready_event.set()
85104

86105
def is_node_ready(self):
@@ -89,7 +108,7 @@ def is_node_ready(self):
89108
async def wait_node_ready(self):
90109
return self._node_ready_event.wait()
91110

92-
async def upload_node_info(self, call_next: bool = True, status: NodeStatus = None):
111+
async def upload_node_info(self, status: NodeStatus = None):
93112
try:
94113
if not self._info.env:
95114
self._info.env = await asyncio.to_thread(gather_node_env)
@@ -133,11 +152,6 @@ async def upload_node_info(self, call_next: bool = True, status: NodeStatus = No
133152
except: # noqa: E722 # nosec # pylint: disable=bare-except # pragma: no cover
134153
logger.exception(f"Failed to upload node info")
135154
raise
136-
finally:
137-
if call_next:
138-
self._upload_task = self.ref().upload_node_info.tell_delay(
139-
delay=self._interval
140-
)
141155

142156
def get_bands(self) -> Dict[BandType, int]:
143157
band_slots = dict()

mars/services/context.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ def _call(self, coro):
8484
def get_current_session(self) -> SessionType:
8585
from ..deploy.oscar.session import new_session
8686

87-
return new_session(self.supervisor_address, self.session_id, default=False)
87+
return new_session(
88+
self.supervisor_address, self.session_id, new=False, default=False
89+
)
8890

8991
@implements(Context.get_supervisor_addresses)
9092
def get_supervisor_addresses(self) -> List[str]:

mars/services/session/api/oscar.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ async def has_session(self, session_id: str) -> bool:
6565
async def delete_session(self, session_id: str):
6666
await self._session_manager_ref.delete_session(session_id)
6767

68+
@alru_cache(cache_exceptions=False)
6869
async def get_session_address(self, session_id: str) -> str:
6970
"""
7071
Get session address.

0 commit comments

Comments
 (0)