Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-mail', version: '3.0.5'
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

// API Documentation (문서화)
Expand Down
34 changes: 1 addition & 33 deletions backend/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,40 +82,8 @@ services:
timeout: 5s
retries: 10

zookeeper:
image: confluentinc/cp-zookeeper:7.8.0
container_name: zookeeper
restart: unless-stopped
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:7.8.0
container_name: kafka
restart: unless-stopped
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
volumes:
- kafka-data:/var/lib/kafka/data

volumes:
mysql-data:
redis-data:
qdrant-data:
ollama-data:
kafka-data:
ollama-data:
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,11 @@ public class ChatBotController {
@Operation(summary = "01. 새로운 채팅", description = "첫 메시지 전송으로 새로운 채팅방을 생성하고 챗봇과 대화를 시작")
@PostMapping(value = "/message")
public Flux<ChatResponse> postNewMessage(@RequestBody ChatRequest chatRequest) {
// SecurityContext에서 memberId를 미리 추출 (컨트롤러 진입 시점)

Long memberId = AuthUtil.getAuthenticatedMemberId();
if (memberId == null) {
throw new IllegalStateException("인증된 사용자가 아닙니다.");
}

log.info("새로운 채팅 요청: memberId={}", memberId);

// memberId를 Flux에 전달 (SecurityContext 전파 문제 방지)
return chatBotService.sendMessage(memberId, chatRequest, null);
}

Expand All @@ -39,11 +36,11 @@ public Flux<ChatResponse> postNewMessage(@RequestBody ChatRequest chatRequest) {
public Flux<ChatResponse> postMessage(
@RequestBody ChatRequest chatRequest,
@PathVariable(value = "roomId", required = false) Long roomId) {
// SecurityContext에서 memberId를 미리 추출 (컨트롤러 진입 시점)

Long memberId = AuthUtil.getAuthenticatedMemberId();

log.info("기존 채팅 요청: memberId={}, roomId={}", memberId, roomId);

// memberId를 Flux에 전달 (SecurityContext 전파 문제 방지)
return chatBotService.sendMessage(memberId, chatRequest, roomId);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.ai.lawyer.domain.chatbot.service;

import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatHistoryDto;
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatLawDto;
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatPrecedentDto;
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.KeywordExtractionDto;
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.TitleExtractionDto;
import com.ai.lawyer.domain.chatbot.entity.*;
import com.ai.lawyer.domain.chatbot.repository.*;
import com.ai.lawyer.infrastructure.redis.service.ChatCacheService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.memory.ChatMemory;
Expand All @@ -16,6 +20,7 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -27,6 +32,8 @@
public class AsyncPostChatProcessingService {

private final KeywordService keywordService;
private final ChatCacheService chatCacheService;

private final HistoryRepository historyRepository;
private final ChatRepository chatRepository;
private final KeywordRankRepository keywordRankRepository;
Expand Down Expand Up @@ -98,6 +105,9 @@ private void extractAndUpdateKeywordRanks(String message) {
}

private void saveChatWithDocuments(History history, MessageType type, String message, List<Document> similarCaseDocuments, List<Document> similarLawDocuments) {
List<ChatPrecedent> chatPrecedents = new ArrayList<>();
List<ChatLaw> chatLaws = new ArrayList<>();

Chat chat = chatRepository.save(Chat.builder()
.historyId(history)
.type(type)
Expand All @@ -107,7 +117,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
// Ai 메시지가 저장될 때 관련 문서 저장
if (type == MessageType.ASSISTANT) {
if (similarCaseDocuments != null && !similarCaseDocuments.isEmpty()) {
List<ChatPrecedent> chatPrecedents = similarCaseDocuments.stream()
chatPrecedents = similarCaseDocuments.stream()
.map(doc -> ChatPrecedent.builder()
.chatId(chat)
.precedentContent(doc.getText())
Expand All @@ -119,7 +129,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
}

if (similarLawDocuments != null && !similarLawDocuments.isEmpty()) {
List<ChatLaw> chatLaws = similarLawDocuments.stream()
chatLaws = similarLawDocuments.stream()
.map(doc -> ChatLaw.builder()
.chatId(chat)
.content(doc.getText())
Expand All @@ -129,5 +139,16 @@ private void saveChatWithDocuments(History history, MessageType type, String mes
chatLawRepository.saveAll(chatLaws);
}
}

// Redis 캐시에 DTO 저장
ChatHistoryDto dto = ChatHistoryDto.builder()
.type(type.toString())
.message(message)
.createdAt(chat.getCreatedAt())
.precedent(chatPrecedents.isEmpty() ? null : ChatPrecedentDto.from(chatPrecedents.get(0)))
.law(chatLaws.isEmpty() ? null : ChatLawDto.from(chatLaws.get(0)))
.build();

chatCacheService.cacheChatMessage(history.getHistoryId(), dto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import com.ai.lawyer.domain.chatbot.entity.History;
import com.ai.lawyer.domain.chatbot.repository.HistoryRepository;
import com.ai.lawyer.global.qdrant.service.QdrantService;
import com.ai.lawyer.infrastructure.kafka.dto.ChatPostProcessEvent;
import com.ai.lawyer.infrastructure.kafka.dto.DocumentDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
Expand All @@ -22,7 +20,6 @@
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.ai.document.Document;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
Expand All @@ -40,66 +37,41 @@ public class ChatBotService {
private final ChatClient chatClient;
private final QdrantService qdrantService;
private final HistoryService historyService;
private final AsyncPostChatProcessingService asyncPostChatProcessingService;

private final HistoryRepository historyRepository;
private final ChatMemoryRepository chatMemoryRepository;

// KafkaTemplate 주입
private final KafkaTemplate<String, ChatPostProcessEvent> kafkaTemplate;

@Value("${custom.ai.system-message}")
private String systemMessageTemplate;

// Kafka 토픽 이름 -> 추후 application.yml로 이동 고려
private static final String POST_PROCESSING_TOPIC = "chat-post-processing";

// 핵심 로직
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
@Transactional
public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto, Long roomId) {

// 벡터 검색 (판례, 법령) (블로킹)
// 벡터 검색 (판례, 법령)
List<Document> similarCaseDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "판례");
List<Document> similarLawDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "법령");

String caseContext = formatting(similarCaseDocuments);
String lawContext = formatting(similarLawDocuments);

// 채팅방 조회 또는 생성 (블로킹)
// 채팅방 조회 또는 생성
History history = getOrCreateRoom(memberId, roomId);

// 메시지 기억 관리 (User 메시지 추가)
// 메시지 기억 관리
ChatMemory chatMemory = saveChatMemory(chatRequestDto, history);

// 프롬프트 생성
Prompt prompt = getPrompt(caseContext, lawContext, chatMemory, history);

// 준비된 데이터를 담은 컨텍스트 객체 반환
//return new PreparedChatContext(prompt, history, similarCaseDocuments, similarLawDocuments);

return chatClient.prompt(prompt)
.stream()
.content()
.collectList()
.map(fullResponseList -> String.join("", fullResponseList))
.doOnNext(fullResponse -> {

// Document를 DTO로 변환
List<DocumentDto> caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
List<DocumentDto> lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());

// Kafka로 보낼 이벤트 객체
ChatPostProcessEvent event = new ChatPostProcessEvent(
history.getHistoryId(),
chatRequestDto.getMessage(),
fullResponse,
caseDtos,
lawDtos
);

// Kafka 이벤트 발행
kafkaTemplate.send(POST_PROCESSING_TOPIC, event);

})
.doOnNext(fullResponse -> asyncPostChatProcessingService.processHandlerTasks(history.getHistoryId(), chatRequestDto.getMessage(), fullResponse, similarCaseDocuments, similarLawDocuments))
.map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments))
.flux()
.onErrorResume(throwable -> {
Expand Down Expand Up @@ -176,23 +148,4 @@ private ChatResponse handleError(History history) {
.build();
}

/**
* 블로킹 작업에서 준비된 데이터를 담는 컨텍스트 클래스
* 리액티브 체인에서 데이터를 전달하기 위한 내부 클래스
*/
private static class PreparedChatContext {
final Prompt prompt;
final History history;
final List<Document> similarCaseDocuments;
final List<Document> similarLawDocuments;

PreparedChatContext(Prompt prompt, History history,
List<Document> similarCaseDocuments,
List<Document> similarLawDocuments) {
this.prompt = prompt;
this.history = history;
this.similarCaseDocuments = similarCaseDocuments;
this.similarLawDocuments = similarLawDocuments;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.MessageType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -47,7 +46,7 @@ public class ChatPostProcessingConsumer {
@Value("${custom.ai.keyword-extraction}")
private String keywordExtraction;

@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
//@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
@Transactional
public void consume(ChatPostProcessEvent event) {
try {
Expand Down
12 changes: 0 additions & 12 deletions backend/src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,6 @@ spring:
password: ${DEV_REDIS_PASSWORD}
embedded: false

kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: chat-processing-group # 컨슈머 그룹 ID
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"

batch:
job:
enabled: false # 최소 한번 시작
Expand Down