@@ -25,39 +25,21 @@ def get_data(extents: Extents, filename: str, storage: Storage):
     :param storage: :class:`silvimetric.resources.storage.Storage` database.
     :return: Point data array from PDAL.
     """
+
+    attrs = [a.name for a in storage.get_attributes()]
     data = Data(filename, storage.config, bounds=extents.bounds)
     p = data.pipeline
     data.execute()
-    return p.get_dataframe(0)
-
-
-def arrange(points: pd.DataFrame, leaf, attrs: list[str]):
-    """
-    Arrange data to fit key-value TileDB input format.
-
-    :param data: Tuple of indices and point data array (xis, yis, data).
-    :param leaf: :class:`silvimetric.resources.extents.Extent` being used.
-    :param attrs: List of attribute names.
-    :raises Exception: Missing attribute error.
-    :return: None if no work is done, or a tuple of indices and rearranged data.
-    """
-    if points is None:
-        return None
-    if points.size == 0:
-        return None
 
-    points = points.loc[points.Y < leaf.bounds.maxy]
-    points = points.loc[points.Y >= leaf.bounds.miny]
-    points = points.loc[points.X >= leaf.bounds.minx]
-    points = points.loc[points.X < leaf.bounds.maxx, [*attrs, 'xi', 'yi']]
-
-    if points.size == 0:
-        return None
+    points = p.get_dataframe(0)
+    points = points.loc[points.Y < extents.bounds.maxy]
+    points = points.loc[points.Y >= extents.bounds.miny]
+    points = points.loc[points.X >= extents.bounds.minx]
+    points = points.loc[points.X < extents.bounds.maxx, [*attrs, 'xi', 'yi']]
 
     points.loc[:, 'xi'] = da.floor(points.xi)
     # ceil for y because origin is at top left
     points.loc[:, 'yi'] = da.ceil(points.yi)
-
     return points
 
 
@@ -84,7 +66,7 @@ def agg_list(data_in, proc_num):
     col_dtypes = {a: o for a in data_in.columns if a not in ['xi', 'yi']}
 
     coerced = data_in.astype(col_dtypes | xyi_dtypes)
-    gb = coerced.groupby(['xi', 'yi'])
+    gb = coerced.groupby(['xi', 'yi'], sort=False)
     listed = gb.agg(lambda x: np.array(x, old_dtypes[x.name]))
     counts_df = gb[first_col_name].agg('count').rename('count')
     listed = listed.join(counts_df)
@@ -97,12 +79,6 @@ def join(list_data: pd.DataFrame, metric_data):
     """
     Join the list data and metric DataFrames together.
     """
-    if list_data is None or metric_data is None:
-        return None
-
-    if isinstance(metric_data, Delayed):
-        metric_data = metric_data.compute()
-
     return list_data.join(metric_data).reset_index()
 
 
@@ -127,38 +103,27 @@ def write(data_in, storage, timestamp):
 Leaves = Generator[Extents, None, None]
 
 
-def get_processes(
-    leaves: Leaves, config: ShatterConfig, storage: Storage
+def do_one(
+    leaf: Extents, config: ShatterConfig, storage: Storage
 ) -> db.Bag:
     """Create dask bags and the order of operations."""
 
-    ## Handle dask bag transitions through work states
-    attrs = [a.name for a in storage.config.attrs]
     timestamp = config.timestamp
 
     # remove any extents that have already been done, only skip if full overlap
-    leaf_bag: db.Bag = db.from_sequence(leaves)
     if config.mbr:
+        if not all(leaf.disjoint_by_mbr(m) for m in config.mbr):
+            return 0
 
-        def mbr_filter(one: Extents):
-            return all(one.disjoint_by_mbr(m) for m in config.mbr)
+    points = get_data(leaf, config.filename, storage)
+    if points.empty:
+        return 0
+    metric_data = run_graph(points, storage.get_metrics())
+    listed_data = agg_list(points, config.time_slot)
+    joined_data = join(listed_data, metric_data)
+    point_count = write(joined_data, storage, timestamp)
 
-    leaf_bag = leaf_bag.filter(mbr_filter)
-
-    def pc_filter(d: pd.DataFrame):
-        if d is None:
-            return False
-        return not d.empty
-
-    points: db.Bag = leaf_bag.map(get_data, config.filename, storage)
-    arranged: db.Bag = points.map(arrange, leaf_bag, attrs)
-    filtered: db.Bag = arranged.filter(pc_filter)
-    metrics: db.Bag = filtered.map(run_graph, storage.config.metrics)
-    lists: db.Bag = filtered.map(agg_list, config.time_slot)
-    joined: db.Bag = lists.map(join, metrics)
-    writes: db.Bag = joined.map(write, storage, timestamp)
-
-    return writes
+    return point_count
 
 
 def run(leaves: Leaves, config: ShatterConfig, storage: Storage) -> int:
@@ -189,7 +154,8 @@ def kill_gracefully(signum, frame):
 
     signal.signal(signal.SIGINT, kill_gracefully)
 
-    processes = get_processes(leaves, config, storage)
+    leaf_bag: db.Bag = db.from_sequence(leaves)
+    processes = leaf_bag.map(do_one, config, storage)
 
     ## If dask is distributed, use the futures feature
     dc = get_client()