Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
302 changes: 195 additions & 107 deletions backend/src/main/java/io/f1/backend/domain/game/app/RoomService.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
package io.f1.backend.domain.game.websocket;

import static io.f1.backend.domain.game.app.RoomService.ROOM_LOCK_PREFIX;
import static io.f1.backend.domain.game.app.RoomService.USER_LOCK_PREFIX;
import static io.f1.backend.domain.game.websocket.WebSocketUtils.getUserDestination;

import io.f1.backend.domain.game.app.RoomService;
import io.f1.backend.domain.game.dto.MessageType;
import io.f1.backend.domain.game.dto.response.HeartbeatResponse;
import io.f1.backend.domain.user.dto.UserPrincipal;
import io.f1.backend.global.lock.LockExecutor;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.messaging.simp.user.SimpSession;
import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.stereotype.Component;

import java.security.Principal;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Component
@RequiredArgsConstructor
public class HeartbeatMonitor {
Expand All @@ -32,6 +36,7 @@ public class HeartbeatMonitor {
private final MessageSender messageSender;
private final RoomService roomService;
private final SimpUserRegistry simpUserRegistry;
private final LockExecutor lockExecutor;

@Scheduled(fixedDelay = HEARTBEAT_CHECK_INTERVAL_MS)
public void monitorClientHeartbeat() {
Expand All @@ -58,25 +63,34 @@ private void handleSessionHeartbeat(SimpUser user, SimpSession session) {
new HeartbeatResponse(DIRECTION),
user.getName());

// todo FE 개발 될때까지 주석 처리
// missedPongCounter.merge(sessionId, 1, Integer::sum);
// int missedCnt = missedPongCounter.get(sessionId);
//
// /* max_missed_heartbeats 이상 pong 이 안왔을때 - disconnect 처리 */
// if (missedCnt >= MAX_MISSED_HEARTBEATS) {
//
// Principal principal = user.getPrincipal();
//
// if (principal instanceof UsernamePasswordAuthenticationToken token
// && token.getPrincipal() instanceof UserPrincipal userPrincipal) {
//
// Long userId = userPrincipal.getUserId();
// Long roomId = roomService.getRoomIdByUserId(userId);
//
// roomService.disconnectOrExitRoom(roomId, userPrincipal);
// }
// cleanSession(sessionId);
// }
missedPongCounter.merge(sessionId, 1, Integer::sum);
int missedCnt = missedPongCounter.get(sessionId);

/* max_missed_heartbeats 이상 pong 이 안왔을때 - disconnect 처리 */
if (missedCnt >= MAX_MISSED_HEARTBEATS) {

Principal principal = user.getPrincipal();

if (principal instanceof UsernamePasswordAuthenticationToken token
&& token.getPrincipal() instanceof UserPrincipal userPrincipal) {

Long userId = userPrincipal.getUserId();
Long roomId = roomService.getRoomIdByUserIdWithLock(userId);

lockExecutor.executeWithLock(
USER_LOCK_PREFIX,
userId,
() -> {
lockExecutor.executeWithLock(
ROOM_LOCK_PREFIX,
roomId,
() -> {
roomService.disconnectOrExitRoom(roomId, userPrincipal);
});
});
}
cleanSession(sessionId);
}
}

public void resetMissedPongCount(String sessionId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.f1.backend.domain.game.websocket.controller;

import static io.f1.backend.domain.game.websocket.WebSocketUtils.getSessionId;
import static io.f1.backend.domain.game.websocket.WebSocketUtils.getSessionUser;

import io.f1.backend.domain.game.app.ChatService;
Expand Down Expand Up @@ -33,22 +34,25 @@ public void initializeRoomSocket(@DestinationVariable Long roomId, Message<?> me
UserPrincipal principal = getSessionUser(message);

roomService.initializeRoomSocket(roomId, principal);
roomService.addSessionRoomId(getSessionId(message), roomId);
}

@MessageMapping("/room/reconnect/{roomId}")
public void reconnect(@DestinationVariable Long roomId, Message<?> message) {

UserPrincipal principal = getSessionUser(message);
roomService.changeConnectedStatus(roomId, principal.getUserId(), ConnectionState.CONNECTED);
roomService.reconnectSendResponse(roomId, principal);
roomService.changeConnectedStatusWithLock(
roomId, principal.getUserId(), ConnectionState.CONNECTED);
roomService.reconnectSendResponseWithLock(roomId, principal);
roomService.addSessionRoomId(getSessionId(message), roomId);
}

@MessageMapping("/room/exit/{roomId}")
public void exitRoom(@DestinationVariable Long roomId, Message<?> message) {

UserPrincipal principal = getSessionUser(message);

roomService.exitRoom(roomId, principal);
roomService.exitRoomWithLock(roomId, principal);
}

@MessageMapping("/room/start/{roomId}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ public class HeartbeatController {
public void handlePong(Message<?> message) {
String sessionId = getSessionId(message);

// todo FE 개발 될때까지 주석 처리
// heartbeatMonitor.resetMissedPongCount(sessionId);
heartbeatMonitor.resetMissedPongCount(sessionId);
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package io.f1.backend.domain.game.websocket.eventlistener;

import static io.f1.backend.domain.game.app.RoomService.ROOM_LOCK_PREFIX;
import static io.f1.backend.domain.game.app.RoomService.USER_LOCK_PREFIX;
import static io.f1.backend.domain.game.websocket.WebSocketUtils.getSessionUser;

import io.f1.backend.domain.game.app.RoomService;
import io.f1.backend.domain.game.model.ConnectionState;
import io.f1.backend.domain.game.websocket.DisconnectTaskManager;
import io.f1.backend.domain.game.websocket.HeartbeatMonitor;
import io.f1.backend.domain.user.dto.UserPrincipal;
import io.f1.backend.global.lock.LockExecutor;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -24,6 +27,7 @@ public class WebsocketEventListener {
private final RoomService roomService;
private final DisconnectTaskManager taskManager;
private final HeartbeatMonitor heartbeatMonitor;
private final LockExecutor lockExecutor;

@EventListener
public void handleDisconnectedListener(SessionDisconnectEvent event) {
Expand All @@ -32,26 +36,45 @@ public void handleDisconnectedListener(SessionDisconnectEvent event) {
UserPrincipal principal = getSessionUser(message);

Long userId = principal.getUserId();
String sessionId = event.getSessionId();

// todo FE 개발 될때까지 주석 처리
// heartbeatMonitor.cleanSession(event.getSessionId());
heartbeatMonitor.cleanSession(sessionId);
Long roomId = roomService.getRoomIdBySessionId(sessionId);
roomService.removeSessionRoomId(sessionId);

/* 정상 로직 */
if (!roomService.isUserInAnyRoom(userId)) {
return;
}

Long roomId = roomService.getRoomIdByUserId(userId);
if (!roomService.existsRoom(roomId)) {
return;
}

roomService.changeConnectedStatus(roomId, userId, ConnectionState.DISCONNECTED);
lockExecutor.executeWithLock(
ROOM_LOCK_PREFIX,
roomId,
() ->
roomService.changeConnectedStatus(
roomId, userId, ConnectionState.DISCONNECTED));

taskManager.scheduleDisconnectTask(
userId,
() -> {
if (ConnectionState.DISCONNECTED.equals(
roomService.getPlayerState(userId, roomId))) {
roomService.disconnectOrExitRoom(roomId, principal);
}
lockExecutor.executeWithLock(
USER_LOCK_PREFIX,
userId,
() -> {
lockExecutor.executeWithLock(
ROOM_LOCK_PREFIX,
roomId,
() -> {
if (ConnectionState.DISCONNECTED.equals(
roomService.getPlayerState(userId, roomId))) {
roomService.disconnectOrExitRoom(roomId, principal);
}
});
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ public enum CommonErrorCode implements ErrorCode {
INTERNAL_SERVER_ERROR(
"E500001", HttpStatus.INTERNAL_SERVER_ERROR, "서버에러가 발생했습니다. 관리자에게 문의해주세요."),
INVALID_JSON_FORMAT("E400008", HttpStatus.BAD_REQUEST, "요청 형식이 올바르지 않습니다. JSON 문법을 확인해주세요."),
LOCK_ACQUISITION_FAILED("E409003", HttpStatus.CONFLICT, "다른 요청이 작업 중입니다. 잠시 후 다시 시도해주세요.");
LOCK_ACQUISITION_FAILED("E409003", HttpStatus.CONFLICT, "다른 요청이 작업 중입니다. 잠시 후 다시 시도해주세요."),
LOCK_INTERRUPTED("E500004", HttpStatus.INTERNAL_SERVER_ERROR, "작업 중 락 획득이 중단되었습니다. 다시 시도해주세요.");

private final String code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import io.f1.backend.global.exception.errorcode.ErrorCode;
import io.f1.backend.global.exception.response.ErrorResponse;

import jakarta.servlet.http.HttpServletRequest;

import lombok.extern.slf4j.Slf4j;

import org.springframework.http.ResponseEntity;
Expand All @@ -28,8 +30,13 @@ public ResponseEntity<ErrorResponse> handleCustomException(CustomException e) {
}

@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleException(Exception e) {
public ResponseEntity<ErrorResponse> handleException(Exception e, HttpServletRequest request) {
log.warn("handleException: {}", e.getMessage());

if ("text/event-stream".equals(request.getHeader("Accept"))) {
return ResponseEntity.noContent().build();
}

CommonErrorCode errorCode = CommonErrorCode.INTERNAL_SERVER_ERROR;

ErrorResponse response = new ErrorResponse(errorCode.getCode(), errorCode.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
@RequiredArgsConstructor
public class DistributedLockAspect {

private static final String LOCK_KEY_FORMAT = "lock:%s:{%s}";
public static final String LOCK_KEY_FORMAT = "lock:%s:{%s}";

private final RedissonClient redissonClient;

Expand Down
73 changes: 73 additions & 0 deletions backend/src/main/java/io/f1/backend/global/lock/LockExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.f1.backend.global.lock;

import static io.f1.backend.global.lock.DistributedLockAspect.LOCK_KEY_FORMAT;

import io.f1.backend.global.exception.CustomException;
import io.f1.backend.global.exception.errorcode.CommonErrorCode;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

@Slf4j
@Component
@RequiredArgsConstructor
public class LockExecutor {

private final RedissonClient redissonClient;

// 시간단위를 초로 변경
private static final TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;

// 락 점유를 위한 대기 시간
private static final long DEFAULT_WAIT_TIME = 5L;

// 락 점유 시간
private static final long DEFAULT_LEASE_TIME = 3L;

public <T> T executeWithLock(String prefix, Object key, Supplier<T> supplier) {
String lockKey = formatLockKey(prefix, key);
RLock rlock = redissonClient.getLock(lockKey);

boolean acquired = false;
try {
acquired = rlock.tryLock(DEFAULT_WAIT_TIME, DEFAULT_LEASE_TIME, DEFAULT_TIME_UNIT);

if (!acquired) {
log.warn("[LockExecutor] Lock acquisition failed: {}", key);
throw new CustomException(CommonErrorCode.LOCK_ACQUISITION_FAILED);
}
log.info("[LockExecutor] Lock acquired: {}", key);

return supplier.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CustomException(CommonErrorCode.LOCK_INTERRUPTED);
} finally {
if (acquired && rlock.isHeldByCurrentThread()) {
rlock.unlock();
log.info("[LockExecutor] Lock released: {}", key);
}
}
}

public void executeWithLock(String prefix, Object key, Runnable runnable) {
executeWithLock(
prefix,
key,
() -> {
runnable.run();
return null;
});
}

private String formatLockKey(String prefix, Object value) {
return String.format(LOCK_KEY_FORMAT, prefix, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ static class TestRoomService extends RoomService {
private final Map<Long, Room> rooms = new ConcurrentHashMap<>();

public TestRoomService() {
super(null, null, null, null, null, null);
super(null, null, null, null, null, null, null);
}

@Override
Expand Down
Loading