diff --git a/server/cmd/api/api/api.go b/server/cmd/api/api/api.go index f5d8084d..0fa4d85a 100644 --- a/server/cmd/api/api/api.go +++ b/server/cmd/api/api/api.go @@ -14,14 +14,20 @@ import ( oapi "github.com/onkernel/kernel-images/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/recorder" "github.com/onkernel/kernel-images/server/lib/scaletozero" + "github.com/onkernel/kernel-images/server/lib/stream" ) type ApiService struct { // defaultRecorderID is used whenever the caller doesn't specify an explicit ID. defaultRecorderID string - - recordManager recorder.RecordManager - factory recorder.FFmpegRecorderFactory + defaultStreamID string + + recordManager recorder.RecordManager + factory recorder.FFmpegRecorderFactory + streamManager stream.Manager + streamFactory stream.FFmpegStreamerFactory + rtmpServer stream.InternalServer + streamDefaults stream.Params // Filesystem watch management watchMu sync.RWMutex watches map[string]*fsWatch @@ -46,7 +52,7 @@ type ApiService struct { var _ oapi.StrictServerInterface = (*ApiService)(nil) -func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFactory, upstreamMgr *devtoolsproxy.UpstreamManager, stz scaletozero.Controller, nekoAuthClient *nekoclient.AuthClient) (*ApiService, error) { +func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFactory, upstreamMgr *devtoolsproxy.UpstreamManager, stz scaletozero.Controller, nekoAuthClient *nekoclient.AuthClient, streamManager stream.Manager, streamFactory stream.FFmpegStreamerFactory, rtmpServer stream.InternalServer, streamDefaults stream.Params) (*ApiService, error) { switch { case recordManager == nil: return nil, fmt.Errorf("recordManager cannot be nil") @@ -56,12 +62,26 @@ func New(recordManager recorder.RecordManager, factory recorder.FFmpegRecorderFa return nil, fmt.Errorf("upstreamMgr cannot be nil") case nekoAuthClient == nil: return nil, fmt.Errorf("nekoAuthClient cannot be nil") + case streamManager == nil: + return nil, fmt.Errorf("streamManager cannot be nil") + case streamFactory == nil: + return nil, fmt.Errorf("streamFactory cannot be nil") + case rtmpServer == nil: + return nil, fmt.Errorf("rtmpServer cannot be nil") + } + if streamDefaults.FrameRate == nil || streamDefaults.DisplayNum == nil { + return nil, fmt.Errorf("streamDefaults must include frame rate and display number") } return &ApiService{ recordManager: recordManager, factory: factory, defaultRecorderID: "default", + streamManager: streamManager, + streamFactory: streamFactory, + rtmpServer: rtmpServer, + streamDefaults: streamDefaults, + defaultStreamID: "default", watches: make(map[string]*fsWatch), procs: make(map[string]*processHandle), upstreamMgr: upstreamMgr, @@ -236,6 +256,129 @@ func (s *ApiService) DeleteRecording(ctx context.Context, req oapi.DeleteRecordi return oapi.DeleteRecording200Response{}, nil } +func (s *ApiService) StartStream(ctx context.Context, req oapi.StartStreamRequestObject) (oapi.StartStreamResponseObject, error) { + log := logger.FromContext(ctx) + + if req.Body == nil { + return oapi.StartStream400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "request body required"}}, nil + } + + streamID := s.defaultStreamID + if req.Body.Id != nil && *req.Body.Id != "" { + streamID = *req.Body.Id + } + + mode := stream.ModeInternal + if req.Body.Mode != nil && *req.Body.Mode != "" { + mode = stream.Mode(*req.Body.Mode) + } + if mode != stream.ModeInternal && mode != stream.ModeRemote { + return oapi.StartStream400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "invalid stream mode"}}, nil + } + + frameRate := s.streamDefaults.FrameRate + if req.Body.Framerate != nil { + frameRate = req.Body.Framerate + } + + if existing, ok := s.streamManager.GetStream(streamID); ok { + if existing.IsStreaming(ctx) { + return oapi.StartStream409JSONResponse{ConflictErrorJSONResponse: oapi.ConflictErrorJSONResponse{Message: "stream already in progress"}}, nil + } + _ = s.streamManager.DeregisterStream(ctx, existing) + } + + var ingestURL string + var playbackURL *string + var securePlaybackURL *string + streamPath := fmt.Sprintf("live/%s", streamID) + + switch mode { + case stream.ModeInternal: + if err := s.rtmpServer.Start(ctx); err != nil { + log.Error("failed to start internal rtmp server", "err", err) + return oapi.StartStream500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to start internal streaming server"}}, nil + } + s.rtmpServer.EnsureStream(streamPath) + ingestURL = s.rtmpServer.IngestURL(streamPath) + playbackURL, securePlaybackURL = s.rtmpServer.PlaybackURLs("", streamPath) + case stream.ModeRemote: + if req.Body.TargetUrl == nil || *req.Body.TargetUrl == "" { + return oapi.StartStream400JSONResponse{BadRequestErrorJSONResponse: oapi.BadRequestErrorJSONResponse{Message: "target_url is required for remote streaming"}}, nil + } + ingestURL = *req.Body.TargetUrl + playbackURL = &ingestURL + } + + params := stream.Params{ + FrameRate: frameRate, + DisplayNum: s.streamDefaults.DisplayNum, + Mode: mode, + IngestURL: ingestURL, + PlaybackURL: playbackURL, + SecurePlaybackURL: securePlaybackURL, + } + + streamer, err := s.streamFactory(streamID, params) + if err != nil { + log.Error("failed to create streamer", "err", err, "stream_id", streamID) + return oapi.StartStream500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to create streamer"}}, nil + } + if err := s.streamManager.RegisterStream(ctx, streamer); err != nil { + log.Error("failed to register stream", "err", err, "stream_id", streamID) + return oapi.StartStream409JSONResponse{ConflictErrorJSONResponse: oapi.ConflictErrorJSONResponse{Message: "stream already exists"}}, nil + } + if err := streamer.Start(ctx); err != nil { + log.Error("failed to start stream", "err", err, "stream_id", streamID) + _ = s.streamManager.DeregisterStream(ctx, streamer) + return oapi.StartStream500JSONResponse{InternalErrorJSONResponse: oapi.InternalErrorJSONResponse{Message: "failed to start stream"}}, nil + } + + return oapi.StartStream201JSONResponse(streamMetadataToOAPI(streamer.Metadata(), streamer.IsStreaming(ctx))), nil +} + +func (s *ApiService) StopStream(ctx context.Context, req oapi.StopStreamRequestObject) (oapi.StopStreamResponseObject, error) { + log := logger.FromContext(ctx) + + streamID := s.defaultStreamID + if req.Body != nil && req.Body.Id != nil && *req.Body.Id != "" { + streamID = *req.Body.Id + } + + streamer, ok := s.streamManager.GetStream(streamID) + if !ok { + return oapi.StopStream404JSONResponse{NotFoundErrorJSONResponse: oapi.NotFoundErrorJSONResponse{Message: "stream not found"}}, nil + } + + if err := streamer.Stop(ctx); err != nil { + log.Error("failed to stop stream", "err", err, "stream_id", streamID) + } + _ = s.streamManager.DeregisterStream(ctx, streamer) + + return oapi.StopStream200Response{}, nil +} + +func (s *ApiService) ListStreams(ctx context.Context, _ oapi.ListStreamsRequestObject) (oapi.ListStreamsResponseObject, error) { + streams := s.streamManager.ListStreams(ctx) + infos := make([]oapi.StreamInfo, 0, len(streams)) + for _, st := range streams { + infos = append(infos, streamMetadataToOAPI(st.Metadata(), st.IsStreaming(ctx))) + } + return oapi.ListStreams200JSONResponse(infos), nil +} + +func streamMetadataToOAPI(meta stream.Metadata, isStreaming bool) oapi.StreamInfo { + return oapi.StreamInfo{ + Id: meta.ID, + Mode: oapi.StreamInfoMode(meta.Mode), + IngestUrl: meta.IngestURL, + PlaybackUrl: meta.PlaybackURL, + SecurePlaybackUrl: meta.SecurePlaybackURL, + StartedAt: meta.StartedAt, + IsStreaming: isStreaming, + } +} + // ListRecorders returns a list of all registered recorders and whether each one is currently recording. func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequestObject) (oapi.ListRecordersResponseObject, error) { infos := []oapi.RecorderInfo{} @@ -261,5 +404,19 @@ func (s *ApiService) ListRecorders(ctx context.Context, _ oapi.ListRecordersRequ } func (s *ApiService) Shutdown(ctx context.Context) error { - return s.recordManager.StopAll(ctx) + var errs []error + if err := s.recordManager.StopAll(ctx); err != nil { + errs = append(errs, err) + } + if err := s.streamManager.StopAll(ctx); err != nil { + errs = append(errs, err) + } + if err := s.rtmpServer.Close(ctx); err != nil { + errs = append(errs, err) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil } diff --git a/server/cmd/api/api/api_test.go b/server/cmd/api/api/api_test.go index dc192e30..4869f56f 100644 --- a/server/cmd/api/api/api_test.go +++ b/server/cmd/api/api/api_test.go @@ -8,6 +8,7 @@ import ( "math/rand" "os" "testing" + "time" "log/slog" @@ -16,6 +17,7 @@ import ( oapi "github.com/onkernel/kernel-images/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/recorder" "github.com/onkernel/kernel-images/server/lib/scaletozero" + "github.com/onkernel/kernel-images/server/lib/stream" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -25,8 +27,7 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("success", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{}) require.NoError(t, err) @@ -39,11 +40,10 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("already recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) // First start should succeed - _, err = svc.StartRecording(ctx, oapi.StartRecordingRequestObject{}) + _, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{}) require.NoError(t, err) // Second start should return conflict @@ -54,9 +54,9 @@ func TestApiService_StartRecording(t *testing.T) { t.Run("custom ids don't collide", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) + var err error for i := 0; i < 5; i++ { customID := fmt.Sprintf("rec-%d", i) resp, err := svc.StartRecording(ctx, oapi.StartRecordingRequestObject{Body: &oapi.StartRecordingJSONRequestBody{Id: &customID}}) @@ -87,8 +87,7 @@ func TestApiService_StopRecording(t *testing.T) { t.Run("no active recording", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) require.NoError(t, err) @@ -100,8 +99,7 @@ func TestApiService_StopRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) resp, err := svc.StopRecording(ctx, oapi.StopRecordingRequestObject{}) require.NoError(t, err) require.IsType(t, oapi.StopRecording200Response{}, resp) @@ -115,8 +113,7 @@ func TestApiService_StopRecording(t *testing.T) { force := true req := oapi.StopRecordingRequestObject{Body: &oapi.StopRecordingJSONRequestBody{ForceStop: &force}} - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) resp, err := svc.StopRecording(ctx, req) require.NoError(t, err) require.IsType(t, oapi.StopRecording200Response{}, resp) @@ -129,8 +126,7 @@ func TestApiService_DownloadRecording(t *testing.T) { t.Run("not found", func(t *testing.T) { mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) require.IsType(t, oapi.DownloadRecording404JSONResponse{}, resp) @@ -149,8 +145,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", isRecordingFlag: true, recordingData: randomBytes(minRecordingSizeInBytes - 1)} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) // will return a 202 when the recording is too small resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) @@ -179,8 +174,7 @@ func TestApiService_DownloadRecording(t *testing.T) { rec := &mockRecorder{id: "default", recordingData: data} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) resp, err := svc.DownloadRecording(ctx, oapi.DownloadRecordingRequestObject{}) require.NoError(t, err) r, ok := resp.(oapi.DownloadRecording200Videomp4Response) @@ -193,14 +187,67 @@ func TestApiService_DownloadRecording(t *testing.T) { }) } +func TestApiService_StreamLifecycle(t *testing.T) { + ctx := context.Background() + mgr := recorder.NewFFmpegManager() + svc := newApiServiceForTest(t, mgr) + + mode := oapi.StartStreamRequestModeInternal + resp, err := svc.StartStream(ctx, oapi.StartStreamRequestObject{ + Body: &oapi.StartStreamJSONRequestBody{Mode: &mode}, + }) + require.NoError(t, err) + created, ok := resp.(oapi.StartStream201JSONResponse) + require.True(t, ok, "expected start stream response") + assert.Equal(t, oapi.StreamInfoModeInternal, created.Mode) + assert.True(t, created.IsStreaming) + + streamer, exists := svc.streamManager.GetStream("default") + require.True(t, exists) + assert.True(t, streamer.IsStreaming(ctx)) + + listResp, err := svc.ListStreams(ctx, oapi.ListStreamsRequestObject{}) + require.NoError(t, err) + listTyped, ok := listResp.(oapi.ListStreams200JSONResponse) + require.True(t, ok) + require.Len(t, listTyped, 1) + assert.Equal(t, oapi.StreamInfoModeInternal, listTyped[0].Mode) + + stopResp, err := svc.StopStream(ctx, oapi.StopStreamRequestObject{}) + require.NoError(t, err) + require.IsType(t, oapi.StopStream200Response{}, stopResp) +} + +func TestApiService_StartStream_RemoteValidation(t *testing.T) { + ctx := context.Background() + mgr := recorder.NewFFmpegManager() + svc := newApiServiceForTest(t, mgr) + + mode := oapi.StartStreamRequestModeRemote + resp, err := svc.StartStream(ctx, oapi.StartStreamRequestObject{ + Body: &oapi.StartStreamJSONRequestBody{Mode: &mode}, + }) + require.NoError(t, err) + require.IsType(t, oapi.StartStream400JSONResponse{}, resp) +} + +func TestApiService_StopStream_NotFound(t *testing.T) { + ctx := context.Background() + mgr := recorder.NewFFmpegManager() + svc := newApiServiceForTest(t, mgr) + + resp, err := svc.StopStream(ctx, oapi.StopStreamRequestObject{}) + require.NoError(t, err) + require.IsType(t, oapi.StopStream404JSONResponse{}, resp) +} + func TestApiService_Shutdown(t *testing.T) { ctx := context.Background() mgr := recorder.NewFFmpegManager() rec := &mockRecorder{id: "default", isRecordingFlag: true} require.NoError(t, mgr.RegisterRecorder(ctx, rec), "failed to register recorder") - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) require.NoError(t, svc.Shutdown(ctx)) require.True(t, rec.stopCalled, "Shutdown should have stopped active recorder") @@ -290,6 +337,92 @@ func newMockFactory() recorder.FFmpegRecorderFactory { } } +type mockStreamer struct { + id string + meta stream.Metadata + started bool + startErr error + stopErr error +} + +func (m *mockStreamer) ID() string { return m.id } + +func (m *mockStreamer) Start(ctx context.Context) error { + if m.startErr != nil { + return m.startErr + } + m.started = true + if m.meta.StartedAt.IsZero() { + m.meta.StartedAt = time.Now() + } + return nil +} + +func (m *mockStreamer) Stop(ctx context.Context) error { + if m.stopErr != nil { + return m.stopErr + } + m.started = false + return nil +} + +func (m *mockStreamer) IsStreaming(ctx context.Context) bool { return m.started } + +func (m *mockStreamer) Metadata() stream.Metadata { + meta := m.meta + if meta.ID == "" { + meta.ID = m.id + } + return meta +} + +type mockRTMPServer struct{} + +func (mockRTMPServer) Start(ctx context.Context) error { return nil } +func (mockRTMPServer) EnsureStream(path string) {} +func (mockRTMPServer) IngestURL(path string) string { + return "rtmp://internal/" + path +} +func (mockRTMPServer) PlaybackURLs(host string, path string) (*string, *string) { + url := "rtmp://" + host + "/" + path + return &url, nil +} +func (mockRTMPServer) Close(ctx context.Context) error { return nil } + +func newMockStreamFactory() stream.FFmpegStreamerFactory { + return func(id string, params stream.Params) (stream.Streamer, error) { + return &mockStreamer{ + id: id, + meta: stream.Metadata{ + ID: id, + Mode: params.Mode, + IngestURL: params.IngestURL, + PlaybackURL: params.PlaybackURL, + SecurePlaybackURL: params.SecurePlaybackURL, + StartedAt: time.Now(), + }, + }, nil + } +} + +func testStreamDefaults() stream.Params { + frameRate := 10 + display := 1 + return stream.Params{ + FrameRate: &frameRate, + DisplayNum: &display, + Mode: stream.ModeInternal, + } +} + +func newApiServiceForTest(t *testing.T, mgr recorder.RecordManager) *ApiService { + t.Helper() + defaults := testStreamDefaults() + svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t), stream.NewStreamManager(), newMockStreamFactory(), mockRTMPServer{}, defaults) + require.NoError(t, err) + return svc +} + func newTestUpstreamManager() *devtoolsproxy.UpstreamManager { logger := slog.New(slog.NewTextHandler(io.Discard, nil)) return devtoolsproxy.NewUpstreamManager("", logger) @@ -306,8 +439,7 @@ func newMockNekoClient(t *testing.T) *nekoclient.AuthClient { func TestApiService_PatchChromiumFlags(t *testing.T) { ctx := context.Background() mgr := recorder.NewFFmpegManager() - svc, err := New(mgr, newMockFactory(), newTestUpstreamManager(), scaletozero.NewNoopController(), newMockNekoClient(t)) - require.NoError(t, err) + svc := newApiServiceForTest(t, mgr) // Test with valid flags flags := []string{"--kiosk", "--start-maximized"} diff --git a/server/cmd/api/main.go b/server/cmd/api/main.go index e25f5496..4a1bda2a 100644 --- a/server/cmd/api/main.go +++ b/server/cmd/api/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "crypto/tls" "encoding/json" "fmt" "log/slog" @@ -27,6 +28,7 @@ import ( oapi "github.com/onkernel/kernel-images/server/lib/oapi" "github.com/onkernel/kernel-images/server/lib/recorder" "github.com/onkernel/kernel-images/server/lib/scaletozero" + "github.com/onkernel/kernel-images/server/lib/stream" ) func main() { @@ -72,6 +74,31 @@ func main() { os.Exit(1) } + streamDefaults := stream.Params{ + DisplayNum: &config.DisplayNum, + FrameRate: &config.FrameRate, + Mode: stream.ModeInternal, + } + + var tlsConfig *tls.Config + if config.RTMPSCertPath != "" && config.RTMPSKeyPath != "" { + cert, err := tls.LoadX509KeyPair(config.RTMPSCertPath, config.RTMPSKeyPath) + if err != nil { + slogger.Error("failed to load RTMPS certificate", "err", err) + } else { + tlsConfig = &tls.Config{Certificates: []tls.Certificate{cert}} + } + } + if tlsConfig == nil && config.RTMPSListenAddr != "" { + cfg, err := stream.SelfSignedTLSConfig() + if err != nil { + slogger.Error("failed to generate self-signed RTMPS certificate", "err", err) + } else { + tlsConfig = cfg + } + } + rtmpServer := stream.NewRTMPServer(config.RTMPListenAddr, config.RTMPSListenAddr, tlsConfig, slogger) + // DevTools WebSocket upstream manager: tail Chromium supervisord log const chromiumLogPath = "/var/log/supervisord/chromium" upstreamMgr := devtoolsproxy.NewUpstreamManager(chromiumLogPath, slogger) @@ -94,6 +121,10 @@ func main() { upstreamMgr, stz, nekoAuthClient, + stream.NewStreamManager(), + stream.NewFFmpegStreamerFactory(config.PathToFFmpeg, streamDefaults, stz), + rtmpServer, + streamDefaults, ) if err != nil { slogger.Error("failed to create api service", "err", err) diff --git a/server/cmd/config/config.go b/server/cmd/config/config.go index 7b063d3b..b3bfc4d7 100644 --- a/server/cmd/config/config.go +++ b/server/cmd/config/config.go @@ -20,6 +20,12 @@ type Config struct { // Absolute or relative path to the ffmpeg binary. If empty the code falls back to "ffmpeg" on $PATH. PathToFFmpeg string `envconfig:"FFMPEG_PATH" default:"ffmpeg"` + // RTMP/RTMPS internal server configuration + RTMPListenAddr string `envconfig:"RTMP_LISTEN_ADDR" default:":1935"` + RTMPSListenAddr string `envconfig:"RTMPS_LISTEN_ADDR" default:":1936"` + RTMPSCertPath string `envconfig:"RTMPS_CERT_PATH" default:""` + RTMPSKeyPath string `envconfig:"RTMPS_KEY_PATH" default:""` + // DevTools proxy configuration LogCDPMessages bool `envconfig:"LOG_CDP_MESSAGES" default:"false"` } @@ -53,6 +59,12 @@ func validate(config *Config) error { if config.PathToFFmpeg == "" { return fmt.Errorf("FFMPEG_PATH is required") } + if config.RTMPListenAddr == "" { + return fmt.Errorf("RTMP_LISTEN_ADDR is required") + } + if (config.RTMPSCertPath == "") != (config.RTMPSKeyPath == "") { + return fmt.Errorf("RTMPS_CERT_PATH and RTMPS_KEY_PATH must both be set or both be empty") + } return nil } diff --git a/server/cmd/config/config_test.go b/server/cmd/config/config_test.go index 5b2ce1d2..32ad8bff 100644 --- a/server/cmd/config/config_test.go +++ b/server/cmd/config/config_test.go @@ -17,12 +17,16 @@ func TestLoad(t *testing.T) { name: "defaults (no env set)", env: map[string]string{}, wantCfg: &Config{ - Port: 10001, - FrameRate: 10, - DisplayNum: 1, - MaxSizeInMB: 500, - OutputDir: ".", - PathToFFmpeg: "ffmpeg", + Port: 10001, + FrameRate: 10, + DisplayNum: 1, + MaxSizeInMB: 500, + OutputDir: ".", + PathToFFmpeg: "ffmpeg", + RTMPListenAddr: ":1935", + RTMPSListenAddr: ":1936", + RTMPSCertPath: "", + RTMPSKeyPath: "", }, }, { @@ -36,12 +40,16 @@ func TestLoad(t *testing.T) { "FFMPEG_PATH": "/usr/local/bin/ffmpeg", }, wantCfg: &Config{ - Port: 12345, - FrameRate: 20, - DisplayNum: 2, - MaxSizeInMB: 250, - OutputDir: "/tmp", - PathToFFmpeg: "/usr/local/bin/ffmpeg", + Port: 12345, + FrameRate: 20, + DisplayNum: 2, + MaxSizeInMB: 250, + OutputDir: "/tmp", + PathToFFmpeg: "/usr/local/bin/ffmpeg", + RTMPListenAddr: ":1935", + RTMPSListenAddr: ":1936", + RTMPSCertPath: "", + RTMPSKeyPath: "", }, }, { diff --git a/server/go.mod b/server/go.mod index 9b16ea6c..b16c33d4 100644 --- a/server/go.mod +++ b/server/go.mod @@ -12,6 +12,7 @@ require ( github.com/google/uuid v1.6.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/m1k1o/neko/server v0.0.0-20251008185748-46e2fc7d3866 + github.com/notedit/rtmp v0.0.2 github.com/nrednav/cuid2 v1.1.0 github.com/oapi-codegen/runtime v1.1.2 github.com/samber/lo v1.52.0 diff --git a/server/go.sum b/server/go.sum index 95301225..005e576f 100644 --- a/server/go.sum +++ b/server/go.sum @@ -50,6 +50,8 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw= github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826/go.mod h1:TaXosZuwdSHYgviHp1DAtfrULt5eUgsSMsZf+YrPgl8= +github.com/notedit/rtmp v0.0.2 h1:5+to4yezKATiJgnrcETu9LbV5G/QsWkOV9Ts2M/p33w= +github.com/notedit/rtmp v0.0.2/go.mod h1:vzuE21rowz+lT1NGsWbreIvYulgBpCGnQyeTyFblUHc= github.com/nrednav/cuid2 v1.1.0 h1:Y2P9Fo1Iz7lKuwcn+fS0mbxkNvEqoNLUtm0+moHCnYc= github.com/nrednav/cuid2 v1.1.0/go.mod h1:jBjkJAI+QLM4EUGvtwGDHC1cP1QQrRNfLo/A7qJFDhA= github.com/oapi-codegen/runtime v1.1.2 h1:P2+CubHq8fO4Q6fV1tqDBZHCwpVpvPg7oKiYzQgXIyI= diff --git a/server/lib/oapi/oapi.go b/server/lib/oapi/oapi.go index 4b08cf14..a6d128c3 100644 --- a/server/lib/oapi/oapi.go +++ b/server/lib/oapi/oapi.go @@ -1,6 +1,6 @@ // Package oapi provides primitives to interact with the openapi HTTP API. // -// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.5.0 DO NOT EDIT. +// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.5.1 DO NOT EDIT. package oapi import ( @@ -89,6 +89,18 @@ const ( Stdout ProcessStreamEventStream = "stdout" ) +// Defines values for StartStreamRequestMode. +const ( + StartStreamRequestModeInternal StartStreamRequestMode = "internal" + StartStreamRequestModeRemote StartStreamRequestMode = "remote" +) + +// Defines values for StreamInfoMode. +const ( + StreamInfoModeInternal StreamInfoMode = "internal" + StreamInfoModeRemote StreamInfoMode = "remote" +) + // Defines values for LogsStreamParamsSource. const ( Path LogsStreamParamsSource = "path" @@ -533,6 +545,26 @@ type StartRecordingRequest struct { MaxFileSizeInMB *int `json:"maxFileSizeInMB,omitempty"` } +// StartStreamRequest defines model for StartStreamRequest. +type StartStreamRequest struct { + // Framerate Streaming framerate in fps (overrides server default) + Framerate *int `json:"framerate,omitempty"` + + // Id Optional identifier for the streaming session. Alphanumeric or hyphen. + Id *string `json:"id,omitempty"` + + // Mode Where to send the stream output. "internal" starts a local RTMP(S) server and streams to it. + // "remote" pushes the stream to the provided RTMP/RTMPS target_url. + Mode *StartStreamRequestMode `json:"mode,omitempty"` + + // TargetUrl RTMP or RTMPS URL to push the stream to when mode is "remote". + TargetUrl *string `json:"target_url,omitempty"` +} + +// StartStreamRequestMode Where to send the stream output. "internal" starts a local RTMP(S) server and streams to it. +// "remote" pushes the stream to the provided RTMP/RTMPS target_url. +type StartStreamRequestMode string + // StopRecordingRequest defines model for StopRecordingRequest. type StopRecordingRequest struct { // ForceStop Immediately stop without graceful shutdown. This may result in a corrupted video file. @@ -542,6 +574,39 @@ type StopRecordingRequest struct { Id *string `json:"id,omitempty"` } +// StopStreamRequest defines model for StopStreamRequest. +type StopStreamRequest struct { + // Id Identifier of the stream to stop. Alphanumeric or hyphen. + Id *string `json:"id,omitempty"` +} + +// StreamInfo defines model for StreamInfo. +type StreamInfo struct { + // Id Stream identifier + Id string `json:"id"` + + // IngestUrl URL ffmpeg is publishing to + IngestUrl string `json:"ingest_url"` + + // IsStreaming Whether the ffmpeg streaming process is currently running + IsStreaming bool `json:"is_streaming"` + + // Mode Whether the stream is using the internal RTMP server or a remote endpoint + Mode StreamInfoMode `json:"mode"` + + // PlaybackUrl RTMP playback URL if available (internal streams only) + PlaybackUrl *string `json:"playback_url"` + + // SecurePlaybackUrl RTMPS playback URL when TLS is enabled for the internal server + SecurePlaybackUrl *string `json:"secure_playback_url"` + + // StartedAt Timestamp when streaming started + StartedAt time.Time `json:"started_at"` +} + +// StreamInfoMode Whether the stream is using the internal RTMP server or a remote endpoint +type StreamInfoMode string + // TypeTextRequest defines model for TypeTextRequest. type TypeTextRequest struct { // Delay Delay in milliseconds between keystrokes @@ -732,6 +797,12 @@ type StartRecordingJSONRequestBody = StartRecordingRequest // StopRecordingJSONRequestBody defines body for StopRecording for application/json ContentType. type StopRecordingJSONRequestBody = StopRecordingRequest +// StartStreamJSONRequestBody defines body for StartStream for application/json ContentType. +type StartStreamJSONRequestBody = StartStreamRequest + +// StopStreamJSONRequestBody defines body for StopStream for application/json ContentType. +type StopStreamJSONRequestBody = StopStreamRequest + // RequestEditorFn is the function signature for the RequestEditor callback function type RequestEditorFn func(ctx context.Context, req *http.Request) error @@ -969,6 +1040,19 @@ type ClientInterface interface { StopRecordingWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) StopRecording(ctx context.Context, body StopRecordingJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) + + // ListStreams request + ListStreams(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) + + // StartStreamWithBody request with any body + StartStreamWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + + StartStream(ctx context.Context, body StartStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) + + // StopStreamWithBody request with any body + StopStreamWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) + + StopStream(ctx context.Context, body StopStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) } func (c *Client) PatchChromiumFlagsWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { @@ -1727,6 +1811,66 @@ func (c *Client) StopRecording(ctx context.Context, body StopRecordingJSONReques return c.Client.Do(req) } +func (c *Client) ListStreams(ctx context.Context, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewListStreamsRequest(c.Server) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) StartStreamWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewStartStreamRequestWithBody(c.Server, contentType, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) StartStream(ctx context.Context, body StartStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewStartStreamRequest(c.Server, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) StopStreamWithBody(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewStopStreamRequestWithBody(c.Server, contentType, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + +func (c *Client) StopStream(ctx context.Context, body StopStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*http.Response, error) { + req, err := NewStopStreamRequest(c.Server, body) + if err != nil { + return nil, err + } + req = req.WithContext(ctx) + if err := c.applyEditors(ctx, req, reqEditors); err != nil { + return nil, err + } + return c.Client.Do(req) +} + // NewPatchChromiumFlagsRequest calls the generic PatchChromiumFlags builder with application/json body func NewPatchChromiumFlagsRequest(server string, body PatchChromiumFlagsJSONRequestBody) (*http.Request, error) { var bodyReader io.Reader @@ -3336,6 +3480,113 @@ func NewStopRecordingRequestWithBody(server string, contentType string, body io. return req, nil } +// NewListStreamsRequest generates requests for ListStreams +func NewListStreamsRequest(server string) (*http.Request, error) { + var err error + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/stream/list") + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("GET", queryURL.String(), nil) + if err != nil { + return nil, err + } + + return req, nil +} + +// NewStartStreamRequest calls the generic StartStream builder with application/json body +func NewStartStreamRequest(server string, body StartStreamJSONRequestBody) (*http.Request, error) { + var bodyReader io.Reader + buf, err := json.Marshal(body) + if err != nil { + return nil, err + } + bodyReader = bytes.NewReader(buf) + return NewStartStreamRequestWithBody(server, "application/json", bodyReader) +} + +// NewStartStreamRequestWithBody generates requests for StartStream with any type of body +func NewStartStreamRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) { + var err error + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/stream/start") + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", queryURL.String(), body) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", contentType) + + return req, nil +} + +// NewStopStreamRequest calls the generic StopStream builder with application/json body +func NewStopStreamRequest(server string, body StopStreamJSONRequestBody) (*http.Request, error) { + var bodyReader io.Reader + buf, err := json.Marshal(body) + if err != nil { + return nil, err + } + bodyReader = bytes.NewReader(buf) + return NewStopStreamRequestWithBody(server, "application/json", bodyReader) +} + +// NewStopStreamRequestWithBody generates requests for StopStream with any type of body +func NewStopStreamRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) { + var err error + + serverURL, err := url.Parse(server) + if err != nil { + return nil, err + } + + operationPath := fmt.Sprintf("/stream/stop") + if operationPath[0] == '/' { + operationPath = "." + operationPath + } + + queryURL, err := serverURL.Parse(operationPath) + if err != nil { + return nil, err + } + + req, err := http.NewRequest("POST", queryURL.String(), body) + if err != nil { + return nil, err + } + + req.Header.Add("Content-Type", contentType) + + return req, nil +} + func (c *Client) applyEditors(ctx context.Context, req *http.Request, additionalEditors []RequestEditorFn) error { for _, r := range c.RequestEditors { if err := r(ctx, req); err != nil { @@ -3543,6 +3794,19 @@ type ClientWithResponsesInterface interface { StopRecordingWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*StopRecordingResponse, error) StopRecordingWithResponse(ctx context.Context, body StopRecordingJSONRequestBody, reqEditors ...RequestEditorFn) (*StopRecordingResponse, error) + + // ListStreamsWithResponse request + ListStreamsWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*ListStreamsResponse, error) + + // StartStreamWithBodyWithResponse request with any body + StartStreamWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*StartStreamResponse, error) + + StartStreamWithResponse(ctx context.Context, body StartStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*StartStreamResponse, error) + + // StopStreamWithBodyWithResponse request with any body + StopStreamWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*StopStreamResponse, error) + + StopStreamWithResponse(ctx context.Context, body StopStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*StopStreamResponse, error) } type PatchChromiumFlagsResponse struct { @@ -4476,6 +4740,78 @@ func (r StopRecordingResponse) StatusCode() int { return 0 } +type ListStreamsResponse struct { + Body []byte + HTTPResponse *http.Response + JSON200 *[]StreamInfo + JSON500 *InternalError +} + +// Status returns HTTPResponse.Status +func (r ListStreamsResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r ListStreamsResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + +type StartStreamResponse struct { + Body []byte + HTTPResponse *http.Response + JSON201 *StreamInfo + JSON400 *BadRequestError + JSON409 *ConflictError + JSON500 *InternalError +} + +// Status returns HTTPResponse.Status +func (r StartStreamResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r StartStreamResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + +type StopStreamResponse struct { + Body []byte + HTTPResponse *http.Response + JSON400 *BadRequestError + JSON404 *NotFoundError + JSON500 *InternalError +} + +// Status returns HTTPResponse.Status +func (r StopStreamResponse) Status() string { + if r.HTTPResponse != nil { + return r.HTTPResponse.Status + } + return http.StatusText(0) +} + +// StatusCode returns HTTPResponse.StatusCode +func (r StopStreamResponse) StatusCode() int { + if r.HTTPResponse != nil { + return r.HTTPResponse.StatusCode + } + return 0 +} + // PatchChromiumFlagsWithBodyWithResponse request with arbitrary body returning *PatchChromiumFlagsResponse func (c *ClientWithResponses) PatchChromiumFlagsWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*PatchChromiumFlagsResponse, error) { rsp, err := c.PatchChromiumFlagsWithBody(ctx, contentType, body, reqEditors...) @@ -5019,23 +5355,66 @@ func (c *ClientWithResponses) StopRecordingWithResponse(ctx context.Context, bod return ParseStopRecordingResponse(rsp) } -// ParsePatchChromiumFlagsResponse parses an HTTP response from a PatchChromiumFlagsWithResponse call -func ParsePatchChromiumFlagsResponse(rsp *http.Response) (*PatchChromiumFlagsResponse, error) { - bodyBytes, err := io.ReadAll(rsp.Body) - defer func() { _ = rsp.Body.Close() }() +// ListStreamsWithResponse request returning *ListStreamsResponse +func (c *ClientWithResponses) ListStreamsWithResponse(ctx context.Context, reqEditors ...RequestEditorFn) (*ListStreamsResponse, error) { + rsp, err := c.ListStreams(ctx, reqEditors...) if err != nil { return nil, err } + return ParseListStreamsResponse(rsp) +} - response := &PatchChromiumFlagsResponse{ - Body: bodyBytes, - HTTPResponse: rsp, +// StartStreamWithBodyWithResponse request with arbitrary body returning *StartStreamResponse +func (c *ClientWithResponses) StartStreamWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*StartStreamResponse, error) { + rsp, err := c.StartStreamWithBody(ctx, contentType, body, reqEditors...) + if err != nil { + return nil, err } + return ParseStartStreamResponse(rsp) +} - switch { - case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 400: - var dest BadRequestError - if err := json.Unmarshal(bodyBytes, &dest); err != nil { +func (c *ClientWithResponses) StartStreamWithResponse(ctx context.Context, body StartStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*StartStreamResponse, error) { + rsp, err := c.StartStream(ctx, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseStartStreamResponse(rsp) +} + +// StopStreamWithBodyWithResponse request with arbitrary body returning *StopStreamResponse +func (c *ClientWithResponses) StopStreamWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader, reqEditors ...RequestEditorFn) (*StopStreamResponse, error) { + rsp, err := c.StopStreamWithBody(ctx, contentType, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseStopStreamResponse(rsp) +} + +func (c *ClientWithResponses) StopStreamWithResponse(ctx context.Context, body StopStreamJSONRequestBody, reqEditors ...RequestEditorFn) (*StopStreamResponse, error) { + rsp, err := c.StopStream(ctx, body, reqEditors...) + if err != nil { + return nil, err + } + return ParseStopStreamResponse(rsp) +} + +// ParsePatchChromiumFlagsResponse parses an HTTP response from a PatchChromiumFlagsWithResponse call +func ParsePatchChromiumFlagsResponse(rsp *http.Response) (*PatchChromiumFlagsResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &PatchChromiumFlagsResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 400: + var dest BadRequestError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { return nil, err } response.JSON400 = &dest @@ -6523,6 +6902,126 @@ func ParseStopRecordingResponse(rsp *http.Response) (*StopRecordingResponse, err return response, nil } +// ParseListStreamsResponse parses an HTTP response from a ListStreamsWithResponse call +func ParseListStreamsResponse(rsp *http.Response) (*ListStreamsResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &ListStreamsResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: + var dest []StreamInfo + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON200 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 500: + var dest InternalError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON500 = &dest + + } + + return response, nil +} + +// ParseStartStreamResponse parses an HTTP response from a StartStreamWithResponse call +func ParseStartStreamResponse(rsp *http.Response) (*StartStreamResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &StartStreamResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 201: + var dest StreamInfo + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON201 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 400: + var dest BadRequestError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON400 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 409: + var dest ConflictError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON409 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 500: + var dest InternalError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON500 = &dest + + } + + return response, nil +} + +// ParseStopStreamResponse parses an HTTP response from a StopStreamWithResponse call +func ParseStopStreamResponse(rsp *http.Response) (*StopStreamResponse, error) { + bodyBytes, err := io.ReadAll(rsp.Body) + defer func() { _ = rsp.Body.Close() }() + if err != nil { + return nil, err + } + + response := &StopStreamResponse{ + Body: bodyBytes, + HTTPResponse: rsp, + } + + switch { + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 400: + var dest BadRequestError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON400 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 404: + var dest NotFoundError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON404 = &dest + + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 500: + var dest InternalError + if err := json.Unmarshal(bodyBytes, &dest); err != nil { + return nil, err + } + response.JSON500 = &dest + + } + + return response, nil +} + // ServerInterface represents all server handlers. type ServerInterface interface { // Update Chromium launch flags and restart @@ -6642,6 +7141,15 @@ type ServerInterface interface { // Stop the recording // (POST /recording/stop) StopRecording(w http.ResponseWriter, r *http.Request) + // List active streams + // (GET /stream/list) + ListStreams(w http.ResponseWriter, r *http.Request) + // Start live streaming to an internal RTMP(S) server or a remote RTMP(S) endpoint. + // (POST /stream/start) + StartStream(w http.ResponseWriter, r *http.Request) + // Stop a live stream + // (POST /stream/stop) + StopStream(w http.ResponseWriter, r *http.Request) } // Unimplemented server implementation that returns http.StatusNotImplemented for each endpoint. @@ -6882,6 +7390,24 @@ func (_ Unimplemented) StopRecording(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotImplemented) } +// List active streams +// (GET /stream/list) +func (_ Unimplemented) ListStreams(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) +} + +// Start live streaming to an internal RTMP(S) server or a remote RTMP(S) endpoint. +// (POST /stream/start) +func (_ Unimplemented) StartStream(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) +} + +// Stop a live stream +// (POST /stream/stop) +func (_ Unimplemented) StopStream(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) +} + // ServerInterfaceWrapper converts contexts to parameters. type ServerInterfaceWrapper struct { Handler ServerInterface @@ -7668,6 +8194,48 @@ func (siw *ServerInterfaceWrapper) StopRecording(w http.ResponseWriter, r *http. handler.ServeHTTP(w, r) } +// ListStreams operation middleware +func (siw *ServerInterfaceWrapper) ListStreams(w http.ResponseWriter, r *http.Request) { + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.ListStreams(w, r) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + +// StartStream operation middleware +func (siw *ServerInterfaceWrapper) StartStream(w http.ResponseWriter, r *http.Request) { + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.StartStream(w, r) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + +// StopStream operation middleware +func (siw *ServerInterfaceWrapper) StopStream(w http.ResponseWriter, r *http.Request) { + + handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + siw.Handler.StopStream(w, r) + })) + + for _, middleware := range siw.HandlerMiddlewares { + handler = middleware(handler) + } + + handler.ServeHTTP(w, r) +} + type UnescapedCookieParamError struct { ParamName string Err error @@ -7898,6 +8466,15 @@ func HandlerWithOptions(si ServerInterface, options ChiServerOptions) http.Handl r.Group(func(r chi.Router) { r.Post(options.BaseURL+"/recording/stop", wrapper.StopRecording) }) + r.Group(func(r chi.Router) { + r.Get(options.BaseURL+"/stream/list", wrapper.ListStreams) + }) + r.Group(func(r chi.Router) { + r.Post(options.BaseURL+"/stream/start", wrapper.StartStream) + }) + r.Group(func(r chi.Router) { + r.Post(options.BaseURL+"/stream/stop", wrapper.StopStream) + }) return r } @@ -9607,6 +10184,118 @@ func (response StopRecording500JSONResponse) VisitStopRecordingResponse(w http.R return json.NewEncoder(w).Encode(response) } +type ListStreamsRequestObject struct { +} + +type ListStreamsResponseObject interface { + VisitListStreamsResponse(w http.ResponseWriter) error +} + +type ListStreams200JSONResponse []StreamInfo + +func (response ListStreams200JSONResponse) VisitListStreamsResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + + return json.NewEncoder(w).Encode(response) +} + +type ListStreams500JSONResponse struct{ InternalErrorJSONResponse } + +func (response ListStreams500JSONResponse) VisitListStreamsResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(500) + + return json.NewEncoder(w).Encode(response) +} + +type StartStreamRequestObject struct { + Body *StartStreamJSONRequestBody +} + +type StartStreamResponseObject interface { + VisitStartStreamResponse(w http.ResponseWriter) error +} + +type StartStream201JSONResponse StreamInfo + +func (response StartStream201JSONResponse) VisitStartStreamResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(201) + + return json.NewEncoder(w).Encode(response) +} + +type StartStream400JSONResponse struct{ BadRequestErrorJSONResponse } + +func (response StartStream400JSONResponse) VisitStartStreamResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(400) + + return json.NewEncoder(w).Encode(response) +} + +type StartStream409JSONResponse struct{ ConflictErrorJSONResponse } + +func (response StartStream409JSONResponse) VisitStartStreamResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(409) + + return json.NewEncoder(w).Encode(response) +} + +type StartStream500JSONResponse struct{ InternalErrorJSONResponse } + +func (response StartStream500JSONResponse) VisitStartStreamResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(500) + + return json.NewEncoder(w).Encode(response) +} + +type StopStreamRequestObject struct { + Body *StopStreamJSONRequestBody +} + +type StopStreamResponseObject interface { + VisitStopStreamResponse(w http.ResponseWriter) error +} + +type StopStream200Response struct { +} + +func (response StopStream200Response) VisitStopStreamResponse(w http.ResponseWriter) error { + w.WriteHeader(200) + return nil +} + +type StopStream400JSONResponse struct{ BadRequestErrorJSONResponse } + +func (response StopStream400JSONResponse) VisitStopStreamResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(400) + + return json.NewEncoder(w).Encode(response) +} + +type StopStream404JSONResponse struct{ NotFoundErrorJSONResponse } + +func (response StopStream404JSONResponse) VisitStopStreamResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(404) + + return json.NewEncoder(w).Encode(response) +} + +type StopStream500JSONResponse struct{ InternalErrorJSONResponse } + +func (response StopStream500JSONResponse) VisitStopStreamResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(500) + + return json.NewEncoder(w).Encode(response) +} + // StrictServerInterface represents all server handlers. type StrictServerInterface interface { // Update Chromium launch flags and restart @@ -9726,6 +10415,15 @@ type StrictServerInterface interface { // Stop the recording // (POST /recording/stop) StopRecording(ctx context.Context, request StopRecordingRequestObject) (StopRecordingResponseObject, error) + // List active streams + // (GET /stream/list) + ListStreams(ctx context.Context, request ListStreamsRequestObject) (ListStreamsResponseObject, error) + // Start live streaming to an internal RTMP(S) server or a remote RTMP(S) endpoint. + // (POST /stream/start) + StartStream(ctx context.Context, request StartStreamRequestObject) (StartStreamResponseObject, error) + // Stop a live stream + // (POST /stream/stop) + StopStream(ctx context.Context, request StopStreamRequestObject) (StopStreamResponseObject, error) } type StrictHandlerFunc = strictnethttp.StrictHTTPHandlerFunc @@ -10910,126 +11608,221 @@ func (sh *strictHandler) StopRecording(w http.ResponseWriter, r *http.Request) { } } +// ListStreams operation middleware +func (sh *strictHandler) ListStreams(w http.ResponseWriter, r *http.Request) { + var request ListStreamsRequestObject + + handler := func(ctx context.Context, w http.ResponseWriter, r *http.Request, request interface{}) (interface{}, error) { + return sh.ssi.ListStreams(ctx, request.(ListStreamsRequestObject)) + } + for _, middleware := range sh.middlewares { + handler = middleware(handler, "ListStreams") + } + + response, err := handler(r.Context(), w, r, request) + + if err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } else if validResponse, ok := response.(ListStreamsResponseObject); ok { + if err := validResponse.VisitListStreamsResponse(w); err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } + } else if response != nil { + sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response)) + } +} + +// StartStream operation middleware +func (sh *strictHandler) StartStream(w http.ResponseWriter, r *http.Request) { + var request StartStreamRequestObject + + var body StartStreamJSONRequestBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + sh.options.RequestErrorHandlerFunc(w, r, fmt.Errorf("can't decode JSON body: %w", err)) + return + } + request.Body = &body + + handler := func(ctx context.Context, w http.ResponseWriter, r *http.Request, request interface{}) (interface{}, error) { + return sh.ssi.StartStream(ctx, request.(StartStreamRequestObject)) + } + for _, middleware := range sh.middlewares { + handler = middleware(handler, "StartStream") + } + + response, err := handler(r.Context(), w, r, request) + + if err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } else if validResponse, ok := response.(StartStreamResponseObject); ok { + if err := validResponse.VisitStartStreamResponse(w); err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } + } else if response != nil { + sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response)) + } +} + +// StopStream operation middleware +func (sh *strictHandler) StopStream(w http.ResponseWriter, r *http.Request) { + var request StopStreamRequestObject + + var body StopStreamJSONRequestBody + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + sh.options.RequestErrorHandlerFunc(w, r, fmt.Errorf("can't decode JSON body: %w", err)) + return + } + request.Body = &body + + handler := func(ctx context.Context, w http.ResponseWriter, r *http.Request, request interface{}) (interface{}, error) { + return sh.ssi.StopStream(ctx, request.(StopStreamRequestObject)) + } + for _, middleware := range sh.middlewares { + handler = middleware(handler, "StopStream") + } + + response, err := handler(r.Context(), w, r, request) + + if err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } else if validResponse, ok := response.(StopStreamResponseObject); ok { + if err := validResponse.VisitStopStreamResponse(w); err != nil { + sh.options.ResponseErrorHandlerFunc(w, r, err) + } + } else if response != nil { + sh.options.ResponseErrorHandlerFunc(w, r, fmt.Errorf("unexpected response type: %T", response)) + } +} + // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+x9aXMbN7boX0H1mypbb7jIW6ai+eTYcqJnO3ZZzvPchL4cqPuQxKgb6ABoUrTL//3W", - "OUAvZKO5SbKt1K1KxRTZDRzg7AsOPkexynIlQVoTnXyONJhcSQP0x088eQd/FmDsqdZK41exkhakxY88", - "z1MRcyuUHP7HKInfmXgGGcdPf9MwiU6i/zOsxx+6X83Qjfbly5delICJtchxkOgEJ2R+xuhLL3qm5CQV", - "8deavZwOpz6TFrTk6VeaupyOnYOeg2b+wV70q7IvVCGTrwTHr8oymi/C3/zjONqzVMSXr1VhoMQPApAk", - "Al/k6VutctBWIN1MeGqgF+WNrz5HF4W1DsLVCWlI5n5lVjGBG8FjyxbCzqJeBLLIopM/ohQmNupFWkxn", - "+G8mkiSFqBdd8Pgy6kUTpRdcJ9HHXmSXOUQnkbFayCluYYygj93X69O/X+bA1ITRM4zH9HU9a6IW+GeR", - "R36Y4AQzlSbjS1ia0PISMRGgGf6M68NnWVLgq8zOwE0c9SJhIaP3W6P7L7jWfIl/yyIb01t+ugkvUhud", - "PGihssguQOPirMiAJteQA7cr8/rRcdunQBR31V7Fv1islE6E5JZ2qxqA5coIv2ftkZbtkf7rkJG+9CIN", - "fxZCQ4JIuYpw6BoR6uI/4Jj2mQZu4bnQEFull4dRaqaSAKG8yd3rLClHZ/ggu69iy1Pm0NVjMJgO2D+e", - "PDkasOcOM7Tx/3jyZBD1opxbZPPoJPrvP477//j4+VHv8Ze/RQGSyrmdtYF4emFUWlhoAIEP4gwxLX1t", - "kuHg/7YHX9tNmim0mc8hBQtvuZ0dto9bllACntA0Nw/4O4iJ0KaHQS+SNuxnCUjr2NmTri4naayEPU3z", - "GZdFBlrETGk2W+YzkOv45/1PT/u/H/d/7H/8+9+Ci20vTJg85UtUU2K653pmQJKztaZnhdYgLUvc2Mw9", - "x4RkubiC1AQZW8NEg5mNNbewfUj/NMOnceBfPrH7GV+yC2CySFMmJkwqyxKwEFt+kcJRcNKFSEIEtT4b", - "PbYR/uDWaj79Ctot0XzaodkqjeZUXEjPJJDy5YrQP14X+s/xEVx9JtJUGIiVTAy7ALsAkCUgqNUYlwkz", - "lmvrqTdTc2A8VV4vIXcNCCwpMgT0OIST62g+3Iu9FF9YoLzRCWhIWCqMRbb846rHlh+baibnQptqiXam", - "VTGdscVMpA6IqZDTAXtdGMvQuOJCMm5ZCtxY9pDlSkhrBk1I10FubEjGr87crw9p7+o/1lez8UdjIR8T", - "usfZqpp/sifKNaTcijkwHNKsrZrdR8ZDZAgprEDthoMdbUc8jTbOQY8NTDNvj9a2yHG3MVIBRNhwUOWg", - "mR8HF1LRH3vtgGAPViB6sNVE6NQNlRm9pvPBGD6FABmuDVw+GBz7CuLCwtuULxfExLvKktWt8m8hwYIb", - "kdVDshitk3XxEwdNFrRtz+nv4f/jc+4+0gCNsQfsPZpg+OWMG8bjGAwxy72cT+Fej90jh+PK3uuRyLh3", - "odXCgL7H5lwLlNZmMJKnVzzLUzhho4gvuLAMXx5MlVX3782szc3JcAjumUGssntH/2QabKElazxuhU3h", - "/tE/R9FIhmwiNGNVYccG4hVq+6FFba/5FZGNW6NA2Ssy0j2ePSrrjAnDfjgm6nLvRCePjo/3ojXa/B3p", - "wRDAe5IDvoScs0YF9epa9AAlla8ORcTPPAmj2q33Z8JFCklo13UF9Bp1zYDNeVqAxyQk7GLp7Hmyi8WE", - "cbk8csIiAR2A59xymXCdMIKXTbTKaIDmwlrwGJuowm4YTBU2L+yuoxVE8O3hPszAzkDXC/L8kjD/yqRI", - "02U95IVSKXDZoo5yghCBvBApnMmJassjYcaJ0JuhIgNaGMZrb2AQgKeHDs0Y6b893CtUcRkpahdGID4Z", - "OH864zY6iRJuoU9vB3Yv7CrhspxzdCGsYffRJ+qxUZToxZXu43+jCO3iUdTXi77u43+j6GgQmkHyENw/", - "cQMMfyrt8AlOqXRwJ3Z2qkqTp00k4hOML5YWAnRyLj6RYKGfB+yYTRpgCDCD7f4srdFDtzJZr6SDBg79", - "pneR0/nSWMhO55VGXkeMoQdYPONyCgzwwUFLfuxCfnwygRj5YWc6PBSX1VSHInU/KgkHimhLGf42aNju", - "z96dPn1/GvWiD+/O6N/np69O6cO701+fvj4NmPFryKdfe90GyythLOEtsEa0FnFt7R0T0jEwsjRIWxJi", - "Zbhuig1WUilggr9S0w7aespSNaW5lrXobQQo20TWsLnWpJKaVkoKLY9BlzFgLM/ygGZCXY/T1xAtuGG5", - "VkkROyraRbx1WH7NqUMIe63mcA1P8joeFVrUe3lU20J9tc8ELC60UZpZdVCob9eRdg714TYfHptKwNjx", - "thgbGIvAIw+VqmFbiKoXGR1vG9ioQsew85jrBkU5Qa+xitAOvbl853M5e1qcP4Ok0NWbl6zMBrW5V12u", - "2OBWF9DOaSTI/GBKk2mw3VxSl8G1vOU2nvnw14F81RH/et4d96p8gIePj/ePgj3vjH4N2NmEqUxYC0mP", - "FQYMscVMTGfo9/E5Fyk6Vu4VtCdcqJHIx4tSr4B+OO49Ou49fNJ7cPwxDCJt7VgkKWzH14TR1whyYcAl", - "DNAcYYsZSJai0z4XsEBVUwU+hxpomWgAxOjXh3W/Boo1jeOZVplA2D93z06Psmf+UcYnFnRj/aXxgk6s", - "NIUGJizjCc9drF3CgiHUKz4e0QTt5Qx4MinSHs1WfZN2kGdn2PF5Z7ixIptHD493Cz6+1WDMSziQspNC", - "cwfUxsCgf6rSG0hTpEgoGrgWPmqSKKL7uOee5RqY5XnutOjBscEqmZJtU2mXsGQ5bg8zuDkyhsFeGi48", - "/ysfK8TRzTK7UClNThMN2CmPZwynYGamijRhF8B441lmijxX2jqP9ypRVql0JO8bAPavBw9oLcuMJTCh", - "qJqS5mjAfITEMCHjtEiAjaJ35DePIvSNzmdiYt3HZ1an7tPT1H/14skoGoxcvNAFyIRxAc+YAOSpUQhl", - "rLILr7KMz0W58f5uS5eL/qLZ/v6eX9Cwe2zomrSm3Q3Ka61Q4J9eQXxjQTCOy8sobL2UKEekKky6bKsm", - "rqerMdM/PrYz/W4krqdFBuvx3a1Uxc1YK7Ua8wwvo/DRTLcfFPpn+CrLtZiLFKbQIXa4GRcGAj7Y+pDc", - "OHLAp3EoWaSkPUoZ306Hu7UHXBzaaNI8SjMzgzStthx1QSGDlni8CIz1QelL5OHaJbnPmy7ZkR/Rx1fc", - "JEKGFrDd5gI57yavz6E8isfZ51b9w6mcC60kRaKrACfCasBWqthvfWM3aspvBSn3i0t2I7A7/OjQuZUN", - "rxV75E2mqxBWraPNhKVWqvIXbUrD9ZePtRRQ0MuAK2HH4WC3XyrDRyhgFx7BhSLHFz88DkcifnjcB4mv", - "J8w9yi6KycRxVkcoctfBVGG7B/vSjb2XIk0PE6LnYopKlqjX8fAa9a6izNDjK0Iten/67nW0edxmPMQ/", - "/vLs1auoF539+j7qRb/89nZ7GMTPvYGIz3O+kM19SNM3k+jkj83BjIAi+vKxNegBrHHWiLDwC8QtZwZH", - "g6R7h/NQVcGb80qWnz0PU63/fRx63RWM9bnBLYSEibpIISCvqsBHUYgkTNMcLZsxt+HACgU+nEPQ1EL+", - "tT1iK514ttwWZk9slEUAhl52AqsTC3FejPM4sL5TY0XG0a579vY3VlAAKgcdg7R82hQokrKZWyTSaSmJ", - "mJis7NWMOzHltmubuO9FGWRd0ecaYvTUEPMsgwzVrYO+Ckx3CMOg5/q2xqldiXbqQkpEn1s2JGG27kZs", - "IuRhguw5txzFzUILF0taIz2X+BEyLwLB7IRbvpOMTpqzDLYGYqpxP25d87VUL4LjazQMDtdeIT5hQXYR", - "SZ17pweYf3wQ7eqd+qVo4HVmYR81dH7Kcr5MFUcyRScLJZScVhj0GTulWSomEC/j1GcmzHWxWUWia2LB", - "VQS1OYQD269WQWqlAJAVgtU6O4mGSpC6wYVhI3pxFHWxLMIf0AIupuh+LvMdtAXxrJCXTYB9ArVKy+7G", - "xK6cDnQ4X4merpntpjbqmrnyrS6lsdWVcfqw/bWpiv8avzecqz2UXA2tf+lAYNeEBynfJpwhIXIeawBp", - "Zsq+g6mP8NxAyPMXF+qsShin3v7eUPDXEQT7QMGvfQbasbjYjXUPPa+8n8IEuUVL0NcpM95jzGAWotyF", - "Xrmx21B2SDBPV4jeZNW2CCPIsuexVru7DusJktTy8dXmmOIvSotPSlL9M83FeKYKaQfsLRVzz8F/bxiV", - "rfSYhClf+R7xEJZ0DoIt5Y7/HyGOd5g/UQsZmL7Iw5NfJwvnxr7RPBy3bDETMdVL56BR/qxOtT9T7D3k", - "zpm5c7DPKMN3YKJGJAnILQU5LoNYh2f9S1vTS/65DrBfiBTegs6EMUJJcxj8U62KQFL6V1gw+snXOmj2", - "84q3t29RTeDYwQ+PHx/td8pALWQoxIiw0k8UVCzh/a0D3l0KMBYzZciXKvfWZRJc0JqyOcmhJwA2FMSc", - "o8Z+YT5wG9/oGYbqgAl5Czj6IFw5h3Qq5rA9TlwRtx+PVe+myx2ypp05YNqBa56EmGieQTjH+a425cqH", - "UP9PciTQOWgtEjDMuCNtfgeOmrWWD7eUWvaC5zCq9FEg1tGw14BI7YbOYxDQZRLtTJ67MGV3iLeGoxni", - "LKuzN+/Oxg3J+BUVeolPcCZf/9QNAVUFGV+e9vqnHTHy4Ph4tf51xxzmuVX5dQlN6RhwnO38cpZlkAhu", - "IV0yY1VOiRVVWDbVPIZJkTIzKywq/QF7PxOGZZSJJ5daSEolaV3kFhI2Fwko2qxwImafg0COgxGgWzwF", - "9H6Zw3u4sgcbdtc7Q4Jmj9XqEszWDLCFq5CDBVeU17N09NJ5vzNFucwsL2zTIO+qmcNx2+IOHxPePaVa", - "8ugkeglaQsrOMj4Fw56+PYt60Ry0caAcDx4MjkkR5iB5LqKT6NHgePDIF+TRhg3LkoXhJOXTUivEAbXw", - "GvQUqPyAnnTJPrgShoIdSoLpsSJHn5GtDRooepgLzkyRg54Lo3TSG0kuE0bF8oW0IqVtq55+DvP3SqWG", - "jaJUGAtSyOkoogK4VEhgwjB1QVyP5tJE6bJqmwSlr86hTDDSipNxSXTi6m7KWV7Q+h0qwNifVLLc60Dy", - "GreXu7kWyS2X5PbQKpbRtvoq4j9GUb9/KZS5dJnxfj8RBt3u/jQvRtHHo8OT2Q6gMFnVz6Fz7+pZ6mPy", - "D4+PAwYbwe/wndDRiWppHtnrteRfetFjN1LI96tmHK6fyv/Si57s8t7qkXY6311kGdfL6CT6zdFlBWLK", - "CxnPPBIQeA8zvVZTb5Gniid9uLIgya7rc5n0y2cR58oERMBv9BqyBErGDMmxGoJ9EjnjOp6JOTIMXFk6", - "Dm5nkLFCoogdzlQGw0vi7GE99XBUHB8/itFcpU/QG0kDlmnkl6w5g1uVkAewISu5cCS/Ihu6/TqtlvpU", - "Ju/8Hm9ix6xIrci5tkN07/oJt3wTR9Zb2V0xUz+DrOnQT3tCxV9oJDb4b3X4cPn3C5UiTsnJQFc05TH4", - "YxsluvbD+pqCfdr/nfc/Hfd/HIz7Hz8/6D188iTsC30S+RitgDaIv9cEWR4QRHxxhCzn8SU0WLuG+n5W", - "GFtV+2RcigkYO0CxeNSMIV4IiSy4TedV4Pk6+pC1v1G8NbB7mIx7EIpjV9TgSAGSXkDMOa6pmEMYpoEn", - "31rgtURQhc0Gkd/nBgWSOWoKwWqJXhp6u2XoGk1kqnAlt6XsW+XlupHGNVTppuBgu1PHoSrMnV52TTHK", - "IBEk3xRt5yIrUopfMdrnlcYdYWtyDUcUOupGTxW9uiXstKJjuyPnRuZvVIWHOuC4wNpcGHEhUmGXlQHz", - "3Vgqv4jE16epRSMYuIbmRPNpmxPX89xUPycTF8ItKcodku8x5aMM6dKZ3ROlGcdptXXHpHs4vVw/OD8V", - "c3AHBrzISIEbGIzk+5Uze1uOq4esgKpHwS2RZqsHwqFyAwf6TuQFgeLOxpAsIzRxwsMaxSAat8nu6mzP", - "LWGgdXboepLbh8lxZd8WC6/Loz9ZEy5fx2FyiMVEQNJgArOLKKdy7fElLLewuD9fUc9DmRtiZ1lxeRWm", - "G7CX+HOdW2gUiY9kqPR7wF6QaEDANMzQdJhDxeCN13vMAIwkAhOuE2fcsvK4fDwVdjDRAAmYS6vygdLT", - "4RX+L9fKquHVgwfuQ55yIYdusAQmg5kTNT7GN1NSadMM5fRTmEO9XsMK4yO4sd8KkwLkxtvdDgsqCYYH", - "/MGFW2KH9XMRh3IDIZSo5XtSZE79NA1QossdCN9U6d9uUfWeX0KdJr4tY6aV7f7icbTRehEZn8Iwd9UZ", - "9UzbXaKWvVIDwGjQb4rQZzy3hUbTtEZQGR/egk6Vpt1CzOXx2dznutMlGhZDhbxd5t/xO9swPxqSdNWQ", - "ofYvaO4gy6+cvvEWykoi3aXphGSpmlKa3Yr40riuMa7Iw/lFDQpiFzDjc4EkzZdszvXyn8wW5DD7nk8l", - "Aw9G8gPaTxfKzhpLoQHLtTKqAnBg5FrNBXmYthZvNLMT8Jk/ImQFLfV+NQZZafUERy6UesFtPAPDFjOA", - "1JebeVH4by/YvXPR7/u+eb+yfp8sP3bMXNjB2You8PDvkIQ8L9Ppt8R+jQKPQ6WjJ6/vxL9zwNS2gkMP", - "t2i0+Q6Bu4jI8hB/h3D0KZRbwst6huZQzLhMyTL/nrQWNcy0CFg3FnwrtpVUSSCv4I9Q3pbxEDgy/JV9", - "7dV+fQH19Zt3rsvedTE9WZ7nvAaaHx//uP291e66N5hF6FgOksbEDF2nynF1MozIpAhFyla7ed5WuCzc", - "M/TQkGhdG+LW+R2xrlsp45SirLe/xItrX7kDXlx/zdvGS7v96MHhiAolbonJ9Tjr8fb3Vps230gcgyBv", - "9thZx1uZu9iAshcuf/B9Y4sK3f4CiCJ8VDhSC5kqniB3jT8JqnCZgg1VVNlCS8M4+/3srSvhaaSc3GFZ", - "QpcpPYs6rLHS1mgN/37+50L/LnJKkWmegQVt6Ajdzm2GyzwYWtDloujsNL73ZwEkDlymryzPW6WBXjP9", - "uK3c7+Neytnv67UcStz1co1VaQ8RVnOD7yJdemQ1RQjjJaH5JVf0ioQ3LmtpPKGuUlTVJWpXWtraiOt7", - "IKH9hF7dKatNSCTGGm247iDJ/Ax2pZFYecy1hb2KbFJhLCki00k3dT+zw4TQ3aSUetUBUqntk9TVit1B", - "WqH6EMK8q69s0wY1J+uyT8puXreYV7kJ24TyGLU9fwfxRCug/k1UcbOJmTXwpLIqg7z8DnjibcrdWJkm", - "K00JHP974WYVW7D9+nDltWwIEv24uhtz/b4RsSB+axuUbggqicOAE/TjxpmOTu5uH625veKKjjM8h3J8", - "Y6iyFOIOIvIcbKBJaAN1QzruY2YirzDsCrq6sxJP01Qtyrovql8UcuqmcHWHKXiF4PO8GjLlZYBrQjvo", - "qHMszYMbK2ysLJKOysRDukE22hF4g3a3/pClQN23/s/X/m1u+bi5vpl24cZq/whLVdnfXRd1gXLAibfX", - "muxQ+u4by5o5lTATv7kmSa6CWVhTO++t2odQt9EQczj3/cZYY1/ST5pH3xq12ZXTbNVufNAst71GLewm", - "fjiQsH8XeU3WDQT+ZYicN0vs10i0ovdFmbjpKJNsHK28LWUeOL25O04PPJVCyw72WfpNij8LCB05rHli", - "4bdj6ymuttFIy2Q3fS7kGxGaW0wz0oR75Q76mlUSG34ut/yLP54G7qTpOr2pvCa3NW+DPAjvMngHosLj", - "Jidiu88QaDNTIkrl+d1H1DmdncQV0YmGgBe4jqShq5To9Aldm6AX5tQ99hVxte7fWbiyDtqgY7ctsNe8", - "SSFUeXR+2ui2Uxu1vpKEuoTwhFb9OfpX//z8tP/MwdZ/H7xg4DUkgvtDkROGw1P7Hl+Ycn9diB1Fzd0p", - "e/u0RF2guc+Xu0imtNGtXfY12U7sVhSLVvnmdNgHfGSXyMXzhunDW1GM24te9DqPvE+qPhCdLSBWbp78", - "4fHjLjAzd5VUEKyNjSMc8+2i8a8ZVznQLSk7nN15NUr+JWrOMnNfJxVTNTXDemPDsXY19W3bOuTwGkG4", - "iwk2Um4paMrLaqqjkcE2YuFpJipN1WKF8tb60rfbXayjWcl0WVUSMjEpL1UQhnnQNjBmt1bZZ57G2sOz", - "1Q+Mffu56JtptOrilq2qDAnru9ZeIc2AQDM1B41TOwbJq9vShr6DeLfjflq2GNcXwmqul6271iip4S5y", - "qJs3+5vxGJ9yIY3zg/31eMz3yhxJJVmqYp7OlLEnPz58+PBmbtx7766E8D0i124po24jpr6Yzd+pWN3m", - "EShUbV1W98xph9vw7DovSvzK9XldF/QFr4bvvALuW5Z0nbYuiBzWtz46iggQp2cQJ5OIO7od/UYD5Vs7", - "5dFu0fx16aDdJj1AAXXPcn8j4veA9447EVYRTG2pt2KYWmHfLopXWnh/Gxw3G36HVKHr4P2d4ZZvQO7n", - "ujf4l+GlWD1HEkT0S0EHErb75Y2u45tMwi0txXd3Fg5CaLM7/nd1lPrNyzuZKERRUrX3L83WboozVbf2", - "oAey2tP9axPdLYsSt6iQFPG/3MmKr0Zbdbe8btQnYge1Qk/9ZcTNShP7b6TCGj3lA8T3U7PH+50NetTC", - "xzW930yHqrDbYiH15qnCbgyKfCN5dA3nPtChf6ubv9Z7H82M9eb7/xvDvoUYdoOqVWHXYhb1rYp1Hiws", - "Xd0xg7p9/G2e6mi19ew+5N3VHvYvcJ4j1zAXZICXzT6bvUNb+PPl9p3yqKzHb6JwYyqiygBUrUbrVPSA", - "0Unq6k7RxgHp6npRH2KtXu/KCpD4CucEtjUr3S7kaMOGWf742kWWjdbDLo+zIqqqX/sv/B0T/acb73pQ", - "k/oqjvYFFQP2c8E1lxYg8V2r37149ujRox8Hm8PJK6Ccu+T+QZCU9ysdCAiC8vD44SYWFSiTRJrSBQ5a", - "TTUY02M5dS9iVi9dIIml3HVobWz3O7B62X86saFe4ufFdOpOz1ATpbX77hpdEPXSMUG9iE19kO+iBqiO", - "4LjT7YZ4EaTdTaKkwumBzlMV5Q0trnTyGjboTrfEr9wH0y49bPFr2UBSV1De2LEDnqbNYVe3rdWJNFDH", - "dNtqNNyFPahFH2xi0fIGmrt3MJx2oGqMUsu1AXsj0yWVXdayLgfNzp6zmEvXLmQqjAUNiesCgRJk0May", - "yjchudGb/NZwHOh/vr+h5OuKvm0PDqvyVfVDC/mfAAAA///el5kiKpgAAA==", + "H4sIAAAAAAAC/+x9e3PbtvbgV8Fw70zsrR7Oq3fq+8dOmjitN0mTsZzt3VZZ/SDySMQ1CbAAKFnJ+Lvv", + "4AB8iAT1sp3End9Mp5ElEjjAeT9w8CUIRZoJDlyr4PRLIEFlgivAP36m0QX8lYPSZ1IKab4KBdfAtflI", + "syxhIdVM8OF/lODmOxXGkFLz6R8SZsFp8D+G1fhD+6sa2tFubm56QQQqlCwzgwSnZkLiZgxuesFLwWcJ", + "C7/W7MV0ZupzrkFymnylqYvpyAjkAiRxD/aC34R+LXIefSU4fhOa4HyB+c09bkZ7mbDw6p3IFRT4MQBE", + "ETMv0uSDFBlIzQzdzGiioBdkta++BNNcawvh+oQ4JLG/Ei0IMxtBQ02WTMdBLwCep8Hpn0ECMx30Asnm", + "sfk3ZVGUQNALpjS8CnrBTMgllVHwqRfoVQbBaaC0ZHxutjA0oE/s183pL1cZEDEj+AyhIX5dzRqJpfkz", + "zwI3jHeCWCTR5ApWyre8iM0YSGJ+Nuszz5IoN68SHYOdOOgFTEOK77dGd19QKenK/M3zdIJvuelmNE90", + "cPq4hco8nYI0i9MsBZxcQgZUr83rRjfbPgekuOv2Kv5NQiFkxDjVuFvlACQTirk9a4+0ao/0fw8Z6aYX", + "SPgrZxIig5TrwAxdIUJM/wOWaV9KoBpeMQmhFnJ1GKWmIvIQyvvMvk6iYnRiHiRHItQ0IRZdPQKD+YD8", + "8/nz4wF5ZTGDG//P588HQS/IqDZsHpwG/+/Pk/4/P3152nt284/AQ1IZ1XEbiBdTJZJcQw0I86CZIcSl", + "NyYZDv5ne/DGbuJMvs18BQlo+EB1fNg+bllCAXiE09w94BcQIqHND4OeRW3YzyPg2rKzI11ZTFJbCXmR", + "ZDHleQqShURIEq+yGHgT/7T/+UX/j5P+T/1PP/zDu9j2wpjKEroyaorN91xPDCg5W2t6mUsJXJPIjk3s", + "c4RxkrFrSJSXsSXMJKh4IqmG7UO6p4l52gz862dylNIVmQLheZIQNiNcaBKBhlDTaQLH3kmXLPIRVHM2", + "fGwj/N6tlXT+FbRbJOm8Q7OVGs2qOJ+eiSChqzWhf9IU+q/MI2b1KUsSpiAUPFJkCnoJwAtAjFYjlEdE", + "aSq1o95ULIDQRDi9ZLhrgGBxlhpAT3w4uY3mM3uxl+LzC5T3MgIJEUmY0oYt/7zukdWnuprJKJOqXKKO", + "pcjnMVnGLLFAzBmfD8i7XGlijCvKOKGaJECVJk9IJhjXalCHtAlybUNSen1uf32Ce1f90VzNxh+VhmyC", + "6J6k62r++Z4ol5BQzRZAzJCqsWpyZBjPIINxppnRbmaw4+2Ix9EmGciJgnnq7NHKFjnpNkZKgBAbFqoM", + "JHHjmIWU9EfeWSDI4zWIHm81ETp1Q2lGN3Q+KEXn4CHDxsDFg96xryHMNXxI6GqJTLyrLFnfKveWIViw", + "I5JqSBIa66QpfkKvyWJs2xH+PfzfdEHtRxygNvaAXBoTzHwZU0VoGIJCZnmU0Tk86pFH6HBc60c9FBmP", + "plIsFchHZEElM9JaDcb87JqmWQKnZBzQJWWamJcHc6HF0aNY60ydDodgnxmEIn10/C8iQeeSk9rjmukE", + "jo7/NQ7G3GcTGTNW5HqiIFyjth9b1PaOXiPZ2DUyI3tZirrHsUdpnRGmyI8nSF32neD06cnJXrSGm78j", + "PSgEeE9yMC8ZzmlQQbW6Fj1AQeXrQyHxE0fCRu1W+zOjLIHIt+uyBLpBXTGQBU1ycJiEiExX1p5Hu5jN", + "COWrYyssIpAeeEaa8ojKiCC8ZCZFigPUF9aCR+lI5HrDYCLXWa53HS1Hgm8P93sMOgZZLcjxS0TcK7M8", + "SVbVkFMhEqC8RR3FBD4Cec0SOOcz0ZZHTE0iJjdDhQY0U4RW3sDAA0/PODQTQ//t4d4aFZeiorZhBOST", + "gfWnU6qD0yCiGvr4tmf3/K6SWZZ1jqZMK3JkfKIeGQeRXF7LvvlvHBi7eBz05bIv++a/cXA88M3AqQ/u", + "n6kCYn4q7PCZmVJI707s7FQVJk+bSNhnmExXGjx0MmKfUbDgzwNyQmY1MBiowXZ/FtfooFubrFfQQQ2H", + "btO7yGm0UhrSs0WpkZuIUfgACWPK50DAPDhoyY9dyI/OZhAaftiZDg/FZTnVoUjdj0r8gSLcUmJ+G9Rs", + "95cXZy8uz4Je8PvFOf776uztGX64OPvtxbszjxnfQD7+2us2WN4ypRFvnjUaa9Gsrb1jjFsGNiwNXBeE", + "WBqum2KDpVTymOBvxbyDtl6QRMxxrlUlemsByjaR1WyuhlQS81JJGctj0GUMKE3TzKOZjK4301cQLaki", + "mRRRHloq2kW8dVh+9al9CHsnFnALT/I2HpWxqPfyqLaF+iqfCUiYSyUk0eKgUN+uI+0c6jPbfHhsKgKl", + "J9tibKC0Ad7wUKEatoWoeoGS4baBlchlCDuP2TQoigl6tVX4duj91YXL5expcf4CHENX79+QIhvU5l5x", + "tWaDa5lDO6cRGeYHVZhMg+3mkrjyruUD1WHswl8H8lVH/OtVd9yr9AGePDvZPwr2qjP6NSDnMyJSpjVE", + "PZIrUMgWMZvHxu+jC8oS41jZV4w9YUONSD5OlDoF9ONJ7+lJ78nz3uOTT34QcWsnLEpgO75mBL82IOcK", + "bMLAmCNkGQMniXHaFwyWRtWUgc+hBFymMQBC49f7db8EjDVNwliKlBnYv3TPjo+Sl+5RQmcaZG39hfFi", + "nFiucgmEaUIjmtlYO4clMVCv+XhIE7iXMdBolic9nK38Jukgz86w46vOcGNJNk+fnOwWfPwgQak3cCBl", + "R7mkFqiNgUH3VKk3DE2hIsFoYCN8VCdRg+6Tnn2WSiCaZpnVogfHBstkSrpNpV3BimRme4gym8NDGOyl", + "4fzzv3WxQjO6WqVTkeDkONGAnNEwJmYKomKRJxGZAqG1Z4nKs0xIbT3e60hoIZIxP1IA5N+PH+NaVimJ", + "YIZRNcHV8YC4CIkijIdJHgEZBxfoN48D4xuNYjbT9uNLLRP76UXivnr9fBwMxjZeaANkTNmAZ4gA0kQJ", + "A2Uo0qlTWcrloux4P+jC5cK/cLYfLukUh91jQxvSGnfXK6+lMAL/7BrCOwuCUbO8FMPWK27kCBe5SlZt", + "1UTlfD1m+uendqbfjkTlPE+hGd/dSlVUTaQQ6zFP/zJyF820+4Ghf2JeJZlkC5bAHDrEDlWTXIHHB2sO", + "SZUlB/O0GYrnCWqPQsa30+F27R4XBzcaNY+QRMWQJOWWG12Qc68lHi49Y/0u5JXh4colOaJ1l+zYjeji", + "K3YSxn0L2G5zAV90k9cXXx7F4exLq/7hjC+YFBwj0WWA08CqQJeq2G19bTcqym8FKfeLS3YjsDv8aNG5", + "lQ1vFXukdaYrEVauo82EhVYq8xdtSjPrLx5rKSCvlwHXTE/8wW63VGIewYCdfwQbipxMf3zmj0T8+KwP", + "3LweEfsomeazmeWsjlDkroOJXHcPdtONvTcsSQ4ToiM2N0oWqdfycIN611Gm8PE1oRZcnl28CzaPW4+H", + "uMffnL99G/SC898ug17w68cP28Mgbu4NRDzK6JLX9yFJ3s+C0z83BzM8iujmU2vQA1jjvBZhoVODW0qU", + "GQ2i7h3OfFUF70elLD9/5ada9/vE97otGOtTZbYQIsKqIgWPvCoDH3nOIj9NU2PZTKj2B1Yw8GEdgroW", + "cq/tEVvpxLOmOld7YqMoAlD4shVYnVgIs3yShZ71nSnNUmrsupcfPpIcA1AZyBC4pvO6QOGYzdwikc4K", + "SUTYbG2vYmrFlN2ubeK+F6SQdkWfK4iNp2YwT1JIjbq10JeB6Q5h6PVcP1Q41WvRTplzbtBnlw2Rn627", + "ERsxfpgge0U1NeJmKZmNJTVIzyZ+GM9yTzA7opruJKOj+iyDrYGYctxPW9d8K9VrwHE1GsoM116heUID", + "7yKSKveODxD3+CDY1Tt1S5FAq8zCPmpodEYyukoENWRqnCwjofi8xKDL2AlJEjaDcBUmLjOhbovNMhJd", + "EYtZhVebgz+w/XYdpFYKwLCCt1pnJ9FQClI7OFNkjC+Ogy6WNfB7tICNKdqfi3wHbkEY5/yqDrBLoJZp", + "2d2Y2JbTgfTnK42nq+Ld1EZVM1e81aU0troyVh+2v1Zl8V/t95pztYeSq6B1Lx0IbEN4oPKtw+kTIqNQ", + "AnAVC30BcxfhuYOQ56821FmWMM6d/b2h4K8jCPY7Br/2GWjH4mI71iPjeWX9BGaGWyQHeZsy4z3G9GYh", + "il3oFRu7DWWHBPNkiehNVm2LMLwsOwql2N11aCZIEk0n15tjir8KyT4LjvXPOBehqci5HpAPWMy9APe9", + "Ili20iMc5nTte4MHv6SzEGwpd/w/BuJwh/kjseSe6fPMP/ltsnB27DvNw1FNljELsV46A2nkz/pU+zPF", + "3kPunJkbgX6JGb4DEzUsioBvKcixGcQqPOte2ppecs91gP2aJfABZMqUYoKrw+CfS5F7ktK/wZLgT67W", + "QZJf1ry9fYtqPMcOfnz27Hi/UwZiyX0hRgMr/oRBxQLejx3w7lKAsYyFQl+q2FubSbBBa8zmRIeeANhQ", + "EDMyGvu1+p3q8E7PMJQHTNBbMKMP/JVzhk7ZArbHiUviduOR8t1ktUPWtDMHjDtwy5MQM0lT8Oc4LypT", + "rnjI6P9ZZgh0AVKyCBRR9kib24Hjeq3lky2llj3vOYwyfeSJddTsNUBSu6PzGAh0kUQ75yMbpuwO8VZw", + "1EOcRXX25t3ZuCEpvcZCL/YZzvm7n7shwKog5crT3v28I0Yen5ys17/umMNESrM+4p2TmR32uyIzVYJ0", + "52RWiv8iBsvc2dDAIzWsFLXh3bjy/9ChHpBx+e44sP6LIpQkwlhMF5fvPhyNjotts8dBzNto1DA9GPNx", + "ICEVGsYByXIVuzoFN0cVhVkw42+b8YbmfyOiqZyDnuQysdnFwvOsLcQO7PWbq7c9Aufy3Qezt3aejxdv", + "0XzJVdyADF031JboURfLaCBD6jRT/+t0OBz8sFuIcqRFdltpKmQIZpztSuE8TSFiVEOyIkqLDLOHItdk", + "LmkIszwhKs61sWwH5DJmiqRYboJxI8YxXyplnmmIiEGRQIngzzbuc9rNqikD0D0edTM7dBtpstuCKoK5", + "9+WYiYrYyS3XYQerSSWfGGF8DqqDiwzfzGZpBnPDHlk+TZiKbYjRO5SalOJus13uBq2EYxF2Y8rY7BK4", + "TlakCiN7a9Vh8xwOZ5jxLlyuQrCgXCgkmpCEEsv5BHiE57j2FkZZQldTGl5tEEfFIyiO2KxWv3VUwlVI", + "VsGT1fEukS1l7D+YbJ9+tD4/Cr7LtyOzP8DNFFGpsypocIN2AmP3QFlNI24OlO0QGEvtIacaDTeocA0y", + "n+F7ucrgEq71wdGP2x20vIKV0lJcgdpaJqXh2re5cI3FLxr7E9gQcSyw4CfNcu3j+WZhuRm3vTU3KBus", + "HMIDV8Fp8AYkh4Scp3QOirz4cB70ggVIZUE5GTwenKC3mAGnGQtOg6eDk8FTV7WOGzYs6vqGs4TOC9cp", + "9PhO70DOAWv08ElbEQPXTGFGQHBQPZJnhl5IY1BPZeCCUaLyDOSCKSGj3pgbMwZPlOVcswS3rXz6FSwu", + "hUiMQZAwpcEIoXGAVeIJ42gqiCnyRkSmMBOyONqERO1KWNGgMbRiHYEoOLXFqcUsr3H9FhWg9M8iWu3V", + "taNhLRS72Uh3Fkuye6gFSXFb3VGbP8dBv3/FhLqy5WP9fsSU4fP+PMvHwafjwyu+LEB+sqqeM9LEFn1W", + "vWSenJx4ohoIv8V3hDZouTSH7OaBq5te8MyO5AuQljMOm61rbnrB813eW+/7gk1Q8jSlcmX0pqXLEsSE", + "5jyMHRIM8A5mfK2i3jxLBI36cK2BY/CjT3nUL541OBfKIwI+4muGJYwqSw05lkOQzywjVIYxWxiGgWuN", + "PVN0DCnJuTHRhrFIYXiFnD2sph6O85OTpyGnKeAn6I25Am10MpYiVTPYVTF+ABuSggvH/Cuyod2vs3Kp", + "L3h04fZ4EzumeaJZRqUeGm3Vj6immziy2srustLqGcOaFv24J1ghbVzcGv+tD+8/I/VaJAanGIkzDk9C", + "Q3BnGwt07Yf1hkX7ov8H7X8+6f80mPQ/fXnce/L8uT9g+JllE+NFtEH8oyLI4hS9wRc1kGU0vIIaa1dQ", + "H6W50mVJbEo5m4HSAyMWj+v2w5Rxw4LbdF4Jnjts5rMMNoq3GnYPk3GPfcnekhosKUDU84g5yzUlczBF", + "JNDoWwu8lggqsVkj8iOqjEBSx3UhWC7RSUNntwxtN6ZU5PZcSiH71nm56jZ1C1W6KYPWbmd1qAqzLT5s", + "56gikwLRN0XbiKV5gkkegvu81t3Kb002cIT5lW70lCmee8JOK4W0O3LuZP7a0SlfmzibfVowxaYsYXpV", + "GjDfjaXyK4tcEbdY1jJmDTRHks7bnNgsBsMicx7ZPGdBUbaTTI8IFyNNVtbsnqHXrWIhte0l0jPT82Z3", + "mTlbgD1V50RGAlTBYMwv1w62b+np4rMCykY+90SarUZBh8oNM9B3Ii8QFHuAFGUZookiHhoUY9C4TXaX", + "B2DvCQOtA7a3k9wul2xW9m2x8K44H5vW4XJhdpVByGYMohoTqF1EOZ5pmlzBaguLF8H9ch4sb0B25iWX", + "l7msAXljfq4S8LWTVGPuOx81IK9RNBjAJMTGdFhAyeC113tEAYy5AcZ/mIpQTYqeMuGc6cFMAkSgrrTI", + "BkLOh9fmf5kUWgyvHz+2H7KEMj60g0UwG8RW1LhQYSy4kKoeyuknsIBqvUWwcSatEqUJUQlAppzdbbEg", + "Im94wJ3uuyd2aB4ePJQbEKFILd+TIrPqp26AIl3uQPiqrJHqFlWX9AqqWqr7MmZaJWE3DkcbrReW0jkM", + "Mxtyr2ba7hK17JUKAIKDflOEvqSZzqUxTSsEFemYLegUSdItxGyxG1m4grBkZQyLoTC8XRSpme90zfyo", + "SdJ1QwZ7pBlzx7D82hFVZ6GsVZvZWhbGSSLmWIumWXilbGs1Wwlp/aIaBZEpxHTBDEnTFVlQufoX0Tk6", + "zK4xYsHAgzH/3dhPU6Hj2lJwwGKtBEvlLBhFOrZnpbnL7ZqZrYBP3TlazXCpR+UYaKVVExzbUOqU6jAG", + "RZYxQOJqsp0o/C8n2J1z0e+75rK/kX4fLT9yQmzYwdqKNvDwXz4JOSpqzu6J/WpVkIdKR0de34l/Z4Gp", + "bAWLHqqN0eba6O4iIotONx3C0aVQ7gkvzQzNoZixmZJV9j1pLewqrQ1g3Vhw/UrXUiWevILrM3BfxoOn", + "r8ZX9rXXm9p61NdH51wXDV5DfLJoenALND87+Wn7e+st6O8wi9CxHEMaMzW07Zwn5fFpJJPcFylbb3l9", + "X+Eyf2PtQ0OiVQGlXed3xLp2pYRiirLa/gIvtsfzDnixTajvGy/tHt0HhyNKlNglRrfjrGfb31u/2eBO", + "4hgIeb0RXRNvRe5iA8pe2/zB940trAb/GyAK8VHiSCx5ImhkuGvymWGF3By0r+xY55IrQskf5x9snWst", + "5WQ7SiC6VFnoVYY11nr/NfDv5n/F5B8swxSZpClokArPme/ci7/IgxkLulgUNhgx7/2VA4oDm+kratjX", + "aaBXTz9uq4n/tJdydvt6K4fS7HqxxrI0EAmrvsEPkS4dsuoihNCC0NySS3o1hDcpamkcoa5TVNlKcVda", + "2tqt8nsgof2EXtVOsk1IKMZqvSofIMn8Anqt22bRC6KFvZJsEqY0KiLVSTdV08/DhNDDpJRq1R5SqeyT", + "xNaKPUBawfoQxLytz27TBnbw7LJPipaX95hXuQvbBPMYlT3/APGEK8Amh1hxs4mZJdCotCq9vHwBNHI2", + "5W6sjJMVpoQZ/3vhZhFq0P2qA8GtbAgU/WZ1d+b6fSNiMfitbFC8Rq8gDgVW0E9qBx87ubt9/vT+iis6", + "DroeyvG1oYpSiAeIyBFoTyftGuqGeCZWxSwrMWwLurqzEi+SRCyLui+sX2R8bqewdYcJOIXg8rzuvMKs", + "7NQ+6KhzLMyDOytsLC2SjsrEQ1om13r2OIN2tybKhUDdt/7P1f5t7ou8ub4Zd+HOav8QS2XZ30MXdZ5y", + "wJmz1+rsUPjuG8uaKZYwI7/ZToK2gplpVTnvrdoHX0tuH3NY9/3OWGNf0o/q58Nrtdml06zFbnxQL7e9", + "RS3sJn44kLD/YFlF1jUE/m2InNZL7BskWtL7skjcdJRJ1voP3Jcy97Q42B2nB55KwWV7mxF+5OyvHHwH", + "piueWLrt2Hpssm004jLJXZ8L+UaEZhdTjzSZvbLdMNQ6iQ2/FFt+446ngT0n36Q3kVXk1vA20INwLoNz", + "IEo8bnIitvsMnl5sBaJElj18RI3w7LVZEZ5o8HiBTSQNbaVEp09oT/C+Vmf2sa+Iq6Z/p+FaW2i9jt22", + "wF79uiFf5dHorNaSrjJqXSUJttKiEa76S/Dv/mh01n9pYetfem/heQcRo+5Q5IyY4bHHnStMOWoKseOg", + "vjvFyd+WqPN0wLt5iGSKG93aZVeTbcVuSbHGKt+cDvvdPLJL5OJVzfShrSjG/UUvep0NO2Zls6TOPklr", + "1zP/+OxZF5juKLIXrI3dlSzz7aLxbxlXOdAtKdqAPng1iv6l0ZxF5r5KKiZirobVxvpj7WLuept2yOEG", + "QdjbezZSbiFoihvdyqOR3l6b/mlmIknEco3yGpe3tHtCNdEseLIqKwkJmxU3DzFFHGgbGLNbq+wzT23t", + "/tmqByauWUTwzTRaebvZVlVmCOu71l4+zWCAJmIB0kxtGSQrrxQdums2uh33s+IeDjllWlK5al1IikkN", + "e9tRdcOBuz6W0DllXFk/2N0hW3QCGXPBbVeiWCh9+tOTJ0/u5lraS3tvkmuk3LjKE1tyqer2UnfxcHnl", + "ladQtXWj60urHe7Ds+u8Tfgr1+d13WLrOxjXfU/qtyzpOmvdojysrka2FOEhTscgViYhd3Q7+rVbBu7t", + "lEf7HoOvSwftu0Q8FFBd7OGuDf4e8N5xcdA6gvHuhq0Yxvsi7hfFa/dcfBsc12/F8KlCe83Fd4ZbugG5", + "X6oLNG6GV2z9HIkX0W8YHkjY7pfXrubYZBJuuXdjd2fhIITWr5D5ro5Sv3/zIBOFRpSUd+AUZms3xany", + "ShOvB7J+8cnXJrp7FiV2UT4p4n55kBVftbtH7PK6UR+xHdQKPvW3ETdrN718IxVWu3jFQ3w/1y9CebBB", + "j0r42JthNtOhyPW2WEi1eSLXG4Mi30ge3cK591xjs9XNb1xQY8yM5g01/x3DvocYdo2qRa4bMYvq6uEq", + "D+aXrvaYQXXHyn2e6mi1Be4+5N3VQ/1vcJ4jk7BgaIAXzYLrvYdb+HPl9p3yqKjHr6NwYyqizACUrYqr", + "VPSA4Enq8uLt2gHp8g5uF2ItX+/KCqD48ucEtnUH3i7kcMOGafbs1kWWtf78No+zJqrKX/uv3UVM/Rcb", + "+7yKWXVfVfsWpwH5JaeScg0QuasdLl6/fPr06U+DzeHkNVBGNrl/ECTFJYQHAmJAeXLyZBOLMiOTWJLg", + "LUdSzCUo1SMZdi8iWq5sIIkk1HZorW33BWi56r+Yad+FG6N8PrenZ7CJUuNS2FoXRLly3W/LRWxqa/8Q", + "NUB5BMeeblfIi65v9HaJkjCrBzpPVRTXmNnSyVvYoGWV5CbVsHZpWrv0sMWvRQNJWUJ5Z8cOaJLUh13f", + "tlYnUk8d032rUf9VJV4t+ngTixbdpx/ewXDcgbIxSiXXBuQ9T1ZYdlnJugwkOX9FQsptu5A5UxokRLYL", + "hJEggzaWRbYJybW7De4Nx577E/Y3lFxd0bftwaFFtq5+7HZb52G7ILKW7tcRQ7XbB/YQQtistui3f5eS", + "qDFubde2dES2DFI1mrd1hbaNghYEGF4SQHnZ6T5ZYecNdztJ7baTo+KWmLVbAmpXmHy8eFu03vFcM+BG", + "qacLiya2aq0fv7EqqS64NFe2JdqCwRL87Qtr1+jcp5xdv1rjHgpGdyVHX1wQXcCHLsaTisbd5b01umyS", + "Y5MIzU/FlRWDBodsk+H3TDzNa1l2ld4lWm8tur9lFSit4xXX/v8DAAD//wFMWytApQAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/server/lib/stream/ffmpeg.go b/server/lib/stream/ffmpeg.go new file mode 100644 index 00000000..722490c5 --- /dev/null +++ b/server/lib/stream/ffmpeg.go @@ -0,0 +1,289 @@ +package stream + +import ( + "context" + "errors" + "fmt" + "math" + "os" + "os/exec" + "runtime" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/onkernel/kernel-images/server/lib/logger" + "github.com/onkernel/kernel-images/server/lib/scaletozero" +) + +const ( + // exitCodeInitValue mirrors recorder to represent an unset exit code. + exitCodeInitValue = math.MinInt +) + +// FFmpegStreamer streams the display to an RTMP(S) endpoint. +type FFmpegStreamer struct { + mu sync.Mutex + + id string + binaryPath string + params Params + + cmd *exec.Cmd + exited chan struct{} + ffmpegErr error + exitCode int + startedAt time.Time + stz *scaletozero.Oncer +} + +// NewFFmpegStreamerFactory builds an FFmpeg-backed streamer factory. +func NewFFmpegStreamerFactory(pathToFFmpeg string, defaults Params, ctrl scaletozero.Controller) FFmpegStreamerFactory { + return func(id string, overrides Params) (Streamer, error) { + merged := mergeParams(defaults, overrides) + if err := validateParams(merged); err != nil { + return nil, err + } + + return &FFmpegStreamer{ + id: id, + binaryPath: pathToFFmpeg, + params: merged, + stz: scaletozero.NewOncer(ctrl), + exitCode: exitCodeInitValue, + }, nil + } +} + +func mergeParams(defaults Params, overrides Params) Params { + out := Params{ + FrameRate: defaults.FrameRate, + DisplayNum: defaults.DisplayNum, + Mode: defaults.Mode, + IngestURL: defaults.IngestURL, + PlaybackURL: defaults.PlaybackURL, + SecurePlaybackURL: defaults.SecurePlaybackURL, + } + if overrides.FrameRate != nil { + out.FrameRate = overrides.FrameRate + } + if overrides.DisplayNum != nil { + out.DisplayNum = overrides.DisplayNum + } + if overrides.Mode != "" { + out.Mode = overrides.Mode + } + if overrides.IngestURL != "" { + out.IngestURL = overrides.IngestURL + } + if overrides.PlaybackURL != nil { + out.PlaybackURL = overrides.PlaybackURL + } + if overrides.SecurePlaybackURL != nil { + out.SecurePlaybackURL = overrides.SecurePlaybackURL + } + if out.Mode == "" { + out.Mode = ModeInternal + } + return out +} + +func validateParams(p Params) error { + if p.IngestURL == "" { + return fmt.Errorf("ingest URL is required") + } + if p.FrameRate == nil { + return fmt.Errorf("frame rate is required") + } + if p.DisplayNum == nil { + return fmt.Errorf("display number is required") + } + return nil +} + +func (fs *FFmpegStreamer) ID() string { + return fs.id +} + +// Start begins streaming to the configured ingest URL. +func (fs *FFmpegStreamer) Start(ctx context.Context) error { + log := logger.FromContext(ctx) + + fs.mu.Lock() + if fs.cmd != nil { + fs.mu.Unlock() + return fmt.Errorf("stream already in progress") + } + + if err := fs.stz.Disable(ctx); err != nil { + fs.mu.Unlock() + return fmt.Errorf("failed to disable scale-to-zero: %w", err) + } + + fs.ffmpegErr = nil + fs.exitCode = exitCodeInitValue + fs.startedAt = time.Now() + fs.exited = make(chan struct{}) + + args, err := ffmpegStreamArgs(fs.params) + if err != nil { + _ = fs.stz.Enable(context.WithoutCancel(ctx)) + fs.mu.Unlock() + return err + } + log.Info(fmt.Sprintf("%s %s", fs.binaryPath, strings.Join(args, " "))) + + cmd := exec.Command(fs.binaryPath, args...) + cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + fs.cmd = cmd + fs.mu.Unlock() + + if err := cmd.Start(); err != nil { + _ = fs.stz.Enable(context.WithoutCancel(ctx)) + fs.mu.Lock() + fs.ffmpegErr = err + fs.cmd = nil + close(fs.exited) + fs.mu.Unlock() + return fmt.Errorf("failed to start ffmpeg process: %w", err) + } + + go fs.waitForCommand(ctx) + + // Detect immediate startup failures. + if err := waitForChan(ctx, 250*time.Millisecond, fs.exited); err == nil { + fs.mu.Lock() + defer fs.mu.Unlock() + if fs.ffmpegErr != nil { + return fmt.Errorf("failed to start ffmpeg process: %w", fs.ffmpegErr) + } + return fmt.Errorf("ffmpeg process exited immediately with code %d", fs.exitCode) + } + + return nil +} + +// Stop attempts a graceful shutdown of the ffmpeg process, escalating to SIGKILL on timeout. +func (fs *FFmpegStreamer) Stop(ctx context.Context) error { + defer fs.stz.Enable(context.WithoutCancel(ctx)) + + fs.mu.Lock() + cmd := fs.cmd + exited := fs.exited + fs.mu.Unlock() + + if cmd == nil { + return nil + } + + // Request graceful stop. + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGINT) + if err := waitForChan(ctx, 5*time.Second, exited); err == nil { + return nil + } + + // Force kill. + _ = syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL) + _ = waitForChan(ctx, 2*time.Second, exited) + return nil +} + +func (fs *FFmpegStreamer) IsStreaming(ctx context.Context) bool { + fs.mu.Lock() + defer fs.mu.Unlock() + + return fs.cmd != nil && fs.cmd.ProcessState == nil +} + +func (fs *FFmpegStreamer) Metadata() Metadata { + fs.mu.Lock() + defer fs.mu.Unlock() + + return Metadata{ + ID: fs.id, + Mode: fs.params.Mode, + IngestURL: fs.params.IngestURL, + PlaybackURL: fs.params.PlaybackURL, + SecurePlaybackURL: fs.params.SecurePlaybackURL, + StartedAt: fs.startedAt, + } +} + +func (fs *FFmpegStreamer) waitForCommand(ctx context.Context) { + defer fs.stz.Enable(context.WithoutCancel(ctx)) + + err := fs.cmd.Wait() + + fs.mu.Lock() + defer fs.mu.Unlock() + fs.ffmpegErr = err + if fs.cmd.ProcessState != nil { + fs.exitCode = fs.cmd.ProcessState.ExitCode() + } + fs.cmd = nil + close(fs.exited) + + if err != nil { + logger.FromContext(ctx).Info("ffmpeg stream exited with error", "err", err, "exitCode", fs.exitCode) + } else { + logger.FromContext(ctx).Info("ffmpeg stream exited", "exitCode", fs.exitCode) + } +} + +// ffmpegStreamArgs builds input/output arguments for live streaming. +func ffmpegStreamArgs(params Params) ([]string, error) { + input, err := screenCaptureArgs(params) + if err != nil { + return nil, err + } + + args := append(input, + "-c:v", "libx264", + "-preset", "veryfast", + "-tune", "zerolatency", + "-pix_fmt", "yuv420p", + "-g", strconv.Itoa(*params.FrameRate*2), + "-use_wallclock_as_timestamps", "1", + "-fflags", "nobuffer", + "-f", "flv", + params.IngestURL, + ) + + return args, nil +} + +func screenCaptureArgs(params Params) ([]string, error) { + switch runtime.GOOS { + case "darwin": + return []string{ + "-f", "avfoundation", + "-framerate", strconv.Itoa(*params.FrameRate), + "-pixel_format", "nv12", + "-i", fmt.Sprintf("%d:none", *params.DisplayNum), + }, nil + case "linux": + return []string{ + "-f", "x11grab", + "-framerate", strconv.Itoa(*params.FrameRate), + "-i", fmt.Sprintf(":%d", *params.DisplayNum), + }, nil + default: + return nil, fmt.Errorf("unsupported platform: %s", runtime.GOOS) + } +} + +// waitForChan returns nil if and only if the channel is closed within the timeout. +func waitForChan(ctx context.Context, timeout time.Duration, c <-chan struct{}) error { + select { + case <-c: + return nil + case <-time.After(timeout): + return errors.New("process did not exit within timeout") + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/server/lib/stream/manager.go b/server/lib/stream/manager.go new file mode 100644 index 00000000..72cdd4cc --- /dev/null +++ b/server/lib/stream/manager.go @@ -0,0 +1,86 @@ +package stream + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/onkernel/kernel-images/server/lib/logger" +) + +type StreamManager struct { + mu sync.Mutex + streams map[string]Streamer +} + +func NewStreamManager() *StreamManager { + return &StreamManager{ + streams: make(map[string]Streamer), + } +} + +func (sm *StreamManager) GetStream(id string) (Streamer, bool) { + sm.mu.Lock() + defer sm.mu.Unlock() + + stream, ok := sm.streams[id] + return stream, ok +} + +func (sm *StreamManager) ListStreams(ctx context.Context) []Streamer { + sm.mu.Lock() + defer sm.mu.Unlock() + + streams := make([]Streamer, 0, len(sm.streams)) + for _, stream := range sm.streams { + streams = append(streams, stream) + } + return streams +} + +func (sm *StreamManager) RegisterStream(ctx context.Context, streamer Streamer) error { + log := logger.FromContext(ctx) + + sm.mu.Lock() + defer sm.mu.Unlock() + + if _, exists := sm.streams[streamer.ID()]; exists { + return fmt.Errorf("stream with id '%s' already exists", streamer.ID()) + } + + sm.streams[streamer.ID()] = streamer + log.Info("registered new stream", "id", streamer.ID()) + return nil +} + +func (sm *StreamManager) DeregisterStream(ctx context.Context, streamer Streamer) error { + sm.mu.Lock() + defer sm.mu.Unlock() + + delete(sm.streams, streamer.ID()) + return nil +} + +func (sm *StreamManager) StopAll(ctx context.Context) error { + log := logger.FromContext(ctx) + + sm.mu.Lock() + defer sm.mu.Unlock() + + var errs []error + for id, streamer := range sm.streams { + if streamer.IsStreaming(ctx) { + if err := streamer.Stop(ctx); err != nil { + errs = append(errs, fmt.Errorf("failed to stop stream '%s': %w", id, err)) + log.Error("failed to stop stream during shutdown", "id", id, "err", err) + } + } + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + + return nil +} diff --git a/server/lib/stream/rtmp_server.go b/server/lib/stream/rtmp_server.go new file mode 100644 index 00000000..0ed2d39d --- /dev/null +++ b/server/lib/stream/rtmp_server.go @@ -0,0 +1,310 @@ +package stream + +import ( + "context" + "crypto/rand" + "crypto/rsa" + "crypto/tls" + "crypto/x509" + "encoding/pem" + "errors" + "fmt" + "log/slog" + "math/big" + "net" + "strings" + "sync" + "time" + + "github.com/notedit/rtmp/format/rtmp" + "github.com/notedit/rtmp/pubsub" +) + +// RTMPServer implements an internal RTMP/RTMPS server backed by pubsub. +type RTMPServer struct { + mu sync.Mutex + rtmpAddr string + rtmpsAddr string + tlsConfig *tls.Config + streams map[string]*pubsub.PubSub + listener net.Listener + tlsListener net.Listener + server *rtmp.Server + running bool + logger *slog.Logger + wg sync.WaitGroup +} + +func NewRTMPServer(rtmpAddr, rtmpsAddr string, tlsConfig *tls.Config, logger *slog.Logger) *RTMPServer { + return &RTMPServer{ + rtmpAddr: rtmpAddr, + rtmpsAddr: rtmpsAddr, + tlsConfig: tlsConfig, + streams: make(map[string]*pubsub.PubSub), + logger: logger, + } +} + +func (s *RTMPServer) Start(ctx context.Context) error { + s.mu.Lock() + if s.running { + s.mu.Unlock() + return nil + } + if s.streams == nil { + s.streams = make(map[string]*pubsub.PubSub) + } + + rtmpListener, err := net.Listen("tcp", s.rtmpAddr) + if err != nil { + s.mu.Unlock() + return fmt.Errorf("failed to start rtmp listener: %w", err) + } + + srv := rtmp.NewServer() + srv.HandleConn = func(c *rtmp.Conn, nc net.Conn) { + s.handleConn(ctx, c, nc) + } + srv.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) { + if s.logger == nil { + return + } + s.logger.Debug("rtmp event", slog.String("event", rtmp.EventString[e]), slog.String("remote_addr", nc.RemoteAddr().String())) + } + + s.listener = rtmpListener + s.server = srv + s.running = true + s.mu.Unlock() + + if s.logger != nil { + s.logger.Info("rtmp listener started", slog.String("addr", rtmpListener.Addr().String())) + } + s.wg.Add(1) + go s.acceptLoop(ctx, rtmpListener, false) + + if s.tlsConfig != nil && s.rtmpsAddr != "" { + tlsListener, err := tls.Listen("tcp", s.rtmpsAddr, s.tlsConfig) + if err != nil { + _ = rtmpListener.Close() + return fmt.Errorf("failed to start rtmps listener: %w", err) + } + s.mu.Lock() + s.tlsListener = tlsListener + s.mu.Unlock() + + if s.logger != nil { + s.logger.Info("rtmps listener started", slog.String("addr", tlsListener.Addr().String())) + } + s.wg.Add(1) + go s.acceptLoop(ctx, tlsListener, true) + } + + return nil +} + +func (s *RTMPServer) acceptLoop(ctx context.Context, ln net.Listener, secure bool) { + defer s.wg.Done() + for { + conn, err := ln.Accept() + if err != nil { + select { + case <-ctx.Done(): + return + default: + } + if errors.Is(err, net.ErrClosed) { + return + } + if s.logger != nil { + s.logger.Warn("rtmp accept error", slog.String("err", err.Error()), slog.Bool("secure", secure)) + } + continue + } + go s.server.HandleNetConn(conn) + } +} + +func (s *RTMPServer) Close(ctx context.Context) error { + s.mu.Lock() + if !s.running { + s.mu.Unlock() + return nil + } + s.running = false + rtmpListener := s.listener + tlsListener := s.tlsListener + s.mu.Unlock() + + if rtmpListener != nil { + _ = rtmpListener.Close() + } + if tlsListener != nil { + _ = tlsListener.Close() + } + + done := make(chan struct{}) + go func() { + defer close(done) + s.wg.Wait() + }() + + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +func (s *RTMPServer) EnsureStream(path string) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.streams[path]; !ok { + s.streams[path] = &pubsub.PubSub{} + } +} + +func (s *RTMPServer) IngestURL(streamPath string) string { + port := s.listenPort(s.rtmpAddr, "1935") + hostPort := net.JoinHostPort("127.0.0.1", port) + return fmt.Sprintf("rtmp://%s/%s", hostPort, strings.TrimPrefix(streamPath, "/")) +} + +func (s *RTMPServer) PlaybackURLs(host string, streamPath string) (rtmpURL *string, rtmpsURL *string) { + cleanHost := stripPort(host) + if cleanHost == "" { + cleanHost = "127.0.0.1" + } + streamPath = strings.TrimPrefix(streamPath, "/") + + if s.rtmpAddr != "" { + port := s.listenPort(s.rtmpAddr, "1935") + hostPort := net.JoinHostPort(cleanHost, port) + url := fmt.Sprintf("rtmp://%s/%s", hostPort, streamPath) + rtmpURL = &url + } + + s.mu.Lock() + tlsListener := s.tlsListener + s.mu.Unlock() + if tlsListener != nil && s.rtmpsAddr != "" { + port := s.listenPort(s.rtmpsAddr, "1936") + hostPort := net.JoinHostPort(cleanHost, port) + url := fmt.Sprintf("rtmps://%s/%s", hostPort, streamPath) + rtmpsURL = &url + } + + return rtmpURL, rtmpsURL +} + +func (s *RTMPServer) handleConn(ctx context.Context, c *rtmp.Conn, nc net.Conn) { + if err := c.Prepare(rtmp.StageGotPublishOrPlayCommand, 0); err != nil { + if s.logger != nil { + s.logger.Error("rtmp handshake failed", slog.String("err", err.Error())) + } + _ = nc.Close() + return + } + + path := strings.TrimPrefix(c.URL.Path, "/") + if path == "" { + path = "live/default" + } + + ps := s.ensurePubSub(path) + if c.Publishing { + if s.logger != nil { + s.logger.Info("rtmp publisher connected", slog.String("path", path)) + } + go func() { + ps.SetPub(c) + _ = nc.Close() + if s.logger != nil { + s.logger.Info("rtmp publisher disconnected", slog.String("path", path)) + } + }() + } else { + if s.logger != nil { + s.logger.Info("rtmp subscriber connected", slog.String("path", path)) + } + done := make(chan bool, 1) + go func() { + <-c.CloseNotify() + close(done) + if s.logger != nil { + s.logger.Info("rtmp subscriber disconnected", slog.String("path", path)) + } + }() + go ps.AddSub(done, c) + } +} + +func (s *RTMPServer) ensurePubSub(path string) *pubsub.PubSub { + s.mu.Lock() + defer s.mu.Unlock() + + ps, ok := s.streams[path] + if !ok { + ps = &pubsub.PubSub{} + s.streams[path] = ps + } + return ps +} + +func (s *RTMPServer) listenPort(addr string, defaultPort string) string { + _, port, err := net.SplitHostPort(addr) + if err == nil && port != "" { + return port + } + if strings.HasPrefix(addr, ":") { + return strings.TrimPrefix(addr, ":") + } + return defaultPort +} + +func stripPort(hostport string) string { + host, _, err := net.SplitHostPort(hostport) + if err != nil { + return hostport + } + return host +} + +// SelfSignedTLSConfig generates a minimal self-signed TLS config for RTMPS. +func SelfSignedTLSConfig() (*tls.Config, error) { + priv, err := rsa.GenerateKey(rand.Reader, 2048) + if err != nil { + return nil, fmt.Errorf("generate key: %w", err) + } + + serial, err := rand.Int(rand.Reader, big.NewInt(1<<62)) + if err != nil { + return nil, fmt.Errorf("serial: %w", err) + } + + template := x509.Certificate{ + SerialNumber: serial, + NotBefore: time.Now().Add(-1 * time.Hour), + NotAfter: time.Now().Add(365 * 24 * time.Hour), + KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, + DNSNames: []string{"localhost"}, + } + + der, err := x509.CreateCertificate(rand.Reader, &template, &template, &priv.PublicKey, priv) + if err != nil { + return nil, fmt.Errorf("create cert: %w", err) + } + + certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der}) + keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(priv)}) + + cert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, fmt.Errorf("tls key pair: %w", err) + } + + return &tls.Config{Certificates: []tls.Certificate{cert}}, nil +} diff --git a/server/lib/stream/stream.go b/server/lib/stream/stream.go new file mode 100644 index 00000000..fab5a1b6 --- /dev/null +++ b/server/lib/stream/stream.go @@ -0,0 +1,63 @@ +package stream + +import ( + "context" + "time" +) + +type Mode string + +const ( + ModeInternal Mode = "internal" + ModeRemote Mode = "remote" +) + +// Params holds stream creation settings. +type Params struct { + FrameRate *int + DisplayNum *int + IngestURL string + Mode Mode + PlaybackURL *string + SecurePlaybackURL *string +} + +// Metadata describes a running stream. +type Metadata struct { + ID string + Mode Mode + IngestURL string + PlaybackURL *string + SecurePlaybackURL *string + StartedAt time.Time +} + +// Streamer defines the interface for a streaming session. +type Streamer interface { + ID() string + Start(ctx context.Context) error + Stop(ctx context.Context) error + IsStreaming(ctx context.Context) bool + Metadata() Metadata +} + +// Manager defines the interface for tracking streaming sessions. +type Manager interface { + GetStream(id string) (Streamer, bool) + ListStreams(ctx context.Context) []Streamer + RegisterStream(ctx context.Context, streamer Streamer) error + DeregisterStream(ctx context.Context, streamer Streamer) error + StopAll(ctx context.Context) error +} + +// FFmpegStreamerFactory returns a Streamer configured with the provided id and params. +type FFmpegStreamerFactory func(id string, params Params) (Streamer, error) + +// InternalServer represents the internal RTMP(S) server used for live streaming. +type InternalServer interface { + Start(ctx context.Context) error + EnsureStream(path string) + IngestURL(streamPath string) string + PlaybackURLs(host string, streamPath string) (rtmpURL *string, rtmpsURL *string) + Close(ctx context.Context) error +} diff --git a/server/openapi.yaml b/server/openapi.yaml index 9b23c130..5a16324d 100644 --- a/server/openapi.yaml +++ b/server/openapi.yaml @@ -23,6 +23,66 @@ paths: $ref: "#/components/responses/ConflictError" "500": $ref: "#/components/responses/InternalError" + /stream/start: + post: + summary: Start live streaming to an internal RTMP(S) server or a remote RTMP(S) endpoint. + description: | + Start streaming the display to either an internally hosted RTMP(S) server (default) or a remote RTMP/RTMPS URL. + When using the internal server the response includes playback URLs that can be used by viewers. + operationId: startStream + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/StartStreamRequest" + responses: + "201": + description: Stream started + content: + application/json: + schema: + $ref: "#/components/schemas/StreamInfo" + "400": + $ref: "#/components/responses/BadRequestError" + "409": + $ref: "#/components/responses/ConflictError" + "500": + $ref: "#/components/responses/InternalError" + /stream/stop: + post: + summary: Stop a live stream + operationId: stopStream + requestBody: + required: false + content: + application/json: + schema: + $ref: "#/components/schemas/StopStreamRequest" + responses: + "200": + description: Stream stopped + "400": + $ref: "#/components/responses/BadRequestError" + "404": + $ref: "#/components/responses/NotFoundError" + "500": + $ref: "#/components/responses/InternalError" + /stream/list: + get: + summary: List active streams + operationId: listStreams + responses: + "200": + description: List of active streams + content: + application/json: + schema: + type: array + items: + $ref: "#/components/schemas/StreamInfo" + "500": + $ref: "#/components/responses/InternalError" /process/exec: post: summary: Execute a command synchronously @@ -978,6 +1038,66 @@ paths: $ref: "#/components/responses/InternalError" components: schemas: + StartStreamRequest: + type: object + properties: + id: + type: string + description: Optional identifier for the streaming session. Alphanumeric or hyphen. + pattern: "^[a-zA-Z0-9-]+$" + mode: + type: string + description: | + Where to send the stream output. "internal" starts a local RTMP(S) server and streams to it. + "remote" pushes the stream to the provided RTMP/RTMPS target_url. + enum: [internal, remote] + default: internal + target_url: + type: string + description: RTMP or RTMPS URL to push the stream to when mode is "remote". + pattern: "^rtmps?://.+" + framerate: + type: integer + description: Streaming framerate in fps (overrides server default) + minimum: 1 + maximum: 20 + additionalProperties: false + StopStreamRequest: + type: object + properties: + id: + type: string + description: Identifier of the stream to stop. Alphanumeric or hyphen. + pattern: "^[a-zA-Z0-9-]+$" + additionalProperties: false + StreamInfo: + type: object + required: [id, mode, ingest_url, is_streaming, started_at] + properties: + id: + type: string + description: Stream identifier + mode: + type: string + enum: [internal, remote] + description: Whether the stream is using the internal RTMP server or a remote endpoint + ingest_url: + type: string + description: URL ffmpeg is publishing to + playback_url: + type: [string, "null"] + description: RTMP playback URL if available (internal streams only) + secure_playback_url: + type: [string, "null"] + description: RTMPS playback URL when TLS is enabled for the internal server + started_at: + type: string + format: date-time + description: Timestamp when streaming started + is_streaming: + type: boolean + description: Whether the ffmpeg streaming process is currently running + additionalProperties: false StartRecordingRequest: type: object properties: