Skip to content

Commit ed3c13a

Browse files
authored
Feature/s3 (#1375)
* Refactor COS upload and experiment state APIs Removed boto3/botocore dependencies and the CosClient, replacing direct COS upload logic with a new API-based approach using presigned URLs. Moved experiment state update logic to a dedicated API module and updated all usages. Refactored Client to remove COS logic, simplified session handling, and updated uploader and callback code to use the new APIs. Updated imports and tests to reflect file moves and API changes. * Reset buffer pointer before COS upload Added buffer.seek(0) before uploading to COS to ensure the buffer pointer is at the start. Also removed unused COS upload tests and related imports from test_client.py for code cleanup. * Move buffer seek to _upload function Relocated the buffer.seek(0) call from upload_to_cos to the _upload function to ensure the buffer is reset before uploading. This centralizes buffer handling and prevents redundant seeks. * Add retry logic to file upload and unit tests Enhanced the file upload function with retry logic and exponential backoff for robustness against network errors. Added comprehensive unit tests to verify upload success, retry behavior, exception handling, and server error scenarios. * Improve error handling in COS upload function Enhanced the upload_to_cos function to better handle exceptions during concurrent uploads. Failed uploads are now logged with warnings and retried, with errors on retry attempts also logged for improved reliability and debugging. * Improve retry backoff in upload_file function Changed the retry delay in upload_file to use exponential backoff (2^(attempt-1) seconds) instead of linear. Also removed a redundant comment in upload_to_cos for clarity. * Remove unused import and URL prefix logic Deleted the import of get_host_api and related code that handled URL prefixing for private environments, as it is no longer needed.
1 parent 59332e9 commit ed3c13a

File tree

15 files changed

+337
-289
lines changed

15 files changed

+337
-289
lines changed

requirements.txt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ click
55
pyyaml
66
psutil>=5.0.0
77
nvidia-ml-py
8-
boto3>=1.35.49
9-
botocore
108
pydantic~=2.0
119
pyecharts>=2.0.0
1210
wrapt>=1.17.0

swanlab/api/experiment.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
from swanlab.api.base import ApiBase, ApiHTTP
1313
from swanlab.api.types import ApiResponse, Experiment, Pagination
14-
from swanlab.package import get_host_api
1514

1615
try:
1716
from pandas import DataFrame
@@ -181,11 +180,6 @@ def get_metrics(
181180
continue
182181

183182
url:str = resp.data.get("url", "")
184-
# 私有化环境可能不会携带 ip:https://github.com/SwanHubX/SwanLab/issues/1267
185-
if not (url.startswith('https://') or url.startswith('http://')):
186-
url = get_host_api().split('/api')[0] + url # url 已添加前缀 /
187-
188-
189183
df = pd.read_csv(url, index_col=0)
190184

191185
if idx == 0:

swanlab/core_python/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,3 @@
88
"""
99

1010
from .client import *
11-
from .session import create_session
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""
2+
@author: cunyue
3+
@file: experiment.py
4+
@time: 2025/12/11 18:37
5+
@description: 定义实验相关的后端API接口
6+
"""
7+
8+
from typing import Literal
9+
10+
from swanlab.core_python.client import Client
11+
12+
13+
def update_experiment_state(
14+
client: Client,
15+
*,
16+
username: str,
17+
projname: str,
18+
cuid: str,
19+
state: Literal['FINISHED', 'CRASHED', 'ABORTED'],
20+
finished_at: str = None,
21+
):
22+
"""
23+
更新实验状态,注意此接口会将客户端标记为 pending 状态,表示实验已结束
24+
:param client: 已登录的客户端实例
25+
:param username: 实验所属用户名
26+
:param projname: 实验所属项目名称
27+
:param cuid: 实验唯一标识符
28+
:param state: 实验状态
29+
:param finished_at: 实验结束时间,格式为 ISO 8601,如果不提供则使用当前时间
30+
"""
31+
put_data = {
32+
"state": state,
33+
"finishedAt": finished_at,
34+
"from": "sdk",
35+
}
36+
put_data = {k: v for k, v in put_data.items() if v is not None} # 移除值为None的键
37+
client.put(f"/project/{username}/{projname}/runs/{cuid}/state", put_data)
38+
client.pending = True

swanlab/core_python/api/service.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
"""
2+
@author: cunyue
3+
@file: service.py
4+
@time: 2025/12/11 18:48
5+
@description: 服务相关API接口
6+
"""
7+
8+
import time
9+
from concurrent.futures import ThreadPoolExecutor
10+
from io import BytesIO
11+
from typing import List, Tuple
12+
13+
import requests
14+
from requests.exceptions import RequestException
15+
16+
from ..client import Client
17+
from ...log import swanlog
18+
from ...toolkit.models.data import MediaBuffer
19+
20+
21+
def upload_file(*, url: str, buffer: BytesIO, max_retries=3):
22+
"""
23+
上传文件到COS
24+
:param url: COS上传URL
25+
:param buffer: 文件内容的BytesIO对象
26+
:param max_retries: 最大重试次数
27+
"""
28+
# 这里也可以创建一个 Session 对象复用 TCP 连接
29+
with requests.Session() as session:
30+
for attempt in range(1, max_retries + 1):
31+
try:
32+
buffer.seek(0)
33+
response = session.put(
34+
url,
35+
data=buffer,
36+
headers={'Content-Type': 'application/octet-stream'},
37+
timeout=30,
38+
)
39+
response.raise_for_status()
40+
return
41+
except RequestException:
42+
swanlog.warning("Upload attempt {} failed for URL: {}".format(attempt, url))
43+
# 如果是最后一次尝试,抛出异常
44+
if attempt == max_retries:
45+
raise
46+
# 简单的指数退避(等待 1s, 2s, 4s...)
47+
time.sleep(2 ** (attempt - 1))
48+
49+
50+
def upload_to_cos(client: Client, *, cuid: str, buffers: List[MediaBuffer]):
51+
"""
52+
上传文件到COS
53+
:param client: 对应的客户端实例
54+
:param cuid: 实验cuid
55+
:param buffers: 媒体数据缓冲区
56+
"""
57+
failed_buffers: List[Tuple[str, MediaBuffer]] = []
58+
# 1. 后端签名
59+
data, _ = client.post(
60+
'/resources/presigned/put',
61+
{"experimentId": cuid, "paths": [buffer.file_name for buffer in buffers]},
62+
)
63+
urls: List[str] = data['urls']
64+
# 2. 并发上传
65+
# executor.submit可能会失败,因为线程数有限或者线程池已经关闭
66+
# 来自此issue: https://github.com/SwanHubX/SwanLab/issues/889,此时需要一个个发送
67+
with ThreadPoolExecutor(max_workers=10) as executor:
68+
futures = []
69+
assert len(urls) == len(buffers), "URLs and buffers length mismatch"
70+
# 2.1 在线程中并发上传
71+
for index, buffer in enumerate(buffers):
72+
url = urls[index]
73+
try:
74+
future = executor.submit(upload_file, url=url, buffer=buffer)
75+
futures.append((future, url, buffer))
76+
except RuntimeError:
77+
failed_buffers.append((url, buffer))
78+
# 2.2 收集结果
79+
for future, url, buffer in futures:
80+
try:
81+
future.result()
82+
except Exception as e:
83+
swanlog.warning(f"Failed to upload {url}: {e}, will retry...")
84+
failed_buffers.append((url, buffer))
85+
# 3. 重试失败的buffer,重新上传
86+
if len(failed_buffers):
87+
swanlog.debug("Retrying failed buffers: {}".format(len(failed_buffers)))
88+
for url, buffer in failed_buffers:
89+
try:
90+
upload_file(url=url, buffer=buffer)
91+
except Exception as e:
92+
swanlog.error(f"Failed to upload {url}: {e}")

swanlab/core_python/auth/providers/api_key.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
from rich.status import Status
1414
from rich.text import Text
1515

16+
from swanlab.core_python.client.session import create_session
1617
from swanlab.env import is_windows, is_interactive
1718
from swanlab.error import ValidationError, APIKeyFormatError, KeyFileError
1819
from swanlab.log import swanlog
1920
from swanlab.package import get_setting_url, get_host_api, get_host_web, fmt_web_host, save_key as sk, get_key
20-
from ...session import create_session
2121

2222

2323
class LoginInfo:

0 commit comments

Comments
 (0)