@@ -132,3 +132,231 @@ async def create_stream(self, bucket: str, stream: str) -> Dict[str, Any]:
132132 "message" : f"Failed to create stream: { text } " ,
133133 "status_code" : status
134134 }
135+
136+ async def bind_push_domain (self , bucket : str , domain : str , domain_type : str = "pushRtmp" ) -> Dict [str , Any ]:
137+ """
138+ Bind push domain for streaming
139+
140+ Args:
141+ bucket: The bucket name
142+ domain: Domain to bind (e.g., "mcp-push1.qiniu.com")
143+ domain_type: Type of push domain (default: "pushRtmp")
144+
145+ Returns:
146+ Dict containing the response status and message
147+ """
148+ if not self .endpoint_url :
149+ raise ValueError ("QINIU_ENDPOINT_URL is not configured" )
150+
151+ # Remove protocol if present in endpoint_url
152+ endpoint = self .endpoint_url
153+ if endpoint .startswith ("http://" ):
154+ endpoint = endpoint [7 :]
155+ elif endpoint .startswith ("https://" ):
156+ endpoint = endpoint [8 :]
157+
158+ # Build URL: https://<bucket>.<endpoint>/?pushDomain
159+ url = f"https://{ bucket } .{ endpoint } /?pushDomain"
160+ headers = self ._get_auth_header ()
161+ headers ["Content-Type" ] = "application/json"
162+
163+ payload = {
164+ "domain" : domain ,
165+ "type" : domain_type
166+ }
167+
168+ logger .info (f"Binding push domain: { domain } to bucket: { bucket } " )
169+
170+ async with aiohttp .ClientSession () as session :
171+ async with session .post (url , headers = headers , json = payload ) as response :
172+ status = response .status
173+ text = await response .text ()
174+
175+ if status == 200 or status == 201 :
176+ logger .info (f"Successfully bound push domain: { domain } " )
177+ return {
178+ "status" : "success" ,
179+ "bucket" : bucket ,
180+ "domain" : domain ,
181+ "type" : domain_type ,
182+ "message" : f"Push domain '{ domain } ' bound successfully" ,
183+ "status_code" : status
184+ }
185+ else :
186+ logger .error (f"Failed to bind push domain: { domain } , status: { status } , response: { text } " )
187+ return {
188+ "status" : "error" ,
189+ "bucket" : bucket ,
190+ "domain" : domain ,
191+ "message" : f"Failed to bind push domain: { text } " ,
192+ "status_code" : status
193+ }
194+
195+ async def bind_play_domain (self , bucket : str , domain : str , domain_type : str = "live" ) -> Dict [str , Any ]:
196+ """
197+ Bind play domain for streaming
198+
199+ Args:
200+ bucket: The bucket name
201+ domain: Domain to bind (e.g., "mcp-play1.qiniu.com")
202+ domain_type: Type of play domain (default: "live")
203+
204+ Returns:
205+ Dict containing the response status and message
206+ """
207+ if not self .endpoint_url :
208+ raise ValueError ("QINIU_ENDPOINT_URL is not configured" )
209+
210+ # Remove protocol if present in endpoint_url
211+ endpoint = self .endpoint_url
212+ if endpoint .startswith ("http://" ):
213+ endpoint = endpoint [7 :]
214+ elif endpoint .startswith ("https://" ):
215+ endpoint = endpoint [8 :]
216+
217+ # Build URL: https://<bucket>.<endpoint>/?domain
218+ url = f"https://{ bucket } .{ endpoint } /?domain"
219+ headers = self ._get_auth_header ()
220+ headers ["Content-Type" ] = "application/json"
221+
222+ payload = {
223+ "domain" : domain ,
224+ "type" : domain_type
225+ }
226+
227+ logger .info (f"Binding play domain: { domain } to bucket: { bucket } " )
228+
229+ async with aiohttp .ClientSession () as session :
230+ async with session .post (url , headers = headers , json = payload ) as response :
231+ status = response .status
232+ text = await response .text ()
233+
234+ if status == 200 or status == 201 :
235+ logger .info (f"Successfully bound play domain: { domain } " )
236+ return {
237+ "status" : "success" ,
238+ "bucket" : bucket ,
239+ "domain" : domain ,
240+ "type" : domain_type ,
241+ "message" : f"Play domain '{ domain } ' bound successfully" ,
242+ "status_code" : status
243+ }
244+ else :
245+ logger .error (f"Failed to bind play domain: { domain } , status: { status } , response: { text } " )
246+ return {
247+ "status" : "error" ,
248+ "bucket" : bucket ,
249+ "domain" : domain ,
250+ "message" : f"Failed to bind play domain: { text } " ,
251+ "status_code" : status
252+ }
253+
254+ def get_push_urls (self , push_domain : str , bucket : str , stream_name : str ) -> Dict [str , Any ]:
255+ """
256+ Get push URLs for streaming
257+
258+ Args:
259+ push_domain: Push domain (e.g., "mcp-push1.qiniu.com")
260+ bucket: The bucket name
261+ stream_name: Stream name
262+
263+ Returns:
264+ Dict containing RTMP and WHIP URLs
265+ """
266+ rtmp_url = f"rtmp://{ push_domain } /{ bucket } /{ stream_name } "
267+ whip_url = f"https://{ push_domain } /{ bucket } /{ stream_name } .whip"
268+
269+ return {
270+ "status" : "success" ,
271+ "push_domain" : push_domain ,
272+ "bucket" : bucket ,
273+ "stream_name" : stream_name ,
274+ "rtmp_url" : rtmp_url ,
275+ "whip_url" : whip_url ,
276+ "message" : "Push URLs generated successfully"
277+ }
278+
279+ def get_play_urls (self , play_domain : str , bucket : str , stream_name : str ) -> Dict [str , Any ]:
280+ """
281+ Get playback URLs for streaming
282+
283+ Args:
284+ play_domain: Play domain (e.g., "mcp-play1.qiniu.com")
285+ bucket: The bucket name
286+ stream_name: Stream name
287+
288+ Returns:
289+ Dict containing FLV, M3U8, and WHEP URLs
290+ """
291+ flv_url = f"https://{ play_domain } /{ bucket } /{ stream_name } .flv"
292+ m3u8_url = f"https://{ play_domain } /{ bucket } /{ stream_name } .m3u8"
293+ whep_url = f"https://{ play_domain } /{ bucket } /{ stream_name } .whep"
294+
295+ return {
296+ "status" : "success" ,
297+ "play_domain" : play_domain ,
298+ "bucket" : bucket ,
299+ "stream_name" : stream_name ,
300+ "flv_url" : flv_url ,
301+ "m3u8_url" : m3u8_url ,
302+ "whep_url" : whep_url ,
303+ "message" : "Play URLs generated successfully"
304+ }
305+
306+ async def query_traffic_stats (self , begin : str , end : str , g : str = "5min" , select : str = "flow" , flow : str = "downflow" ) -> Dict [str , Any ]:
307+ """
308+ Query live streaming traffic statistics
309+
310+ Args:
311+ begin: Start time in format YYYYMMDDHHMMSS (e.g., "20240101000000")
312+ end: End time in format YYYYMMDDHHMMSS (e.g., "20240129105148")
313+ g: Granularity (default: "5min")
314+ select: Select parameter (default: "flow")
315+ flow: Flow type (default: "downflow")
316+
317+ Returns:
318+ Dict containing traffic statistics
319+ """
320+ if not self .endpoint_url :
321+ raise ValueError ("QINIU_ENDPOINT_URL is not configured" )
322+
323+ # Remove protocol and subdomain from endpoint_url to get base domain
324+ endpoint = self .endpoint_url
325+ if endpoint .startswith ("http://" ):
326+ endpoint = endpoint [7 :]
327+ elif endpoint .startswith ("https://" ):
328+ endpoint = endpoint [8 :]
329+
330+ # Extract base domain (e.g., "cn-east-1.qiniumiku.com" from "mls.cn-east-1.qiniumiku.com")
331+ # Use the endpoint directly for traffic stats
332+ url = f"http://mls.{ endpoint } /?trafficStats&begin={ begin } &end={ end } &g={ g } &select={ select } &flow={ flow } "
333+
334+ headers = self ._get_auth_header ()
335+ headers ["Content-Type" ] = "application/json"
336+
337+ logger .info (f"Querying traffic stats from { begin } to { end } " )
338+
339+ async with aiohttp .ClientSession () as session :
340+ async with session .get (url , headers = headers ) as response :
341+ status = response .status
342+ text = await response .text ()
343+
344+ if status == 200 :
345+ logger .info (f"Successfully retrieved traffic stats" )
346+ return {
347+ "status" : "success" ,
348+ "begin" : begin ,
349+ "end" : end ,
350+ "data" : text ,
351+ "message" : "Traffic statistics retrieved successfully" ,
352+ "status_code" : status
353+ }
354+ else :
355+ logger .error (f"Failed to query traffic stats, status: { status } , response: { text } " )
356+ return {
357+ "status" : "error" ,
358+ "begin" : begin ,
359+ "end" : end ,
360+ "message" : f"Failed to query traffic stats: { text } " ,
361+ "status_code" : status
362+ }
0 commit comments