Skip to content

A better implementation? #61

@Playit3110

Description

@Playit3110

Hello Dave,
I am right now working on a simular project as you did and want to bring it further.

I had the same idea to use some MPEG encoding, but went a bit further. I used MP4 as the encoding (more specific fragmented MP4), because it just uses the <video> tag in HTML and the implementation with Media Source Extensions API is so easy, that I could make a client in seconds.

Now about the server

I used picamera2 and websockets to connect to a remote WS server, to minimize the amount of execution on the Pi that runs the camera. I wrote my own fMP4Output based on PyavOutput to get the fragmented MP4 and send it to a BytesIO virtual file.

But right now it is not working because the websocket gets stuck, no new frames are retrived from the camera and send.

Code

stream.py

#!/usr/bin/env python3
import sys, io
from threading import Condition

import asyncio
from websockets import connect
import ssl

from picamera2 import Picamera2
from picamera2.encoders import H264Encoder
from picamera2.outputs import fMP4Output


## Camera Resolution (width, height)
resolution = (1280, 720)


## Class to handle streaming output
class StreamOutput(io.BufferedIOBase):
	def __init__(self):
		self.frame = None
		self.condition = Condition()

	def seekable(self):
		return True

	def writable(self):
		return True

	def write(self, buf):
		with self.condition:
			self.frame = buf
			self.condition.notify_all()


## Class to handle WS requests
async def stream(wss):
	while True:
		with fmp4.condition:
			fmp4.condition.wait()
			await wss.send(fmp4.frame)


## Create Picamera2 instance and configure it
picam2 = Picamera2()
picam2.configure(
	picam2.create_video_configuration(
		main = {"size": resolution},
		buffer_count = 2
	)
)


fmp4 = StreamOutput()
picam2.start_recording(
	H264Encoder(10**7),
	fMP4Output(fmp4)
)


context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations("./server-ca.crt")

async def main():
	## Set up and start the WS Client
	async with connect(
		"wss://example.com/stream",
		ssl = context
	) as wss:
		await stream(wss)
		await asyncio.get_running_loop().create_future()


try:
	asyncio.run(main())
finally:
	## Stop recording when the script is interrupted
	picam2.stop_recording()

fmp4output.py

from fractions import Fraction

import av

from .output import Output


class fMP4Output(Output):
    def __init__(self, output_name, pts=None):
        super().__init__(pts=pts)
        self._output_name = output_name
        self._streams = {}
        self._container = None
        # A user can set this to get notifications of failures.
        self.error_callback = None

    def _add_stream(self, encoder_stream, codec_name, **kwargs):
        # The output container that does the muxing needs to know about the streams for which packets
        # will be sent to it. It literally needs to copy them for the output container.
        stream = self._container.add_stream(codec_name, **kwargs)

        self._streams[encoder_stream] = stream

    def start(self):
        """Start the PyavOutput."""
        self._container = av.open(self._output_name, "w", format="mp4", options={
            "frag_duration": "1",
            "movflags": "+frag_keyframe+empty_moov"
        })
        super().start()

    def stop(self):
        """Stop the PyavOutput."""
        super().stop()
        if self._container:
            try:
                self._container.close()
            except Exception:
                pass
            self._container = None
            self._streams = {}

    def outputframe(self, frame, keyframe=True, timestamp=None, packet=None, audio=False):
        """Output an encoded frame using PyAv."""
        if self.recording and self._container:
            orig_stream = None
            # We must make a packet that looks like it came from our own container's version of the stream.
            if not packet:
                # No packet present. It must have come from a video encoder that isn't using libav, so make one up.
                packet = av.Packet(frame)
                packet.dts = timestamp
                packet.pts = timestamp
                packet.time_base = Fraction(1, 1000000)
                packet.stream = self._streams["video"]
            else:
                # We can perform a switcheroo on the packet's stream, swapping the encoder's version for ours!
                orig_stream = packet.stream
                if orig_stream not in self._streams:
                    raise RuntimeError("Stream not found in PyavOutput")
                packet.stream = self._streams[orig_stream]

            try:
                self._container.mux(packet)
            except Exception as e:
                try:
                    self._container.close()
                except Exception:
                    pass
                self._container = None
                if self.error_callback:
                    self.error_callback(e)

            # Put the original stream back, just in case the encoder has multiple outputs and will pass
            # it to each one.
            packet.stream = orig_stream

            self.outputtimestamp(timestamp)

Client

Code

index.html

<!DOCTYPE html>
<html lang="en">
<head>
	<meta charset="UTF-8">
	<meta name="viewport" content="width=device-width, initial-scale=1.0">
	<title>WS Video</title>
	<script src="stream.js"></script>
</head>
<body>
	<video id="video" controls></video>
</body>
</html>

stream.js

let video = document.getElementById("video");

let url = new URL(location.href);
url.protocol = "wss";
url.port = 8000;

let mediasource = new MediaSource();
video.src = URL.createObjectURL(mediasource);

let buffer, ws, attempts = 0, messages = [];
mediasource.onsourceopen = function connect() {
	if(buffer)
		mediasource.removeSourceBuffer(buffer);
	buffer = mediasource.addSourceBuffer("video/mp4; codecs=\"avc1.42E01E, mp4a.40.2\"");

	buffer.onupdateend = function() {
		if(
			mediasource.readyState == "open" &&
			this.updating == false &&
			messages.length > 0
		) this.appendBuffer(messages.shift());

		if(video.paused)
			video.play();
	};


	ws = new WebSocket(url);
	ws.binaryType = "arraybuffer";

	ws.onerror = function() {
		if(attempts++ < 3)
			setTimeout(connect, 3000);
	}

	ws.onmessage = function({data: message}) {
		messages.push(message);
		buffer.onupdateend();
	}
};

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions