66import com .ai .lawyer .domain .chatbot .dto .ChatDto .ChatResponse ;
77import com .ai .lawyer .domain .chatbot .entity .History ;
88import com .ai .lawyer .domain .chatbot .repository .HistoryRepository ;
9+ import com .ai .lawyer .infrastructure .kafka .dto .ChatPostProcessEvent ;
10+ import com .ai .lawyer .infrastructure .kafka .dto .DocumentDto ;
11+ import com .ai .lawyer .domain .member .entity .Member ;
12+ import com .ai .lawyer .domain .member .repositories .MemberRepository ;
913import com .ai .lawyer .global .qdrant .service .QdrantService ;
1014import lombok .RequiredArgsConstructor ;
1115import lombok .extern .slf4j .Slf4j ;
2024import org .springframework .ai .chat .prompt .PromptTemplate ;
2125import org .springframework .ai .document .Document ;
2226import org .springframework .beans .factory .annotation .Value ;
27+ import org .springframework .kafka .core .KafkaTemplate ;
2328import org .springframework .stereotype .Service ;
2429import org .springframework .transaction .annotation .Transactional ;
2530import reactor .core .publisher .Flux ;
31+ import reactor .core .publisher .Mono ;
32+ import reactor .core .scheduler .Schedulers ;
2633
2734import java .util .HashMap ;
2835import java .util .List ;
@@ -37,48 +44,78 @@ public class ChatBotService {
3744 private final ChatClient chatClient ;
3845 private final QdrantService qdrantService ;
3946 private final HistoryService historyService ;
40- private final AsyncPostChatProcessingService asyncPostChatProcessingService ;
41-
47+ private final MemberRepository memberRepository ;
4248 private final HistoryRepository historyRepository ;
4349 private final ChatMemoryRepository chatMemoryRepository ;
4450
51+ // KafkaTemplate 주입
52+ private final KafkaTemplate <String , ChatPostProcessEvent > kafkaTemplate ;
53+
4554 @ Value ("${custom.ai.system-message}" )
4655 private String systemMessageTemplate ;
4756
57+ // Kafka 토픽 이름 -> 추후 application.yml로 이동 고려
58+ private static final String POST_PROCESSING_TOPIC = "chat-post-processing" ;
59+
4860 // 핵심 로직
4961 // 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
5062 @ Transactional
5163 public Flux <ChatResponse > sendMessage (Long memberId , ChatRequest chatRequestDto , Long roomId ) {
64+ return Mono .fromCallable (() -> {
5265
53- // 벡터 검색 (판례, 법령)
54- List <Document > similarCaseDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "판례" );
55- List <Document > similarLawDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "법령" );
66+ // 벡터 검색 (판례, 법령) (블로킹 )
67+ List <Document > similarCaseDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "판례" );
68+ List <Document > similarLawDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "법령" );
5669
57- String caseContext = formatting (similarCaseDocuments );
58- String lawContext = formatting (similarLawDocuments );
70+ String caseContext = formatting (similarCaseDocuments );
71+ String lawContext = formatting (similarLawDocuments );
5972
60- // 채팅방 조회 또는 생성
61- History history = getOrCreateRoom (memberId , roomId );
73+ // 채팅방 조회 또는 생성 (블로킹)
74+ History history = getOrCreateRoom (memberId , roomId );
6275
63- // 메시지 기억 관리
64- ChatMemory chatMemory = saveChatMemory (chatRequestDto , history );
76+ // 메시지 기억 관리 (User 메시지 추가)
77+ ChatMemory chatMemory = saveChatMemory (chatRequestDto , history );
6578
66- // 프롬프트 생성
67- Prompt prompt = getPrompt (caseContext , lawContext , chatMemory , history );
79+ // 프롬프트 생성
80+ Prompt prompt = getPrompt (caseContext , lawContext , chatMemory , history );
6881
69- return chatClient .prompt (prompt )
82+ // 준비된 데이터를 담은 컨텍스트 객체 반환
83+ return new PreparedChatContext (prompt , history , similarCaseDocuments , similarLawDocuments );
84+ })
85+ .subscribeOn (Schedulers .boundedElastic ()) // 블로킹 작업을 별도 스레드에서 실행
86+ .flatMapMany (context -> {
87+ // LLM 스트리밍 호출 및 클라이언트에게 즉시 응답
88+ return chatClient .prompt (context .prompt )
7089 .stream ()
7190 .content ()
7291 .collectList ()
7392 .map (fullResponseList -> String .join ("" , fullResponseList ))
74- .doOnNext (fullResponse -> asyncPostChatProcessingService .processHandlerTasks (history .getHistoryId (), chatRequestDto .getMessage (), fullResponse , similarCaseDocuments , similarLawDocuments ))
75- .map (fullResponse -> createChatResponse (history , fullResponse , similarCaseDocuments , similarLawDocuments ))
93+ .doOnNext (fullResponse -> {
94+
95+ // Document를 DTO로 변환
96+ List <DocumentDto > caseDtos = context .similarCaseDocuments .stream ().map (DocumentDto ::from ).collect (Collectors .toList ());
97+ List <DocumentDto > lawDtos = context .similarLawDocuments .stream ().map (DocumentDto ::from ).collect (Collectors .toList ());
98+
99+ // Kafka로 보낼 이벤트 객체
100+ ChatPostProcessEvent event = new ChatPostProcessEvent (
101+ context .history .getHistoryId (),
102+ chatRequestDto .getMessage (),
103+ fullResponse ,
104+ caseDtos ,
105+ lawDtos
106+ );
107+
108+ // Kafka 이벤트 발행
109+ kafkaTemplate .send (POST_PROCESSING_TOPIC , event );
110+
111+ })
112+ .map (fullResponse -> createChatResponse (context .history , fullResponse , context .similarCaseDocuments , context .similarLawDocuments ))
76113 .flux ()
77114 .onErrorResume (throwable -> {
78- log .error ("스트리밍 처리 중 에러 발생 (historyId: {})" , history .getHistoryId (), throwable );
79- return Flux .just (handleError (history ));
115+ log .error ("스트리밍 처리 중 에러 발생 (historyId: {})" , context . history .getHistoryId (), throwable );
116+ return Flux .just (handleError (context . history ));
80117 });
81-
118+ });
82119 }
83120
84121 private ChatResponse createChatResponse (History history , String fullResponse , List <Document > cases , List <Document > laws ) {
@@ -148,4 +185,23 @@ private ChatResponse handleError(History history) {
148185 .build ();
149186 }
150187
188+ /**
189+ * 블로킹 작업에서 준비된 데이터를 담는 컨텍스트 클래스
190+ * 리액티브 체인에서 데이터를 전달하기 위한 내부 클래스
191+ */
192+ private static class PreparedChatContext {
193+ final Prompt prompt ;
194+ final History history ;
195+ final List <Document > similarCaseDocuments ;
196+ final List <Document > similarLawDocuments ;
197+
198+ PreparedChatContext (Prompt prompt , History history ,
199+ List <Document > similarCaseDocuments ,
200+ List <Document > similarLawDocuments ) {
201+ this .prompt = prompt ;
202+ this .history = history ;
203+ this .similarCaseDocuments = similarCaseDocuments ;
204+ this .similarLawDocuments = similarLawDocuments ;
205+ }
206+ }
151207}
0 commit comments