Skip to content

Commit 016d8f4

Browse files
authored
[ZEPPELIN-5926] Remove map entries if Collection is empty (#4612)
* Remove map entries if Collection is empty * Add license header to new class
1 parent 7ed5a05 commit 016d8f4

File tree

3 files changed

+111
-32
lines changed

3 files changed

+111
-32
lines changed

zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java

Lines changed: 44 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
import java.util.Date;
4848
import java.util.HashMap;
4949
import java.util.HashSet;
50-
import java.util.LinkedList;
50+
import java.util.Iterator;
5151
import java.util.List;
5252
import java.util.Map;
5353
import java.util.Map.Entry;
@@ -69,7 +69,7 @@ public class ConnectionManager {
6969

7070
final Queue<NotebookSocket> connectedSockets = Metrics.gaugeCollectionSize("zeppelin_connected_sockets", Tags.empty(), new ConcurrentLinkedQueue<>());
7171
// noteId -> connection
72-
final Map<String, List<NotebookSocket>> noteSocketMap = Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>());
72+
final Map<String, Set<NotebookSocket>> noteSocketMap = Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>());
7373
// user -> connection
7474
final Map<String, Queue<NotebookSocket>> userSocketMap = Metrics.gaugeMapSize("zeppelin_user_sockets", Tags.empty(), new HashMap<>());
7575

@@ -107,11 +107,9 @@ public void addNoteConnection(String noteId, NotebookSocket socket) {
107107
synchronized (noteSocketMap) {
108108
// make sure a socket relates only an single note.
109109
removeConnectionFromAllNote(socket);
110-
List<NotebookSocket> socketList = noteSocketMap.computeIfAbsent(noteId, k -> new LinkedList<>());
111-
if (!socketList.contains(socket)) {
112-
socketList.add(socket);
113-
}
114-
checkCollaborativeStatus(noteId, socketList);
110+
Set<NotebookSocket> sockets = noteSocketMap.computeIfAbsent(noteId, k -> new HashSet<>());
111+
sockets.add(socket);
112+
checkCollaborativeStatus(noteId, sockets);
115113
}
116114
}
117115

@@ -124,11 +122,33 @@ public void removeNoteConnection(String noteId) {
124122
public void removeNoteConnection(String noteId, NotebookSocket socket) {
125123
LOGGER.debug("Remove connection {} from note: {}", socket, noteId);
126124
synchronized (noteSocketMap) {
127-
List<NotebookSocket> socketList = noteSocketMap.getOrDefault(noteId, Collections.emptyList());
128-
if (!socketList.isEmpty()) {
129-
socketList.remove(socket);
125+
Set<NotebookSocket> sockets = noteSocketMap.getOrDefault(noteId, Collections.emptySet());
126+
removeNoteConnection(noteId, sockets, socket);
127+
// Remove empty socket collection from map
128+
if (sockets.isEmpty()) {
129+
noteSocketMap.remove(noteId);
130+
}
131+
}
132+
}
133+
134+
private void removeNoteConnection(String noteId, Set<NotebookSocket> sockets,
135+
NotebookSocket socket) {
136+
sockets.remove(socket);
137+
checkCollaborativeStatus(noteId, sockets);
138+
}
139+
140+
public void removeConnectionFromAllNote(NotebookSocket socket) {
141+
LOGGER.debug("Remove connection {} from all notes", socket);
142+
synchronized (noteSocketMap) {
143+
Iterator<Entry<String, Set<NotebookSocket>>> iterator = noteSocketMap.entrySet().iterator();
144+
while (iterator.hasNext()) {
145+
Entry<String, Set<NotebookSocket>> noteSocketMapEntry = iterator.next();
146+
removeNoteConnection(noteSocketMapEntry.getKey(), noteSocketMapEntry.getValue(), socket);
147+
// Remove empty socket collection from map
148+
if (noteSocketMapEntry.getValue().isEmpty()) {
149+
iterator.remove();
150+
}
130151
}
131-
checkCollaborativeStatus(noteId, socketList);
132152
}
133153
}
134154

@@ -147,7 +167,11 @@ public void addUserConnection(String user, NotebookSocket conn) {
147167
public void removeUserConnection(String user, NotebookSocket conn) {
148168
LOGGER.debug("Remove user connection {} for user: {}", conn, user);
149169
if (userSocketMap.containsKey(user)) {
150-
userSocketMap.get(user).remove(conn);
170+
Queue<NotebookSocket> connections = userSocketMap.get(user);
171+
connections.remove(conn);
172+
if (connections.isEmpty()) {
173+
userSocketMap.remove(user);
174+
}
151175
} else {
152176
LOGGER.warn("Closing connection that is absent in user connections");
153177
}
@@ -156,7 +180,7 @@ public void removeUserConnection(String user, NotebookSocket conn) {
156180
public String getAssociatedNoteId(NotebookSocket socket) {
157181
String associatedNoteId = null;
158182
synchronized (noteSocketMap) {
159-
for (Entry<String, List<NotebookSocket>> noteSocketMapEntry : noteSocketMap.entrySet()) {
183+
for (Entry<String, Set<NotebookSocket>> noteSocketMapEntry : noteSocketMap.entrySet()) {
160184
if (noteSocketMapEntry.getValue().contains(socket)) {
161185
associatedNoteId = noteSocketMapEntry.getKey();
162186
}
@@ -166,16 +190,7 @@ public String getAssociatedNoteId(NotebookSocket socket) {
166190
return associatedNoteId;
167191
}
168192

169-
public void removeConnectionFromAllNote(NotebookSocket socket) {
170-
synchronized (noteSocketMap) {
171-
Set<String> noteIds = noteSocketMap.keySet();
172-
for (String noteId : noteIds) {
173-
removeNoteConnection(noteId, socket);
174-
}
175-
}
176-
}
177-
178-
private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) {
193+
private void checkCollaborativeStatus(String noteId, Set<NotebookSocket> socketList) {
179194
if (!collaborativeModeEnable.booleanValue()) {
180195
return;
181196
}
@@ -219,11 +234,11 @@ public void broadcast(String noteId, Message m) {
219234
List<NotebookSocket> socketsToBroadcast;
220235
synchronized (noteSocketMap) {
221236
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
222-
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
223-
if (socketLists == null || socketLists.isEmpty()) {
237+
Set<NotebookSocket> sockets = noteSocketMap.get(noteId);
238+
if (sockets == null || sockets.isEmpty()) {
224239
return;
225240
}
226-
socketsToBroadcast = new ArrayList<>(socketLists);
241+
socketsToBroadcast = new ArrayList<>(sockets);
227242
}
228243
LOGGER.debug("SEND >> {}", m);
229244
for (NotebookSocket conn : socketsToBroadcast) {
@@ -256,11 +271,11 @@ public void broadcastExcept(String noteId, Message m, NotebookSocket exclude) {
256271
List<NotebookSocket> socketsToBroadcast;
257272
synchronized (noteSocketMap) {
258273
broadcastToWatchers(noteId, StringUtils.EMPTY, m);
259-
List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
260-
if (socketLists == null || socketLists.isEmpty()) {
274+
Set<NotebookSocket> socketSet = noteSocketMap.get(noteId);
275+
if (socketSet == null || socketSet.isEmpty()) {
261276
return;
262277
}
263-
socketsToBroadcast = new ArrayList<>(socketLists);
278+
socketsToBroadcast = new ArrayList<>(socketSet);
264279
}
265280

266281
LOGGER.debug("SEND >> {}", m);
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.zeppelin.socket;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.mockito.Mockito.mock;
21+
import org.apache.zeppelin.notebook.AuthorizationService;
22+
import org.junit.jupiter.api.Test;
23+
24+
class ConnectionManagerTest {
25+
26+
@Test
27+
void checkMapGrow() {
28+
AuthorizationService authService = mock(AuthorizationService.class);
29+
30+
ConnectionManager manager = new ConnectionManager(authService);
31+
NotebookSocket socket = mock(NotebookSocket.class);
32+
manager.addNoteConnection("test", socket);
33+
assertEquals(1, manager.noteSocketMap.size());
34+
// Remove Connection from wrong note
35+
manager.removeNoteConnection("test1", socket);
36+
assertEquals(1, manager.noteSocketMap.size());
37+
// Remove Connection from right note
38+
manager.removeNoteConnection("test", socket);
39+
assertEquals(0, manager.noteSocketMap.size());
40+
41+
manager.addUserConnection("TestUser", socket);
42+
assertEquals(1, manager.userSocketMap.size());
43+
manager.removeUserConnection("TestUser", socket);
44+
assertEquals(0, manager.userSocketMap.size());
45+
}
46+
47+
@Test
48+
void checkMapGrowRemoveAll() {
49+
AuthorizationService authService = mock(AuthorizationService.class);
50+
51+
ConnectionManager manager = new ConnectionManager(authService);
52+
NotebookSocket socket = mock(NotebookSocket.class);
53+
manager.addNoteConnection("test", socket);
54+
assertEquals(1, manager.noteSocketMap.size());
55+
manager.removeConnectionFromAllNote(socket);
56+
assertEquals(0, manager.noteSocketMap.size());
57+
}
58+
}

zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.zeppelin.socket;
1818

19-
import static java.util.Arrays.asList;
2019
import static org.junit.jupiter.api.Assertions.assertEquals;
2120
import static org.junit.jupiter.api.Assertions.assertFalse;
2221
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -42,6 +41,7 @@
4241
import java.util.HashSet;
4342
import java.util.List;
4443
import java.util.Map;
44+
import java.util.Set;
4545

4646
import javax.servlet.http.HttpServletRequest;
4747

@@ -566,7 +566,10 @@ void bindAngularObjectToRemoteForParagraphs() throws Exception {
566566
.put("noteId", "noteId")
567567
.put("paragraphId", "paragraphId"));
568568

569-
notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
569+
Set<NotebookSocket> sockets = new HashSet<>();
570+
sockets.add(otherConn);
571+
sockets.add(conn);
572+
notebookServer.getConnectionManager().noteSocketMap.put("noteId", sockets);
570573

571574
// When
572575
notebookServer.angularObjectClientBind(conn, messageReceived);
@@ -619,7 +622,10 @@ void unbindAngularObjectFromRemoteForParagraphs() throws Exception {
619622
.put("noteId", "noteId")
620623
.put("paragraphId", "paragraphId"));
621624

622-
notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn));
625+
Set<NotebookSocket> sockets = new HashSet<>();
626+
sockets.add(otherConn);
627+
sockets.add(conn);
628+
notebookServer.getConnectionManager().noteSocketMap.put("noteId", sockets);
623629

624630
// When
625631
notebookServer.angularObjectClientUnbind(conn, messageReceived);

0 commit comments

Comments
 (0)