Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ dependencies {
implementation 'org.webjars:sockjs-client:1.5.1'
implementation 'org.webjars:stomp-websocket:2.3.4'
// kafka
implementation 'org.springframework.kafka:spring-kafka'
// implementation 'org.springframework.kafka:spring-kafka'

compileOnly 'org.projectlombok:lombok'

Expand Down
52 changes: 25 additions & 27 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,30 @@ services:
DB_NAME: ${DB_NAME}
DB_PASSWORD: ${DB_PASSWORD}
SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL}
depends_on:
- kafka
restart: on-failure

zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ports:
- "2181:2181"
restart: always

kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M" # JVM 힙 메모리 최소/최대 설정
depends_on:
- zookeeper
restart: always
# zookeeper:
# image: confluentinc/cp-zookeeper:latest
# container_name: zookeeper
# environment:
# ZOOKEEPER_CLIENT_PORT: 2181
# ports:
# - "2181:2181"
# restart: always
#
# kafka:
# image: confluentinc/cp-kafka:latest
# container_name: kafka
# ports:
# - "9092:9092"
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
# KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M" # JVM 힙 메모리 최소/최대 설정
# depends_on:
# - zookeeper
# restart: always
196 changes: 98 additions & 98 deletions src/main/java/org/myteam/server/chat/config/KafkaConfig.java
Original file line number Diff line number Diff line change
@@ -1,98 +1,98 @@
package org.myteam.server.chat.config;


import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.myteam.server.chat.domain.Chat;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConfig {

private static final String BOOTSTRAP_SERVERS = "kafka:9092";
private static final String DEFAULT_GROUP_ID = "chat-group";

@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
return new KafkaAdmin(configs);
}

/**
* Kafka ProducerFactory를 생성하는 Bean 메서드
*/
@Bean
public ProducerFactory<String, Chat> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigurations());
}

/**
* Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
*/
@Bean
public Map<String, Object> producerConfigurations() {
Map<String, Object> producerConfigurations = new HashMap<>();

producerConfigurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
producerConfigurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfigurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
producerConfigurations.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); // JSON 타입 헤더 제거 (선택사항)

return producerConfigurations;
}

/**
* KafkaTemplate을 생성하는 Bean 메서드
*/
@Bean
public KafkaTemplate<String, Chat> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

/**
* Kafka ConsumerFactory를 생성하는 Bean 메서드
*/
@Bean
public ConsumerFactory<String, Chat> consumerFactory() {
JsonDeserializer<Chat> deserializer = new JsonDeserializer<>(Chat.class);
deserializer.addTrustedPackages("*"); // 모든 패키지 신뢰 (필요 시 제한적으로 변경)

Map<String, Object> consumerConfigurations = Map.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
ConsumerConfig.GROUP_ID_CONFIG, DEFAULT_GROUP_ID,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"
);

return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
}

/**
* KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Chat> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Chat> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());

factory.setConcurrency(3); // 병렬 처리 설정 (기본값 1)
factory.getContainerProperties().setPollTimeout(3000L); // 폴링 시간 설정 (선택사항)

return factory;
}
}
//package org.myteam.server.chat.config;
//
//
//import org.apache.kafka.clients.admin.AdminClientConfig;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.common.serialization.StringDeserializer;
//import org.apache.kafka.common.serialization.StringSerializer;
//import org.myteam.server.chat.domain.Chat;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.kafka.annotation.EnableKafka;
//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
//import org.springframework.kafka.core.*;
//import org.springframework.kafka.support.serializer.JsonDeserializer;
//import org.springframework.kafka.support.serializer.JsonSerializer;
//
//import java.util.HashMap;
//import java.util.Map;
//
//@EnableKafka
//@Configuration
//public class KafkaConfig {
//
// private static final String BOOTSTRAP_SERVERS = "kafka:9092";
// private static final String DEFAULT_GROUP_ID = "chat-group";
//
// @Bean
// public KafkaAdmin kafkaAdmin() {
// Map<String, Object> configs = new HashMap<>();
// configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// return new KafkaAdmin(configs);
// }
//
// /**
// * Kafka ProducerFactory를 생성하는 Bean 메서드
// */
// @Bean
// public ProducerFactory<String, Chat> producerFactory() {
// return new DefaultKafkaProducerFactory<>(producerConfigurations());
// }
//
// /**
// * Kafka Producer 구성을 위한 설정값들을 포함한 맵을 반환하는 메서드
// */
// @Bean
// public Map<String, Object> producerConfigurations() {
// Map<String, Object> producerConfigurations = new HashMap<>();
//
// producerConfigurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
// producerConfigurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// producerConfigurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// producerConfigurations.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); // JSON 타입 헤더 제거 (선택사항)
//
// return producerConfigurations;
// }
//
// /**
// * KafkaTemplate을 생성하는 Bean 메서드
// */
// @Bean
// public KafkaTemplate<String, Chat> kafkaTemplate() {
// return new KafkaTemplate<>(producerFactory());
// }
//
// /**
// * Kafka ConsumerFactory를 생성하는 Bean 메서드
// */
// @Bean
// public ConsumerFactory<String, Chat> consumerFactory() {
// JsonDeserializer<Chat> deserializer = new JsonDeserializer<>(Chat.class);
// deserializer.addTrustedPackages("*"); // 모든 패키지 신뢰 (필요 시 제한적으로 변경)
//
// Map<String, Object> consumerConfigurations = Map.of(
// ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
// ConsumerConfig.GROUP_ID_CONFIG, DEFAULT_GROUP_ID,
// ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
// ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer,
// ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"
// );
//
// return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
// }
//
// /**
// * KafkaListener 컨테이너 팩토리를 생성하는 Bean 메서드
// */
// @Bean
// public ConcurrentKafkaListenerContainerFactory<String, Chat> kafkaListenerContainerFactory() {
// ConcurrentKafkaListenerContainerFactory<String, Chat> factory = new ConcurrentKafkaListenerContainerFactory<>();
// factory.setConsumerFactory(consumerFactory());
//
// factory.setConcurrency(3); // 병렬 처리 설정 (기본값 1)
// factory.getContainerProperties().setPollTimeout(3000L); // 폴링 시간 설정 (선택사항)
//
// return factory;
// }
//}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@


import lombok.RequiredArgsConstructor;
import org.myteam.server.chat.interceptor.StompHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
Expand All @@ -14,6 +16,8 @@
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

private final StompHandler stompHandler;

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/play-hive");
Expand All @@ -36,4 +40,9 @@ public void configureWebSocketTransport(WebSocketTransportRegistration registry)
registry.setSendTimeLimit(100 * 10000);
registry.setSendBufferSizeLimit(3 * 512 * 1024);
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(stompHandler);
}
}
55 changes: 14 additions & 41 deletions src/main/java/org/myteam/server/chat/controller/ChatController.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import org.myteam.server.chat.service.FilterWriteService;
import org.myteam.server.global.web.response.ResponseDto;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.web.bind.annotation.*;

import java.util.List;
Expand All @@ -26,22 +29,24 @@ public class ChatController {

private final ChatReadService chatReadService;
private final ChatWriteService chatWriteService;
private final FilterWriteService filterWriteService;

/**
* Kafka를 통해 메시지 전송
* TODO: Kafka를 통해 메시지 전송
* 원인: EC2 프리티어 램 메모리 이슈로 인한 인스턴스 정지로 Kafka 현재 사용 불가능
*/
@PostMapping("/send/{roomId}")
public ResponseEntity<ResponseDto<String>> sendMessageToRoom(@PathVariable Long roomId,
@RequestBody ChatMessage message) {
@MessageMapping("/{roomId}")
@SendTo("/room/{roomId}")
public ChatMessage chat(@DestinationVariable Long roomId, ChatMessage message) {
log.info("Sending message to room {}: {}", roomId, message);

Chat chat = chatWriteService.createChat(roomId, message.getSender(), message.getSenderEmail(), message.getMessage());

return ResponseEntity.ok(new ResponseDto<>(
SUCCESS.name(),
"Message sent to Kafka topic: room-" + roomId,
null));
return ChatMessage.builder()
.roomId(roomId)
.sender(chat.getSender())
.senderEmail(chat.getSenderEmail())
.message(chat.getMessage())
.build();
}

/**
Expand Down Expand Up @@ -94,36 +99,4 @@ public ResponseEntity<ResponseDto<List<ChatRoomResponse>>> getChatRoom() {
chatRooms
));
}

/**
* 필터 데이터 추가
*/
@PostMapping("/filter")
public ResponseEntity<ResponseDto<String>> addFilterData(@RequestBody FilterDataRequest filterData) {
log.info("addFilterData: {}", filterData);

filterWriteService.addFilteredWord(filterData.getFilterData());

return ResponseEntity.ok(new ResponseDto<>(
SUCCESS.name(),
"Successfully Filter data added",
filterData.getFilterData()
));
}

/**
* 필터 데이터 삭제
*/
@DeleteMapping("/filter")
public ResponseEntity<ResponseDto<String>> deleteFilterData(@RequestBody FilterDataRequest filterData) {
log.info("deleteFilterData: {}", filterData);

filterWriteService.removeFilteredWord(filterData.getFilterData());

return ResponseEntity.ok(new ResponseDto<>(
SUCCESS.name(),
"Successfully Filter data deleted",
filterData.getFilterData()
));
}
}
Loading