Skip to content

Commit da8ccc7

Browse files
authored
Merge pull request #354 from prgrms-web-devcourse-final-project/fix/kafka
fix[kafka]: 수정
2 parents f838244 + d353f6c commit da8ccc7

File tree

6 files changed

+28
-78
lines changed

6 files changed

+28
-78
lines changed

backend/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ dependencies {
4141
implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
4242
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-mail', version: '3.0.5'
4343
implementation 'org.springframework.boot:spring-boot-starter-batch'
44-
implementation 'org.springframework.kafka:spring-kafka'
4544
testImplementation 'org.springframework.kafka:spring-kafka-test'
4645

4746
// API Documentation (문서화)

backend/docker-compose.yml

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -82,40 +82,8 @@ services:
8282
timeout: 5s
8383
retries: 10
8484

85-
zookeeper:
86-
image: confluentinc/cp-zookeeper:7.8.0
87-
container_name: zookeeper
88-
restart: unless-stopped
89-
ports:
90-
- "2181:2181"
91-
environment:
92-
ZOOKEEPER_CLIENT_PORT: 2181
93-
ZOOKEEPER_TICK_TIME: 2000
94-
95-
kafka:
96-
image: confluentinc/cp-kafka:7.8.0
97-
container_name: kafka
98-
restart: unless-stopped
99-
depends_on:
100-
- zookeeper
101-
ports:
102-
- "9092:9092"
103-
environment:
104-
KAFKA_BROKER_ID: 1
105-
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
106-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
107-
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
108-
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
109-
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
110-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
111-
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
112-
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
113-
volumes:
114-
- kafka-data:/var/lib/kafka/data
115-
11685
volumes:
11786
mysql-data:
11887
redis-data:
11988
qdrant-data:
120-
ollama-data:
121-
kafka-data:
89+
ollama-data:

backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package com.ai.lawyer.domain.chatbot.service;
22

3+
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatHistoryDto;
4+
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatLawDto;
5+
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatPrecedentDto;
36
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.KeywordExtractionDto;
47
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.TitleExtractionDto;
58
import com.ai.lawyer.domain.chatbot.entity.*;
69
import com.ai.lawyer.domain.chatbot.repository.*;
10+
import com.ai.lawyer.infrastructure.redis.service.ChatCacheService;
711
import lombok.RequiredArgsConstructor;
812
import lombok.extern.slf4j.Slf4j;
913
import org.springframework.ai.chat.memory.ChatMemory;
@@ -16,6 +20,7 @@
1620
import org.springframework.stereotype.Service;
1721
import org.springframework.transaction.annotation.Transactional;
1822

23+
import java.util.ArrayList;
1924
import java.util.List;
2025
import java.util.stream.Collectors;
2126

@@ -27,6 +32,8 @@
2732
public class AsyncPostChatProcessingService {
2833

2934
private final KeywordService keywordService;
35+
private final ChatCacheService chatCacheService;
36+
3037
private final HistoryRepository historyRepository;
3138
private final ChatRepository chatRepository;
3239
private final KeywordRankRepository keywordRankRepository;
@@ -98,6 +105,9 @@ private void extractAndUpdateKeywordRanks(String message) {
98105
}
99106

100107
private void saveChatWithDocuments(History history, MessageType type, String message, List<Document> similarCaseDocuments, List<Document> similarLawDocuments) {
108+
List<ChatPrecedent> chatPrecedents = new ArrayList<>();
109+
List<ChatLaw> chatLaws = new ArrayList<>();
110+
101111
Chat chat = chatRepository.save(Chat.builder()
102112
.historyId(history)
103113
.type(type)
@@ -107,7 +117,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
107117
// Ai 메시지가 저장될 때 관련 문서 저장
108118
if (type == MessageType.ASSISTANT) {
109119
if (similarCaseDocuments != null && !similarCaseDocuments.isEmpty()) {
110-
List<ChatPrecedent> chatPrecedents = similarCaseDocuments.stream()
120+
chatPrecedents = similarCaseDocuments.stream()
111121
.map(doc -> ChatPrecedent.builder()
112122
.chatId(chat)
113123
.precedentContent(doc.getText())
@@ -119,7 +129,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
119129
}
120130

121131
if (similarLawDocuments != null && !similarLawDocuments.isEmpty()) {
122-
List<ChatLaw> chatLaws = similarLawDocuments.stream()
132+
chatLaws = similarLawDocuments.stream()
123133
.map(doc -> ChatLaw.builder()
124134
.chatId(chat)
125135
.content(doc.getText())
@@ -129,5 +139,16 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
129139
chatLawRepository.saveAll(chatLaws);
130140
}
131141
}
142+
143+
// Redis 캐시에 DTO 저장
144+
ChatHistoryDto dto = ChatHistoryDto.builder()
145+
.type(type.toString())
146+
.message(message)
147+
.createdAt(chat.getCreatedAt())
148+
.precedent(chatPrecedents.isEmpty() ? null : ChatPrecedentDto.from(chatPrecedents.get(0)))
149+
.law(chatLaws.isEmpty() ? null : ChatLawDto.from(chatLaws.get(0)))
150+
.build();
151+
152+
chatCacheService.cacheChatMessage(history.getHistoryId(), dto);
132153
}
133154
}

backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import com.ai.lawyer.domain.chatbot.entity.History;
88
import com.ai.lawyer.domain.chatbot.repository.HistoryRepository;
99
import com.ai.lawyer.global.qdrant.service.QdrantService;
10-
import com.ai.lawyer.infrastructure.kafka.dto.ChatPostProcessEvent;
11-
import com.ai.lawyer.infrastructure.kafka.dto.DocumentDto;
1210
import lombok.RequiredArgsConstructor;
1311
import lombok.extern.slf4j.Slf4j;
1412
import org.springframework.ai.chat.client.ChatClient;
@@ -22,7 +20,6 @@
2220
import org.springframework.ai.chat.prompt.PromptTemplate;
2321
import org.springframework.ai.document.Document;
2422
import org.springframework.beans.factory.annotation.Value;
25-
import org.springframework.kafka.core.KafkaTemplate;
2623
import org.springframework.stereotype.Service;
2724
import org.springframework.transaction.annotation.Transactional;
2825
import reactor.core.publisher.Flux;
@@ -40,18 +37,14 @@ public class ChatBotService {
4037
private final ChatClient chatClient;
4138
private final QdrantService qdrantService;
4239
private final HistoryService historyService;
40+
private final AsyncPostChatProcessingService asyncPostChatProcessingService;
41+
4342
private final HistoryRepository historyRepository;
4443
private final ChatMemoryRepository chatMemoryRepository;
4544

46-
// KafkaTemplate 주입
47-
private final KafkaTemplate<String, ChatPostProcessEvent> kafkaTemplate;
48-
4945
@Value("${custom.ai.system-message}")
5046
private String systemMessageTemplate;
5147

52-
// Kafka 토픽 이름 -> 추후 application.yml로 이동 고려
53-
private static final String POST_PROCESSING_TOPIC = "chat-post-processing";
54-
5548
// 핵심 로직
5649
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
5750
@Transactional
@@ -78,25 +71,7 @@ public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto,
7871
.content()
7972
.collectList()
8073
.map(fullResponseList -> String.join("", fullResponseList))
81-
.doOnNext(fullResponse -> {
82-
83-
// Document를 DTO로 변환
84-
List<DocumentDto> caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
85-
List<DocumentDto> lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
86-
87-
// Kafka로 보낼 이벤트 객체
88-
ChatPostProcessEvent event = new ChatPostProcessEvent(
89-
history.getHistoryId(),
90-
chatRequestDto.getMessage(),
91-
fullResponse,
92-
caseDtos,
93-
lawDtos
94-
);
95-
96-
// Kafka 이벤트 발행
97-
kafkaTemplate.send(POST_PROCESSING_TOPIC, event);
98-
99-
})
74+
.doOnNext(fullResponse -> asyncPostChatProcessingService.processHandlerTasks(history.getHistoryId(), chatRequestDto.getMessage(), fullResponse, similarCaseDocuments, similarLawDocuments))
10075
.map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments))
10176
.flux()
10277
.onErrorResume(throwable -> {

backend/src/main/java/com/ai/lawyer/infrastructure/kafka/consumer/ChatPostProcessingConsumer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.springframework.ai.chat.messages.AssistantMessage;
2020
import org.springframework.ai.chat.messages.MessageType;
2121
import org.springframework.beans.factory.annotation.Value;
22-
import org.springframework.kafka.annotation.KafkaListener;
2322
import org.springframework.stereotype.Service;
2423
import org.springframework.transaction.annotation.Transactional;
2524

@@ -47,7 +46,7 @@ public class ChatPostProcessingConsumer {
4746
@Value("${custom.ai.keyword-extraction}")
4847
private String keywordExtraction;
4948

50-
@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
49+
//@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
5150
@Transactional
5251
public void consume(ChatPostProcessEvent event) {
5352
try {

backend/src/main/resources/application-dev.yml

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,6 @@ spring:
1717
password: ${DEV_REDIS_PASSWORD}
1818
embedded: false
1919

20-
kafka:
21-
bootstrap-servers: localhost:9092
22-
producer:
23-
key-serializer: org.apache.kafka.common.serialization.StringSerializer
24-
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
25-
consumer:
26-
group-id: chat-processing-group # 컨슈머 그룹 ID
27-
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
28-
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
29-
properties:
30-
spring.json.trusted.packages: "*"
31-
3220
batch:
3321
job:
3422
enabled: false # 최소 한번 시작

0 commit comments

Comments
 (0)