Skip to content

Commit b934823

Browse files
committed
merge master
2 parents a105c11 + 4e24722 commit b934823

File tree

2 files changed

+158
-8
lines changed

2 files changed

+158
-8
lines changed

src/main/java/io/antmedia/muxer/EndpointMuxer.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public class EndpointMuxer extends Muxer {
5353
boolean keyFrameReceived = false;
5454

5555
private AtomicBoolean preparedIO = new AtomicBoolean(false);
56+
private AtomicBoolean cancelOpenIO = new AtomicBoolean(false);
5657

5758
public String muxerType = null;
5859

@@ -156,32 +157,34 @@ public synchronized boolean prepareIO()
156157
return false;
157158
}
158159
preparedIO.set(true);
160+
cancelOpenIO.set(false);
159161
boolean result = false;
160162
//if there is a stream in the output format context, try to push
161163
if (getOutputFormatContext().nb_streams() > 0)
162164
{
163165
this.vertx.executeBlocking(() -> {
164-
165166
if (openIO())
166167
{
167168
if (bsfFilterContextList.isEmpty())
168169
{
169170
writeHeader();
170171
return null;
171172
}
172-
isRunning.set(true);
173-
setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING);
173+
if (!exitIfCancelled())
174+
{
175+
isRunning.set(true);
176+
setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING);
177+
}
178+
174179
}
175180
else
176181
{
177182
clearResource();
178183
setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_FAILED);
179184
logger.error("Cannot initializeOutputFormatContextIO for {} endpoint:{}", muxerType ,url);
180185
}
181-
186+
182187
return null;
183-
184-
185188
}, false);
186189

187190
result = true;
@@ -225,19 +228,24 @@ public synchronized boolean writeHeader() {
225228
*/
226229
@Override
227230
public synchronized void writeTrailer() {
231+
cancelOpenIO.set(true);
228232
if(headerWritten){
229233
super.writeTrailer();
230234
trailerWritten = true;
231235
}
232236
else{
233237
logger.info("Not writing trailer because header is not written yet");
238+
clearResource();
234239
}
235240
setStatus(IAntMediaStreamHandler.BROADCAST_STATUS_FINISHED);
236241
}
237242

238243
@Override
239244
public synchronized void clearResource() {
240245
super.clearResource();
246+
if (!headerWritten) {
247+
preparedIO.set(false);
248+
}
241249
/**
242250
* Don't free the allocatedExtraDataPointer because it's internally deallocated
243251
*
@@ -250,6 +258,15 @@ public synchronized void clearResource() {
250258
//allocatedExtraDataPointer is freed when the context is closing
251259
}
252260

261+
private boolean exitIfCancelled() {
262+
if (!cancelOpenIO.get()) {
263+
return false;
264+
}
265+
logger.info("RTMP muxer openIO cancelled for {}", url);
266+
clearResource();
267+
return true;
268+
}
269+
253270
/**
254271
* {@inheritDoc}
255272
*/

src/test/java/io/antmedia/test/MuxerUnitTest.java

Lines changed: 135 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import static org.mockito.Mockito.verify;
6262
import static org.mockito.Mockito.when;
6363

64+
<<<<<<< HEAD
6465
import java.io.File;
6566
import java.io.FileInputStream;
6667
import java.io.FileOutputStream;
@@ -80,6 +81,9 @@
8081
import java.util.concurrent.TimeUnit;
8182
import java.util.concurrent.atomic.AtomicBoolean;
8283
import io.antmedia.*;
84+
=======
85+
import java.lang.reflect.Field;
86+
>>>>>>> origin/master
8387
import com.google.gson.JsonObject;
8488
import io.antmedia.AntMediaApplicationAdapter;
8589
import io.antmedia.AppSettings;
@@ -1068,7 +1072,7 @@ public void testGetAudioCodecParameters() {
10681072
}
10691073

10701074
@Test
1071-
public void testStopRtmpStreamingWhenRtmpMuxerNull() {
1075+
public void testStopRtmpStreamingWhenEndpointMuxerNull() {
10721076
appScope = (WebScope) applicationContext.getBean("web.scope");
10731077
logger.info("Application / web scope: {}", appScope);
10741078
assertTrue(appScope.getDepth() == 1);
@@ -1402,6 +1406,72 @@ public void testRTMPPrepareIO() {
14021406
verify(endpointMuxer2).clearResource();
14031407
}
14041408

1409+
@Test
1410+
public void testRTMPMuxerRaceCondition() {
1411+
appScope = (WebScope) applicationContext.getBean("web.scope");
1412+
vertx = (Vertx) appScope.getContext().getApplicationContext().getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME);
1413+
1414+
EndpointMuxer rtmpMuxer = new RtmpMuxer("rtmp://no_server", vertx);
1415+
rtmpMuxer.init(appScope, "test", 0, null, 0);
1416+
1417+
AVCodecParameters codecParameters = new AVCodecParameters();
1418+
codecParameters.codec_id(AV_CODEC_ID_H264);
1419+
codecParameters.codec_type(AVMEDIA_TYPE_VIDEO);
1420+
AVRational rat = new AVRational().num(1).den(1000);
1421+
assertTrue(rtmpMuxer.addStream(codecParameters, rat, 50));
1422+
1423+
// 1. Test preparedIO reset in clearResource
1424+
assertTrue(rtmpMuxer.prepareIO());
1425+
assertFalse(rtmpMuxer.prepareIO()); // already prepared
1426+
1427+
rtmpMuxer.clearResource(); // headerWritten is false, resets preparedIO
1428+
1429+
// Re-add stream because clearResource cleared outputFormatContext
1430+
assertTrue(rtmpMuxer.addStream(codecParameters, rat, 50));
1431+
assertTrue(rtmpMuxer.prepareIO()); // can prepare again
1432+
1433+
// 2. Test cancelOpenIO race protection
1434+
EndpointMuxer rtmpMuxer2 = new RtmpMuxer("rtmp://no_server", vertx);
1435+
rtmpMuxer2.init(appScope, "test", 0, null, 0);
1436+
assertTrue(rtmpMuxer2.addStream(codecParameters, rat, 50));
1437+
1438+
// Start prepareIO which will call openIO in blocking thread
1439+
rtmpMuxer2.prepareIO();
1440+
// Immediately cancel it
1441+
rtmpMuxer2.writeTrailer();
1442+
1443+
Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> {
1444+
String status = rtmpMuxer2.getStatus();
1445+
return status.equals(IAntMediaStreamHandler.BROADCAST_STATUS_FINISHED) || status.equals(IAntMediaStreamHandler.BROADCAST_STATUS_FAILED);
1446+
});
1447+
1448+
// isRunning should remain false because it was cancelled
1449+
assertFalse(rtmpMuxer2.getIsRunning().get());
1450+
}
1451+
1452+
@Test
1453+
public void testWriteTrailerBeforeHeader() {
1454+
appScope = (WebScope) applicationContext.getBean("web.scope");
1455+
vertx = (Vertx) appScope.getContext().getApplicationContext().getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME);
1456+
1457+
EndpointMuxer rtmpMuxer = new RtmpMuxer("rtmp://no_server", vertx);
1458+
rtmpMuxer.init(appScope, "test", 0, null, 0);
1459+
1460+
// No header written yet
1461+
rtmpMuxer.writeTrailer();
1462+
1463+
assertEquals(IAntMediaStreamHandler.BROADCAST_STATUS_FINISHED, rtmpMuxer.getStatus());
1464+
1465+
AVCodecParameters codecParameters = new AVCodecParameters();
1466+
codecParameters.codec_id(AV_CODEC_ID_H264);
1467+
codecParameters.codec_type(AVMEDIA_TYPE_VIDEO);
1468+
AVRational rat = new AVRational().num(1).den(1000);
1469+
1470+
// It should be able to add stream and prepareIO again because clearResource was called in writeTrailer
1471+
assertTrue(rtmpMuxer.addStream(codecParameters, rat, 50));
1472+
assertTrue(rtmpMuxer.prepareIO());
1473+
}
1474+
14051475

14061476
@Test
14071477
public void testRTMPHealthCheckProcess() {
@@ -6646,7 +6716,70 @@ public void testPrepareFromInputFormatContextForData() throws Exception {
66466716
assertEquals(1, muxAdaptor.getVideoStreamIndex());
66476717
assertEquals(2, muxAdaptor.getAudioStreamIndex());
66486718
}
6649-
6719+
6720+
@Test
6721+
public void testEndpointMuxerPrepareIOCancelledAndNotCancelled() throws Exception {
6722+
appScope = (WebScope) applicationContext.getBean("web.scope");
6723+
vertx = (Vertx) appScope.getContext().getApplicationContext().getBean(IAntMediaStreamHandler.VERTX_BEAN_NAME);
6724+
6725+
// Scenario 1: Test Cancellation (Lines 246-253 and 151-164 cancellation path)
6726+
EndpointMuxer endpointMuxer = Mockito.spy(new EndpointMuxer("rtmp://dummy", vertx));
6727+
6728+
// Mock getOutputFormatContext to return a context with streams
6729+
AVFormatContext outputFormatContext = new AVFormatContext(null);
6730+
avformat_alloc_output_context2(outputFormatContext, null, "flv", null);
6731+
avformat_new_stream(outputFormatContext, null);
6732+
Mockito.doReturn(outputFormatContext).when(endpointMuxer).getOutputFormatContext();
6733+
6734+
// Mock openIO to set cancelOpenIO = true and return true
6735+
Mockito.doAnswer(invocation -> {
6736+
Field cancelField = EndpointMuxer.class.getDeclaredField("cancelOpenIO");
6737+
cancelField.setAccessible(true);
6738+
AtomicBoolean cancel = (AtomicBoolean) cancelField.get(endpointMuxer);
6739+
cancel.set(true);
6740+
return true;
6741+
}).when(endpointMuxer).openIO();
6742+
6743+
// Add element to bsfFilterContextList so it enters the check block
6744+
Field bsfField = Muxer.class.getDeclaredField("bsfFilterContextList");
6745+
bsfField.setAccessible(true);
6746+
List<AVBSFContext> bsfList = (List<AVBSFContext>) bsfField.get(endpointMuxer);
6747+
bsfList.add(new AVBSFContext(null));
6748+
6749+
endpointMuxer.prepareIO();
6750+
6751+
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
6752+
// preparedIO should be false because clearResource is called
6753+
Field preparedField = EndpointMuxer.class.getDeclaredField("preparedIO");
6754+
preparedField.setAccessible(true);
6755+
AtomicBoolean prepared = (AtomicBoolean) preparedField.get(endpointMuxer);
6756+
return !prepared.get() && !endpointMuxer.getIsRunning().get();
6757+
});
6758+
6759+
Mockito.verify(endpointMuxer, Mockito.atLeastOnce()).clearResource();
6760+
6761+
6762+
// Scenario 2: Test Normal Flow (Lines 151-164 normal path)
6763+
EndpointMuxer rtmpMuxer2 = Mockito.spy(new EndpointMuxer("rtmp://dummy2", vertx));
6764+
Mockito.doReturn(outputFormatContext).when(rtmpMuxer2).getOutputFormatContext();
6765+
Mockito.doReturn(true).when(rtmpMuxer2).openIO();
6766+
6767+
bsfList = (List<AVBSFContext>) bsfField.get(rtmpMuxer2);
6768+
bsfList.add(new AVBSFContext(null));
6769+
6770+
rtmpMuxer2.prepareIO();
6771+
6772+
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> {
6773+
return rtmpMuxer2.getIsRunning().get()
6774+
&& rtmpMuxer2.getStatus().equals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING);
6775+
});
6776+
6777+
assertTrue(rtmpMuxer2.getIsRunning().get());
6778+
assertEquals(IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING, rtmpMuxer2.getStatus());
6779+
6780+
// Clean up
6781+
avformat_free_context(outputFormatContext);
6782+
}
66506783

66516784

66526785

0 commit comments

Comments
 (0)