diff --git a/backend/build.gradle b/backend/build.gradle index 008907e..615add1 100644 --- a/backend/build.gradle +++ b/backend/build.gradle @@ -41,7 +41,6 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-oauth2-client' implementation group: 'org.springframework.boot', name: 'spring-boot-starter-mail', version: '3.0.5' implementation 'org.springframework.boot:spring-boot-starter-batch' - implementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' // API Documentation (문서화) diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml index 3dc0af5..d0204b3 100644 --- a/backend/docker-compose.yml +++ b/backend/docker-compose.yml @@ -82,40 +82,8 @@ services: timeout: 5s retries: 10 - zookeeper: - image: confluentinc/cp-zookeeper:7.8.0 - container_name: zookeeper - restart: unless-stopped - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - - kafka: - image: confluentinc/cp-kafka:7.8.0 - container_name: kafka - restart: unless-stopped - depends_on: - - zookeeper - ports: - - "9092:9092" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092 - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092 - KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - volumes: - - kafka-data:/var/lib/kafka/data - volumes: mysql-data: redis-data: qdrant-data: - ollama-data: - kafka-data: \ No newline at end of file + ollama-data: \ No newline at end of file diff --git a/backend/src/main/java/com/ai/lawyer/domain/chatbot/controller/ChatBotController.java b/backend/src/main/java/com/ai/lawyer/domain/chatbot/controller/ChatBotController.java index d456726..554d36b 100644 --- a/backend/src/main/java/com/ai/lawyer/domain/chatbot/controller/ChatBotController.java +++ b/backend/src/main/java/com/ai/lawyer/domain/chatbot/controller/ChatBotController.java @@ -23,14 +23,11 @@ public class ChatBotController { @Operation(summary = "01. 새로운 채팅", description = "첫 메시지 전송으로 새로운 채팅방을 생성하고 챗봇과 대화를 시작") @PostMapping(value = "/message") public Flux postNewMessage(@RequestBody ChatRequest chatRequest) { - // SecurityContext에서 memberId를 미리 추출 (컨트롤러 진입 시점) + Long memberId = AuthUtil.getAuthenticatedMemberId(); - if (memberId == null) { - throw new IllegalStateException("인증된 사용자가 아닙니다."); - } + log.info("새로운 채팅 요청: memberId={}", memberId); - // memberId를 Flux에 전달 (SecurityContext 전파 문제 방지) return chatBotService.sendMessage(memberId, chatRequest, null); } @@ -39,11 +36,11 @@ public Flux postNewMessage(@RequestBody ChatRequest chatRequest) { public Flux postMessage( @RequestBody ChatRequest chatRequest, @PathVariable(value = "roomId", required = false) Long roomId) { - // SecurityContext에서 memberId를 미리 추출 (컨트롤러 진입 시점) + Long memberId = AuthUtil.getAuthenticatedMemberId(); + log.info("기존 채팅 요청: memberId={}, roomId={}", memberId, roomId); - // memberId를 Flux에 전달 (SecurityContext 전파 문제 방지) return chatBotService.sendMessage(memberId, chatRequest, roomId); } diff --git a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java index df6efef..4cf9f06 100644 --- a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java +++ b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java @@ -1,9 +1,13 @@ package com.ai.lawyer.domain.chatbot.service; +import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatHistoryDto; +import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatLawDto; +import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatPrecedentDto; import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.KeywordExtractionDto; import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.TitleExtractionDto; import com.ai.lawyer.domain.chatbot.entity.*; import com.ai.lawyer.domain.chatbot.repository.*; +import com.ai.lawyer.infrastructure.redis.service.ChatCacheService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.chat.memory.ChatMemory; @@ -16,6 +20,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -27,6 +32,8 @@ public class AsyncPostChatProcessingService { private final KeywordService keywordService; + private final ChatCacheService chatCacheService; + private final HistoryRepository historyRepository; private final ChatRepository chatRepository; private final KeywordRankRepository keywordRankRepository; @@ -98,6 +105,9 @@ private void extractAndUpdateKeywordRanks(String message) { } private void saveChatWithDocuments(History history, MessageType type, String message, List similarCaseDocuments, List similarLawDocuments) { + List chatPrecedents = new ArrayList<>(); + List chatLaws = new ArrayList<>(); + Chat chat = chatRepository.save(Chat.builder() .historyId(history) .type(type) @@ -107,7 +117,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes // Ai 메시지가 저장될 때 관련 문서 저장 if (type == MessageType.ASSISTANT) { if (similarCaseDocuments != null && !similarCaseDocuments.isEmpty()) { - List chatPrecedents = similarCaseDocuments.stream() + chatPrecedents = similarCaseDocuments.stream() .map(doc -> ChatPrecedent.builder() .chatId(chat) .precedentContent(doc.getText()) @@ -119,7 +129,7 @@ private void saveChatWithDocuments(History history, MessageType type, String mes } if (similarLawDocuments != null && !similarLawDocuments.isEmpty()) { - List chatLaws = similarLawDocuments.stream() + chatLaws = similarLawDocuments.stream() .map(doc -> ChatLaw.builder() .chatId(chat) .content(doc.getText()) @@ -129,5 +139,16 @@ private void saveChatWithDocuments(History history, MessageType type, String mes chatLawRepository.saveAll(chatLaws); } } + + // Redis 캐시에 DTO 저장 + ChatHistoryDto dto = ChatHistoryDto.builder() + .type(type.toString()) + .message(message) + .createdAt(chat.getCreatedAt()) + .precedent(chatPrecedents.isEmpty() ? null : ChatPrecedentDto.from(chatPrecedents.get(0))) + .law(chatLaws.isEmpty() ? null : ChatLawDto.from(chatLaws.get(0))) + .build(); + + chatCacheService.cacheChatMessage(history.getHistoryId(), dto); } } \ No newline at end of file diff --git a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java index d319345..97481bd 100644 --- a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java +++ b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java @@ -7,8 +7,6 @@ import com.ai.lawyer.domain.chatbot.entity.History; import com.ai.lawyer.domain.chatbot.repository.HistoryRepository; import com.ai.lawyer.global.qdrant.service.QdrantService; -import com.ai.lawyer.infrastructure.kafka.dto.ChatPostProcessEvent; -import com.ai.lawyer.infrastructure.kafka.dto.DocumentDto; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.ai.chat.client.ChatClient; @@ -22,7 +20,6 @@ import org.springframework.ai.chat.prompt.PromptTemplate; import org.springframework.ai.document.Document; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import reactor.core.publisher.Flux; @@ -40,66 +37,41 @@ public class ChatBotService { private final ChatClient chatClient; private final QdrantService qdrantService; private final HistoryService historyService; + private final AsyncPostChatProcessingService asyncPostChatProcessingService; + private final HistoryRepository historyRepository; private final ChatMemoryRepository chatMemoryRepository; - // KafkaTemplate 주입 - private final KafkaTemplate kafkaTemplate; - @Value("${custom.ai.system-message}") private String systemMessageTemplate; - // Kafka 토픽 이름 -> 추후 application.yml로 이동 고려 - private static final String POST_PROCESSING_TOPIC = "chat-post-processing"; - // 핵심 로직 // 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환 @Transactional public Flux sendMessage(Long memberId, ChatRequest chatRequestDto, Long roomId) { - // 벡터 검색 (판례, 법령) (블로킹) + // 벡터 검색 (판례, 법령) List similarCaseDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "판례"); List similarLawDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "법령"); String caseContext = formatting(similarCaseDocuments); String lawContext = formatting(similarLawDocuments); - // 채팅방 조회 또는 생성 (블로킹) + // 채팅방 조회 또는 생성 History history = getOrCreateRoom(memberId, roomId); - // 메시지 기억 관리 (User 메시지 추가) + // 메시지 기억 관리 ChatMemory chatMemory = saveChatMemory(chatRequestDto, history); // 프롬프트 생성 Prompt prompt = getPrompt(caseContext, lawContext, chatMemory, history); - // 준비된 데이터를 담은 컨텍스트 객체 반환 - //return new PreparedChatContext(prompt, history, similarCaseDocuments, similarLawDocuments); - return chatClient.prompt(prompt) .stream() .content() .collectList() .map(fullResponseList -> String.join("", fullResponseList)) - .doOnNext(fullResponse -> { - - // Document를 DTO로 변환 - List caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList()); - List lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList()); - - // Kafka로 보낼 이벤트 객체 - ChatPostProcessEvent event = new ChatPostProcessEvent( - history.getHistoryId(), - chatRequestDto.getMessage(), - fullResponse, - caseDtos, - lawDtos - ); - - // Kafka 이벤트 발행 - kafkaTemplate.send(POST_PROCESSING_TOPIC, event); - - }) + .doOnNext(fullResponse -> asyncPostChatProcessingService.processHandlerTasks(history.getHistoryId(), chatRequestDto.getMessage(), fullResponse, similarCaseDocuments, similarLawDocuments)) .map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments)) .flux() .onErrorResume(throwable -> { @@ -176,23 +148,4 @@ private ChatResponse handleError(History history) { .build(); } - /** - * 블로킹 작업에서 준비된 데이터를 담는 컨텍스트 클래스 - * 리액티브 체인에서 데이터를 전달하기 위한 내부 클래스 - */ - private static class PreparedChatContext { - final Prompt prompt; - final History history; - final List similarCaseDocuments; - final List similarLawDocuments; - - PreparedChatContext(Prompt prompt, History history, - List similarCaseDocuments, - List similarLawDocuments) { - this.prompt = prompt; - this.history = history; - this.similarCaseDocuments = similarCaseDocuments; - this.similarLawDocuments = similarLawDocuments; - } - } } \ No newline at end of file diff --git a/backend/src/main/java/com/ai/lawyer/infrastructure/kafka/consumer/ChatPostProcessingConsumer.java b/backend/src/main/java/com/ai/lawyer/infrastructure/kafka/consumer/ChatPostProcessingConsumer.java index e5b53c4..ca54806 100644 --- a/backend/src/main/java/com/ai/lawyer/infrastructure/kafka/consumer/ChatPostProcessingConsumer.java +++ b/backend/src/main/java/com/ai/lawyer/infrastructure/kafka/consumer/ChatPostProcessingConsumer.java @@ -19,7 +19,6 @@ import org.springframework.ai.chat.messages.AssistantMessage; import org.springframework.ai.chat.messages.MessageType; import org.springframework.beans.factory.annotation.Value; -import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -47,7 +46,7 @@ public class ChatPostProcessingConsumer { @Value("${custom.ai.keyword-extraction}") private String keywordExtraction; - @KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group") + //@KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group") @Transactional public void consume(ChatPostProcessEvent event) { try { diff --git a/backend/src/main/resources/application-dev.yml b/backend/src/main/resources/application-dev.yml index 7a69790..fb41d61 100644 --- a/backend/src/main/resources/application-dev.yml +++ b/backend/src/main/resources/application-dev.yml @@ -17,18 +17,6 @@ spring: password: ${DEV_REDIS_PASSWORD} embedded: false - kafka: - bootstrap-servers: localhost:9092 - producer: - key-serializer: org.apache.kafka.common.serialization.StringSerializer - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer - consumer: - group-id: chat-processing-group # 컨슈머 그룹 ID - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer - properties: - spring.json.trusted.packages: "*" - batch: job: enabled: false # 최소 한번 시작