Skip to content

Commit c9b42b9

Browse files
authored
Merge pull request #7003 from ant-media/SrtRestream
SRT Restream Endpoint is accepted.
2 parents 38f23c3 + bf95461 commit c9b42b9

File tree

15 files changed

+677
-462
lines changed

15 files changed

+677
-462
lines changed

src/main/java/io/antmedia/datastore/db/InMemoryDataStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public boolean removeEndpoint(String id, Endpoint endpoint, boolean checkRTMPUrl
145145
for (Iterator<Endpoint> iterator = endPointList.iterator(); iterator.hasNext();) {
146146
Endpoint endpointItem = iterator.next();
147147
if(checkRTMPUrl) {
148-
if (endpointItem.getRtmpUrl().equals(endpoint.getRtmpUrl())) {
148+
if (endpointItem.getEndpointUrl().equals(endpoint.getEndpointUrl())) {
149149
iterator.remove();
150150
result = true;
151151
break;

src/main/java/io/antmedia/datastore/db/MapBasedDataStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public boolean removeEndpoint(String id, Endpoint endpoint, boolean checkRTMPUrl
197197
for (Iterator<Endpoint> iterator = endPointList.iterator(); iterator.hasNext();) {
198198
Endpoint endpointItem = iterator.next();
199199
if (checkRTMPUrl) {
200-
if (endpointItem.getRtmpUrl().equals(endpoint.getRtmpUrl())) {
200+
if (endpointItem.getEndpointUrl().equals(endpoint.getEndpointUrl())) {
201201
iterator.remove();
202202
result = true;
203203
break;

src/main/java/io/antmedia/datastore/db/MongoStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ public boolean removeEndpoint(String id, Endpoint endpoint, boolean checkRTMPUrl
513513
for (Iterator<Endpoint> iterator = endPointList.iterator(); iterator.hasNext();) {
514514
Endpoint endpointItem = iterator.next();
515515
if(checkRTMPUrl) {
516-
if (endpointItem.getRtmpUrl().equals(endpoint.getRtmpUrl())) {
516+
if (endpointItem.getEndpointUrl().equals(endpoint.getEndpointUrl())) {
517517
iterator.remove();
518518
result = true;
519519
break;

src/main/java/io/antmedia/datastore/db/types/Endpoint.java

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,19 @@ public class Endpoint {
2323
@Schema(description = "The service name like facebook, periscope, youtube or generic")
2424
private String type;
2525

26+
/**
27+
* RTMP Or SRT URL of the endpoint
28+
*/
29+
@Schema(description = "RTMP or SRT URL of the endpoint")
30+
private String endpointUrl;
31+
2632
/**
2733
* RTMP URL of the endpoint
34+
*
35+
* @deprecated use {@link #endpointUrl}
36+
*
2837
*/
38+
@Deprecated(since = "3.0" , forRemoval = true)
2939
@Schema(description = "The RTMP URL of the endpoint")
3040
private String rtmpUrl;
3141

@@ -46,22 +56,28 @@ public Endpoint() {
4656
public Endpoint(String rtmpUrl, String type, String endpointServiceId, String status) {
4757
this();
4858
this.status = status;
49-
this.rtmpUrl = rtmpUrl;
59+
this.endpointUrl = rtmpUrl;
5060
this.type = type;
5161
this.endpointServiceId = endpointServiceId;
5262
}
5363

54-
public String getRtmpUrl() {
55-
return rtmpUrl;
64+
public String getEndpointUrl() {
65+
if(endpointUrl==null)
66+
return rtmpUrl;
67+
return endpointUrl;
5668
}
5769

58-
public void setRtmpUrl(String rtmpUrl) {
59-
this.rtmpUrl = rtmpUrl;
70+
public void setEndpointUrl(String endpointUrl) {
71+
this.endpointUrl = endpointUrl;
72+
this.rtmpUrl = endpointUrl;
6073
}
6174

6275
public String getEndpointServiceId() {
6376
return endpointServiceId;
6477
}
78+
public void setRtmpUrl(String rtmpUrl){
79+
this.rtmpUrl = rtmpUrl;
80+
}
6581

6682
public void setEndpointServiceId(String endpointServiceId) {
6783
this.endpointServiceId = endpointServiceId;

src/main/java/io/antmedia/muxer/RtmpMuxer.java renamed to src/main/java/io/antmedia/muxer/EndpointMuxer.java

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.bytedeco.ffmpeg.global.avutil.AV_ROUND_PASS_MINMAX;
2020
import static org.bytedeco.ffmpeg.global.avutil.av_rescale_q;
2121
import static org.bytedeco.ffmpeg.global.avutil.av_rescale_q_rnd;
22+
import org.bytedeco.ffmpeg.global.avutil;
2223

2324
import java.nio.ByteBuffer;
2425
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,13 +35,12 @@
3435
import org.bytedeco.ffmpeg.avformat.AVStream;
3536
import org.bytedeco.ffmpeg.avutil.AVRational;
3637
import org.bytedeco.ffmpeg.global.avcodec;
37-
import org.bytedeco.ffmpeg.global.avutil;
3838
import org.bytedeco.javacpp.BytePointer;
3939
import org.bytedeco.javacpp.SizeTPointer;
4040

4141
import io.vertx.core.Vertx;
4242

43-
public class RtmpMuxer extends Muxer {
43+
public class EndpointMuxer extends Muxer {
4444

4545
private String url;
4646
private volatile boolean trailerWritten = false;
@@ -55,25 +55,40 @@ public class RtmpMuxer extends Muxer {
5555
private AtomicBoolean preparedIO = new AtomicBoolean(false);
5656
private AtomicBoolean cancelOpenIO = new AtomicBoolean(false);
5757

58-
public RtmpMuxer(String url, Vertx vertx) {
58+
public String muxerType = null;
59+
60+
public EndpointMuxer(String url, Vertx vertx) {
5961
super(vertx);
60-
format = "flv";
62+
this.format = "flv";
6163
this.url = url;
6264

63-
parseRtmpURL(this.url);
65+
parseEndpointURL(this.url);
6466
}
65-
void parseRtmpURL(String url){
66-
if(url == null)
67-
return;
68-
// check if app name is present in the URL rtmp://Domain.com/AppName/StreamId
69-
String regex = "rtmp(s)?://[a-zA-Z0-9\\.-]+(:[0-9]+)?/([^/]+)/.*";
7067

71-
Pattern rtmpAppName = Pattern.compile(regex);
72-
Matcher checkAppName = rtmpAppName.matcher(url);
68+
public String getMuxerType() {
69+
return muxerType;
70+
}
7371

74-
if (!checkAppName.matches()) {
75-
//this is the fix to send stream for urls without app
76-
setOption("rtmp_app","");
72+
void parseEndpointURL(String url){
73+
if(url == null)
74+
return;
75+
if(url.startsWith("rtmp")) {
76+
format = "flv";
77+
muxerType = "rtmp";
78+
// check if app name is present in the URL rtmp://Domain.com/AppName/StreamId
79+
String regex = "rtmp(s)?://[a-zA-Z0-9\\.-]+(:[0-9]+)?/([^/]+)/.*";
80+
81+
Pattern rtmpAppName = Pattern.compile(regex);
82+
Matcher checkAppName = rtmpAppName.matcher(url);
83+
84+
if (!checkAppName.matches()) {
85+
//this is the fix to send stream for urls without app
86+
setOption("rtmp_app", "");
87+
}
88+
}
89+
else if(url.startsWith("srt")){
90+
muxerType = "srt";
91+
format = "mpegts";
7792
}
7893
}
7994
@Override
@@ -166,7 +181,7 @@ public synchronized boolean prepareIO()
166181
{
167182
clearResource();
168183
setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_FAILED);
169-
logger.error("Cannot initializeOutputFormatContextIO for rtmp endpoint:{}", url);
184+
logger.error("Cannot initializeOutputFormatContextIO for {} endpoint:{}", muxerType ,url);
170185
}
171186

172187
return null;
@@ -259,7 +274,7 @@ private boolean exitIfCancelled() {
259274
public synchronized boolean addVideoStream(int width, int height, AVRational timebase, int codecId, int streamIndex, boolean isAVC, AVCodecParameters codecpar) {
260275

261276
boolean result = super.addVideoStream(width, height, timebase, codecId, streamIndex, isAVC, codecpar);
262-
if (result)
277+
if (result && this.format.equals("flv"))
263278
{
264279
AVStream outStream = getOutputFormatContext().streams(inputOutputStreamIndexMap.get(streamIndex));
265280

@@ -290,7 +305,7 @@ public synchronized void writePacket(AVPacket pkt, final AVRational inputTimebas
290305
writeFrameInternal(pkt, inputTimebase, outputTimebase, context, codecType);
291306
}
292307

293-
private synchronized void writeFrameInternal(AVPacket pkt, AVRational inputTimebase, AVRational outputTimebase,
308+
public synchronized void writeFrameInternal(AVPacket pkt, AVRational inputTimebase, AVRational outputTimebase,
294309
AVFormatContext context, int codecType)
295310
{
296311
long pts = pkt.pts();
@@ -321,7 +336,7 @@ private synchronized void writeFrameInternal(AVPacket pkt, AVRational inputTimeb
321336
return;
322337
}
323338

324-
while (av_bsf_receive_packet(bsfFilterContextList.get(0), getTmpPacket()) == 0)
339+
while ((ret = av_bsf_receive_packet(bsfFilterContextList.get(0), getTmpPacket())) == 0)
325340
{
326341
if (!headerWritten)
327342
{
@@ -410,13 +425,13 @@ public synchronized void writeVideoBuffer(ByteBuffer encodedVideoFrame, long dts
410425
{
411426

412427
if (!isRunning.get() || !registeredStreamIndexList.contains(streamIndex)) {
413-
logPacketIssue("Not writing to RTMP muxer because it's not started for {}", url);
428+
logPacketIssue("Not writing to {} muxer because it's not started for {}", muxerType,url);
414429
return;
415430
}
416431

417432
if (!keyFrameReceived && isKeyFrame) {
418433
keyFrameReceived = true;
419-
logger.info("Key frame is received to start for rtmp:{}", url);
434+
logger.info("Key frame is received to start for {}:{}", muxerType,url);
420435
}
421436

422437
if (keyFrameReceived) {

src/main/java/io/antmedia/muxer/MuxAdaptor.java

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -632,9 +632,9 @@ public static void setUpEndPoints(MuxAdaptor muxAdaptor, Broadcast broadcast, Ve
632632
if (endPointList != null && !endPointList.isEmpty())
633633
{
634634
for (Endpoint endpoint : endPointList) {
635-
RtmpMuxer rtmpMuxer = new RtmpMuxer(endpoint.getRtmpUrl(), vertx);
636-
rtmpMuxer.setStatusListener(muxAdaptor);
637-
muxAdaptor.addMuxer(rtmpMuxer);
635+
EndpointMuxer endpointMuxer = new EndpointMuxer(endpoint.getEndpointUrl(), vertx);
636+
endpointMuxer.setStatusListener(muxAdaptor);
637+
muxAdaptor.addMuxer(endpointMuxer);
638638
}
639639
}
640640
}
@@ -2347,7 +2347,7 @@ else if(recordType == RecordType.WEBM) {
23472347
return muxer;
23482348
}
23492349

2350-
public boolean prepareMuxer(Muxer muxer, int resolutionHeight)
2350+
public boolean prepareMuxer(Muxer muxer, int resolutionHeight)
23512351
{
23522352
boolean streamAdded = false;
23532353
muxer.init(scope, streamId, resolutionHeight, getSubfolder(getBroadcast(), getAppSettings()), 0);
@@ -2460,31 +2460,31 @@ public ClientBroadcastStream getBroadcastStream() {
24602460
}
24612461

24622462

2463-
public Result startRtmpStreaming(String rtmpUrl, int resolutionHeight)
2463+
public Result startEndpointStreaming(String endpointUrl, int resolutionHeight)
24642464
{
24652465
Result result = new Result(false);
2466-
rtmpUrl = rtmpUrl.replaceAll("[\n\r\t]", "_");
2466+
endpointUrl = endpointUrl.replaceAll("[\n\r\t]", "_");
24672467

24682468
if (!isRecording.get())
24692469
{
2470-
logger.warn("Start rtmp streaming return false for stream:{} because stream is being prepared", streamId);
2471-
result.setMessage("Start rtmp streaming return false for stream:"+ streamId +" because stream is being prepared. Try again");
2470+
logger.warn("Start endpoint streaming return false for stream:{} because stream is being prepared", streamId);
2471+
result.setMessage("Start endpoint streaming return false for stream:"+ streamId +" because stream is being prepared. Try again");
24722472
return result;
24732473
}
2474-
logger.info("start rtmp streaming for stream id:{} to {} with requested resolution height{} stream resolution:{}", streamId, rtmpUrl, resolutionHeight, height);
2474+
logger.info("start endpoint streaming for stream id:{} to {} with requested resolution height{} stream resolution:{}", streamId, endpointUrl, resolutionHeight, height);
24752475

24762476
if (resolutionHeight == 0 || resolutionHeight == height)
24772477
{
2478-
RtmpMuxer rtmpMuxer = new RtmpMuxer(rtmpUrl, vertx);
2479-
rtmpMuxer.setStatusListener(this);
2480-
if (prepareMuxer(rtmpMuxer, resolutionHeight))
2478+
EndpointMuxer endpointMuxer = new EndpointMuxer(endpointUrl, vertx);
2479+
endpointMuxer.setStatusListener(this);
2480+
if (prepareMuxer(endpointMuxer, resolutionHeight))
24812481
{
24822482
result.setSuccess(true);
24832483
}
24842484
else
24852485
{
2486-
logger.error("RTMP prepare returned false so that rtmp pushing to {} for {} didn't started ", rtmpUrl, streamId);
2487-
result.setMessage("RTMP prepare returned false so that rtmp pushing to " + rtmpUrl + " for "+ streamId +" didn't started ");
2486+
logger.error("endpoint prepare returned false so that stream pushing to {} for {} didn't started ", endpointUrl, streamId);
2487+
result.setMessage("endpoint prepare returned false so that stream pushing to " + endpointUrl + " for "+ streamId +" didn't started ");
24882488
}
24892489
}
24902490

@@ -2555,13 +2555,13 @@ private void tryToRepublish(String url, Long id)
25552555
logger.info("Health check process failed, trying to republish to the endpoint: {}", url);
25562556

25572557
//TODO: 0 as second parameter may cause a problem
2558-
stopRtmpStreaming(url, 0);
2559-
startRtmpStreaming(url, height);
2558+
stopEndpointStreaming(url, 0);
2559+
startEndpointStreaming(url, height);
25602560
retryCounter.put(url, tmpRetryCount + 1);
25612561
}
25622562
else{
25632563
logger.info("Exceeded republish retry limit, endpoint {} can't be reached and will be closed" , url);
2564-
stopRtmpStreaming(url, 0);
2564+
stopEndpointStreaming(url, 0);
25652565
sendEndpointErrorNotifyHook(url);
25662566
retryCounter.remove(url);
25672567
}
@@ -2623,12 +2623,12 @@ private void updateBroadcastRecord() {
26232623
for (Iterator iterator = broadcast.getEndPointList().iterator(); iterator.hasNext();)
26242624
{
26252625
Endpoint endpoint = (Endpoint) iterator.next();
2626-
String statusUpdate = endpointStatusUpdateMap.getOrDefault(endpoint.getRtmpUrl(), null);
2626+
String statusUpdate = endpointStatusUpdateMap.getOrDefault(endpoint.getEndpointUrl(), null);
26272627
if (statusUpdate != null) {
26282628
endpoint.setStatus(statusUpdate);
26292629
}
26302630
else {
2631-
logger.warn("Endpoint is not found to update its status to {} for rtmp url:{}", statusUpdate, endpoint.getRtmpUrl());
2631+
logger.warn("Endpoint is not found to update its status to {} for rtmp url:{}", statusUpdate, endpoint.getEndpointUrl());
26322632
}
26332633
}
26342634
BroadcastUpdate broadcastUpdate = new BroadcastUpdate();
@@ -2641,38 +2641,38 @@ private void updateBroadcastRecord() {
26412641
}
26422642
}
26432643

2644-
public RtmpMuxer getRtmpMuxer(String rtmpUrl)
2644+
public EndpointMuxer getEndpointMuxer(String rtmpUrl)
26452645
{
2646-
RtmpMuxer rtmpMuxer = null;
2646+
EndpointMuxer endpointMuxer = null;
26472647
synchronized (muxerList)
26482648
{
26492649
Iterator<Muxer> iterator = muxerList.iterator();
26502650
while (iterator.hasNext())
26512651
{
26522652
Muxer muxer = iterator.next();
2653-
if (muxer instanceof RtmpMuxer &&
2654-
((RtmpMuxer)muxer).getOutputURL().equals(rtmpUrl))
2653+
if (muxer instanceof EndpointMuxer &&
2654+
((EndpointMuxer)muxer).getOutputURL().equals(rtmpUrl))
26552655
{
2656-
rtmpMuxer = (RtmpMuxer) muxer;
2656+
endpointMuxer = (EndpointMuxer) muxer;
26572657
break;
26582658
}
26592659
}
26602660
}
2661-
return rtmpMuxer;
2661+
return endpointMuxer;
26622662
}
26632663

2664-
public Result stopRtmpStreaming(String rtmpUrl, int resolutionHeight)
2664+
public Result stopEndpointStreaming(String endpointUrl, int resolutionHeight)
26652665
{
26662666
Result result = new Result(false);
26672667
if (resolutionHeight == 0 || resolutionHeight == height)
26682668
{
2669-
RtmpMuxer rtmpMuxer = getRtmpMuxer(rtmpUrl);
2670-
String status = statusMap.getOrDefault(rtmpUrl, null);
2671-
if (rtmpMuxer != null)
2669+
EndpointMuxer endpointMuxer = getEndpointMuxer(endpointUrl);
2670+
String status = statusMap.getOrDefault(endpointUrl, null);
2671+
if (endpointMuxer != null)
26722672
{
2673-
muxerList.remove(rtmpMuxer);
2674-
statusMap.remove(rtmpUrl);
2675-
rtmpMuxer.writeTrailer();
2673+
muxerList.remove(endpointMuxer);
2674+
statusMap.remove(endpointUrl);
2675+
endpointMuxer.writeTrailer();
26762676
result.setSuccess(true);
26772677
}
26782678
else if(status == null

src/main/java/io/antmedia/muxer/Muxer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ public abstract class Muxer {
187187
avRationalTimeBase.den(1);
188188
}
189189

190+
public void setFormat(String format) {
191+
this.format = format;
192+
}
193+
190194
protected String subFolder = null;
191195

192196
/**
@@ -235,7 +239,7 @@ public void setStreamIndex(int streamIndex) {
235239
public void setKeyFrame(boolean isKeyFrame) {
236240
this.keyFrame = isKeyFrame;
237241
}
238-
242+
239243
public ByteBuffer getEncodedVideoFrame() {
240244
return encodedVideoFrame;
241245
}
@@ -296,6 +300,10 @@ protected Muxer(Vertx vertx) {
296300
logger = LoggerFactory.getLogger(this.getClass());
297301
}
298302

303+
public List<AVBSFContext> getBsfFilterContextList() {
304+
return bsfFilterContextList;
305+
}
306+
299307
public static File getPreviewFile(IScope scope, String name, String extension) {
300308
String appScopeName = ScopeUtils.findApplication(scope).getName();
301309
return new File(String.format("%s/webapps/%s/%s", System.getProperty("red5.root"), appScopeName,

0 commit comments

Comments
 (0)