-
Notifications
You must be signed in to change notification settings - Fork 24
[임정현] sprint12 #308
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 임정현
Are you sure you want to change the base?
[임정현] sprint12 #308
The head ref may contain hidden characters: "\uC784\uC815\uD604-sprint12"
Conversation
- build.gradle
- WebSocketConfig.java - MessageWebSocketController.java
- SseController.java - SseEmitterRepository.java - SseMessageRepository.java - SseService.java
- eventIds -> eventIdQueue
- BasicNotificationService.create
이벤트명 변경 - notifications.created - binaryContents.updated - channels.* - users.*
- Client -> Reverse Proxy(NginX) -> Backend -> DB, Redis, Kafka
- JwtAuthenticationChannelInterceptor.java
- docker-compose.yml - default.conf
- docker-compose.yml - default.conf
- docker-compose.yml - default.conf - RedisConfig.java - UserLogInOutEvent.java - RedisJwtRegistry.java - RedisLockProvider.java Least Conn 방식 Nginx 로드 밸런서 구현
|
변경 사항이 너무 많이 잡혀서 리뷰하기가 힘들어요 ㅠㅠ |
반영했습니다! |
| } | ||
|
|
||
| for (String userKey : userKeys) { | ||
| List<Object> tokens = redisTemplate.opsForList().range(userKey, 0, -1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
token 캐싱할때랑 불러 올때 list로 관리하니
불러올때 매번 전체 리스트로 가져오는게 너무 비효율적으로 보여요.
| @Query("SELECT DISTINCT m.author.id " + | ||
| "FROM Message m JOIN m.attachments a " + | ||
| "WHERE a.id =:attachmentId") | ||
| List<UUID> findAuthorIdsByAttachmentId(UUID attachmentId); | ||
|
|
||
| @Query("SELECT DISTINCT m.channel.id " + | ||
| "FROM Message m JOIN m.attachments a " + | ||
| "WHERE a.id = :attachmentId") | ||
| List<UUID> findChannelIdsByAttachmentId(UUID attachmentId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
이 쿼리들 보니까 결국 attachmentId 가지고 데이터 조회하는건데
굳이 전체 메시지랑 attachement랑 조인할 필요가 있는지 모르겠습니다.
attchement 먼저 조회하고 거기에 meesage랑 연결 시킬 정보있으면 그걸로 한번더 조회하시는게 어떠신가요.
그렇게 하면 쿼리는 두번이지만 조인은 안해도 되거든요.
| for (var e : origin.entrySet()) { | ||
| for (var info : e.getValue()) { | ||
| if (refreshToken.equals(info.refreshToken())) { targetUserId = e.getKey(); old = info; break; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
모든 토큰 전체 탐색하면서 찾는건 너무 비효율적인것 같아요.
자료구조를 잘 써서 다시 디자인 해보시는게 어떠신가요.
| Queue<JwtInformation> q = origin.get(targetUserId); | ||
| synchronized (q) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
조금 어려울 수 있는 내용이긴한데요.
origin이 ConcurrentHashMap 으로 만드셨는데, HashMap에서 동시성 문제가 발생할 수 있는걸 방지하기 위해서 ConcurrentHashMap을 만드신걸거에요. 그래서 synchronized를 쓸 필요 없이 ConcurrentHashMap의 기능을 잘 쓰시면 될거 같아요.
| for (UUID channelId : channelIds) { | ||
| readStatusRepository.findAllByChannelIdWithUserAndNotificationEnabledTrue(channelId) | ||
| .forEach(rs -> receiverIds.add(rs.getUser().getId())); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
지금 처럼 channelId: UUID 하나만 받아서 쿼리 하는 대신에
channelIds:List 를 받아서 조회할 하는 쿼리를 만들어서 사용해주세요. 지금은 N + 1 문제가 있어요.
| private void sseAfterCommit(Collection<UUID> receiverIds, String eventName, Object dto) { | ||
| if (TransactionSynchronizationManager.isActualTransactionActive()) { | ||
| TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { | ||
| @Override public void afterCommit() { sseService.send(receiverIds, eventName, dto); } | ||
| }); | ||
| } else { | ||
| sseService.send(receiverIds, eventName, dto); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
아마 이건 스프린트에서 의도한건 SpringEvent로 쏘고 그걸 컨슈밍하는 타이밍을 트랜잭션 끝날때로 하라고 요구한거 일거 같은데, 다르게 하셨네요. 덕분에 저도 새로운것 알았습니다
기본 요구사항
웹소켓 구현하기
spring-boot-starter-websocket의존성을 추가하세요.웹소켓 메시지 브로커 설정
메모리 기반
SimpleBroker를 사용하세요./sub으로 설정하세요./pub으로 설정하세요./ws로 설정하고,SockJS연결을 지원해야 합니다.첨부파일이 없는 단순 텍스트 메시지인 경우 STOMP를 통해 메시지를 전송할 수 있도록 컨트롤러를 구현하세요.
/pub/messages엔드포인트에 메시지를 전송할 수 있어야 합니다.@MessageMapping을 활용하세요.MessageCreateRequest를 그대로 활용합니다.첨부파일이 포함된 메시지는 기존의 API (
POST /api/messages)를 그대로 활용합니다.클라이언트는 채널 입장 시 웹소켓으로
/sub/channels.{channelId}.messages를 구독해 메시지를 수신합니다.이를 고려해 메시지가 생성되면 해당 엔드포인트로 메시지를 보내는 컴포넌트를 구현하세요.
MessageCreatedEvent를 통해 새로운 메시지 생성 이벤트를 확인하세요.SimpMessagingTemplate를 통해 적절한 엔드포인트로 메시지를 전송하세요.SSE 구현하기
클라이언트에서 SSE 연결을 위한 엔드포인트를 구현하세요.
GET /api/sse사용자별 SseEmitter 객체를 생성하고 메시지를 전송하는 컴포넌트를 구현하세요.
connect: SseEmitter 객체를 생성합니다.send,broadcast: SseEmitter 객체를 통해 이벤트를 전송합니다.cleanUp: 주기적으로 ping을 보내서 만료된SseEmitter객체를 삭제합니다.ping: 최초 연결 또는 만료 여부를 확인하기 위한 용도로 더미 이벤트를 보냅니다.SseEmitter객체를 메모리에서 저장하는 컴포넌트를 구현하세요.ConcurrentMap: 스레드 세이프한 자료구조를 사용합니다.List<SseEmitter>: 사용자 당 N개의 연결을 허용할 수 있도록 합니다. (예: 다중 탭)이벤트 유실 복원을 위해 SSE 메시지를 저장하는 컴포넌트를 구현하세요.
LastEventId를 전송해 이벤트 유실 복원이 가능하도록 해야 합니다.새 알림이 생성되었을 때 클라이언트에 이벤트를 전송하세요.
클라이언트는 이 이벤트를 수신하면 알림 목록에 알림을 추가합니다.
이벤트 명세
배포 아키텍처 구성하기
다음의 다이어그램에 부합하는 배포 아키텍처를 Docker Compose를 통해 구현하세요.
gtwxscalk-image.png
Reverse Proxy/api/*,/ws/*요청은 Backend 컨테이너로 프록시 처리합니다./usr/share/nginx/html등)에 복사하세요.3000번 포트를 통해 접근할 수 있어야 합니다.BackendReverse Proxy를 통해/api/*,/ws/*요청이 이 서버로 전달됩니다.DB,Memory DB,Message BrokerBackend컨테이너가 접근 가능한 다음의 인프라 컨테이너들을 구성하세요심화 요구사항
웹소켓 인증/인가 처리하기
인증 처리
디스코드잇 클라이언트는
CONNECT프레임의 헤더에 다음과 같이Authorization토큰을 포함합니다.서버 측에서는
ChannelInterceptor를 구현하여 연결 시 토큰을 검증하고, 인증된 사용자 정보를SecurityContext에 설정해야 합니다.CONNECT프레임일 때 엑세스 토큰을 검증하는JwtAuthenticationChannelInterceptor구현체를 정의하세요.JwtAuthenticationFilter를 참고하세요.SecurityContext에 인증정보를 저장하는 대신accessor객체에 저장하세요.SecurityContextChannelInterceptor를 등록하여 이후 메시지 처리 흐름에서도 인증 정보를 활용할 수 있도록 구성하세요.인가 처리
AuthorizationChannelInterceptor를 사용해 메시지 권한 검사를 수행합니다.AuthorizationChannelInterceptor를 활용하기 위해의존성을 추가하세요.MessageMatcherDelegatingAuthorizationManager를 활용해 인가 정책을 정의하고, 채널에 추가하세요.분산 환경 배포 아키텍처 구성하기
-
-
- Round Robin
- Least Connections
- IP Hash
- Weight
-
-
-
-
- 원자적 연산을 위해 분산락을 사용합니다.
-
-
-
## 기본 요구사항다음의 다이어그램에 부합하는 배포 아키텍처를 Docker Compose를 통해 구현하세요.
zkdz2mts7-image.png
Backend-*deploy.replicas설정을 활용하세요.Reverse Proxyupstream블록을 수정해 다음의 로드밸런싱 전략을 적용해Backend로 트래픽을 분산시켜보세요.기본값$upstream_addr변수를 활용해 실제 요청을 처리하는 서버의 IP를 헤더에 추가하고 브라우저 개발자 도구를 활용해 비교해보세요.분산환경에 따른
InMemoryJwtRegistry의 한계점을 식별하고 Redis를 활용해 리팩토링하세요.어떤 한계가 있는지 식별하고 PR에 남겨주세요.
<aside> 💜
이게 어떤 오류인지는 아직 정확히 잘 모르겠지만, 분산환경으로 바꾼 후 인메모리에서 작동하는 JWT 액세스 토큰이 다른 애플리케이션에서 발견되지 않아 401 UnAuthorized 에러가 뜨는 것 같음
</aside>
RedisJwtRegistry구현체를 활용하세요.분산환경에 따른 웹소켓과 SSE의 한계점을 식별하고 Kafka를 활용해 리팩토링하세요.
어떤 한계가 있는지 식별하고 PR에 남겨주세요.
일반적인 카프카 이벤트와 다르게 각 서버 인스턴스마다 이벤트를 받을 수 있어야 합니다. 따라서
컨슈머 group id를 적절히 설정하세요.<aside> 💜
대체 왜 안되나 했는데 consumer의 group ip 설정을 안해줘서 그런 것 같음.
</aside>
웹소켓 구현하기
spring-boot-starter-websocket의존성을 추가하세요.implementation 'org.springframework.boot:spring-boot-starter-websocket'웹소켓 메시지 브로커 설정
메모리 기반
SimpleBroker를 사용하세요./sub으로 설정하세요./pub으로 설정하세요./ws로 설정하고,SockJS연결을 지원해야 합니다.첨부파일이 없는 단순 텍스트 메시지인 경우 STOMP를 통해 메시지를 전송할 수 있도록 컨트롤러를 구현하세요.
/pub/messages엔드포인트에 메시지를 전송할 수 있어야 합니다.@MessageMapping을 활용하세요.MessageCreateRequest를 그대로 활용합니다.첨부파일이 포함된 메시지는 기존의 API (
POST /api/messages)를 그대로 활용합니다.클라이언트는 채널 입장 시 웹소켓으로
/sub/channels.{channelId}.messages를 구독해 메시지를 수신합니다.이를 고려해 메시지가 생성되면 해당 엔드포인트로 메시지를 보내는 컴포넌트를 구현하세요.
MessageCreatedEvent를 통해 새로운 메시지 생성 이벤트를 확인하세요.SimpMessagingTemplate를 통해 적절한 엔드포인트로 메시지를 전송하세요.SSE 구현하기
클라이언트에서 SSE 연결을 위한 엔드포인트를 구현하세요.
GET /api/sse사용자별 SseEmitter 객체를 생성하고 메시지를 전송하는 컴포넌트를 구현하세요.
connect: SseEmitter 객체를 생성합니다.send,broadcast: SseEmitter 객체를 통해 이벤트를 전송합니다.cleanUp: 주기적으로 ping을 보내서 만료된SseEmitter객체를 삭제합니다.ping: 최초 연결 또는 만료 여부를 확인하기 위한 용도로 더미 이벤트를 보냅니다.SseEmitter객체를 메모리에서 저장하는 컴포넌트를 구현하세요.ConcurrentMap: 스레드 세이프한 자료구조를 사용합니다.List<SseEmitter>: 사용자 당 N개의 연결을 허용할 수 있도록 합니다. (예: 다중 탭)이벤트 유실 복원을 위해 SSE 메시지를 저장하는 컴포넌트를 구현하세요.
LastEventId를 전송해 이벤트 유실 복원이 가능하도록 해야 합니다.새 알림이 생성되었을 때 클라이언트에 이벤트를 전송하세요.
클라이언트는 이 이벤트를 수신하면 알림 목록에 알림을 추가합니다.
이벤트 명세
notifications.createdNotificationDto파일 업로드 상태가 변경될 때 이벤트를 발송하세요.
클라이언트는 해당 상태를 수신하면 파일 상태 UI를 다시 렌더링합니다.
이벤트 명세
binaryContents.updatedBinaryContentDto채널 정보가 변경될 때, 이벤트를 발송하세요.
클라이언트는 해당 이벤트를 수신하면 채널 UI를 다시 렌더링합니다.
이벤트 명세
channels.createdorupdatedordeletedChannelDto사용자 정보 또는 로그인 상태가 변경될 때, 이벤트를 발송하세요.
클라이언트는 해당 이벤트를 수신하면 사용자 UI를 다시 렌더링합니다.
이벤트 명세
users.createdorupdatedordeletedUserDto배포 아키텍처 구성하기
다음의 다이어그램에 부합하는 배포 아키텍처를 Docker Compose를 통해 구현하세요.
[gtwxscalk-image.png](https://bakey-api.codeit.kr/api/files/resource?root=static&seqId=14132&version=1&directory=/gtwxscalk-image.png&name=gtwxscalk-image.png)
Reverse Proxy/api/*,/ws/*요청은 Backend 컨테이너로 프록시 처리합니다./usr/share/nginx/html등)에 복사하세요.3000번 포트를 통해 접근할 수 있어야 합니다.BackendReverse Proxy를 통해/api/*,/ws/*요청이 이 서버로 전달됩니다.DB,Memory DB,Message BrokerBackend컨테이너가 접근 가능한 다음의 인프라 컨테이너들을 구성하세요심화 요구사항
[웹소켓 인증/인가 처리하기](https://www.notion.so/2-WebSocket-25d8d8ad920781f2a4a7f25baf3040e6?pvs=21)
인증 처리
디스코드잇 클라이언트는
CONNECT프레임의 헤더에 다음과 같이Authorization토큰을 포함합니다.서버 측에서는
ChannelInterceptor를 구현하여 연결 시 토큰을 검증하고, 인증된 사용자 정보를SecurityContext에 설정해야 합니다.CONNECT프레임일 때 엑세스 토큰을 검증하는JwtAuthenticationChannelInterceptor구현체를 정의하세요.JwtAuthenticationFilter를 참고하세요.SecurityContext에 인증정보를 저장하는 대신accessor객체에 저장하세요.SecurityContextChannelInterceptor를 등록하여 이후 메시지 처리 흐름에서도 인증 정보를 활용할 수 있도록 구성하세요.@Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors( jwtAuthenticationChannelInterceptor, new SecurityContextChannelInterceptor(), ); }인가 처리
AuthorizationChannelInterceptor를 사용해 메시지 권한 검사를 수행합니다.AuthorizationChannelInterceptor를 활용하기 위해의존성을 추가하세요.implementation 'org.springframework.security:spring-security-messaging'MessageMatcherDelegatingAuthorizationManager를 활용해 인가 정책을 정의하고, 채널에 추가하세요.@Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.interceptors( jwtAuthenticationChannelInterceptor, new SecurityContextChannelInterceptor(), authorizationChannelInterceptor() ); }분산 환경 배포 아키텍처 구성하기
다음의 다이어그램에 부합하는 배포 아키텍처를 Docker Compose를 통해 구현하세요.
[zkdz2mts7-image.png](https://bakey-api.codeit.kr/api/files/resource?root=static&seqId=14134&version=1&directory=/zkdz2mts7-image.png&name=zkdz2mts7-image.png)
Backend-*deploy.replicas설정을 활용하세요.Reverse Proxyupstream블록을 수정해 다음의 로드밸런싱 전략을 적용해Backend로 트래픽을 분산시켜보세요.기본값$upstream_addr변수를 활용해 실제 요청을 처리하는 서버의 IP를 헤더에 추가하고 브라우저 개발자 도구를 활용해 비교해보세요.분산환경에 따른
InMemoryJwtRegistry의 한계점을 식별하고 Redis를 활용해 리팩토링하세요.어떤 한계가 있는지 식별하고 PR에 남겨주세요.
💜이게 어떤 오류인지는 아직 정확히 잘 모르겠지만, 분산환경으로 바꾼 후 인메모리에서 작동하는 JWT 액세스 토큰이 다른 애플리케이션에서 발견되지 않아 401 UnAuthorized 에러가 뜨는 것 같음
RedisJwtRegistry구현체를 활용하세요.분산환경에 따른 웹소켓과 SSE의 한계점을 식별하고 Kafka를 활용해 리팩토링하세요.
어떤 한계가 있는지 식별하고 PR에 남겨주세요.
일반적인 카프카 이벤트와 다르게 각 서버 인스턴스마다 이벤트를 받을 수 있어야 합니다. 따라서
💜컨슈머 group id를 적절히 설정하세요.대체 왜 안되나 했는데 consumer의 group ip 설정을 안해줘서 그런 것 같음.