@@ -188,21 +188,21 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
188188 *
189189 * @see org.red5.server.net.rtmp.Channel
190190 */
191- private transient ConcurrentMap <Integer , Channel > channels = new ConcurrentHashMap <Integer , Channel >(channelsInitalCapacity , 0.9f , channelsConcurrencyLevel );
191+ private transient ConcurrentMap <Integer , Channel > channels = new ConcurrentHashMap <>(channelsInitalCapacity , 0.9f , channelsConcurrencyLevel );
192192
193193 /**
194194 * Queues of tasks for every channel
195195 *
196196 * @see org.red5.server.net.rtmp.ReceivedMessageTaskQueue
197197 */
198- private final transient ConcurrentMap <Integer , ReceivedMessageTaskQueue > tasksByStreams = new ConcurrentHashMap <Integer , ReceivedMessageTaskQueue >(streamsInitalCapacity , 0.9f , streamsConcurrencyLevel );
198+ private final transient ConcurrentMap <Integer , ReceivedMessageTaskQueue > tasksByStreams = new ConcurrentHashMap <>(streamsInitalCapacity , 0.9f , streamsConcurrencyLevel );
199199
200200 /**
201201 * Client streams
202202 *
203203 * @see org.red5.server.api.stream.IClientStream
204204 */
205- private transient ConcurrentMap <Number , IClientStream > streams = new ConcurrentHashMap <Number , IClientStream >(streamsInitalCapacity , 0.9f , streamsConcurrencyLevel );
205+ private transient ConcurrentMap <Number , IClientStream > streams = new ConcurrentHashMap <>(streamsInitalCapacity , 0.9f , streamsConcurrencyLevel );
206206
207207 /**
208208 * Reserved stream ids. Stream id's directly relate to individual NetStream instances.
@@ -217,14 +217,14 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
217217 /**
218218 * Hash map that stores pending calls and ids as pairs.
219219 */
220- private transient ConcurrentMap <Integer , IPendingServiceCall > pendingCalls = new ConcurrentHashMap <Integer , IPendingServiceCall >(pendingCallsInitalCapacity , 0.75f , pendingCallsConcurrencyLevel );
220+ private transient ConcurrentMap <Integer , IPendingServiceCall > pendingCalls = new ConcurrentHashMap <>(pendingCallsInitalCapacity , 0.75f , pendingCallsConcurrencyLevel );
221221
222222 /**
223223 * Deferred results set.
224224 *
225225 * @see org.red5.server.net.rtmp.DeferredResult
226226 */
227- private transient CopyOnWriteArraySet <DeferredResult > deferredResults = new CopyOnWriteArraySet <DeferredResult >();
227+ private transient CopyOnWriteArraySet <DeferredResult > deferredResults = new CopyOnWriteArraySet <>();
228228
229229 /**
230230 * Last ping round trip time
@@ -274,7 +274,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
274274 /**
275275 * Map for pending video packets keyed by stream id.
276276 */
277- private transient ConcurrentMap <Number , AtomicInteger > pendingVideos = new ConcurrentHashMap <Number , AtomicInteger >(1 , 0.9f , 1 );
277+ private transient ConcurrentMap <Number , AtomicInteger > pendingVideos = new ConcurrentHashMap <>(1 , 0.9f , 1 );
278278
279279 /**
280280 * Number of (NetStream) streams used.
@@ -284,7 +284,7 @@ public abstract class RTMPConnection extends BaseConnection implements IStreamCa
284284 /**
285285 * Remembered stream buffer durations.
286286 */
287- private transient ConcurrentMap <Number , Integer > streamBuffers = new ConcurrentHashMap <Number , Integer >(1 , 0.9f , 1 );
287+ private transient ConcurrentMap <Number , Integer > streamBuffers = new ConcurrentHashMap <>(1 , 0.9f , 1 );
288288
289289 /**
290290 * Maximum time in milliseconds to wait for a valid handshake.
@@ -503,10 +503,12 @@ public void startWaitForHandshake() {
503503 log .debug ("startWaitForHandshake - {}" , sessionId );
504504 }
505505 // start the handshake checker after maxHandshakeTimeout milliseconds
506- try {
507- waitForHandshakeTask = scheduler .schedule (new WaitForHandshakeTask (), new Date (System .currentTimeMillis () + maxHandshakeTimeout ));
508- } catch (TaskRejectedException e ) {
509- log .error ("WaitForHandshake task was rejected for {}" , sessionId , e );
506+ if (scheduler != null ) {
507+ try {
508+ waitForHandshakeTask = scheduler .schedule (new WaitForHandshakeTask (), new Date (System .currentTimeMillis () + maxHandshakeTimeout ));
509+ } catch (TaskRejectedException e ) {
510+ log .error ("WaitForHandshake task was rejected for {}" , sessionId , e );
511+ }
510512 }
511513 }
512514
@@ -996,10 +998,8 @@ public void close() {
996998 // dump memory stats
997999 log .trace ("Memory at close - free: {}K total: {}K" , Runtime .getRuntime ().freeMemory () / 1024 , Runtime .getRuntime ().totalMemory () / 1024 );
9981000 }
999- } else {
1000- if (log .isDebugEnabled ()) {
1001- log .debug ("Already closing.." );
1002- }
1001+ } else if (log .isDebugEnabled ()) {
1002+ log .debug ("Already closing.." );
10031003 }
10041004 }
10051005
@@ -1365,52 +1365,52 @@ public String messageTypeToName(byte headerDataType) {
13651365 /**
13661366 * Handle the incoming message.
13671367 *
1368- * @param message
1369- * message
1368+ * @param packet
1369+ * incoming message packet
13701370 */
1371- public void handleMessageReceived (Packet message ) {
1371+ public void handleMessageReceived (Packet packet ) {
13721372 if (log .isTraceEnabled ()) {
13731373 log .trace ("handleMessageReceived - {}" , sessionId );
13741374 }
1375- final byte dataType = message .getHeader ().getDataType ();
1376- // route these types outside the executor
1377- switch (dataType ) {
1378- case Constants .TYPE_PING :
1379- case Constants .TYPE_ABORT :
1380- case Constants .TYPE_BYTES_READ :
1381- case Constants .TYPE_CHUNK_SIZE :
1382- case Constants .TYPE_CLIENT_BANDWIDTH :
1383- case Constants .TYPE_SERVER_BANDWIDTH :
1384- // pass message to the handler
1385- try {
1386- handler .messageReceived (this , message );
1387- } catch (Exception e ) {
1388- log .error ("Error processing received message {}" , sessionId , e );
1389- }
1390- break ;
1391- default :
1392- if (executor != null ) {
1393- final String messageType = getMessageType (message );
1375+ // set the packet expiration time if maxHandlingTimeout is not disabled (set to 0)
1376+ if (maxHandlingTimeout > 0 ) {
1377+ packet .setExpirationTime (System .currentTimeMillis () + maxHandlingTimeout );
1378+ }
1379+ if (executor != null ) {
1380+ final byte dataType = packet .getHeader ().getDataType ();
1381+ // route these types outside the executor
1382+ switch (dataType ) {
1383+ case Constants .TYPE_PING :
1384+ case Constants .TYPE_ABORT :
1385+ case Constants .TYPE_BYTES_READ :
1386+ case Constants .TYPE_CHUNK_SIZE :
1387+ case Constants .TYPE_CLIENT_BANDWIDTH :
1388+ case Constants .TYPE_SERVER_BANDWIDTH :
1389+ // pass message to the handler
1390+ try {
1391+ handler .messageReceived (this , packet );
1392+ } catch (Exception e ) {
1393+ log .error ("Error processing received message {}" , sessionId , e );
1394+ }
1395+ break ;
1396+ default :
1397+ final String messageType = getMessageType (packet );
13941398 try {
13951399 // increment the packet number
13961400 final long packetNumber = packetSequence .incrementAndGet ();
13971401 if (executorQueueSizeToDropAudioPackets > 0 && currentQueueSize .get () >= executorQueueSizeToDropAudioPackets ) {
1398- if (message .getHeader ().getDataType () == Constants .TYPE_AUDIO_DATA ) {
1402+ if (packet .getHeader ().getDataType () == Constants .TYPE_AUDIO_DATA ) {
13991403 // if there's a backlog of messages in the queue. Flash might have sent a burst of messages after a network congestion. Throw away packets that we are able to discard.
14001404 log .info ("Queue threshold reached. Discarding packet: session=[{}], msgType=[{}], packetNum=[{}]" , sessionId , messageType , packetNumber );
14011405 return ;
14021406 }
14031407 }
1404- // set the packet expiration time if maxHandlingTimeout is not disabled (set to 0)
1405- if (maxHandlingTimeout > 0 ) {
1406- message .setExpirationTime (System .currentTimeMillis () + maxHandlingTimeout );
1407- }
1408- int streamId = message .getHeader ().getStreamId ().intValue ();
1408+ int streamId = packet .getHeader ().getStreamId ().intValue ();
14091409 if (log .isTraceEnabled ()) {
1410- log .trace ("Handling message for streamId: {}, channelId: {} Channels: {}" , streamId , message .getHeader ().getChannelId (), channels );
1410+ log .trace ("Handling message for streamId: {}, channelId: {} Channels: {}" , streamId , packet .getHeader ().getChannelId (), channels );
14111411 }
14121412 // create a task to setProcessing the message
1413- ReceivedMessageTask task = new ReceivedMessageTask (sessionId , message , handler , this );
1413+ ReceivedMessageTask task = new ReceivedMessageTask (sessionId , packet , handler , this );
14141414 task .setPacketNumber (packetNumber );
14151415 // create a task queue
14161416 ReceivedMessageTaskQueue newStreamTasks = new ReceivedMessageTaskQueue (streamId , this );
@@ -1426,13 +1426,19 @@ public void handleMessageReceived(Packet message) {
14261426 } catch (Exception e ) {
14271427 log .error ("Incoming message handling failed on session=[" + sessionId + "], messageType=[" + messageType + "]" , e );
14281428 if (log .isDebugEnabled ()) {
1429- log .debug ("Execution rejected on {} - {}" , getSessionId () , RTMP .states [getStateCode ()]);
1429+ log .debug ("Execution rejected on {} - {}" , sessionId , RTMP .states [getStateCode ()]);
14301430 log .debug ("Lock permits - decode: {} encode: {}" , decoderLock .availablePermits (), encoderLock .availablePermits ());
14311431 }
14321432 }
1433- } else {
1434- log .warn ("Executor is null on {} state: {}" , getSessionId (), RTMP .states [getStateCode ()]);
1435- }
1433+ }
1434+ } else {
1435+ log .debug ("Executor is null on {} state: {}" , sessionId , RTMP .states [getStateCode ()]);
1436+ // pass message to the handler
1437+ try {
1438+ handler .messageReceived (this , packet );
1439+ } catch (Exception e ) {
1440+ log .error ("Error processing received message {} state: {}" , sessionId , RTMP .states [getStateCode ()], e );
1441+ }
14361442 }
14371443 }
14381444
@@ -1452,7 +1458,7 @@ public void onTaskRemoved(ReceivedMessageTaskQueue queue) {
14521458 private void processTasksQueue (final ReceivedMessageTaskQueue currentStreamTasks ) {
14531459 int streamId = currentStreamTasks .getStreamId ();
14541460 if (log .isTraceEnabled ()) {
1455- log .trace ("Process tasks for streamId {}" , streamId );
1461+ log .trace ("Process tasks for streamId {}" , streamId );
14561462 }
14571463 final ReceivedMessageTask task = currentStreamTasks .getTaskToProcess ();
14581464 if (task != null ) {
0 commit comments