
Commit 43e2f01

Merge pull request #85 from MT-TEAM-Org/fix/kafka-config
feat: chat logic kafka -> spring stomp
2 parents 89c8394 + 8813503 commit 43e2f01

File tree

13 files changed: +419 -278 lines changed


build.gradle

Lines changed: 1 addition & 1 deletion
@@ -36,7 +36,7 @@ dependencies {
     implementation 'org.webjars:sockjs-client:1.5.1'
     implementation 'org.webjars:stomp-websocket:2.3.4'
     // kafka
-    implementation 'org.springframework.kafka:spring-kafka'
+    // implementation 'org.springframework.kafka:spring-kafka'

     compileOnly 'org.projectlombok:lombok'

docker-compose.yml

Lines changed: 25 additions & 27 deletions
@@ -28,32 +28,30 @@ services:
       DB_NAME: ${DB_NAME}
       DB_PASSWORD: ${DB_PASSWORD}
       SLACK_WEBHOOK_URL: ${SLACK_WEBHOOK_URL}
-    depends_on:
-      - kafka
-    restart: on-failure
-
-  zookeeper:
-    image: confluentinc/cp-zookeeper:latest
-    container_name: zookeeper
-    environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
-    ports:
-      - "2181:2181"
     restart: always

-  kafka:
-    image: confluentinc/cp-kafka:latest
-    container_name: kafka
-    ports:
-      - "9092:9092"
-    environment:
-      KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-      KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M" # JVM heap min/max settings
-    depends_on:
-      - zookeeper
-    restart: always
+#  zookeeper:
+#    image: confluentinc/cp-zookeeper:latest
+#    container_name: zookeeper
+#    environment:
+#      ZOOKEEPER_CLIENT_PORT: 2181
+#    ports:
+#      - "2181:2181"
+#    restart: always
+#
+#  kafka:
+#    image: confluentinc/cp-kafka:latest
+#    container_name: kafka
+#    ports:
+#      - "9092:9092"
+#    environment:
+#      KAFKA_BROKER_ID: 1
+#      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+#      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
+#      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+#      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
+#      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+#      KAFKA_HEAP_OPTS: "-Xms256M -Xmx512M" # JVM heap min/max settings
+#    depends_on:
+#      - zookeeper
+#    restart: always
src/main/java/org/myteam/server/chat/config/KafkaConfig.java

Lines changed: 98 additions & 98 deletions
@@ -1,98 +1,98 @@
(The entire file was commented out: all 98 original lines were deleted and re-added with a leading "//". The new, commented-out contents are shown below; the removed lines are identical except for the "//" prefix.)

//package org.myteam.server.chat.config;
//
//
//import org.apache.kafka.clients.admin.AdminClientConfig;
//import org.apache.kafka.clients.consumer.ConsumerConfig;
//import org.apache.kafka.clients.producer.ProducerConfig;
//import org.apache.kafka.common.serialization.StringDeserializer;
//import org.apache.kafka.common.serialization.StringSerializer;
//import org.myteam.server.chat.domain.Chat;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.kafka.annotation.EnableKafka;
//import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
//import org.springframework.kafka.core.*;
//import org.springframework.kafka.support.serializer.JsonDeserializer;
//import org.springframework.kafka.support.serializer.JsonSerializer;
//
//import java.util.HashMap;
//import java.util.Map;
//
//@EnableKafka
//@Configuration
//public class KafkaConfig {
//
//    private static final String BOOTSTRAP_SERVERS = "kafka:9092";
//    private static final String DEFAULT_GROUP_ID = "chat-group";
//
//    @Bean
//    public KafkaAdmin kafkaAdmin() {
//        Map<String, Object> configs = new HashMap<>();
//        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//        return new KafkaAdmin(configs);
//    }
//
//    /**
//     * Bean method that creates the Kafka ProducerFactory
//     */
//    @Bean
//    public ProducerFactory<String, Chat> producerFactory() {
//        return new DefaultKafkaProducerFactory<>(producerConfigurations());
//    }
//
//    /**
//     * Returns a map with the configuration values for the Kafka producer
//     */
//    @Bean
//    public Map<String, Object> producerConfigurations() {
//        Map<String, Object> producerConfigurations = new HashMap<>();
//
//        producerConfigurations.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//        producerConfigurations.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//        producerConfigurations.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
//        producerConfigurations.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); // remove JSON type headers (optional)
//
//        return producerConfigurations;
//    }
//
//    /**
//     * Bean method that creates the KafkaTemplate
//     */
//    @Bean
//    public KafkaTemplate<String, Chat> kafkaTemplate() {
//        return new KafkaTemplate<>(producerFactory());
//    }
//
//    /**
//     * Bean method that creates the Kafka ConsumerFactory
//     */
//    @Bean
//    public ConsumerFactory<String, Chat> consumerFactory() {
//        JsonDeserializer<Chat> deserializer = new JsonDeserializer<>(Chat.class);
//        deserializer.addTrustedPackages("*"); // trust all packages (restrict if needed)
//
//        Map<String, Object> consumerConfigurations = Map.of(
//                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS,
//                ConsumerConfig.GROUP_ID_CONFIG, DEFAULT_GROUP_ID,
//                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
//                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer,
//                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"
//        );
//
//        return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
//    }
//
//    /**
//     * Bean method that creates the KafkaListener container factory
//     */
//    @Bean
//    public ConcurrentKafkaListenerContainerFactory<String, Chat> kafkaListenerContainerFactory() {
//        ConcurrentKafkaListenerContainerFactory<String, Chat> factory = new ConcurrentKafkaListenerContainerFactory<>();
//        factory.setConsumerFactory(consumerFactory());
//
//        factory.setConcurrency(3); // parallel processing (default 1)
//        factory.getContainerProperties().setPollTimeout(3000L); // poll timeout (optional)
//
//        return factory;
//    }
//}

src/main/java/org/myteam/server/chat/config/WebSocketConfig.java

Lines changed: 9 additions & 0 deletions
@@ -2,7 +2,9 @@


 import lombok.RequiredArgsConstructor;
+import org.myteam.server.chat.interceptor.StompHandler;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.messaging.simp.config.ChannelRegistration;
 import org.springframework.messaging.simp.config.MessageBrokerRegistry;
 import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
 import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@@ -14,6 +16,8 @@
 @RequiredArgsConstructor
 public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

+    private final StompHandler stompHandler;
+
     @Override
     public void configureMessageBroker(MessageBrokerRegistry registry) {
         registry.setApplicationDestinationPrefixes("/play-hive");
@@ -36,4 +40,9 @@ public void configureWebSocketTransport(WebSocketTransportRegistration registry)
         registry.setSendTimeLimit(100 * 10000);
         registry.setSendBufferSizeLimit(3 * 512 * 1024);
     }
+
+    @Override
+    public void configureClientInboundChannel(ChannelRegistration registration) {
+        registration.interceptors(stompHandler);
+    }
 }
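
The StompHandler registered above (org.myteam.server.chat.interceptor.StompHandler) is referenced by this diff but its body is not shown here. A minimal sketch of what such a ChannelInterceptor could look like, assuming it inspects the CONNECT frame; the "Authorization" header name and the validation step are placeholders, not taken from this commit:

package org.myteam.server.chat.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class StompHandler implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        // Runs for every inbound STOMP frame; CONNECT is a natural place to check credentials.
        if (StompCommand.CONNECT.equals(accessor.getCommand())) {
            // Header name is an assumption; the real handler may validate a JWT here.
            String token = accessor.getFirstNativeHeader("Authorization");
            log.info("STOMP CONNECT, Authorization header present: {}", token != null);
        }
        return message;
    }
}

Because it is wired in through configureClientInboundChannel above, preSend runs on every inbound frame before the @MessageMapping handlers in ChatController are invoked.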

src/main/java/org/myteam/server/chat/controller/ChatController.java

Lines changed: 14 additions & 41 deletions
@@ -12,6 +12,9 @@
 import org.myteam.server.chat.service.FilterWriteService;
 import org.myteam.server.global.web.response.ResponseDto;
 import org.springframework.http.ResponseEntity;
+import org.springframework.messaging.handler.annotation.DestinationVariable;
+import org.springframework.messaging.handler.annotation.MessageMapping;
+import org.springframework.messaging.handler.annotation.SendTo;
 import org.springframework.web.bind.annotation.*;

 import java.util.List;
@@ -26,22 +29,24 @@ public class ChatController {

     private final ChatReadService chatReadService;
     private final ChatWriteService chatWriteService;
-    private final FilterWriteService filterWriteService;

     /**
-     * Send messages via Kafka
+     * TODO: Send messages via Kafka
+     * Reason: Kafka is currently unavailable because the EC2 free-tier instance was stopped due to RAM memory issues
      */
-    @PostMapping("/send/{roomId}")
-    public ResponseEntity<ResponseDto<String>> sendMessageToRoom(@PathVariable Long roomId,
-                                                                  @RequestBody ChatMessage message) {
+    @MessageMapping("/{roomId}")
+    @SendTo("/room/{roomId}")
+    public ChatMessage chat(@DestinationVariable Long roomId, ChatMessage message) {
         log.info("Sending message to room {}: {}", roomId, message);

         Chat chat = chatWriteService.createChat(roomId, message.getSender(), message.getSenderEmail(), message.getMessage());

-        return ResponseEntity.ok(new ResponseDto<>(
-                SUCCESS.name(),
-                "Message sent to Kafka topic: room-" + roomId,
-                null));
+        return ChatMessage.builder()
+                .roomId(roomId)
+                .sender(chat.getSender())
+                .senderEmail(chat.getSenderEmail())
+                .message(chat.getMessage())
+                .build();
     }

     /**
@@ -94,36 +99,4 @@ public ResponseEntity<ResponseDto<List<ChatRoomResponse>>> getChatRoom() {
                 chatRooms
         ));
     }
-
-    /**
-     * Add filter data
-     */
-    @PostMapping("/filter")
-    public ResponseEntity<ResponseDto<String>> addFilterData(@RequestBody FilterDataRequest filterData) {
-        log.info("addFilterData: {}", filterData);
-
-        filterWriteService.addFilteredWord(filterData.getFilterData());
-
-        return ResponseEntity.ok(new ResponseDto<>(
-                SUCCESS.name(),
-                "Successfully Filter data added",
-                filterData.getFilterData()
-        ));
-    }
-
-    /**
-     * Delete filter data
-     */
-    @DeleteMapping("/filter")
-    public ResponseEntity<ResponseDto<String>> deleteFilterData(@RequestBody FilterDataRequest filterData) {
-        log.info("deleteFilterData: {}", filterData);
-
-        filterWriteService.removeFilteredWord(filterData.getFilterData());
-
-        return ResponseEntity.ok(new ResponseDto<>(
-                SUCCESS.name(),
-                "Successfully Filter data deleted",
-                filterData.getFilterData()
-        ));
-    }
 }
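
A hedged client-side sketch of how the new STOMP destinations would be used (not part of this commit). Only the "/play-hive" application prefix, @MessageMapping("/{roomId}") and @SendTo("/room/{roomId}") come from the diff; the handshake path "/ws", the server URL, and the use of Spring Framework 6's connectAsync are assumptions.

import java.lang.reflect.Type;
import java.util.Map;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;

public class ChatStompClientExample {

    public static void main(String[] args) throws Exception {
        WebSocketStompClient stompClient = new WebSocketStompClient(new StandardWebSocketClient());
        stompClient.setMessageConverter(new MappingJackson2MessageConverter()); // JSON payloads

        // "/ws" is a placeholder for whatever registerStompEndpoints() actually exposes.
        StompSession session = stompClient
                .connectAsync("ws://localhost:8080/ws", new StompSessionHandlerAdapter() {})
                .get();

        long roomId = 1L;

        // Receive room broadcasts (matches @SendTo("/room/{roomId}")).
        session.subscribe("/room/" + roomId, new StompFrameHandler() {
            @Override
            public Type getPayloadType(StompHeaders headers) {
                return Map.class; // the server sends the ChatMessage as JSON
            }

            @Override
            public void handleFrame(StompHeaders headers, Object payload) {
                System.out.println("Received: " + payload);
            }
        });

        // Send a chat message: application prefix "/play-hive" + @MessageMapping("/{roomId}").
        session.send("/play-hive/" + roomId,
                Map.of("sender", "tester",
                       "senderEmail", "tester@example.com",
                       "message", "hello"));
    }
}

With Kafka removed, the returned ChatMessage is relayed to "/room/{roomId}" by Spring's in-process simple broker instead of a Kafka topic, so delivery stays within the single application instance.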
