66import com .ai .lawyer .domain .chatbot .dto .ChatDto .ChatResponse ;
77import com .ai .lawyer .domain .chatbot .entity .History ;
88import com .ai .lawyer .domain .chatbot .repository .HistoryRepository ;
9- import com .ai .lawyer .infrastructure .kafka .dto .ChatPostProcessEvent ;
10- import com .ai .lawyer .infrastructure .kafka .dto .DocumentDto ;
11- import com .ai .lawyer .domain .member .entity .Member ;
12- import com .ai .lawyer .domain .member .repositories .MemberRepository ;
139import com .ai .lawyer .global .qdrant .service .QdrantService ;
1410import lombok .RequiredArgsConstructor ;
1511import lombok .extern .slf4j .Slf4j ;
2420import org .springframework .ai .chat .prompt .PromptTemplate ;
2521import org .springframework .ai .document .Document ;
2622import org .springframework .beans .factory .annotation .Value ;
27- import org .springframework .kafka .core .KafkaTemplate ;
2823import org .springframework .stereotype .Service ;
2924import org .springframework .transaction .annotation .Transactional ;
3025import reactor .core .publisher .Flux ;
31- import reactor .core .publisher .Mono ;
32- import reactor .core .scheduler .Schedulers ;
3326
3427import java .util .HashMap ;
3528import java .util .List ;
@@ -44,78 +37,48 @@ public class ChatBotService {
4437 private final ChatClient chatClient ;
4538 private final QdrantService qdrantService ;
4639 private final HistoryService historyService ;
47- private final MemberRepository memberRepository ;
40+ private final AsyncPostChatProcessingService asyncPostChatProcessingService ;
41+
4842 private final HistoryRepository historyRepository ;
4943 private final ChatMemoryRepository chatMemoryRepository ;
5044
51- // KafkaTemplate 주입
52- private final KafkaTemplate <String , ChatPostProcessEvent > kafkaTemplate ;
53-
5445 @ Value ("${custom.ai.system-message}" )
5546 private String systemMessageTemplate ;
5647
57- // Kafka 토픽 이름 -> 추후 application.yml로 이동 고려
58- private static final String POST_PROCESSING_TOPIC = "chat-post-processing" ;
59-
6048 // 핵심 로직
6149 // 멤버 조회 -> 벡터 검색 -> 프롬프트 생성 -> LLM 호출 (스트림) -> Kafka 이벤트 발행 -> 응답 반환
6250 @ Transactional
6351 public Flux <ChatResponse > sendMessage (Long memberId , ChatRequest chatRequestDto , Long roomId ) {
64- return Mono .fromCallable (() -> {
6552
66- // 벡터 검색 (판례, 법령) (블로킹 )
67- List <Document > similarCaseDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "판례" );
68- List <Document > similarLawDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "법령" );
53+ // 벡터 검색 (판례, 법령)
54+ List <Document > similarCaseDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "판례" );
55+ List <Document > similarLawDocuments = qdrantService .searchDocument (chatRequestDto .getMessage (), "type" , "법령" );
6956
70- String caseContext = formatting (similarCaseDocuments );
71- String lawContext = formatting (similarLawDocuments );
57+ String caseContext = formatting (similarCaseDocuments );
58+ String lawContext = formatting (similarLawDocuments );
7259
73- // 채팅방 조회 또는 생성 (블로킹)
74- History history = getOrCreateRoom (memberId , roomId );
60+ // 채팅방 조회 또는 생성
61+ History history = getOrCreateRoom (memberId , roomId );
7562
76- // 메시지 기억 관리 (User 메시지 추가)
77- ChatMemory chatMemory = saveChatMemory (chatRequestDto , history );
63+ // 메시지 기억 관리
64+ ChatMemory chatMemory = saveChatMemory (chatRequestDto , history );
7865
79- // 프롬프트 생성
80- Prompt prompt = getPrompt (caseContext , lawContext , chatMemory , history );
66+ // 프롬프트 생성
67+ Prompt prompt = getPrompt (caseContext , lawContext , chatMemory , history );
8168
82- // 준비된 데이터를 담은 컨텍스트 객체 반환
83- return new PreparedChatContext (prompt , history , similarCaseDocuments , similarLawDocuments );
84- })
85- .subscribeOn (Schedulers .boundedElastic ()) // 블로킹 작업을 별도 스레드에서 실행
86- .flatMapMany (context -> {
87- // LLM 스트리밍 호출 및 클라이언트에게 즉시 응답
88- return chatClient .prompt (context .prompt )
69+ return chatClient .prompt (prompt )
8970 .stream ()
9071 .content ()
9172 .collectList ()
9273 .map (fullResponseList -> String .join ("" , fullResponseList ))
93- .doOnNext (fullResponse -> {
94-
95- // Document를 DTO로 변환
96- List <DocumentDto > caseDtos = context .similarCaseDocuments .stream ().map (DocumentDto ::from ).collect (Collectors .toList ());
97- List <DocumentDto > lawDtos = context .similarLawDocuments .stream ().map (DocumentDto ::from ).collect (Collectors .toList ());
98-
99- // Kafka로 보낼 이벤트 객체
100- ChatPostProcessEvent event = new ChatPostProcessEvent (
101- context .history .getHistoryId (),
102- chatRequestDto .getMessage (),
103- fullResponse ,
104- caseDtos ,
105- lawDtos
106- );
107-
108- // Kafka 이벤트 발행
109- kafkaTemplate .send (POST_PROCESSING_TOPIC , event );
110-
111- })
112- .map (fullResponse -> createChatResponse (context .history , fullResponse , context .similarCaseDocuments , context .similarLawDocuments ))
74+ .doOnNext (fullResponse -> asyncPostChatProcessingService .processHandlerTasks (history .getHistoryId (), chatRequestDto .getMessage (), fullResponse , similarCaseDocuments , similarLawDocuments ))
75+ .map (fullResponse -> createChatResponse (history , fullResponse , similarCaseDocuments , similarLawDocuments ))
11376 .flux ()
11477 .onErrorResume (throwable -> {
115- log .error ("스트리밍 처리 중 에러 발생 (historyId: {})" , context . history .getHistoryId (), throwable );
116- return Flux .just (handleError (context . history ));
78+ log .error ("스트리밍 처리 중 에러 발생 (historyId: {})" , history .getHistoryId (), throwable );
79+ return Flux .just (handleError (history ));
11780 });
118- });
81+
11982 }
12083
12184 private ChatResponse createChatResponse (History history , String fullResponse , List <Document > cases , List <Document > laws ) {
@@ -185,23 +148,4 @@ private ChatResponse handleError(History history) {
185148 .build ();
186149 }
187150
188- /**
189- * 블로킹 작업에서 준비된 데이터를 담는 컨텍스트 클래스
190- * 리액티브 체인에서 데이터를 전달하기 위한 내부 클래스
191- */
192- private static class PreparedChatContext {
193- final Prompt prompt ;
194- final History history ;
195- final List <Document > similarCaseDocuments ;
196- final List <Document > similarLawDocuments ;
197-
198- PreparedChatContext (Prompt prompt , History history ,
199- List <Document > similarCaseDocuments ,
200- List <Document > similarLawDocuments ) {
201- this .prompt = prompt ;
202- this .history = history ;
203- this .similarCaseDocuments = similarCaseDocuments ;
204- this .similarLawDocuments = similarLawDocuments ;
205- }
206- }
207151}
0 commit comments