diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..66349c8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,124 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +.idea/* + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*,cover +.hypothesis/ +.pytest_cache/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +# Visual Code +.vscode/ + +# terraform +.terraform/ +*.lock.hcl +*.tfstate +*.tfstate.* + + +# local dev stuff +.devcontainer/ +*.ipynb +*.rdb +/protobuf* + +# Git +.git/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index f6a2aac..1b9a916 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,9 +1,8 @@ -FROM tiangolo/uwsgi-nginx-flask:python3.7 +ARG PYTHON_VERSION=3.11 +FROM tiangolo/uwsgi-nginx-flask:python${PYTHON_VERSION} ENV GIT_SSL_NO_VERIFY=1 RUN mkdir -p /home/nginx/.cloudvolume/secrets && chown -R nginx /home/nginx && usermod -d /home/nginx -s /bin/bash nginx COPY . /app -RUN pip install pip==20.0.1 \ - && pip install --no-cache-dir --upgrade -r requirements.txt \ - && pip install --upgrade git+https://github.com/seung-lab/KVDbClient.git@main \ No newline at end of file +RUN pip install --no-cache-dir --upgrade -r requirements.txt \ No newline at end of file diff --git a/cloudbuild.v1.ingest.yaml b/cloudbuild.v1.ingest.yaml index 675ef5f..b57784a 100644 --- a/cloudbuild.v1.ingest.yaml +++ b/cloudbuild.v1.ingest.yaml @@ -7,23 +7,23 @@ steps: timeout: 600s args: - build - - "--tag=gcr.io/neuromancer-seung-import/pcgl2cache:ingest_v1_$TAG_NAME" - - "--file=./ingest.Dockerfile" + - "--tag=gcr.io/$PROJECT_ID/pcgl2cache:ingest_v1" + - "--file=./ingest.v1.Dockerfile" - . - name: "gcr.io/cloud-builders/docker" entrypoint: "bash" args: [ "-c", - "docker tag gcr.io/$PROJECT_ID/pcgl2cache:ingest_v1_$TAG_NAME $$USERNAME/pcgl2cache:ingest_v1_$TAG_NAME", + "docker tag gcr.io/$PROJECT_ID/pcgl2cache:ingest_v1 $$USERNAME/pcgl2cache:ingest_v1", ] secretEnv: ["USERNAME"] - name: "gcr.io/cloud-builders/docker" entrypoint: "bash" - args: ["-c", "docker push $$USERNAME/pcgl2cache:ingest_v1_$TAG_NAME"] + args: ["-c", "docker push $$USERNAME/pcgl2cache:ingest_v1"] secretEnv: ["USERNAME"] images: - - "gcr.io/neuromancer-seung-import/pcgl2cache:ingest_v1_$TAG_NAME" + - "gcr.io/$PROJECT_ID/pcgl2cache:ingest_v1" availableSecrets: secretManager: - versionName: projects/$PROJECT_ID/secrets/docker-password/versions/1 diff --git a/cloudbuild.v1.worker.yaml b/cloudbuild.v1.worker.yaml index f766188..4bd9bec 100644 --- a/cloudbuild.v1.worker.yaml +++ b/cloudbuild.v1.worker.yaml @@ -7,8 +7,8 @@ steps: timeout: 600s args: - build - - "--tag=gcr.io/neuromancer-seung-import/pcgl2cache:worker_pcgv1_$TAG_NAME" - - "--file=./ingest.Dockerfile" + - "--tag=gcr.io/$PROJECT_ID/pcgl2cache:worker_pcgv1_$TAG_NAME" + - "--file=./ingest.v1.Dockerfile" - . - name: "gcr.io/cloud-builders/docker" entrypoint: "bash" @@ -23,7 +23,7 @@ steps: args: ["-c", "docker push $$USERNAME/pcgl2cache:worker_pcgv1_$TAG_NAME"] secretEnv: ["USERNAME"] images: - - "gcr.io/neuromancer-seung-import/pcgl2cache:worker_pcgv1_$TAG_NAME" + - "gcr.io/$PROJECT_ID/pcgl2cache:worker_pcgv1_$TAG_NAME" availableSecrets: secretManager: - versionName: projects/$PROJECT_ID/secrets/docker-password/versions/1 diff --git a/cloudbuild.v2.ingest.yaml b/cloudbuild.v2.ingest.yaml new file mode 100644 index 0000000..b139577 --- /dev/null +++ b/cloudbuild.v2.ingest.yaml @@ -0,0 +1,32 @@ +steps: + - name: "gcr.io/cloud-builders/docker" + entrypoint: "bash" + args: ["-c", "docker login --username=$$USERNAME --password=$$PASSWORD"] + secretEnv: ["USERNAME", "PASSWORD"] + - name: "gcr.io/cloud-builders/docker" + timeout: 600s + args: + - build + - "--tag=gcr.io/$PROJECT_ID/pcgl2cache:ingest_v2" + - "--file=./ingest.v2.Dockerfile" + - . + - name: "gcr.io/cloud-builders/docker" + entrypoint: "bash" + args: + [ + "-c", + "docker tag gcr.io/$PROJECT_ID/pcgl2cache:ingest_v2 $$USERNAME/pcgl2cache:ingest_v2", + ] + secretEnv: ["USERNAME"] + - name: "gcr.io/cloud-builders/docker" + entrypoint: "bash" + args: ["-c", "docker push $$USERNAME/pcgl2cache:ingest_v2"] + secretEnv: ["USERNAME"] +images: + - "gcr.io/$PROJECT_ID/pcgl2cache:ingest_v2" +availableSecrets: + secretManager: + - versionName: projects/$PROJECT_ID/secrets/docker-password/versions/1 + env: "PASSWORD" + - versionName: projects/$PROJECT_ID/secrets/docker-username/versions/1 + env: "USERNAME" diff --git a/cloudbuild.v2.worker.yaml b/cloudbuild.v2.worker.yaml new file mode 100644 index 0000000..fa47360 --- /dev/null +++ b/cloudbuild.v2.worker.yaml @@ -0,0 +1,32 @@ +steps: + - name: "gcr.io/cloud-builders/docker" + entrypoint: "bash" + args: ["-c", "docker login --username=$$USERNAME --password=$$PASSWORD"] + secretEnv: ["USERNAME", "PASSWORD"] + - name: "gcr.io/cloud-builders/docker" + timeout: 600s + args: + - build + - "--tag=gcr.io/$PROJECT_ID/pcgl2cache:worker_pcgv2_$TAG_NAME" + - "--file=./ingest.v2.Dockerfile" + - . + - name: "gcr.io/cloud-builders/docker" + entrypoint: "bash" + args: + [ + "-c", + "docker tag gcr.io/$PROJECT_ID/pcgl2cache:worker_pcgv2_$TAG_NAME $$USERNAME/pcgl2cache:worker_pcgv2_$TAG_NAME", + ] + secretEnv: ["USERNAME"] + - name: "gcr.io/cloud-builders/docker" + entrypoint: "bash" + args: ["-c", "docker push $$USERNAME/pcgl2cache:worker_pcgv2_$TAG_NAME"] + secretEnv: ["USERNAME"] +images: + - "gcr.io/$PROJECT_ID/pcgl2cache:worker_pcgv2_$TAG_NAME" +availableSecrets: + secretManager: + - versionName: projects/$PROJECT_ID/secrets/docker-password/versions/1 + env: "PASSWORD" + - versionName: projects/$PROJECT_ID/secrets/docker-username/versions/1 + env: "USERNAME" diff --git a/ingest.Dockerfile b/ingest.Dockerfile deleted file mode 100644 index 38bf837..0000000 --- a/ingest.Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -FROM gcr.io/neuromancer-seung-import/pychunkedgraph:graph-tool_dracopy - -ENV GIT_SSL_NO_VERIFY=1 -RUN mkdir -p /home/nginx/.cloudvolume/secrets && chown -R nginx /home/nginx && usermod -d /home/nginx -s /bin/bash nginx - -COPY . /app -RUN pip install pip==20.0.1 \ - && pip install --no-cache-dir --upgrade -r requirements.txt \ - && pip install --upgrade git+https://github.com/seung-lab/PyChunkedGraph.git@pcgv1 \ - && pip install --upgrade git+https://github.com/seung-lab/KVDbClient.git@main \ No newline at end of file diff --git a/ingest.v1.Dockerfile b/ingest.v1.Dockerfile new file mode 100644 index 0000000..ad46359 --- /dev/null +++ b/ingest.v1.Dockerfile @@ -0,0 +1,8 @@ +FROM caveconnectome/pychunkedgraph:base_042124 + +ENV GIT_SSL_NO_VERIFY=1 +RUN mkdir -p /home/nginx/.cloudvolume/secrets && chown -R nginx /home/nginx && usermod -d /home/nginx -s /bin/bash nginx + +COPY . /app +RUN pip install --no-cache-dir --upgrade -r requirements.txt \ + && pip install --upgrade git+https://github.com/CAVEconnectome/PyChunkedGraph.git@pcgv1 \ No newline at end of file diff --git a/ingest.v2.Dockerfile b/ingest.v2.Dockerfile new file mode 100644 index 0000000..1d717e6 --- /dev/null +++ b/ingest.v2.Dockerfile @@ -0,0 +1,13 @@ +FROM caveconnectome/pychunkedgraph:base_042124 +ENV VIRTUAL_ENV=/app/venv +ENV PATH="$VIRTUAL_ENV/bin:$PATH" +ENV GIT_SSL_NO_VERIFY=1 +ENV CHUNKEDGRAPH_VERSION=2 + +RUN mkdir -p /home/nginx/.cloudvolume/secrets && chown -R nginx /home/nginx && usermod -d /home/nginx -s /bin/bash nginx + +COPY requirements.txt . +RUN pip install --upgrade --no-cache-dir -r requirements.txt \ + && pip install --upgrade git+https://github.com/CAVEconnectome/PyChunkedGraph.git@main + +COPY . /app \ No newline at end of file diff --git a/pcgl2cache/core/attributes.py b/pcgl2cache/core/attributes.py index ee4487b..20607c1 100644 --- a/pcgl2cache/core/attributes.py +++ b/pcgl2cache/core/attributes.py @@ -1,7 +1,7 @@ import numpy as np from kvdbclient.serializers import NumPyArray from kvdbclient.serializers import NumPyValue -from kvdbclient.bigtable.attributes import Attribute +from kvdbclient.attributes import _Attribute as Attribute UINT64 = np.dtype("uint64").newbyteorder("L") UINT32 = np.dtype("uint32").newbyteorder("L") diff --git a/pcgl2cache/core/features.py b/pcgl2cache/core/features.py index 2dd2934..e091db5 100644 --- a/pcgl2cache/core/features.py +++ b/pcgl2cache/core/features.py @@ -6,6 +6,7 @@ from edt import edt from sklearn import decomposition from kvdbclient import BigTableClient +from kvdbclient.base import serialize_uint64 from cloudvolume import CloudVolume from . import attributes @@ -103,7 +104,9 @@ def get_remapped_segmentation(self, l2id=None): def _get_l2_ids(l2vol: L2ChunkVolume, svids: np.array) -> np.array: if l2vol.cg: - l2ids = l2vol.cg.get_roots(svids, stop_layer=2, time_stamp=l2vol.timestamp) + l2ids = l2vol.cg.get_roots( + svids, stop_layer=2, fail_to_zero=True, time_stamp=l2vol.timestamp + ) layers = l2vol.cg.get_chunk_layers(l2ids) sv_mask = layers == 1 l2ids[sv_mask] = 0 @@ -312,10 +315,7 @@ def run_l2cache( def write_to_db(client: BigTableClient, result_d: dict) -> None: - from kvdbclient.base import Entry - from kvdbclient.base import EntryKey - - entries = [] + rows = [] for tup in zip(*result_d.values()): ( l2id, @@ -338,5 +338,5 @@ def write_to_db(client: BigTableClient, result_d: dict) -> None: attributes.PCA: pca_comp, attributes.PCA_VAL: pca_vals, } - entries.append(Entry(EntryKey(l2id), val_d)) - client.write_entries(entries) + rows.append(client.mutate_row(serialize_uint64(l2id), val_d)) + client.write(rows) diff --git a/pcgl2cache/ingest/__init__.py b/pcgl2cache/ingest/__init__.py index 7dd9098..236e755 100644 --- a/pcgl2cache/ingest/__init__.py +++ b/pcgl2cache/ingest/__init__.py @@ -1,3 +1,4 @@ +from os import environ from collections import namedtuple from os import environ @@ -6,16 +7,14 @@ _cluster_ingest_config_fields = ( "REDIS_URL", - "BATCH_SIZE", - "L2CACHE_Q_NAME", - "L2CACHE_Q_LIMIT", # these limits ensure the queue won't use too much memory - "L2CACHE_Q_INTERVAL", # sleep interval before queuing the next job when limit is reached + "QUEUE_NAME", + "QUEUE_SIZE", # these limits ensure the queue won't use too much memory + "QUEUE_INTERVAL", # sleep interval before queuing the next job when limit is reached ) _cluster_ingest_defaults = ( REDIS_URL, - 10, - "atomic", - int(environ.get("L2CACHE_Q_LIMIT", 500000)), + "l2", + int(environ.get("QUEUE_SIZE", 1000000)), 60, ) ClusterIngestConfig = namedtuple( diff --git a/pcgl2cache/ingest/cli.py b/pcgl2cache/ingest/cli.py index fa007b7..fdb6e3b 100644 --- a/pcgl2cache/ingest/cli.py +++ b/pcgl2cache/ingest/cli.py @@ -5,6 +5,7 @@ import click from flask.cli import AppGroup +import pcgl2cache from .manager import IngestionManager from .redis import get_redis_connection @@ -23,28 +24,78 @@ def flush_redis(): @click.argument("graph_id", type=str) @click.argument("cv_path", type=str) @click.argument("timestamp", type=str) -@click.option("--create", is_flag=True) -@click.option("--test", is_flag=True) -def ingest_graph( +@click.option("--create", is_flag=True, help="Creates a bigtable named CACHE_ID.") +@click.option( + "--test", is_flag=True, help="Queues 8 chunks at the center of the dataset." +) +def ingest_cache( cache_id: str, graph_id: str, cv_path: str, timestamp: str, create: bool, test: bool ): """ - Main ingest command - Takes ingest config from a yaml file and queues atomic tasks + Main ingest command for old pychunkedgraph format. """ from datetime import datetime + from datetime import timezone from . import IngestConfig from . import ClusterIngestConfig from .v1.jobs import enqueue_atomic_tasks if create: from kvdbclient import BigTableClient + from kvdbclient import get_default_client_info - client = BigTableClient() - client.create_table(cache_id) + meta = {"graph_id": graph_id, "cv_path": cv_path, "timestamp": timestamp} + client = BigTableClient(cache_id, config=get_default_client_info().CONFIG) + client.create_table(meta=meta, version=pcgl2cache.__version__) - # example format Jun 1 2005 1:33PM - timestamp = datetime.strptime(timestamp, "%b %d %Y %I:%M%p") + # example format 2018-06-29 08:15:27 + timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").replace( + tzinfo=timezone.utc + ) + enqueue_atomic_tasks( + IngestionManager( + IngestConfig(CLUSTER=ClusterIngestConfig(), TEST_RUN=test), + cache_id, + graph_id, + ), + cv_path, + timestamp, + ) + + +@ingest_cli.command("v2") +@click.argument("cache_id", type=str) +@click.argument("graph_id", type=str) +@click.argument("cv_path", type=str) +@click.argument("timestamp", type=str) +@click.option("--create", is_flag=True, help="Creates a bigtable named CACHE_ID.") +@click.option( + "--test", is_flag=True, help="Queues 8 chunks at the center of the dataset." +) +def ingest_cache_v2( + cache_id: str, graph_id: str, cv_path: str, timestamp: str, create: bool, test: bool +): + """ + Main ingest command for new pychunkedgraph format. + """ + from datetime import datetime + from datetime import timezone + from . import IngestConfig + from . import ClusterIngestConfig + from .v2.jobs import enqueue_atomic_tasks + + if create: + from kvdbclient import BigTableClient + from kvdbclient import get_default_client_info + + meta = {"graph_id": graph_id, "cv_path": cv_path, "timestamp": timestamp} + client = BigTableClient(cache_id, config=get_default_client_info().CONFIG) + client.create_table(meta=meta, version=pcgl2cache.__version__) + + # example format 2022-06-29 08:15:27 + timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").replace( + tzinfo=timezone.utc + ) enqueue_atomic_tasks( IngestionManager( IngestConfig(CLUSTER=ClusterIngestConfig(), TEST_RUN=test), @@ -58,8 +109,15 @@ def ingest_graph( @ingest_cli.command("status") def ingest_status(): + """ + Print progress completed/total. + """ + from .redis import keys as r_keys + redis = get_redis_connection() - print(redis.scard("2c")) + imanager = IngestionManager.from_pickle(redis.get(r_keys.INGESTION_MANAGER)) + l2chunk_count = imanager.cg.meta.layer_chunk_counts[0] + print(f"{redis.scard('2c')} / {l2chunk_count}") def init_ingest_cmds(app): diff --git a/pcgl2cache/ingest/redis.py b/pcgl2cache/ingest/redis.py index 18bf699..c293286 100644 --- a/pcgl2cache/ingest/redis.py +++ b/pcgl2cache/ingest/redis.py @@ -15,8 +15,8 @@ REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "") REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0" -keys_fields = ("INGESTION_MANAGER", "ATOMIC_HASH_FINISHED") -keys_defaults = ("pcg:imanager", "rq:finished:atomic") +keys_fields = ("INGESTION_MANAGER", "L2CACHE_HASH_FINISHED") +keys_defaults = ("pcg:imanager", "rq:finished:l2") Keys = namedtuple("keys", keys_fields, defaults=keys_defaults) keys = Keys() diff --git a/pcgl2cache/ingest/rq_cli.py b/pcgl2cache/ingest/rq_cli.py index 7992cdc..446d21f 100644 --- a/pcgl2cache/ingest/rq_cli.py +++ b/pcgl2cache/ingest/rq_cli.py @@ -10,9 +10,8 @@ from rq import Worker from rq.worker import WorkerStatus from rq.job import Job -from rq.exceptions import InvalidJobOperationError -from rq.registry import StartedJobRegistry from rq.registry import FailedJobRegistry +from rq.exceptions import NoSuchJobError from flask import current_app from flask.cli import AppGroup @@ -87,6 +86,8 @@ def enqueue(queue, job_ids): @click.argument("job_ids", nargs=-1) def requeue(queue, all, job_ids): """Requeue failed jobs.""" + from rq.exceptions import InvalidJobOperationError + failed_job_registry = FailedJobRegistry(queue, connection=connection) if all: job_ids = failed_job_registry.get_job_ids() @@ -100,7 +101,7 @@ def requeue(queue, all, job_ids): for job_id in job_ids: try: failed_job_registry.requeue(job_id) - except InvalidJobOperationError: + except (InvalidJobOperationError, NoSuchJobError): fail_count += 1 if fail_count > 0: @@ -117,6 +118,8 @@ def clean_start_registry(queue): Sometimes started jobs are not moved to failed registry (network issues) This command takes the jobs off the started registry and reueues them """ + from rq.registry import StartedJobRegistry + registry = StartedJobRegistry(name=queue, connection=connection) cleaned_jobs = registry.cleanup() print(f"Requeued {len(cleaned_jobs)} jobs from the started job registry.") @@ -127,9 +130,16 @@ def clean_start_registry(queue): def clear_failed_registry(queue): failed_job_registry = FailedJobRegistry(queue, connection=connection) job_ids = failed_job_registry.get_job_ids() + count = 0 for job_id in job_ids: - failed_job_registry.remove(job_id, delete_job=True) + try: + failed_job_registry.remove(job_id, delete_job=True) + except NoSuchJobError: + count += 1 + print(f"Deleted {len(job_ids)} jobs from the failed job registry.") + if count > 0: + click.secho(f"Unable to clear {count} jobs from failed queue.", fg="red") def init_rq_cmds(app): diff --git a/pcgl2cache/ingest/v1/jobs.py b/pcgl2cache/ingest/v1/jobs.py index 06f6afe..7dcd4df 100644 --- a/pcgl2cache/ingest/v1/jobs.py +++ b/pcgl2cache/ingest/v1/jobs.py @@ -27,7 +27,7 @@ def enqueue_atomic_tasks( imanager.redis.flushdb() bbox = np.array(imanager.cg.cv.bounds.to_list()) dataset_size = bbox[3:] - bbox[:3] - atomic_chunk_bounds = np.ceil(dataset_size / imanager.cg.chunk_size).astype(np.int) + atomic_chunk_bounds = np.ceil(dataset_size / imanager.cg.chunk_size).astype(int) chunk_coords = list(product(*[range(r) for r in atomic_chunk_bounds])) np.random.shuffle(chunk_coords) @@ -43,11 +43,11 @@ def enqueue_atomic_tasks( chunked_jobs = chunked(chunk_coords, 1000) for batch in chunked_jobs: - q = imanager.get_task_queue(imanager.config.CLUSTER.L2CACHE_Q_NAME) + q = imanager.get_task_queue(imanager.config.CLUSTER.QUEUE_NAME) # for optimal use of redis memory wait if queue limit is reached - if len(q) > imanager.config.CLUSTER.L2CACHE_Q_LIMIT: - print(f"Sleeping {imanager.config.CLUSTER.L2CACHE_Q_INTERVAL}s...") - sleep(imanager.config.CLUSTER.L2CACHE_Q_INTERVAL) + if len(q) > imanager.config.CLUSTER.QUEUE_SIZE: + print(f"Sleeping {imanager.config.CLUSTER.QUEUE_INTERVAL}s...") + sleep(imanager.config.CLUSTER.QUEUE_INTERVAL) job_datas = [] for chunk in batch: @@ -93,7 +93,7 @@ def _ingest_chunk( mip=imanager.cg.cv.mip, ) - chunk_coord = np.array(list(chunk_coord), dtype=np.int) + chunk_coord = np.array(list(chunk_coord), dtype=int) r = run_l2cache(cv, imanager.cg, chunk_coord, timestamp) print(f"L2ID count: {len(r.get('l2id', []))}") write_to_db(BigTableClient(imanager.cache_id), r) diff --git a/pcgl2cache/ingest/v2/__init__.py b/pcgl2cache/ingest/v2/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pcgl2cache/ingest/v2/jobs.py b/pcgl2cache/ingest/v2/jobs.py new file mode 100644 index 0000000..9f1e494 --- /dev/null +++ b/pcgl2cache/ingest/v2/jobs.py @@ -0,0 +1,114 @@ +from datetime import datetime +from typing import Sequence +from typing import Optional +from os import environ + +import numpy as np + +from ..utils import chunk_id_str +from ..manager import IngestionManager + + +def _post_task_completion(imanager: IngestionManager, layer: int, coords: np.ndarray): + chunk_str = "_".join(map(str, coords)) + # remove from queued hash and put in completed hash + imanager.redis.sadd(f"{layer}c", chunk_str) + return + + +def randomize_grid_points(X: int, Y: int, Z: int): + indices = np.arange(X * Y * Z) + np.random.shuffle(indices) + for index in indices: + yield np.unravel_index(index, (X, Y, Z)) + + +def enqueue_atomic_tasks( + imanager: IngestionManager, cv_path: str, timestamp: Optional[datetime] = None +): + from time import sleep + from rq import Queue as RQueue + + atomic_chunk_bounds = imanager.cg.meta.layer_chunk_bounds[2] + chunk_coords = randomize_grid_points(*atomic_chunk_bounds) + + if imanager.config.TEST_RUN: + x, y, z = np.array(atomic_chunk_bounds) // 2 + f = lambda r1, r2, r3: np.array(np.meshgrid(r1, r2, r3), dtype=int).T.reshape( + -1, 3 + ) + chunk_coords = f((x, x + 1), (y, y + 1), (z, z + 1)) + print(f"Test jobs count: {len(chunk_coords)}") + + print(f"Total jobs count: {imanager.cg.meta.layer_chunk_counts[0]}") + batch_size = int(environ.get("L2JOB_BATCH_SIZE", 10000)) + + job_datas = [] + for chunk_coord in chunk_coords: + q = imanager.get_task_queue(imanager.config.CLUSTER.QUEUE_NAME) + # buffer for optimal use of redis memory + if len(q) > imanager.config.CLUSTER.QUEUE_SIZE: + print(f"Sleeping {imanager.config.CLUSTER.QUEUE_INTERVAL}s...") + sleep(imanager.config.CLUSTER.QUEUE_INTERVAL) + + x, y, z = chunk_coord + chunk_str = f"{x}_{y}_{z}" + if imanager.redis.sismember("2c", chunk_str): + # already done, skip + continue + job_datas.append( + RQueue.prepare_data( + _ingest_chunk, + args=( + imanager.serialize_info(pickled=True), + cv_path, + chunk_coord, + timestamp, + ), + timeout=environ.get("JOB_TIMEOUT", "5m"), + result_ttl=0, + job_id=chunk_id_str(2, chunk_coord), + ) + ) + if len(job_datas) % batch_size == 0: + q.enqueue_many(job_datas) + job_datas = [] + q.enqueue_many(job_datas) + + +def _ingest_chunk( + im_info: str, + cv_path: str, + chunk_coord: Sequence[int], + timestamp: datetime, +): + from cloudvolume import CloudVolume + from pychunkedgraph.graph import ChunkedGraph + from kvdbclient import BigTableClient + from kvdbclient import get_default_client_info + from ...core.features import run_l2cache + from ...core.features import write_to_db + + imanager = IngestionManager.from_pickle(im_info) + cg = ChunkedGraph(graph_id=imanager.graph_id) + cv = CloudVolume( + cv_path, + bounded=False, + fill_missing=True, + progress=False, + mip=cg.meta.cv.mip, + ) + + chunk_coord = np.array(chunk_coord, dtype=int) + r = run_l2cache( + cv, + cg=cg, + chunk_coord=chunk_coord, + timestamp=timestamp, + ) + + config = get_default_client_info().CONFIG + print(f"L2ID count: {len(r.get('l2id', []))}") + + write_to_db(BigTableClient(imanager.cache_id, config=config), r) + _post_task_completion(imanager, 2, chunk_coord) diff --git a/pcgl2cache/utils.py b/pcgl2cache/utils.py index 68c468a..309ada8 100644 --- a/pcgl2cache/utils.py +++ b/pcgl2cache/utils.py @@ -29,7 +29,10 @@ def read_l2cache_config() -> dict: """ import yaml - yml_path = environ["GRAPH_L2CACHE_CONFIG_PATH"] + try: + yml_path = environ["GRAPH_L2CACHE_CONFIG_PATH"] + except KeyError: + return {} with open(yml_path, "r") as stream: return yaml.safe_load(stream) diff --git a/requirements-dev.txt b/requirements-dev.txt index bbb1248..f063141 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,4 @@ pylint black pyopenssl -jupyter -memory-profiler \ No newline at end of file +jupyter \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index f445cfd..9e02149 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,23 +1,23 @@ +fastremap cloud-files cloud-volume +redis +rq urllib3>=1.26.3 requests>=2.25.0 grpcio>=1.36.1 certifi>=2020.12.5 -numpy pandas scikit-learn google-cloud-bigtable -google-cloud-datastore>=1.8<=2.0dev +google-cloud-datastore>=1.8 flask flask_cors multiwrapper middle-auth-client>=3.11.0 python-json-logger zstandard -redis -rq -fastremap +kvdbclient>=0.3.1 edt pyyaml cachetools diff --git a/workers/common.py b/workers/common.py index d12267a..b45b5f7 100644 --- a/workers/common.py +++ b/workers/common.py @@ -3,8 +3,7 @@ def calculate_features(l2ids, l2cache_id, cv_path): import numpy as np from cloudvolume import CloudVolume from kvdbclient import BigTableClient, get_default_client_info - from kvdbclient.base import Entry - from kvdbclient.base import EntryKey + from kvdbclient.base import serialize_uint64 from pcgl2cache.core.attributes import SIZE_NM3 from pcgl2cache.core.features import run_l2cache from pcgl2cache.core.features import write_to_db @@ -17,7 +16,7 @@ def calculate_features(l2ids, l2cache_id, cv_path): continue result = run_l2cache(cv, l2id=_id) if not result: - entry = Entry(EntryKey(_id), {SIZE_NM3: np.uint64(0)}) - client.write_entries([entry]) + row = client.mutate_row(serialize_uint64(_id), {SIZE_NM3: np.uint64(0)}) + client.write([row]) write_to_db(client, result) gc.collect()