Skip to content

Commit 93268d1

Browse files
feat/BE: setup websocket and implement broadcast message to user after moving columns
* setup websocket and implement broadcast message to user after moved column * test: unit test for join websocket room and move column
1 parent 1c9c894 commit 93268d1

28 files changed

+623
-59
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.idea
2+
.vscode
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package actors
2+
3+
import dto.websocket.OutgoingMessage
4+
import org.apache.pekko.actor.{Actor, ActorRef, Props}
5+
import play.api.Logger
6+
import play.api.libs.json.{JsValue, Json}
7+
8+
/**
9+
* Actor that manages WebSocket connections for a specific project.
10+
*/
11+
object ProjectActor {
12+
def props(projectId: Int): Props = Props(new ProjectActor(projectId))
13+
14+
case class Join(userId: Int, out: ActorRef)
15+
case class Leave(userId: Int)
16+
case class Broadcast(message: OutgoingMessage)
17+
}
18+
19+
/**
20+
* Actor that manages WebSocket connections for a specific project.
21+
* It keeps track of connected users and broadcasts messages to them.
22+
*
23+
* @param projectId the ID of the project
24+
*/
25+
class ProjectActor(projectId: Int) extends Actor {
26+
import ProjectActor._
27+
28+
private var members = Map.empty[Int, ActorRef]
29+
30+
def receive: Receive = {
31+
case Join(userId, out) =>
32+
members += userId -> out
33+
Logger("actors").info(
34+
s"UserId $userId joined project $projectId. Total members: ${members.size}"
35+
)
36+
37+
case Leave(userId) =>
38+
members -= userId
39+
Logger("actors").info(
40+
s"UserId with $userId left project $projectId. Total members: ${members.size}"
41+
)
42+
43+
case Broadcast(msg) =>
44+
val js: JsValue = Json.toJson(msg)
45+
members.values.foreach(_ ! js)
46+
Logger("actors").info(
47+
s"Broadcasted message to project $projectId members: ${members.size} users"
48+
)
49+
}
50+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package actors
2+
3+
import org.apache.pekko.actor.{Actor, ActorRef, Props}
4+
import dto.websocket.OutgoingMessage
5+
import modules.ActorNames
6+
7+
object ProjectActorRegistry {
8+
def props: Props = Props(new ProjectActorRegistry)
9+
10+
case class GetProjectActor(projectId: Int)
11+
case class BroadcastToProject(projectId: Int, message: OutgoingMessage)
12+
}
13+
14+
class ProjectActorRegistry extends Actor {
15+
import ProjectActorRegistry._
16+
import ProjectActor._
17+
18+
private var projectActors = Map.empty[Int, ActorRef]
19+
20+
def receive: Receive = {
21+
case GetProjectActor(projectId) =>
22+
// create new ProjectActor if not exists
23+
val actor = projectActors.getOrElse(projectId, {
24+
val newActor = context.actorOf(ProjectActor.props(projectId), s"${ActorNames.ProjectActorPrefix}$projectId")
25+
projectActors += projectId -> newActor
26+
newActor
27+
})
28+
// return ActorRef for requester
29+
sender() ! actor
30+
31+
case BroadcastToProject(projectId, message) =>
32+
projectActors.get(projectId).foreach(_ ! Broadcast(message))
33+
}
34+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package actors
2+
3+
import org.apache.pekko.actor.{Actor, ActorRef, Props}
4+
import org.apache.pekko.pattern.{ask, pipe}
5+
import org.apache.pekko.util.Timeout
6+
7+
import scala.concurrent.ExecutionContext
8+
import scala.concurrent.duration._
9+
10+
/**
11+
* Actor that manages a WebSocket connection for a user in a specific project.
12+
*/
13+
object ProjectClientActor {
14+
def props(out: ActorRef,
15+
userId: Int,
16+
projectId: Int,
17+
registry: ActorRef): Props =
18+
Props(new ProjectClientActor(out, userId, projectId, registry))
19+
20+
/**
21+
* Message indicating that the ProjectActor has been found in the registry.
22+
* @param projectRef the ActorRef of the ProjectActor
23+
*/
24+
private case class RegistryFound(projectRef: ActorRef)
25+
}
26+
27+
/**
28+
* Actor that manages a WebSocket connection for a user in a specific project.
29+
* It registers the user with the ProjectActor upon creation and deregisters
30+
* upon termination.
31+
*
32+
* @param out the ActorRef to send messages to the WebSocket
33+
* @param userId the ID of the user
34+
* @param projectId the ID of the project
35+
* @param registry the ActorRef of the ProjectActorRegistry
36+
*/
37+
class ProjectClientActor(out: ActorRef,
38+
userId: Int,
39+
projectId: Int,
40+
registry: ActorRef)
41+
extends Actor {
42+
import ProjectActor._
43+
import ProjectActorRegistry._
44+
import ProjectClientActor._
45+
46+
// Execution context and timeout for ask pattern
47+
implicit val ec: ExecutionContext = context.dispatcher
48+
implicit val timeout: Timeout = Timeout(3.seconds)
49+
50+
// Reference to the ProjectActor, once found
51+
private var projectRefOpt: Option[ActorRef] = None
52+
53+
// On start, ask the registry for the ProjectActor
54+
override def preStart(): Unit = {
55+
(registry ? GetProjectActor(projectId))
56+
.mapTo[ActorRef]
57+
.map(RegistryFound) pipeTo self
58+
}
59+
60+
// On stop, inform the ProjectActor that the user is leaving
61+
override def postStop(): Unit = {
62+
projectRefOpt.foreach(_ ! Leave(userId))
63+
super.postStop()
64+
}
65+
66+
// Handle incoming messages
67+
def receive: Receive = {
68+
case RegistryFound(projectRef) =>
69+
projectRefOpt = Some(projectRef)
70+
projectRef ! Join(userId, out)
71+
}
72+
}

backend/app/controllers/AuthenticatedAction.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package controllers
22

3+
import org.apache.pekko.stream.scaladsl.Flow
4+
import play.api.libs.json.JsValue
35
import play.api.mvc._
46
import services.{CookieService, JwtService, UserToken}
57

@@ -62,3 +64,35 @@ class AuthenticatedActionWithUser @Inject()(
6264
}
6365
}
6466
}
67+
68+
class AuthenticatedWebSocket @Inject()(
69+
jwtService: JwtService,
70+
cookieService: CookieService
71+
)(implicit ec: ExecutionContext) {
72+
73+
def apply(
74+
block: UserToken Future[Either[Result, Flow[JsValue, JsValue, _]]]
75+
): WebSocket = {
76+
WebSocket.acceptOrResult[JsValue, JsValue] { requestHeader
77+
cookieService.getTokenFromRequest(requestHeader) match {
78+
case Some(token)
79+
jwtService.validateToken(token) match {
80+
case Success(userToken)
81+
block(userToken)
82+
case Failure(ex)
83+
Future.successful(
84+
Left(
85+
Results
86+
.Unauthorized(s"Invalid token: ${ex.getMessage}")
87+
.withCookies(cookieService.createExpiredAuthCookie())
88+
)
89+
)
90+
}
91+
case None
92+
Future.successful(
93+
Left(Results.Unauthorized("No authentication token found"))
94+
)
95+
}
96+
}
97+
}
98+
}

backend/app/controllers/ColumnController.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,15 @@ class ColumnController @Inject()(
112112
}
113113
}
114114

115-
/** PATCH /columns/:columnId/position */
116-
def updatePosition(columnId: Int): Action[JsValue] =
115+
/** PATCH /projects/:projectId/columns/:columnId/position */
116+
def updatePosition(projectId: Int, columnId: Int): Action[JsValue] =
117117
authenticatedActionWithUser.async(parse.json) { request =>
118118
implicit val messages: Messages = request.messages
119119
val updatedBy = request.userToken.userId
120120
handleJsonValidation[UpdateColumnPositionRequest](request.body) {
121-
updatePositionDto =>
121+
updateColumnPositionDto =>
122122
columnService
123-
.updatePosition(columnId, updatePositionDto, updatedBy)
123+
.updatePosition(projectId, columnId, updateColumnPositionDto, updatedBy)
124124
.map { _ =>
125125
Ok(
126126
Json.toJson(
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package controllers
2+
3+
import modules.ActorNames
4+
import org.apache.pekko.actor.{ActorRef, ActorSystem}
5+
import org.apache.pekko.stream.Materializer
6+
import play.api.libs.json.JsValue
7+
import play.api.libs.streams.ActorFlow
8+
import play.api.mvc._
9+
import services.ProjectService
10+
11+
import javax.inject._
12+
import scala.concurrent.ExecutionContext
13+
14+
@Singleton
15+
class WebSocketController @Inject()(
16+
cc: ControllerComponents,
17+
authenticatedWebSocket: AuthenticatedWebSocket,
18+
@Named(ActorNames.ProjectActorRegistry) projectActorRegistry: ActorRef,
19+
projectService: ProjectService
20+
)(implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext)
21+
extends AbstractController(cc) {
22+
23+
def joinProject(projectId: Int): WebSocket = authenticatedWebSocket { userToken =>
24+
projectService.isUserInActiveProject(userToken.userId, projectId).map { exists =>
25+
if (exists) {
26+
Right(
27+
ActorFlow.actorRef[JsValue, JsValue] { out =>
28+
actors.ProjectClientActor.props(out, userToken.userId, projectId, projectActorRegistry)
29+
}
30+
)
31+
} else {
32+
Left(Results.Forbidden("User is not a member of this project"))
33+
}
34+
}
35+
}
36+
}
37+
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package dto.websocket
2+
3+
import dto.websocket.board.BoardMessage
4+
import play.api.libs.json.Writes
5+
6+
trait OutgoingMessage
7+
8+
object OutgoingMessage {
9+
implicit val writes: Writes[OutgoingMessage] = {
10+
case bm: BoardMessage => BoardMessage.writes.writes(bm)
11+
}
12+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package dto.websocket.board
2+
3+
import play.api.libs.json._
4+
import dto.websocket.OutgoingMessage
5+
6+
sealed trait BoardMessage extends OutgoingMessage
7+
8+
case class ColumnMoved(columnId: Int, newPosition: Int) extends BoardMessage
9+
case class TaskMoved(taskId: Int, fromColumnId: Int, toColumnId: Int, newPosition: Int) extends BoardMessage
10+
11+
object BoardMessage {
12+
implicit val columnMovedFormat: OFormat[ColumnMoved] = Json.format[ColumnMoved]
13+
implicit val taskMovedFormat: OFormat[TaskMoved] = Json.format[TaskMoved]
14+
15+
implicit val writes: Writes[BoardMessage] = Writes {
16+
case cm: ColumnMoved => Json.obj("type" -> "columnMoved") ++ Json.toJsObject(cm)
17+
case tm: TaskMoved => Json.obj("type" -> "taskMoved") ++ Json.toJsObject(tm)
18+
}
19+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package modules
2+
3+
import actors.ProjectActorRegistry
4+
import com.google.inject.name.Named
5+
import com.google.inject.{AbstractModule, Provides, Singleton}
6+
import org.apache.pekko.actor.{ActorRef, ActorSystem, Props}
7+
import play.api.Logger
8+
9+
/**
10+
* Constants for actor names to ensure consistency across the application.
11+
*/
12+
object ActorNames {
13+
final val ProjectActorRegistry = "project-actor-registry"
14+
final val ProjectActorPrefix = "project-actor-"
15+
}
16+
17+
/**
18+
* Module to provide actor instances.
19+
*/
20+
class ActorsModule extends AbstractModule {
21+
@Provides
22+
@Singleton
23+
@Named(ActorNames.ProjectActorRegistry)
24+
def provideProjectActorRegistry(system: ActorSystem): ActorRef = {
25+
Logger("actors").info("Creating ProjectActorRegistry actor")
26+
system.actorOf(Props[ProjectActorRegistry], ActorNames.ProjectActorRegistry)
27+
}
28+
}

0 commit comments

Comments
 (0)