Skip to content

Commit 0966df3

Browse files
committed
OCP-DEMO Infinispan Server side event
1 parent 8a89372 commit 0966df3

File tree

9 files changed

+214
-1
lines changed

9 files changed

+214
-1
lines changed

openshift/message-board/message-service/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,12 @@
8181
<scope>provided</scope>
8282
</dependency>
8383

84+
<dependency>
85+
<groupId>org.jboss.spec.javax.websocket</groupId>
86+
<artifactId>jboss-websocket-api_1.1_spec</artifactId>
87+
<scope>provided</scope>
88+
</dependency>
89+
8490
<!-- Log -->
8591
<dependency>
8692
<groupId>org.slf4j</groupId>

openshift/message-board/message-service/src/main/java/org/hibernate/demo/message/post/core/cdi/ResourcesProducer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010
import javax.enterprise.inject.Produces;
1111
import javax.enterprise.inject.spi.InjectionPoint;
1212
import javax.persistence.EntityManager;
13+
import javax.persistence.EntityManagerFactory;
1314
import javax.persistence.PersistenceContext;
15+
import javax.persistence.PersistenceUnit;
1416

1517
import org.slf4j.Logger;
1618
import org.slf4j.LoggerFactory;
@@ -27,6 +29,10 @@ public class ResourcesProducer {
2729
@PersistenceContext
2830
private EntityManager em;
2931

32+
@Produces
33+
@PersistenceUnit
34+
private EntityManagerFactory emf;
35+
3036
@Produces
3137
private Logger produceLog(InjectionPoint injectionPoint) {
3238
return LoggerFactory.getLogger( injectionPoint.getMember().getDeclaringClass().getName());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package org.hibernate.demo.message.post.core.event;
2+
3+
import java.io.IOException;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.Set;
7+
import javax.ejb.Asynchronous;
8+
import javax.ejb.Stateless;
9+
import javax.inject.Inject;
10+
import javax.websocket.Session;
11+
12+
import org.hibernate.demo.message.post.core.entity.Message;
13+
import org.hibernate.demo.message.post.core.repo.BoardRepo;
14+
import org.hibernate.ogm.datastore.infinispanremote.impl.protostream.ProtostreamId;
15+
16+
import org.infinispan.client.hotrod.annotation.ClientCacheEntryCreated;
17+
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
18+
import org.infinispan.client.hotrod.annotation.ClientCacheEntryRemoved;
19+
import org.infinispan.client.hotrod.annotation.ClientListener;
20+
import org.infinispan.client.hotrod.event.ClientCacheEntryCreatedEvent;
21+
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
22+
import org.infinispan.client.hotrod.event.ClientCacheEntryRemovedEvent;
23+
24+
import org.slf4j.Logger;
25+
26+
import com.fasterxml.jackson.databind.ObjectMapper;
27+
28+
@ClientListener
29+
@Stateless
30+
@Asynchronous
31+
public class BoardEventListener {
32+
33+
@Inject
34+
private Logger log;
35+
36+
@Inject
37+
private BoardSessionHolder sessions;
38+
39+
@Inject
40+
private BoardRepo boards;
41+
42+
@Inject
43+
private ObjectMapper jackson;
44+
45+
@ClientCacheEntryCreated
46+
public void boardCreated(ClientCacheEntryCreatedEvent e) {
47+
log.info( "Board created event {}", e );
48+
notifyOpenBoards( (ProtostreamId) e.getKey(), false );
49+
}
50+
51+
@ClientCacheEntryModified
52+
public void boardModified(ClientCacheEntryModifiedEvent e) {
53+
log.info( "Board modified event: {}", e );
54+
notifyOpenBoards( (ProtostreamId) e.getKey(), false );
55+
}
56+
57+
@ClientCacheEntryRemoved
58+
public void boardRemoved(ClientCacheEntryRemovedEvent e) {
59+
log.info( "Board deleted event: {}", e );
60+
notifyOpenBoards( (ProtostreamId) e.getKey(), true );
61+
}
62+
63+
private void notifyOpenBoards(ProtostreamId key, boolean removed) {
64+
String username = (String) key.columnValues[0];
65+
Set<Session> sessions = this.sessions.findByUser( username );
66+
if ( sessions.isEmpty() ) {
67+
log.warn( "No listening session, on board {}", username );
68+
return;
69+
}
70+
71+
List<Message> messagesByUser = ( removed ) ? new ArrayList<>() : boards.find( username ).getMessages();
72+
try {
73+
String json = jackson.writer().writeValueAsString( messagesByUser );
74+
log.info( "Sending message to {} webSockets listening on board ({}). Payload: {}.", sessions.size(), username, json );
75+
for ( Session session : sessions ) {
76+
if ( session.isOpen() ) {
77+
session.getBasicRemote().sendText( json );
78+
}
79+
else {
80+
sessions.remove( session );
81+
}
82+
}
83+
}
84+
catch (IOException e) {
85+
throw new RuntimeException( "Error notifying Board WebSockets", e );
86+
}
87+
}
88+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package org.hibernate.demo.message.post.core.event;
2+
3+
import javax.annotation.PostConstruct;
4+
import javax.ejb.Singleton;
5+
import javax.ejb.Startup;
6+
import javax.inject.Inject;
7+
import javax.persistence.EntityManagerFactory;
8+
9+
import org.hibernate.engine.spi.SessionFactoryImplementor;
10+
import org.hibernate.ogm.datastore.infinispanremote.impl.InfinispanRemoteDatastoreProvider;
11+
import org.hibernate.ogm.datastore.spi.DatastoreProvider;
12+
13+
import org.infinispan.client.hotrod.RemoteCache;
14+
15+
@Singleton
16+
@Startup
17+
public class BoardEventStarter {
18+
19+
@Inject
20+
private EntityManagerFactory emf;
21+
22+
@Inject
23+
private BoardEventListener boardEventListener;
24+
25+
@PostConstruct
26+
public void registerBoardListener() {
27+
InfinispanRemoteDatastoreProvider provider = (InfinispanRemoteDatastoreProvider) emf.unwrap( SessionFactoryImplementor.class )
28+
.getServiceRegistry().getService( DatastoreProvider.class );
29+
30+
RemoteCache<Object, Object> board = provider.getCache( "Board" );
31+
board.addClientListener( boardEventListener );
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package org.hibernate.demo.message.post.core.event;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.stream.Collectors;
7+
import javax.enterprise.context.ApplicationScoped;
8+
import javax.websocket.Session;
9+
10+
@ApplicationScoped
11+
public class BoardSessionHolder {
12+
13+
// if a pod dies, the browser will create a new session on another node,
14+
// so we don't need to clustering session items
15+
private Map<Session, String> sessionMap = new ConcurrentHashMap<>();
16+
17+
public void addSession(Session session, String user) {
18+
sessionMap.put( session, user );
19+
}
20+
21+
public void removeSession(Session session) {
22+
sessionMap.remove( session );
23+
}
24+
25+
public Set<Session> findByUser(String username) {
26+
return sessionMap.entrySet().stream()
27+
.filter( entry -> username.equals( entry.getValue() ) )
28+
.map( map -> map.getKey() )
29+
.collect( Collectors.toSet() );
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.hibernate.demo.message.post.core.event;
2+
3+
import javax.inject.Inject;
4+
import javax.websocket.EndpointConfig;
5+
import javax.websocket.OnClose;
6+
import javax.websocket.OnError;
7+
import javax.websocket.OnMessage;
8+
import javax.websocket.OnOpen;
9+
import javax.websocket.Session;
10+
import javax.websocket.server.ServerEndpoint;
11+
12+
import org.slf4j.Logger;
13+
14+
@ServerEndpoint("/board")
15+
public class BoardWebSocket {
16+
17+
@Inject
18+
private BoardSessionHolder sessions;
19+
20+
@Inject
21+
private Logger log;
22+
23+
@OnOpen
24+
public void open(Session session, EndpointConfig config) {
25+
log.info( "Add new session {} with config ", session, config);
26+
}
27+
28+
@OnMessage
29+
public void message(String user, Session session) {
30+
log.info( "Add new session {} receiving message user {}", session, user );
31+
// session becomes active only when we have the name of the owner of the board to listen to
32+
sessions.addSession( session, user );
33+
}
34+
35+
@OnClose
36+
public void close(Session session) {
37+
log.info( "Remove session {}", session );
38+
sessions.removeSession( session );
39+
}
40+
41+
@OnError
42+
public void error(Throwable error) {
43+
log.error( "Error on websocket client", error );
44+
}
45+
}

openshift/message-board/message-service/src/main/java/org/hibernate/demo/message/post/core/service/HealthCheck.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public class HealthCheck {
2525
@GET
2626
@Produces(MediaType.APPLICATION_JSON)
2727
public List<Message> healthCheck() {
28-
log.info( "healthCheck invoked by OCP" );
28+
log.trace( "healthCheck invoked by OCP" );
2929

3030
return service.findMessagesByUser( "andrea" );
3131
}

openshift/message-board/message-service/src/main/java/org/hibernate/demo/message/post/core/service/json/JacksonConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ public class JacksonConfig implements ContextResolver<ObjectMapper> {
2020

2121
private static final String DATE_FORMAT = "dd-MM-yy HH:mm:ss";
2222

23+
@javax.enterprise.inject.Produces
2324
private ObjectMapper objectMapper;
2425

2526
public JacksonConfig() {

openshift/message-board/message-service/src/main/webapp/WEB-INF/jboss-deployment-structure.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
<dependencies>
66
<module name="org.hibernate.ogm" services="export" />
77
<module name="org.hibernate.ogm.infinispan-remote" services="export" />
8+
9+
<!-- used by client listener -->
10+
<module name="org.infinispan.client.hotrod" slot="ispn-9.3"/>
811
</dependencies>
912
<exclusions>
1013
<!-- We have to exclude the Jackson1 provided module, -->

0 commit comments

Comments
 (0)