Skip to content

Commit d6177ff

Browse files
authored
Merge pull request #355 from prgrms-web-devcourse-final-project/develop
카푸카 제거
2 parents 7805ad4 + da8ccc7 commit d6177ff

File tree

7 files changed

+35
-110
lines changed

7 files changed

+35
-110
lines changed

backend/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ dependencies {
4141
implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
4242
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-mail', version: '3.0.5'
4343
implementation 'org.springframework.boot:spring-boot-starter-batch'
44-
implementation 'org.springframework.kafka:spring-kafka'
4544
testImplementation 'org.springframework.kafka:spring-kafka-test'
4645

4746
// API Documentation (문서화)

backend/docker-compose.yml

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -82,40 +82,8 @@ services:
8282
timeout: 5s
8383
retries: 10
8484

85-
zookeeper:
86-
image: confluentinc/cp-zookeeper:7.8.0
87-
container_name: zookeeper
88-
restart: unless-stopped
89-
ports:
90-
- "2181:2181"
91-
environment:
92-
ZOOKEEPER_CLIENT_PORT: 2181
93-
ZOOKEEPER_TICK_TIME: 2000
94-
95-
kafka:
96-
image: confluentinc/cp-kafka:7.8.0
97-
container_name: kafka
98-
restart: unless-stopped
99-
depends_on:
100-
- zookeeper
101-
ports:
102-
- "9092:9092"
103-
environment:
104-
KAFKA_BROKER_ID: 1
105-
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
106-
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
107-
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
108-
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
109-
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
110-
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
111-
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
112-
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
113-
volumes:
114-
- kafka-data:/var/lib/kafka/data
115-
11685
volumes:
11786
mysql-data:
11887
redis-data:
11988
qdrant-data:
120-
ollama-data:
121-
kafka-data:
89+
ollama-data:

backend/src/main/java/com/ai/lawyer/domain/chatbot/controller/ChatBotController.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,11 @@ public class ChatBotController {
2323
@Operation(summary = "01. 새로운 채팅", description = "첫 메시지 전송으로 새로운 채팅방을 생성하고 챗봇과 대화를 시작")
2424
@PostMapping(value = "/message")
2525
public Flux<ChatResponse> postNewMessage(@RequestBody ChatRequest chatRequest) {
26-
// SecurityContext에서 memberId를 미리 추출 (컨트롤러 진입 시점)
26+
2727
Long memberId = AuthUtil.getAuthenticatedMemberId();
28-
if (memberId == null) {
29-
throw new IllegalStateException("인증된 사용자가 아닙니다.");
30-
}
28+
3129
log.info("새로운 채팅 요청: memberId={}", memberId);
3230

33-
// memberId를 Flux에 전달 (SecurityContext 전파 문제 방지)
3431
return chatBotService.sendMessage(memberId, chatRequest, null);
3532
}
3633

@@ -39,11 +36,11 @@ public Flux<ChatResponse> postNewMessage(@RequestBody ChatRequest chatRequest) {
3936
public Flux<ChatResponse> postMessage(
4037
@RequestBody ChatRequest chatRequest,
4138
@PathVariable(value = "roomId", required = false) Long roomId) {
42-
// SecurityContext에서 memberId를 미리 추출 (컨트롤러 진입 시점)
39+
4340
Long memberId = AuthUtil.getAuthenticatedMemberId();
41+
4442
log.info("기존 채팅 요청: memberId={}, roomId={}", memberId, roomId);
4543

46-
// memberId를 Flux에 전달 (SecurityContext 전파 문제 방지)
4744
return chatBotService.sendMessage(memberId, chatRequest, roomId);
4845
}
4946

backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package com.ai.lawyer.domain.chatbot.service;
22

3+
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatHistoryDto;
4+
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatLawDto;
5+
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatPrecedentDto;
36
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.KeywordExtractionDto;
47
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.TitleExtractionDto;
58
import com.ai.lawyer.domain.chatbot.entity.*;
69
import com.ai.lawyer.domain.chatbot.repository.*;
10+
import com.ai.lawyer.infrastructure.redis.service.ChatCacheService;
711
import lombok.RequiredArgsConstructor;
812
import lombok.extern.slf4j.Slf4j;
913
import org.springframework.ai.chat.memory.ChatMemory;
@@ -16,6 +20,7 @@
1620
import org.springframework.stereotype.Service;
1721
import org.springframework.transaction.annotation.Transactional;
1822

23+
import java.util.ArrayList;
1924
import java.util.List;
2025
import java.util.stream.Collectors;
2126

@@ -27,6 +32,8 @@
2732
public class AsyncPostChatProcessingService {
2833

2934
private final KeywordService keywordService;
35+
private final ChatCacheService chatCacheService;
36+
3037
private final HistoryRepository historyRepository;
3138
private final ChatRepository chatRepository;
3239
private final KeywordRankRepository keywordRankRepository;
@@ -98,6 +105,9 @@ private void extractAndUpdateKeywordRanks(String message) {
98105
}
99106

100107
private void saveChatWithDocuments(History history, MessageType type, String message, List<Document> similarCaseDocuments, List<Document> similarLawDocuments) {
108+
List<ChatPrecedent> chatPrecedents = new ArrayList<>();
109+
List<ChatLaw> chatLaws = new ArrayList<>();
110+
101111
Chat chat = chatRepository.save(Chat.builder()
102112
.historyId(history)
103113
.type(type)
@@ -107,7 +117,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
107117
// Ai 메시지가 저장될 때 관련 문서 저장
108118
if (type == MessageType.ASSISTANT) {
109119
if (similarCaseDocuments != null && !similarCaseDocuments.isEmpty()) {
110-
List<ChatPrecedent> chatPrecedents = similarCaseDocuments.stream()
120+
chatPrecedents = similarCaseDocuments.stream()
111121
.map(doc -> ChatPrecedent.builder()
112122
.chatId(chat)
113123
.precedentContent(doc.getText())
@@ -119,7 +129,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
119129
}
120130

121131
if (similarLawDocuments != null && !similarLawDocuments.isEmpty()) {
122-
List<ChatLaw> chatLaws = similarLawDocuments.stream()
132+
chatLaws = similarLawDocuments.stream()
123133
.map(doc -> ChatLaw.builder()
124134
.chatId(chat)
125135
.content(doc.getText())
@@ -129,5 +139,16 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
129139
chatLawRepository.saveAll(chatLaws);
130140
}
131141
}
142+
143+
// Redis 캐시에 DTO 저장
144+
ChatHistoryDto dto = ChatHistoryDto.builder()
145+
.type(type.toString())
146+
.message(message)
147+
.createdAt(chat.getCreatedAt())
148+
.precedent(chatPrecedents.isEmpty() ? null : ChatPrecedentDto.from(chatPrecedents.get(0)))
149+
.law(chatLaws.isEmpty() ? null : ChatLawDto.from(chatLaws.get(0)))
150+
.build();
151+
152+
chatCacheService.cacheChatMessage(history.getHistoryId(), dto);
132153
}
133154
}

backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
import com.ai.lawyer.domain.chatbot.entity.History;
88
import com.ai.lawyer.domain.chatbot.repository.HistoryRepository;
99
import com.ai.lawyer.global.qdrant.service.QdrantService;
10-
import com.ai.lawyer.infrastructure.kafka.dto.ChatPostProcessEvent;
11-
import com.ai.lawyer.infrastructure.kafka.dto.DocumentDto;
1210
import lombok.RequiredArgsConstructor;
1311
import lombok.extern.slf4j.Slf4j;
1412
import org.springframework.ai.chat.client.ChatClient;
@@ -22,7 +20,6 @@
2220
import org.springframework.ai.chat.prompt.PromptTemplate;
2321
import org.springframework.ai.document.Document;
2422
import org.springframework.beans.factory.annotation.Value;
25-
import org.springframework.kafka.core.KafkaTemplate;
2623
import org.springframework.stereotype.Service;
2724
import org.springframework.transaction.annotation.Transactional;
2825
import reactor.core.publisher.Flux;
@@ -40,66 +37,41 @@ public class ChatBotService {
4037
private final ChatClient chatClient;
4138
private final QdrantService qdrantService;
4239
private final HistoryService historyService;
40+
private final AsyncPostChatProcessingService asyncPostChatProcessingService;
41+
4342
private final HistoryRepository historyRepository;
4443
private final ChatMemoryRepository chatMemoryRepository;
4544

46-
// KafkaTemplate 주입
47-
private final KafkaTemplate<String, ChatPostProcessEvent> kafkaTemplate;
48-
4945
@Value("${custom.ai.system-message}")
5046
private String systemMessageTemplate;
5147

52-
// Kafka 토픽 이름 -> 추후 application.yml로 이동 고려
53-
private static final String POST_PROCESSING_TOPIC = "chat-post-processing";
54-
5548
// 핵심 로직
5649
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
5750
@Transactional
5851
public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto, Long roomId) {
5952

60-
// 벡터 검색 (판례, 법령) (블로킹)
53+
// 벡터 검색 (판례, 법령)
6154
List<Document> similarCaseDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "판례");
6255
List<Document> similarLawDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "법령");
6356

6457
String caseContext = formatting(similarCaseDocuments);
6558
String lawContext = formatting(similarLawDocuments);
6659

67-
// 채팅방 조회 또는 생성 (블로킹)
60+
// 채팅방 조회 또는 생성
6861
History history = getOrCreateRoom(memberId, roomId);
6962

70-
// 메시지 기억 관리 (User 메시지 추가)
63+
// 메시지 기억 관리
7164
ChatMemory chatMemory = saveChatMemory(chatRequestDto, history);
7265

7366
// 프롬프트 생성
7467
Prompt prompt = getPrompt(caseContext, lawContext, chatMemory, history);
7568

76-
// 준비된 데이터를 담은 컨텍스트 객체 반환
77-
//return new PreparedChatContext(prompt, history, similarCaseDocuments, similarLawDocuments);
78-
7969
return chatClient.prompt(prompt)
8070
.stream()
8171
.content()
8272
.collectList()
8373
.map(fullResponseList -> String.join("", fullResponseList))
84-
.doOnNext(fullResponse -> {
85-
86-
// Document를 DTO로 변환
87-
List<DocumentDto> caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
88-
List<DocumentDto> lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
89-
90-
// Kafka로 보낼 이벤트 객체
91-
ChatPostProcessEvent event = new ChatPostProcessEvent(
92-
history.getHistoryId(),
93-
chatRequestDto.getMessage(),
94-
fullResponse,
95-
caseDtos,
96-
lawDtos
97-
);
98-
99-
// Kafka 이벤트 발행
100-
kafkaTemplate.send(POST_PROCESSING_TOPIC, event);
101-
102-
})
74+
.doOnNext(fullResponse -> asyncPostChatProcessingService.processHandlerTasks(history.getHistoryId(), chatRequestDto.getMessage(), fullResponse, similarCaseDocuments, similarLawDocuments))
10375
.map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments))
10476
.flux()
10577
.onErrorResume(throwable -> {
@@ -176,23 +148,4 @@ private ChatResponse handleError(History history) {
176148
.build();
177149
}
178150

179-
/**
180-
* 블로킹 작업에서 준비된 데이터를 담는 컨텍스트 클래스
181-
* 리액티브 체인에서 데이터를 전달하기 위한 내부 클래스
182-
*/
183-
private static class PreparedChatContext {
184-
final Prompt prompt;
185-
final History history;
186-
final List<Document> similarCaseDocuments;
187-
final List<Document> similarLawDocuments;
188-
189-
PreparedChatContext(Prompt prompt, History history,
190-
List<Document> similarCaseDocuments,
191-
List<Document> similarLawDocuments) {
192-
this.prompt = prompt;
193-
this.history = history;
194-
this.similarCaseDocuments = similarCaseDocuments;
195-
this.similarLawDocuments = similarLawDocuments;
196-
}
197-
}
198151
}

backend/src/main/java/com/ai/lawyer/infrastructure/kafka/consumer/ChatPostProcessingConsumer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.springframework.ai.chat.messages.AssistantMessage;
2020
import org.springframework.ai.chat.messages.MessageType;
2121
import org.springframework.beans.factory.annotation.Value;
22-
import org.springframework.kafka.annotation.KafkaListener;
2322
import org.springframework.stereotype.Service;
2423
import org.springframework.transaction.annotation.Transactional;
2524

@@ -47,7 +46,7 @@ public class ChatPostProcessingConsumer {
4746
@Value("${custom.ai.keyword-extraction}")
4847
private String keywordExtraction;
4948

50-
@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
49+
//@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
5150
@Transactional
5251
public void consume(ChatPostProcessEvent event) {
5352
try {

backend/src/main/resources/application-dev.yml

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,6 @@ spring:
1717
password: ${DEV_REDIS_PASSWORD}
1818
embedded: false
1919

20-
kafka:
21-
bootstrap-servers: localhost:9092
22-
producer:
23-
key-serializer: org.apache.kafka.common.serialization.StringSerializer
24-
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
25-
consumer:
26-
group-id: chat-processing-group # 컨슈머 그룹 ID
27-
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
28-
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
29-
properties:
30-
spring.json.trusted.packages: "*"
31-
3220
batch:
3321
job:
3422
enabled: false # 최소 한번 시작

0 commit comments

Comments
 (0)