44from threading import Thread
55
66from playwright .async_api import async_playwright
7+ from playwright ._impl ._errors import TimeoutError
78
89from dtable_events .app .config import INNER_DTABLE_DB_URL , DTABLE_WEB_SERVICE_URL
10+ from dtable_events .convert_page .utils import get_pdf_print_options
911from dtable_events .utils import get_inner_dtable_server_url , get_opt_from_conf_or_env , uuid_str_to_36_chars
1012from dtable_events .utils .dtable_db_api import DTableDBAPI
1113from dtable_events .utils .dtable_server_api import DTableServerAPI , NotFoundException
1719
1820class BrowserWorker (Thread ):
1921
20- def __init__ (self , index , task_queue : Queue ):
22+ def __init__ (self , index , task_queue : Queue , pages = 10 ):
2123 super ().__init__ ()
2224 self .thread_id = index
2325 self .task_queue = task_queue
2426 self .playwright = None
2527 self .browser = None
2628 self .context = None
29+ self .pages = pages
2730
2831 self .loop = asyncio .new_event_loop () # each thread has own event loop
2932
@@ -99,6 +102,25 @@ def check_resources(self, dtable_uuid, plugin_type, page_id, table_id, target_co
99102 'row_ids' : row_ids
100103 }, None
101104
async def row_page_to_pdf(self, url, context, row_id, action_type, per_converted_callbacks):
    """Render one row page in its own tab, print it to PDF and hand the bytes to the callbacks.

    :param url: fully built page URL (including the access token query string).
    :param context: browser context on which ``new_page()`` is awaited.
    :param row_id: id of the row being rendered; passed through to the callbacks.
    :param action_type: callbacks only run when this is ``'convert_page_to_pdf'``.
    :param per_converted_callbacks: iterable of callables invoked as
        ``callback(row_id, content)``; an exception raised by one callback is
        logged and does not stop the remaining callbacks.

    A ``TimeoutError`` while loading or waiting for the network to go idle is
    not treated as fatal: the page is printed in whatever state it reached.
    Any other error propagates to the caller, but the tab is always closed
    first so a failed row cannot leak an open page in the shared context.
    """
    page = await context.new_page()
    try:
        page.on("request", lambda request: logger.debug(f"Request: {request.method} {request.url}"))
        page.on("response", lambda response: logger.debug(f"Response: {response.status} {response.url}"))
        page.on("console", lambda msg: logger.debug(f"Console [{msg.type}]: {msg.text}"))

        await page.goto(url, wait_until="load")
        # wait up to 180 seconds for the network to go idle
        await page.wait_for_load_state('networkidle', timeout=180 * 1000)
        content = await page.pdf(**get_pdf_print_options())
    except TimeoutError:
        # best effort: print the page as it is rather than dropping the row
        content = await page.pdf(**get_pdf_print_options())
    finally:
        # runs on success, on timeout and on every other failure
        await page.close()

    if action_type == 'convert_page_to_pdf':
        for callback in per_converted_callbacks:
            try:
                callback(row_id, content)
            except Exception as e:
                logger.exception(e)
123+
102124 async def convert_with_rows (self , task_info , resources ):
103125 dtable_uuid = task_info .get ('dtable_uuid' )
104126 plugin_type = task_info .get ('plugin_type' )
@@ -115,48 +137,35 @@ async def convert_with_rows(self, task_info, resources):
115137 context = await self .get_context ()
116138
117139 # convert
118- # open all tabs of rows step by step
140+ # open the row tabs in batches of `pages` tabs
119141 # render and convert the tabs of each batch to pdf concurrently
120- step = 10
121- for i in range (0 , len (row_ids ), step ):
122- try :
123- step_row_ids = row_ids [i : i + step ]
124- # open rows
125- for row_id in step_row_ids :
126- url = ''
127- if plugin_type == 'page-design' :
128- url = DTABLE_WEB_SERVICE_URL .strip ('/' ) + '/dtable/%s/page-design/%s/row/%s/' % (uuid_str_to_36_chars (dtable_uuid ), page_id , row_id )
129- if not url :
130- continue
131- dtable_server_api = DTableServerAPI ('dtable-events' , dtable_uuid , dtable_server_url )
132- url += '?access-token=%s&need_convert=%s' % (dtable_server_api .internal_access_token , 0 )
133- page = await context .new_page ()
134- page .on ("request" , lambda request : logger .debug (f"Request: { request .method } { request .url } " ))
135- page .on ("response" , lambda response : logger .debug (f"Response: { response .status } { response .url } " ))
136- page .on ("console" , lambda msg : logger .debug (f"Console [{ msg .type } ]: { msg .text } " ))
137- await page .goto (url , wait_until = "load" )
138- await page .wait_for_load_state ('networkidle' )
139- pdf_content = await page .pdf (format = 'A4' )
140- if action_type == 'convert_page_to_pdf' :
141- for callback in per_converted_callbacks :
142- try :
143- callback (row_id , pdf_content )
144- except Exception as e :
145- logging .exception (e )
146- except Exception as e :
147- logger .exception ('convert task: %s error: %s' , task_info , e )
148- continue
149- finally :
150- for page in self .context .pages :
151- await page .close ()
142+ pages = self .pages
143+ dtable_server_api = DTableServerAPI ('dtable-events' , dtable_uuid , dtable_server_url )
144+ for i in range (0 , len (row_ids ), pages ):
145+ tasks = []
146+ # open rows
147+ for row_id in row_ids [i : i + pages ]:
148+ url = ''
149+ if plugin_type == 'page-design' :
150+ url = DTABLE_WEB_SERVICE_URL .strip ('/' ) + '/dtable/%s/page-design/%s/row/%s/' % (uuid_str_to_36_chars (dtable_uuid ), page_id , row_id )
151+ if not url :
152+ continue
153+ url += '?access-token=%s&need_convert=%s' % (dtable_server_api .internal_access_token , 0 )
154+
155+ tasks .append (self .row_page_to_pdf (url , context , row_id , action_type , per_converted_callbacks ))
156+
157+ results = await asyncio .gather (* tasks , return_exceptions = True )
158+ for result in results :
159+ if isinstance (result , Exception ):
160+ logger .exception (result )
152161
153162 # callbacks
154163 if action_type == 'convert_page_to_pdf' :
155164 for callback in all_converted_callbacks :
156165 try :
157166 callback (table , target_column )
158167 except Exception as e :
159- logging .exception (e )
168+ logger .exception (e )
160169
161170 async def convert_without_rows (self , task_info ):
162171 dtable_uuid = task_info .get ('dtable_uuid' )
@@ -175,26 +184,24 @@ async def convert_without_rows(self, task_info):
175184 url += '?access-token=%s&need_convert=%s' % (dtable_server_api .access_token , 0 )
176185
177186 context = await self .get_context ()
187+ page = await context .new_page ()
188+ page .on ("request" , lambda request : logger .debug (f"Request: { request .method } { request .url } " ))
189+ page .on ("response" , lambda response : logger .debug (f"Response: { response .status } { response .url } " ))
190+ page .on ("console" , lambda msg : logger .debug (f"Console [{ msg .type } ]: { msg .text } " ))
178191 try :
179- page = await context .new_page ()
180- page .on ("request" , lambda request : logger .debug (f"Request: { request .method } { request .url } " ))
181- page .on ("response" , lambda response : logger .debug (f"Response: { response .status } { response .url } " ))
182- page .on ("console" , lambda msg : logger .debug (f"Console [{ msg .type } ]: { msg .text } " ))
183192 await page .goto (url , wait_until = "load" )
184- await page .wait_for_load_state ('networkidle' )
185- pdf_content = await page .pdf (format = 'A4' )
186-
187- if action_type == 'convert_document_to_pdf_and_send' :
188- for callback in per_converted_callbacks :
189- try :
190- callback (pdf_content )
191- except Exception as e :
192- logging .exception (e )
193- except Exception as e :
194- logger .exception ('convert task: %s error: %s' , task_info , e )
195- finally :
196- for page in self .context .pages :
197- await page .close ()
193+ await page .wait_for_load_state ('networkidle' , timeout = 180 * 1000 )
194+ pdf_content = await page .pdf (** get_pdf_print_options ())
195+ except TimeoutError :
196+ pdf_content = await page .pdf (** get_pdf_print_options ())
197+
198+ if action_type == 'convert_document_to_pdf_and_send' :
199+ for callback in per_converted_callbacks :
200+ try :
201+ callback (pdf_content )
202+ except Exception as e :
203+ logger .exception (e )
204+ await page .close ()
198205
199206 async def _do_convert (self , task_info ):
200207 dtable_uuid = task_info .get ('dtable_uuid' )
@@ -228,12 +235,12 @@ async def do_convert(self, task_info):
228235 except Exception as e :
229236 logger .exception (f'do convert Thread-{ self .thread_id } Exception in loop.run_until_complete - { e } ' )
230237 try :
231- if self .context :
232- await self .context .close ()
238+ await self .browser .close ()
233239 except Exception as e :
234240 logger .exception (f'do convert Thread-{ self .thread_id } close context error: { e } ' )
235241 finally :
236242 self .context = None
243+ self .browser = None
237244
238245 def run (self ):
239246 asyncio .set_event_loop (self .loop )
@@ -251,29 +258,27 @@ class ConvertPageToPDFManager:
251258 def __init__ (self ):
252259 self .max_workers = 2
253260 self .max_queue = 1000
261+ self .pages = 10
254262
255263 def init (self , config ):
256264 section_name = 'CONERT-PAGE-TO-PDF'
257265 key_max_workers = 'max_workers'
258266 key_max_queue = 'max_queue'
267+ key_pages = 'pages'
259268
260269 self .config = config
261270
262271 if config .has_section ('CONERT-PAGE-TO-PDF' ):
263- try :
264- self .max_workers = int (get_opt_from_conf_or_env (config , section_name , key_max_workers , default = self .max_workers ))
265- except :
266- pass
267- try :
268- self .max_queue = int (get_opt_from_conf_or_env (config , section_name , key_max_queue , default = self .max_queue ))
269- except :
270- pass
272+ self .max_workers = int (get_opt_from_conf_or_env (config , section_name , key_max_workers , default = self .max_workers ))
273+ self .max_queue = int (get_opt_from_conf_or_env (config , section_name , key_max_queue , default = self .max_queue ))
274+ self .pages = int (get_opt_from_conf_or_env (config , section_name , key_pages , default = self .pages ))
275+
271276 self .queue = Queue (self .max_queue ) # element in queue is a dict about task
272277
def start(self):
    """Log the effective settings, then create and start the worker threads.

    One ``BrowserWorker`` is created per ``max_workers``; each one gets its
    index, the shared task queue and the configured ``pages`` value.
    """
    logger.debug(
        'convert page to pdf max workers: %s max queue: %s pages: %s ',
        self.max_workers,
        self.max_queue,
        self.pages,
    )
    for worker_index in range(self.max_workers):
        worker = BrowserWorker(worker_index, self.queue, self.pages)
        worker.start()
278283
279284 def add_task (self , task_info ):
0 commit comments