Skip to content

Commit 546587d

Browse files
committed
moving around usage of dask, attempting to speed up some long running methods
1 parent cc33818 commit 546587d

File tree

9 files changed

+144
-152
lines changed

9 files changed

+144
-152
lines changed

src/silvimetric/commands/scan.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def scan(
5555
count = dask.delayed(data.estimate_count)(extents.bounds).persist()
5656

5757
if filter_empty:
58-
chunks = extents.chunk(data, resolution, point_count, depth)
58+
chunks = extents.chunk(data, point_count)
5959
cell_counts = [ch.cell_count for ch in chunks]
6060

6161
else:

src/silvimetric/commands/shatter.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import dask.array as da
1818
import dask.bag as db
1919
from dask.diagnostics import ProgressBar
20+
from dask import persist, compute
2021

2122
from .. import Extents, Storage, Data, ShatterConfig
2223
from ..resources.taskgraph import Graph
@@ -159,26 +160,30 @@ def kill_gracefully(signum, frame):
159160
signal.signal(signal.SIGINT, kill_gracefully)
160161

161162
# leaf_bag: db.Bag = db.from_sequence(leaves)
163+
# processes = leaf_bag.map(do_one, config, storage)
162164
processes = [delayed(do_one)(leaf, config, storage) for leaf in leaves]
163165

164166
## If dask is distributed, use the futures feature
165167
dc = get_client()
166168
consolidate_count = 30
167169
count = 0
168170
if dc is not None:
169-
pc_futures = futures_of(processes.persist())
171+
pc_futures = futures_of(persist(processes))
170172
for batch in as_completed(pc_futures, with_results=True).batches():
171173
for _, pack in batch:
172174
if isinstance(pack, CancelledError):
173175
continue
176+
if isinstance(pack, int):
177+
pack = [pack]
174178
for pc in pack:
175179
if isinstance(pc, BaseException):
176-
print('Error found: ', pc)
177-
else:
180+
config.log.warning('Worker returned exception: %s', pc)
181+
if isinstance(pc, int):
178182
count += 1
179183
if count >= consolidate_count:
180184
faf = dc.submit(
181-
storage.consolidate_shatter(config.timestamp)
185+
storage.consolidate_shatter,
186+
timestamp=config.timestamp,
182187
)
183188
fire_and_forget(faf)
184189
count = 0
@@ -192,7 +197,7 @@ def kill_gracefully(signum, frame):
192197
else:
193198
# Handle non-distributed dask scenarios
194199
with ProgressBar():
195-
point_count = sum(processes)
200+
point_count = sum(*compute(processes))
196201

197202
return point_count
198203

@@ -229,11 +234,8 @@ def shatter(config: ShatterConfig) -> int:
229234
if config.tile_size is not None:
230235
leaves = extents.get_leaf_children(config.tile_size)
231236
else:
232-
leaves = extents.chunk(
233-
data, res_threshold=storage.config.resolution, depth_threshold=30
234-
)
235-
leaf_count = len(leaves)
236-
config.log.debug(f'{leaf_count} tiles to process.')
237+
chunks = extents.chunk(data, pc_threshold=600000)
238+
leaves = db.from_sequence(chunks).compute()
237239

238240
# Begin main operations
239241
config.log.debug('Fetching and arranging data...')

src/silvimetric/resources/extents.py

Lines changed: 54 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,7 @@ def disjoint(self, other):
9494
def chunk(
9595
self,
9696
data: Data,
97-
res_threshold=100,
9897
pc_threshold=600000,
99-
depth_threshold=6,
10098
):
10199
"""
102100
Split up a dataset into tiles based on the given thresholds. Unlike Scan
@@ -131,34 +129,57 @@ def chunk(
131129

132130
if self.bounds == self.root:
133131
self.root = chunk.bounds
132+
yield from chunk.filter(data, pc_threshold)
134133

135-
filtered = []
136-
curr = db.from_delayed(
137-
[
138-
dask.delayed(ch.filter)(
139-
data, res_threshold, pc_threshold, depth_threshold, 1
140-
)
141-
for ch in chunk.split()
142-
]
143-
)
144-
curr_depth = 1
134+
def filter(
135+
self,
136+
data: Data,
137+
pc_threshold=600000,
138+
prev_estimate=0,
139+
):
140+
"""
141+
Creates quad tree of chunks for this bounds, runs pdal quickinfo over
142+
this to determine if there are any points available. Uses a bottom
143+
resolution of 1km.
144+
145+
:param data: Data object containing point cloud details.
146+
:param pc_threshold: Point count threshold., defaults to 600000
147+
:param prev_estimate: Point count estimate of the parent tile; splitting
148+
    stops once the estimate no longer changes., defaults to 0
149+
:yield: Yields Extents objects sized to fit the thresholds.
151+
"""
145152

146-
logger = data.storageconfig.log
147-
while curr.npartitions > 0:
148-
logger.debug(
149-
f'Filtering {curr.npartitions} tiles at depth {curr_depth}'
150-
)
151-
n = curr.compute()
152-
to_add = [ne for ne in n if isinstance(ne, Extents)]
153-
if to_add:
154-
filtered = filtered + to_add
153+
pc = data.estimate_count(self.bounds)
155154

156-
curr = db.from_delayed(
157-
[ne for ne in n if not isinstance(ne, Extents)]
158-
)
159-
curr_depth += 1
155+
target_pc = pc_threshold
156+
minx, miny, maxx, maxy = self.bounds.get()
160157

161-
return filtered
158+
# is it empty?
159+
if not pc:
160+
yield self
161+
else:
162+
# has it hit the threshold yet?
163+
area = (maxx - minx) * (maxy - miny)
164+
next_split_x = (maxx - minx) / 2
165+
next_split_y = (maxy - miny) / 2
166+
167+
# if the next split would put our area below the resolution, or if
168+
# the point count is less than the point threshold then use this
169+
# tile as the work unit.
170+
if next_split_x < self.resolution or next_split_y < self.resolution:
171+
yield self
172+
elif pc <= target_pc:
173+
yield self
174+
elif pc == prev_estimate:
175+
yield self
176+
else:
177+
for ch in self.split():
178+
yield from ch.filter(
179+
data,
180+
pc_threshold,
181+
prev_estimate=pc
182+
)
162183

163184
def split(self):
164185
"""
@@ -202,63 +223,6 @@ def split(self):
202223
]
203224
return exts
204225

205-
def filter(
206-
self,
207-
data: Data,
208-
res_threshold=100,
209-
pc_threshold=600000,
210-
depth_threshold=6,
211-
depth=0,
212-
):
213-
"""
214-
Creates quad tree of chunks for this bounds, runs pdal quickinfo over
215-
this to determine if there are any points available. Uses a bottom
216-
resolution of 1km.
217-
218-
:param data: Data object containing point cloud details.
219-
:param res_threshold: Resolution threshold., defaults to 100
220-
:param pc_threshold: Point count threshold., defaults to 600000
221-
:param depth_threshold: Tree depth threshold., defaults to 6
222-
:param depth: Current tree depth., defaults to 0
223-
:return: Returns a list of Extents.
224-
"""
225-
226-
pc = data.estimate_count(self.bounds)
227-
target_pc = pc_threshold
228-
minx, miny, maxx, maxy = self.bounds.get()
229-
230-
# is it empty?
231-
if not pc:
232-
return []
233-
else:
234-
# has it hit the threshold yet?
235-
area = (maxx - minx) * (maxy - miny)
236-
next_split_x = (maxx - minx) / 2
237-
next_split_y = (maxy - miny) / 2
238-
239-
# if the next split would put our area below the resolution, or if
240-
# the point count is less than the threshold (600k) then use this
241-
# tile as the work unit.
242-
if next_split_x < self.resolution or next_split_y < self.resolution:
243-
return [self]
244-
elif pc < target_pc:
245-
return [self]
246-
elif area < res_threshold**2 or depth >= depth_threshold:
247-
pc_per_cell = pc / (area / self.resolution**2)
248-
cell_estimate = ceil(target_pc / pc_per_cell)
249-
250-
return self.get_leaf_children(cell_estimate)
251-
else:
252-
return [
253-
dask.delayed(ch.filter)(
254-
data,
255-
res_threshold,
256-
pc_threshold,
257-
depth_threshold,
258-
depth=depth + 1,
259-
)
260-
for ch in self.split()
261-
]
262226

263227
def _find_dims(self, tile_size):
264228
"""
@@ -308,15 +272,13 @@ def get_leaf_children(self, tile_size):
308272
coords_list = np.array(
309273
[[*x, *y] for x in dx for y in dy], dtype=np.float64
310274
)
311-
yield from [
312-
Extents(
313-
Bounds(minx, miny, maxx, maxy),
314-
self.resolution,
315-
self.alignment,
316-
self.root,
317-
)
318-
for minx, maxx, miny, maxy in coords_list
319-
]
275+
for minx, maxx, miny, maxy in coords_list:
276+
yield Extents(
277+
Bounds(minx, miny, maxx, maxy),
278+
self.resolution,
279+
self.alignment,
280+
self.root,
281+
)
320282

321283
@staticmethod
322284
def from_storage(tdb_dir: str):

src/silvimetric/resources/metrics/l_moments.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ def lmom4(data, *args):
1313
idx = np.arange(n)
1414

1515
# sort in descending order
16-
data = np.sort(data.reshape(n))[::-1]
16+
data = np.sort(data, kind='quicksort')[::-1]
1717

1818
b0 = data.mean()
1919
l1: float = b0

0 commit comments

Comments
 (0)