@@ -43,6 +43,8 @@ def __init__(
4343 self .args = args
4444 self .remote_vit = args .enable_remote_vit or args .run_mode == "visual"
4545 self .cache_port = cache_port
46+ self .visual_port = visual_port
47+ self .next_module_port = next_module_port
4648 self .waiting_reqs : List [GroupReqIndexes ] = []
4749 self .infer_batch_size = args .visual_infer_batch_size
4850 self .trust_remote_code = args .trust_remote_code
@@ -151,20 +153,16 @@ async def loop_for_fwd(self):
151153 processing_group_reqs = []
152154 images_need_infer = []
153155
154- def _recv_reqs (self ):
155- if self .remote_vit :
156- recv_req : GroupReqIndexes = self .recv_from_httpserver .recv_pyobj (zmq .NOBLOCK )
157- for img in recv_req .multimodal_params .images :
158- image_patch = self .tokenizer .get_image_patch_func (img )
159- data = img ._preload_data
160- # img._preload_data = None
161- md5sum = "{}_{}" .format (hashlib .md5 (data ).hexdigest (), image_patch )
162- md5 = int (md5sum , 16 )
163- # create_shm(get_shm_name_data(uid), data)
164- self .cache_client .root .set_items_data ([md5 ])
165- return recv_req
166- else :
167- return self .recv_from_httpserver .recv_pyobj (zmq .NOBLOCK )
156+ # def _recv_reqs(self):
157+ # if self.remote_vit:
158+ # recv_req: GroupReqIndexes = self.recv_from_httpserver.recv_pyobj(zmq.NOBLOCK)
159+ # recv_req.multimodal_params.images[:]= [
160+ # img for img in recv_req.multimodal_params.images
161+ # if not self.cache_client.root.get_item_embed(img.uuid) # embed已存在的被丢弃 , ref +1
162+ # ]
163+ # return recv_req
164+ # else:
165+ # return self.recv_from_httpserver.recv_pyobj(zmq.NOBLOCK)
168166
169167 async def loop_for_netio_req (self ):
170168 if not hasattr (self , "visual_recv_max_count" ):
@@ -173,7 +171,7 @@ async def loop_for_netio_req(self):
173171 while True :
174172 try :
175173 for _ in range (self .visual_recv_max_count ):
176- recv_req : GroupReqIndexes = self ._recv_reqs ( )
174+ recv_req : GroupReqIndexes = self .recv_from_httpserver . recv_pyobj ( zmq . NOBLOCK )
177175 if isinstance (recv_req , GroupReqIndexes ):
178176 self .waiting_reqs .append (recv_req )
179177 else :
@@ -196,8 +194,6 @@ async def loop_for_fwd_visual_only(self):
196194 visual_req = self .waiting_reqs .pop (0 )
197195
198196 for img in visual_req .multimodal_params .images :
199- if img .is_abort :
200- continue
201197 images_need_infer .append (img )
202198
203199 if len (images_need_infer ) == self .infer_batch_size :
@@ -249,9 +245,6 @@ def handle_exception(loop, context):
249245 loop = asyncio .new_event_loop ()
250246 loop .set_exception_handler (handle_exception )
251247 asyncio .set_event_loop (loop )
252- if args .run_mode == "visual" :
253- loop .create_task (visualserver .loop_for_fwd_visual_only ())
254- else :
255- loop .create_task (visualserver .loop_for_fwd ())
248+ create_forward_loop (visualserver , loop )
256249 loop .run_until_complete (visualserver .loop_for_netio_req ())
257250 return
0 commit comments