1515import java .util .stream .Collectors ;
1616
1717public class JanusClient implements WebSocket .Listener {
18- private static final Logger logger = LoggerFactory .getLogger (JanusClient .class );
19- private static final long DEFAULT_CONNECTION_TIMEOUT_MS = 10_000 ; // 10 seconds
20- private static final long SERVER_INFO_TIMEOUT_MS = 20_000 ; // 20 seconds for server info
21- private static final long KEEP_ALIVE_INTERVAL_SECONDS = 45 ; // 45 seconds for keep-alive
18+ private static final Logger logger = LoggerFactory .getLogger (JanusClient .class );
19+ private static final long DEFAULT_CONNECTION_TIMEOUT_MS = 10_000 ; // 10 seconds
20+ private static final long SERVER_INFO_TIMEOUT_MS = 20_000 ; // 20 seconds for server info
21+ private static final long KEEP_ALIVE_INTERVAL_SECONDS = 45 ; // 45 seconds for keep-alive
22+ private final Map <Long , JanusSession > sessions = new ConcurrentHashMap <>();
23+ private final Map <Long , ScheduledFuture <?>> keepAliveTasks = new ConcurrentHashMap <>();
24+ private final StringBuilder messageBuffer = new StringBuilder ();
2225 private final JanusConfiguration config ;
2326 private final HttpClient httpClient ;
2427 private WebSocket webSocket ;
2528 private final ExecutorService executor ;
26- private final Map <Long , JanusSession > sessions = new ConcurrentHashMap <>();
2729
2830 private final ScheduledExecutorService keepAliveScheduler ;
29- private final Map < Long , ScheduledFuture <?>> keepAliveTasks = new ConcurrentHashMap <>();
31+
3032 private final TransactionManager transactionManager ;
31- private final StringBuilder messageBuffer = new StringBuilder ();
33+
3234
3335 public JanusClient (JanusConfiguration config ) {
34- this .config = config ;
36+ this .config = config ;
3537 this .transactionManager = new TransactionManager ();
36- this .executor = Executors .newVirtualThreadPerTaskExecutor ();
37- this .httpClient = HttpClient .newBuilder ().executor (this .executor ).build ();
38+ this .executor = Executors .newVirtualThreadPerTaskExecutor ();
39+ this .httpClient = HttpClient .newBuilder ().executor (this .executor ).build ();
3840 this .keepAliveScheduler = Executors .newScheduledThreadPool (1 );
3941
40- try {
42+ try {
4143 logger .info ("Starting connection attempt..." );
4244 connect ().get ();
4345 logger .info ("Connection established, retrieving server info..." );
44-
45- // Get server info
4646 ServerInfo serverInfo = getServerInfo ().get ();
47- logger .info ("Server Info:\n Janus={}, \n Version={}, \n Plugins={}" ,
48- serverInfo .janus (),
49- serverInfo .versionString (),
50- serverInfo .plugins ().keySet ());
47+ if (config .isLogEnabled ()) {
48+ logger .info ("Server Info:\n Janus={}, \n Version={}, \n Plugins={}" ,
49+ serverInfo .janus (),
50+ serverInfo .versionString (),
51+ serverInfo .plugins ().keySet ());
52+ }
5153
5254 } catch (InterruptedException e ) {
5355 logger .info ("Program interrupted, shutting down." );
@@ -70,6 +72,7 @@ public CompletableFuture<Void> connect() {
7072 throw new JanusException ("Connection failed" , throwable );
7173 });
7274 }
75+
7376 private void scheduleKeepAlive (long sessionId ) {
7477 ScheduledFuture <?> keepAliveTask = keepAliveScheduler .scheduleAtFixedRate (() -> {
7578 try {
@@ -78,21 +81,24 @@ private void scheduleKeepAlive(long sessionId) {
7881 keepAlive .put ("session_id" , sessionId );
7982 keepAlive .put ("transaction" , transactionManager .createTransaction ());
8083 sendMessage (keepAlive );
81- logger .info ("Sent keep-alive for session {}" , sessionId );
84+ if (config .isLogEnabled ()) {
85+ logger .info ("Sent keep-alive for session {}" , sessionId );
86+ }
8287 } catch (Exception e ) {
8388 logger .error ("Failed to send keep-alive for session {}: {}" , sessionId , e .getMessage (), e );
8489 }
8590 }, KEEP_ALIVE_INTERVAL_SECONDS , KEEP_ALIVE_INTERVAL_SECONDS , TimeUnit .SECONDS );
8691 keepAliveTasks .put (sessionId , keepAliveTask );
8792 logger .debug ("Scheduled keep-alive task for session {}" , sessionId );
8893 }
94+
8995 public void disconnect () {
9096 // 1. Stop keep-alive tasks and shut down the scheduler
91- logger .debug ("Shutting down keep-alive scheduler..." );
97+ logger .info ("Shutting down keep-alive scheduler..." );
9298 keepAliveTasks .values ().forEach (task -> task .cancel (false ));
9399 keepAliveTasks .clear ();
94100 keepAliveScheduler .shutdown ();
95-
101+
96102 // 2. Close WebSocket connection
97103 if (webSocket != null && !webSocket .isOutputClosed ()) {
98104 try {
@@ -102,11 +108,12 @@ public void disconnect() {
102108 logger .warn ("Error during graceful WebSocket disconnect: {}" , e .getMessage ());
103109 }
104110 }
105-
106- // 3. Shut down the main executor
107- logger .debug ("Shutting down main executor..." );
111+ if (config .isLogEnabled ()) {
112+ // 3. Shut down the main executor
113+ logger .info ("Shutting down main executor..." );
114+ }
108115 executor .shutdown ();
109-
116+
110117 // 4. Await termination of both schedulers
111118 try {
112119 if (!executor .awaitTermination (5 , TimeUnit .SECONDS )) {
@@ -130,12 +137,14 @@ public void onOpen(WebSocket webSocket) {
130137
131138 @ Override
132139 public CompletionStage <?> onText (WebSocket webSocket , CharSequence data , boolean last ) {
133- logger .debug ("Received WebSocket message fragment: length={}, last={}" , data .length (), last );
140+ if (config .isLogEnabled ()) {
141+ logger .info ("Received WebSocket message fragment: length={}, last={}" , data .length (), last );
142+ }
134143 messageBuffer .append (data );
135144
136145 if (last ) {
137146 String completeMessage = messageBuffer .toString ();
138- if (config .isLogEnabled ()){
147+ if (config .isLogEnabled ()) {
139148 logger .debug ("Message From Janus: {}" , completeMessage );
140149 }
141150 executor .submit (() -> processMessage (completeMessage ));
@@ -160,7 +169,7 @@ private void processMessage(String message) {
160169 }
161170
162171 JSONObject json = new JSONObject (message );
163- // logger.info("Processing JSON: {}", json.toString(2));
172+ // logger.info("Processing JSON: {}", json.toString(2));
164173
165174 String transactionId = json .optString ("transaction" , null );
166175 if (transactionId != null && !transactionId .isEmpty ()) {
@@ -212,7 +221,9 @@ public CompletableFuture<JanusSession> createSession() {
212221 long sessionId = response .getJSONObject ("data" ).getLong ("id" );
213222 JanusSession session = new JanusSession (this , sessionId );
214223 sessions .put (sessionId , session );
215- logger .info ("Session created, session ID={}" , sessionId );
224+ if (config .isLogEnabled ()) {
225+ logger .info ("Session created, session ID={}" , sessionId );
226+ }
216227 scheduleKeepAlive (sessionId );
217228 return session ;
218229 });
@@ -225,9 +236,9 @@ public CompletableFuture<ServerInfo> getServerInfo() {
225236 JSONObject request = new JSONObject ();
226237 request .put ("janus" , "info" );
227238 request .put ("transaction" , transactionId );
228-
229- logger .debug ("Sending server info request: {}" , request .toString ());
230-
239+ if ( config . isLogEnabled ()) {
240+ logger .info ("Sending server info request: {}" , request .toString ());
241+ }
231242 sendMessage (request );
232243 return future .orTimeout (SERVER_INFO_TIMEOUT_MS , TimeUnit .MILLISECONDS )
233244 .thenApply (this ::convertToServerInfo )
@@ -334,7 +345,9 @@ public void sendMessage(JSONObject message) {
334345 throw new IllegalStateException ("WebSocket is not connected." );
335346 }
336347 String msgStr = message .toString ();
337- logger .debug ("Sending message: {}" , msgStr );
348+ if (config .isLogEnabled ()) {
349+ logger .info ("Sending message: {}" , msgStr );
350+ }
338351 webSocket .sendText (msgStr , true );
339352 }
340353
0 commit comments