2020 */
2121package org .freedesktop .gstreamer ;
2222
23- import java .util .HashMap ;
2423import java .util .List ;
2524import java .util .Locale ;
26- import java .util .Map ;
27- import java .util .concurrent .ConcurrentHashMap ;
2825import java .util .concurrent .CopyOnWriteArrayList ;
2926import java .util .logging .Level ;
3027import java .util .logging .Logger ;
@@ -78,14 +75,9 @@ public class Bus extends GstObject {
7875 private static final SyncCallback SYNC_CALLBACK = new SyncCallback ();
7976
8077 private final Object lock = new Object ();
81- private Map <Class <?>, Map <Object , MessageProxy >> signalListeners ;
82- private List <MessageProxy > messageProxies = new CopyOnWriteArrayList <>();
78+ private final List <MessageProxy <?>> messageProxies = new CopyOnWriteArrayList <>();
8379 private boolean watchAdded = false ;
84- private BusSyncHandler syncHandler = new BusSyncHandler () {
85- public BusSyncReply syncMessage (Message msg ) {
86- return BusSyncReply .PASS ;
87- }
88- };
80+ private BusSyncHandler syncHandler = null ;
8981
9082 /**
9183 * This constructor is used internally by gstreamer-java
@@ -473,8 +465,14 @@ public void setSyncHandler(BusSyncHandler handler) {
473465 * @param callback The callback to call when the signal is emitted.
474466 */
475467 private <T > void connect (Class <T > listenerClass , T listener , BusCallback callback ) {
476- final String signal = listenerClass .getSimpleName ().toLowerCase (Locale .ROOT ).replace ('_' , '-' );
477- connect (signal , listenerClass , listener , callback );
468+ String className = listenerClass .getSimpleName ();
469+ MessageType type ;
470+ if ("MESSAGE" .equals (className )) {
471+ type = MessageType .ANY ;
472+ } else {
473+ type = MessageType .valueOf (listenerClass .getSimpleName ());
474+ }
475+ addMessageProxy (type , listenerClass , listener , callback );
478476 }
479477
480478 /**
@@ -483,89 +481,71 @@ private <T> void connect(Class<T> listenerClass, T listener, BusCallback callbac
483481 * This differs to {@link GObject#connect} in that it hooks up Bus signals
484482 * to the sync callback, not the generic GObject signal mechanism.
485483 *
484+ * @param <T> listener type
486485 * @param signal the name of the signal to connect to.
487486 * @param listenerClass the class of the {@code listener}
488487 * @param listener the listener to associate with the {@code callback}
489488 * @param callback the callback to call when the signal is emitted.
490489 */
491490 @ Override
492- public synchronized <T > void connect (String signal , Class <T > listenerClass , T listener ,
491+ public <T > void connect (String signal , Class <T > listenerClass , T listener ,
493492 final Callback callback ) {
494493 if (listenerClass .getEnclosingClass () != Bus .class ) {
495494 super .connect (signal , listenerClass , listener , callback );
496- return ;
497- }
498- MessageType type ;
499- if ("message" .equals (signal )) {
500- type = MessageType .ANY ;
501495 } else {
502- //@TODO refactor to stop unnecessary String operations
503- type = MessageType .valueOf (signal .toUpperCase (Locale .ROOT ).replace ('-' , '_' ));
504- }
505- final Map <Class <?>, Map <Object , MessageProxy >> signals = getListenerMap ();
506- // @TODO this was using type so doubtful ever worked
507- // these maps needs relooking at!
508- Map <Object , MessageProxy > m = signals .get (listenerClass );
509- if (m == null ) {
510- m = new HashMap <Object , MessageProxy >();
511- signals .put (listenerClass , m );
496+ MessageType type ;
497+ if ("message" .equals (signal )) {
498+ type = MessageType .ANY ;
499+ } else {
500+ type = MessageType .valueOf (signal .toUpperCase (Locale .ROOT ).replace ('-' , '_' ));
501+ }
502+ addMessageProxy (type , listenerClass , listener , (BusCallback ) callback );
512503 }
513- MessageProxy proxy = new MessageProxy (type , (BusCallback ) callback );
514- m .put (listener , proxy );
515- messageProxies .add (proxy );
516-
504+ }
505+
506+ private synchronized <T > void addMessageProxy (MessageType type ,
507+ Class <T > listenerClass ,
508+ T listener ,
509+ BusCallback callback ) {
510+ messageProxies .add (new MessageProxy (type , listenerClass , listener , callback ));
517511 addWatch ();
518512 }
519513
520514 @ Override
521- public synchronized <T > void disconnect (Class <T > listenerClass , T listener ) {
515+ public <T > void disconnect (Class <T > listenerClass , T listener ) {
522516 if (listenerClass .getEnclosingClass () != Bus .class ) {
523517 super .disconnect (listenerClass , listener );
524- return ;
518+ } else {
519+ removeMessageProxy (listenerClass , listener );
525520 }
526- final Map <Class <?>, Map <Object , MessageProxy >> signals = getListenerMap ();
527- Map <Object , MessageProxy > m = signals .get (listenerClass );
528- if (m != null ) {
529- MessageProxy proxy = m .remove (listener );
530- if (proxy != null ) {
531- messageProxies .remove (proxy );
532- }
533- if (m .isEmpty ()) {
534- removeWatch ();
535- signals .remove (listenerClass );
536- }
521+ }
522+
523+ private synchronized <T > void removeMessageProxy (Class <T > listenerClass , T listener ) {
524+ messageProxies .removeIf (p -> p .listener == listener );
525+ if (messageProxies .isEmpty ()) {
526+ removeWatch ();
537527 }
538528 }
539529
540530 /**
541531 * Dispatches a message to all interested listeners.
542- *
543532 * <p>
544533 * We do this here from a sync callback, because the default gstbus dispatch
545534 * uses the default main context to signal that there are messages waiting
546535 * on the bus. Since that is used by the GTK L&F under swing, we never get
547536 * those notifications, and the messages just queue up.
548537 *
549- * @param message
550538 */
551539 private void dispatchMessage (Message msg ) {
552- // Dispatch to listeners
553- messageProxies .forEach ((listener ) -> {
540+ messageProxies .forEach (p -> {
554541 try {
555- (( MessageProxy ) listener ) .busMessage (this , msg );
542+ p .busMessage (this , msg );
556543 } catch (Throwable t ) {
557544 LOG .log (Level .SEVERE , "Exception thrown by bus message handler" , t );
558545 }
559546 });
560547 }
561548
562- private final Map <Class <?>, Map <Object , MessageProxy >> getListenerMap () {
563- if (signalListeners == null ) {
564- signalListeners = new ConcurrentHashMap <Class <?>, Map <Object , MessageProxy >>();
565- }
566- return signalListeners ;
567- }
568-
569549 @ Override
570550 public void dispose () {
571551 removeWatch ();
@@ -845,17 +825,21 @@ public static interface MESSAGE {
845825 public void busMessage (Bus bus , Message message );
846826 }
847827
848- private static class MessageProxy implements MESSAGE {
828+ private static class MessageProxy < T > {
849829
850830 private final MessageType type ;
831+ private final Class <T > listenerClass ;
832+ private final Object listener ;
851833 private final BusCallback callback ;
852834
853- public MessageProxy (MessageType type , BusCallback callback ) {
835+ MessageProxy (MessageType type , Class < T > listenerClass , T listener , BusCallback callback ) {
854836 this .type = type ;
837+ this .listenerClass = listenerClass ;
838+ this .listener = listener ;
855839 this .callback = callback ;
856840 }
857841
858- public void busMessage (final Bus bus , final Message msg ) {
842+ void busMessage (final Bus bus , final Message msg ) {
859843 if (type == MessageType .ANY || type == msg .getType ()) {
860844 callback .callback (bus , msg , null );
861845 }
0 commit comments