Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions kwcoco/cli/coco_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class CocoStatsCLI(scfg.DataConfig):
annot_attrs = scfg.Value(False, isflag=True, help='show annotation attribute information')
image_attrs = scfg.Value(False, isflag=True, help='show image attribute information')
video_attrs = scfg.Value(False, isflag=True, help='show video attribute information')
channels = scfg.Value(False, isflag=True, help='show channel and sensor information')
io_workers = scfg.Value(0, help=ub.paragraph(
'''
number of workers when reading multiple kwcoco files
Expand Down Expand Up @@ -195,6 +196,17 @@ def main(cls, cmdline=True, **kw):
if human_readable:
print('hist(annot_attrs) = {}'.format(ub.urepr(attrs, nl=1)))

if config['channels']:
if human_readable:
print('Channel and sensor stats')
stat_types['channels'] = {}
for dset in datasets:
channel_info = _coco_channel_stats(dset)
stat_types['channels'][dset.tag] = channel_info
if human_readable:
rich_print('dset.tag = {!r}'.format(dset.tag))
rich_print(ub.urepr(channel_info, nl=2, sort=0))

if config['boxes']:
if human_readable:
print('Box stats')
Expand Down Expand Up @@ -331,6 +343,84 @@ def main(cls, cmdline=True, **kw):
# print(ub.urepr(dset.boxsize_stats(), nl=-1, precision=2))


def _coco_channel_stats(coco_dset):
"""
Return information about which channels and sensors are available.

This is a streamlined version of the richer geowatch stats, focused on
generic kwcoco datasets.

Example:
>>> import kwcoco
>>> from kwcoco.cli.coco_stats import _coco_channel_stats
>>> dset = kwcoco.CocoDataset()
>>> dset.add_category('a')
>>> gid1 = dset.add_image(file_name='img1.tif', sensor_coarse='S1', width=1, height=1)
>>> gid2 = dset.add_image(file_name='img2.tif', sensor_coarse='S2', width=1, height=1)
>>> dset.add_asset(gid=gid1, file_name='a1.tif', channels='red,green', width=1, height=1)
>>> dset.add_asset(gid=gid1, file_name='a2.tif', channels='blue', width=1, height=1)
>>> dset.add_asset(gid=gid2, file_name='b1.tif', channels='red,green', width=1, height=1)
>>> dset.add_asset(gid=gid2, file_name='b2.tif', channels='nir', width=1, height=1)
>>> info = _coco_channel_stats(dset)
>>> assert info['sensor_hist'] == {'S1': 1, 'S2': 1}
>>> assert info['chan_hist']['blue,red,green,unknown-chan'] == 1
>>> assert info['chan_hist']['nir,red,green,unknown-chan'] == 1
>>> assert info['common_channels'] == '<FusedChannelSpec(red|green|unknown-chan)>'
>>> assert info['all_channels'] == '<FusedChannelSpec(blue|red|green|unknown-chan|nir)>'
"""
import kwcoco
from kwcoco.coco_image import CocoImage

sensor_hist = ub.ddict(int)
chan_hist = ub.ddict(int)
single_chan_hist = ub.ddict(int)
sensorchan_hist = ub.ddict(lambda: ub.ddict(int))
sensorchan_hist2 = ub.ddict(int)
for _gid, img in coco_dset.index.imgs.items():
coco_img: CocoImage = coco_dset.coco_image(_gid)
channels = []
for obj in coco_img.iter_asset_objs():
channels.append(obj.get('channels', 'unknown-chan'))
channels = sorted(channels)
chan = ','.join(channels)
sensor = img.get('sensor_coarse', '*')
chan_hist[chan] += 1
sensor_hist[sensor] += 1
sensorchan_hist[sensor][chan] += 1
sensorchan = f'{sensor}:({chan})'
sensorchan_hist2[sensorchan] += 1

for single_chan in kwcoco.ChannelSpec(chan).unique():
single_chan_hist[single_chan] += 1

CS = kwcoco.ChannelSpec
FS = kwcoco.FusedChannelSpec
osets = [CS.coerce(c).fuse().to_oset() for c in chan_hist]
if len(osets) == 0:
common_channels = FS.coerce([])
all_channels = FS.coerce([])
all_sensorchan = kwcoco.SensorChanSpec.coerce('')
else:
common_channels = FS.coerce(list(ub.oset.intersection(*osets))).concise()
all_channels = FS.coerce(list(ub.oset.union(*osets))).concise()
all_sensorchan = kwcoco.SensorChanSpec.late_fuse(*[
kwcoco.SensorChanSpec.coerce(s)
for s in sensorchan_hist2.keys()]).concise()

info = {
'single_chan_hist': {k: int(v) for k, v in single_chan_hist.items()},
'chan_hist': {k: int(v) for k, v in chan_hist.items()},
'sensor_hist': {k: int(v) for k, v in sensor_hist.items()},
'sensorchan_hist': {k: {k2: int(v2) for k2, v2 in v.items()}
for k, v in sensorchan_hist.items()},
'sensorchan_hist2': {k: int(v) for k, v in sensorchan_hist2.items()},
'common_channels': str(common_channels),
'all_channels': str(all_channels),
'all_sensorchan': str(all_sensorchan),
}
return info


def _dataset_disk_usage(dset):
"""
Compute disk usage of all image assets referenced by this dataset.
Expand Down