1515from dask import compute , persist
1616from dask .distributed import futures_of , as_completed
1717
18- from .. import Extents , Storage , Data , ShatterConfig
18+ from .. import Extents , Storage , Data , ShatterConfig , Metric
1919from ..resources .taskgraph import Graph
2020
2121
@@ -35,7 +35,7 @@ def final(
3535 storage .vacuum ()
3636
3737
38- def get_data (extents : Extents , filename : str , storage : Storage ):
38+ def get_data (extents : Extents , filename : str , storage : Storage ) -> pd . DataFrame :
3939 """
4040 Execute pipeline and retrieve point cloud data for this extent
4141
@@ -62,7 +62,7 @@ def get_data(extents: Extents, filename: str, storage: Storage):
6262 return points
6363
6464
65- def run_graph (data_in , metrics ) :
65+ def run_graph (data_in : pd . DataFrame , metrics : list [ Metric ]) -> pd . DataFrame :
6666 """
6767 Run DataFrames through metric processes
6868 """
@@ -71,13 +71,16 @@ def run_graph(data_in, metrics):
7171 return graph .run (data_in )
7272
7373
74- def agg_list (data_in , proc_num ) :
74+ def agg_list (data_in : pd . DataFrame , proc_num : int ) -> pd . DataFrame :
7575 """
7676 Make variable-length point data attributes into lists
7777 """
7878 if data_in is None :
7979 return None
8080
81+ if data_in .empty :
82+ return data_in .set_index (['xi' , 'yi' ])
83+
8184 old_dtypes = data_in .dtypes
8285 xyi_dtypes = {'xi' : np .float64 , 'yi' : np .float64 }
8386 o = np .dtype ('O' )
@@ -96,14 +99,16 @@ def agg_list(data_in, proc_num):
9699 return listed
97100
98101
def join(list_data: pd.DataFrame, metric_data: pd.DataFrame) -> pd.DataFrame:
    """
    Join the list data and metric DataFrames together.

    Parameters
    ----------
    list_data : pd.DataFrame
        Cell-indexed frame of list-aggregated point attributes.
    metric_data : pd.DataFrame
        Cell-indexed frame of computed metric values.

    Returns
    -------
    pd.DataFrame
        The two frames joined on their shared index, with the index
        moved back into ordinary columns via ``reset_index``.
    """
    # NOTE(review): assumes both frames share an aligned cell index
    # (e.g. ['xi', 'yi'] set upstream in agg_list) — confirm with callers.
    return list_data.join(metric_data).reset_index()
104107
105108
106- def write (data_in , storage , timestamp ):
109+ def write (
110+ data_in : pd .DataFrame , storage : Storage , timestamp : tuple [int , int ]
111+ ) -> int :
107112 """
108113 Write cell data to database
109114
@@ -141,6 +146,8 @@ def do_one(leaf: Extents, config: ShatterConfig, storage: Storage) -> db.Bag:
141146
142147
143148Leaves = Generator [Extents , None , None ]
149+
150+
144151def run (leaves : Leaves , config : ShatterConfig , storage : Storage ) -> int :
145152 """
146153 Coordinate running of shatter process and handle any interruptions
@@ -172,9 +179,7 @@ def run(leaves: Leaves, config: ShatterConfig, storage: Storage) -> int:
172179 # Handle non-distributed dask scenarios
173180 results = compute (* processes )
174181 pcs = [
175- possible_pc
176- for possible_pc in results
177- if possible_pc is not None
182+ possible_pc for possible_pc in results if possible_pc is not None
178183 ]
179184 pc = sum (pcs )
180185 config .point_count = config .point_count + pc
@@ -217,7 +222,7 @@ def shatter(config: ShatterConfig) -> int:
217222 storage .save_shatter_meta (config )
218223
219224 config .log .debug ('Grabbing leaf nodes.' )
220- es = extents .chunk (data , pc_threshold = (15 * 10 ** 6 ))
225+ es = extents .chunk (data , pc_threshold = (15 * 10 ** 6 ))
221226
222227 for e in es :
223228 if config .tile_size is not None :
0 commit comments