Skip to content

Commit a1bbdd8

Browse files
authored
Merge pull request #271 from prgrms-web-devcourse-final-project/feat/kafka
Feat/kafka
2 parents 6cef13a + 9161ed7 commit a1bbdd8

File tree

9 files changed

+269
-15
lines changed

9 files changed

+269
-15
lines changed

backend/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ dependencies {
4141
implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
4242
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-mail', version: '3.0.5'
4343
implementation 'org.springframework.boot:spring-boot-starter-batch'
44+
implementation 'org.springframework.kafka:spring-kafka'
45+
testImplementation 'org.springframework.kafka:spring-kafka-test'
4446

4547
// API Documentation (문서화)
4648
implementation 'org.apache.commons:commons-lang3:3.18.0'

backend/docker-compose.yml

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@ services:
1717
volumes:
1818
- mysql-data:/var/lib/mysql
1919
- ./sql:/sql # 호스트 ./sql 폴더를 컨테이너 /sql에 마운트
20-
command: >
20+
command: >
2121
--character-set-server=utf8mb4
2222
--collation-server=utf8mb4_0900_ai_ci
2323
--default-time-zone=Asia/Seoul
2424
--skip-log-bin
2525
--lower-case-table-names=1
2626
healthcheck:
27-
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-p${DEV_DATASOURCE_PASSWORD}"]
27+
test: [ "CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-p${DEV_DATASOURCE_PASSWORD}" ]
2828
interval: 10s
2929
timeout: 5s
3030
retries: 10
@@ -40,7 +40,7 @@ services:
4040
command: >
4141
redis-server --appendonly yes
4242
healthcheck:
43-
test: ["CMD", "redis-cli", "ping"]
43+
test: [ "CMD", "redis-cli", "ping" ]
4444
interval: 10s
4545
timeout: 5s
4646
retries: 10
@@ -78,8 +78,40 @@ services:
7878
timeout: 5s
7979
retries: 10
8080

81+
zookeeper:
82+
image: confluentinc/cp-zookeeper:7.4.4
83+
container_name: zookeeper
84+
restart: unless-stopped
85+
ports:
86+
- "2181:2181"
87+
environment:
88+
ZOOKEEPER_CLIENT_PORT: 2181
89+
ZOOKEEPER_TICK_TIME: 2000
90+
91+
kafka:
92+
image: confluentinc/cp-kafka:7.4.4
93+
container_name: kafka
94+
restart: unless-stopped
95+
depends_on:
96+
- zookeeper
97+
ports:
98+
- "9092:9092"
99+
environment:
100+
KAFKA_BROKER_ID: 1
101+
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
102+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
103+
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
104+
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
105+
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
106+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
107+
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
108+
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
109+
volumes:
110+
- kafka-data:/var/lib/kafka/data
111+
81112
volumes:
82113
mysql-data:
83114
redis-data:
84115
qdrant-data:
85-
ollama-data:
116+
ollama-data:
117+
kafka-data:

backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,14 @@
1313
import org.springframework.ai.chat.messages.MessageType;
1414
import org.springframework.ai.document.Document;
1515
import org.springframework.beans.factory.annotation.Value;
16-
import org.springframework.scheduling.annotation.Async;
1716
import org.springframework.stereotype.Service;
1817
import org.springframework.transaction.annotation.Transactional;
1918

2019
import java.util.List;
2120
import java.util.stream.Collectors;
2221

22+
// 더이상 사용 안함
23+
// 테스트 용도로 남겨둠
2324
@Slf4j
2425
@Service
2526
@RequiredArgsConstructor
@@ -38,7 +39,7 @@ public class AsyncPostChatProcessingService {
3839
@Value("{$custom.ai.keyword-extraction}")
3940
private String keywordExtraction;
4041

41-
@Async
42+
//@Async
4243
@Transactional
4344
public void processHandlerTasks(Long historyId, String userMessage, String fullResponse, List<Document> similarCaseDocuments, List<Document> similarLawDocuments) {
4445
try {
@@ -74,7 +75,7 @@ private void setHistoryTitle(String userMessage, History history, String fullRes
7475
String targetText = fullResponse.contains("해당 질문은 법률") ? userMessage : fullResponse;
7576
TitleExtractionDto titleDto = keywordService.keywordExtract(targetText, titleExtraction, TitleExtractionDto.class);
7677
history.setTitle(titleDto.getTitle());
77-
historyRepository.save(history); // @Transactional 어노테이션으로 인해 메소드 종료 시 자동 저장되지만, 명시적으로 호출할 수도 있습니다.
78+
historyRepository.save(history);
7879
}
7980

8081
private void extractAndUpdateKeywordRanks(String message) {

backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatResponse;
77
import com.ai.lawyer.domain.chatbot.entity.History;
88
import com.ai.lawyer.domain.chatbot.repository.HistoryRepository;
9+
import com.ai.lawyer.domain.kafka.dto.ChatPostProcessEvent;
10+
import com.ai.lawyer.domain.kafka.dto.DocumentDto;
911
import com.ai.lawyer.domain.member.entity.Member;
1012
import com.ai.lawyer.domain.member.repositories.MemberRepository;
1113
import com.ai.lawyer.global.qdrant.service.QdrantService;
@@ -22,6 +24,7 @@
2224
import org.springframework.ai.chat.prompt.PromptTemplate;
2325
import org.springframework.ai.document.Document;
2426
import org.springframework.beans.factory.annotation.Value;
27+
import org.springframework.kafka.core.KafkaTemplate;
2528
import org.springframework.stereotype.Service;
2629
import org.springframework.transaction.annotation.Transactional;
2730
import reactor.core.publisher.Flux;
@@ -39,18 +42,21 @@ public class ChatBotService {
3942
private final ChatClient chatClient;
4043
private final QdrantService qdrantService;
4144
private final HistoryService historyService;
42-
43-
private final AsyncPostChatProcessingService asyncPostChatProcessingService;
44-
4545
private final MemberRepository memberRepository;
4646
private final HistoryRepository historyRepository;
4747
private final ChatMemoryRepository chatMemoryRepository;
4848

49+
// KafkaTemplate 주입
50+
private final KafkaTemplate<String, ChatPostProcessEvent> kafkaTemplate;
51+
4952
@Value("${custom.ai.system-message}")
5053
private String systemMessageTemplate;
5154

55+
// Kafka 토픽 이름 -> 추후 application.yml로 이동 고려
56+
private static final String POST_PROCESSING_TOPIC = "chat-post-processing";
57+
5258
// 핵심 로직
53-
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> (비동기 후처리) -> 응답 반환
59+
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
5460
@Transactional
5561
public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto, Long roomId) {
5662

@@ -79,10 +85,31 @@ public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto,
7985
.content()
8086
.collectList()
8187
.map(fullResponseList -> String.join("", fullResponseList))
82-
.doOnNext(fullResponse -> asyncPostChatProcessingService.processHandlerTasks(history.getHistoryId(), chatRequestDto.getMessage(), fullResponse, similarCaseDocuments, similarLawDocuments)) // 비동기 후처리
88+
.doOnNext(fullResponse -> {
89+
90+
// Document를 DTO로 변환
91+
List<DocumentDto> caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
92+
List<DocumentDto> lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
93+
94+
// Kafka로 보낼 이벤트 객체
95+
ChatPostProcessEvent event = new ChatPostProcessEvent(
96+
history.getHistoryId(),
97+
chatRequestDto.getMessage(),
98+
fullResponse,
99+
caseDtos,
100+
lawDtos
101+
);
102+
103+
// Kafka 이벤트 발행
104+
kafkaTemplate.send(POST_PROCESSING_TOPIC, event);
105+
106+
})
83107
.map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments))
84108
.flux()
85-
.onErrorResume(throwable -> Flux.just(handleError(history)));
109+
.onErrorResume(throwable -> {
110+
log.error("스트리밍 처리 중 에러 발생 (historyId: {})", history.getHistoryId(), throwable);
111+
return Flux.just(handleError(history));
112+
});
86113
}
87114

88115
private ChatResponse createChatResponse(History history, String fullResponse, List<Document> cases, List<Document> laws) {
@@ -111,7 +138,7 @@ private ChatMemory saveChatMemory(ChatRequest chatRequestDto, History history) {
111138
.maxMessages(10)
112139
.chatMemoryRepository(chatMemoryRepository)
113140
.build();
114-
// 사용자 메시지를 메모리에 추가 -> ai 답변은 비동기 후처리에서 추가
141+
// 사용자 메시지를 메모리에 추가 -> ai 답변은 Consumer에서 추가
115142
chatMemory.add(String.valueOf(history.getHistoryId()), new UserMessage(chatRequestDto.getMessage()));
116143
return chatMemory;
117144
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package com.ai.lawyer.domain.kafka.consumer;
2+
3+
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.KeywordExtractionDto;
4+
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.TitleExtractionDto;
5+
import com.ai.lawyer.domain.chatbot.entity.*;
6+
import com.ai.lawyer.domain.chatbot.repository.*;
7+
import com.ai.lawyer.domain.chatbot.service.KeywordService;
8+
import com.ai.lawyer.domain.kafka.dto.ChatPostProcessEvent;
9+
import com.ai.lawyer.domain.kafka.dto.DocumentDto;
10+
import lombok.RequiredArgsConstructor;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.springframework.ai.chat.memory.ChatMemory;
13+
import org.springframework.ai.chat.memory.ChatMemoryRepository;
14+
import org.springframework.ai.chat.memory.MessageWindowChatMemory;
15+
import org.springframework.ai.chat.messages.AssistantMessage;
16+
import org.springframework.ai.chat.messages.MessageType;
17+
import org.springframework.beans.factory.annotation.Value;
18+
import org.springframework.kafka.annotation.KafkaListener;
19+
import org.springframework.stereotype.Service;
20+
import org.springframework.transaction.annotation.Transactional;
21+
22+
import java.util.List;
23+
import java.util.stream.Collectors;
24+
25+
@Slf4j
26+
@Service
27+
@RequiredArgsConstructor
28+
public class ChatPostProcessingConsumer {
29+
30+
private final KeywordService keywordService;
31+
private final HistoryRepository historyRepository;
32+
private final ChatRepository chatRepository;
33+
private final KeywordRankRepository keywordRankRepository;
34+
private final ChatMemoryRepository chatMemoryRepository;
35+
private final ChatPrecedentRepository chatPrecedentRepository;
36+
private final ChatLawRepository chatLawRepository;
37+
38+
@Value("${custom.ai.title-extraction}")
39+
private String titleExtraction;
40+
@Value("${custom.ai.keyword-extraction}")
41+
private String keywordExtraction;
42+
43+
@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
44+
@Transactional
45+
public void consume(ChatPostProcessEvent event) {
46+
try {
47+
History history = historyRepository.findById(event.getHistoryId())
48+
.orElseThrow(() -> new IllegalArgumentException("존재하지 않는 채팅방입니다. historyId: " + event.getHistoryId()));
49+
50+
// 1. 메시지 기억 저장 (Assistant 응답)
51+
ChatMemory chatMemory = MessageWindowChatMemory.builder()
52+
.maxMessages(10)
53+
.chatMemoryRepository(chatMemoryRepository)
54+
.build();
55+
56+
chatMemory.add(String.valueOf(history.getHistoryId()), new AssistantMessage(event.getChatResponse()));
57+
chatMemoryRepository.saveAll(String.valueOf(history.getHistoryId()), chatMemory.get(String.valueOf(history.getHistoryId())));
58+
59+
// 2. 채팅방 제목 설정 / 및 필터
60+
setHistoryTitle(event.getUserMessage(), history, event.getChatResponse());
61+
62+
// 3. 채팅 기록 저장
63+
saveChatWithDocuments(history, MessageType.USER, event.getUserMessage(), event.getSimilarCaseDocuments(), event.getSimilarLawDocuments());
64+
saveChatWithDocuments(history, MessageType.ASSISTANT, event.getChatResponse(), event.getSimilarCaseDocuments(), event.getSimilarLawDocuments());
65+
66+
// 4. 키워드 추출 및 랭킹 업데이트
67+
if (!event.getChatResponse().contains("해당 질문은 법률")) {
68+
extractAndUpdateKeywordRanks(event.getUserMessage());
69+
}
70+
} catch (Exception e) {
71+
log.error("Kafka 이벤트 처리 중 에러 발생 (historyId: {}): ", event.getHistoryId(), e);
72+
}
73+
}
74+
75+
private void setHistoryTitle(String userMessage, History history, String fullResponse) {
76+
String targetText = fullResponse.contains("해당 질문은 법률") ? userMessage : fullResponse;
77+
TitleExtractionDto titleDto = keywordService.keywordExtract(targetText, titleExtraction, TitleExtractionDto.class);
78+
history.setTitle(titleDto.getTitle());
79+
historyRepository.save(history);
80+
}
81+
82+
private void extractAndUpdateKeywordRanks(String message) {
83+
KeywordExtractionDto keywordResponse = keywordService.keywordExtract(message, keywordExtraction, KeywordExtractionDto.class);
84+
if (keywordResponse == null || keywordResponse.getKeyword() == null) {
85+
return;
86+
}
87+
88+
KeywordRank keywordRank = keywordRankRepository.findByKeyword(keywordResponse.getKeyword());
89+
90+
if (keywordRank == null) {
91+
keywordRank = KeywordRank.builder()
92+
.keyword(keywordResponse.getKeyword())
93+
.score(1L)
94+
.build();
95+
} else {
96+
keywordRank.setScore(keywordRank.getScore() + 1);
97+
}
98+
keywordRankRepository.save(keywordRank);
99+
}
100+
101+
private void saveChatWithDocuments(History history, MessageType type, String message, List<DocumentDto> similarCaseDocuments, List<DocumentDto> similarLawDocuments) {
102+
Chat chat = chatRepository.save(Chat.builder()
103+
.historyId(history)
104+
.type(type)
105+
.message(message)
106+
.build());
107+
108+
// Ai 메시지가 저장될 때 관련 문서 저장
109+
if (type == MessageType.ASSISTANT) {
110+
if (similarCaseDocuments != null && !similarCaseDocuments.isEmpty()) {
111+
List<ChatPrecedent> chatPrecedents = similarCaseDocuments.stream()
112+
.map(doc -> ChatPrecedent.builder()
113+
.chatId(chat)
114+
.precedentContent(doc.getText())
115+
.caseNumber(doc.getMetadata().get("caseNumber").toString())
116+
.caseName(doc.getMetadata().get("caseName").toString())
117+
.build())
118+
.collect(Collectors.toList());
119+
chatPrecedentRepository.saveAll(chatPrecedents);
120+
}
121+
122+
if (similarLawDocuments != null && !similarLawDocuments.isEmpty()) {
123+
List<ChatLaw> chatLaws = similarLawDocuments.stream()
124+
.map(doc -> ChatLaw.builder()
125+
.chatId(chat)
126+
.content(doc.getText())
127+
.lawName(doc.getMetadata().get("lawName").toString())
128+
.build())
129+
.collect(Collectors.toList());
130+
chatLawRepository.saveAll(chatLaws);
131+
}
132+
}
133+
}
134+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.ai.lawyer.domain.kafka.dto;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
7+
import java.util.List;
8+
9+
@Data
10+
@NoArgsConstructor
11+
@AllArgsConstructor
12+
public class ChatPostProcessEvent {
13+
private Long historyId;
14+
private String userMessage;
15+
private String chatResponse;
16+
private List<DocumentDto> similarCaseDocuments;
17+
private List<DocumentDto> similarLawDocuments;
18+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.ai.lawyer.domain.kafka.dto;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Data;
5+
import lombok.NoArgsConstructor;
6+
import org.springframework.ai.document.Document;
7+
8+
import java.util.Map;
9+
10+
@Data
11+
@NoArgsConstructor
12+
@AllArgsConstructor
13+
public class DocumentDto {
14+
private String text;
15+
private Map<String, Object> metadata;
16+
17+
public static DocumentDto from(Document document) {
18+
return new DocumentDto(document.getText(), document.getMetadata());
19+
}
20+
}

0 commit comments

Comments
 (0)