@@ -1012,6 +1012,13 @@ def test_extract_error_code_parses_detail_and_regex_and_unknown():
10121012 assert extract_error_code ("no code here" ) == "unknown_error"
10131013
10141014
def test_extract_error_code_top_level_key():
    """A JSON string with a top-level ``error_code`` key yields that key's value."""
    from backend.data_process.tasks import extract_error_code

    serialized = json.dumps({"error_code": "top_level"})
    result = extract_error_code(serialized)

    assert result == "top_level"
1020+
1021+
10151022def test_save_error_to_redis_branches (monkeypatch ):
10161023 from backend .data_process .tasks import save_error_to_redis
10171024
@@ -1112,6 +1119,58 @@ def test_process_error_fallback_when_save_error_raises(monkeypatch, tmp_path):
11121119 ) or self .states == []
11131120
11141121
def test_process_error_truncates_reason_when_no_error_code(monkeypatch, tmp_path):
    """Check that ``process`` shortens an over-long failure reason.

    ``extract_error_code`` is stubbed to return ``None`` and ``ray.get`` is
    made to raise an exception carrying a 250-character JSON message. The
    reason handed to ``save_error_to_redis`` is then expected to end with
    "..." and be at most 203 characters long, and a state whose meta stage is
    ``text_extraction_failed`` must have been recorded on the task object.
    """
    tasks, fake_ray = import_tasks_with_fake_ray(monkeypatch, initialized=True)

    # The JSON-encoded message becomes the text of the raised exception.
    error_json = json.dumps({"message": "x" * 250})

    def _remote_ref(*a, **k):
        return "ref_err"

    def _remote_noop(*a, **k):
        return None

    class FakeActor:
        """Actor stand-in exposing the two ``.remote`` entry points used here."""

        def __init__(self):
            self.process_file = types.SimpleNamespace(remote=_remote_ref)
            self.store_chunks_in_redis = types.SimpleNamespace(remote=_remote_noop)

    def _build_actor():
        return FakeActor()

    def _failing_get(*_):
        # Every ray.get call fails with the long JSON error.
        raise Exception(error_json)

    def _no_error_code(*a, **k):
        # A falsy code is what should send process() down the truncation path.
        return None

    captured_reasons: list[str] = []

    def _record_reason(task_id, reason, start_time):
        captured_reasons.append(reason)

    monkeypatch.setattr(tasks, "get_ray_actor", _build_actor)
    fake_ray.get = _failing_get
    monkeypatch.setattr(tasks, "extract_error_code", _no_error_code)
    monkeypatch.setattr(tasks, "save_error_to_redis", _record_reason)

    # The source file must exist so the failure comes from ray.get rather
    # than from a missing-file check.
    source_file = tmp_path / "exists.txt"
    source_file.write_text("data")

    task_self = FakeSelf("trunc-proc")
    with pytest.raises(Exception):
        tasks.process(
            task_self,
            source=str(source_file),
            source_type="local",
            chunking_strategy="basic",
            index_name="idx",
            original_filename="f.txt",
        )

    # The most recently saved reason must be the truncated form.
    assert captured_reasons
    last_reason = captured_reasons[-1]
    assert last_reason.endswith("...")
    assert len(last_reason) <= 203

    failure_stage_seen = any(
        state.get("meta", {}).get("stage") == "text_extraction_failed"
        for state in task_self.states
    )
    assert failure_stage_seen
1172+
1173+
11151174def test_forward_cancel_check_warning_then_continue (monkeypatch ):
11161175 tasks , _ = import_tasks_with_fake_ray (monkeypatch )
11171176 monkeypatch .setattr (tasks , "ELASTICSEARCH_SERVICE" , "http://api" )
@@ -1197,6 +1256,58 @@ def post(self, *a, **k):
11971256 assert "detail_err" in str (exc .value )
11981257
11991258
def test_forward_index_documents_regex_error_code(monkeypatch):
    """Check that ``forward`` surfaces an error code embedded in a non-JSON body.

    The fake HTTP layer answers with status 500 and a body that is not valid
    JSON but contains a quoted ``"error_code":"regex_branch"`` pair. The
    exception raised by ``forward`` is expected to mention ``regex_branch``.
    """
    tasks, _ = import_tasks_with_fake_ray(monkeypatch)
    monkeypatch.setattr(tasks, "ELASTICSEARCH_SERVICE", "http://api")
    monkeypatch.setattr(tasks, "get_file_size", lambda *a, **k: 0)

    class _AsyncContext:
        """Minimal async context manager that yields itself and never suppresses."""

        async def __aenter__(self):
            return self

        async def __aexit__(self, *a):
            return False

    class FakeResponse(_AsyncContext):
        status = 500

        async def text(self):
            # Quoted key/value pair inside free text; presumably picked up by
            # a regex fallback in the code under test -- confirm against tasks.
            return 'oops "error_code":"regex_branch"'

    class FakeSession(_AsyncContext):
        def __init__(self, *a, **k):
            pass

        def post(self, *a, **k):
            return FakeResponse()

    def _fake_connector(verify_ssl=False):
        return None

    def _fake_timeout(total=None):
        return None

    aiohttp_stub = types.SimpleNamespace(
        TCPConnector=_fake_connector,
        ClientTimeout=_fake_timeout,
        ClientSession=FakeSession,
        ClientConnectorError=Exception,
        ClientResponseError=Exception,
    )
    monkeypatch.setattr(tasks, "aiohttp", aiohttp_stub)
    monkeypatch.setattr(tasks, "run_async", _run_coro)

    task_self = FakeSelf("regex-err")
    chunks = [{"content": "x", "metadata": {}}]
    with pytest.raises(Exception) as raised:
        tasks.forward(
            task_self,
            processed_data={"chunks": chunks},
            index_name="idx",
            source="/a.txt",
        )

    message = str(raised.value)
    assert "regex_branch" in message
1309+
1310+
12001311def test_forward_index_documents_client_connector_error (monkeypatch ):
12011312 tasks , _ = import_tasks_with_fake_ray (monkeypatch )
12021313 monkeypatch .setattr (tasks , "ELASTICSEARCH_SERVICE" , "http://api" )
@@ -1273,6 +1384,69 @@ def post(self, *a, **k):
12731384 assert "Failed to connect to API" in str (exc .value ) or "timeout" in str (exc .value ).lower ()
12741385
12751386
def test_forward_truncates_reason_when_no_error_code(monkeypatch):
    """Check that ``forward`` shortens a long JSON error when no code is found.

    ``run_async`` is replaced by a callable that raises an exception whose
    text is a JSON object with a 250-character message, and
    ``extract_error_code`` is stubbed to return ``None``. The first reason
    passed to ``save_error_to_redis`` is expected to end with "..." and be at
    most 203 characters, and a ``forward_task_failed`` stage must be recorded.
    """
    tasks, _ = import_tasks_with_fake_ray(monkeypatch)

    error_text = json.dumps({"message": "m" * 250})

    def _raise_long_json(coro):
        raise Exception(error_text)

    saved_reasons: list[str] = []

    def _collect_reason(tid, reason, st):
        saved_reasons.append(reason)

    monkeypatch.setattr(tasks, "ELASTICSEARCH_SERVICE", "http://api")
    monkeypatch.setattr(tasks, "get_file_size", lambda *a, **k: 0)
    monkeypatch.setattr(tasks, "extract_error_code", lambda *a, **k: None)
    monkeypatch.setattr(tasks, "run_async", _raise_long_json)
    monkeypatch.setattr(tasks, "save_error_to_redis", _collect_reason)

    task_self = FakeSelf("f-trunc")
    chunks = [{"content": "x", "metadata": {}}]
    with pytest.raises(Exception):
        tasks.forward(
            task_self,
            processed_data={"chunks": chunks},
            index_name="idx",
            source="/a.txt",
        )

    assert saved_reasons
    first_reason = saved_reasons[0]
    assert first_reason.endswith("...")
    assert len(first_reason) <= 203

    failure_stage_seen = any(
        state.get("meta", {}).get("stage") == "forward_task_failed"
        for state in task_self.states
    )
    assert failure_stage_seen
1417+
1418+
def test_forward_fallback_truncates_on_non_json_error(monkeypatch):
    """Check that ``forward`` shortens a long plain-text (non-JSON) error.

    ``run_async`` is replaced by a callable that raises an exception whose
    text is 250 ``n`` characters, and ``extract_error_code`` is stubbed to
    return ``None``. The first reason passed to ``save_error_to_redis`` is
    expected to end with "..." and be at most 203 characters, and a
    ``forward_task_failed`` stage must be recorded.
    """
    tasks, _ = import_tasks_with_fake_ray(monkeypatch)

    def _raise_plain_text(coro):
        raise Exception("n" * 250)

    saved_reasons: list[str] = []

    def _collect_reason(tid, reason, st):
        saved_reasons.append(reason)

    monkeypatch.setattr(tasks, "ELASTICSEARCH_SERVICE", "http://api")
    monkeypatch.setattr(tasks, "get_file_size", lambda *a, **k: 0)
    monkeypatch.setattr(tasks, "extract_error_code", lambda *a, **k: None)
    monkeypatch.setattr(tasks, "run_async", _raise_plain_text)
    monkeypatch.setattr(tasks, "save_error_to_redis", _collect_reason)

    task_self = FakeSelf("f-fallback")
    chunks = [{"content": "x", "metadata": {}}]
    with pytest.raises(Exception):
        tasks.forward(
            task_self,
            processed_data={"chunks": chunks},
            index_name="idx",
            source="/a.txt",
        )

    assert saved_reasons
    first_reason = saved_reasons[0]
    assert first_reason.endswith("...")
    assert len(first_reason) <= 203

    failure_stage_seen = any(
        state.get("meta", {}).get("stage") == "forward_task_failed"
        for state in task_self.states
    )
    assert failure_stage_seen
1448+
1449+
12761450def test_forward_error_truncates_reason_and_uses_save (monkeypatch ):
12771451 tasks , _ = import_tasks_with_fake_ray (monkeypatch )
12781452 long_message = "m" * 250
0 commit comments