@@ -45,6 +45,10 @@ public class ConnectionConsumer implements IPushableConsumer, IPipeConnectionLis
4545
4646 private static final Logger log = LoggerFactory .getLogger (ConnectionConsumer .class );
4747
48+ private static final boolean isTrace = log .isTraceEnabled ();
49+
50+ private static final boolean isDebug = log .isDebugEnabled ();
51+
4852 /**
4953 * Connection consumer class name
5054 */
@@ -122,8 +126,12 @@ public void pushMessage(IPipe pipe, IMessage message) {
122126 if (message instanceof ResetMessage ) {
123127 //ignore
124128 } else if (message instanceof StatusMessage ) {
125- StatusMessage statusMsg = (StatusMessage ) message ;
126- data .sendStatus (statusMsg .getBody ());
129+ if (data != null ) {
130+ StatusMessage statusMsg = (StatusMessage ) message ;
131+ data .sendStatus (statusMsg .getBody ());
132+ } else {
133+ log .warn ("Channel data is null" );
134+ }
127135 } else if (message instanceof RTMPMessage ) {
128136 // make sure chunk size has been sent
129137 sendChunkSize ();
@@ -143,7 +151,7 @@ public void pushMessage(IPipe pipe, IMessage message) {
143151 }
144152 // get the data type
145153 byte dataType = msg .getDataType ();
146- if (log . isTraceEnabled () ) {
154+ if (isTrace ) {
147155 log .trace ("Data type: {} source type: {}" , dataType , ((BaseEvent ) msg ).getSourceType ());
148156 }
149157 // create a new header for the consumer if the message.body doesnt already have one
@@ -155,7 +163,11 @@ public void pushMessage(IPipe pipe, IMessage message) {
155163 switch (dataType ) {
156164 case Constants .TYPE_AGGREGATE :
157165 //log.trace("Aggregate data");
158- data .write (msg );
166+ if (data != null ) {
167+ data .write (msg );
168+ } else {
169+ log .warn ("Channel data is null, aggregate data was not written" );
170+ }
159171 break ;
160172 case Constants .TYPE_AUDIO_DATA :
161173 //log.trace("Audio data");
@@ -190,28 +202,35 @@ public void pushMessage(IPipe pipe, IMessage message) {
190202 conn .ping (ping );
191203 break ;
192204 case Constants .TYPE_STREAM_METADATA :
193- if (log . isTraceEnabled () ) {
205+ if (isTrace ) {
194206 log .trace ("Meta data: {}" , (Notify ) msg );
195207 }
196- //Notify notify = new Notify(((Notify) msg).getData().asReadOnlyBuffer());
197- Notify notify = (Notify ) msg ;
198- notify .setHeader (header );
199- notify .setTimestamp (header .getTimer ());
200- data .write (notify );
208+ if (data != null ) {
209+ Notify notify = (Notify ) msg ;
210+ notify .setHeader (header );
211+ notify .setTimestamp (header .getTimer ());
212+ data .write (notify );
213+ } else {
214+ log .warn ("Channel data is null, metadata was not written" );
215+ }
201216 break ;
202217 case Constants .TYPE_FLEX_STREAM_SEND :
203- //if (log.isTraceEnabled() ) {
218+ //if (isTrace ) {
204219 //log.trace("Flex stream send: {}", (Notify) msg);
205220 //}
206- FlexStreamSend send = null ;
207- if (msg instanceof FlexStreamSend ) {
208- send = (FlexStreamSend ) msg ;
221+ if (data != null ) {
222+ FlexStreamSend send = null ;
223+ if (msg instanceof FlexStreamSend ) {
224+ send = (FlexStreamSend ) msg ;
225+ } else {
226+ send = new FlexStreamSend (((Notify ) msg ).getData ().asReadOnlyBuffer ());
227+ }
228+ send .setHeader (header );
229+ send .setTimestamp (header .getTimer ());
230+ data .write (send );
209231 } else {
210- send = new FlexStreamSend ((( Notify ) msg ). getData (). asReadOnlyBuffer () );
232+ log . warn ( "Channel data is null, flex stream data was not written" );
211233 }
212- send .setHeader (header );
213- send .setTimestamp (header .getTimer ());
214- data .write (send );
215234 break ;
216235 case Constants .TYPE_BYTES_READ :
217236 //log.trace("Bytes read");
@@ -222,11 +241,15 @@ public void pushMessage(IPipe pipe, IMessage message) {
222241 break ;
223242 default :
224243 //log.trace("Default: {}", dataType);
225- data .write (msg );
244+ if (data != null ) {
245+ data .write (msg );
246+ } else {
247+ log .warn ("Channel data is null, data type: {} was not written" , dataType );
248+ }
226249 }
227250 } else {
228251 log .debug ("Unhandled push message: {}" , message );
229- if (log . isTraceEnabled () ) {
252+ if (isTrace ) {
230253 Class <? extends IMessage > clazz = message .getClass ();
231254 log .trace ("Class info - name: {} declaring: {} enclosing: {}" , new Object [] { clazz .getName (), clazz .getDeclaringClass (), clazz .getEnclosingClass () });
232255 }
@@ -236,6 +259,7 @@ public void pushMessage(IPipe pipe, IMessage message) {
236259 /** {@inheritDoc} */
237260 public void onPipeConnectionEvent (PipeConnectionEvent event ) {
238261 if (event .getType ().equals (PipeConnectionEvent .EventType .PROVIDER_DISCONNECT )) {
262+ log .debug ("Provider disconnected" );
239263 closeChannels ();
240264 }
241265 }
@@ -287,9 +311,15 @@ private void sendChunkSize() {
287311 * Close all the channels
288312 */
289313 private void closeChannels () {
290- conn .closeChannel (video .getId ());
291- conn .closeChannel (audio .getId ());
292- conn .closeChannel (data .getId ());
314+ if (video != null ) {
315+ conn .closeChannel (video .getId ());
316+ }
317+ if (audio != null ) {
318+ conn .closeChannel (audio .getId ());
319+ }
320+ if (data != null ) {
321+ conn .closeChannel (data .getId ());
322+ }
293323 }
294324
295325}
0 commit comments