From 5d7da22348d85bb76558878c2c6472a986384222 Mon Sep 17 00:00:00 2001 From: Vu Tran Trung Date: Wed, 10 Sep 2025 14:03:19 +0700 Subject: [PATCH 1/4] set up ws --- .../app/controllers/AuthenticatedAction.scala | 31 ++++++++ .../app/controllers/WebSocketController.scala | 72 +++++++++++++++++++ backend/app/dto/websocket/InMsg.scala | 12 ++++ backend/app/dto/websocket/OutMsg.scala | 12 ++++ backend/app/services/Rooms.scala | 26 +++++++ .../app/websocket/codecs/ColumnCodecs.scala | 11 +++ .../app/websocket/codecs/DomainCodecs.scala | 19 +++++ .../websocket/codecs/WebSocketCodecs.scala | 39 ++++++++++ .../websocket/handlers/ColumnHandler.scala | 29 ++++++++ backend/app/websocket/handlers/Handler.scala | 16 +++++ backend/conf/routes | 5 +- 11 files changed, 271 insertions(+), 1 deletion(-) create mode 100644 backend/app/controllers/WebSocketController.scala create mode 100644 backend/app/dto/websocket/InMsg.scala create mode 100644 backend/app/dto/websocket/OutMsg.scala create mode 100644 backend/app/services/Rooms.scala create mode 100644 backend/app/websocket/codecs/ColumnCodecs.scala create mode 100644 backend/app/websocket/codecs/DomainCodecs.scala create mode 100644 backend/app/websocket/codecs/WebSocketCodecs.scala create mode 100644 backend/app/websocket/handlers/ColumnHandler.scala create mode 100644 backend/app/websocket/handlers/Handler.scala diff --git a/backend/app/controllers/AuthenticatedAction.scala b/backend/app/controllers/AuthenticatedAction.scala index af49ab3..6ba54a0 100644 --- a/backend/app/controllers/AuthenticatedAction.scala +++ b/backend/app/controllers/AuthenticatedAction.scala @@ -1,5 +1,7 @@ package controllers +import org.apache.pekko.stream.scaladsl.Flow +import play.api.libs.json.JsValue import play.api.mvc._ import services.{CookieService, JwtService, UserToken} @@ -62,3 +64,32 @@ class AuthenticatedActionWithUser @Inject()( } } } + +class AuthenticatedWebSocket @Inject()( + jwtService: JwtService, + cookieService: CookieService + )(implicit ec: ExecutionContext) { + + def apply(block: UserToken ⇒ Flow[JsValue, JsValue, _]): WebSocket = { + WebSocket.acceptOrResult[JsValue, JsValue] { requestHeader ⇒ + cookieService.getTokenFromRequest(requestHeader) match { + case Some(token) ⇒ + jwtService.validateToken(token) match { + case Success(userToken) ⇒ + Future.successful(Right(block(userToken))) + case Failure(ex) ⇒ + Future.successful( + Left( + Results.Unauthorized(s"Invalid token: ${ex.getMessage}") + .withCookies(cookieService.createExpiredAuthCookie()) + ) + ) + } + case None ⇒ + Future.successful( + Left(Results.Unauthorized("No authentication token found")) + ) + } + } + } +} diff --git a/backend/app/controllers/WebSocketController.scala b/backend/app/controllers/WebSocketController.scala new file mode 100644 index 0000000..4faca20 --- /dev/null +++ b/backend/app/controllers/WebSocketController.scala @@ -0,0 +1,72 @@ +package controllers + +import dto.websocket.{InMsg, OutMsg} +import dto.websocket.InMsg.{ColumnInMsg, Join, Ping} +import dto.websocket.OutMsg.{ErrorMsg, Joined} +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.{KillSwitches, Materializer, OverflowStrategy, UniqueKillSwitch} +import org.apache.pekko.stream.scaladsl.{Flow, Keep, Sink, Source} +import play.api.libs.json.{JsValue, Json} +import play.api.mvc.{InjectedController, WebSocket} +import services.Rooms +import websocket.handlers.{ColumnHandler, HandlerContext} +import websocket.codecs.DomainCodecs._ +import websocket.codecs.WebSocketCodecs._ + +import javax.inject.{Inject, Singleton} +import scala.concurrent.{ExecutionContext, Future} + +@Singleton +class WebSocketController @Inject()( + rooms: Rooms, + columnHandler: ColumnHandler, + authenticatedWebSocket: AuthenticatedWebSocket + )(implicit mat: Materializer, ec: ExecutionContext) extends InjectedController { + + def ws: WebSocket = authenticatedWebSocket { userToken ⇒ + socketFlow(userToken.userId) + } + + private def socketFlow(userId: Int): Flow[JsValue, JsValue, NotUsed] = { + val (outQueue, outSource) = + Source.queue[JsValue](64, OverflowStrategy.dropHead).preMaterialize() + + var currentKillSwitch: Option[UniqueKillSwitch] = None + + val inbound: Sink[JsValue, NotUsed] = + Flow[JsValue].mapAsync(1) { + case js if js.validate[InMsg].isSuccess => + js.as[InMsg] match { + case Ping => + outQueue.offer(Json.toJson(OutMsg.Pong)) + + case Join(boardId) => + // unsubscribe if already joined another board + currentKillSwitch.foreach(_.shutdown()) + val (_, roomSrc) = rooms.room(boardId) + + val (ks, _) = roomSrc + .viaMat(KillSwitches.single)(Keep.right) + .map(Json.parse) + .toMat(Sink.foreach(outQueue.offer))(Keep.both) + .run() + + currentKillSwitch = Some(ks) + outQueue.offer(Json.toJson(Joined(boardId, userId))).map(_ => ()) + + case msg: ColumnInMsg => + columnHandler.handle(msg, HandlerContext(userId, outQueue)) + + case _ => + outQueue.offer(Json.toJson(ErrorMsg("Unknown message"))).map(_ => ()) + } + + case other => + Future.successful( + outQueue.offer(Json.toJson(ErrorMsg(s"Invalid JSON: $other"))) + ).map(_ => ()) + }.to(Sink.ignore) + + Flow.fromSinkAndSourceCoupled(inbound, outSource) + } +} diff --git a/backend/app/dto/websocket/InMsg.scala b/backend/app/dto/websocket/InMsg.scala new file mode 100644 index 0000000..c423a16 --- /dev/null +++ b/backend/app/dto/websocket/InMsg.scala @@ -0,0 +1,12 @@ +package dto.websocket + +sealed trait InMsg +object InMsg { + + case object Ping extends InMsg + case class Join(boardId: Int) extends InMsg + + // Column domain + sealed trait ColumnInMsg extends InMsg + case class MoveColumn(boardId: Int, columnId: Int, newPos: Int) extends ColumnInMsg +} diff --git a/backend/app/dto/websocket/OutMsg.scala b/backend/app/dto/websocket/OutMsg.scala new file mode 100644 index 0000000..e51e6a0 --- /dev/null +++ b/backend/app/dto/websocket/OutMsg.scala @@ -0,0 +1,12 @@ +package dto.websocket + +sealed trait OutMsg +object OutMsg { + + case object Pong extends OutMsg + case class Joined(boardId: Int, userId: Int) extends OutMsg + case class ErrorMsg(error: String) extends OutMsg + + // column + case class ColumnMoved(boardId: Int, columnId: Int, newPos: Int) extends OutMsg +} diff --git a/backend/app/services/Rooms.scala b/backend/app/services/Rooms.scala new file mode 100644 index 0000000..119921d --- /dev/null +++ b/backend/app/services/Rooms.scala @@ -0,0 +1,26 @@ +package services + +import org.apache.pekko.NotUsed +import org.apache.pekko.stream.{Materializer, OverflowStrategy} +import org.apache.pekko.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete} + +import javax.inject.{Inject, Singleton} + +@Singleton +class Rooms @Inject()(implicit mat: Materializer) { + + private var rooms: Map[Int, (SourceQueueWithComplete[String], Source[String, NotUsed])] = Map.empty + + def room(boardId: Int): (SourceQueueWithComplete[String], Source[String, NotUsed]) = + rooms.getOrElse(boardId, { + // Each board has its own queue + broadcast hub + val (queue, src) = Source + .queue[String](bufferSize = 128, overflowStrategy = OverflowStrategy.dropHead) + .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both) + .run() + + val room = (queue, src) + rooms += boardId -> room + room + }) +} diff --git a/backend/app/websocket/codecs/ColumnCodecs.scala b/backend/app/websocket/codecs/ColumnCodecs.scala new file mode 100644 index 0000000..796d29c --- /dev/null +++ b/backend/app/websocket/codecs/ColumnCodecs.scala @@ -0,0 +1,11 @@ +package websocket.codecs + +import dto.websocket.{InMsg, OutMsg} +import play.api.libs.json.{Json, OFormat, OWrites} + +object ColumnCodecs { + implicit val moveColumnFormat: OFormat[InMsg.MoveColumn] = Json.format[InMsg.MoveColumn] + + implicit val columnMovedWrites: OWrites[OutMsg.ColumnMoved] = + Json.writes[OutMsg.ColumnMoved].transform(_ ++ Json.obj("type" -> "columnMoved")) +} diff --git a/backend/app/websocket/codecs/DomainCodecs.scala b/backend/app/websocket/codecs/DomainCodecs.scala new file mode 100644 index 0000000..ea6620f --- /dev/null +++ b/backend/app/websocket/codecs/DomainCodecs.scala @@ -0,0 +1,19 @@ +package websocket.codecs + +import dto.websocket.{InMsg, OutMsg} +import play.api.libs.json.{Json, OFormat, OWrites} + +object DomainCodecs { + // Join read/write + implicit val joinFormat: OFormat[InMsg.Join] = Json.format[InMsg.Join] + + // Out writes for simple system messages + implicit val pongWrites: OWrites[OutMsg.Pong.type] = + OWrites(_ => Json.obj("type" -> "pong")) + + implicit val joinedWrites: OWrites[OutMsg.Joined] = + Json.writes[OutMsg.Joined].transform(_ ++ Json.obj("type" -> "joined")) + + implicit val errorWrites: OWrites[OutMsg.ErrorMsg] = + Json.writes[OutMsg.ErrorMsg].transform(_ ++ Json.obj("type" -> "error")) +} diff --git a/backend/app/websocket/codecs/WebSocketCodecs.scala b/backend/app/websocket/codecs/WebSocketCodecs.scala new file mode 100644 index 0000000..77d2f78 --- /dev/null +++ b/backend/app/websocket/codecs/WebSocketCodecs.scala @@ -0,0 +1,39 @@ +package websocket.codecs + +import play.api.libs.json._ +import DomainCodecs._ +import ColumnCodecs._ +import dto.websocket.{InMsg, OutMsg} + +object WebSocketCodecs { + // InMsg: read "type" then delegate to domain Reads + implicit val inMsgReads: Reads[InMsg] = Reads { js => + (js \ "type").validate[String].flatMap { + case t if t.equalsIgnoreCase("ping") => + JsSuccess(InMsg.Ping) + + case t if t.equalsIgnoreCase("join") => + js.validate[InMsg.Join] + + case t if t.equalsIgnoreCase("columnMoved") || t.equalsIgnoreCase("moveColumn") => + // normalize naming: frontend might send "moveColumn" or "MoveColumn" + js.validate[InMsg.MoveColumn] + + // add more cases for other domains: + // case t if t.equalsIgnoreCase("moveTask") => js.validate[InMsg.MoveTask] + + case other => + JsError(s"Unknown type: $other") + } + } + + // OutMsg: choose correct domain writer (these writers are imported from domain codec objects) + implicit val outMsgWrites: Writes[OutMsg] = Writes { + case OutMsg.Pong => Json.toJson(OutMsg.Pong)(pongWrites) + case j: OutMsg.Joined => Json.toJson(j)(joinedWrites) + case c: OutMsg.ColumnMoved => Json.toJson(c)(columnMovedWrites) + case e: OutMsg.ErrorMsg => Json.toJson(e)(errorWrites) + // add Task/Comment branches here later + } +} + diff --git a/backend/app/websocket/handlers/ColumnHandler.scala b/backend/app/websocket/handlers/ColumnHandler.scala new file mode 100644 index 0000000..d32272e --- /dev/null +++ b/backend/app/websocket/handlers/ColumnHandler.scala @@ -0,0 +1,29 @@ +package websocket.handlers + +import com.google.inject.Inject +import websocket.codecs.ColumnCodecs._ +import websocket.codecs.DomainCodecs._ +import dto.request.column.UpdateColumnPositionRequest +import dto.websocket.InMsg.{ColumnInMsg, MoveColumn} +import dto.websocket.OutMsg +import play.api.libs.json.Json +import services.ColumnService + +import scala.concurrent.{ExecutionContext, Future} + +class ColumnHandler @Inject() (columnService: ColumnService)(implicit ec: ExecutionContext) + extends Handler[ColumnInMsg] { + + override def handle(msg: ColumnInMsg, ctx: HandlerContext): Future[Unit] = msg match { + case MoveColumn(boardId, columnId, newPos) => + columnService + .updatePosition(columnId, UpdateColumnPositionRequest(newPos), ctx.userId) + .map { _ => + ctx.outQueue.offer(Json.toJson(OutMsg.ColumnMoved(boardId, columnId, newPos))) + () + } + .recover { case ex => + ctx.outQueue.offer(Json.toJson(OutMsg.ErrorMsg(ex.getMessage))) + } + } +} diff --git a/backend/app/websocket/handlers/Handler.scala b/backend/app/websocket/handlers/Handler.scala new file mode 100644 index 0000000..49800dd --- /dev/null +++ b/backend/app/websocket/handlers/Handler.scala @@ -0,0 +1,16 @@ +package websocket.handlers + +import dto.websocket.InMsg +import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete +import play.api.libs.json.JsValue + +import scala.concurrent.Future + +trait Handler[M <: InMsg] { + def handle(msg: M, ctx: HandlerContext): Future[Unit] +} + +case class HandlerContext( + userId: Int, + outQueue: SourceQueueWithComplete[JsValue] +) diff --git a/backend/conf/routes b/backend/conf/routes index 4769bfe..5f9a9d5 100644 --- a/backend/conf/routes +++ b/backend/conf/routes @@ -53,4 +53,7 @@ PATCH /api/tasks/:taskId controllers.TaskController.update(taskId: GET /api/tasks/:taskId controllers.TaskController.getById(taskId: Int) PATCH /api/tasks/:taskId/archive controllers.TaskController.archive(taskId: Int) PATCH /api/tasks/:taskId/restore controllers.TaskController.restore(taskId: Int) -DELETE /api/tasks/:taskId controllers.TaskController.delete(taskId: Int) \ No newline at end of file +DELETE /api/tasks/:taskId controllers.TaskController.delete(taskId: Int) + +#Web socket +GET /ws controllers.WebSocketController.ws \ No newline at end of file From 32969382463d3cbc974ca473f5930da2f4482177 Mon Sep 17 00:00:00 2001 From: Vu Tran Trung Date: Wed, 10 Sep 2025 14:45:57 +0700 Subject: [PATCH 2/4] set up ws --- frontend/src/hooks/useWebsocket.ts | 21 ++++++++++ frontend/src/pages/WorkspaceBoard.tsx | 41 ++++++++++++------ frontend/src/services/axiosClient.ts | 1 + frontend/src/services/wsService.ts | 60 +++++++++++++++++++++++++++ frontend/src/types/ws/columns.ts | 5 +++ frontend/src/types/ws/domains.ts | 8 ++++ frontend/src/types/ws/index.ts | 5 +++ 7 files changed, 128 insertions(+), 13 deletions(-) create mode 100644 frontend/src/hooks/useWebsocket.ts create mode 100644 frontend/src/services/wsService.ts create mode 100644 frontend/src/types/ws/columns.ts create mode 100644 frontend/src/types/ws/domains.ts create mode 100644 frontend/src/types/ws/index.ts diff --git a/frontend/src/hooks/useWebsocket.ts b/frontend/src/hooks/useWebsocket.ts new file mode 100644 index 0000000..c2090e2 --- /dev/null +++ b/frontend/src/hooks/useWebsocket.ts @@ -0,0 +1,21 @@ +import { wsService } from "@/services/wsService"; +import type { InMsg, OutMsg } from "@/types/ws"; +import { useEffect, useState } from "react"; + +export function useWebSocket() { + const [lastMessage, setLastMessage] = useState(null); + + useEffect(() => { + wsService.connect(); + + const unsubscribe = wsService.onMessage((msg) => { + setLastMessage(msg); + }); + + return () => unsubscribe(); + }, []); + + const send = (msg: InMsg) => wsService.send(msg); + + return { lastMessage, send }; +} \ No newline at end of file diff --git a/frontend/src/pages/WorkspaceBoard.tsx b/frontend/src/pages/WorkspaceBoard.tsx index 9e692cd..da26eb7 100644 --- a/frontend/src/pages/WorkspaceBoard.tsx +++ b/frontend/src/pages/WorkspaceBoard.tsx @@ -2,10 +2,13 @@ import BoardNavbar from '@/components/board/BoardNavbar'; import DroppableColumn from '@/components/board/DroppableColumn'; import TaskDetailModal from '@/components/board/TaskDetailModal'; import LoadingContent from '@/components/ui/LoadingContent'; +import { useWebSocket } from '@/hooks/useWebsocket'; import { archiveColumn, createNewColumn, fetchBoardColumns, fetchBoardDetail, updateColumn, updateColumnPosititon } from '@/services/boardService'; import { notify } from '@/services/toastService'; import { reopenBoard } from '@/services/workspaceService'; import type { Board, Column } from '@/types'; +import type { ColumnOutMsg } from '@/types/ws/columns'; +import type { DomainOutMsg } from '@/types/ws/domains'; import { DndContext, DragOverlay, @@ -33,6 +36,7 @@ import { useParams } from 'react-router-dom'; const WorkspaceBoard = () => { const { boardId } = useParams(); + const { lastMessage, send } = useWebSocket(); const [isBoardClosed, setIsBoardClosed] = useState(false); const [boardDetail, setBoardDetail] = useState({ id: 0, name: '', status: undefined }); const [columns, setColumns] = useState([]); @@ -43,18 +47,6 @@ const WorkspaceBoard = () => { const [showDetailModal, setShowDetailModal] = useState(false); const containerRef = useRef(null); - const ws = new WebSocket("http://localhost:9000/ws"); - - ws.onopen = () => { - console.log("Connected to WS"); - ws.send(JSON.stringify({ type: "ping" })); - }; - - ws.onmessage = (event) => { - const msg = JSON.parse(event.data); - console.log("Received:", msg); - }; - // OPTIMIZATION: Track dragging state separately from active elements const [isDragging, setIsDragging] = useState(false); const [dragType, setDragType] = useState<'column' | 'item' | null>(null); @@ -127,8 +119,31 @@ const WorkspaceBoard = () => { } useEffect(() => { + send({ type: "join", boardId: Number(boardId) }); fetchBoardData(); - }, []); + }, [boardId]); + + useEffect(() => { + if (!lastMessage) return; + + switch (lastMessage.type) { + case "pong": + console.log("Server alive"); + break; + + case "joined": + console.log("Joined board", lastMessage.boardId); + break; + + case "columnMoved": + console.log("Column moved:", (lastMessage as ColumnOutMsg).columnId); + break; + + case "error": + console.error("WS Error:", (lastMessage as DomainOutMsg).type); + break; + } + }, [lastMessage]); useEffect(() => { const handleClickOutside = (event: MouseEvent) => { diff --git a/frontend/src/services/axiosClient.ts b/frontend/src/services/axiosClient.ts index aa6043e..e0701ab 100644 --- a/frontend/src/services/axiosClient.ts +++ b/frontend/src/services/axiosClient.ts @@ -13,6 +13,7 @@ const axiosClients: AxiosInstance = axios.create({ axiosClients.interceptors.request.use( function (config) { // Add any custom logic before the request is sent + return config; }, function (error) { diff --git a/frontend/src/services/wsService.ts b/frontend/src/services/wsService.ts new file mode 100644 index 0000000..8cf02e6 --- /dev/null +++ b/frontend/src/services/wsService.ts @@ -0,0 +1,60 @@ +import type { InMsg, OutMsg } from "@/types/ws"; + +type Listener = (msg: OutMsg) => void; + +class WSService { + private ws?: WebSocket; + private listeners: Listener[] = []; + private pending: InMsg[] = []; + + baseURL = `${import.meta.env.VITE_TRELLO_LIKE_API_URL}` || 'http://localhost:9000' + + connect() { + if (this.ws) return; + + const wsUrl = `${this.baseURL}/ws`; + this.ws = new WebSocket(wsUrl); + + this.ws.onopen = () => { + console.log("[WS] Connected"); + // flush pending + this.pending.forEach(msg => this.send(msg)); + this.pending = []; + this.send({ type: "ping" }); + }; + + this.ws.onmessage = (event) => { + try { + const msg: OutMsg = JSON.parse(event.data); + console.log("[WS] Received:", msg); + this.listeners.forEach((l) => l(msg)); + } catch (e) { + console.error("[WS] Parse error:", e); + } + }; + + this.ws.onclose = () => { + console.warn("[WS] Closed, retrying..."); + this.ws = undefined; + setTimeout(() => this.connect(), 3000); + }; + } + + send(msg: InMsg) { + if (this.ws?.readyState === WebSocket.OPEN) { + this.ws.send(JSON.stringify(msg)); + } else { + console.log("[WS] Queued until open:", msg); + this.pending.push(msg); + } + } + + onMessage(listener: Listener) { + this.listeners.push(listener); + return () => { + this.listeners = this.listeners.filter((l) => l !== listener); + }; + } +} + +export const wsService = new WSService(); diff --git a/frontend/src/types/ws/columns.ts b/frontend/src/types/ws/columns.ts new file mode 100644 index 0000000..228ac27 --- /dev/null +++ b/frontend/src/types/ws/columns.ts @@ -0,0 +1,5 @@ +export type ColumnInMsg = + | { type: 'moveColumn'; boardId: number; columnId: number; newPosition: number }; + +export type ColumnOutMsg = + | { type: 'columnMoved'; columnId: number, newPosition: number } \ No newline at end of file diff --git a/frontend/src/types/ws/domains.ts b/frontend/src/types/ws/domains.ts new file mode 100644 index 0000000..6dbbbad --- /dev/null +++ b/frontend/src/types/ws/domains.ts @@ -0,0 +1,8 @@ +export type DomainInMsg = + | { type: 'ping' } + | { type: 'join'; boardId: number; }; + +export type DomainOutMsg = + | { type: 'pong' } + | { type: 'joined'; boardId: number } + | { type: 'error'; message: string }; \ No newline at end of file diff --git a/frontend/src/types/ws/index.ts b/frontend/src/types/ws/index.ts new file mode 100644 index 0000000..be6b785 --- /dev/null +++ b/frontend/src/types/ws/index.ts @@ -0,0 +1,5 @@ +import type { ColumnInMsg, ColumnOutMsg } from "./columns"; +import type { DomainInMsg, DomainOutMsg } from "./domains"; + +export type InMsg = DomainInMsg | ColumnInMsg; +export type OutMsg = DomainOutMsg | ColumnOutMsg; \ No newline at end of file From 83e2fa1b3381dbd33d20b06fc15f390df8aa507a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20L=C3=A2m=20Nh=E1=BA=ADt?= Date: Thu, 11 Sep 2025 16:08:52 +0700 Subject: [PATCH 3/4] setup websocket and implement broadcast message to user after moved column --- .gitignore | 2 + backend/app/actors/ProjectActor.scala | 50 ++++++++++ backend/app/actors/ProjectActorRegistry.scala | 34 +++++++ backend/app/actors/ProjectClientActor.scala | 72 ++++++++++++++ .../app/controllers/AuthenticatedAction.scala | 45 ++++----- .../app/controllers/ColumnController.scala | 8 +- .../app/controllers/WebSocketController.scala | 93 ++++++------------- backend/app/dto/websocket/InMsg.scala | 12 --- backend/app/dto/websocket/OutMsg.scala | 12 --- .../app/dto/websocket/OutgoingMessage.scala | 12 +++ .../dto/websocket/board/BoardMessage.scala | 19 ++++ backend/app/modules/ActorsModule.scala | 28 ++++++ .../app/repositories/ColumnRepository.scala | 8 +- backend/app/services/BroadcastService.scala | 24 +++++ backend/app/services/ColumnService.scala | 16 +++- backend/app/services/ProjectService.scala | 4 + backend/app/services/WebSocketService.scala | 47 ++++++++++ .../app/websocket/codecs/ColumnCodecs.scala | 11 --- .../app/websocket/codecs/DomainCodecs.scala | 19 ---- .../websocket/codecs/WebSocketCodecs.scala | 39 -------- .../websocket/handlers/ColumnHandler.scala | 29 ------ backend/app/websocket/handlers/Handler.scala | 16 ---- backend/conf/application.conf | 1 + backend/conf/logback.xml | 1 + backend/conf/routes | 4 +- 25 files changed, 373 insertions(+), 233 deletions(-) create mode 100644 .gitignore create mode 100644 backend/app/actors/ProjectActor.scala create mode 100644 backend/app/actors/ProjectActorRegistry.scala create mode 100644 backend/app/actors/ProjectClientActor.scala delete mode 100644 backend/app/dto/websocket/InMsg.scala delete mode 100644 backend/app/dto/websocket/OutMsg.scala create mode 100644 backend/app/dto/websocket/OutgoingMessage.scala create mode 100644 backend/app/dto/websocket/board/BoardMessage.scala create mode 100644 backend/app/modules/ActorsModule.scala create mode 100644 backend/app/services/BroadcastService.scala create mode 100644 backend/app/services/WebSocketService.scala delete mode 100644 backend/app/websocket/codecs/ColumnCodecs.scala delete mode 100644 backend/app/websocket/codecs/DomainCodecs.scala delete mode 100644 backend/app/websocket/codecs/WebSocketCodecs.scala delete mode 100644 backend/app/websocket/handlers/ColumnHandler.scala delete mode 100644 backend/app/websocket/handlers/Handler.scala diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d48c759 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +.vscode \ No newline at end of file diff --git a/backend/app/actors/ProjectActor.scala b/backend/app/actors/ProjectActor.scala new file mode 100644 index 0000000..bdb4c72 --- /dev/null +++ b/backend/app/actors/ProjectActor.scala @@ -0,0 +1,50 @@ +package actors + +import dto.websocket.OutgoingMessage +import org.apache.pekko.actor.{Actor, ActorRef, Props} +import play.api.Logger +import play.api.libs.json.{JsValue, Json} + +/** + * Actor that manages WebSocket connections for a specific project. + */ +object ProjectActor { + def props(projectId: Int): Props = Props(new ProjectActor(projectId)) + + case class Join(userId: Int, out: ActorRef) + case class Leave(userId: Int) + case class Broadcast(message: OutgoingMessage) +} + +/** + * Actor that manages WebSocket connections for a specific project. + * It keeps track of connected users and broadcasts messages to them. + * + * @param projectId the ID of the project + */ +class ProjectActor(projectId: Int) extends Actor { + import ProjectActor._ + + private var members = Map.empty[Int, ActorRef] + + def receive: Receive = { + case Join(userId, out) => + members += userId -> out + Logger("actors").info( + s"UserId $userId joined project $projectId. Total members: ${members.size}" + ) + + case Leave(userId) => + members -= userId + Logger("actors").info( + s"UserId with $userId left project $projectId. Total members: ${members.size}" + ) + + case Broadcast(msg) => + val js: JsValue = Json.toJson(msg) + members.values.foreach(_ ! js) + Logger("actors").info( + s"Broadcasted message to project $projectId members: ${members.size} users" + ) + } +} diff --git a/backend/app/actors/ProjectActorRegistry.scala b/backend/app/actors/ProjectActorRegistry.scala new file mode 100644 index 0000000..fc87c4d --- /dev/null +++ b/backend/app/actors/ProjectActorRegistry.scala @@ -0,0 +1,34 @@ +package actors + +import org.apache.pekko.actor.{Actor, ActorRef, Props} +import dto.websocket.OutgoingMessage +import modules.ActorNames + +object ProjectActorRegistry { + def props: Props = Props(new ProjectActorRegistry) + + case class GetProjectActor(projectId: Int) + case class BroadcastToProject(projectId: Int, message: OutgoingMessage) +} + +class ProjectActorRegistry extends Actor { + import ProjectActorRegistry._ + import ProjectActor._ + + private var projectActors = Map.empty[Int, ActorRef] + + def receive: Receive = { + case GetProjectActor(projectId) => + // create new ProjectActor if not exists + val actor = projectActors.getOrElse(projectId, { + val newActor = context.actorOf(ProjectActor.props(projectId), s"${ActorNames.ProjectActorPrefix}$projectId") + projectActors += projectId -> newActor + newActor + }) + // return ActorRef for requester + sender() ! actor + + case BroadcastToProject(projectId, message) => + projectActors.get(projectId).foreach(_ ! Broadcast(message)) + } +} diff --git a/backend/app/actors/ProjectClientActor.scala b/backend/app/actors/ProjectClientActor.scala new file mode 100644 index 0000000..c2ad4d7 --- /dev/null +++ b/backend/app/actors/ProjectClientActor.scala @@ -0,0 +1,72 @@ +package actors + +import org.apache.pekko.actor.{Actor, ActorRef, Props} +import org.apache.pekko.pattern.{ask, pipe} +import org.apache.pekko.util.Timeout + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +/** + * Actor that manages a WebSocket connection for a user in a specific project. + */ +object ProjectClientActor { + def props(out: ActorRef, + userId: Int, + projectId: Int, + registry: ActorRef): Props = + Props(new ProjectClientActor(out, userId, projectId, registry)) + + /** + * Message indicating that the ProjectActor has been found in the registry. + * @param projectRef the ActorRef of the ProjectActor + */ + private case class RegistryFound(projectRef: ActorRef) +} + +/** + * Actor that manages a WebSocket connection for a user in a specific project. + * It registers the user with the ProjectActor upon creation and deregisters + * upon termination. + * + * @param out the ActorRef to send messages to the WebSocket + * @param userId the ID of the user + * @param projectId the ID of the project + * @param registry the ActorRef of the ProjectActorRegistry + */ +class ProjectClientActor(out: ActorRef, + userId: Int, + projectId: Int, + registry: ActorRef) + extends Actor { + import ProjectActor._ + import ProjectActorRegistry._ + import ProjectClientActor._ + + // Execution context and timeout for ask pattern + implicit val ec: ExecutionContext = context.dispatcher + implicit val timeout: Timeout = Timeout(3.seconds) + + // Reference to the ProjectActor, once found + private var projectRefOpt: Option[ActorRef] = None + + // On start, ask the registry for the ProjectActor + override def preStart(): Unit = { + (registry ? GetProjectActor(projectId)) + .mapTo[ActorRef] + .map(RegistryFound) pipeTo self + } + + // On stop, inform the ProjectActor that the user is leaving + override def postStop(): Unit = { + projectRefOpt.foreach(_ ! Leave(userId)) + super.postStop() + } + + // Handle incoming messages + def receive: Receive = { + case RegistryFound(projectRef) => + projectRefOpt = Some(projectRef) + projectRef ! Join(userId, out) + } +} diff --git a/backend/app/controllers/AuthenticatedAction.scala b/backend/app/controllers/AuthenticatedAction.scala index 6ba54a0..3e93d3e 100644 --- a/backend/app/controllers/AuthenticatedAction.scala +++ b/backend/app/controllers/AuthenticatedAction.scala @@ -70,26 +70,29 @@ class AuthenticatedWebSocket @Inject()( cookieService: CookieService )(implicit ec: ExecutionContext) { - def apply(block: UserToken ⇒ Flow[JsValue, JsValue, _]): WebSocket = { - WebSocket.acceptOrResult[JsValue, JsValue] { requestHeader ⇒ - cookieService.getTokenFromRequest(requestHeader) match { - case Some(token) ⇒ - jwtService.validateToken(token) match { - case Success(userToken) ⇒ - Future.successful(Right(block(userToken))) - case Failure(ex) ⇒ - Future.successful( - Left( - Results.Unauthorized(s"Invalid token: ${ex.getMessage}") - .withCookies(cookieService.createExpiredAuthCookie()) - ) - ) - } - case None ⇒ - Future.successful( - Left(Results.Unauthorized("No authentication token found")) - ) - } - } + def apply( + block: UserToken ⇒ Future[Either[Result, Flow[JsValue, JsValue, _]]] + ): WebSocket = { + WebSocket.acceptOrResult[JsValue, JsValue] { requestHeader ⇒ + cookieService.getTokenFromRequest(requestHeader) match { + case Some(token) ⇒ + jwtService.validateToken(token) match { + case Success(userToken) ⇒ + block(userToken) + case Failure(ex) ⇒ + Future.successful( + Left( + Results + .Unauthorized(s"Invalid token: ${ex.getMessage}") + .withCookies(cookieService.createExpiredAuthCookie()) + ) + ) + } + case None ⇒ + Future.successful( + Left(Results.Unauthorized("No authentication token found")) + ) + } } + } } diff --git a/backend/app/controllers/ColumnController.scala b/backend/app/controllers/ColumnController.scala index 8aea609..3057bf8 100644 --- a/backend/app/controllers/ColumnController.scala +++ b/backend/app/controllers/ColumnController.scala @@ -112,15 +112,15 @@ class ColumnController @Inject()( } } - /** PATCH /columns/:columnId/position */ - def updatePosition(columnId: Int): Action[JsValue] = + /** PATCH /projects/:projectId/columns/:columnId/position */ + def updatePosition(projectId: Int, columnId: Int): Action[JsValue] = authenticatedActionWithUser.async(parse.json) { request => implicit val messages: Messages = request.messages val updatedBy = request.userToken.userId handleJsonValidation[UpdateColumnPositionRequest](request.body) { - updatePositionDto => + updateColumnPositionDto => columnService - .updatePosition(columnId, updatePositionDto, updatedBy) + .updatePosition(projectId, columnId, updateColumnPositionDto, updatedBy) .map { _ => Ok( Json.toJson( diff --git a/backend/app/controllers/WebSocketController.scala b/backend/app/controllers/WebSocketController.scala index 4faca20..41fa4d5 100644 --- a/backend/app/controllers/WebSocketController.scala +++ b/backend/app/controllers/WebSocketController.scala @@ -1,72 +1,37 @@ package controllers -import dto.websocket.{InMsg, OutMsg} -import dto.websocket.InMsg.{ColumnInMsg, Join, Ping} -import dto.websocket.OutMsg.{ErrorMsg, Joined} -import org.apache.pekko.NotUsed -import org.apache.pekko.stream.{KillSwitches, Materializer, OverflowStrategy, UniqueKillSwitch} -import org.apache.pekko.stream.scaladsl.{Flow, Keep, Sink, Source} -import play.api.libs.json.{JsValue, Json} -import play.api.mvc.{InjectedController, WebSocket} -import services.Rooms -import websocket.handlers.{ColumnHandler, HandlerContext} -import websocket.codecs.DomainCodecs._ -import websocket.codecs.WebSocketCodecs._ +import modules.ActorNames +import org.apache.pekko.actor.{ActorRef, ActorSystem} +import org.apache.pekko.stream.Materializer +import play.api.libs.json.JsValue +import play.api.libs.streams.ActorFlow +import play.api.mvc._ +import services.ProjectService -import javax.inject.{Inject, Singleton} -import scala.concurrent.{ExecutionContext, Future} +import javax.inject._ +import scala.concurrent.ExecutionContext @Singleton class WebSocketController @Inject()( - rooms: Rooms, - columnHandler: ColumnHandler, - authenticatedWebSocket: AuthenticatedWebSocket - )(implicit mat: Materializer, ec: ExecutionContext) extends InjectedController { - - def ws: WebSocket = authenticatedWebSocket { userToken ⇒ - socketFlow(userToken.userId) - } - - private def socketFlow(userId: Int): Flow[JsValue, JsValue, NotUsed] = { - val (outQueue, outSource) = - Source.queue[JsValue](64, OverflowStrategy.dropHead).preMaterialize() - - var currentKillSwitch: Option[UniqueKillSwitch] = None - - val inbound: Sink[JsValue, NotUsed] = - Flow[JsValue].mapAsync(1) { - case js if js.validate[InMsg].isSuccess => - js.as[InMsg] match { - case Ping => - outQueue.offer(Json.toJson(OutMsg.Pong)) - - case Join(boardId) => - // unsubscribe if already joined another board - currentKillSwitch.foreach(_.shutdown()) - val (_, roomSrc) = rooms.room(boardId) - - val (ks, _) = roomSrc - .viaMat(KillSwitches.single)(Keep.right) - .map(Json.parse) - .toMat(Sink.foreach(outQueue.offer))(Keep.both) - .run() - - currentKillSwitch = Some(ks) - outQueue.offer(Json.toJson(Joined(boardId, userId))).map(_ => ()) - - case msg: ColumnInMsg => - columnHandler.handle(msg, HandlerContext(userId, outQueue)) - - case _ => - outQueue.offer(Json.toJson(ErrorMsg("Unknown message"))).map(_ => ()) - } - - case other => - Future.successful( - outQueue.offer(Json.toJson(ErrorMsg(s"Invalid JSON: $other"))) - ).map(_ => ()) - }.to(Sink.ignore) - - Flow.fromSinkAndSourceCoupled(inbound, outSource) + cc: ControllerComponents, + authenticatedWebSocket: AuthenticatedWebSocket, + @Named(ActorNames.ProjectActorRegistry) projectActorRegistry: ActorRef, + projectService: ProjectService + )(implicit system: ActorSystem, mat: Materializer, ec: ExecutionContext) + extends AbstractController(cc) { + + def joinProject(projectId: Int): WebSocket = authenticatedWebSocket { userToken => + projectService.isUserInActiveProject(userToken.userId, projectId).map { exists => + if (exists) { + Right( + ActorFlow.actorRef[JsValue, JsValue] { out => + actors.ProjectClientActor.props(out, userToken.userId, projectId, projectActorRegistry) + } + ) + } else { + Left(Results.Forbidden("User is not a member of this project")) + } } + } } + diff --git a/backend/app/dto/websocket/InMsg.scala b/backend/app/dto/websocket/InMsg.scala deleted file mode 100644 index c423a16..0000000 --- a/backend/app/dto/websocket/InMsg.scala +++ /dev/null @@ -1,12 +0,0 @@ -package dto.websocket - -sealed trait InMsg -object InMsg { - - case object Ping extends InMsg - case class Join(boardId: Int) extends InMsg - - // Column domain - sealed trait ColumnInMsg extends InMsg - case class MoveColumn(boardId: Int, columnId: Int, newPos: Int) extends ColumnInMsg -} diff --git a/backend/app/dto/websocket/OutMsg.scala b/backend/app/dto/websocket/OutMsg.scala deleted file mode 100644 index e51e6a0..0000000 --- a/backend/app/dto/websocket/OutMsg.scala +++ /dev/null @@ -1,12 +0,0 @@ -package dto.websocket - -sealed trait OutMsg -object OutMsg { - - case object Pong extends OutMsg - case class Joined(boardId: Int, userId: Int) extends OutMsg - case class ErrorMsg(error: String) extends OutMsg - - // column - case class ColumnMoved(boardId: Int, columnId: Int, newPos: Int) extends OutMsg -} diff --git a/backend/app/dto/websocket/OutgoingMessage.scala b/backend/app/dto/websocket/OutgoingMessage.scala new file mode 100644 index 0000000..f041acb --- /dev/null +++ b/backend/app/dto/websocket/OutgoingMessage.scala @@ -0,0 +1,12 @@ +package dto.websocket + +import dto.websocket.board.BoardMessage +import play.api.libs.json.Writes + +trait OutgoingMessage + +object OutgoingMessage { + implicit val writes: Writes[OutgoingMessage] = { + case bm: BoardMessage => BoardMessage.writes.writes(bm) + } +} diff --git a/backend/app/dto/websocket/board/BoardMessage.scala b/backend/app/dto/websocket/board/BoardMessage.scala new file mode 100644 index 0000000..05d22c8 --- /dev/null +++ b/backend/app/dto/websocket/board/BoardMessage.scala @@ -0,0 +1,19 @@ +package dto.websocket.board + +import play.api.libs.json._ +import dto.websocket.OutgoingMessage + +sealed trait BoardMessage extends OutgoingMessage + +case class ColumnMoved(columnId: Int, newPosition: Int) extends BoardMessage +case class TaskMoved(taskId: Int, fromColumnId: Int, toColumnId: Int, newPosition: Int) extends BoardMessage + +object BoardMessage { + implicit val columnMovedFormat: OFormat[ColumnMoved] = Json.format[ColumnMoved] + implicit val taskMovedFormat: OFormat[TaskMoved] = Json.format[TaskMoved] + + implicit val writes: Writes[BoardMessage] = Writes { + case cm: ColumnMoved => Json.obj("type" -> "columnMoved") ++ Json.toJsObject(cm) + case tm: TaskMoved => Json.obj("type" -> "taskMoved") ++ Json.toJsObject(tm) + } +} \ No newline at end of file diff --git a/backend/app/modules/ActorsModule.scala b/backend/app/modules/ActorsModule.scala new file mode 100644 index 0000000..a04eaa6 --- /dev/null +++ b/backend/app/modules/ActorsModule.scala @@ -0,0 +1,28 @@ +package modules + +import actors.ProjectActorRegistry +import com.google.inject.name.Named +import com.google.inject.{AbstractModule, Provides, Singleton} +import org.apache.pekko.actor.{ActorRef, ActorSystem, Props} +import play.api.Logger + +/** + * Constants for actor names to ensure consistency across the application. + */ +object ActorNames { + final val ProjectActorRegistry = "project-actor-registry" + final val ProjectActorPrefix = "project-actor-" +} + +/** + * Module to provide actor instances. + */ +class ActorsModule extends AbstractModule { + @Provides + @Singleton + @Named(ActorNames.ProjectActorRegistry) + def provideProjectActorRegistry(system: ActorSystem): ActorRef = { + Logger("actors").info("Creating ProjectActorRegistry actor") + system.actorOf(Props[ProjectActorRegistry], ActorNames.ProjectActorRegistry) + } +} diff --git a/backend/app/repositories/ColumnRepository.scala b/backend/app/repositories/ColumnRepository.scala index 1707768..abf275c 100644 --- a/backend/app/repositories/ColumnRepository.scala +++ b/backend/app/repositories/ColumnRepository.scala @@ -85,8 +85,12 @@ class ColumnRepository @Inject()( def findStatusIfUserInProject(columnId: Int, userId: Int): DBIO[Option[ColumnStatus]] = { val query = for { - (c, p) <- columns join projects on (_.projectId === _.id) - if c.id === columnId.bind && p.createdBy === userId.bind + (((c, p), up)) <- columns + .join(projects).on(_.projectId === _.id) + .join(userProjects).on { case ((c, p), up) => + p.id === up.projectId && up.userId === userId.bind + } + if c.id === columnId.bind } yield c.status query.result.headOption diff --git a/backend/app/services/BroadcastService.scala b/backend/app/services/BroadcastService.scala new file mode 100644 index 0000000..192997b --- /dev/null +++ b/backend/app/services/BroadcastService.scala @@ -0,0 +1,24 @@ +package services + +import org.apache.pekko.actor.ActorRef +import org.apache.pekko.util.Timeout +import actors.ProjectActorRegistry +import com.google.inject.name.Named +import dto.websocket.OutgoingMessage +import modules.ActorNames + +import javax.inject.Inject +import scala.concurrent.Future +import scala.concurrent.duration._ + +class BroadcastService @Inject() (@Named(ActorNames.ProjectActorRegistry) registry: ActorRef) { + import ProjectActorRegistry._ + + implicit val timeout: Timeout = Timeout(3.seconds) + + // Broadcast to project actor + def broadcastToProject(projectId: Int, message: OutgoingMessage): Future[Unit] = { + registry ! BroadcastToProject(projectId, message) + Future.successful(()) + } +} diff --git a/backend/app/services/ColumnService.scala b/backend/app/services/ColumnService.scala index 84676a2..1cf16ac 100644 --- a/backend/app/services/ColumnService.scala +++ b/backend/app/services/ColumnService.scala @@ -2,6 +2,7 @@ package services import dto.request.column.{CreateColumnRequest, UpdateColumnPositionRequest, UpdateColumnRequest} import dto.response.column.{ColumnSummariesResponse, ColumnWithTasksResponse} +import dto.websocket.board.ColumnMoved import exception.AppException import models.Enums.ColumnStatus import models.Enums.ColumnStatus.ColumnStatus @@ -17,6 +18,7 @@ import scala.concurrent.{ExecutionContext, Future} class ColumnService @Inject()( columnRepository: ColumnRepository, projectRepository: ProjectRepository, + broadcastService: BroadcastService, protected val dbConfigProvider: DatabaseConfigProvider )(implicit ec: ExecutionContext) extends HasDatabaseConfigProvider[JdbcProfile] { @@ -157,7 +159,10 @@ class ColumnService @Inject()( errorMsg = "Only archived columns can be deleted" ) - def updatePosition(columnId: Int, request: UpdateColumnPositionRequest, userId: Int): Future[Int] = { + def updatePosition(projectId: Int, + columnId: Int, + request: UpdateColumnPositionRequest, + userId: Int): Future[Int] = { val action = for { maybeStatus <- columnRepository.findStatusIfUserInProject( columnId, @@ -173,7 +178,14 @@ class ColumnService @Inject()( } } yield updatedRows - db.run(action) + val resultF: Future[Int] = db.run(action) + resultF.foreach { _ => + broadcastService.broadcastToProject( + projectId, + ColumnMoved(columnId, request.position) + ) + } + resultF } def getArchivedColumns(projectId: Int, userId: Int): Future[Seq[ColumnSummariesResponse]] = { diff --git a/backend/app/services/ProjectService.scala b/backend/app/services/ProjectService.scala index 6443e3e..00331bd 100644 --- a/backend/app/services/ProjectService.scala +++ b/backend/app/services/ProjectService.scala @@ -184,4 +184,8 @@ class ProjectService @Inject()( db.run(action) } + def isUserInActiveProject(userId: Int, projectId: Int): Future[Boolean] = { + db.run(projectRepository.isUserInActiveProject(userId, projectId)) + } + } diff --git a/backend/app/services/WebSocketService.scala b/backend/app/services/WebSocketService.scala new file mode 100644 index 0000000..4557c90 --- /dev/null +++ b/backend/app/services/WebSocketService.scala @@ -0,0 +1,47 @@ +package services + +import org.apache.pekko.actor.{ActorRef, ActorSystem} +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Flow +import play.api.db.slick.{DatabaseConfigProvider, HasDatabaseConfigProvider} +import play.api.libs.json.JsValue +import play.api.libs.streams.ActorFlow +import play.api.mvc.{Result, Results} +import repositories.ProjectRepository +import slick.jdbc.JdbcProfile + +import javax.inject._ +import scala.concurrent.{ExecutionContext, Future} + +/** + * WebSocket service to manage WebSocket connections. + */ +@Singleton +class WebSocketService @Inject()( + @Named("projectActorRegistry") projectActorRegistry: ActorRef, + projectRepository: ProjectRepository, + protected val dbConfigProvider: DatabaseConfigProvider +)(implicit system: ActorSystem, ec: ExecutionContext, mat: Materializer) + extends HasDatabaseConfigProvider[JdbcProfile] { + + /** + * Join a project WebSocket if the user is part of the project. + * @param userId the id of the user trying to join + * @param projectId the id of the project to join + * @return a Future containing either a Result (error) or a Flow for the WebSocket connection + */ + def joinProject( + userId: Int, + projectId: Int + ): Future[Either[Result, Flow[JsValue, JsValue, _]]] = { + db.run(projectRepository.isUserInProject(userId, projectId)).map { + case true => + Right(ActorFlow.actorRef[JsValue, JsValue] { out => + actors.ProjectClientActor + .props(out, userId, projectId, projectActorRegistry) + }) + case false => + Left(Results.Forbidden("You are not a member of this project")) + } + } +} diff --git a/backend/app/websocket/codecs/ColumnCodecs.scala b/backend/app/websocket/codecs/ColumnCodecs.scala deleted file mode 100644 index 796d29c..0000000 --- a/backend/app/websocket/codecs/ColumnCodecs.scala +++ /dev/null @@ -1,11 +0,0 @@ -package websocket.codecs - -import dto.websocket.{InMsg, OutMsg} -import play.api.libs.json.{Json, OFormat, OWrites} - -object ColumnCodecs { - implicit val moveColumnFormat: OFormat[InMsg.MoveColumn] = Json.format[InMsg.MoveColumn] - - implicit val columnMovedWrites: OWrites[OutMsg.ColumnMoved] = - Json.writes[OutMsg.ColumnMoved].transform(_ ++ Json.obj("type" -> "columnMoved")) -} diff --git a/backend/app/websocket/codecs/DomainCodecs.scala b/backend/app/websocket/codecs/DomainCodecs.scala deleted file mode 100644 index ea6620f..0000000 --- a/backend/app/websocket/codecs/DomainCodecs.scala +++ /dev/null @@ -1,19 +0,0 @@ -package websocket.codecs - -import dto.websocket.{InMsg, OutMsg} -import play.api.libs.json.{Json, OFormat, OWrites} - -object DomainCodecs { - // Join read/write - implicit val joinFormat: OFormat[InMsg.Join] = Json.format[InMsg.Join] - - // Out writes for simple system messages - implicit val pongWrites: OWrites[OutMsg.Pong.type] = - OWrites(_ => Json.obj("type" -> "pong")) - - implicit val joinedWrites: OWrites[OutMsg.Joined] = - Json.writes[OutMsg.Joined].transform(_ ++ Json.obj("type" -> "joined")) - - implicit val errorWrites: OWrites[OutMsg.ErrorMsg] = - Json.writes[OutMsg.ErrorMsg].transform(_ ++ Json.obj("type" -> "error")) -} diff --git a/backend/app/websocket/codecs/WebSocketCodecs.scala b/backend/app/websocket/codecs/WebSocketCodecs.scala deleted file mode 100644 index 77d2f78..0000000 --- a/backend/app/websocket/codecs/WebSocketCodecs.scala +++ /dev/null @@ -1,39 +0,0 @@ -package websocket.codecs - -import play.api.libs.json._ -import DomainCodecs._ -import ColumnCodecs._ -import dto.websocket.{InMsg, OutMsg} - -object WebSocketCodecs { - // InMsg: read "type" then delegate to domain Reads - implicit val inMsgReads: Reads[InMsg] = Reads { js => - (js \ "type").validate[String].flatMap { - case t if t.equalsIgnoreCase("ping") => - JsSuccess(InMsg.Ping) - - case t if t.equalsIgnoreCase("join") => - js.validate[InMsg.Join] - - case t if t.equalsIgnoreCase("columnMoved") || t.equalsIgnoreCase("moveColumn") => - // normalize naming: frontend might send "moveColumn" or "MoveColumn" - js.validate[InMsg.MoveColumn] - - // add more cases for other domains: - // case t if t.equalsIgnoreCase("moveTask") => js.validate[InMsg.MoveTask] - - case other => - JsError(s"Unknown type: $other") - } - } - - // OutMsg: choose correct domain writer (these writers are imported from domain codec objects) - implicit val outMsgWrites: Writes[OutMsg] = Writes { - case OutMsg.Pong => Json.toJson(OutMsg.Pong)(pongWrites) - case j: OutMsg.Joined => Json.toJson(j)(joinedWrites) - case c: OutMsg.ColumnMoved => Json.toJson(c)(columnMovedWrites) - case e: OutMsg.ErrorMsg => Json.toJson(e)(errorWrites) - // add Task/Comment branches here later - } -} - diff --git a/backend/app/websocket/handlers/ColumnHandler.scala b/backend/app/websocket/handlers/ColumnHandler.scala deleted file mode 100644 index d32272e..0000000 --- a/backend/app/websocket/handlers/ColumnHandler.scala +++ /dev/null @@ -1,29 +0,0 @@ -package websocket.handlers - -import com.google.inject.Inject -import websocket.codecs.ColumnCodecs._ -import websocket.codecs.DomainCodecs._ -import dto.request.column.UpdateColumnPositionRequest -import dto.websocket.InMsg.{ColumnInMsg, MoveColumn} -import dto.websocket.OutMsg -import play.api.libs.json.Json -import services.ColumnService - -import scala.concurrent.{ExecutionContext, Future} - -class ColumnHandler @Inject() (columnService: ColumnService)(implicit ec: ExecutionContext) - extends Handler[ColumnInMsg] { - - override def handle(msg: ColumnInMsg, ctx: HandlerContext): Future[Unit] = msg match { - case MoveColumn(boardId, columnId, newPos) => - columnService - .updatePosition(columnId, UpdateColumnPositionRequest(newPos), ctx.userId) - .map { _ => - ctx.outQueue.offer(Json.toJson(OutMsg.ColumnMoved(boardId, columnId, newPos))) - () - } - .recover { case ex => - ctx.outQueue.offer(Json.toJson(OutMsg.ErrorMsg(ex.getMessage))) - } - } -} diff --git a/backend/app/websocket/handlers/Handler.scala b/backend/app/websocket/handlers/Handler.scala deleted file mode 100644 index 49800dd..0000000 --- a/backend/app/websocket/handlers/Handler.scala +++ /dev/null @@ -1,16 +0,0 @@ -package websocket.handlers - -import dto.websocket.InMsg -import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete -import play.api.libs.json.JsValue - -import scala.concurrent.Future - -trait Handler[M <: InMsg] { - def handle(msg: M, ctx: HandlerContext): Future[Unit] -} - -case class HandlerContext( - userId: Int, - outQueue: SourceQueueWithComplete[JsValue] -) diff --git a/backend/conf/application.conf b/backend/conf/application.conf index 57f066d..9134b60 100644 --- a/backend/conf/application.conf +++ b/backend/conf/application.conf @@ -16,6 +16,7 @@ include "security.conf" # Enable modules play.modules.enabled += "modules.SecurityModule" play.modules.enabled += "modules.DatabaseModule" +play.modules.enabled += "modules.ActorsModule" # Enable filters play.filters.enabled += "play.filters.gzip.GzipFilter" diff --git a/backend/conf/logback.xml b/backend/conf/logback.xml index ab6c2b1..43c5253 100644 --- a/backend/conf/logback.xml +++ b/backend/conf/logback.xml @@ -41,6 +41,7 @@ + diff --git a/backend/conf/routes b/backend/conf/routes index 5f9a9d5..4336785 100644 --- a/backend/conf/routes +++ b/backend/conf/routes @@ -45,7 +45,7 @@ PATCH /api/projects/:projectId/columns/:columnId controllers.ColumnController PATCH /api/columns/:columnId/archive controllers.ColumnController.archive(columnId: Int) PATCH /api/columns/:columnId/restore controllers.ColumnController.restore(columnId: Int) DELETE /api/columns/:columnId controllers.ColumnController.delete(columnId: Int) -PATCH /api/columns/:columnId/position controllers.ColumnController.updatePosition(columnId: Int) +PATCH /api/projects/:projectId/columns/:columnId/position controllers.ColumnController.updatePosition(projectId: Int, columnId: Int) # Task routes POST /api/columns/:columnId/tasks controllers.TaskController.create(columnId: Int) @@ -56,4 +56,4 @@ PATCH /api/tasks/:taskId/restore controllers.TaskController.restore(taskId DELETE /api/tasks/:taskId controllers.TaskController.delete(taskId: Int) #Web socket -GET /ws controllers.WebSocketController.ws \ No newline at end of file +GET /ws/project/:projectId controllers.WebSocketController.joinProject(projectId: Int) \ No newline at end of file From fde73fee7e46e53fe28221867470da9f1912d239 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Nguy=E1=BB=85n=20L=C3=A2m=20Nh=E1=BA=ADt?= Date: Fri, 12 Sep 2025 14:27:26 +0700 Subject: [PATCH 4/4] test: unit test for join websocket room and move clolumn --- .../UserWorkspaceRepository.scala | 29 ----- backend/app/services/Rooms.scala | 26 ---- backend/app/services/WebSocketService.scala | 47 ------- backend/build.sbt | 3 +- .../controllers/ColumnControllerSpec.scala | 29 ++++- .../WebSocketControllerISpec.scala | 123 ++++++++++++++++++ 6 files changed, 148 insertions(+), 109 deletions(-) delete mode 100644 backend/app/repositories/UserWorkspaceRepository.scala delete mode 100644 backend/app/services/Rooms.scala delete mode 100644 backend/app/services/WebSocketService.scala create mode 100644 backend/test/controllers/WebSocketControllerISpec.scala diff --git a/backend/app/repositories/UserWorkspaceRepository.scala b/backend/app/repositories/UserWorkspaceRepository.scala deleted file mode 100644 index 074fa38..0000000 --- a/backend/app/repositories/UserWorkspaceRepository.scala +++ /dev/null @@ -1,29 +0,0 @@ -package repositories - -import models.entities.UserWorkspace -import models.tables.TableRegistry -import play.api.db.slick.{DatabaseConfigProvider, HasDatabaseConfigProvider} -import slick.jdbc.JdbcProfile - -import javax.inject.{Inject, Singleton} -import scala.concurrent.ExecutionContext - -@Singleton -class UserWorkspaceRepository @Inject()( - protected val dbConfigProvider: DatabaseConfigProvider -)(implicit ec: ExecutionContext) - extends HasDatabaseConfigProvider[JdbcProfile] { - - import profile.api._ - private val userWorkspaces = TableRegistry.userWorkspaces - - /** - * Creates a DBIO action to insert a new UserWorkspace record into the database. - * The action will only be executed when passed to the `db.run` method. - * @param userWorkspace The UserWorkspace entity to insert. - * @return A DBIO action that returns the number of rows affected. - */ - def insertAction(userWorkspace: UserWorkspace): DBIO[Int] = { - userWorkspaces += userWorkspace - } -} diff --git a/backend/app/services/Rooms.scala b/backend/app/services/Rooms.scala deleted file mode 100644 index 119921d..0000000 --- a/backend/app/services/Rooms.scala +++ /dev/null @@ -1,26 +0,0 @@ -package services - -import org.apache.pekko.NotUsed -import org.apache.pekko.stream.{Materializer, OverflowStrategy} -import org.apache.pekko.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete} - -import javax.inject.{Inject, Singleton} - -@Singleton -class Rooms @Inject()(implicit mat: Materializer) { - - private var rooms: Map[Int, (SourceQueueWithComplete[String], Source[String, NotUsed])] = Map.empty - - def room(boardId: Int): (SourceQueueWithComplete[String], Source[String, NotUsed]) = - rooms.getOrElse(boardId, { - // Each board has its own queue + broadcast hub - val (queue, src) = Source - .queue[String](bufferSize = 128, overflowStrategy = OverflowStrategy.dropHead) - .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both) - .run() - - val room = (queue, src) - rooms += boardId -> room - room - }) -} diff --git a/backend/app/services/WebSocketService.scala b/backend/app/services/WebSocketService.scala deleted file mode 100644 index 4557c90..0000000 --- a/backend/app/services/WebSocketService.scala +++ /dev/null @@ -1,47 +0,0 @@ -package services - -import org.apache.pekko.actor.{ActorRef, ActorSystem} -import org.apache.pekko.stream.Materializer -import org.apache.pekko.stream.scaladsl.Flow -import play.api.db.slick.{DatabaseConfigProvider, HasDatabaseConfigProvider} -import play.api.libs.json.JsValue -import play.api.libs.streams.ActorFlow -import play.api.mvc.{Result, Results} -import repositories.ProjectRepository -import slick.jdbc.JdbcProfile - -import javax.inject._ -import scala.concurrent.{ExecutionContext, Future} - -/** - * WebSocket service to manage WebSocket connections. - */ -@Singleton -class WebSocketService @Inject()( - @Named("projectActorRegistry") projectActorRegistry: ActorRef, - projectRepository: ProjectRepository, - protected val dbConfigProvider: DatabaseConfigProvider -)(implicit system: ActorSystem, ec: ExecutionContext, mat: Materializer) - extends HasDatabaseConfigProvider[JdbcProfile] { - - /** - * Join a project WebSocket if the user is part of the project. - * @param userId the id of the user trying to join - * @param projectId the id of the project to join - * @return a Future containing either a Result (error) or a Flow for the WebSocket connection - */ - def joinProject( - userId: Int, - projectId: Int - ): Future[Either[Result, Flow[JsValue, JsValue, _]]] = { - db.run(projectRepository.isUserInProject(userId, projectId)).map { - case true => - Right(ActorFlow.actorRef[JsValue, JsValue] { out => - actors.ProjectClientActor - .props(out, userId, projectId, projectActorRegistry) - }) - case false => - Left(Results.Forbidden("You are not a member of this project")) - } - } -} diff --git a/backend/build.sbt b/backend/build.sbt index 216e11b..00324d9 100644 --- a/backend/build.sbt +++ b/backend/build.sbt @@ -69,8 +69,7 @@ lazy val root = (project in file(".")) coverageExcludedPackages := Seq( "controllers\\.javascript\\..*", "controllers.Reverse.*", - "dto\\.request\\..*", - "dto\\.response\\..*", + "dto\\..*", "filters\\..*", "mappers\\..*", "models\\..*", diff --git a/backend/test/controllers/ColumnControllerSpec.scala b/backend/test/controllers/ColumnControllerSpec.scala index f83973b..5bd0da4 100644 --- a/backend/test/controllers/ColumnControllerSpec.scala +++ b/backend/test/controllers/ColumnControllerSpec.scala @@ -1,6 +1,10 @@ package controllers -import dto.request.column.{CreateColumnRequest, UpdateColumnPositionRequest, UpdateColumnRequest} +import dto.request.column.{ + CreateColumnRequest, + UpdateColumnPositionRequest, + UpdateColumnRequest +} import exception.AppException import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures @@ -12,7 +16,7 @@ import play.api.mvc.Cookie import play.api.test.Helpers._ import play.api.test._ import play.api.{Application, Configuration} -import services.{ColumnService, JwtService, ProjectService, UserToken, WorkspaceService} +import services.{JwtService, ProjectService, UserToken, WorkspaceService} class ColumnControllerSpec extends PlaySpec @@ -49,7 +53,6 @@ class ColumnControllerSpec override def beforeAll(): Unit = { val workspaceService = inject[WorkspaceService] val projectService = inject[ProjectService] - val columnService = inject[ColumnService] await( workspaceService.createWorkspace( @@ -214,11 +217,11 @@ class ColumnControllerSpec } "delete column successfully" in { - val archiveRequest = FakeRequest(PATCH, "/api/columns/1/archive") + val archiveRequest = FakeRequest(PATCH, "/api/columns/2/archive") .withCookies(Cookie(cookieName, fakeToken)) route(app, archiveRequest).get - val request = FakeRequest(DELETE, "/api/columns/1") + val request = FakeRequest(DELETE, "/api/columns/2") .withCookies(Cookie(cookieName, fakeToken)) val result = route(app, request).get @@ -227,4 +230,20 @@ class ColumnControllerSpec .as[String] mustBe "Column deleted successfully" } } + + "update column position successfully" in { + val columnPosition = 100 + + val body = Json.toJson(UpdateColumnPositionRequest(columnPosition)) + val request = FakeRequest(PATCH, "/api/projects/1/columns/1/position") + .withCookies(Cookie(cookieName, fakeToken)) + .withBody(body) + .withHeaders(CONTENT_TYPE -> "application/json") + + val result = route(app, request).get + + status(result) mustBe OK + (contentAsJson(result) \ "message") + .as[String] mustBe "Column position updated successfully" + } } diff --git a/backend/test/controllers/WebSocketControllerISpec.scala b/backend/test/controllers/WebSocketControllerISpec.scala new file mode 100644 index 0000000..686df18 --- /dev/null +++ b/backend/test/controllers/WebSocketControllerISpec.scala @@ -0,0 +1,123 @@ +package controllers + +import akka.actor.ActorSystem +import akka.stream.Materializer +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatestplus.play._ +import org.scalatestplus.play.guice.GuiceOneAppPerSuite +import play.api.inject.guice.GuiceApplicationBuilder +import play.api.mvc.Cookie +import play.api.test.Helpers._ +import play.api.test.{FakeRequest, Injecting} +import play.api.{Application, Configuration} +import services.{JwtService, ProjectService, UserToken, WorkspaceService} + +import scala.concurrent.{ExecutionContext, Future} + +class WebSocketControllerISpec + extends PlaySpec + with Injecting + with GuiceOneAppPerSuite + with ScalaFutures + with BeforeAndAfterAll { + + override implicit def fakeApplication(): Application = { + new GuiceApplicationBuilder() + .configure( + "config.resource" -> "application.test.conf", + "slick.dbs.default.db.url" + -> s"jdbc:h2:mem:websocket;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE;MODE=PostgreSQL;DATABASE_TO_UPPER=false" + ) + .build() + } + + implicit lazy val system: ActorSystem = app.injector.instanceOf[ActorSystem] + implicit lazy val mat: Materializer = app.injector.instanceOf[Materializer] + implicit lazy val ec: ExecutionContext = + app.injector.instanceOf[ExecutionContext] + lazy val controller: WebSocketController = + app.injector.instanceOf[WebSocketController] + + lazy val config: Configuration = app.configuration + lazy val defaultAdminEmail: String = + config.getOptional[String]("admin.email").getOrElse("admin@mail.com") + lazy val defaultAdminName: String = + config.getOptional[String]("admin.name").getOrElse("Administrator") + lazy val cookieName: String = + config.getOptional[String]("cookie.name").getOrElse("auth_token") + + def fakeToken: String = { + val jwtService = inject[JwtService] + jwtService + .generateToken(UserToken(1, defaultAdminName, defaultAdminEmail)) + .getOrElse(throw new RuntimeException("JWT token not generated")) + } + + override def beforeAll(): Unit = { + val workspaceService = inject[WorkspaceService] + val projectService = inject[ProjectService] + + await( + workspaceService.createWorkspace( + dto.request.workspace.CreateWorkspaceRequest("Workspace test"), + 1 + ) + ) + + await( + // Create a project with default columns + projectService.createProject( + dto.request.project.CreateProjectRequest("Project test"), + 1, + 1 + ) + ) + } + + "WebSocketController#joinProject" should { + + "reject if user is not in project" in { + // fake user token → projectService trả về false + + val request = FakeRequest(GET, "/ws/projects/123") + .withCookies( + Cookie(cookieName, fakeToken) + ) + val inExpectedProjectId = -1 + + val wsFuture = controller.joinProject(inExpectedProjectId).apply(request) + + val either = await(wsFuture) + + either.isLeft mustBe true + + val result = either.left.get + + status(Future.successful(result)) mustBe FORBIDDEN + contentAsString(Future.successful(result)) must include( + "User is not a member of this project" + ) + + } + + "accept if user is in project" in { + // giả lập user trong project → cần mock ProjectService + // Ở đây mình dùng application.conf test để override + + val request = FakeRequest(GET, "/ws/projects/1") + .withCookies( + Cookie(cookieName, fakeToken) + ) + val wsFuture = controller.joinProject(1).apply(request) + + val either = await(wsFuture) + + either.isRight mustBe true + + val flow = either.toOption.get + + flow must not be null + } + } +}