Skip to content

Commit 95d9bcb

Browse files
feat: add 5 new APIs for Miku live streaming
- Add bind_push_domain API for binding push domains (RTMP/WHIP) - Add bind_play_domain API for binding playback domains (FLV/M3U8/WHEP) - Add get_push_urls API to generate RTMP and WHIP push URLs - Add get_play_urls API to generate FLV, M3U8, and WHEP playback URLs - Add query_live_traffic_stats API to query live streaming usage statistics All APIs follow the existing MikuService patterns with proper error handling and logging. Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: callmefisher <[email protected]>
1 parent 2467f01 commit 95d9bcb

File tree

2 files changed

+357
-0
lines changed

2 files changed

+357
-0
lines changed

src/mcp_server/core/miku/miku.py

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,217 @@ async def create_stream(self, bucket: str, stream: str) -> Dict[str, Any]:
132132
"message": f"Failed to create stream: {text}",
133133
"status_code": status
134134
}
135+
136+
async def bind_push_domain(self, bucket: str, domain: str, domain_type: str = "pushRtmp") -> Dict[str, Any]:
137+
"""
138+
Bind a push domain to the bucket
139+
140+
Args:
141+
bucket: The bucket name
142+
domain: The push domain name
143+
domain_type: The type of push domain (default: pushRtmp)
144+
145+
Returns:
146+
Dict containing the response status and message
147+
"""
148+
url = f"{self._build_bucket_url(bucket)}/?pushDomain"
149+
headers = {
150+
**self._get_auth_header(),
151+
"Content-Type": "application/json"
152+
}
153+
data = {
154+
"domain": domain,
155+
"type": domain_type
156+
}
157+
158+
logger.info(f"Binding push domain: {domain} (type: {domain_type}) to bucket: {bucket}")
159+
160+
async with aiohttp.ClientSession() as session:
161+
async with session.post(url, headers=headers, json=data) as response:
162+
status = response.status
163+
text = await response.text()
164+
165+
if status == 200 or status == 201:
166+
logger.info(f"Successfully bound push domain: {domain} to bucket: {bucket}")
167+
return {
168+
"status": "success",
169+
"bucket": bucket,
170+
"domain": domain,
171+
"type": domain_type,
172+
"message": f"Push domain '{domain}' bound successfully to bucket '{bucket}'",
173+
"status_code": status
174+
}
175+
else:
176+
logger.error(f"Failed to bind push domain: {domain}, status: {status}, response: {text}")
177+
return {
178+
"status": "error",
179+
"bucket": bucket,
180+
"domain": domain,
181+
"type": domain_type,
182+
"message": f"Failed to bind push domain: {text}",
183+
"status_code": status
184+
}
185+
186+
async def bind_play_domain(self, bucket: str, domain: str, domain_type: str = "live") -> Dict[str, Any]:
187+
"""
188+
Bind a playback domain to the bucket
189+
190+
Args:
191+
bucket: The bucket name
192+
domain: The playback domain name
193+
domain_type: The type of playback domain (default: live)
194+
195+
Returns:
196+
Dict containing the response status and message
197+
"""
198+
url = f"{self._build_bucket_url(bucket)}/?domain"
199+
headers = {
200+
**self._get_auth_header(),
201+
"Content-Type": "application/json"
202+
}
203+
data = {
204+
"domain": domain,
205+
"type": domain_type
206+
}
207+
208+
logger.info(f"Binding playback domain: {domain} (type: {domain_type}) to bucket: {bucket}")
209+
210+
async with aiohttp.ClientSession() as session:
211+
async with session.post(url, headers=headers, json=data) as response:
212+
status = response.status
213+
text = await response.text()
214+
215+
if status == 200 or status == 201:
216+
logger.info(f"Successfully bound playback domain: {domain} to bucket: {bucket}")
217+
return {
218+
"status": "success",
219+
"bucket": bucket,
220+
"domain": domain,
221+
"type": domain_type,
222+
"message": f"Playback domain '{domain}' bound successfully to bucket '{bucket}'",
223+
"status_code": status
224+
}
225+
else:
226+
logger.error(f"Failed to bind playback domain: {domain}, status: {status}, response: {text}")
227+
return {
228+
"status": "error",
229+
"bucket": bucket,
230+
"domain": domain,
231+
"type": domain_type,
232+
"message": f"Failed to bind playback domain: {text}",
233+
"status_code": status
234+
}
235+
236+
def get_push_urls(self, push_domain: str, bucket: str, stream_name: str) -> Dict[str, Any]:
237+
"""
238+
Generate push URLs for RTMP and WHIP protocols
239+
240+
Args:
241+
push_domain: The push domain
242+
bucket: The bucket name
243+
stream_name: The stream name
244+
245+
Returns:
246+
Dict containing RTMP and WHIP push URLs
247+
"""
248+
rtmp_url = f"rtmp://{push_domain}/{bucket}/{stream_name}"
249+
whip_url = f"https://{push_domain}/{bucket}/{stream_name}.whip"
250+
251+
logger.info(f"Generated push URLs for stream: {stream_name}")
252+
return {
253+
"status": "success",
254+
"push_domain": push_domain,
255+
"bucket": bucket,
256+
"stream_name": stream_name,
257+
"rtmp_url": rtmp_url,
258+
"whip_url": whip_url,
259+
"message": "Push URLs generated successfully"
260+
}
261+
262+
def get_play_urls(self, play_domain: str, bucket: str, stream_name: str) -> Dict[str, Any]:
263+
"""
264+
Generate playback URLs for FLV, M3U8, and WHEP protocols
265+
266+
Args:
267+
play_domain: The playback domain
268+
bucket: The bucket name
269+
stream_name: The stream name
270+
271+
Returns:
272+
Dict containing FLV, M3U8, and WHEP playback URLs
273+
"""
274+
flv_url = f"https://{play_domain}/{bucket}/{stream_name}.flv"
275+
m3u8_url = f"https://{play_domain}/{bucket}/{stream_name}.m3u8"
276+
whep_url = f"https://{play_domain}/{bucket}/{stream_name}.whep"
277+
278+
logger.info(f"Generated playback URLs for stream: {stream_name}")
279+
return {
280+
"status": "success",
281+
"play_domain": play_domain,
282+
"bucket": bucket,
283+
"stream_name": stream_name,
284+
"flv_url": flv_url,
285+
"m3u8_url": m3u8_url,
286+
"whep_url": whep_url,
287+
"message": "Playback URLs generated successfully"
288+
}
289+
290+
async def query_live_traffic_stats(self, begin: str, end: str) -> Dict[str, Any]:
291+
"""
292+
Query live streaming traffic statistics
293+
294+
Args:
295+
begin: Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)
296+
end: End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)
297+
298+
Returns:
299+
Dict containing traffic statistics
300+
"""
301+
if not self.endpoint_url:
302+
raise ValueError("QINIU_ENDPOINT_URL is not configured")
303+
304+
# Remove protocol and bucket prefix to get base endpoint
305+
endpoint = self.endpoint_url
306+
if endpoint.startswith("http://"):
307+
endpoint = endpoint[7:]
308+
elif endpoint.startswith("https://"):
309+
endpoint = endpoint[8:]
310+
311+
# Remove bucket prefix if present (format: bucket.endpoint)
312+
if '.' in endpoint:
313+
parts = endpoint.split('.', 1)
314+
if len(parts) > 1:
315+
endpoint = parts[1]
316+
317+
url = f"http://{endpoint}/?trafficStats&begin={begin}&end={end}&g=5min&select=flow&flow=downflow"
318+
headers = {
319+
**self._get_auth_header(),
320+
"Content-Type": "application/json"
321+
}
322+
323+
logger.info(f"Querying live traffic stats from {begin} to {end}")
324+
325+
async with aiohttp.ClientSession() as session:
326+
async with session.get(url, headers=headers) as response:
327+
status = response.status
328+
text = await response.text()
329+
330+
if status == 200:
331+
logger.info(f"Successfully queried live traffic stats")
332+
return {
333+
"status": "success",
334+
"begin": begin,
335+
"end": end,
336+
"data": text,
337+
"message": "Traffic statistics retrieved successfully",
338+
"status_code": status
339+
}
340+
else:
341+
logger.error(f"Failed to query traffic stats, status: {status}, response: {text}")
342+
return {
343+
"status": "error",
344+
"begin": begin,
345+
"end": end,
346+
"message": f"Failed to query traffic stats: {text}",
347+
"status_code": status
348+
}

src/mcp_server/core/miku/tools.py

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,155 @@ async def create_stream(self, **kwargs) -> list[types.TextContent]:
6060
result = await self.miku.create_stream(**kwargs)
6161
return [types.TextContent(type="text", text=str(result))]
6262

63+
@tools.tool_meta(
64+
types.Tool(
65+
name="miku_bind_push_domain",
66+
description="Bind a push domain to a Miku bucket for live streaming. This allows you to configure the domain for pushing RTMP/WHIP streams.",
67+
inputSchema={
68+
"type": "object",
69+
"properties": {
70+
"bucket": {
71+
"type": "string",
72+
"description": _BUCKET_DESC,
73+
},
74+
"domain": {
75+
"type": "string",
76+
"description": "The push domain name (e.g., mcp-push1.qiniu.com)",
77+
},
78+
"domain_type": {
79+
"type": "string",
80+
"description": "The type of push domain (default: pushRtmp)",
81+
"default": "pushRtmp",
82+
},
83+
},
84+
"required": ["bucket", "domain"],
85+
},
86+
)
87+
)
88+
async def bind_push_domain(self, **kwargs) -> list[types.TextContent]:
89+
result = await self.miku.bind_push_domain(**kwargs)
90+
return [types.TextContent(type="text", text=str(result))]
91+
92+
@tools.tool_meta(
93+
types.Tool(
94+
name="miku_bind_play_domain",
95+
description="Bind a playback domain to a Miku bucket for live streaming. This allows you to configure the domain for playing back streams via FLV/M3U8/WHEP.",
96+
inputSchema={
97+
"type": "object",
98+
"properties": {
99+
"bucket": {
100+
"type": "string",
101+
"description": _BUCKET_DESC,
102+
},
103+
"domain": {
104+
"type": "string",
105+
"description": "The playback domain name (e.g., mcp-play1.qiniu.com)",
106+
},
107+
"domain_type": {
108+
"type": "string",
109+
"description": "The type of playback domain (default: live)",
110+
"default": "live",
111+
},
112+
},
113+
"required": ["bucket", "domain"],
114+
},
115+
)
116+
)
117+
async def bind_play_domain(self, **kwargs) -> list[types.TextContent]:
118+
result = await self.miku.bind_play_domain(**kwargs)
119+
return [types.TextContent(type="text", text=str(result))]
120+
121+
@tools.tool_meta(
122+
types.Tool(
123+
name="miku_get_push_urls",
124+
description="Get push URLs for a stream. Returns RTMP and WHIP push URLs that can be used to push live streams.",
125+
inputSchema={
126+
"type": "object",
127+
"properties": {
128+
"push_domain": {
129+
"type": "string",
130+
"description": "The push domain name",
131+
},
132+
"bucket": {
133+
"type": "string",
134+
"description": _BUCKET_DESC,
135+
},
136+
"stream_name": {
137+
"type": "string",
138+
"description": "The stream name",
139+
},
140+
},
141+
"required": ["push_domain", "bucket", "stream_name"],
142+
},
143+
)
144+
)
145+
async def get_push_urls(self, **kwargs) -> list[types.TextContent]:
146+
result = self.miku.get_push_urls(**kwargs)
147+
return [types.TextContent(type="text", text=str(result))]
148+
149+
@tools.tool_meta(
150+
types.Tool(
151+
name="miku_get_play_urls",
152+
description="Get playback URLs for a stream. Returns FLV, M3U8, and WHEP playback URLs that can be used to play live streams.",
153+
inputSchema={
154+
"type": "object",
155+
"properties": {
156+
"play_domain": {
157+
"type": "string",
158+
"description": "The playback domain name",
159+
},
160+
"bucket": {
161+
"type": "string",
162+
"description": _BUCKET_DESC,
163+
},
164+
"stream_name": {
165+
"type": "string",
166+
"description": "The stream name",
167+
},
168+
},
169+
"required": ["play_domain", "bucket", "stream_name"],
170+
},
171+
)
172+
)
173+
async def get_play_urls(self, **kwargs) -> list[types.TextContent]:
174+
result = self.miku.get_play_urls(**kwargs)
175+
return [types.TextContent(type="text", text=str(result))]
176+
177+
@tools.tool_meta(
178+
types.Tool(
179+
name="miku_query_live_traffic_stats",
180+
description="Query live streaming traffic statistics for a time range. Returns bandwidth and traffic usage data.",
181+
inputSchema={
182+
"type": "object",
183+
"properties": {
184+
"begin": {
185+
"type": "string",
186+
"description": "Start time in format YYYYMMDDHHMMSS (e.g., 20240101000000)",
187+
},
188+
"end": {
189+
"type": "string",
190+
"description": "End time in format YYYYMMDDHHMMSS (e.g., 20240129105148)",
191+
},
192+
},
193+
"required": ["begin", "end"],
194+
},
195+
)
196+
)
197+
async def query_live_traffic_stats(self, **kwargs) -> list[types.TextContent]:
198+
result = await self.miku.query_live_traffic_stats(**kwargs)
199+
return [types.TextContent(type="text", text=str(result))]
200+
63201

64202
def register_tools(miku: MikuService):
65203
tool_impl = _ToolImpl(miku)
66204
tools.auto_register_tools(
67205
[
68206
tool_impl.create_bucket,
69207
tool_impl.create_stream,
208+
tool_impl.bind_push_domain,
209+
tool_impl.bind_play_domain,
210+
tool_impl.get_push_urls,
211+
tool_impl.get_play_urls,
212+
tool_impl.query_live_traffic_stats,
70213
]
71214
)

0 commit comments

Comments
 (0)