Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions backend/.env.default
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,5 @@ CUSTOM_CORS_ALLOWED_ORIGINS=NEED_TO_SET
CUSTOM_OAUTH2_REDIRECT_URL=NEED_TO_SET
CUSTOM_OAUTH2_FAILURE_URL=NEED_TO_SET
CUSTOM_FRONTEND_URL=NEED_TO_SET
PROD_COOKIE_DOMAIN=NEED_TO_SET
DEV_COOKIE_DOMAIN=NEED_TO_SET
2 changes: 2 additions & 0 deletions backend/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
implementation group: 'org.springframework.boot', name: 'spring-boot-starter-mail', version: '3.0.5'
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

// API Documentation (문서화)
implementation 'org.apache.commons:commons-lang3:3.18.0'
Expand Down
40 changes: 36 additions & 4 deletions backend/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ services:
volumes:
- mysql-data:/var/lib/mysql
- ./sql:/sql # 호스트 ./sql 폴더를 컨테이너 /sql에 마운트
command: >
command: >
--character-set-server=utf8mb4
--collation-server=utf8mb4_0900_ai_ci
--default-time-zone=Asia/Seoul
--skip-log-bin
--lower-case-table-names=1
healthcheck:
test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-p${DEV_DATASOURCE_PASSWORD}"]
test: [ "CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-p${DEV_DATASOURCE_PASSWORD}" ]
interval: 10s
timeout: 5s
retries: 10
Expand All @@ -40,7 +40,7 @@ services:
command: >
redis-server --appendonly yes
healthcheck:
test: ["CMD", "redis-cli", "ping"]
test: [ "CMD", "redis-cli", "ping" ]
interval: 10s
timeout: 5s
retries: 10
Expand Down Expand Up @@ -78,8 +78,40 @@ services:
timeout: 5s
retries: 10

zookeeper:
image: confluentinc/cp-zookeeper:7.4.4
container_name: zookeeper
restart: unless-stopped
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:7.4.4
container_name: kafka
restart: unless-stopped
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
volumes:
- kafka-data:/var/lib/kafka/data

volumes:
mysql-data:
redis-data:
qdrant-data:
ollama-data:
ollama-data:
kafka-data:
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
import org.springframework.ai.chat.messages.MessageType;
import org.springframework.ai.document.Document;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.stream.Collectors;

// 더이상 사용 안함
// 테스트 용도로 남겨둠
@Slf4j
@Service
@RequiredArgsConstructor
Expand All @@ -38,7 +39,7 @@ public class AsyncPostChatProcessingService {
// FIX: placeholder was "{$custom.ai.keyword-extraction}" — malformed; Spring
// property placeholders are "${...}", so the literal text (not the configured
// prompt) was being injected. Matches the correct form used elsewhere.
@Value("${custom.ai.keyword-extraction}")
private String keywordExtraction;

@Async
//@Async
@Transactional
public void processHandlerTasks(Long historyId, String userMessage, String fullResponse, List<Document> similarCaseDocuments, List<Document> similarLawDocuments) {
try {
Expand Down Expand Up @@ -74,7 +75,7 @@ private void setHistoryTitle(String userMessage, History history, String fullRes
String targetText = fullResponse.contains("해당 질문은 법률") ? userMessage : fullResponse;
TitleExtractionDto titleDto = keywordService.keywordExtract(targetText, titleExtraction, TitleExtractionDto.class);
history.setTitle(titleDto.getTitle());
historyRepository.save(history); // @Transactional 어노테이션으로 인해 메소드 종료 시 자동 저장되지만, 명시적으로 호출할 수도 있습니다.
historyRepository.save(history);
}

private void extractAndUpdateKeywordRanks(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatResponse;
import com.ai.lawyer.domain.chatbot.entity.History;
import com.ai.lawyer.domain.chatbot.repository.HistoryRepository;
import com.ai.lawyer.domain.kafka.dto.ChatPostProcessEvent;
import com.ai.lawyer.domain.kafka.dto.DocumentDto;
import com.ai.lawyer.domain.member.entity.Member;
import com.ai.lawyer.domain.member.repositories.MemberRepository;
import com.ai.lawyer.global.qdrant.service.QdrantService;
Expand All @@ -22,6 +24,7 @@
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.ai.document.Document;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
Expand All @@ -39,18 +42,21 @@ public class ChatBotService {
private final ChatClient chatClient;
private final QdrantService qdrantService;
private final HistoryService historyService;

private final AsyncPostChatProcessingService asyncPostChatProcessingService;

private final MemberRepository memberRepository;
private final HistoryRepository historyRepository;
private final ChatMemoryRepository chatMemoryRepository;

// KafkaTemplate 주입
private final KafkaTemplate<String, ChatPostProcessEvent> kafkaTemplate;

@Value("${custom.ai.system-message}")
private String systemMessageTemplate;

// Kafka 토픽 이름 -> 추후 application.yml로 이동 고려
private static final String POST_PROCESSING_TOPIC = "chat-post-processing";

// 핵심 로직
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> (비동기 후처리) -> 응답 반환
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
@Transactional
public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto, Long roomId) {

Expand Down Expand Up @@ -79,10 +85,31 @@ public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto,
.content()
.collectList()
.map(fullResponseList -> String.join("", fullResponseList))
.doOnNext(fullResponse -> asyncPostChatProcessingService.processHandlerTasks(history.getHistoryId(), chatRequestDto.getMessage(), fullResponse, similarCaseDocuments, similarLawDocuments)) // 비동기 후처리
.doOnNext(fullResponse -> {

// Document를 DTO로 변환
List<DocumentDto> caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
List<DocumentDto> lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());

// Kafka로 보낼 이벤트 객체
ChatPostProcessEvent event = new ChatPostProcessEvent(
history.getHistoryId(),
chatRequestDto.getMessage(),
fullResponse,
caseDtos,
lawDtos
);

// Kafka 이벤트 발행
kafkaTemplate.send(POST_PROCESSING_TOPIC, event);

})
.map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments))
.flux()
.onErrorResume(throwable -> Flux.just(handleError(history)));
.onErrorResume(throwable -> {
log.error("스트리밍 처리 중 에러 발생 (historyId: {})", history.getHistoryId(), throwable);
return Flux.just(handleError(history));
});
}

private ChatResponse createChatResponse(History history, String fullResponse, List<Document> cases, List<Document> laws) {
Expand Down Expand Up @@ -111,7 +138,7 @@ private ChatMemory saveChatMemory(ChatRequest chatRequestDto, History history) {
.maxMessages(10)
.chatMemoryRepository(chatMemoryRepository)
.build();
// 사용자 메시지를 메모리에 추가 -> ai 답변은 비동기 후처리에서 추가
// 사용자 메시지를 메모리에 추가 -> ai 답변은 Consumer에서 추가
chatMemory.add(String.valueOf(history.getHistoryId()), new UserMessage(chatRequestDto.getMessage()));
return chatMemory;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.ai.lawyer.domain.kafka.consumer;

import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.KeywordExtractionDto;
import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.TitleExtractionDto;
import com.ai.lawyer.domain.chatbot.entity.*;
import com.ai.lawyer.domain.chatbot.repository.*;
import com.ai.lawyer.domain.chatbot.service.KeywordService;
import com.ai.lawyer.domain.kafka.dto.ChatPostProcessEvent;
import com.ai.lawyer.domain.kafka.dto.DocumentDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.memory.ChatMemoryRepository;
import org.springframework.ai.chat.memory.MessageWindowChatMemory;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.MessageType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.stream.Collectors;

@Slf4j
@Service
@RequiredArgsConstructor
public class ChatPostProcessingConsumer {

    private final KeywordService keywordService;
    private final HistoryRepository historyRepository;
    private final ChatRepository chatRepository;
    private final KeywordRankRepository keywordRankRepository;
    private final ChatMemoryRepository chatMemoryRepository;
    private final ChatPrecedentRepository chatPrecedentRepository;
    private final ChatLawRepository chatLawRepository;

    // Prompt templates for the LLM-based title / keyword extraction calls.
    @Value("${custom.ai.title-extraction}")
    private String titleExtraction;
    @Value("${custom.ai.keyword-extraction}")
    private String keywordExtraction;

    /**
     * Post-processes a finished chat exchange published by ChatBotService:
     * appends the assistant reply to chat memory, derives and saves the room
     * title, persists both chat messages (with their supporting precedent/law
     * documents) and updates the keyword ranking.
     *
     * @param event payload published to "chat-post-processing" after streaming completes
     */
    @KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group")
    @Transactional
    public void consume(ChatPostProcessEvent event) {
        try {
            History history = historyRepository.findById(event.getHistoryId())
                    .orElseThrow(() -> new IllegalArgumentException("존재하지 않는 채팅방입니다. historyId: " + event.getHistoryId()));

            // 1. 메시지 기억 저장 (Assistant 응답)
            ChatMemory chatMemory = MessageWindowChatMemory.builder()
                    .maxMessages(10)
                    .chatMemoryRepository(chatMemoryRepository)
                    .build();

            chatMemory.add(String.valueOf(history.getHistoryId()), new AssistantMessage(event.getChatResponse()));
            chatMemoryRepository.saveAll(String.valueOf(history.getHistoryId()), chatMemory.get(String.valueOf(history.getHistoryId())));

            // 2. 채팅방 제목 설정 / 및 필터
            setHistoryTitle(event.getUserMessage(), history, event.getChatResponse());

            // 3. 채팅 기록 저장
            saveChatWithDocuments(history, MessageType.USER, event.getUserMessage(), event.getSimilarCaseDocuments(), event.getSimilarLawDocuments());
            saveChatWithDocuments(history, MessageType.ASSISTANT, event.getChatResponse(), event.getSimilarCaseDocuments(), event.getSimilarLawDocuments());

            // 4. 키워드 추출 및 랭킹 업데이트 (응답이 "법률 외 질문" 필터 문구가 아닐 때만)
            if (!event.getChatResponse().contains("해당 질문은 법률")) {
                extractAndUpdateKeywordRanks(event.getUserMessage());
            }
        } catch (Exception e) {
            // NOTE(review): swallowing here means the offset is committed and the
            // event is never retried or dead-lettered, and writes that completed
            // before the failure are still committed (the exception never reaches
            // the @Transactional interceptor). Consider a DefaultErrorHandler
            // with a DeadLetterPublishingRecoverer on the listener container.
            log.error("Kafka 이벤트 처리 중 에러 발생 (historyId: {}): ", event.getHistoryId(), e);
        }
    }

    // Derives a room title from the response (or from the question, when the
    // response is the "not a legal question" filter message) and persists it.
    private void setHistoryTitle(String userMessage, History history, String fullResponse) {
        String targetText = fullResponse.contains("해당 질문은 법률") ? userMessage : fullResponse;
        TitleExtractionDto titleDto = keywordService.keywordExtract(targetText, titleExtraction, TitleExtractionDto.class);
        history.setTitle(titleDto.getTitle());
        historyRepository.save(history);
    }

    // Extracts a keyword from the user message and increments its ranking score,
    // creating the rank row on first sight of the keyword.
    private void extractAndUpdateKeywordRanks(String message) {
        KeywordExtractionDto keywordResponse = keywordService.keywordExtract(message, keywordExtraction, KeywordExtractionDto.class);
        if (keywordResponse == null || keywordResponse.getKeyword() == null) {
            return;
        }

        KeywordRank keywordRank = keywordRankRepository.findByKeyword(keywordResponse.getKeyword());

        if (keywordRank == null) {
            keywordRank = KeywordRank.builder()
                    .keyword(keywordResponse.getKeyword())
                    .score(1L)
                    .build();
        } else {
            keywordRank.setScore(keywordRank.getScore() + 1);
        }
        keywordRankRepository.save(keywordRank);
    }

    // Persists one chat message; for assistant messages, also stores the
    // supporting precedent / law documents that accompanied the answer.
    private void saveChatWithDocuments(History history, MessageType type, String message, List<DocumentDto> similarCaseDocuments, List<DocumentDto> similarLawDocuments) {
        Chat chat = chatRepository.save(Chat.builder()
                .historyId(history)
                .type(type)
                .message(message)
                .build());

        if (type == MessageType.ASSISTANT) {
            if (similarCaseDocuments != null && !similarCaseDocuments.isEmpty()) {
                List<ChatPrecedent> chatPrecedents = similarCaseDocuments.stream()
                        .map(doc -> ChatPrecedent.builder()
                                .chatId(chat)
                                .precedentContent(doc.getText())
                                // FIX: getMetadata().get(key).toString() threw NPE when a
                                // key was absent, aborting the whole post-processing run.
                                .caseNumber(metadataValue(doc, "caseNumber"))
                                .caseName(metadataValue(doc, "caseName"))
                                .build())
                        .collect(Collectors.toList());
                chatPrecedentRepository.saveAll(chatPrecedents);
            }

            if (similarLawDocuments != null && !similarLawDocuments.isEmpty()) {
                List<ChatLaw> chatLaws = similarLawDocuments.stream()
                        .map(doc -> ChatLaw.builder()
                                .chatId(chat)
                                .content(doc.getText())
                                .lawName(metadataValue(doc, "lawName"))
                                .build())
                        .collect(Collectors.toList());
                chatLawRepository.saveAll(chatLaws);
            }
        }
    }

    // Null-safe metadata lookup; returns "" when the map or the key is missing.
    private static String metadataValue(DocumentDto doc, String key) {
        Object value = doc.getMetadata() == null ? null : doc.getMetadata().get(key);
        return value == null ? "" : value.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ai.lawyer.domain.kafka.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * Kafka event payload published to the "chat-post-processing" topic once a chat
 * response has finished streaming; consumed by ChatPostProcessingConsumer.
 * Mutable with a no-args constructor so it can be (de)serialized by the
 * configured Kafka message converter.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ChatPostProcessEvent {
// PK of the chat room (History) this exchange belongs to.
private Long historyId;
// The question the user sent.
private String userMessage;
// The fully assembled assistant response (joined from the stream chunks).
private String chatResponse;
// Precedent documents retrieved for the answer, as serializable DTOs.
private List<DocumentDto> similarCaseDocuments;
// Law documents retrieved for the answer, as serializable DTOs.
private List<DocumentDto> similarLawDocuments;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.ai.lawyer.domain.kafka.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.ai.document.Document;

import java.util.Map;

/**
 * Serializable snapshot of a Spring AI {@link Document} (text + metadata) so it
 * can travel over Kafka without depending on the Document type's wire format.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DocumentDto {
// Document body text.
private String text;
// Document metadata (e.g. caseNumber/caseName/lawName keys read by the consumer).
private Map<String, Object> metadata;

// Static factory copying the fields out of a Document.
// NOTE(review): the metadata map is shared by reference, not defensively
// copied — fine as long as callers don't mutate it after publishing.
public static DocumentDto from(Document document) {
return new DocumentDto(document.getText(), document.getMetadata());
}
}
Loading