Skip to content

Multi-threading processing error #2227

@jietang00723-cloud

Description

@jietang00723-cloud

[ ] I have checked the documentation and related resources and couldn't resolve my bug.

Describe the bug
When I call evaluate() from multiple threads to evaluate my RAG application, an exception is raised: `Exception raised in Job[0]: AttributeError("'NoneType' object has no attribute 'generate'")`. My LangSmith traces do contain records for these runs, yet the final score I get is NaN. Is evaluate() not safe to call from multiple threads — can it only be used with coroutines or sequentially?

Ragas version: 0.2.15
Python version: v3.12.5

Code to Reproduce

def get_eva_result_from_llm(self,data_q:Queue,result_dict,rag_run_config:dict,stop_event=None,poll_timeout=150,**kwargs):
        """Stage-2 worker loop: evaluate items taken from ``data_q`` until told to stop.

        Each queue item is an ``(index, cont)`` pair; an item whose ``index`` is
        None is the stop sentinel. For a dict ``cont``, empty fields are padded
        with the placeholder "无" (for ``retrieved_contexts`` only its first
        entry is checked). ``self.ragEva.ragas_evaluation_from_dict`` is then
        called with the keys of ``rag_run_config`` plus ``eval_data=cont`` and
        ``experiment_name=f"{RunAgentConfig.TASK_ID}-{index}"``. Its return value
        is stored in ``result_dict[index]``. Non-dict items and evaluation errors
        are logged and skipped. ``data_q.task_done()`` is called exactly once per
        item actually taken from the queue, and never after a timed-out ``get``.

        Args:
            data_q: queue of ``(index, cont)`` pairs.
            result_dict: mapping that receives ``index -> result`` on success.
            rag_run_config: base keyword arguments for the evaluation call. It is
                not modified; a per-item dict is built instead, so threads that
                share this dict cannot overwrite each other's ``eval_data``.
            stop_event: optional event; when set, the loop exits at its next check.
            poll_timeout: seconds to wait for an item before re-checking
                ``stop_event`` (default 150, the previous hard-coded value).
            **kwargs: accepted for compatibility; unused.

        NOTE(review): the ``AttributeError("'NoneType' object has no attribute
        'generate'")`` seen in multi-threaded runs is consistent with ragas'
        ``evaluate()`` resetting ``metric.llm`` to None, when it finishes, on
        metric objects whose LLM it filled in itself. If
        ``ragas_evaluation_from_dict`` reuses the same metric instances across
        threads, one finishing thread can clear the LLM another is still using.
        Confirm against the installed ragas version. Per-call metric instances,
        or metrics with ``llm`` set explicitly, would avoid it.
        """
        import queue  # for queue.Empty, raised when get() times out

        self.log.debug(f"开启了评测线程")
        while True:
            if stop_event and stop_event.is_set():
                break
            try:
                item = data_q.get(timeout=poll_timeout)
            except queue.Empty:
                # No item was taken, so there is nothing to acknowledge with
                # task_done(); loop back to re-check stop_event and wait again.
                continue
            # From here on exactly one task_done() is owed for `item`; the
            # finally clause pays it on every path (sentinel, bad data, errors).
            try:
                index, cont = item
                if index is None:
                    # Stop sentinel.
                    break
                if not isinstance(cont, dict):
                    self.log.error(f"数据格式不对,评测失败,index:{index},type:{type(cont)},need type:dict()")
                    continue
                self.log.debug(f"cont: \n{cont}")
                cont_copy = cont.copy()
                # Pad empty fields with the placeholder "无"; for
                # retrieved_contexts only the first context list is checked.
                for key in cont_copy:
                    if key == "retrieved_contexts":
                        if len(cont[key][0]) == 0:
                            cont[key][0].append("无")
                            self.log.info(f"index:{index},cont_copy:{cont_copy}")
                            continue
                    if len(cont[key]) == 0:
                        cont[key].append("无")

                eva_id = f"{RunAgentConfig.TASK_ID}-{index}"
                # Build per-item kwargs instead of writing into rag_run_config:
                # that dict may be shared by several worker threads, and mutating
                # it lets one thread's eval_data/experiment_name leak into another
                # thread's evaluation call.
                run_kwargs = {**rag_run_config, "eval_data": cont, "experiment_name": eva_id}
                result = self.ragEva.ragas_evaluation_from_dict(**run_kwargs)
                result_dict[index] = result
                self.log.info(f"eva_id:{eva_id} 评测完成")
            except Exception as e:
                self.log.error(f"评测出错:\n {str(e)}",exc_info=True)
            finally:
                data_q.task_done()
def test_after_use_llm(self,test_case_path=None,config=None,max_cases=4):
        """Run the two-stage threaded evaluation over the rows of a test-case sheet.

        Rows are loaded with ``self.evadataTool.get_data_from_excel`` and fed as
        ``(index, row)`` pairs into ``request_q``.

        - Stage-1 workers (``self.evadataTool.get_eva_data_to_queue``) read
          ``request_q`` and presumably push ``(index, dict)`` items onto
          ``response_q``.
        - Stage-2 workers (``self.get_eva_result_from_llm``) consume
          ``response_q`` and store each result in ``result_dict[index]``.

        Finally, the per-row results are concatenated in index order and passed
        to ``self.ragEva.get_result_to_html``.

        Args:
            test_case_path: path of the test-case sheet; defaults to
                ``self.rag_run_config["test_case_path"]`` (or "空").
            config: if given, applied via ``self.updata_config`` first.
            max_cases: evaluate only the first ``max_cases`` rows (default 4,
                the previous hard-coded limit); ``None`` evaluates every row.

        Returns:
            dict mapping row index to that row's evaluation result. Rows for
            which stage 2 stored nothing are absent.
        """
        self.log.info("开始工作")

        if test_case_path is None:
            test_case_path = self.rag_run_config.get("test_case_path","空")
        if config is not None:
            self.updata_config(config)
        request_q = Queue(maxsize=10)
        response_q = Queue(maxsize=10)
        # All workers are threads of this process, so a plain dict suffices:
        # workers only do single-key assignments, which are atomic in CPython.
        # A multiprocessing.Manager() dict would start a separate server
        # process that must be shut down on every exit path.
        result_dict = {}

        self.log.debug("原始测试用例数据加载中")
        df = self.evadataTool.get_data_from_excel(test_case_path)
        if max_cases is not None:
            df = df.head(max_cases)
        if df.empty:
            self.log.error(f"test case path is null,case path :{test_case_path}")
            return result_dict
        self.log.debug(f"原始数据加载完毕,共 {len(df)} 条记录")
        max_workers = 6
        stage1_workers = max(1, max_workers // 3)
        stage2_workers = max(1, max_workers - stage1_workers)
        try:
            # Leaving the `with` block waits for every submitted worker to finish.
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                stage1_futures = [
                    executor.submit(self.evadataTool.get_eva_data_to_queue, request_q, response_q)
                    for _ in range(stage1_workers)
                ]
                # Give each stage-2 worker its own shallow copy of the run
                # config, so per-item keys a worker writes cannot race across threads.
                stage2_futures = [
                    executor.submit(self.get_eva_result_from_llm, response_q, result_dict, dict(self.rag_run_config))
                    for _ in range(stage2_workers)
                ]
                try:
                    for i, row in df.iterrows():
                        request_q.put((i, row))
                    # One stop sentinel per stage-1 worker.
                    for _ in range(stage1_workers):
                        request_q.put((None, None))
                    for f in stage1_futures:
                        f.result()  # re-raises a stage-1 worker's exception
                finally:
                    # Stop stage 2 even when stage 1 failed. Without these
                    # sentinels the stage-2 loops never exit, and the executor
                    # shutdown at the end of the `with` block waits forever.
                    for _ in range(stage2_workers):
                        response_q.put((None, None))
                for f in stage2_futures:
                    f.result()

            self.log.info(f"requeset_q: {request_q.qsize()},respons_q:{response_q.qsize()}")
            if not result_dict:
                self.log.error("没有可汇总的评测结果")
            else:
                try:
                    ordered = [result_dict[i] for i in sorted(result_dict)]
                    merged = pd.concat(ordered, ignore_index=True)
                    self.ragEva.get_result_to_html(merged)
                except Exception:
                    # Report generation stays best-effort, but failures are
                    # logged instead of being silently discarded.
                    self.log.error("汇总评测结果失败", exc_info=True)
        finally:
            # Ensure resources are cleaned up, then dump every live thread
            # and its current call stack for diagnostics.
            self.cleanup_async()
            for thread in threading.enumerate():
                self.log.info(f"  - {thread.name} (ID: {thread.ident}, 存活: {thread.is_alive()})")
                stack = sys._current_frames().get(thread.ident)
                if stack:
                    self.log.info("调用栈:")
                    for filename, lineno, name, line in traceback.extract_stack(stack):
                        self.log.info(f"  {filename}:{lineno} - {name}() - {line or ''}")
                else:
                    self.log.info("  无法获取调用栈信息")

                self.log.info("-" * 60)
        return result_dict

Error trace

Exception raised in Job[0]: AttributeError("'NoneType' object has no attribute 'generate'")

Expected behavior
Evaluation can run across multiple threads, and the results are stable, with no NaN values.

Additional context
Add any other context about the problem here.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug (Something isn't working)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions