Skip to content

Commit fea27bb

Browse files
committed
made timestamp usage more consistent, fixed grid metrics to use copies,
made info and get_history have fewer required parameters
1 parent 0a6d382 commit fea27bb

File tree

17 files changed

+188
-227
lines changed

17 files changed

+188
-227
lines changed

src/silvimetric/cli/cli.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,12 +150,15 @@ def info_cmd(
150150

151151
start_date = dates[0] if dates else date
152152
end_date = dates[1] if dates else date
153+
if start_date is None and end_date is None:
154+
timestamp = None
155+
else:
156+
timestamp = (start_date.timestamp(), end_date.timestamp())
153157

154158
i = info.info(
155159
app.tdb_dir,
156160
bounds=bounds,
157-
start_time=start_date,
158-
end_time=end_date,
161+
timestamp=timestamp,
159162
name=name,
160163
concise=True,
161164
)
@@ -170,7 +173,6 @@ def info_cmd(
170173
]
171174

172175
i['metadata'].pop('metrics')
173-
# print(metrics.keys())
174176
if any([history, metadata, attributes, metrics]):
175177
filtered = {}
176178
if history:

src/silvimetric/commands/info.py

Lines changed: 2 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,48 +6,9 @@
66
from typing import Optional
77

88

9-
def check_values(
10-
start_time: datetime,
11-
end_time: datetime,
12-
bounds: Bounds,
13-
name: Union[UUID, str],
14-
):
15-
"""
16-
Validate arguments for info command.
17-
18-
:param start_time: Starting datetime object.
19-
:param end_time: Ending datetime object.
20-
:param bounds: Bounds to query by.
21-
:param name: Name to query by.
22-
:raises TypeError: Incorrect type of start_time argument.
23-
:raises TypeError: Incorrect type of end_time argument.
24-
:raises TypeError: Incorrect type of bounds argument.
25-
:raises TypeError: Incorrect type of name argument.
26-
:raises TypeError: Incorrect type of name argument.
27-
:meta public:
28-
"""
29-
if start_time is not None and not isinstance(start_time, datetime):
30-
raise TypeError('Incorrect type of "start_time" argument.')
31-
if end_time is not None and not isinstance(end_time, datetime):
32-
raise TypeError('Incorrect type of "end_time" argument.')
33-
if bounds is not None and not isinstance(bounds, Bounds):
34-
raise TypeError('Incorrect type of "bounds" argument.')
35-
if name is not None:
36-
if isinstance(name, UUID):
37-
pass
38-
elif isinstance(name, str):
39-
try:
40-
UUID(name)
41-
except Exception as e:
42-
raise TypeError('Incorrect type of "name" argument.') from e
43-
else:
44-
raise TypeError('Incorrect type of "name" argument.')
45-
46-
479
def info(
4810
tdb_dir: str,
49-
start_time: Optional[datetime] = None,
50-
end_time: Optional[datetime] = None,
11+
timestamp: Optional[tuple[int, int]] = None,
5112
bounds: Optional[Bounds] = None,
5213
name: Optional[Union[str, UUID]] = None,
5314
concise: Optional[bool] = False,
@@ -62,13 +23,7 @@ def info(
6223
:param name: Name query, defaults to None
6324
:return: Returns json object containing information on database.
6425
"""
65-
check_values(start_time, end_time, bounds, name)
66-
6726
with Storage.from_db(tdb_dir) as tdb:
68-
if start_time is None:
69-
start_time = datetime.fromtimestamp(0)
70-
if end_time is None:
71-
end_time = datetime.now()
7227
if bounds is None:
7328
bounds = tdb.config.root
7429
if name is not None:
@@ -85,10 +40,7 @@ def info(
8540
}
8641

8742
try:
88-
history = tdb.get_history(
89-
start_time, end_time, bounds, name, concise
90-
)
91-
43+
history = tdb.get_history(timestamp, bounds, name, concise)
9244
if bool(history) and isinstance(history, list):
9345
history = [h for h in history]
9446
elif bool(history):

src/silvimetric/commands/scan.py

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -47,51 +47,53 @@ def scan(
4747
)
4848
logger.debug(json.dumps(thresholds, indent=2))
4949

50-
with ProgressBar():
51-
extents = Extents.from_sub(tdb_dir, data.bounds)
52-
logger.info('Gathering initial chunks...')
53-
count = dask.delayed(data.estimate_count)(extents.bounds).persist()
54-
cell_counts = extent_handle(
55-
extents, data, resolution, point_count, depth, log
56-
)
50+
extents = Extents.from_sub(tdb_dir, data.bounds)
51+
logger.info('Gathering initial chunks...')
52+
count = dask.delayed(data.estimate_count)(extents.bounds).persist()
53+
cell_counts = extent_handle(
54+
extents, data, resolution, point_count, depth, log
55+
)
5756

58-
np_cell_counts = np.array(cell_counts)
59-
num_cells = np.sum(np_cell_counts).item()
57+
np_cell_counts = np.array(cell_counts)
58+
num_cells = np.sum(np_cell_counts).item()
59+
if np_cell_counts.size > 1:
6060
q1, q3 = np.percentile(np_cell_counts, [25,75])
6161
iqr = q3 - q1
6262
low_bounds = q1 - (1.5 * iqr)
6363
up_bounds = q3 + (1.5 * iqr)
6464

6565
adjusted = np_cell_counts[np_cell_counts > low_bounds]
6666
adjusted = adjusted[adjusted < up_bounds]
67-
68-
std = np.std(adjusted)
69-
mean = np.mean(adjusted)
70-
median = np.median(adjusted)
71-
rec = median
72-
73-
pc_info = dict(
74-
pc_info=dict(
75-
storage_bounds=tdb.config.root.to_json(),
76-
data_bounds=data.bounds.to_json(),
77-
count=dask.compute(count),
78-
)
67+
else:
68+
adjusted = np_cell_counts
69+
70+
std = np.std(adjusted)
71+
mean = np.mean(adjusted)
72+
median = np.median(adjusted)
73+
rec = median
74+
75+
pc_info = dict(
76+
pc_info=dict(
77+
storage_bounds=tdb.config.root.to_json(),
78+
data_bounds=data.bounds.to_json(),
79+
count=dask.compute(count),
7980
)
80-
tiling_info = dict(
81-
tile_info=dict(
82-
num_cells=num_cells,
83-
num_tiles=len(cell_counts),
84-
mean=mean,
85-
std_dev=std,
86-
median=median,
87-
recommended=rec,
88-
)
81+
)
82+
tiling_info = dict(
83+
tile_info=dict(
84+
num_cells=num_cells,
85+
num_tiles=len(cell_counts),
86+
mean=mean,
87+
std_dev=std,
88+
median=median,
89+
recommended=rec,
8990
)
91+
)
9092

91-
final_info = pc_info | tiling_info
92-
logger.info(json.dumps(final_info, indent=2))
93+
final_info = pc_info | tiling_info
94+
logger.info(json.dumps(final_info, indent=2))
9395

94-
return final_info
96+
return final_info
9597

9698

9799
def extent_handle(

src/silvimetric/commands/shatter.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ def final(
3030
config.finished = finished
3131
storage.save_shatter_meta(config)
3232

33-
persist(delayed(storage.consolidate)(config.timestamp))
34-
compute(delayed(storage.vacuum)())
33+
storage.consolidate(config.timestamp)
34+
storage.vacuum()
3535

3636

3737
def get_data(extents: Extents, filename: str, storage: Storage) -> pd.DataFrame:
@@ -44,16 +44,16 @@ def get_data(extents: Extents, filename: str, storage: Storage) -> pd.DataFrame:
4444
:return: Point data array from PDAL.
4545
"""
4646

47-
attrs = [a.name for a in storage.get_attributes()]
47+
attrs = [*[a.name for a in storage.get_attributes()], 'xi', 'yi']
4848
data = Data(filename, storage.config, bounds=extents.bounds)
4949
p = data.pipeline
50-
data.execute()
50+
data.execute(allowed_dims = [*attrs, 'X', 'Y'])
5151

5252
points = p.get_dataframe(0)
5353
points = points.loc[points.Y < extents.bounds.maxy]
5454
points = points.loc[points.Y >= extents.bounds.miny]
5555
points = points.loc[points.X >= extents.bounds.minx]
56-
points = points.loc[points.X < extents.bounds.maxx, [*attrs, 'xi', 'yi']]
56+
points = points.loc[points.X < extents.bounds.maxx][attrs]
5757

5858
points.loc[:, 'xi'] = np.floor(points.xi)
5959
# ceil for y because origin is at top left
@@ -125,21 +125,20 @@ def write(
125125
return p
126126

127127

128+
@delayed
128129
def do_one(leaf: Extents, config: ShatterConfig, storage: Storage) -> db.Bag:
129130
"""Create dask bags and the order of operations."""
130131

131-
timestamp = config.timestamp
132-
133132
# remove any extents that have already been done, only skip if full overlap
134133
if config.mbr:
135134
if not all(leaf.disjoint_by_mbr(m) for m in config.mbr):
136135
return 0
137136

138-
points = delayed(get_data)(leaf, config.filename, storage)
139-
listed_data = delayed(agg_list)(points, config.time_slot)
140-
metric_data = delayed(run_graph)(points, storage.get_metrics())
141-
joined_data = delayed(join)(listed_data, metric_data)
142-
point_count = delayed(write)(joined_data, storage, timestamp)
137+
points = get_data(leaf, config.filename, storage)
138+
listed_data = agg_list(points, config.time_slot)
139+
metric_data = run_graph(points, storage.get_metrics())
140+
joined_data = join(listed_data, metric_data)
141+
point_count = write(joined_data, storage, config.timestamp)
143142

144143
return point_count
145144

@@ -157,6 +156,9 @@ def run(leaves: Leaves, config: ShatterConfig, storage: Storage) -> int:
157156
:return: Number of points processed.
158157
"""
159158
## If dask is distributed, use the futures feature
159+
leaves = [delayed(leaf) for leaf in leaves]
160+
storage = delayed(storage)
161+
160162
processes = [do_one(leaf, config, storage) for leaf in leaves]
161163
dc = get_client()
162164
if dc is not None:
@@ -221,19 +223,19 @@ def shatter(config: ShatterConfig) -> int:
221223
storage.save_shatter_meta(config)
222224

223225
config.log.debug('Grabbing leaf nodes.')
224-
es = extents.chunk(data, pc_threshold=(15 * 10**6))
226+
es = extents.chunk(data, pc_threshold=(11 * 10**6))
225227

226228
for e in es:
227229
if config.tile_size is not None:
228230
leaves = e.get_leaf_children(config.tile_size)
229231
else:
230-
leaves = itertools.chain(e.chunk(data))
232+
leaves = e.chunk(data, pc_threshold=(600*10**3))
231233

232234
# Begin main operations
233235
config.log.debug('Shattering.')
234236
try:
235237
run(leaves, config, storage)
236-
persist(delayed(storage.consolidate)(config.timestamp))
238+
storage.consolidate(config.timestamp)
237239
except Exception as e:
238240
final(config, storage)
239241
raise e

0 commit comments

Comments (0)