99 BlockOutput ,
1010 BlockSchemaInput ,
1111 BlockSchemaOutput ,
12+ BlockType ,
1213)
13- from backend .data .execution import ExecutionStatus
14+ from backend .data .execution import ExecutionContext , ExecutionStatus
1415from backend .data .human_review import ReviewResult
1516from backend .data .model import SchemaField
1617from backend .executor .manager import async_update_node_execution_status
@@ -61,15 +62,15 @@ def __init__(self):
6162 categories = {BlockCategory .BASIC },
6263 input_schema = HumanInTheLoopBlock .Input ,
6364 output_schema = HumanInTheLoopBlock .Output ,
65+ block_type = BlockType .HUMAN_IN_THE_LOOP ,
6466 test_input = {
6567 "data" : {"name" : "John Doe" , "age" : 30 },
6668 "name" : "User profile data" ,
6769 "editable" : True ,
6870 },
6971 test_output = [
70- ("reviewed_data" , {"name" : "John Doe" , "age" : 30 }),
7172 ("status" , "approved" ),
72- ("review_message " , "" ),
73+ ("reviewed_data " , { "name" : "John Doe" , "age" : 30 } ),
7374 ],
7475 test_mock = {
7576 "get_or_create_human_review" : lambda * _args , ** _kwargs : ReviewResult (
@@ -80,9 +81,25 @@ def __init__(self):
8081 node_exec_id = "test-node-exec-id" ,
8182 ),
8283 "update_node_execution_status" : lambda * _args , ** _kwargs : None ,
84+ "update_review_processed_status" : lambda * _args , ** _kwargs : None ,
8385 },
8486 )
8587
88+ async def get_or_create_human_review (self , ** kwargs ):
89+ return await get_database_manager_async_client ().get_or_create_human_review (
90+ ** kwargs
91+ )
92+
93+ async def update_node_execution_status (self , ** kwargs ):
94+ return await async_update_node_execution_status (
95+ db_client = get_database_manager_async_client (), ** kwargs
96+ )
97+
98+ async def update_review_processed_status (self , node_exec_id : str , processed : bool ):
99+ return await get_database_manager_async_client ().update_review_processed_status (
100+ node_exec_id , processed
101+ )
102+
86103 async def run (
87104 self ,
88105 input_data : Input ,
@@ -92,20 +109,20 @@ async def run(
92109 graph_exec_id : str ,
93110 graph_id : str ,
94111 graph_version : int ,
112+ execution_context : ExecutionContext ,
95113 ** kwargs ,
96114 ) -> BlockOutput :
97- """
98- Execute the Human In The Loop block.
115+ if not execution_context .safe_mode :
116+ logger .info (
117+ f"HITL block skipping review for node { node_exec_id } - safe mode disabled"
118+ )
119+ yield "status" , "approved"
120+ yield "reviewed_data" , input_data .data
121+ yield "review_message" , "Auto-approved (safe mode disabled)"
122+ return
99123
100- This method uses one function to handle the complete workflow - checking existing reviews
101- and creating pending ones as needed.
102- """
103124 try :
104- logger .debug (f"HITL block executing for node { node_exec_id } " )
105-
106- # Use the data layer to handle the complete workflow
107- db_client = get_database_manager_async_client ()
108- result = await db_client .get_or_create_human_review (
125+ result = await self .get_or_create_human_review (
109126 user_id = user_id ,
110127 node_exec_id = node_exec_id ,
111128 graph_exec_id = graph_exec_id ,
@@ -119,32 +136,24 @@ async def run(
119136 logger .error (f"Error in HITL block for node { node_exec_id } : { str (e )} " )
120137 raise
121138
122- # Check if we're waiting for human input
123139 if result is None :
124140 logger .info (
125141 f"HITL block pausing execution for node { node_exec_id } - awaiting human review"
126142 )
127143 try :
128- # Set node status to REVIEW so execution manager can't mark it as COMPLETED
129- # The VALID_STATUS_TRANSITIONS will then prevent any unwanted status changes
130- # Use the proper wrapper function to ensure websocket events are published
131- await async_update_node_execution_status (
132- db_client = db_client ,
144+ await self .update_node_execution_status (
133145 exec_id = node_exec_id ,
134146 status = ExecutionStatus .REVIEW ,
135147 )
136- # Execution pauses here until API routes process the review
137148 return
138149 except Exception as e :
139150 logger .error (
140151 f"Failed to update node status for HITL block { node_exec_id } : { str (e )} "
141152 )
142153 raise
143154
144- # Review is complete (approved or rejected) - check if unprocessed
145155 if not result .processed :
146- # Mark as processed before yielding
147- await db_client .update_review_processed_status (
156+ await self .update_review_processed_status (
148157 node_exec_id = node_exec_id , processed = True
149158 )
150159
0 commit comments