@@ -37,8 +37,8 @@ from crewai.flow.flow import Flow, start, listen
3737from crewai.flow.human_feedback import human_feedback
3838
3939class SimpleReviewFlow (Flow ):
40- @human_feedback (request = " 이 콘텐츠를 검토해 주세요:" )
4140 @start ()
41+ @human_feedback (message = " 이 콘텐츠를 검토해 주세요:" )
4242 def generate_content (self ):
4343 return " 검토가 필요한 AI 생성 콘텐츠입니다."
4444
@@ -63,19 +63,20 @@ flow.kickoff()
6363
6464| 매개변수 | 타입 | 필수 | 설명 |
6565| ----------| ------| ------| ------|
66- | ` request ` | ` str ` | 예 | 메서드 출력과 함께 인간에게 표시되는 메시지 |
66+ | ` message ` | ` str ` | 예 | 메서드 출력과 함께 인간에게 표시되는 메시지 |
6767| ` emit ` | ` Sequence[str] ` | 아니오 | 가능한 outcome 목록. 피드백이 이 중 하나로 매핑되어 ` @listen ` 데코레이터를 트리거합니다 |
6868| ` llm ` | ` str \| BaseLLM ` | ` emit ` 지정 시 | 피드백을 해석하고 outcome에 매핑하는 데 사용되는 LLM |
6969| ` default_outcome ` | ` str ` | 아니오 | 피드백이 제공되지 않을 때 사용할 outcome. ` emit ` 에 있어야 합니다 |
7070| ` metadata ` | ` dict ` | 아니오 | 엔터프라이즈 통합을 위한 추가 데이터 |
71+ | ` provider ` | ` HumanFeedbackProvider ` | 아니오 | 비동기/논블로킹 피드백을 위한 커스텀 프로바이더. [ 비동기 인간 피드백] ( #비동기-인간-피드백-논블로킹 ) 참조 |
7172
7273### 기본 사용법 (라우팅 없음)
7374
7475` emit ` 을 지정하지 않으면, 데코레이터는 단순히 피드백을 수집하고 다음 리스너에 ` HumanFeedbackResult ` 를 전달합니다:
7576
7677``` python Code
77- @human_feedback (request = " 이 분석에 대해 어떻게 생각하시나요?" )
7878@start ()
79+ @human_feedback (message = " 이 분석에 대해 어떻게 생각하시나요?" )
7980def analyze_data (self ):
8081 return " 분석 결과: 매출 15% 증가, 비용 8% 감소"
8182
@@ -91,13 +92,13 @@ def handle_feedback(self, result):
9192` emit ` 을 지정하면, 데코레이터는 라우터가 됩니다. 인간의 자유 형식 피드백이 LLM에 의해 해석되어 지정된 outcome 중 하나로 매핑됩니다:
9293
9394``` python Code
95+ @start ()
9496@human_feedback (
95- request = " 이 콘텐츠의 출판을 승인하시겠습니까?" ,
97+ message = " 이 콘텐츠의 출판을 승인하시겠습니까?" ,
9698 emit = [" approved" , " rejected" , " needs_revision" ],
9799 llm = " gpt-4o-mini" ,
98100 default_outcome = " needs_revision" ,
99101)
100- @start ()
101102def review_content (self ):
102103 return " 블로그 게시물 초안 내용..."
103104
@@ -212,13 +213,13 @@ class ContentApprovalFlow(Flow[ContentState]):
212213 self .state.draft = f " # { topic} \n\n { topic} 에 대한 초안입니다... "
213214 return self .state.draft
214215
216+ @listen (generate_draft)
215217 @human_feedback (
216- request = " 이 초안을 검토해 주세요. 'approved', 'rejected'로 답하거나 수정 피드백을 제공해 주세요:" ,
218+ message = " 이 초안을 검토해 주세요. 'approved', 'rejected'로 답하거나 수정 피드백을 제공해 주세요:" ,
217219 emit = [" approved" , " rejected" , " needs_revision" ],
218220 llm = " gpt-4o-mini" ,
219221 default_outcome = " needs_revision" ,
220222 )
221- @listen (generate_draft)
222223 def review_draft (self , draft ):
223224 return draft
224225
@@ -278,37 +279,37 @@ Flow 완료. 요청된 수정: 0
278279
279280## 다른 데코레이터와 결합하기
280281
281- ` @human_feedback ` 데코레이터는 다른 Flow 데코레이터와 함께 작동합니다. 순서가 중요합니다 :
282+ ` @human_feedback ` 데코레이터는 다른 Flow 데코레이터와 함께 작동합니다. 가장 안쪽 데코레이터(함수에 가장 가까운)로 배치하세요 :
282283
283284``` python Code
284- # 올바름: @human_feedback이 Flow 데코레이터를 감쌉니다
285- @human_feedback (request = " 이것을 검토해 주세요:" )
285+ # 올바름: @human_feedback이 가장 안쪽(함수에 가장 가까움)
286286@start ()
287+ @human_feedback (message = " 이것을 검토해 주세요:" )
287288def my_start_method (self ):
288289 return " content"
289290
290- @human_feedback (request = " 이것도 검토해 주세요:" )
291291@listen (other_method)
292+ @human_feedback (message = " 이것도 검토해 주세요:" )
292293def my_listener (self , data ):
293294 return f " processed: { data} "
294295```
295296
296297<Tip >
297- ` @human_feedback ` 를 가장 바깥쪽 데코레이터(첫 번째/상단 )로 배치하여 메서드가 완료된 후 실행되고 반환 값을 캡처할 수 있도록 하세요.
298+ ` @human_feedback ` 를 가장 안쪽 데코레이터(마지막/함수에 가장 가까움 )로 배치하여 메서드를 직접 래핑하고 Flow 시스템에 전달하기 전에 반환 값을 캡처할 수 있도록 하세요.
298299</Tip >
299300
300301## 모범 사례
301302
302303### 1. 명확한 요청 메시지 작성
303304
304- ` request ` 매개변수는 인간이 보는 것입니다. 실행 가능하게 만드세요:
305+ ` message ` 매개변수는 인간이 보는 것입니다. 실행 가능하게 만드세요:
305306
306307``` python Code
307308# ✅ 좋음 - 명확하고 실행 가능
308- @human_feedback (request = " 이 요약이 핵심 포인트를 정확하게 캡처했나요? '예'로 답하거나 무엇이 빠졌는지 설명해 주세요:" )
309+ @human_feedback (message = " 이 요약이 핵심 포인트를 정확하게 캡처했나요? '예'로 답하거나 무엇이 빠졌는지 설명해 주세요:" )
309310
310311# ❌ 나쁨 - 모호함
311- @human_feedback (request = " 이것을 검토해 주세요:" )
312+ @human_feedback (message = " 이것을 검토해 주세요:" )
312313```
313314
314315### 2. 의미 있는 Outcome 선택
@@ -329,7 +330,7 @@ emit=["state_1", "state_2", "state_3"]
329330
330331``` python Code
331332@human_feedback (
332- request = " 승인하시겠습니까? (수정 요청하려면 Enter 누르세요)" ,
333+ message = " 승인하시겠습니까? (수정 요청하려면 Enter 누르세요)" ,
333334 emit = [" approved" , " needs_revision" ],
334335 llm = " gpt-4o-mini" ,
335336 default_outcome = " needs_revision" , # 안전한 기본값
@@ -365,9 +366,216 @@ Flow를 설계할 때, 라우팅이 필요한지 고려하세요:
365366| 승인/거부/수정이 있는 승인 게이트 | ` emit ` 사용 |
366367| 로깅만을 위한 코멘트 수집 | ` emit ` 없음 |
367368
369+ ## 비동기 인간 피드백 (논블로킹)
370+
371+ 기본적으로 ` @human_feedback ` 은 콘솔 입력을 기다리며 실행을 차단합니다. 프로덕션 애플리케이션에서는 Slack, 이메일, 웹훅 또는 API와 같은 외부 시스템과 통합되는 ** 비동기/논블로킹** 피드백이 필요할 수 있습니다.
372+
373+ ### Provider 추상화
374+
375+ 커스텀 피드백 수집 전략을 지정하려면 ` provider ` 매개변수를 사용하세요:
376+
377+ ``` python Code
378+ from crewai.flow import Flow, start, human_feedback, HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
379+
380+ class WebhookProvider (HumanFeedbackProvider ):
381+ """ 웹훅 콜백을 기다리며 Flow를 일시 중지하는 Provider."""
382+
383+ def __init__ (self , webhook_url : str ):
384+ self .webhook_url = webhook_url
385+
386+ def request_feedback (self , context : PendingFeedbackContext, flow : Flow) -> str :
387+ # 외부 시스템에 알림 (예: Slack 메시지 전송, 티켓 생성)
388+ self .send_notification(context)
389+
390+ # 실행 일시 중지 - 프레임워크가 자동으로 영속성 처리
391+ raise HumanFeedbackPending(
392+ context = context,
393+ callback_info = {" webhook_url" : f " { self .webhook_url} / { context.flow_id} " }
394+ )
395+
396+ class ReviewFlow (Flow ):
397+ @start ()
398+ @human_feedback (
399+ message = " 이 콘텐츠를 검토해 주세요:" ,
400+ emit = [" approved" , " rejected" ],
401+ llm = " gpt-4o-mini" ,
402+ provider = WebhookProvider(" https://myapp.com/api" ),
403+ )
404+ def generate_content (self ):
405+ return " AI가 생성한 콘텐츠..."
406+
407+ @listen (" approved" )
408+ def publish (self , result ):
409+ return " 출판됨!"
410+ ```
411+
412+ <Tip >
413+ Flow 프레임워크는 ` HumanFeedbackPending ` 이 발생하면 ** 자동으로 상태를 영속화** 합니다. Provider는 외부 시스템에 알리고 예외를 발생시키기만 하면 됩니다—수동 영속성 호출이 필요하지 않습니다.
414+ </Tip >
415+
416+ ### 일시 중지된 Flow 처리
417+
418+ 비동기 provider를 사용하면 ` kickoff() ` 는 예외를 발생시키는 대신 ` HumanFeedbackPending ` 객체를 반환합니다:
419+
420+ ``` python Code
421+ flow = ReviewFlow()
422+ result = flow.kickoff()
423+
424+ if isinstance (result, HumanFeedbackPending):
425+ # Flow가 일시 중지됨, 상태가 자동으로 영속화됨
426+ print (f " 피드백 대기 중: { result.callback_info[' webhook_url' ]} " )
427+ print (f " Flow ID: { result.context.flow_id} " )
428+ else :
429+ # 정상 완료
430+ print (f " Flow 완료: { result} " )
431+ ```
432+
433+ ### 일시 중지된 Flow 재개
434+
435+ 피드백이 도착하면 (예: 웹훅을 통해) Flow를 재개합니다:
436+
437+ ``` python Code
438+ # 동기 핸들러:
439+ def handle_feedback_webhook (flow_id : str , feedback : str ):
440+ flow = ReviewFlow.from_pending(flow_id)
441+ result = flow.resume(feedback)
442+ return result
443+
444+ # 비동기 핸들러 (FastAPI, aiohttp 등):
445+ async def handle_feedback_webhook (flow_id : str , feedback : str ):
446+ flow = ReviewFlow.from_pending(flow_id)
447+ result = await flow.resume_async(feedback)
448+ return result
449+ ```
450+
451+ ### 주요 타입
452+
453+ | 타입 | 설명 |
454+ | ------| ------|
455+ | ` HumanFeedbackProvider ` | 커스텀 피드백 provider를 위한 프로토콜 |
456+ | ` PendingFeedbackContext ` | 일시 중지된 Flow를 재개하는 데 필요한 모든 정보 포함 |
457+ | ` HumanFeedbackPending ` | Flow가 피드백을 위해 일시 중지되면 ` kickoff() ` 에서 반환됨 |
458+ | ` ConsoleProvider ` | 기본 블로킹 콘솔 입력 provider |
459+
460+ ### PendingFeedbackContext
461+
462+ 컨텍스트는 재개에 필요한 모든 것을 포함합니다:
463+
464+ ``` python Code
465+ @dataclass
466+ class PendingFeedbackContext :
467+ flow_id: str # 이 Flow 실행의 고유 식별자
468+ flow_class: str # 정규화된 클래스 이름
469+ method_name: str # 피드백을 트리거한 메서드
470+ method_output: Any # 인간에게 표시된 출력
471+ message: str # 요청 메시지
472+ emit: list[str ] | None # 라우팅을 위한 가능한 outcome
473+ default_outcome: str | None
474+ metadata: dict # 커스텀 메타데이터
475+ llm: str | None # outcome 매핑을 위한 LLM
476+ requested_at: datetime
477+ ```
478+
479+ ### 완전한 비동기 Flow 예제
480+
481+ ``` python Code
482+ from crewai.flow import (
483+ Flow, start, listen, human_feedback,
484+ HumanFeedbackProvider, HumanFeedbackPending, PendingFeedbackContext
485+ )
486+
487+ class SlackNotificationProvider (HumanFeedbackProvider ):
488+ """ Slack 알림을 보내고 비동기 피드백을 위해 일시 중지하는 Provider."""
489+
490+ def __init__ (self , channel : str ):
491+ self .channel = channel
492+
493+ def request_feedback (self , context : PendingFeedbackContext, flow : Flow) -> str :
494+ # Slack 알림 전송 (직접 구현)
495+ slack_thread_id = self .post_to_slack(
496+ channel = self .channel,
497+ message = f " 검토 필요: \n\n { context.method_output} \n\n { context.message} " ,
498+ )
499+
500+ # 실행 일시 중지 - 프레임워크가 자동으로 영속성 처리
501+ raise HumanFeedbackPending(
502+ context = context,
503+ callback_info = {
504+ " slack_channel" : self .channel,
505+ " thread_id" : slack_thread_id,
506+ }
507+ )
508+
509+ class ContentPipeline (Flow ):
510+ @start ()
511+ @human_feedback (
512+ message = " 이 콘텐츠의 출판을 승인하시겠습니까?" ,
513+ emit = [" approved" , " rejected" , " needs_revision" ],
514+ llm = " gpt-4o-mini" ,
515+ default_outcome = " needs_revision" ,
516+ provider = SlackNotificationProvider(" #content-reviews" ),
517+ )
518+ def generate_content (self ):
519+ return " AI가 생성한 블로그 게시물 콘텐츠..."
520+
521+ @listen (" approved" )
522+ def publish (self , result ):
523+ print (f " 출판 중! 검토자 의견: { result.feedback} " )
524+ return {" status" : " published" }
525+
526+ @listen (" rejected" )
527+ def archive (self , result ):
528+ print (f " 보관됨. 이유: { result.feedback} " )
529+ return {" status" : " archived" }
530+
531+ @listen (" needs_revision" )
532+ def queue_revision (self , result ):
533+ print (f " 수정 대기열에 추가됨: { result.feedback} " )
534+ return {" status" : " revision_needed" }
535+
536+
537+ # Flow 시작 (Slack 응답을 기다리며 일시 중지)
538+ def start_content_pipeline ():
539+ flow = ContentPipeline()
540+ result = flow.kickoff()
541+
542+ if isinstance (result, HumanFeedbackPending):
543+ return {" status" : " pending" , " flow_id" : result.context.flow_id}
544+
545+ return result
546+
547+
548+ # Slack 웹훅이 실행될 때 재개 (동기 핸들러)
549+ def on_slack_feedback (flow_id : str , slack_message : str ):
550+ flow = ContentPipeline.from_pending(flow_id)
551+ result = flow.resume(slack_message)
552+ return result
553+
554+
555+ # 핸들러가 비동기인 경우 (FastAPI, aiohttp, Slack Bolt 비동기 등)
556+ async def on_slack_feedback_async (flow_id : str , slack_message : str ):
557+ flow = ContentPipeline.from_pending(flow_id)
558+ result = await flow.resume_async(slack_message)
559+ return result
560+ ```
561+
562+ <Warning >
563+ 비동기 웹 프레임워크(FastAPI, aiohttp, Slack Bolt 비동기 모드)를 사용하는 경우 ` flow.resume() ` 대신 ` await flow.resume_async() ` 를 사용하세요. 실행 중인 이벤트 루프 내에서 ` resume() ` 을 호출하면 ` RuntimeError ` 가 발생합니다.
564+ </Warning >
565+
566+ ### 비동기 피드백 모범 사례
567+
568+ 1 . ** 반환 타입 확인** : ` kickoff() ` 는 일시 중지되면 ` HumanFeedbackPending ` 을 반환합니다—try/except가 필요하지 않습니다
569+ 2 . ** 올바른 resume 메서드 사용** : 동기 코드에서는 ` resume() ` , 비동기 코드에서는 ` await resume_async() ` 사용
570+ 3 . ** 콜백 정보 저장** : ` callback_info ` 를 사용하여 웹훅 URL, 티켓 ID 등을 저장
571+ 4 . ** 멱등성 구현** : 안전을 위해 resume 핸들러는 멱등해야 합니다
572+ 5 . ** 자동 영속성** : ` HumanFeedbackPending ` 이 발생하면 상태가 자동으로 저장되며 기본적으로 ` SQLiteFlowPersistence ` 사용
573+ 6 . ** 커스텀 영속성** : 필요한 경우 ` from_pending() ` 에 커스텀 영속성 인스턴스 전달
574+
368575## 관련 문서
369576
370577- [ Flow 개요] ( /ko/concepts/flows ) - CrewAI Flow에 대해 알아보기
371578- [ Flow 상태 관리] ( /ko/guides/flows/mastering-flow-state ) - Flow에서 상태 관리하기
579+ - [ Flow 영속성] ( /ko/concepts/flows#persistence ) - Flow 상태 영속화
372580- [ @router 를 사용한 라우팅] ( /ko/concepts/flows#router ) - 조건부 라우팅에 대해 더 알아보기
373581- [ 실행 시 인간 입력] ( /ko/learn/human-input-on-execution ) - 태스크 수준 인간 입력
0 commit comments