Skip to content

Commit 4332c3b

Browse files
feat: Add 5 new Miku MCP server APIs for live streaming
- Add bind_push_domain API to bind push domain with custom type - Add bind_play_domain API to bind play domain with custom type - Add get_push_urls to generate RTMP and WHIP push URLs - Add get_play_urls to generate FLV, M3U8, and WHEP play URLs - Add query_traffic_stats API to query live streaming usage statistics All APIs follow the S3-style URL pattern and support customizable parameters. Generated with [codeagent](https://github.com/qbox/codeagent) Co-authored-by: callmefisher <[email protected]>
1 parent 2467f01 commit 4332c3b

File tree

2 files changed

+369
-0
lines changed

2 files changed

+369
-0
lines changed

src/mcp_server/core/miku/miku.py

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,3 +132,210 @@ async def create_stream(self, bucket: str, stream: str) -> Dict[str, Any]:
132132
"message": f"Failed to create stream: {text}",
133133
"status_code": status
134134
}
135+
136+
async def bind_push_domain(self, bucket: str, domain: str, domain_type: str = "pushRtmp") -> Dict[str, Any]:
137+
"""
138+
Bind a push domain to the bucket
139+
140+
Args:
141+
bucket: The bucket name
142+
domain: The push domain to bind (e.g., "mcp-push1.qiniu.com")
143+
domain_type: The domain type (default: "pushRtmp")
144+
145+
Returns:
146+
Dict containing the response status and message
147+
"""
148+
base_url = self._build_bucket_url(bucket)
149+
url = f"{base_url}?pushDomain"
150+
headers = self._get_auth_header()
151+
headers["Content-Type"] = "application/json"
152+
153+
payload = {
154+
"domain": domain,
155+
"type": domain_type
156+
}
157+
158+
logger.info(f"Binding push domain: {domain} (type: {domain_type}) to bucket: {bucket}")
159+
160+
async with aiohttp.ClientSession() as session:
161+
async with session.post(url, headers=headers, json=payload) as response:
162+
status = response.status
163+
text = await response.text()
164+
165+
if status == 200 or status == 201:
166+
logger.info(f"Successfully bound push domain: {domain} to bucket: {bucket}")
167+
return {
168+
"status": "success",
169+
"bucket": bucket,
170+
"domain": domain,
171+
"type": domain_type,
172+
"message": f"Push domain '{domain}' bound successfully to bucket '{bucket}'",
173+
"status_code": status
174+
}
175+
else:
176+
logger.error(f"Failed to bind push domain: {domain}, status: {status}, response: {text}")
177+
return {
178+
"status": "error",
179+
"bucket": bucket,
180+
"domain": domain,
181+
"message": f"Failed to bind push domain: {text}",
182+
"status_code": status
183+
}
184+
185+
async def bind_play_domain(self, bucket: str, domain: str, domain_type: str = "live") -> Dict[str, Any]:
186+
"""
187+
Bind a play domain to the bucket
188+
189+
Args:
190+
bucket: The bucket name
191+
domain: The play domain to bind (e.g., "mcp-play1.qiniu.com")
192+
domain_type: The domain type (default: "live")
193+
194+
Returns:
195+
Dict containing the response status and message
196+
"""
197+
base_url = self._build_bucket_url(bucket)
198+
url = f"{base_url}?domain"
199+
headers = self._get_auth_header()
200+
headers["Content-Type"] = "application/json"
201+
202+
payload = {
203+
"domain": domain,
204+
"type": domain_type
205+
}
206+
207+
logger.info(f"Binding play domain: {domain} (type: {domain_type}) to bucket: {bucket}")
208+
209+
async with aiohttp.ClientSession() as session:
210+
async with session.post(url, headers=headers, json=payload) as response:
211+
status = response.status
212+
text = await response.text()
213+
214+
if status == 200 or status == 201:
215+
logger.info(f"Successfully bound play domain: {domain} to bucket: {bucket}")
216+
return {
217+
"status": "success",
218+
"bucket": bucket,
219+
"domain": domain,
220+
"type": domain_type,
221+
"message": f"Play domain '{domain}' bound successfully to bucket '{bucket}'",
222+
"status_code": status
223+
}
224+
else:
225+
logger.error(f"Failed to bind play domain: {domain}, status: {status}, response: {text}")
226+
return {
227+
"status": "error",
228+
"bucket": bucket,
229+
"domain": domain,
230+
"message": f"Failed to bind play domain: {text}",
231+
"status_code": status
232+
}
233+
234+
def get_push_urls(self, push_domain: str, bucket: str, stream_name: str) -> Dict[str, Any]:
235+
"""
236+
Get push URLs for a stream
237+
238+
Args:
239+
push_domain: The push domain (e.g., "mcp-push1.qiniu.com")
240+
bucket: The bucket name
241+
stream_name: The stream name
242+
243+
Returns:
244+
Dict containing RTMP and WHIP push URLs
245+
"""
246+
rtmp_url = f"rtmp://{push_domain}/{bucket}/{stream_name}"
247+
whip_url = f"https://{push_domain}/{bucket}/{stream_name}.whip"
248+
249+
return {
250+
"status": "success",
251+
"push_domain": push_domain,
252+
"bucket": bucket,
253+
"stream": stream_name,
254+
"rtmp_url": rtmp_url,
255+
"whip_url": whip_url,
256+
"message": f"Push URLs generated for stream '{stream_name}'"
257+
}
258+
259+
def get_play_urls(self, play_domain: str, bucket: str, stream_name: str) -> Dict[str, Any]:
260+
"""
261+
Get play URLs for a stream
262+
263+
Args:
264+
play_domain: The play domain (e.g., "mcp-play1.qiniu.com")
265+
bucket: The bucket name
266+
stream_name: The stream name
267+
268+
Returns:
269+
Dict containing FLV, HLS (M3U8), and WHEP play URLs
270+
"""
271+
flv_url = f"https://{play_domain}/{bucket}/{stream_name}.flv"
272+
m3u8_url = f"https://{play_domain}/{bucket}/{stream_name}.m3u8"
273+
whep_url = f"https://{play_domain}/{bucket}/{stream_name}.whep"
274+
275+
return {
276+
"status": "success",
277+
"play_domain": play_domain,
278+
"bucket": bucket,
279+
"stream": stream_name,
280+
"flv_url": flv_url,
281+
"m3u8_url": m3u8_url,
282+
"whep_url": whep_url,
283+
"message": f"Play URLs generated for stream '{stream_name}'"
284+
}
285+
286+
async def query_traffic_stats(self, begin: str, end: str, g: str = "5min",
287+
select: str = "flow", flow: str = "downflow") -> Dict[str, Any]:
288+
"""
289+
Query live streaming traffic statistics
290+
291+
Args:
292+
begin: Start time in format YYYYMMDDHHMMSS (e.g., "20240101000000")
293+
end: End time in format YYYYMMDDHHMMSS (e.g., "20240129105148")
294+
g: Time granularity (default: "5min")
295+
select: Select parameter (default: "flow")
296+
flow: Flow type (default: "downflow")
297+
298+
Returns:
299+
Dict containing traffic statistics
300+
"""
301+
if not self.endpoint_url:
302+
raise ValueError("QINIU_ENDPOINT_URL is not configured")
303+
304+
# Remove protocol if present in endpoint_url
305+
endpoint = self.endpoint_url
306+
if endpoint.startswith("http://"):
307+
endpoint = endpoint[7:]
308+
elif endpoint.startswith("https://"):
309+
endpoint = endpoint[8:]
310+
311+
# Build URL with query parameters
312+
url = f"http://{endpoint}?trafficStats&begin={begin}&end={end}&g={g}&select={select}&flow={flow}"
313+
headers = self._get_auth_header()
314+
headers["Content-Type"] = "application/json"
315+
316+
logger.info(f"Querying traffic stats from {begin} to {end}")
317+
318+
async with aiohttp.ClientSession() as session:
319+
async with session.get(url, headers=headers) as response:
320+
status = response.status
321+
text = await response.text()
322+
323+
if status == 200:
324+
logger.info(f"Successfully retrieved traffic stats")
325+
return {
326+
"status": "success",
327+
"begin": begin,
328+
"end": end,
329+
"data": text,
330+
"message": f"Traffic statistics retrieved successfully",
331+
"status_code": status
332+
}
333+
else:
334+
logger.error(f"Failed to query traffic stats, status: {status}, response: {text}")
335+
return {
336+
"status": "error",
337+
"begin": begin,
338+
"end": end,
339+
"message": f"Failed to query traffic stats: {text}",
340+
"status_code": status
341+
}

src/mcp_server/core/miku/tools.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@
1010

1111
_BUCKET_DESC = "Miku bucket name"
1212
_STREAM_DESC = "Miku stream name"
13+
_DOMAIN_DESC = "Domain name (e.g., 'mcp-push1.qiniu.com')"
14+
_PUSH_DOMAIN_DESC = "Push domain name (e.g., 'mcp-push1.qiniu.com')"
15+
_PLAY_DOMAIN_DESC = "Play domain name (e.g., 'mcp-play1.qiniu.com')"
16+
_DOMAIN_TYPE_DESC = "Domain type"
1317

1418

1519
class _ToolImpl:
@@ -60,12 +64,170 @@ async def create_stream(self, **kwargs) -> list[types.TextContent]:
6064
result = await self.miku.create_stream(**kwargs)
6165
return [types.TextContent(type="text", text=str(result))]
6266

67+
@tools.tool_meta(
68+
types.Tool(
69+
name="miku_bind_push_domain",
70+
description="Bind a push domain to a Miku bucket. This allows you to configure the domain for RTMP/WHIP streaming.",
71+
inputSchema={
72+
"type": "object",
73+
"properties": {
74+
"bucket": {
75+
"type": "string",
76+
"description": _BUCKET_DESC,
77+
},
78+
"domain": {
79+
"type": "string",
80+
"description": _DOMAIN_DESC,
81+
},
82+
"domain_type": {
83+
"type": "string",
84+
"description": _DOMAIN_TYPE_DESC,
85+
"default": "pushRtmp",
86+
},
87+
},
88+
"required": ["bucket", "domain"],
89+
},
90+
)
91+
)
92+
async def bind_push_domain(self, **kwargs) -> list[types.TextContent]:
93+
result = await self.miku.bind_push_domain(**kwargs)
94+
return [types.TextContent(type="text", text=str(result))]
95+
96+
@tools.tool_meta(
97+
types.Tool(
98+
name="miku_bind_play_domain",
99+
description="Bind a play domain to a Miku bucket. This allows you to configure the domain for live streaming playback.",
100+
inputSchema={
101+
"type": "object",
102+
"properties": {
103+
"bucket": {
104+
"type": "string",
105+
"description": _BUCKET_DESC,
106+
},
107+
"domain": {
108+
"type": "string",
109+
"description": _DOMAIN_DESC,
110+
},
111+
"domain_type": {
112+
"type": "string",
113+
"description": _DOMAIN_TYPE_DESC,
114+
"default": "live",
115+
},
116+
},
117+
"required": ["bucket", "domain"],
118+
},
119+
)
120+
)
121+
async def bind_play_domain(self, **kwargs) -> list[types.TextContent]:
122+
result = await self.miku.bind_play_domain(**kwargs)
123+
return [types.TextContent(type="text", text=str(result))]
124+
125+
@tools.tool_meta(
126+
types.Tool(
127+
name="miku_get_push_urls",
128+
description="Get push URLs for a stream. Returns RTMP and WHIP URLs for streaming.",
129+
inputSchema={
130+
"type": "object",
131+
"properties": {
132+
"push_domain": {
133+
"type": "string",
134+
"description": _PUSH_DOMAIN_DESC,
135+
},
136+
"bucket": {
137+
"type": "string",
138+
"description": _BUCKET_DESC,
139+
},
140+
"stream_name": {
141+
"type": "string",
142+
"description": _STREAM_DESC,
143+
},
144+
},
145+
"required": ["push_domain", "bucket", "stream_name"],
146+
},
147+
)
148+
)
149+
async def get_push_urls(self, **kwargs) -> list[types.TextContent]:
150+
result = self.miku.get_push_urls(**kwargs)
151+
return [types.TextContent(type="text", text=str(result))]
152+
153+
@tools.tool_meta(
154+
types.Tool(
155+
name="miku_get_play_urls",
156+
description="Get play URLs for a stream. Returns FLV, HLS (M3U8), and WHEP URLs for playback.",
157+
inputSchema={
158+
"type": "object",
159+
"properties": {
160+
"play_domain": {
161+
"type": "string",
162+
"description": _PLAY_DOMAIN_DESC,
163+
},
164+
"bucket": {
165+
"type": "string",
166+
"description": _BUCKET_DESC,
167+
},
168+
"stream_name": {
169+
"type": "string",
170+
"description": _STREAM_DESC,
171+
},
172+
},
173+
"required": ["play_domain", "bucket", "stream_name"],
174+
},
175+
)
176+
)
177+
async def get_play_urls(self, **kwargs) -> list[types.TextContent]:
178+
result = self.miku.get_play_urls(**kwargs)
179+
return [types.TextContent(type="text", text=str(result))]
180+
181+
@tools.tool_meta(
182+
types.Tool(
183+
name="miku_query_traffic_stats",
184+
description="Query live streaming traffic statistics. Time format: YYYYMMDDHHMMSS (e.g., '20240101000000').",
185+
inputSchema={
186+
"type": "object",
187+
"properties": {
188+
"begin": {
189+
"type": "string",
190+
"description": "Start time in format YYYYMMDDHHMMSS (e.g., '20240101000000')",
191+
},
192+
"end": {
193+
"type": "string",
194+
"description": "End time in format YYYYMMDDHHMMSS (e.g., '20240129105148')",
195+
},
196+
"g": {
197+
"type": "string",
198+
"description": "Time granularity (default: '5min')",
199+
"default": "5min",
200+
},
201+
"select": {
202+
"type": "string",
203+
"description": "Select parameter (default: 'flow')",
204+
"default": "flow",
205+
},
206+
"flow": {
207+
"type": "string",
208+
"description": "Flow type (default: 'downflow')",
209+
"default": "downflow",
210+
},
211+
},
212+
"required": ["begin", "end"],
213+
},
214+
)
215+
)
216+
async def query_traffic_stats(self, **kwargs) -> list[types.TextContent]:
217+
result = await self.miku.query_traffic_stats(**kwargs)
218+
return [types.TextContent(type="text", text=str(result))]
219+
63220

64221
def register_tools(miku: MikuService):
65222
tool_impl = _ToolImpl(miku)
66223
tools.auto_register_tools(
67224
[
68225
tool_impl.create_bucket,
69226
tool_impl.create_stream,
227+
tool_impl.bind_push_domain,
228+
tool_impl.bind_play_domain,
229+
tool_impl.get_push_urls,
230+
tool_impl.get_play_urls,
231+
tool_impl.query_traffic_stats,
70232
]
71233
)

0 commit comments

Comments
 (0)