Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ public class ChatBotController {
private final ChatBotService chatBotService;

@Operation(summary = "01. 새로운 채팅", description = "첫 메시지 전송으로 새로운 채팅방을 생성하고 챗봇과 대화를 시작")
@PostMapping(value = "/message", produces = "application/stream+json")
@PostMapping(value = "/message")
public Flux<ChatResponse> postNewMessage(@RequestBody ChatRequest chatRequest) {
// SecurityContext에서 memberId를 미리 추출 (컨트롤러 진입 시점)
Long memberId = AuthUtil.getAuthenticatedMemberId();
if (memberId == null) {
throw new IllegalStateException("인증된 사용자가 아닙니다.");
}
log.info("새로운 채팅 요청: memberId={}", memberId);

// memberId를 Flux에 전달 (SecurityContext 전파 문제 방지)
return chatBotService.sendMessage(memberId, chatRequest, null);
}

@Operation(summary = "02. 기존 채팅", description = "기존 채팅방에 메시지를 보내고 챗봇과 대화를 이어감")
@PostMapping(value = "{roomId}/message", produces = "application/stream+json")
@PostMapping(value = "{roomId}/message")
public Flux<ChatResponse> postMessage(
@RequestBody ChatRequest chatRequest,
@PathVariable(value = "roomId", required = false) Long roomId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@
import com.ai.lawyer.domain.chatbot.dto.ChatDto.ChatResponse;
import com.ai.lawyer.domain.chatbot.entity.History;
import com.ai.lawyer.domain.chatbot.repository.HistoryRepository;
import com.ai.lawyer.global.qdrant.service.QdrantService;
import com.ai.lawyer.infrastructure.kafka.dto.ChatPostProcessEvent;
import com.ai.lawyer.infrastructure.kafka.dto.DocumentDto;
import com.ai.lawyer.domain.member.entity.Member;
import com.ai.lawyer.domain.member.repositories.MemberRepository;
import com.ai.lawyer.global.qdrant.service.QdrantService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
Expand All @@ -28,8 +26,6 @@
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.HashMap;
import java.util.List;
Expand All @@ -44,7 +40,6 @@ public class ChatBotService {
private final ChatClient chatClient;
private final QdrantService qdrantService;
private final HistoryService historyService;
private final MemberRepository memberRepository;
private final HistoryRepository historyRepository;
private final ChatMemoryRepository chatMemoryRepository;

Expand All @@ -61,44 +56,40 @@ public class ChatBotService {
// 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
@Transactional
public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto, Long roomId) {
return Mono.fromCallable(() -> {

// 벡터 검색 (판례, 법령) (블로킹)
List<Document> similarCaseDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "판례");
List<Document> similarLawDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "법령");
// 벡터 검색 (판례, 법령) (블로킹)
List<Document> similarCaseDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "판례");
List<Document> similarLawDocuments = qdrantService.searchDocument(chatRequestDto.getMessage(), "type", "법령");

String caseContext = formatting(similarCaseDocuments);
String lawContext = formatting(similarLawDocuments);
String caseContext = formatting(similarCaseDocuments);
String lawContext = formatting(similarLawDocuments);

// 채팅방 조회 또는 생성 (블로킹)
History history = getOrCreateRoom(memberId, roomId);
// 채팅방 조회 또는 생성 (블로킹)
History history = getOrCreateRoom(memberId, roomId);

// 메시지 기억 관리 (User 메시지 추가)
ChatMemory chatMemory = saveChatMemory(chatRequestDto, history);
// 메시지 기억 관리 (User 메시지 추가)
ChatMemory chatMemory = saveChatMemory(chatRequestDto, history);

// 프롬프트 생성
Prompt prompt = getPrompt(caseContext, lawContext, chatMemory, history);
// 프롬프트 생성
Prompt prompt = getPrompt(caseContext, lawContext, chatMemory, history);

// 준비된 데이터를 담은 컨텍스트 객체 반환
return new PreparedChatContext(prompt, history, similarCaseDocuments, similarLawDocuments);
})
.subscribeOn(Schedulers.boundedElastic()) // 블로킹 작업을 별도 스레드에서 실행
.flatMapMany(context -> {
// LLM 스트리밍 호출 및 클라이언트에게 즉시 응답
return chatClient.prompt(context.prompt)
// 준비된 데이터를 담은 컨텍스트 객체 반환
//return new PreparedChatContext(prompt, history, similarCaseDocuments, similarLawDocuments);

return chatClient.prompt(prompt)
.stream()
.content()
.collectList()
.map(fullResponseList -> String.join("", fullResponseList))
.doOnNext(fullResponse -> {

// Document를 DTO로 변환
List<DocumentDto> caseDtos = context.similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
List<DocumentDto> lawDtos = context.similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
List<DocumentDto> caseDtos = similarCaseDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());
List<DocumentDto> lawDtos = similarLawDocuments.stream().map(DocumentDto::from).collect(Collectors.toList());

// Kafka로 보낼 이벤트 객체
ChatPostProcessEvent event = new ChatPostProcessEvent(
context.history.getHistoryId(),
history.getHistoryId(),
chatRequestDto.getMessage(),
fullResponse,
caseDtos,
Expand All @@ -109,13 +100,13 @@ public Flux<ChatResponse> sendMessage(Long memberId, ChatRequest chatRequestDto,
kafkaTemplate.send(POST_PROCESSING_TOPIC, event);

})
.map(fullResponse -> createChatResponse(context.history, fullResponse, context.similarCaseDocuments, context.similarLawDocuments))
.map(fullResponse -> createChatResponse(history, fullResponse, similarCaseDocuments, similarLawDocuments))
.flux()
.onErrorResume(throwable -> {
log.error("스트리밍 처리 중 에러 발생 (historyId: {})", context.history.getHistoryId(), throwable);
return Flux.just(handleError(context.history));
log.error("스트리밍 처리 중 에러 발생 (historyId: {})", history.getHistoryId(), throwable);
return Flux.just(handleError(history));
});
});

}

private ChatResponse createChatResponse(History history, String fullResponse, List<Document> cases, List<Document> laws) {
Expand Down Expand Up @@ -196,8 +187,8 @@ private static class PreparedChatContext {
final List<Document> similarLawDocuments;

PreparedChatContext(Prompt prompt, History history,
List<Document> similarCaseDocuments,
List<Document> similarLawDocuments) {
List<Document> similarCaseDocuments,
List<Document> similarLawDocuments) {
this.prompt = prompt;
this.history = history;
this.similarCaseDocuments = similarCaseDocuments;
Expand Down