
Commit d70def6

Merge pull request #328 from zooniverse/edit-batch-agg

BatchAgg Edits: May 2025

2 parents 85a20a8 + 575af66

File tree

4 files changed: +149 -79 lines changed

docs/user_guide.rst

Lines changed: 70 additions & 26 deletions
@@ -58,10 +58,15 @@ Uploading non-image media types
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 If you wish to upload subjects with non-image media (e.g. audio or video),
-you will need to make sure you have the ``libmagic`` library installed. If you
-don't already have ``libmagic``, please see the `dependency information for
-python-magic <https://github.com/ahupp/python-magic#dependencies>`_ for more
-details.
+it is desirable to have the ``libmagic`` library installed for type detection.
+If you don't already have ``libmagic``, please see the `dependency information
+for python-magic <https://github.com/ahupp/python-magic#installation>`_ for
+more details.
+
+If ``libmagic`` is not installed, assignment of MIME types (e.g., image/jpeg,
+video/mp4, text/plain, application/json, etc.) will be based on file extensions.
+Be aware that if file names and extensions aren't accurate, this could lead to
+issues when the media is loaded.

 Usage Examples
 --------------
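
For context, the upload flow that the revised paragraph documents can be sketched roughly as follows; this is a minimal sketch assuming an existing project and subject set (the credentials, IDs, and file name are placeholders), and when ``libmagic`` is unavailable the MIME type falls back to the file extension::

    from panoptes_client import Panoptes, Project, Subject, SubjectSet

    Panoptes.connect(username='example', password='example')  # placeholder credentials

    project = Project.find(1234)         # hypothetical project ID
    subject_set = SubjectSet.find(5678)  # hypothetical subject set ID

    subject = Subject()
    subject.links.project = project
    subject.add_location('recording.mp3')  # non-image media; type detected by libmagic or by extension
    subject.metadata.update({'source': 'example recording'})
    subject.save()

    subject_set.add(subject)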
@@ -414,58 +419,97 @@ You can also pass an optional `new_subject_set_name` parameter and this would be
 
     Project(project_id).copy(new_subject_set_name='My New Subject Set')
 
-Programmatic Data Exports
-~~~~~~~~~~~~~~~~~~~~~~~~~
+Data Exports
+~~~~~~~~~~~~
 The Panoptes Python Client allows you to generate, describe, and download data exports (e.g., classifications, subjects, workflows) via the Python ``panoptes_client`` library.
 
-Multiple types of exports can be generated using the Python Client, including project-level products (classifications, subjects, workflows) as smaller scale classification exports (for workflows and subject sets).
+Multiple types of exports can be generated using the Python Client, including project-level products (classifications, subjects, workflows) and smaller scale classification exports (for workflows and subject sets).
 For the examples below, we will demonstrate commands for a project wide classifications export, but these functions work for any export type.
 
 **Get Exports**
 
-As the name implies, this method downloads a data export over HTTP. This uses the `get_export` method and can be called by passing in the following parameters::
+As the name implies, this method downloads a data export over HTTP. This uses the `get_export` method and can be called by passing in the following parameters:
 
-    export_type #string specifying which type of export should be downloaded
+* *export_type*: string specifying which type of export should be downloaded.
+* *generate*: a boolean specifying whether to generate a new export and wait for it to be ready, or to just download the latest existing export. Default is False.
+* *wait*: a boolean specifying whether to wait for an in-progress export to finish, if there is one. Has no effect if `generate` is true (wait will occur in this case). Default is False.
+* *wait_timeout*: the number of seconds to wait if `wait` is True or `generate` is True. Has no effect if `wait` and `generate` are both False. Default is None (wait indefinitely).
 
-    generate #a boolean specifying if to generate a new export and wait for it to be ready, or to just download the latest existing export
+Examples::
 
-    wait #a boolean specifying whether to wait for an in-progress export to finish, if there is one. Has no effect if generate is true.
+    # Fetch existing export
+    classification_export = Project(1234).get_export('classifications')
 
-    wait_timeout #is the number of seconds to wait if wait is True. Has no effect if wait is False or if generate is True.
+    # Generate export, wait indefinitely for result to complete
+    classification_export = Project(1234).get_export('classifications', generate=True)
 
-    classification_export = Project(project_id).get_export(export_type="classifications")
+    # Fetch export currently being processed, wait up to 600 seconds for export to complete
+    classification_export = Project(1234).get_export('classifications', wait=True, wait_timeout=600)
 
 The returned Response object has two additional attributes as a convenience for working with the CSV content; `csv_reader` and `csv_dictreader`, which are wrappers for `csv.reader()` and `csv.DictReader` respectively.
 These wrappers take care of correctly decoding the export content for the CSV parser::
 
     classification_export = Project(1234).get_export('classifications')
     for row in classification_export.csv_dictreader():
-    print(row)
+        print(row)
 
 **Generate Exports**
 
 As the name implies, this method generates/starts a data export. This uses the `generate_export` method and can be called by passing in the `export_type` parameter::
 
-    export_info = Project(project_id).generate_export(export_type='classifications')
+    export_info = Project(1234).generate_export('classifications')
+
+This kicks off the export generation process and returns `export_info` as a dictionary containing the metadata on the selected export.
 
-This would return `export_info` as a dictionary containing the metadata on the selected export
+**Describing Exports**
 
-**Wait Exports**
+This method fetches information/metadata about a specific type of export. This uses the `describe_export` method and can be called by passing the `export_type` (e.g., classifications, subjects) this way::
 
-As the name implies, this method blocks/waits until an in-progress export is ready. It uses the `wait_export` method and can be called passing the following parameters::
+    export_info = Project(1234).describe_export('classifications')
 
-    export_type #string specifying which type of export should be downloaded
+This would return `export_info` as a dictionary containing the metadata on the selected export.
 
-    timeout #is the maximum number of seconds to wait.
+Subject Set Classification Exports
+++++++++++++++++++++++++++++++++++
 
-    export_info = Project(project_id).wait_export(export_type='classifications')
+As mentioned above, it is possible to request a classifications export at the project, workflow, or subject set scope.
+For the subject set classification export, classifications are included in the export if they satisfy two selection criteria:
 
-This would return `export_info` as a dictionary containing the metadata on the selected export and throw a `PanoptesAPIException` once the time limit is exceeded and the export is not ready
+1. The subject referenced in the classification is a member of the relevant subject set.
+2. The relevant subject set is currently linked to the workflow referenced in the classification.
 
-**Describing Exports**
+Example Usage::
+
+    # For a SubjectSet, check which Workflows it is currently linked to
+    subject_set = SubjectSet.find(1234)
+    for wf in subject_set.links.workflows:
+        print(wf.id, wf.display_name)
+
+    # Generate Export
+    subject_set_classification_export = subject_set.get_export('classifications', generate=True)
+
+Automated Aggregation of Classifications
+++++++++++++++++++++++++++++++++++++++++
+
+The Zooniverse supports research teams by maintaining the ``panoptes_aggregation`` Python package
+(see `docs <https://aggregation-caesar.zooniverse.org/docs>`_ and `repo <https://github.com/zooniverse/aggregation-for-caesar>`_).
+This software requires local installation to run, which can be a deterrent for its use.
+As an alternative to installing and running this aggregation code, we provide a Zooniverse-hosted service for producing aggregated results for simple datasets.
+This "batch aggregation" feature is built to perform simple workflow-level data aggregation that uses baseline extractors and reducers without any custom configuration.
+Please see :py:meth:`.Workflow.run_aggregation` and :py:meth:`.Workflow.get_batch_aggregation_links` docstrings for full details.
+
+Example Usage::
+
+    # Generate input data exports: workflow-level classification export and project-level workflows export
+    Workflow(1234).generate_export('classifications')
+    Project(2345).generate_export('workflows')
 
-This method fetches information/metadata about a specific type of export. This uses the `describe_export` method and can be called by passing in the export_type(classifications, subject_sets) this way::
+    # Request batch aggregation data product
+    Workflow(1234).run_aggregation()
 
-    export_info = Project(project_id).describe_export(export_type='classifications')
+    # Fetch batch aggregation download URLs
+    urls = Workflow(1234).get_batch_aggregation_links()
+    print(urls)
 
-This would return `export_info` as a dictionary containing the metadata on the selected export
+    # Load Reductions CSV using Pandas
+    pd.read_csv(urls['reductions'])
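
Putting the documented pieces together, the batch-aggregation flow can be sketched end to end; this is a minimal sketch that assumes the ``pandas`` package is installed, that the run has had time to finish, and that the credentials, IDs, and the 'completed' status string are placeholders rather than confirmed API values::

    import pandas as pd
    from panoptes_client import Panoptes, Project, Workflow

    Panoptes.connect(username='example', password='example')  # placeholder credentials

    # Generate the input exports consumed by the aggregation service
    Workflow(1234).generate_export('classifications')
    Project(2345).generate_export('workflows')

    # Start a batch aggregation run (defaults to the logged-in user)
    Workflow(1234).run_aggregation()

    # Later, check on the run and load the reductions CSV
    status = Workflow(1234).get_batch_aggregation_status()
    if status == 'completed':  # assumed status value; inspect the returned string in practice
        urls = Workflow(1234).get_batch_aggregation_links()
        reductions = pd.read_csv(urls['reductions'])
        print(reductions.head())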

panoptes_client/panoptes.py

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@
 if os.environ.get('PANOPTES_DEBUG'):
     logging.basicConfig(level=logging.DEBUG)
 else:
-    logging.basicConfig()
+    logging.basicConfig(level=logging.INFO)


 class Panoptes(object):
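
With INFO now the default level, messages such as the new "Aggregation exists for Workflow ..." notice in ``workflow.py`` are shown out of the box. A minimal sketch of adjusting that verbosity from client code; the logger name is taken from ``workflow.py`` and everything else is standard-library ``logging``::

    import logging
    import os

    # Opt in to DEBUG output by setting PANOPTES_DEBUG (any non-empty value)
    # before panoptes_client is imported, since basicConfig runs at import time.
    os.environ['PANOPTES_DEBUG'] = 'true'

    from panoptes_client import Panoptes  # noqa: E402

    # Alternatively, after import, silence the client's INFO messages:
    logging.getLogger('panoptes_client').setLevel(logging.WARNING)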

panoptes_client/tests/test_workflow.py

Lines changed: 38 additions & 29 deletions
@@ -216,57 +216,66 @@ def setUp(self):
         self.instance = Workflow(1)
         self.mock_user_id = 1
 
-    def _mock_aggregation(self):
-        mock_aggregation = MagicMock()
-        mock_aggregation.object_count = 1
-        mock_aggregation.next = MagicMock(return_value=MagicMock(id=1))
-        return mock_aggregation
-
     @patch.object(Aggregation, 'where')
-    @patch.object(Aggregation, 'find')
-    def test_run_aggregation_with_user_object(self, mock_find, mock_where):
-        mock_where.return_value = self._mock_aggregation()
-
+    def test_run_aggregation_existing(self, mock_where):
         mock_current_agg = MagicMock()
-        mock_find.return_value = mock_current_agg
+        mock_current_agg.delete = MagicMock()
+
+        mock_aggregations = MagicMock()
+        mock_aggregations.object_count = 1
+        mock_aggregations.__next__.return_value = mock_current_agg
+        mock_where.return_value = mock_aggregations
 
         result = self.instance.run_aggregation(self.mock_user_id, False)
 
+        mock_current_agg.delete.assert_not_called()
         self.assertEqual(result, mock_current_agg)
 
-    @patch.object(Aggregation, 'find')
     @patch.object(Aggregation, 'where')
     @patch.object(Aggregation, 'save')
-    def test_run_aggregation_with_delete_if_true(self, mock_save, mock_where, mock_find):
-        mock_where.return_value = self._mock_aggregation()
-
+    def test_run_aggregation_existing_and_delete(self, mock_save, mock_where):
         mock_current_agg = MagicMock()
         mock_current_agg.delete = MagicMock()
-        mock_find.return_value = mock_current_agg
 
-        mock_save_func = MagicMock()
+        mock_aggregations = MagicMock()
+        mock_aggregations.object_count = 1
+        mock_aggregations.__next__.return_value = mock_current_agg
+        mock_where.return_value = mock_aggregations
 
+        mock_save_func = MagicMock()
         mock_save.return_value = mock_save_func()
-        self.instance.run_aggregation(self.mock_user_id, True)
 
-        mock_current_agg.delete.assert_called_once()
+        result = self.instance.run_aggregation(self.mock_user_id, True)
 
+        mock_current_agg.delete.assert_called_once()
         mock_save_func.assert_called_once()
+        self.assertNotEqual(result, mock_current_agg)
 
-    @patch.object(Workflow, 'get_batch_aggregations')
-    def test_get_agg_property(self, mock_get_batch_aggregations):
+    @patch.object(Aggregation, 'where')
+    def test_get_batch_aggregation(self, mock_where):
+        mock_current_agg = MagicMock()
+        mock_aggregations = MagicMock()
+        mock_aggregations.__next__.return_value = mock_current_agg
+        mock_where.return_value = mock_aggregations
+
+        result = self.instance.get_batch_aggregation()
+
+        self.assertEqual(result, mock_current_agg)
+
+    @patch.object(Aggregation, 'where')
+    def test_get_batch_aggregation_failure(self, mock_where):
+        mock_where.return_value = iter([])
+
+        with self.assertRaises(PanoptesAPIException):
+            self.instance.get_batch_aggregation()
+
+    @patch.object(Workflow, 'get_batch_aggregation')
+    def test_get_agg_property(self, mock_get_batch_aggregation):
         mock_aggregation = MagicMock()
         mock_aggregation.test_property = 'returned_test_value'
 
-        mock_get_batch_aggregations.return_value = iter([mock_aggregation])
+        mock_get_batch_aggregation.return_value = mock_aggregation
 
         result = self.instance._get_agg_property('test_property')
 
         self.assertEqual(result, 'returned_test_value')
-
-    @patch.object(Workflow, 'get_batch_aggregations')
-    def test_get_agg_property_failed(self, mock_get_batch_aggregations):
-        mock_get_batch_aggregations.return_value = iter([])
-
-        with self.assertRaises(PanoptesAPIException):
-            self.instance._get_agg_property('test_property')
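
The switch from the old ``.next()`` helper to configuring ``__next__`` mirrors the production change from ``workflow_aggs.next()`` to the builtin ``next(workflow_aggs)``. A minimal standalone illustration of why the mocks are set up this way (hypothetical names; standard ``unittest.mock`` behaviour)::

    from unittest.mock import MagicMock

    fake_item = MagicMock()
    fake_results = MagicMock()

    # MagicMock supports magic methods, so builtin next() can be stubbed:
    fake_results.__next__.return_value = fake_item
    assert next(fake_results) is fake_item

    # Configuring fake_results.next would only stub an ordinary attribute
    # named "next", which builtin next() never calls.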

panoptes_client/workflow.py

Lines changed: 40 additions & 23 deletions
@@ -5,15 +5,15 @@
 from panoptes_client.subject_workflow_status import SubjectWorkflowStatus
 
 from panoptes_client.exportable import Exportable
-from panoptes_client.panoptes import PanoptesObject, LinkResolver, PanoptesAPIException
+from panoptes_client.panoptes import Panoptes, PanoptesObject, LinkResolver, PanoptesAPIException
 from panoptes_client.subject import Subject
 from panoptes_client.subject_set import SubjectSet
 from panoptes_client.utils import batchable
 
 from panoptes_client.caesar import Caesar
 from panoptes_client.user import User
 from panoptes_client.aggregation import Aggregation
-import six
+import logging
 
 class Workflow(PanoptesObject, Exportable):
     _api_slug = 'workflows'
@@ -536,39 +536,52 @@ def run_aggregation(self, user=None, delete_if_exists=False):
         """
         This method will start a new batch aggregation run, Will return a dict with the created aggregation if successful.
 
-        - **user** can be either a :py:class:`.User` or an ID.
-        - **delete_if_exists** parameter is optional if true, deletes any previous instance
-        -
+        - **user** can be either a :py:class:`.User` or an ID. Defaults to logged in user if not set.
+        - **delete_if_exists** parameter is optional; if true, deletes any previous instance.
+
         Examples::
 
-            Workflow(1234).run_aggregation(1234)
+            Workflow(1234).run_aggregation()
             Workflow(1234).run_aggregation(user=1234, delete_if_exists=True)
         """
 
         if(isinstance(user, User)):
             _user_id = user.id
         elif (isinstance(user, (int, str,))):
             _user_id = user
+        elif User.me():
+            _user_id = User.me().id
         else:
-            raise TypeError('Invalid user parameter')
+            raise TypeError('Invalid user parameter. Provide user ID or login.')
 
         try:
-            workflow_aggs = self.get_batch_aggregations()
+            workflow_aggs = Aggregation.where(workflow_id=self.id)
             if workflow_aggs.object_count > 0:
-                agg_id = workflow_aggs.next().id
-                current_wf_agg = Aggregation.find(agg_id)
+                current_wf_agg = next(workflow_aggs)
                 if delete_if_exists:
                     current_wf_agg.delete()
                     return self._create_agg(_user_id)
                 else:
+                    logging.getLogger('panoptes_client').info(
+                        'Aggregation exists for Workflow {}. '.format(self.id) +
+                        'Set delete_if_exists to True to create new aggregation.'
+                    )
                     return current_wf_agg
             else:
                 return self._create_agg(_user_id)
         except PanoptesAPIException as err:
             raise err
 
-    def get_batch_aggregations(self):
-        return Aggregation.where(workflow_id=self.id)
+    def get_batch_aggregation(self):
+        """
+        This method will fetch existing aggregation resource, if any.
+        """
+        try:
+            return next(Aggregation.where(workflow_id=self.id))
+        except StopIteration:
+            raise PanoptesAPIException(
+                'Could not find Aggregation for Workflow {}'.format(self.id)
+            )
 
     def _create_agg(self, user_id):
         new_agg = Aggregation()
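
The revised ``user`` handling accepts a :py:class:`.User` object, a plain ID, or nothing at all (falling back to ``User.me()``). A minimal sketch of the call patterns the new branches cover, with placeholder credentials and IDs::

    from panoptes_client import Panoptes, User, Workflow

    Panoptes.connect(username='example', password='example')  # placeholder credentials

    workflow = Workflow(1234)  # hypothetical workflow ID

    workflow.run_aggregation()                                   # defaults to User.me()
    workflow.run_aggregation(user=User.me())                     # explicit User object
    workflow.run_aggregation(user=5678)                          # plain user ID (int or str)
    workflow.run_aggregation(user=5678, delete_if_exists=True)   # replace an existing aggregation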
@@ -578,25 +591,29 @@ def _create_agg(self, user_id):
         return new_agg
 
     def _get_agg_property(self, param):
-        try:
-            aggs = self.get_batch_aggregations()
-            return getattr(six.next(aggs), param, None)
-        except StopIteration:
-            raise PanoptesAPIException(
-                "Could not find Aggregations for Workflow with id='{}'".format(self.id)
-            )
+        return getattr(self.get_batch_aggregation(), param, None)
 
-    def check_batch_aggregation_run_status(self):
+    def get_batch_aggregation_status(self):
         """
-        This method will fetch existing aggregation status if any.
+        This method will fetch existing aggregation status, if any.
         """
         return self._get_agg_property('status')
 
     def get_batch_aggregation_links(self):
         """
-        This method will fetch existing aggregation links if any.
+        This method will fetch existing aggregation links, if any.
+
+        Data product options, returned as dictionary of type/URL key-value pairs:
+        1. reductions: subject-level reductions results CSV
+        2. aggregation: a ZIP file containing all inputs (workflow-level classification export, project-level workflows export) and outputs (extracts, reductions)
         """
-        return self._get_agg_property('uuid')
+        uuid = self._get_agg_property('uuid')
+        aggregation_url = 'https://aggregationdata.blob.core.windows.net'
+        env = 'production'
+        if Panoptes.client().endpoint == 'https://panoptes-staging.zooniverse.org':
+            env = 'staging'
+        return {'reductions': f'{aggregation_url}/{env}/{uuid}/{self.id}_reductions.csv',
+                'aggregation': f'{aggregation_url}/{env}/{uuid}/{self.id}_aggregation.zip'}
 
     @property
     def versions(self):
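
Because ``get_batch_aggregation_links`` now returns ready-to-use HTTPS URLs rather than a bare UUID, callers can download the data products directly. A minimal sketch using only the standard library; the credentials and IDs are placeholders and the output file names are arbitrary::

    import urllib.request

    from panoptes_client import Panoptes, Workflow

    Panoptes.connect(username='example', password='example')  # placeholder credentials

    urls = Workflow(1234).get_batch_aggregation_links()  # hypothetical workflow ID

    # 'reductions' is the subject-level reductions CSV;
    # 'aggregation' is a ZIP of all inputs and outputs.
    urllib.request.urlretrieve(urls['reductions'], 'reductions.csv')
    urllib.request.urlretrieve(urls['aggregation'], 'aggregation.zip')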
