|
| 1 | +""" |
| 2 | +This module provides a python-syntax interface for constructing and executing pdal-python json |
| 3 | +pipelines. The API is not explicitly defined but stage names are validated against the pdal executable's drivers when possible. |
| 4 | +
|
| 5 | +To construct pipeline stages, access the driver name from this module. This will create |
| 6 | +a callable function where driver parameters can be specified as keyword arguments. For example: |
| 7 | +
|
| 8 | +>>> from pdal import pio |
| 9 | +>>> las_reader = pio.readers.las(filename="test.las") |
| 10 | +
|
| 11 | +To construct a pipeline, sum stages together. |
| 12 | +
|
| 13 | +>>> pipeline = pio.readers.las(filename="test.las") + pio.writers.ply(filename="test.ply") |
| 14 | +
|
| 15 | +To execute a pipeline and return results, call `execute`. |
| 16 | +
|
| 17 | +>>> arr = pipeline.execute() # returns a numpy structured array |
| 18 | +
|
To access the pipeline as a dict (which may be dumped to json), call `spec`.

>>> import json
>>> json.dumps(pipeline.spec)
| 22 | +
|
| 23 | +""" |
| 24 | + |
| 25 | +import types |
| 26 | +import json |
| 27 | +import subprocess |
| 28 | +from functools import partial |
| 29 | +from collections import defaultdict |
| 30 | +from itertools import chain |
| 31 | +import copy |
| 32 | +import warnings |
| 33 | + |
| 34 | +import pdal |
| 35 | + |
| 36 | +try: |
| 37 | + PDAL_DRIVERS_JSON = subprocess.run(["pdal", "--drivers", "--showjson"], capture_output=True).stdout |
| 38 | + PDAL_DRIVERS = json.loads(PDAL_DRIVERS_JSON) |
| 39 | + _PDAL_VALIDATE = True |
| 40 | +except: |
| 41 | + PDAL_DRIVERS = [] |
| 42 | + _PDAL_VALIDATE = False |
| 43 | + |
# Per-stage default parameters, keyed by fully qualified stage name
# (e.g. "filters.range").  Unknown stages fall back to an empty dict.
DEFAULT_STAGE_PARAMS = defaultdict(dict)
# TODO: add stage specific default configurations
DEFAULT_STAGE_PARAMS.update({})
| 48 | + |
| 49 | + |
class StageSpec(object):
    """
    A single pdal pipeline stage, plus the attribute-access machinery that
    lets driver constructors be spelled as e.g. ``pio.readers.las(...)``.

    An instance built with only a prefix (see the module-level ``readers``,
    ``filters`` and ``writers`` objects) acts as a namespace: attribute
    access yields a constructor for that driver, and calling the constructor
    with keyword arguments produces a configured stage.
    """

    def __init__(self, prefix, **kwargs):
        # prefix: one of "readers", "filters", "writers".
        self.prefix = prefix
        # Fully qualified driver name, e.g. "readers.las".  With no "type"
        # kwarg this is just 'prefix + "."' (the bare namespace object).
        self.key = ".".join([self.prefix, kwargs.get("type", "")])
        # Start from any registered defaults for this driver, then overlay
        # the caller's parameters.
        self.spec = DEFAULT_STAGE_PARAMS[self.key].copy()
        self.spec.update(kwargs)
        self.spec["type"] = self.key
        # NOTE: special case to support reading/writing files without an
        # explicit driver -- pdal infers the driver from the filename.
        if (self.prefix in ["readers", "writers"]) and kwargs.get("type") == "auto":
            del self.spec["type"]

    @property
    def pipeline(self):
        """
        Promote this stage to a `pdal.pio.PipelineSpec` with one `pdal.pio.StageSpec`
        and return it.
        """
        output = PipelineSpec()
        output.add_stage(self)
        return output

    def __getattr__(self, name):
        """
        Treat an unknown attribute as a driver name and return a constructor
        (a partial of this class) for the corresponding stage.
        """
        # BUG FIX: never fabricate dunder/private attributes.  Returning a
        # callable for names like "__deepcopy__" or "_ipython_canary_..."
        # breaks the copy/pickle protocols and interactive introspection.
        if name.startswith("_"):
            raise AttributeError(name)
        if _PDAL_VALIDATE and (name not in dir(self)):
            raise AttributeError(f"'{self.prefix}.{name}' is an invalid or unsupported PDAL stage")
        return partial(self.__class__, self.prefix, type=name)

    def __str__(self):
        # Pretty-printed json of this stage alone.
        return json.dumps(self.spec, indent=4)

    def __add__(self, other):
        # stage + stage (or stage + pipeline) promotes self to a pipeline
        # first; PipelineSpec.__add__ handles the concatenation.
        return self.pipeline + other

    def __dir__(self):
        # Advertise the driver names reported by the pdal executable (with
        # this stage's prefix stripped) alongside the normal attributes;
        # "auto" is always offered for the explicit-driver-free special case.
        extra_keys = [e["name"][len(self.key):] for e in PDAL_DRIVERS if e["name"].startswith(self.key)] + ["auto"]
        return super().__dir__() + [e for e in extra_keys if len(e) > 0]

    def execute(self):
        """
        Shortcut: promote this stage to a one-stage pipeline and execute it.
        """
        return self.pipeline.execute()
| 88 | + |
| 89 | + |
# Namespace roots: pio.readers.las(...), pio.filters.range(...), etc.
readers, filters, writers = (
    StageSpec(prefix) for prefix in ("readers", "filters", "writers")
)
| 93 | + |
| 94 | + |
class PipelineSpec(object):
    """
    An ordered sequence of `StageSpec` objects forming a pdal pipeline.
    """
    # Class-level default kept for backward compatibility with any code that
    # reads PipelineSpec.stages; instances always get their own list below.
    stages = []

    def __init__(self, other=None):
        """
        Build an empty pipeline, or a shallow copy of *other*'s stage list.
        """
        # BUG FIX: always bind a per-instance list.  Previously an empty
        # PipelineSpec() fell through to the shared class-level list, so
        # add_stage() (e.g. via StageSpec.pipeline) accumulated stages
        # globally across every pipeline in the process.
        self.stages = list(other.stages) if other is not None else []

    @property
    def spec(self):
        """
        Return a `dict` containing the pdal pipeline suitable for dumping to json
        """
        return {
            "pipeline": [stage.spec for stage in self.stages]
        }

    def add_stage(self, stage):
        """
        Add a StageSpec to the end of this pipeline, and return the updated result.
        """
        assert isinstance(stage, StageSpec), "Expected StageSpec"

        self.stages.append(stage)
        return self

    def __str__(self):
        # Pretty-printed json of the whole pipeline.
        return json.dumps(self.spec, indent=4)

    def __add__(self, stage_or_pipeline):
        # Concatenation: pipeline + stage appends one stage; pipeline +
        # pipeline appends all of the right operand's stages.  Returns a new
        # PipelineSpec; neither operand is mutated.
        assert isinstance(stage_or_pipeline, (StageSpec, PipelineSpec)), "Expected StageSpec or PipelineSpec"

        output = self.__class__(self)
        if isinstance(stage_or_pipeline, StageSpec):
            output.add_stage(stage_or_pipeline)
        elif isinstance(stage_or_pipeline, PipelineSpec):
            for stage in stage_or_pipeline.stages:
                output.add_stage(stage)
        return output

    def execute(self):
        """
        Shortcut to execute and return the results of the pipeline.
        """
        # TODO: do some validation before calling execute

        # TODO: some exception/error handling around pdal
        pipeline = pdal.Pipeline(json.dumps(self.spec))
        # pipeline.validate() # NOTE: disabling this because it causes segfaults in certain cases
        pipeline.execute()

        return pipeline.arrays[0]  # NOTE: are there situations where arrays has multiple elements?
0 commit comments