-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdeepstream_run.py
More file actions
210 lines (176 loc) · 7.88 KB
/
deepstream_run.py
File metadata and controls
210 lines (176 loc) · 7.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#!/usr/bin/env python3
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib
import argparse
import threading
import signal
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'src/')))
from pipeline_builder import PipelineBuilder
from rtsp_streamer import start_rtsp_streaming
from hls_http_server import run_hls_http_server, create_temp_hls_dir
from camera_resolutions import CameraResolutions
sys.path.append('/opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/')
from common.bus_call import bus_call
class DeepStreamApp:
def __init__(self, options):
self.options = options
if self.options.graph:
self._setup_graph_dump()
Gst.init(None)
self.set_display_env()
self.hls_dir = None
self._setup_sink()
builder = PipelineBuilder(
self.options,
hls_dir = self.hls_dir
)
self.pipeline = builder.build()
def set_display_env(self):
"""Set or unset the DISPLAY environment variable based on output usage.
Unsets DISPLAY if an output sink is defined (like RTSP or HLS),
enabling file-only operation without EGL. Otherwise, uses ':0'.
"""
if self.options.output_uri:
if "DISPLAY" in os.environ:
os.unsetenv('DISPLAY')
del os.environ['DISPLAY']
else:
os.environ['DISPLAY'] = ':0'
def _setup_graph_dump(self):
"""
Configure an output directory for pipeline graph dumps.
Creates a "pipeline_graphs" subfolder in the script's directory if it
doesn't exist, and sets GST_DEBUG_DUMP_DOT_DIR to enable GStreamer
to dump DOT graph files for inspection.
"""
subfolder = "pipeline_graphs"
script_dir = os.path.dirname(os.path.abspath(__file__))
dump_dir = os.path.join(script_dir, subfolder)
os.makedirs(dump_dir, exist_ok=True)
os.environ['GST_DEBUG_DUMP_DOT_DIR'] = dump_dir
def handle_sigint(self, user_data=None):
"""Handle SIGINT signal by sending EOS to the pipeline and waiting to finish.
Sends an End-Of-Stream event, then blocks until an EOS or ERROR message
is received, ensuring graceful shutdown.
"""
print("SIGINT received - sending EOS to upstream source element")
self.pipeline.send_event(Gst.Event.new_eos())
# Wait for EOS message
bus = self.pipeline.get_bus()
# Wait until we receive an EOS or ERROR message
while True:
msg = bus.timed_pop_filtered(
Gst.CLOCK_TIME_NONE,
Gst.MessageType.EOS | Gst.MessageType.ERROR
)
if msg:
t = msg.type
if t == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
break
elif t == Gst.MessageType.ERROR:
err, debug = msg.parse_error()
print(
f"Error received from element {msg.src.get_name()}: {err.message}")
print(
f"Debugging information: {debug if debug else 'None'}")
break
else:
break
def setup_bus(self, pipeline, loop):
"""Connect the GStreamer pipeline's bus to the main loop for message handling.
Sets up signal-watch and links messages to the shared bus_call handler.
"""
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
def _setup_sink(self):
"""
Configure the output sink (RTSP, HLS, or others).
Starts RTSP streaming on the configured port if output is 'rtsp',
or sets up an HLS server if output is 'hls'. Stores the temporary
HLS directory path in self.hls_dir when applicable.
"""
if self.options.output_uri == "rtsp":
start_rtsp_streaming(self.options.outport, self.options.codec)
elif self.options.output_uri == "hls":
self.hls_dir = create_temp_hls_dir()
threading.Thread(
target=run_hls_http_server,
args=(self.hls_dir, self.options.outport),
daemon=True
).start()
def run(self):
"""
Start the GStreamer pipeline and run the GLib main loop.
Optionally dumps the pipeline graph if enabled, sets up message
handling, attaches a SIGINT handler for graceful shutdown, and launches
the pipeline in PLAYING state until terminated or an EOS is received.
"""
if self.options.graph:
Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "deepstream_pipeline")
print("Starting pipeline")
self.loop = GLib.MainLoop()
self.setup_bus(self.pipeline, self.loop)
# Register SIGINT handler
GLib.unix_signal_add(
GLib.PRIORITY_DEFAULT,
signal.SIGINT,
self.handle_sigint
)
self.pipeline.set_state(Gst.State.PLAYING)
try:
self.loop.run()
except BaseException:
pass
finally:
self.pipeline.set_state(Gst.State.NULL)
print("Pipeline shut down cleanly.")
def parse_args():
parser = argparse.ArgumentParser(description="DeepStream-based video pipeline app")
# Input options
parser.add_argument("-i", "--input", dest="input_uri",
help="Input source: rtsp://..., file path, 'csi', 'udp' or /dev/videoX",
default="csi")
parser.add_argument("-o", "--output", dest="output_uri",
help="Output sink: 'rtsp', file path, 'hls', 'udp', or leave empty for EGL display",
default=None)
parser.add_argument("-c", "--codec", dest="codec",
choices=["h264", "h265"],
help="Codec to be used", default="h264")
parser.add_argument("-n", "--noinfer", dest="no_infer",
action="store_true", help="Disable Yolo inference", default=False)
# Network options
parser.add_argument("-p", "--port", dest="outport", type=int,
help="Outbound port number [default: 5000]", default=5000)
parser.add_argument("-q", "--inport", dest="inport", type=int,
help="Inbound port number [default: 5000]", default=5000)
parser.add_argument("-H", "--host", dest="udp_host",
help="UDP stream host [default: 192.168.137.1]", default="192.168.137.1")
parser.add_argument("-b", "--bitrate", dest="bitrate", type=int,
help="Bitrate for UDP/RTSP/HLS outputs [default: 4000000]", default=4000000)
# Config and graph
script_dir = os.path.dirname(os.path.abspath(__file__))
default_cfg = script_dir + "/config_infer_primary_Yolov4-tiny.txt"
parser.add_argument("-f", "--config", dest="config_file",
help="DeepStream config file path", default=default_cfg)
parser.add_argument("-g", "--graph", dest="graph", action="store_true",
help="Generate pipeline .dot file", default=False)
# Camera resolutions
resolutions = CameraResolutions()
resolution_choices = list(resolutions.MODES.keys())
help_text = "Camera resolution mode number:\n" + "\n".join(
[f" {k}: {v[0]}x{v[1]} @ {v[2]} fps" for k, v in resolutions.MODES.items()]
)
parser.add_argument("-r", "--resolution", dest="resolution_mode", type=int,
choices=resolution_choices, help=help_text, default=3)
return parser.parse_args()
def main():
user_options = parse_args()
app = DeepStreamApp(user_options)
app.run()
if __name__ == '__main__':
sys.exit(main())