diff --git a/backend/build.gradle b/backend/build.gradle index ee05421..ece81c8 100644 --- a/backend/build.gradle +++ b/backend/build.gradle @@ -41,6 +41,8 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-oauth2-client' implementation group: 'org.springframework.boot', name: 'spring-boot-starter-mail', version: '3.0.5' implementation 'org.springframework.boot:spring-boot-starter-batch' + implementation 'org.springframework.kafka:spring-kafka' + testImplementation 'org.springframework.kafka:spring-kafka-test' // API Documentation (문서화) implementation 'org.apache.commons:commons-lang3:3.18.0' diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml index 6726a09..60c3af7 100644 --- a/backend/docker-compose.yml +++ b/backend/docker-compose.yml @@ -17,14 +17,14 @@ services: volumes: - mysql-data:/var/lib/mysql - ./sql:/sql # 호스트 ./sql 폴더를 컨테이너 /sql에 마운트 - command: > + command: > --character-set-server=utf8mb4 --collation-server=utf8mb4_0900_ai_ci --default-time-zone=Asia/Seoul --skip-log-bin --lower-case-table-names=1 healthcheck: - test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-p${DEV_DATASOURCE_PASSWORD}"] + test: [ "CMD", "mysqladmin", "ping", "-h", "127.0.0.1", "-p${DEV_DATASOURCE_PASSWORD}" ] interval: 10s timeout: 5s retries: 10 @@ -40,7 +40,7 @@ services: command: > redis-server --appendonly yes healthcheck: - test: ["CMD", "redis-cli", "ping"] + test: [ "CMD", "redis-cli", "ping" ] interval: 10s timeout: 5s retries: 10 @@ -78,8 +78,40 @@ services: timeout: 5s retries: 10 + zookeeper: + image: confluentinc/cp-zookeeper:7.4.4 + container_name: zookeeper + restart: unless-stopped + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:7.4.4 + container_name: kafka + restart: unless-stopped + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092 + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + volumes: + - kafka-data:/var/lib/kafka/data + volumes: mysql-data: redis-data: qdrant-data: - ollama-data: \ No newline at end of file + ollama-data: + kafka-data: \ No newline at end of file diff --git a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java index 419371a..df6efef 100644 --- a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java +++ b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/AsyncPostChatProcessingService.java @@ -13,13 +13,14 @@ import org.springframework.ai.chat.messages.MessageType; import org.springframework.ai.document.Document; import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.List; import java.util.stream.Collectors; +// 더이상 사용 안함 +// 테스트 용도로 남겨둠 @Slf4j @Service @RequiredArgsConstructor @@ -38,7 +39,7 @@ public class AsyncPostChatProcessingService { @Value("{$custom.ai.keyword-extraction}") private String keywordExtraction; - @Async + //@Async @Transactional public void processHandlerTasks(Long historyId, String userMessage, String fullResponse, List similarCaseDocuments, List similarLawDocuments) { try { @@ -74,7 +75,7 @@ private void setHistoryTitle(String userMessage, History history, String fullRes String targetText = fullResponse.contains("해당 질문은 법률") ? userMessage : fullResponse; TitleExtractionDto titleDto = keywordService.keywordExtract(targetText, titleExtraction, TitleExtractionDto.class); history.setTitle(titleDto.getTitle()); - historyRepository.save(history); // @Transactional 어노테이션으로 인해 메소드 종료 시 자동 저장되지만, 명시적으로 호출할 수도 있습니다. + historyRepository.save(history); } private void extractAndUpdateKeywordRanks(String message) { diff --git a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java index 928e8d6..7231c9d 100644 --- a/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java +++ b/backend/src/main/java/com/ai/lawyer/domain/chatbot/service/ChatBotService.java @@ -6,6 +6,8 @@ import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatResponse; import com.ai.lawyer.domain.chatbot.entity.History; import com.ai.lawyer.domain.chatbot.repository.HistoryRepository; +import com.ai.lawyer.domain.kafka.dto.ChatPostProcessEvent; +import com.ai.lawyer.domain.kafka.dto.DocumentDto; import com.ai.lawyer.domain.member.entity.Member; import com.ai.lawyer.domain.member.repositories.MemberRepository; import com.ai.lawyer.global.qdrant.service.QdrantService; @@ -22,6 +24,7 @@ import org.springframework.ai.chat.prompt.PromptTemplate; import org.springframework.ai.document.Document; import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import reactor.core.publisher.Flux; @@ -39,18 +42,21 @@ public class ChatBotService { private final ChatClient chatClient; private final QdrantService qdrantService; private final HistoryService historyService; - - private final AsyncPostChatProcessingService asyncPostChatProcessingService; - private final MemberRepository memberRepository; private final HistoryRepository historyRepository; private final ChatMemoryRepository chatMemoryRepository; + // KafkaTemplate 주입 + private final KafkaTemplate kafkaTemplate; + @Value("${custom.ai.system-message}") private String systemMessageTemplate; + // Kafka 토픽 이름 -> 추후 application.yml로 이동 고려 + private static final String POST_PROCESSING_TOPIC = "chat-post-processing"; + // 핵심 로직 - // 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> (비동기 후처리) -> 응답 반환 + // 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환 @Transactional public Flux sendMessage(Long memberId, ChatRequest chatRequestDto, Long roomId) { @@ -79,10 +85,31 @@ public Flux sendMessage(Long memberId, ChatRequest chatRequestDto, .content() .collectList() .map(fullResponseList -> String.join("", fullResponseList)) - .doOnNext(fullResponse -> asyncPostChatProcessingService.processHandlerTasks(history.getHistoryId(), chatRequestDto.getMessage(), fullResponse, similarCaseDocuments, similarLawDocuments)) // 비동기 후처리 + .doOnNext(fullResponse -> { + + // Document를 DTO로 변환 + List caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList()); + List lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList()); + + // Kafka로 보낼 이벤트 객체 + ChatPostProcessEvent event = new ChatPostProcessEvent( + history.getHistoryId(), + chatRequestDto.getMessage(), + fullResponse, + caseDtos, + lawDtos + ); + + // Kafka 이벤트 발행 + kafkaTemplate.send(POST_PROCESSING_TOPIC, event); + + }) .map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments)) .flux() - .onErrorResume(throwable -> Flux.just(handleError(history))); + .onErrorResume(throwable -> { + log.error("스트리밍 처리 중 에러 발생 (historyId: {})", history.getHistoryId(), throwable); + return Flux.just(handleError(history)); + }); } private ChatResponse createChatResponse(History history, String fullResponse, List cases, List laws) { @@ -111,7 +138,7 @@ private ChatMemory saveChatMemory(ChatRequest chatRequestDto, History history) { .maxMessages(10) .chatMemoryRepository(chatMemoryRepository) .build(); - // 사용자 메시지를 메모리에 추가 -> ai 답변은 비동기 후처리에서 추가 + // 사용자 메시지를 메모리에 추가 -> ai 답변은 Consumer에서 추가 chatMemory.add(String.valueOf(history.getHistoryId()), new UserMessage(chatRequestDto.getMessage())); return chatMemory; } diff --git a/backend/src/main/java/com/ai/lawyer/domain/kafka/consumer/ChatPostProcessingConsumer.java b/backend/src/main/java/com/ai/lawyer/domain/kafka/consumer/ChatPostProcessingConsumer.java new file mode 100644 index 0000000..cc9f271 --- /dev/null +++ b/backend/src/main/java/com/ai/lawyer/domain/kafka/consumer/ChatPostProcessingConsumer.java @@ -0,0 +1,134 @@ +package com.ai.lawyer.domain.kafka.consumer; + +import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.KeywordExtractionDto; +import com.ai.lawyer.domain.chatbot.dto.ExtractionDto.TitleExtractionDto; +import com.ai.lawyer.domain.chatbot.entity.*; +import com.ai.lawyer.domain.chatbot.repository.*; +import com.ai.lawyer.domain.chatbot.service.KeywordService; +import com.ai.lawyer.domain.kafka.dto.ChatPostProcessEvent; +import com.ai.lawyer.domain.kafka.dto.DocumentDto; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.ai.chat.memory.ChatMemory; +import org.springframework.ai.chat.memory.ChatMemoryRepository; +import org.springframework.ai.chat.memory.MessageWindowChatMemory; +import org.springframework.ai.chat.messages.AssistantMessage; +import org.springframework.ai.chat.messages.MessageType; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +import java.util.List; +import java.util.stream.Collectors; + +@Slf4j +@Service +@RequiredArgsConstructor +public class ChatPostProcessingConsumer { + + private final KeywordService keywordService; + private final HistoryRepository historyRepository; + private final ChatRepository chatRepository; + private final KeywordRankRepository keywordRankRepository; + private final ChatMemoryRepository chatMemoryRepository; + private final ChatPrecedentRepository chatPrecedentRepository; + private final ChatLawRepository chatLawRepository; + + @Value("${custom.ai.title-extraction}") + private String titleExtraction; + @Value("${custom.ai.keyword-extraction}") + private String keywordExtraction; + + @KafkaListener(topics = "chat-post-processing", groupId = "chat-processing-group") + @Transactional + public void consume(ChatPostProcessEvent event) { + try { + History history = historyRepository.findById(event.getHistoryId()) + .orElseThrow(() -> new IllegalArgumentException("존재하지 않는 채팅방입니다. historyId: " + event.getHistoryId())); + + // 1. 메시지 기억 저장 (Assistant 응답) + ChatMemory chatMemory = MessageWindowChatMemory.builder() + .maxMessages(10) + .chatMemoryRepository(chatMemoryRepository) + .build(); + + chatMemory.add(String.valueOf(history.getHistoryId()), new AssistantMessage(event.getChatResponse())); + chatMemoryRepository.saveAll(String.valueOf(history.getHistoryId()), chatMemory.get(String.valueOf(history.getHistoryId()))); + + // 2. 채팅방 제목 설정 / 및 필터 + setHistoryTitle(event.getUserMessage(), history, event.getChatResponse()); + + // 3. 채팅 기록 저장 + saveChatWithDocuments(history, MessageType.USER, event.getUserMessage(), event.getSimilarCaseDocuments(), event.getSimilarLawDocuments()); + saveChatWithDocuments(history, MessageType.ASSISTANT, event.getChatResponse(), event.getSimilarCaseDocuments(), event.getSimilarLawDocuments()); + + // 4. 키워드 추출 및 랭킹 업데이트 + if (!event.getChatResponse().contains("해당 질문은 법률")) { + extractAndUpdateKeywordRanks(event.getUserMessage()); + } + } catch (Exception e) { + log.error("Kafka 이벤트 처리 중 에러 발생 (historyId: {}): ", event.getHistoryId(), e); + } + } + + private void setHistoryTitle(String userMessage, History history, String fullResponse) { + String targetText = fullResponse.contains("해당 질문은 법률") ? userMessage : fullResponse; + TitleExtractionDto titleDto = keywordService.keywordExtract(targetText, titleExtraction, TitleExtractionDto.class); + history.setTitle(titleDto.getTitle()); + historyRepository.save(history); + } + + private void extractAndUpdateKeywordRanks(String message) { + KeywordExtractionDto keywordResponse = keywordService.keywordExtract(message, keywordExtraction, KeywordExtractionDto.class); + if (keywordResponse == null || keywordResponse.getKeyword() == null) { + return; + } + + KeywordRank keywordRank = keywordRankRepository.findByKeyword(keywordResponse.getKeyword()); + + if (keywordRank == null) { + keywordRank = KeywordRank.builder() + .keyword(keywordResponse.getKeyword()) + .score(1L) + .build(); + } else { + keywordRank.setScore(keywordRank.getScore() + 1); + } + keywordRankRepository.save(keywordRank); + } + + private void saveChatWithDocuments(History history, MessageType type, String message, List similarCaseDocuments, List similarLawDocuments) { + Chat chat = chatRepository.save(Chat.builder() + .historyId(history) + .type(type) + .message(message) + .build()); + + // Ai 메시지가 저장될 때 관련 문서 저장 + if (type == MessageType.ASSISTANT) { + if (similarCaseDocuments != null && !similarCaseDocuments.isEmpty()) { + List chatPrecedents = similarCaseDocuments.stream() + .map(doc -> ChatPrecedent.builder() + .chatId(chat) + .precedentContent(doc.getText()) + .caseNumber(doc.getMetadata().get("caseNumber").toString()) + .caseName(doc.getMetadata().get("caseName").toString()) + .build()) + .collect(Collectors.toList()); + chatPrecedentRepository.saveAll(chatPrecedents); + } + + if (similarLawDocuments != null && !similarLawDocuments.isEmpty()) { + List chatLaws = similarLawDocuments.stream() + .map(doc -> ChatLaw.builder() + .chatId(chat) + .content(doc.getText()) + .lawName(doc.getMetadata().get("lawName").toString()) + .build()) + .collect(Collectors.toList()); + chatLawRepository.saveAll(chatLaws); + } + } + } +} \ No newline at end of file diff --git a/backend/src/main/java/com/ai/lawyer/domain/kafka/dto/ChatPostProcessEvent.java b/backend/src/main/java/com/ai/lawyer/domain/kafka/dto/ChatPostProcessEvent.java new file mode 100644 index 0000000..812a0cb --- /dev/null +++ b/backend/src/main/java/com/ai/lawyer/domain/kafka/dto/ChatPostProcessEvent.java @@ -0,0 +1,18 @@ +package com.ai.lawyer.domain.kafka.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ChatPostProcessEvent { + private Long historyId; + private String userMessage; + private String chatResponse; + private List similarCaseDocuments; + private List similarLawDocuments; +} diff --git a/backend/src/main/java/com/ai/lawyer/domain/kafka/dto/DocumentDto.java b/backend/src/main/java/com/ai/lawyer/domain/kafka/dto/DocumentDto.java new file mode 100644 index 0000000..88d984e --- /dev/null +++ b/backend/src/main/java/com/ai/lawyer/domain/kafka/dto/DocumentDto.java @@ -0,0 +1,20 @@ +package com.ai.lawyer.domain.kafka.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.springframework.ai.document.Document; + +import java.util.Map; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DocumentDto { + private String text; + private Map metadata; + + public static DocumentDto from(Document document) { + return new DocumentDto(document.getText(), document.getMetadata()); + } +} \ No newline at end of file diff --git a/backend/src/main/resources/application-dev.yml b/backend/src/main/resources/application-dev.yml index 9320def..ff9d6f6 100644 --- a/backend/src/main/resources/application-dev.yml +++ b/backend/src/main/resources/application-dev.yml @@ -17,12 +17,32 @@ spring: password: ${DEV_REDIS_PASSWORD} embedded: false + kafka: + bootstrap-servers: localhost:9092 + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + consumer: + group-id: chat-processing-group # 컨슈머 그룹 ID + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: "*" + batch: job: enabled: false # 최소 한번 시작 jdbc: initialize-schema: always + task: + scheduling: + shutdown: + await-termination: true + await-termination-period: 60s + lifecycle: + timeout-per-shutdown-phase: 60s + datasource-meta: driver-class-name: com.mysql.cj.jdbc.Driver jdbc-url: jdbc:mysql://localhost:3306/meta_db?useSSL=false&useUnicode=true&serverTimezone=Asia/Seoul&allowPublicKeyRetrieval=true&createDatabaseIfNotExist=true diff --git a/backend/src/main/resources/system-prompt.yml b/backend/src/main/resources/system-prompt.yml index 91139cf..b48819a 100644 --- a/backend/src/main/resources/system-prompt.yml +++ b/backend/src/main/resources/system-prompt.yml @@ -40,7 +40,7 @@ custom: [ { "id": 1, - "keyword": "폭행" + "keyword": "층간소음 폭행" "socre": 1 } ]