diff --git a/.github/workflows/1-fetch-openneuro-datasets-nemar.yml b/.github/workflows/1-fetch-openneuro-datasets-nemar.yml new file mode 100644 index 00000000..9dcf25b8 --- /dev/null +++ b/.github/workflows/1-fetch-openneuro-datasets-nemar.yml @@ -0,0 +1,130 @@ +name: Fetch OpenNeuro & NEMAR Datasets + +on: + pull_request: + branches: + - '**' + # schedule: + # # Run weekly on Monday at 00:00 UTC + # - cron: '0 0 * * 1' + workflow_dispatch: # Allow manual triggering + +jobs: + fetch-datasets: + runs-on: ubuntu-latest + permissions: + contents: write + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + token: ${{ secrets.GITHUB_TOKEN }} + ref: ${{ github.head_ref }} + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + cache: 'pip' + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install gql[requests] requests + pip install -e . + + - name: Fetch OpenNeuro datasets + run: | + python scripts/ingestions/1_fetch_openneuro_datasets.py \ + --page-size 100 \ + --output consolidated/openneuro_datasets.json + + - name: Fetch NEMAR GitHub repositories + run: | + python scripts/ingestions/1_fetch_github_organization.py \ + --organization nemardatasets \ + --output consolidated/nemardatasets_repos.json + + - name: Verify OpenNeuro output + run: | + if [ -f consolidated/openneuro_datasets.json ]; then + echo "βœ“ OpenNeuro dataset file created successfully" + python -c "import json; data = json.load(open('consolidated/openneuro_datasets.json')); print(f'Total entries: {len(data)}'); modalities = set(d['modality'] for d in data); print(f'Modalities: {sorted(modalities)}')" + else + echo "βœ— OpenNeuro dataset file not created" + exit 1 + fi + + - name: Verify NEMAR output + run: | + if [ -f consolidated/nemardatasets_repos.json ]; then + echo "βœ“ NEMAR repositories file created successfully" + python -c "import json; data = json.load(open('consolidated/nemardatasets_repos.json')); print(f'Total repositories: {len(data)}'); topics = set(); [topics.update(d.get('topics', [])) for d in data]; print(f'Topics: {sorted(topics) if topics else \"None\"}')" + else + echo "βœ— NEMAR repositories file not created" + exit 1 + fi + + - name: Filter new OpenNeuro datasets + run: | + python scripts/ingestions/2_filter_new_datasets.py \ + consolidated/openneuro_datasets.json + + - name: Filter new NEMAR datasets + run: | + python scripts/ingestions/2_filter_new_datasets.py \ + consolidated/nemardatasets_repos.json + + - name: Verify filtered outputs + run: | + echo "πŸ“Š Filtering Results:" + echo "" + if [ -f consolidated/to_digest_openneuro_datasets.json ]; then + echo "βœ“ OpenNeuro filtered datasets created" + python -c "import json; data = json.load(open('consolidated/to_digest_openneuro_datasets.json')); print(f' Datasets to digest: {len(data)}')" + else + echo "βœ— OpenNeuro filtered datasets not created" + exit 1 + fi + echo "" + if [ -f consolidated/to_digest_nemardatasets_repos.json ]; then + echo "βœ“ NEMAR filtered datasets created" + python -c "import json; data = json.load(open('consolidated/to_digest_nemardatasets_repos.json')); print(f' Datasets to digest: {len(data)}')" + else + echo "βœ— NEMAR filtered datasets not created" + exit 1 + fi + + - name: Commit and push changes if datasets updated + run: | + git config user.name "github-actions[bot]" + git config user.email "41898282+github-actions[bot]@users.noreply.github.com" + + # Add all dataset files to staging + git add 
consolidated/openneuro_datasets.json + git add consolidated/nemardatasets_repos.json + git add consolidated/to_digest_openneuro_datasets.json + git add consolidated/to_digest_nemardatasets_repos.json + + # Check if there are actual changes (not just timestamp differences) + if git diff --cached --quiet; then + echo "No changes detected in dataset files, skipping commit" + else + echo "Changes detected, committing..." + git commit -m "chore: update OpenNeuro & NEMAR dataset listings and filtered to_digest files" + git push origin HEAD:${{ github.head_ref }} + echo "βœ“ Changes committed and pushed" + fi + + - name: Upload artifacts for downstream jobs + uses: actions/upload-artifact@v4 + with: + name: dataset-listings + path: | + consolidated/openneuro_datasets.json + consolidated/nemardatasets_repos.json + consolidated/to_digest_openneuro_datasets.json + consolidated/to_digest_nemardatasets_repos.json + retention-days: 7 diff --git a/.github/workflows/clone-openneuro-datasets.yml b/.github/workflows/clone-openneuro-datasets.yml new file mode 100644 index 00000000..a3a97800 --- /dev/null +++ b/.github/workflows/clone-openneuro-datasets.yml @@ -0,0 +1,98 @@ +name: Clone OpenNeuro Datasets + +on: + schedule: + # Run weekly on Monday at 02:00 UTC (after fetch completes) + - cron: '0 2 * * 1' + workflow_dispatch: # Allow manual triggering + # TODO: Add other triggers here as needed + +jobs: + clone-datasets: + runs-on: ubuntu-latest + timeout-minutes: 720 # 12 hours max for all clones + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Verify Python script and dataset listings + run: | + if [ ! -f scripts/ingestions/clone_openneuro_datasets.py ]; then + echo "Error: clone_openneuro_datasets.py not found" + exit 1 + fi + if [ ! 
-f consolidated/openneuro_datasets.json ]; then + echo "Error: consolidated/openneuro_datasets.json not found" + exit 1 + fi + DATASET_COUNT=$(jq 'length' consolidated/openneuro_datasets.json) + echo "Found $DATASET_COUNT dataset entries" + + - name: Create test_diggestion directory + run: mkdir -p test_diggestion + + - name: Clone OpenNeuro datasets + run: | + python scripts/ingestions/clone_openneuro_datasets.py \ + --output-dir test_diggestion \ + --timeout 300 \ + --datasets-file consolidated/openneuro_datasets.json + continue-on-error: true # Don't fail workflow if some clones fail + + - name: Generate clone report + if: always() + run: | + if [ -f test_diggestion/clone_results.json ]; then + echo "## Clone Results" >> $GITHUB_STEP_SUMMARY + echo "" >> $GITHUB_STEP_SUMMARY + jq -r '"- Success: \(.success | length)\n- Failed: \(.failed | length)\n- Timeout: \(.timeout | length)\n- Skipped: \(.skip | length)\n- Errors: \(.error | length)"' test_diggestion/clone_results.json >> $GITHUB_STEP_SUMMARY + fi + + - name: Upload clone results + if: always() + uses: actions/upload-artifact@v4 + with: + name: clone-results + path: | + test_diggestion/clone_results.json + test_diggestion/retry.json + retention-days: 30 + + - name: Create issue if clones failed + if: failure() + uses: actions/github-script@v7 + with: + script: | + const fs = require('fs'); + if (fs.existsSync('test_diggestion/clone_results.json')) { + const results = JSON.parse(fs.readFileSync('test_diggestion/clone_results.json')); + const failedCount = (results.failed || []).length + (results.timeout || []).length; + if (failedCount > 0) { + github.rest.issues.create({ + owner: context.repo.owner, + repo: context.repo.repo, + title: `⚠️ Dataset Cloning: ${failedCount} datasets failed`, + body: `Failed/timeout clones detected.\n\nSee artifacts for details: ${context.runId}`, + labels: ['ci', 'datasets'] + }); + } + } + + - name: Commit cloned datasets (optional) + if: success() + run: | + cd test_diggestion + git config --local user.email "action@github.com" + git config --local user.name "GitHub Action" + git add . + git commit -m "chore: update cloned OpenNeuro datasets" || echo "Nothing to commit" + git push + continue-on-error: true diff --git a/.gitignore b/.gitignore index 59819db3..ab7b092a 100644 --- a/.gitignore +++ b/.gitignore @@ -39,7 +39,6 @@ examples/data .DS_Store data/ -*.json *.isorted *.py.isorted diff --git a/eegdash/api.py b/eegdash/api.py index d867d267..194a0a5e 100644 --- a/eegdash/api.py +++ b/eegdash/api.py @@ -10,13 +10,15 @@ EEG data from S3 for matched records. """ +import json import os from pathlib import Path from typing import Any, Mapping import mne +import numpy as np +import pandas as pd from mne.utils import _soft_import -from pymongo import InsertOne, UpdateOne from .bids_eeg_metadata import ( build_query_from_kwargs, @@ -353,9 +355,18 @@ def _raise_if_conflicting_constraints( ) def add_bids_dataset( - self, dataset: str, data_dir: str, overwrite: bool = True - ) -> None: - """Scan a local BIDS dataset and upsert records into MongoDB. + self, + dataset: str, + data_dir: str, + overwrite: bool = True, + output_path: str | Path | None = None, + ) -> dict[str, Any]: + """Collect metadata for a local BIDS dataset as JSON-ready records. + + Instead of inserting records directly into MongoDB, this method scans + ``data_dir`` and returns a JSON-serializable manifest describing every + EEG recording that was discovered. 
The manifest can be written to disk + or forwarded to the EEGDash ingestion API for persistence. Parameters ---------- @@ -364,127 +375,91 @@ def add_bids_dataset( data_dir : str Path to the local BIDS dataset directory. overwrite : bool, default True - If ``True``, update existing records when encountered; otherwise, - skip records that already exist. + If ``False``, skip records that already exist in the database based + on ``data_name`` lookups. + output_path : str | Path | None, optional + If provided, the manifest is written to the given JSON file. - Raises - ------ - ValueError - If called on a public client ``(is_public=True)``. + Returns + ------- + dict + A manifest with keys ``dataset``, ``source``, ``records`` and, when + applicable, ``skipped`` or ``errors``. """ - if self.is_public: - raise ValueError("This operation is not allowed for public users") - - if not overwrite and self.exist({"dataset": dataset}): - logger.info("Dataset %s already exists in the database", dataset) - return + source_dir = Path(data_dir).expanduser() try: bids_dataset = EEGBIDSDataset( - data_dir=data_dir, + data_dir=str(source_dir), dataset=dataset, ) - except Exception as e: - logger.error("Error creating bids dataset %s: %s", dataset, str(e)) - raise e - requests = [] - for bids_file in bids_dataset.get_files(): - try: - data_id = f"{dataset}_{Path(bids_file).name}" - - if self.exist({"data_name": data_id}): - if overwrite: - eeg_attrs = load_eeg_attrs_from_bids_file( - bids_dataset, bids_file - ) - requests.append(self._update_request(eeg_attrs)) - else: - eeg_attrs = load_eeg_attrs_from_bids_file(bids_dataset, bids_file) - requests.append(self._add_request(eeg_attrs)) - except Exception as e: - logger.error("Error adding record %s", bids_file) - logger.error(str(e)) - - logger.info("Number of requests: %s", len(requests)) - - if requests: - result = self.__collection.bulk_write(requests, ordered=False) - logger.info("Inserted: %s ", result.inserted_count) - logger.info("Modified: %s ", result.modified_count) - logger.info("Deleted: %s", result.deleted_count) - logger.info("Upserted: %s", result.upserted_count) - logger.info("Errors: %s ", result.bulk_api_result.get("writeErrors", [])) - - def _add_request(self, record: dict) -> InsertOne: - """Create a MongoDB insertion request for a record. - - Parameters - ---------- - record : dict - The record to insert. - - Returns - ------- - InsertOne - A PyMongo ``InsertOne`` object. - - """ - return InsertOne(record) - - def add(self, record: dict) -> None: - """Add a single record to the MongoDB collection. - - Parameters - ---------- - record : dict - The record to add. - - """ - try: - self.__collection.insert_one(record) - except ValueError as e: - logger.error("Validation error for record: %s ", record["data_name"]) - logger.error(e) except Exception as exc: - logger.error( - "Error adding record: %s ", record.get("data_name", "") - ) - logger.debug("Add operation failed", exc_info=exc) + logger.error("Error creating BIDS dataset %s: %s", dataset, exc) + raise exc - def _update_request(self, record: dict) -> UpdateOne: - """Create a MongoDB update request for a record. + records: list[dict[str, Any]] = [] + skipped: list[str] = [] + errors: list[dict[str, str]] = [] - Parameters - ---------- - record : dict - The record to update. - - Returns - ------- - UpdateOne - A PyMongo ``UpdateOne`` object. 
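+        # Build one manifest record per recording: metadata comes from
+        # load_eeg_attrs_from_bids_file; when overwrite=False, files whose
+        # data_name already exists in the database are skipped, and extraction
+        # failures are collected under "errors" instead of aborting the scan.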
- - """ - return UpdateOne({"data_name": record["data_name"]}, {"$set": record}) + for bids_file in bids_dataset.get_files(): + data_id = f"{dataset}_{Path(bids_file).name}" + if not overwrite: + try: + if self.exist({"data_name": data_id}): + skipped.append(data_id) + continue + except Exception as exc: + logger.warning( + "Could not verify existing record %s due to: %s", + data_id, + exc, + ) - def update(self, record: dict) -> None: - """Update a single record in the MongoDB collection. + try: + eeg_attrs = load_eeg_attrs_from_bids_file(bids_dataset, bids_file) + records.append(eeg_attrs) + except Exception as exc: # log and continue collecting + logger.error("Error extracting metadata for %s", bids_file) + logger.error(str(exc)) + errors.append({"file": str(bids_file), "error": str(exc)}) + + manifest: dict[str, Any] = { + "dataset": dataset, + "source": str(source_dir.resolve()), + "record_count": len(records), + "records": records, + } + if skipped: + manifest["skipped"] = skipped + if errors: + manifest["errors"] = errors + + if output_path is not None: + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8") as fh: + json.dump( + manifest, + fh, + indent=2, + sort_keys=True, + default=_json_default, + ) + logger.info( + "Wrote EEGDash ingestion manifest for %s to %s", + dataset, + output_path, + ) - Parameters - ---------- - record : dict - Record content to set at the matching ``data_name``. + logger.info( + "Prepared %s records for dataset %s (skipped=%s, errors=%s)", + len(records), + dataset, + len(skipped), + len(errors), + ) - """ - try: - self.__collection.update_one( - {"data_name": record["data_name"]}, {"$set": record} - ) - except Exception as exc: # log and continue - logger.error( - "Error updating record: %s", record.get("data_name", "") - ) - logger.debug("Update operation failed", exc_info=exc) + return manifest def exists(self, query: dict[str, Any]) -> bool: """Check if at least one record matches the query. @@ -504,35 +479,6 @@ def exists(self, query: dict[str, Any]) -> bool: """ return self.exist(query) - def remove_field(self, record: dict, field: str) -> None: - """Remove a field from a specific record in the MongoDB collection. - - Parameters - ---------- - record : dict - Record-identifying object with a ``data_name`` key. - field : str - The name of the field to remove. - - """ - self.__collection.update_one( - {"data_name": record["data_name"]}, {"$unset": {field: 1}} - ) - - def remove_field_from_db(self, field: str) -> None: - """Remove a field from all records in the database. - - .. warning:: - This is a destructive operation and cannot be undone. - - Parameters - ---------- - field : str - The name of the field to remove from all documents. - - """ - self.__collection.update_many({}, {"$unset": {field: 1}}) - @property def collection(self): """The underlying PyMongo ``Collection`` object. @@ -545,26 +491,38 @@ def collection(self): """ return self.__collection - def close(self) -> None: - """Close the MongoDB connection. - - .. deprecated:: 0.1 - Connections are now managed globally by :class:`MongoConnectionManager`. - This method is a no-op and will be removed in a future version. - Use :meth:`EEGDash.close_all_connections` to close all clients. 
- """ - # Individual instances no longer close the shared client - pass - @classmethod def close_all_connections(cls) -> None: """Close all MongoDB client connections managed by the singleton manager.""" MongoConnectionManager.close_all() - def __del__(self) -> None: - """Destructor; no explicit action needed due to global connection manager.""" - # No longer needed since we're using singleton pattern + +def _json_default(value: Any) -> Any: + """Fallback serializer for complex objects when exporting ingestion JSON.""" + try: + if isinstance(value, (np.generic,)): + return value.item() + if isinstance(value, np.ndarray): + return value.tolist() + except Exception: pass + try: + if value is pd.NA: + return None + if isinstance(value, (pd.Timestamp, pd.Timedelta)): + return value.isoformat() + if isinstance(value, pd.Series): + return value.to_dict() + except Exception: + pass + + if isinstance(value, Path): + return value.as_posix() + if isinstance(value, set): + return sorted(value) + + raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable") + __all__ = ["EEGDash"] diff --git a/eegdash/dataset/bids_dataset.py b/eegdash/dataset/bids_dataset.py index 1788765e..7e7c1685 100644 --- a/eegdash/dataset/bids_dataset.py +++ b/eegdash/dataset/bids_dataset.py @@ -16,14 +16,26 @@ import pandas as pd from mne_bids import BIDSPath, find_matching_paths +from mne_bids.config import ALLOWED_DATATYPE_EXTENSIONS, EPHY_ALLOWED_DATATYPES, reader + +# Known companion/sidecar files for specific formats (BIDS spec requirement) +# These files must be downloaded together with the primary file +_COMPANION_FILES = { + ".set": [".fdt"], # EEGLAB: data file + ".vhdr": [".eeg", ".vmrk"], # BrainVision: data + marker files +} class EEGBIDSDataset: - """An interface to a local BIDS dataset containing EEG recordings. + """An interface to a local BIDS dataset containing electrophysiology recordings. This class centralizes interactions with a BIDS dataset on the local filesystem, providing methods to parse metadata, find files, and - retrieve BIDS-related information. + retrieve BIDS-related information. Supports multiple modalities including + EEG, MEG, iEEG, and NIRS. + + The class uses MNE-BIDS constants to stay synchronized with the BIDS + specification and automatically supports all file formats recognized by MNE. Parameters ---------- @@ -31,28 +43,65 @@ class EEGBIDSDataset: The path to the local BIDS dataset directory. dataset : str A name for the dataset (e.g., "ds002718"). + allow_symlinks : bool, default False + If True, accept broken symlinks (e.g., git-annex) for metadata extraction. + If False, require actual readable files for data loading. + Set to True when doing metadata digestion without loading raw data. + modalities : list of str or None, default None + List of modalities to search for (e.g., ["eeg", "meg"]). + If None, defaults to all electrophysiology modalities from MNE-BIDS: + ['meg', 'eeg', 'ieeg', 'nirs']. + + Attributes + ---------- + RAW_EXTENSIONS : dict + Mapping of file extensions to their companion files, dynamically + built from mne_bids.config.reader. + files : list of str + List of all recording file paths found in the dataset. + detected_modality : str + The modality of the first file found (e.g., 'eeg', 'meg'). + + Examples + -------- + >>> # Load EEG-only dataset + >>> dataset = EEGBIDSDataset( + ... data_dir="/path/to/ds002718", + ... dataset="ds002718", + ... modalities=["eeg"] + ... 
) + + >>> # Load dataset with multiple modalities + >>> dataset = EEGBIDSDataset( + ... data_dir="/path/to/ds005810", + ... dataset="ds005810", + ... modalities=["meg", "eeg"] + ... ) + + >>> # Metadata extraction from git-annex (symlinks) + >>> dataset = EEGBIDSDataset( + ... data_dir="/path/to/dataset", + ... dataset="ds000001", + ... allow_symlinks=True + ... ) """ - ALLOWED_FILE_FORMAT = ["eeglab", "brainvision", "biosemi", "european"] + # Dynamically build from MNE-BIDS constants (mne_bids.config.reader) + # reader dict maps file extensions to MNE read functions + # This ensures compatibility with the latest BIDS specification + + # Primary extension + companions = files that must be downloaded together RAW_EXTENSIONS = { - ".set": [".set", ".fdt"], # eeglab - ".edf": [".edf"], # european - ".vhdr": [".eeg", ".vhdr", ".vmrk", ".dat", ".raw"], # brainvision - ".bdf": [".bdf"], # biosemi + ext: [ext] + _COMPANION_FILES.get(ext, []) for ext in reader.keys() } - METADATA_FILE_EXTENSIONS = [ - "eeg.json", - "channels.tsv", - "electrodes.tsv", - "events.tsv", - "events.json", - ] def __init__( self, data_dir=None, # location of bids dataset dataset="", # dataset name + allow_symlinks=False, # allow broken symlinks for digestion + modalities=None, # list of modalities to search for (e.g., ["eeg", "meg", "ieeg"]) ): if data_dir is None or not os.path.exists(data_dir): raise ValueError("data_dir must be specified and must exist") @@ -60,6 +109,15 @@ def __init__( self.bidsdir = Path(data_dir) self.dataset = dataset self.data_dir = data_dir + self.allow_symlinks = allow_symlinks + + # Set modalities to search for (default: all electrophysiology modalities from MNE-BIDS) + if modalities is None: + self.modalities = EPHY_ALLOWED_DATATYPES # ['meg', 'eeg', 'ieeg', 'nirs'] + else: + self.modalities = ( + modalities if isinstance(modalities, list) else [modalities] + ) # Accept exact dataset folder or a variant with informative suffixes # (e.g., dsXXXXX-bdf, dsXXXXX-bdf-mini) to avoid collisions. @@ -74,9 +132,12 @@ def __init__( # get all recording files in the bids directory assert len(self.files) > 0, ValueError( - "Unable to construct EEG dataset. No EEG recordings found." + f"Unable to construct dataset. No recordings found for modalities: {self.modalities}" ) - assert self.check_eeg_dataset(), ValueError("Dataset is not an EEG dataset.") + # Store the detected modality for later use + self.detected_modality = self.get_bids_file_attribute( + "modality", self.files[0] + ).lower() def check_eeg_dataset(self) -> bool: """Check if the BIDS dataset contains EEG data. @@ -87,25 +148,37 @@ def check_eeg_dataset(self) -> bool: True if the dataset's modality is EEG, False otherwise. """ - return self.get_bids_file_attribute("modality", self.files[0]).lower() == "eeg" + return self.detected_modality == "eeg" def _init_bids_paths(self) -> None: """Initialize BIDS file paths using mne_bids for fast discovery. Uses mne_bids.find_matching_paths() for efficient pattern-based file - discovery instead of heavy pybids BIDSLayout indexing. + discovery. Falls back to manual glob search if needed. + + When allow_symlinks=True, includes broken symlinks (e.g., git-annex) + for metadata extraction without requiring actual data files. + + Searches across multiple modalities (eeg, meg, ieeg) based on self.modalities. """ # Initialize cache for BIDSPath objects self._bids_path_cache = {} - # Find all EEG recordings using pattern matching (fast!) 
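+        # Search order matters: the first modality in self.modalities that
+        # yields any recordings wins, and within that modality the first
+        # matching extension wins, so each dataset is indexed under a single
+        # modality/format combination.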
+ # Find all recordings across specified modalities + # Use MNE-BIDS constants to get valid extensions per modality self.files = [] - for ext in self.RAW_EXTENSIONS.keys(): - # find_matching_paths returns BIDSPath objects - paths = find_matching_paths(self.bidsdir, datatypes="eeg", extensions=ext) - if paths: - # Convert BIDSPath objects to filename strings - self.files = [str(p.fpath) for p in paths] + for modality in self.modalities: + for ext in ALLOWED_DATATYPE_EXTENSIONS.get(modality, []): + found_files = _find_bids_files( + self.bidsdir, + ext, + modalities=[modality], + allow_symlinks=self.allow_symlinks, + ) + if found_files: + self.files = found_files + break + if self.files: break def _get_bids_path_from_file(self, data_filepath: str): @@ -127,8 +200,17 @@ def _get_bids_path_from_file(self, data_filepath: str): filepath = Path(data_filepath) filename = filepath.name + # Detect modality from the directory path + # BIDS structure: .../sub-XX/[ses-YY/]/sub-XX_... + path_parts = filepath.parts + modality = "eeg" # default + for part in path_parts: + if part in ["eeg", "meg", "ieeg", "emg"]: + modality = part + break + # Extract entities from filename using BIDS pattern - # Expected format: sub-