3030from PyQt5 .QtGui import QTextCursor
3131from PyQt5 .QtCore import QObject , pyqtSignal
3232
33+ # pip install dask distributed
34+ import time
35+ from typing import Optional , Sequence , Tuple
36+
37+ from PyQt5 .QtCore import QObject , pyqtSignal , QThread # PyQt6: use PyQt6.QtCore
38+ from dask .distributed import Client
39+
40+ # Helper to normalize log records across dask versions
41+ def _format_sched_record (rec ) -> str :
42+ try :
43+ level , msg , ts = rec [0 ], rec [1 ], rec [2 ]
44+ return f"[SCHEDULER] { ts } { level } : { msg } "
45+ except Exception :
46+ return f"[SCHEDULER] { rec } "
47+
48+ def _format_worker_record (worker : str , item ) -> str :
49+ # item might be (ts, msg), (ts, level, msg), or (level, msg)
50+ try :
51+ if len (item ) == 3 and isinstance (item [0 ], (int , float , str )):
52+ ts , level , msg = item
53+ return f"[{ worker } ] { ts } { level } : { msg } "
54+ elif len (item ) == 2 :
55+ a , b = item
56+ # Heuristic: timestamp first
57+ if isinstance (a , (int , float , str )) and not isinstance (b , (int , float )):
58+ return f"[{ worker } ] { a } : { b } "
59+ else :
60+ return f"[{ worker } ] { a } { b } "
61+ except Exception :
62+ pass
63+ return f"[{ worker } ] { item } "
64+
65+ class DaskLogWorker (QObject ):
66+ # Emits batches of log lines to append
67+ log_lines = pyqtSignal (list )
68+ error = pyqtSignal (str )
69+ connected = pyqtSignal (str ) # emits dashboard link when ready
70+
71+ def __init__ (self , scheduler_address : str , poll_interval_ms : int = 1000 ,
72+ worker_tail : int = 2000 , parent = None ):
73+ super ().__init__ (parent )
74+ self .address = scheduler_address
75+ self .poll_interval_ms = max (200 , poll_interval_ms )
76+ self .worker_tail = worker_tail
77+ self ._running = False
78+ self ._client : Optional [Client ] = None
79+ self ._seen_counts : dict [str , int ] = {}
80+
81+ def start (self ):
82+ """Entry point wired to QThread.started"""
83+ try :
84+ # Create the Client *in this thread* so its IO loop lives here.
85+ self ._client = Client (self .address , set_as_default = False , asynchronous = False )
86+ # Let the UI show a link if desired (useful during dev)
87+ if getattr (self ._client , "dashboard_link" , None ):
88+ self .connected .emit (self ._client .dashboard_link )
89+ except Exception as e :
90+ self .error .emit (f"Failed to connect Client: { e !r} " )
91+ return
92+
93+ self ._running = True
94+ # First fetch primes scheduler logs; worker logs can be large so use tails
95+ while self ._running :
96+ try :
97+ lines : list [str ] = []
98+
99+ # --- Scheduler logs ---
100+ try :
101+ sched_logs : Sequence [Tuple ] = self ._client .get_scheduler_logs () # type: ignore
102+ for rec in sched_logs :
103+ lines .append (_format_sched_record (rec ))
104+ except Exception as e :
105+ # Older/newer versions may differ; keep going
106+ self .error .emit (f"Scheduler logs error: { e !r} " )
107+
108+ # --- Worker logs (tail only) ---
109+ try :
110+ # n=tail only supported on newer versions; fallback handled below
111+ worker_logs = self ._client .get_worker_logs (n = self .worker_tail ) # type: ignore
112+ except TypeError :
113+ worker_logs = self ._client .get_worker_logs () # type: ignore
114+
115+ # worker_logs is {worker_name: list_of_records}
116+ for worker , records in worker_logs .items ():
117+ seen = self ._seen_counts .get (worker , 0 )
118+ # If we requested tail, the list is already short; still show only new items
119+ new = records [seen :] if seen < len (records ) else []
120+ if new :
121+ for item in new :
122+ lines .append (_format_worker_record (worker , item ))
123+ self ._seen_counts [worker ] = seen + len (new )
124+
125+ if lines :
126+ self .log_lines .emit (lines )
127+
128+ except Exception as e :
129+ self .error .emit (f"Polling error: { e !r} " )
130+
131+ # Sleep without blocking the GUI thread
132+ time .sleep (self .poll_interval_ms / 1000.0 )
133+
134+ # Cleanup
135+ try :
136+ if self ._client is not None :
137+ self ._client .close ()
138+ except Exception :
139+ pass
140+
141+ def stop (self ):
142+ self ._running = False
143+
144+ from PyQt5 .QtWidgets import QTextEdit
145+
146+ class DaskLogsPanel (QTextEdit ):
147+ def __init__ (self , scheduler_address : str , parent = None ):
148+ super ().__init__ (parent )
149+ self .setReadOnly (True )
150+ self ._thread = QThread (self ) # owns the worker thread
151+ self ._worker = DaskLogWorker (scheduler_address )
152+
153+ # Wire signals
154+ self ._worker .log_lines .connect (self ._append_lines )
155+ self ._worker .error .connect (self ._append_error )
156+ self ._worker .connected .connect (self ._on_connected )
157+
158+ # Move worker to thread and start
159+ self ._worker .moveToThread (self ._thread )
160+ self ._thread .started .connect (self ._worker .start )
161+ self ._thread .start ()
162+
163+ def _append_lines (self , lines : list [str ]):
164+ # Append efficiently
165+ self .append ("\n " .join (lines ))
166+
167+ def _append_error (self , msg : str ):
168+ self .append (f"<span style='color:#b00;'>[ERROR] { msg } </span>" )
169+
170+ def _on_connected (self , dash_link : str ):
171+ self .append (f"<i>Connected. Dashboard: { dash_link } </i>" )
172+
173+ def closeEvent (self , event ):
174+ try :
175+ self ._worker .stop ()
176+ self ._thread .quit ()
177+ self ._thread .wait (2000 )
178+ finally :
179+ super ().closeEvent (event )
180+
33181class EmittingStream (QObject ):
34182 text_written = pyqtSignal (str )
35183
@@ -87,7 +235,6 @@ def create_raw_data_tab(self):
87235 # Text area to display raw data
88236 self .raw_data_display = QTextEdit ()
89237 self .raw_data_display .setReadOnly (True )
90- layout .addWidget (self .raw_data_display )
91238
92239 tab .setLayout (layout )
93240 return tab
@@ -103,7 +250,7 @@ def load_raw_data(self):
103250 patcher = belib .translators .LabViewH5Patcher ()
104251 patcher .translate (filename )
105252 reader = sr .Usid_reader (filename )
106- self .beps_raw = reader .read ()[:5 ,: 5 ,:,:,:]#TODO: temporary for testing only!!
253+ self .beps_raw = reader .read ()[:30 ,: 30 ,:,:,:]
107254 self .freq_axis = self .beps_raw .labels .index ('Frequency (Hz)' )
108255 self .freq_vec = self .beps_raw ._axes [self .freq_axis ].values
109256 self .all_dims = np .arange (len (self .beps_raw .shape ))
@@ -210,6 +357,7 @@ def create_sho_tab(self):
210357
211358 main_layout .addLayout (controls_layout , 1 )
212359 main_layout .addLayout (self .right_layout , 2 )
360+ main_layout = main_layout
213361
214362 tab .setLayout (main_layout )
215363 return tab
@@ -277,10 +425,21 @@ def on_do_guess(self):
277425 guess_fn = SHOestimateGuess ,ind_dims = self .ind_dims ,
278426 threads = 1 , return_cov = False , return_fit = True , return_std = False ,
279427 km_guess = kmeans_guess ,num_fit_parms = 4 , n_clus = num_clusters )
280-
428+ address = self .fitter .client .dashboard_link
429+ #start outputting the logs of the Dask client here
430+ self ._output_dask_client_logs (address )
281431 self .fitter .do_guess ()
282432 self .do_fit_button .setEnabled (True )
283433
434+ def _output_dask_client_logs (self , address ):
435+ """
436+ Start outputting the code for the stuff here
437+ """
438+ import webbrowser
439+ # This will open the default system browser at that address:
440+ webbrowser .open (address )
441+ return
442+
284443 def append_output_text (self , text ):
285444 """Append new text to the output box and scroll to the end."""
286445 self .output_box .moveCursor (QTextCursor .End ) # Move to the end before inserting
0 commit comments