Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.idea
.vscode
50 changes: 50 additions & 0 deletions backend/app/actors/ProjectActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package actors

import dto.websocket.OutgoingMessage
import org.apache.pekko.actor.{Actor, ActorRef, Props}
import play.api.Logger
import play.api.libs.json.{JsValue, Json}

/**
* Actor that manages WebSocket connections for a specific project.
*/
object ProjectActor {
def props(projectId: Int): Props = Props(new ProjectActor(projectId))

case class Join(userId: Int, out: ActorRef)
case class Leave(userId: Int)
case class Broadcast(message: OutgoingMessage)
}

/**
* Actor that manages WebSocket connections for a specific project.
* It keeps track of connected users and broadcasts messages to them.
*
* @param projectId the ID of the project
*/
class ProjectActor(projectId: Int) extends Actor {
import ProjectActor._

private var members = Map.empty[Int, ActorRef]

def receive: Receive = {
case Join(userId, out) =>
members += userId -> out
Logger("actors").info(
s"UserId $userId joined project $projectId. Total members: ${members.size}"
)

case Leave(userId) =>
members -= userId
Logger("actors").info(
s"UserId with $userId left project $projectId. Total members: ${members.size}"
)

case Broadcast(msg) =>
val js: JsValue = Json.toJson(msg)
members.values.foreach(_ ! js)
Logger("actors").info(
s"Broadcasted message to project $projectId members: ${members.size} users"
)
}
}
34 changes: 34 additions & 0 deletions backend/app/actors/ProjectActorRegistry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package actors

import org.apache.pekko.actor.{Actor, ActorRef, Props}
import dto.websocket.OutgoingMessage
import modules.ActorNames

object ProjectActorRegistry {
def props: Props = Props(new ProjectActorRegistry)

case class GetProjectActor(projectId: Int)
case class BroadcastToProject(projectId: Int, message: OutgoingMessage)
}

class ProjectActorRegistry extends Actor {
import ProjectActorRegistry._
import ProjectActor._

private var projectActors = Map.empty[Int, ActorRef]

def receive: Receive = {
case GetProjectActor(projectId) =>
// create new ProjectActor if not exists
val actor = projectActors.getOrElse(projectId, {
val newActor = context.actorOf(ProjectActor.props(projectId), s"${ActorNames.ProjectActorPrefix}$projectId")
projectActors += projectId -> newActor
newActor
})
// return ActorRef for requester
sender() ! actor

case BroadcastToProject(projectId, message) =>
projectActors.get(projectId).foreach(_ ! Broadcast(message))
}
}
72 changes: 72 additions & 0 deletions backend/app/actors/ProjectClientActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package actors

import org.apache.pekko.actor.{Actor, ActorRef, Props}
import org.apache.pekko.pattern.{ask, pipe}
import org.apache.pekko.util.Timeout

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
* Actor that manages a WebSocket connection for a user in a specific project.
*/
object ProjectClientActor {
def props(out: ActorRef,
userId: Int,
projectId: Int,
registry: ActorRef): Props =
Props(new ProjectClientActor(out, userId, projectId, registry))

/**
* Message indicating that the ProjectActor has been found in the registry.
* @param projectRef the ActorRef of the ProjectActor
*/
private case class RegistryFound(projectRef: ActorRef)
}

/**
* Actor that manages a WebSocket connection for a user in a specific project.
* It registers the user with the ProjectActor upon creation and deregisters
* upon termination.
*
* @param out the ActorRef to send messages to the WebSocket
* @param userId the ID of the user
* @param projectId the ID of the project
* @param registry the ActorRef of the ProjectActorRegistry
*/
class ProjectClientActor(out: ActorRef,
userId: Int,
projectId: Int,
registry: ActorRef)
extends Actor {
import ProjectActor._
import ProjectActorRegistry._
import ProjectClientActor._

// Execution context and timeout for ask pattern
implicit val ec: ExecutionContext = context.dispatcher
implicit val timeout: Timeout = Timeout(3.seconds)

// Reference to the ProjectActor, once found
private var projectRefOpt: Option[ActorRef] = None

// On start, ask the registry for the ProjectActor
override def preStart(): Unit = {
(registry ? GetProjectActor(projectId))
.mapTo[ActorRef]
.map(RegistryFound) pipeTo self
}

// On stop, inform the ProjectActor that the user is leaving
override def postStop(): Unit = {
projectRefOpt.foreach(_ ! Leave(userId))
super.postStop()
}

// Handle incoming messages
def receive: Receive = {
case RegistryFound(projectRef) =>
projectRefOpt = Some(projectRef)
projectRef ! Join(userId, out)
}
}
34 changes: 34 additions & 0 deletions backend/app/controllers/AuthenticatedAction.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

import org.apache.pekko.stream.scaladsl.Flow
import play.api.libs.json.JsValue
import play.api.mvc._
import services.{CookieService, JwtService, UserToken}

Expand Down Expand Up @@ -62,3 +64,35 @@ class AuthenticatedActionWithUser @Inject()(
}
}
}

class AuthenticatedWebSocket @Inject()(
jwtService: JwtService,
cookieService: CookieService
)(implicit ec: ExecutionContext) {

def apply(
block: UserToken ⇒ Future[Either[Result, Flow[JsValue, JsValue, _]]]
): WebSocket = {
WebSocket.acceptOrResult[JsValue, JsValue] { requestHeader ⇒
cookieService.getTokenFromRequest(requestHeader) match {
case Some(token) ⇒
jwtService.validateToken(token) match {
case Success(userToken) ⇒
block(userToken)
case Failure(ex) ⇒
Future.successful(
Left(
Results
.Unauthorized(s"Invalid token: ${ex.getMessage}")
.withCookies(cookieService.createExpiredAuthCookie())
)
)
}
case None ⇒
Future.successful(
Left(Results.Unauthorized("No authentication token found"))
)
}
}
}
}
8 changes: 4 additions & 4 deletions backend/app/controllers/ColumnController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,15 @@ class ColumnController @Inject()(
}
}

/** PATCH /columns/:columnId/position */
def updatePosition(columnId: Int): Action[JsValue] =
/** PATCH /projects/:projectId/columns/:columnId/position */
def updatePosition(projectId: Int, columnId: Int): Action[JsValue] =
authenticatedActionWithUser.async(parse.json) { request =>
implicit val messages: Messages = request.messages
val updatedBy = request.userToken.userId
handleJsonValidation[UpdateColumnPositionRequest](request.body) {
updatePositionDto =>
updateColumnPositionDto =>
columnService
.updatePosition(columnId, updatePositionDto, updatedBy)
.updatePosition(projectId, columnId, updateColumnPositionDto, updatedBy)
.map { _ =>
Ok(
Json.toJson(
Expand Down
37 changes: 37 additions & 0 deletions backend/app/controllers/WebSocketController.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package controllers

import modules.ActorNames
import org.apache.pekko.actor.{ActorRef, ActorSystem}
import org.apache.pekko.stream.Materializer
import play.api.libs.json.JsValue
import play.api.libs.streams.ActorFlow
import play.api.mvc._
import services.ProjectService

import javax.inject._
import scala.concurrent.ExecutionContext

@Singleton
class WebSocketController @Inject()(
cc: ControllerComponents,
authenticatedWebSocket: AuthenticatedWebSocket,
@Named(ActorNames.ProjectActorRegistry) projectActorRegistry: ActorRef,
projectService: ProjectService
)(implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext)
extends AbstractController(cc) {

def joinProject(projectId: Int): WebSocket = authenticatedWebSocket { userToken =>
projectService.isUserInActiveProject(userToken.userId, projectId).map { exists =>
if (exists) {
Right(
ActorFlow.actorRef[JsValue, JsValue] { out =>
actors.ProjectClientActor.props(out, userToken.userId, projectId, projectActorRegistry)
}
)
} else {
Left(Results.Forbidden("User is not a member of this project"))
}
}
}
}

12 changes: 12 additions & 0 deletions backend/app/dto/websocket/OutgoingMessage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package dto.websocket

import dto.websocket.board.BoardMessage
import play.api.libs.json.Writes

trait OutgoingMessage

object OutgoingMessage {
implicit val writes: Writes[OutgoingMessage] = {
case bm: BoardMessage => BoardMessage.writes.writes(bm)
}
}
19 changes: 19 additions & 0 deletions backend/app/dto/websocket/board/BoardMessage.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package dto.websocket.board

import play.api.libs.json._
import dto.websocket.OutgoingMessage

sealed trait BoardMessage extends OutgoingMessage

case class ColumnMoved(columnId: Int, newPosition: Int) extends BoardMessage
case class TaskMoved(taskId: Int, fromColumnId: Int, toColumnId: Int, newPosition: Int) extends BoardMessage

object BoardMessage {
implicit val columnMovedFormat: OFormat[ColumnMoved] = Json.format[ColumnMoved]
implicit val taskMovedFormat: OFormat[TaskMoved] = Json.format[TaskMoved]

implicit val writes: Writes[BoardMessage] = Writes {
case cm: ColumnMoved => Json.obj("type" -> "columnMoved") ++ Json.toJsObject(cm)
case tm: TaskMoved => Json.obj("type" -> "taskMoved") ++ Json.toJsObject(tm)
}
}
28 changes: 28 additions & 0 deletions backend/app/modules/ActorsModule.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package modules

import actors.ProjectActorRegistry
import com.google.inject.name.Named
import com.google.inject.{AbstractModule, Provides, Singleton}
import org.apache.pekko.actor.{ActorRef, ActorSystem, Props}
import play.api.Logger

/**
* Constants for actor names to ensure consistency across the application.
*/
object ActorNames {
final val ProjectActorRegistry = "project-actor-registry"
final val ProjectActorPrefix = "project-actor-"
}

/**
* Module to provide actor instances.
*/
class ActorsModule extends AbstractModule {
@Provides
@Singleton
@Named(ActorNames.ProjectActorRegistry)
def provideProjectActorRegistry(system: ActorSystem): ActorRef = {
Logger("actors").info("Creating ProjectActorRegistry actor")
system.actorOf(Props[ProjectActorRegistry], ActorNames.ProjectActorRegistry)
}
}
8 changes: 6 additions & 2 deletions backend/app/repositories/ColumnRepository.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,12 @@ class ColumnRepository @Inject()(

def findStatusIfUserInProject(columnId: Int, userId: Int): DBIO[Option[ColumnStatus]] = {
val query = for {
(c, p) <- columns join projects on (_.projectId === _.id)
if c.id === columnId.bind && p.createdBy === userId.bind
(((c, p), up)) <- columns
.join(projects).on(_.projectId === _.id)
.join(userProjects).on { case ((c, p), up) =>
p.id === up.projectId && up.userId === userId.bind
}
if c.id === columnId.bind
} yield c.status

query.result.headOption
Expand Down
29 changes: 0 additions & 29 deletions backend/app/repositories/UserWorkspaceRepository.scala

This file was deleted.

24 changes: 24 additions & 0 deletions backend/app/services/BroadcastService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package services

import org.apache.pekko.actor.ActorRef
import org.apache.pekko.util.Timeout
import actors.ProjectActorRegistry
import com.google.inject.name.Named
import dto.websocket.OutgoingMessage
import modules.ActorNames

import javax.inject.Inject
import scala.concurrent.Future
import scala.concurrent.duration._

class BroadcastService @Inject() (@Named(ActorNames.ProjectActorRegistry) registry: ActorRef) {
import ProjectActorRegistry._

implicit val timeout: Timeout = Timeout(3.seconds)

// Broadcast to project actor
def broadcastToProject(projectId: Int, message: OutgoingMessage): Future[Unit] = {
registry ! BroadcastToProject(projectId, message)
Future.successful(())
}
}
Loading
Loading