Skip to content

Commit 0aefc78

Browse files
committed
Fixes for shared object concurrency
1 parent 38bc95e commit 0aefc78

File tree

12 files changed

+178
-229
lines changed

12 files changed

+178
-229
lines changed

pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<parent>
44
<groupId>org.red5</groupId>
55
<artifactId>red5-parent</artifactId>
6-
<version>1.0.10-M10</version>
6+
<version>1.0.10</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<artifactId>red5-server-common</artifactId>
@@ -236,6 +236,13 @@
236236
<version>${tomcat.version}</version>
237237
<scope>provided</scope>
238238
</dependency>
239+
<!-- future message bus work -->
240+
<!--
241+
<dependency>
242+
<groupId>net.engio</groupId>
243+
<artifactId>mbassador</artifactId>
244+
<version>1.3.2</version>
245+
</dependency> -->
239246
<dependency>
240247
<groupId>junit</groupId>
241248
<artifactId>junit</artifactId>

src/main/java/org/red5/server/AttributeStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class AttributeStore implements ICastingAttributeStore {
4242
/**
4343
* Map for attributes
4444
*/
45-
protected ConcurrentMap<String, Object> attributes = new ConcurrentAttributesMap<String, Object>(1);
45+
protected ConcurrentMap<String, Object> attributes = new ConcurrentAttributesMap<>(1);
4646

4747
/**
4848
* Creates empty attribute store. Object is not associated with a persistence storage.

src/main/java/org/red5/server/api/statistics/support/StatisticsCounter.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,28 +21,25 @@
2121
import java.util.concurrent.atomic.AtomicInteger;
2222

2323
/**
24-
* Counts numbers used by the statistics. Keeps track of current, maximum and total numbers.
24+
* Counts numbers used by the statistics. Keeps track of current and total numbers.
2525
*
2626
* @author The Red5 Project
2727
* @author Joachim Bauch (jojo@struktur.de)
2828
*/
2929
public class StatisticsCounter {
3030

31-
/** Current number. */
31+
/** Current number */
3232
private AtomicInteger current = new AtomicInteger();
3333

34-
/** Total number. */
35-
private AtomicInteger total = new AtomicInteger();
36-
37-
/** Maximum number. */
38-
private AtomicInteger max = new AtomicInteger();
34+
/** Total number */
35+
private int total;
3936

4037
/**
4138
* Increment statistics by one.
4239
*/
4340
public void increment() {
44-
total.incrementAndGet();
45-
max.compareAndSet(current.intValue(), current.incrementAndGet());
41+
current.incrementAndGet();
42+
total++;
4643
}
4744

4845
/**
@@ -67,24 +64,25 @@ public int getCurrent() {
6764
* @return total
6865
*/
6966
public int getTotal() {
70-
return total.intValue();
67+
return total;
7168
}
7269

7370
/**
74-
* Get maximum number.
71+
* Get maximum number. Use total instead.
7572
*
7673
* @return max
7774
*/
75+
@Deprecated
7876
public int getMax() {
79-
return max.intValue();
77+
return total;
8078
}
8179

8280
/* (non-Javadoc)
8381
* @see java.lang.Object#toString()
8482
*/
8583
@Override
8684
public String toString() {
87-
return "StatisticsCounter [current=" + current + ", total=" + total + ", max=" + max + "]";
85+
return "StatisticsCounter [current=" + current + ", total=" + total + "]";
8886
}
8987

9088
}

src/main/java/org/red5/server/net/rtmp/RTMPConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import java.util.Set;
2828
import java.util.UUID;
2929
import java.util.concurrent.ConcurrentHashMap;
30-
import java.util.concurrent.ConcurrentLinkedQueue;
3130
import java.util.concurrent.ConcurrentMap;
3231
import java.util.concurrent.CopyOnWriteArraySet;
3332
import java.util.concurrent.ScheduledFuture;
@@ -1571,7 +1570,7 @@ public long getPendingVideoMessages(Number streamId) {
15711570
* @param events
15721571
* shared object events
15731572
*/
1574-
public void sendSharedObjectMessage(String name, int currentVersion, boolean persistent, ConcurrentLinkedQueue<ISharedObjectEvent> events) {
1573+
public void sendSharedObjectMessage(String name, int currentVersion, boolean persistent, Set<ISharedObjectEvent> events) {
15751574
// create a new sync message for every client to avoid concurrent access through multiple threads
15761575
SharedObjectMessage syncMessage = state.getEncoding() == Encoding.AMF3 ? new FlexSharedObjectMessage(null, name, currentVersion, persistent) : new SharedObjectMessage(null, name, currentVersion, persistent);
15771576
syncMessage.addEvents(events);

src/main/java/org/red5/server/net/rtmp/event/BaseEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void release() {
200200
protected abstract void releaseInternal();
201201

202202
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
203-
type = (Type) in.readObject();
203+
type = Type.valueOf(in.readUTF());
204204
sourceType = in.readByte();
205205
timestamp = in.readInt();
206206
if (log.isTraceEnabled()) {
@@ -212,7 +212,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
212212
if (log.isTraceEnabled()) {
213213
log.trace("writeExternal - type: {} sourceType: {} timestamp: {}", type, sourceType, timestamp);
214214
}
215-
out.writeObject(type);
215+
out.writeUTF(type.name());
216216
out.writeByte(sourceType);
217217
out.writeInt(timestamp);
218218
}

src/main/java/org/red5/server/scope/Scope.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -622,17 +622,17 @@ public IScopeHandler getHandler() {
622622
/** {@inheritDoc} */
623623
@Deprecated
624624
public int getMaxClients() {
625-
return connectionStats.getMax();
625+
return connectionStats.getTotal();
626626
}
627627

628628
/** {@inheritDoc} */
629629
public int getMaxConnections() {
630-
return connectionStats.getMax();
630+
return connectionStats.getTotal();
631631
}
632632

633633
/** {@inheritDoc} */
634634
public int getMaxSubscopes() {
635-
return subscopeStats.getMax();
635+
return subscopeStats.getTotal();
636636
}
637637

638638
/**

src/main/java/org/red5/server/so/ISharedObjectEvent.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ enum Type {
3030
/**
3131
* Returns the type of the event.
3232
*
33-
* @return the type of the event.
33+
* @return the type of the event
3434
*/
3535
public Type getType();
3636

@@ -43,13 +43,7 @@ enum Type {
4343
* <li>the attribute name to delete for DELETE_ATTRIBUTE</li>
4444
* <li>the handler name to call for SEND_MESSAGE</li>
4545
* </ul>
46-
* In all other cases the key is
47-
*
48-
* <pre>
49-
* null
50-
* </pre>
51-
*
52-
* .
46+
* In all other cases the key is null
5347
*
5448
* @return the key of the event
5549
*/
@@ -63,13 +57,7 @@ enum Type {
6357
* <li>the attribute value to set for SET_ATTRIBUTE</li>
6458
* <li>a list of parameters to pass to the handler for SEND_MESSAGE</li>
6559
* </ul>
66-
* In all other cases the value is
67-
*
68-
* <pre>
69-
* null
70-
* </pre>
71-
*
72-
* .
60+
* In all other cases the value is null
7361
*
7462
* @return the value of the event
7563
*/

src/main/java/org/red5/server/so/ISharedObjectMessage.java

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
package org.red5.server.so;
2020

21-
import java.util.Queue;
21+
import java.util.Set;
2222

2323
import org.red5.server.net.rtmp.event.IRTMPEvent;
2424

@@ -53,28 +53,25 @@ public interface ISharedObjectMessage extends IRTMPEvent {
5353
*
5454
* @return set of ISharedObjectEvents
5555
*/
56-
public Queue<ISharedObjectEvent> getEvents();
56+
public Set<ISharedObjectEvent> getEvents();
5757

5858
/**
5959
* Addition event handler
6060
*
61-
* @param type
62-
* Event type
63-
* @param key
64-
* Handler key
65-
* @param value
66-
* Event value (like arguments)
61+
* @param type Event type
62+
* @param key Handler key
63+
* @param value Event value (like arguments)
6764
* @return true if event is added and false if it is not added
6865
*/
6966
public boolean addEvent(ISharedObjectEvent.Type type, String key, Object value);
7067

7168
/**
7269
* Add event handler
7370
*
74-
* @param event
75-
* SO event
71+
* @param event SO event
72+
* @return true if event is added and false if it is not added
7673
*/
77-
public void addEvent(ISharedObjectEvent event);
74+
public boolean addEvent(ISharedObjectEvent event);
7875

7976
/**
8077
* Clear shared object
@@ -84,17 +81,7 @@ public interface ISharedObjectMessage extends IRTMPEvent {
8481
/**
8582
* Is empty?
8683
*
87-
* @return <pre>
88-
* true
89-
* </pre>
90-
*
91-
* if shared object is empty,
92-
*
93-
* <pre>
94-
* false
95-
* </pre>
96-
*
97-
* otherwise
84+
* @return true if shared object is empty, false otherwise
9885
*/
9986
public boolean isEmpty();
10087

0 commit comments

Comments
 (0)