Skip to content

Commit ee3ecc9

Browse files
committed
Migrate proxy from ossrs/srs to proxy-go
1 parent f54d892 commit ee3ecc9

File tree

24 files changed

+6960
-32
lines changed

24 files changed

+6960
-32
lines changed

.gitignore

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,4 @@
1-
# Prerequisites
2-
*.d
3-
4-
# Compiled Object files
5-
*.slo
6-
*.lo
7-
*.o
8-
*.obj
9-
10-
# Precompiled Headers
11-
*.gch
12-
*.pch
13-
14-
# Compiled Dynamic libraries
15-
*.so
16-
*.dylib
17-
*.dll
18-
19-
# Fortran module files
20-
*.mod
21-
*.smod
22-
23-
# Compiled Static libraries
24-
*.lai
25-
*.la
26-
*.a
27-
*.lib
28-
29-
# Executables
30-
*.exe
31-
*.out
32-
*.app
1+
.idea
2+
srs-proxy
3+
.env
4+
.go-formarted

Makefile

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
.PHONY: all build test fmt clean run
2+
3+
all: build
4+
5+
build: fmt ./srs-proxy
6+
7+
./srs-proxy: *.go
8+
go build -o srs-proxy .
9+
10+
test:
11+
go test ./...
12+
13+
fmt: ./.go-formarted
14+
15+
./.go-formarted: *.go
16+
touch .go-formarted
17+
go fmt ./...
18+
19+
clean:
20+
rm -f srs-proxy .go-formarted
21+
22+
run: fmt
23+
go run .

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
11
# proxy-go
2+
23
A RTMP/SRT/WebRTC/WHIP/WHEP/HLS/HTTP-FLV proxy for media server.
4+
5+
## Usage
6+
7+
See [SRS Origin Cluster](https://ossrs.io/lts/en-us/docs/v7/doc/origin-cluster) for details.

api.go

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
// Copyright (c) 2025 Winlin
2+
//
3+
// SPDX-License-Identifier: MIT
4+
package main
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"net/http"
10+
"os"
11+
"strings"
12+
"sync"
13+
"time"
14+
15+
"srs-proxy/errors"
16+
"srs-proxy/logger"
17+
)
18+
19+
// srsHTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP,
20+
// to proxy other HTTP API of SRS like the streams and clients, etc.
21+
type srsHTTPAPIServer struct {
22+
// The underlayer HTTP server.
23+
server *http.Server
24+
// The WebRTC server.
25+
rtc *srsWebRTCServer
26+
// The gracefully quit timeout, wait server to quit.
27+
gracefulQuitTimeout time.Duration
28+
// The wait group for all goroutines.
29+
wg sync.WaitGroup
30+
}
31+
32+
func NewSRSHTTPAPIServer(opts ...func(*srsHTTPAPIServer)) *srsHTTPAPIServer {
33+
v := &srsHTTPAPIServer{}
34+
for _, opt := range opts {
35+
opt(v)
36+
}
37+
return v
38+
}
39+
40+
func (v *srsHTTPAPIServer) Close() error {
41+
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
42+
defer cancel()
43+
v.server.Shutdown(ctx)
44+
45+
v.wg.Wait()
46+
return nil
47+
}
48+
49+
func (v *srsHTTPAPIServer) Run(ctx context.Context) error {
50+
// Parse address to listen.
51+
addr := envHttpAPI()
52+
if !strings.Contains(addr, ":") {
53+
addr = ":" + addr
54+
}
55+
56+
// Create server and handler.
57+
mux := http.NewServeMux()
58+
v.server = &http.Server{Addr: addr, Handler: mux}
59+
logger.Df(ctx, "HTTP API server listen at %v", addr)
60+
61+
// Shutdown the server gracefully when quiting.
62+
go func() {
63+
ctxParent := ctx
64+
<-ctxParent.Done()
65+
66+
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
67+
defer cancel()
68+
69+
v.server.Shutdown(ctx)
70+
}()
71+
72+
// The basic version handler, also can be used as health check API.
73+
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
74+
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
75+
apiResponse(ctx, w, r, map[string]string{
76+
"signature": Signature(),
77+
"version": Version(),
78+
})
79+
})
80+
81+
// The WebRTC WHIP API handler.
82+
logger.Df(ctx, "Handle /rtc/v1/whip/ by %v", addr)
83+
mux.HandleFunc("/rtc/v1/whip/", func(w http.ResponseWriter, r *http.Request) {
84+
if err := v.rtc.HandleApiForWHIP(ctx, w, r); err != nil {
85+
apiError(ctx, w, r, err)
86+
}
87+
})
88+
89+
// The WebRTC WHEP API handler.
90+
logger.Df(ctx, "Handle /rtc/v1/whep/ by %v", addr)
91+
mux.HandleFunc("/rtc/v1/whep/", func(w http.ResponseWriter, r *http.Request) {
92+
if err := v.rtc.HandleApiForWHEP(ctx, w, r); err != nil {
93+
apiError(ctx, w, r, err)
94+
}
95+
})
96+
97+
// Run HTTP API server.
98+
v.wg.Add(1)
99+
go func() {
100+
defer v.wg.Done()
101+
102+
err := v.server.ListenAndServe()
103+
if err != nil {
104+
if ctx.Err() != context.Canceled {
105+
// TODO: If HTTP API server closed unexpectedly, we should notice the main loop to quit.
106+
logger.Wf(ctx, "HTTP API accept err %+v", err)
107+
} else {
108+
logger.Df(ctx, "HTTP API server done")
109+
}
110+
}
111+
}()
112+
113+
return nil
114+
}
115+
116+
// systemAPI is the system HTTP API of the proxy server, for SRS media server to register the service
117+
// to proxy server. It also provides some other system APIs like the status of proxy server, like exporter
118+
// for Prometheus metrics.
119+
type systemAPI struct {
120+
// The underlayer HTTP server.
121+
server *http.Server
122+
// The gracefully quit timeout, wait server to quit.
123+
gracefulQuitTimeout time.Duration
124+
// The wait group for all goroutines.
125+
wg sync.WaitGroup
126+
}
127+
128+
func NewSystemAPI(opts ...func(*systemAPI)) *systemAPI {
129+
v := &systemAPI{}
130+
for _, opt := range opts {
131+
opt(v)
132+
}
133+
return v
134+
}
135+
136+
func (v *systemAPI) Close() error {
137+
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
138+
defer cancel()
139+
v.server.Shutdown(ctx)
140+
141+
v.wg.Wait()
142+
return nil
143+
}
144+
145+
func (v *systemAPI) Run(ctx context.Context) error {
146+
// Parse address to listen.
147+
addr := envSystemAPI()
148+
if !strings.Contains(addr, ":") {
149+
addr = ":" + addr
150+
}
151+
152+
// Create server and handler.
153+
mux := http.NewServeMux()
154+
v.server = &http.Server{Addr: addr, Handler: mux}
155+
logger.Df(ctx, "System API server listen at %v", addr)
156+
157+
// Shutdown the server gracefully when quiting.
158+
go func() {
159+
ctxParent := ctx
160+
<-ctxParent.Done()
161+
162+
ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout)
163+
defer cancel()
164+
165+
v.server.Shutdown(ctx)
166+
}()
167+
168+
// The basic version handler, also can be used as health check API.
169+
logger.Df(ctx, "Handle /api/v1/versions by %v", addr)
170+
mux.HandleFunc("/api/v1/versions", func(w http.ResponseWriter, r *http.Request) {
171+
apiResponse(ctx, w, r, map[string]string{
172+
"signature": Signature(),
173+
"version": Version(),
174+
})
175+
})
176+
177+
// The register service for SRS media servers.
178+
logger.Df(ctx, "Handle /api/v1/srs/register by %v", addr)
179+
mux.HandleFunc("/api/v1/srs/register", func(w http.ResponseWriter, r *http.Request) {
180+
if err := func() error {
181+
var deviceID, ip, serverID, serviceID, pid string
182+
var rtmp, stream, api, srt, rtc []string
183+
if err := ParseBody(r.Body, &struct {
184+
// The IP of SRS, mandatory.
185+
IP *string `json:"ip"`
186+
// The server id of SRS, store in file, may not change, mandatory.
187+
ServerID *string `json:"server"`
188+
// The service id of SRS, always change when restarted, mandatory.
189+
ServiceID *string `json:"service"`
190+
// The process id of SRS, always change when restarted, mandatory.
191+
PID *string `json:"pid"`
192+
// The RTMP listen endpoints, mandatory.
193+
RTMP *[]string `json:"rtmp"`
194+
// The HTTP Stream listen endpoints, optional.
195+
HTTP *[]string `json:"http"`
196+
// The API listen endpoints, optional.
197+
API *[]string `json:"api"`
198+
// The SRT listen endpoints, optional.
199+
SRT *[]string `json:"srt"`
200+
// The RTC listen endpoints, optional.
201+
RTC *[]string `json:"rtc"`
202+
// The device id of SRS, optional.
203+
DeviceID *string `json:"device_id"`
204+
}{
205+
IP: &ip, DeviceID: &deviceID,
206+
ServerID: &serverID, ServiceID: &serviceID, PID: &pid,
207+
RTMP: &rtmp, HTTP: &stream, API: &api, SRT: &srt, RTC: &rtc,
208+
}); err != nil {
209+
return errors.Wrapf(err, "parse body")
210+
}
211+
212+
if ip == "" {
213+
return errors.Errorf("empty ip")
214+
}
215+
if serverID == "" {
216+
return errors.Errorf("empty server")
217+
}
218+
if serviceID == "" {
219+
return errors.Errorf("empty service")
220+
}
221+
if pid == "" {
222+
return errors.Errorf("empty pid")
223+
}
224+
if len(rtmp) == 0 {
225+
return errors.Errorf("empty rtmp")
226+
}
227+
228+
server := NewSRSServer(func(srs *SRSServer) {
229+
srs.IP, srs.DeviceID = ip, deviceID
230+
srs.ServerID, srs.ServiceID, srs.PID = serverID, serviceID, pid
231+
srs.RTMP, srs.HTTP, srs.API = rtmp, stream, api
232+
srs.SRT, srs.RTC = srt, rtc
233+
srs.UpdatedAt = time.Now()
234+
})
235+
if err := srsLoadBalancer.Update(ctx, server); err != nil {
236+
return errors.Wrapf(err, "update SRS server %+v", server)
237+
}
238+
239+
logger.Df(ctx, "Register SRS media server, %+v", server)
240+
return nil
241+
}(); err != nil {
242+
apiError(ctx, w, r, err)
243+
}
244+
245+
type Response struct {
246+
Code int `json:"code"`
247+
PID string `json:"pid"`
248+
}
249+
250+
apiResponse(ctx, w, r, &Response{
251+
Code: 0, PID: fmt.Sprintf("%v", os.Getpid()),
252+
})
253+
})
254+
255+
// Run System API server.
256+
v.wg.Add(1)
257+
go func() {
258+
defer v.wg.Done()
259+
260+
err := v.server.ListenAndServe()
261+
if err != nil {
262+
if ctx.Err() != context.Canceled {
263+
// TODO: If System API server closed unexpectedly, we should notice the main loop to quit.
264+
logger.Wf(ctx, "System API accept err %+v", err)
265+
} else {
266+
logger.Df(ctx, "System API server done")
267+
}
268+
}
269+
}()
270+
271+
return nil
272+
}

debug.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// Copyright (c) 2025 Winlin
2+
//
3+
// SPDX-License-Identifier: MIT
4+
package main
5+
6+
import (
7+
"context"
8+
"net/http"
9+
10+
"srs-proxy/logger"
11+
)
12+
13+
func handleGoPprof(ctx context.Context) {
14+
if addr := envGoPprof(); addr != "" {
15+
go func() {
16+
logger.Df(ctx, "Start Go pprof at %v", addr)
17+
http.ListenAndServe(addr, nil)
18+
}()
19+
}
20+
}

0 commit comments

Comments
 (0)