Skip to content

Commit 6edb9f1

Browse files
authored
[ZEPPELIN-6260] Fix memory leak in WebSocket watcher connections
### What is this PR for? This PR fixes a memory leak issue where WebSocket connections that are switched to watcher mode are never removed from the watcherSockets queue when the connection is closed. This causes the queue to grow indefinitely, leading to increased memory usage over time and potential OutOfMemoryError in long-running Zeppelin servers. ### What type of PR is it? Bug Fix ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-6260 ### How should this be tested? Steps to reproduce the issue: 1. Open a WebSocket connection to Zeppelin 2. Send a WATCHER message to switch the connection to watcher mode 3. Close the connection 4. The connection remains in watcherSockets queue indefinitely Verification after fix: - Unit tests have been added to verify the fix: - removeWatcherConnectionCleansQueue: Basic functionality test - removeWatcherConnectionWithMultipleWatchers: Tests selective removal - removeWatcherConnectionConcurrentTest: Tests thread safety - switchConnectionToWatcherAndRemove: Tests complete lifecycle ### Screenshots (if appropriate) N/A ### Questions: - Does the license files need update? No - Is there breaking changes for older versions? No - Does this needs documentation? No ### Description of changes: 1. Added removeWatcherConnection() method to ConnectionManager to safely remove connections from the watcherSockets queue 2. Modified NotebookServer.removeConnection() to call removeWatcherConnection() when any connection is closed 3. Added debug logging to track watcher connection removal 4. Added comprehensive unit tests including concurrent access scenarios The fix is minimal and safe: - The ConcurrentLinkedQueue.remove() operation is safe even if the element doesn't exist - The synchronization block is very short, minimizing performance impact - The approach ensures no watcher connections are leaked, regardless of how they were closed ### Related observations: During the investigation, I noticed that broadcastToWatchers() doesn't remove watchers when IOException occurs. This could cause performance degradation as closed connections would repeatedly fail. However, this is a separate issue and should be addressed in a different PR to keep this fix focused. Closes #5001 from renechoi/ZEPPELIN-6260-fix-watcher-memory-leak. Signed-off-by: Philipp Dallig <philipp.dallig@gmail.com>
1 parent 1a66333 commit 6edb9f1

File tree

3 files changed

+120
-0
lines changed

3 files changed

+120
-0
lines changed

zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ public void removeUserConnection(String user, NotebookSocket conn) {
175175
}
176176
}
177177

178+
public void removeWatcherConnection(NotebookSocket conn) {
179+
synchronized (watcherSockets) {
180+
if (watcherSockets.remove(conn)) {
181+
LOGGER.debug("Removed watcher connection: {}", conn);
182+
}
183+
}
184+
}
185+
178186
public String getAssociatedNoteId(NotebookSocket socket) {
179187
String associatedNoteId = null;
180188
synchronized (noteSocketMap) {

zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,7 @@ public void onClose(Session session, CloseReason closeReason) {
503503

504504
private void removeConnection(NotebookSocket notebookSocket) {
505505
connectionManager.removeConnection(notebookSocket);
506+
connectionManager.removeWatcherConnection(notebookSocket);
506507
connectionManager.removeConnectionFromAllNote(notebookSocket);
507508
connectionManager.removeUserConnection(notebookSocket.getUser(), notebookSocket);
508509
}

zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,21 @@
1717
package org.apache.zeppelin.socket;
1818

1919
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertFalse;
21+
import static org.junit.jupiter.api.Assertions.assertTrue;
2022
import static org.mockito.Mockito.mock;
23+
import static org.mockito.Mockito.when;
24+
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
2131

2232
import org.apache.zeppelin.conf.ZeppelinConfiguration;
2333
import org.apache.zeppelin.notebook.AuthorizationService;
34+
import org.apache.zeppelin.util.WatcherSecurityKey;
2435
import org.junit.jupiter.api.Test;
2536

2637
class ConnectionManagerTest {
@@ -57,4 +68,104 @@ void checkMapGrowRemoveAll() {
5768
manager.removeConnectionFromAllNote(socket);
5869
assertEquals(0, manager.noteSocketMap.size());
5970
}
71+
72+
@Test
73+
void removeWatcherConnectionCleansQueue() {
74+
AuthorizationService authService = mock(AuthorizationService.class);
75+
76+
ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load());
77+
NotebookSocket socket = mock(NotebookSocket.class);
78+
79+
manager.watcherSockets.add(socket);
80+
assertEquals(1, manager.watcherSockets.size());
81+
82+
manager.removeWatcherConnection(socket);
83+
assertEquals(0, manager.watcherSockets.size());
84+
}
85+
86+
@Test
87+
void removeWatcherConnectionWithMultipleWatchers() {
88+
AuthorizationService authService = mock(AuthorizationService.class);
89+
90+
ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load());
91+
NotebookSocket socket1 = mock(NotebookSocket.class);
92+
NotebookSocket socket2 = mock(NotebookSocket.class);
93+
NotebookSocket socket3 = mock(NotebookSocket.class);
94+
95+
// Add multiple watchers
96+
manager.watcherSockets.add(socket1);
97+
manager.watcherSockets.add(socket2);
98+
manager.watcherSockets.add(socket3);
99+
assertEquals(3, manager.watcherSockets.size());
100+
101+
// Remove only socket2
102+
manager.removeWatcherConnection(socket2);
103+
assertEquals(2, manager.watcherSockets.size());
104+
assertTrue(manager.watcherSockets.contains(socket1));
105+
assertFalse(manager.watcherSockets.contains(socket2));
106+
assertTrue(manager.watcherSockets.contains(socket3));
107+
}
108+
109+
@Test
110+
void removeWatcherConnectionConcurrentTest() throws InterruptedException {
111+
AuthorizationService authService = mock(AuthorizationService.class);
112+
ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load());
113+
114+
int threadCount = 10;
115+
List<NotebookSocket> sockets = new ArrayList<>();
116+
117+
// Create and add multiple watcher sockets
118+
for (int i = 0; i < threadCount; i++) {
119+
NotebookSocket socket = mock(NotebookSocket.class);
120+
sockets.add(socket);
121+
manager.watcherSockets.add(socket);
122+
}
123+
124+
assertEquals(threadCount, manager.watcherSockets.size());
125+
126+
// Remove sockets concurrently
127+
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
128+
CountDownLatch latch = new CountDownLatch(threadCount);
129+
130+
for (NotebookSocket socket : sockets) {
131+
executor.submit(() -> {
132+
manager.removeWatcherConnection(socket);
133+
latch.countDown();
134+
});
135+
}
136+
137+
// Wait for all threads to complete
138+
assertTrue(latch.await(5, TimeUnit.SECONDS));
139+
executor.shutdown();
140+
141+
// Verify all sockets were removed
142+
assertEquals(0, manager.watcherSockets.size());
143+
}
144+
145+
@Test
146+
void switchConnectionToWatcherAndRemove() {
147+
AuthorizationService authService = mock(AuthorizationService.class);
148+
ConnectionManager manager = new ConnectionManager(authService, ZeppelinConfiguration.load());
149+
150+
NotebookSocket socket = mock(NotebookSocket.class);
151+
when(socket.getUser()).thenReturn("testUser");
152+
when(socket.getHeader(WatcherSecurityKey.HTTP_HEADER)).thenReturn(WatcherSecurityKey.getKey());
153+
154+
// Add socket as regular connection first
155+
manager.addConnection(socket);
156+
manager.addUserConnection("testUser", socket);
157+
158+
// Switch to watcher
159+
manager.switchConnectionToWatcher(socket);
160+
161+
// Verify it's in watcher queue
162+
assertTrue(manager.watcherSockets.contains(socket));
163+
assertFalse(manager.connectedSockets.contains(socket));
164+
165+
// Remove watcher connection
166+
manager.removeWatcherConnection(socket);
167+
168+
// Verify it's completely removed
169+
assertFalse(manager.watcherSockets.contains(socket));
170+
}
60171
}

0 commit comments

Comments
 (0)