|
4 | 4 |
|
5 | 5 | from typing import TYPE_CHECKING |
6 | 6 |
|
7 | | -from segy.schema import Endianness |
8 | | -from segy.transforms import ByteSwapTransform |
9 | | -from segy.transforms import IbmFloatTransform |
| 7 | +from copy import deepcopy |
| 8 | +import numpy as np |
10 | 9 |
|
11 | 10 | if TYPE_CHECKING: |
12 | 11 | from numpy.typing import NDArray |
13 | 12 | from segy import SegyFile |
14 | | - from segy.transforms import Transform |
15 | | - from segy.transforms import TransformPipeline |
16 | 13 |
|
| 14 | +class SegyFileTraceDataWrapper: |
17 | 15 |
|
18 | | -def _reverse_single_transform(data: NDArray, transform: Transform, endianness: Endianness) -> NDArray: |
19 | | - """Reverse a single transform operation.""" |
20 | | - if isinstance(transform, ByteSwapTransform): |
21 | | - # Reverse the endianness conversion |
22 | | - if endianness == Endianness.LITTLE: |
23 | | - return data |
| 16 | + def __init__(self, segy_file: SegyFile, indices: int | list[int] | NDArray | slice): |
| 17 | + self.segy_file = segy_file |
| 18 | + self.indices = indices |
| 19 | + self._header_pipeline = deepcopy(segy_file.accessors.header_decode_pipeline) |
| 20 | + segy_file.accessors.header_decode_pipeline.transforms = [] |
| 21 | + self.traces = segy_file.trace[indices] |
24 | 22 |
|
25 | | - reverse_transform = ByteSwapTransform(Endianness.BIG) |
26 | | - return reverse_transform.apply(data) |
| 23 | + @property |
| 24 | + def header(self): |
| 25 | + # The copy is necessary to avoid applying the pipeline to the original header. |
| 26 | + return self._header_pipeline.apply(self.traces.header.copy()) |
27 | 27 |
|
28 | | - # TODO(BrianMichell): #0000 Do we actually need to worry about IBM/IEEE transforms here? |
29 | | - if isinstance(transform, IbmFloatTransform): |
30 | | - # Reverse IBM float conversion |
31 | | - reverse_direction = "to_ibm" if transform.direction == "to_ieee" else "to_ieee" |
32 | | - reverse_transform = IbmFloatTransform(reverse_direction, transform.keys) |
33 | | - return reverse_transform.apply(data) |
34 | 28 |
|
35 | | - # For unknown transforms, return data unchanged |
36 | | - return data |
| 29 | + @property |
| 30 | + def raw_header(self): |
| 31 | + return np.ascontiguousarray(self.traces.header).view("|V240") |
37 | 32 |
|
38 | | - |
39 | | -def get_header_raw_and_transformed( |
40 | | - segy_file: SegyFile, indices: int | list[int] | NDArray | slice, do_reverse_transforms: bool = True |
41 | | -) -> tuple[NDArray | None, NDArray, NDArray]: |
42 | | - """Get both raw and transformed header data. |
43 | | -
|
44 | | - Args: |
45 | | - segy_file: The SegyFile instance |
46 | | - indices: Which headers to retrieve |
47 | | - do_reverse_transforms: Whether to apply the reverse transform to get raw data |
48 | | -
|
49 | | - Returns: |
50 | | - Tuple of (raw_headers, transformed_headers, traces) |
51 | | - """ |
52 | | - traces = segy_file.trace[indices] |
53 | | - transformed_headers = traces.header |
54 | | - |
55 | | - # Reverse transforms to get raw data |
56 | | - if do_reverse_transforms: |
57 | | - raw_headers = _reverse_transforms( |
58 | | - transformed_headers, segy_file.header.transform_pipeline, segy_file.spec.endianness |
59 | | - ) |
60 | | - else: |
61 | | - raw_headers = None |
62 | | - |
63 | | - return raw_headers, transformed_headers, traces |
64 | | - |
65 | | - |
66 | | -def _reverse_transforms( |
67 | | - transformed_data: NDArray, transform_pipeline: TransformPipeline, endianness: Endianness |
68 | | -) -> NDArray: |
69 | | - """Reverse the transform pipeline to get raw data.""" |
70 | | - raw_data = transformed_data.copy() if hasattr(transformed_data, "copy") else transformed_data |
71 | | - |
72 | | - # Apply transforms in reverse order |
73 | | - for transform in reversed(transform_pipeline.transforms): |
74 | | - raw_data = _reverse_single_transform(raw_data, transform, endianness) |
75 | | - |
76 | | - return raw_data |
| 33 | + @property |
| 34 | + def sample(self): |
| 35 | + return self.traces.sample |
0 commit comments