diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index dcf72914..8e3bc4aa 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -70,43 +70,6 @@ jobs: run: | echo "FASTPARQUET_DATAPAGE_V2=$FASTPARQUET_DATAPAGE_V2" pytest --verbose --cov=fastparquet - pandas: - name: pandas - runs-on: ubuntu-latest - steps: - - name: APT - run: sudo apt-get install liblzo2-dev - - - name: Checkout - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - - name: Setup conda - uses: mamba-org/provision-with-micromamba@main - with: - environment-file: ci/environment-py310.yml - - - name: pip-install - shell: bash -l {0} - run: | - pip install Cython - pip install hypothesis - pip install pytest-localserver pytest-xdist pytest-asyncio - pip install -e . --no-deps # Install fastparquet - git clone https://github.com/pandas-dev/pandas - cd pandas - python setup.py build_ext -j 4 - pip install -e . --no-build-isolation - - - name: Run Tests - shell: bash -l {0} - run: | - pytest -v fastparquet/ # fastparquet test suite against dev pandas - pytest --verbose pandas/pandas/tests/io/test_parquet.py - # Test parquet with different filesystems - pytest --verbose pandas/pandas/tests/io/test_gcs.py pandas/pandas/tests/io/test_fsspec.py -k "parquet" - win: name: win runs-on: windows-2019 diff --git a/MANIFEST.in b/MANIFEST.in index e0dd1841..f549c532 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,13 +1,10 @@ -recursive-include fastparquet *.py -recursive-include fastparquet *.pyx - include setup.py include README.rst include LICENSE include MANIFEST.in include requirements.txt -prune fastparquet/test/ -prune fastparquet/benchmarks/ -prune test-data/ -prune docs/ +prune .github +prune docs +prune test-data +global-exclude test*.py *.parquet *.parq *.c *.thrift diff --git a/docs/source/conf.py b/docs/source/conf.py index c6e176d2..d0a75e57 100755 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -25,7 +25,7 @@ def __getattr__(cls, name): return MagicMock() -MOCK_MODULES = ['fastparquet.speedups', 'fastparquet.cencoding'] +MOCK_MODULES = ['fastparquet.cencoding'] sys.modules.update((mod_name, Mock()) for mod_name in MOCK_MODULES) APP_DIR = os.path.normpath(os.path.join(os.getcwd(), '../..')) diff --git a/fastparquet/api.py b/fastparquet/api.py index c54e5eb5..24456dc7 100644 --- a/fastparquet/api.py +++ b/fastparquet/api.py @@ -1,25 +1,26 @@ """parquet - read parquet files.""" -import ast from collections import OrderedDict, defaultdict +import concurrent.futures import re -import struct +import os import numpy as np import fsspec -import pandas as pd -from fastparquet import core, schema, converted_types, encoding, dataframe, writer +from fastparquet import core, schema, converted_types, encoding, writer from fastparquet import parquet_thrift from fastparquet.cencoding import ThriftObject, from_buffer from fastparquet.json import json_decoder from fastparquet.util import (default_open, default_remove, ParquetException, val_to_num, ops, ensure_bytes, ensure_str, check_column_names, metadata_from_many, - ex_from_sep, _strip_path_tail, get_fs, PANDAS_VERSION, join_path) + ex_from_sep, _strip_path_tail, join_path, ThreadPool, Task) # Find in names of partition files the integer matching "**part.*.parquet", # as 'i'. PART_ID = re.compile(r'.*part.(?P[\d]+).parquet$') +MAX_WORKERS = min(32, (os.cpu_count() or 1)) # from ThreadPoolExecutor +ex_stash = [None] class ParquetFile(object): @@ -42,24 +43,12 @@ class ParquetFile(object): that they make up a single parquet data set. 
This parameter can also be any file-like object, in which case this must be a single-file dataset. - verify: bool [False] - test file start/end byte markers - open_with: function - With the signature `func(path, mode)`, returns a context which - evaluated to a file open for reading. Defaults to the built-in `open`. root: str If passing a list of files, the top directory of the data-set may be ambiguous for partitioning where the upmost field has only one value. Use this to specify the dataset root directory, if required. fs: fsspec-compatible filesystem You can use this instead of open_with (otherwise, it will be inferred) - pandas_nulls: bool (True) - If True, columns that are int or bool in parquet, but have nulls, will become - pandas nullale types (Uint, Int, boolean). If False (the only behaviour - prior to v0.7.0), both kinds will be cast to float, and nulls will be NaN - unless pandas metadata indicates that the original datatypes were nullable. - Pandas nullable types were introduces in v1.0.0, but were still marked as - experimental in v1.3.0. Attributes ---------- @@ -77,8 +66,6 @@ class ParquetFile(object): The data columns available count: int Total number of rows - dtypes: dict - Expected output types for each column file_scheme: str 'simple': all row groups are within the same file; 'hive': all row groups are in other files; 'mixed': row groups in this file and others @@ -102,46 +89,24 @@ class ParquetFile(object): _pdm = None _kvm = None _categories = None + _statistics = None - def __init__(self, fn, verify=False, open_with=default_open, root=False, - sep=None, fs=None, pandas_nulls=True, dtypes=None): - self.pandas_nulls = pandas_nulls - self._base_dtype = dtypes - self.tz = None - self._columns_dtype = None - if open_with is default_open and fs is None: - fs = fsspec.filesystem("file") - elif fs is not None: - open_with = fs.open - else: - fs = getattr(open_with, "__self__", None) - if fs is None: - fs, fn, open_with, mkdirs = get_fs(fn, open_with, None) - + def __init__(self, fn: str, fs=fsspec.filesystem("file"), root=False): + self.fs = fs if isinstance(fn, (tuple, list)): if root and fs is not None: root = fs._strip_protocol(root) - basepath, fmd = metadata_from_many(fn, verify_schema=verify, - open_with=open_with, root=root, - fs=fs) + basepath, fmd = metadata_from_many(fn, root=root, fs=fs) writer.consolidate_categories(fmd) self.fn = join_path( basepath, '_metadata') if basepath else '_metadata' self.fmd = fmd self._set_attrs() - elif hasattr(fn, 'read'): - # file-like - self.fn = None - self._parse_header(fn, verify) - if self.file_scheme not in ['simple', 'empty']: - raise ValueError('Cannot use file-like input ' - 'with multi-file data') - open_with = lambda *args, **kwargs: fn elif isinstance(fs, fsspec.AbstractFileSystem): if fs.isfile(fn): self.fn = join_path(fn) - with open_with(fn, 'rb') as f: - self._parse_header(f, verify) + with fs.open(fn, 'rb') as f: + self._parse_header(f) if root: paths = [fn.replace(root, "")] self.file_scheme, self.cats = paths_to_cats(paths, None) @@ -149,8 +114,8 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False, fn2 = join_path(fn, '_metadata') if fs.exists(fn2): self.fn = fn2 - with open_with(fn2, 'rb') as f: - self._parse_header(f, verify) + with fs.open(fn2, 'rb') as f: + self._parse_header(f) fn = fn2 else: # TODO: get details from fs here, rather than do suffix cat in @@ -165,9 +130,7 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False, raise ValueError("No files in dir") if 
root: root = fs._strip_protocol(root) - basepath, fmd = metadata_from_many(allfiles, verify_schema=verify, - open_with=open_with, root=root, - fs=fs) + basepath, fmd = metadata_from_many(allfiles, root=root, fs=fs) writer.consolidate_categories(fmd) self.fn = join_path(basepath, '_metadata') if basepath \ else '_metadata' @@ -177,45 +140,34 @@ def __init__(self, fn, verify=False, open_with=default_open, root=False, else: raise FileNotFoundError(fn) else: - done = False - try: - self.fn = fn - f = open_with(fn) - self._parse_header(f, verify) - done = True - except IOError: - pass - if not done: - # allow this to error with FileNotFound or whatever - try: - self.fn = join_path(fn, "_metadata") - f = open_with(self.fn) - self._parse_header(f, verify) - except IOError as e: - raise ValueError("Opening directories without a _metadata requires" - "a filesystem compatible with fsspec") from e - self.open = open_with - self._statistics = None - - def _parse_header(self, f, verify=True): + raise TypeError + + def _parse_header(self, f): if self.fn and self.fn.endswith("_metadata"): # no point attempting to read footer only for pure metadata - data = f.read()[4:-8] + data = f.read() + assert data[-4:] == b"PAR1" + data = data[4:-8] self._head_size = len(data) else: try: - f.seek(0) - if verify: - assert f.read(4) == b'PAR1' - f.seek(-8, 2) - head_size = struct.unpack('= nrows: - break - return self[:i+1].to_pandas(**kwargs).head(nrows) - def __getitem__(self, item): """Select among the row-groups using integer/slicing""" import copy @@ -313,14 +246,11 @@ def __getitem__(self, item): if not isinstance(new_rgs, list): new_rgs = [new_rgs] new_pf = object.__new__(ParquetFile) + new_pf.__dict__.update(self.__dict__) fmd = copy.copy(self.fmd) fmd.row_groups = new_rgs - new_pf.__setstate__( - {"fn": self.fn, "open": self.open, "fmd": fmd, - "pandas_nulls": self.pandas_nulls, "_base_dtype": self._base_dtype, - "tz": self.tz, "_columns_dtype": self._columns_dtype} - ) - new_pf._set_attrs() + new_pf.row_groups = new_rgs + self._read_partitions() return new_pf def __len__(self): @@ -335,18 +265,21 @@ def __bool__(self): return True def row_group_filename(self, rg): - if rg.columns and rg.columns[0].file_path: + if self.file_scheme == "simple": + return self.fn + cs = rg.columns + if cs and cs[0].file_path: base = self.basepath if base: - return join_path(base, rg.columns[0].file_path) + return join_path(base, cs[0].file_path) else: - return rg.columns[0].file_path + return cs[0].file_path else: return self.fn def read_row_group_file(self, rg, columns, categories, index=None, assign=None, partition_meta=None, row_filter=False, - infile=None): + infile=None, ex=None): """ Open file for reading, and process it as a row-group assign is None if this method is called directly (not from to_pandas), @@ -364,35 +297,14 @@ def read_row_group_file(self, rg, columns, categories, index=None, """ categories = self.check_categories(categories) fn = self.row_group_filename(rg) - ret = False - if assign is None: - if row_filter and isinstance(row_filter, list): - cs = self._columns_from_filters(row_filter) - df = self.read_row_group_file( - rg, cs, categories, index=False, - infile=infile, row_filter=False) - row_filter = self._column_filter(df, filters=row_filter) - size = row_filter.sum() - if size == rg.num_rows: - row_filter = False - else: - size = rg.num_rows - df, assign = self.pre_allocate( - size, columns, categories, index) - if "PANDAS_ATTRS" in self.key_value_metadata: - import json - df.attrs = 
json.loads(self.key_value_metadata["PANDAS_ATTRS"]) - ret = True f = infile or self.open(fn, mode='rb') core.read_row_group( f, rg, columns, categories, self.schema, self.cats, selfmade=self.selfmade, index=index, assign=assign, scheme=self.file_scheme, partition_meta=partition_meta, - row_filter=row_filter + row_filter=row_filter, ex=ex ) - if ret: - return df def iter_row_groups(self, filters=None, **kwargs): """ @@ -540,15 +452,6 @@ def write_row_groups(self, data, row_group_offsets=None, sort_key=None, """ from .writer import write_simple, write_multi partition_on = list(self.cats) - if isinstance(data, pd.DataFrame): - self_cols = sorted(self.columns + partition_on) - if self_cols != sorted(data.columns): - diff_cols = set(data.columns) ^ set(self_cols) - raise ValueError( - f'Column names of new data are {sorted(data.columns)}. ' - f'But column names in existing file are {self_cols}. ' - f'{diff_cols} are columns being either only in existing ' - 'file or only in new data. This is not possible.') if (self.file_scheme == 'simple' or (self.file_scheme == 'empty' and self.fn[-9:] != '_metadata')): # Case 'simple'. @@ -688,155 +591,57 @@ def _column_filter(self, df, filters): out |= and_part return out - def to_pandas(self, columns=None, categories=None, filters=[], - index=None, row_filter=False, dtypes=None): + def to_numpy(self, columns=None, filters=None, worker_threads=MAX_WORKERS): """ - Read data from parquet into a Pandas dataframe. - - Parameters - ---------- - columns: list of names or `None` - Column to load (see `ParquetFile.columns`). Any columns in the - data not in this list will be ignored. If `None`, read all columns. - categories: list, dict or `None` - If a column is encoded using dictionary encoding in every row-group - and its name is also in this list, it will generate a Pandas - Category-type column, potentially saving memory and time. If a - dict {col: int}, the value indicates the number of categories, - so that the optimal data-dtype can be allocated. If ``None``, - will automatically set *if* the data was written from pandas. - filters: list of list of tuples or list of tuples - To filter out data. - Filter syntax: [[(column, op, val), ...],...] - where op is [==, =, >, >=, <, <=, !=, in, not in] - The innermost tuples are transposed into a set of filters applied - through an `AND` operation. - The outer list combines these sets of filters through an `OR` - operation. - A single list of tuples can also be used, meaning that no `OR` - operation between set of filters is to be conducted. - index: string or list of strings or False or None - Column(s) to assign to the (multi-)index. If None, index is - inferred from the metadata (if this was originally pandas data); if - the metadata does not exist or index is False, index is simple - sequential integers. - row_filter: bool or boolean ndarray - Whether filters are applied to whole row-groups (False, default) - or row-wise (True, experimental). The latter requires two passes of - any row group that may contain valid rows, but can be much more - memory-efficient, especially if the filter columns are not required - in the output. - If boolean array, it is applied as custom row filter. In this case, - 'filter' parameter is ignored, and length of the array has to be - equal to the total number of rows. - - Returns - ------- - Pandas data-frame + worker_threads: 0 means no threads. 
""" rgs = filter_row_groups(self, filters) if filters else self.row_groups - index = self._get_index(index) if columns is not None: - columns = columns[:] + check_column_names(self.columns, columns, ()) + cnum = len(columns) else: - columns = self.columns + list(self.cats) - if index: - columns += [i for i in index if i not in columns] - check_column_names(self.columns + list(self.cats), columns, categories) - if row_filter is not False: - if filters and row_filter is True: - # Rows are selected as per filters. - # TODO: special case when filter columns are also in output - cs = self._columns_from_filters(filters) - df = self.to_pandas(columns=cs, filters=filters, row_filter=False, - index=False) - sel = self._column_filter(df, filters=filters) - else: - # Row are selected as per custom 'sel'. - if sum(rg.num_rows for rg in rgs) != len(row_filter): - raise ValueError('Provided boolean array for custom row \ -selection does not match number of rows in DataFrame.') - sel = row_filter - size = sel.sum() - selected = [] - start = 0 - for rg in rgs[:]: - selected.append(sel[start:start+rg.num_rows]) - start += rg.num_rows - else: - size = sum(rg.num_rows for rg in rgs) - selected = [None] * len(rgs) # just to fill zip, below - df, views = self.pre_allocate(size, columns, categories, index, dtypes=dtypes) - if "PANDAS_ATTRS" in self.key_value_metadata: - import json - df.attrs = json.loads(self.key_value_metadata["PANDAS_ATTRS"]) - - start = 0 + cnum = self.schema.root.num_children + views = {} if self.file_scheme == 'simple': - infile = self.open(self.fn, 'rb') + infile = self.fs.open(self.fn, 'rb') else: infile = None - for rg, sel in zip(rgs, selected): - thislen = sel.sum() if sel is not None else rg.num_rows - if thislen == rg.num_rows: - # all good; noop if no row filtering - sel = None - elif thislen == 0: - # no valid rows - continue - parts = {name: (v if name.endswith('-catdef') - else v[start:start + thislen]) - for (name, v) in views.items()} - self.read_row_group_file(rg, columns, categories, index, - assign=parts, partition_meta=self.partition_meta, - row_filter=sel, infile=infile) - start += thislen - return df - - def pre_allocate(self, size, columns, categories, index, dtypes=None): - if dtypes is not None: - columns = list(dtypes) + if worker_threads: + # TODO: consider the case (tiny data) where it's just not worthwhile + ex = ThreadPool(num_workers=worker_threads) else: - dtypes = self._dtypes(categories) - categories = self.check_categories(categories) - cats = {k: v for k, v in self.cats.items() if k in columns} - df, arrs = _pre_allocate(size, columns, categories, index, cats, - dtypes, self.tz, columns_dtype=self._columns_dtype) - i_no_name = re.compile(r"__index_level_\d+__") - if self.has_pandas_metadata: - md = self.pandas_metadata - if categories: - for c in md['columns']: - if c['name'] in categories and c['name'] in df and c['metadata']: - df[c['name']].dtype._ordered = c['metadata']['ordered'] - if md.get('index_columns', False) and not (index or index is False): - if len(md['index_columns']) == 1: - ic = md['index_columns'][0] - if isinstance(ic, dict) and ic.get('kind') == 'range': - from pandas import RangeIndex - df.index = RangeIndex( - start=ic['start'], - stop=ic['start'] + size * ic['step'] + 1, - step=ic['step'] - )[:size] - names = [(c['name'] if isinstance(c, dict) else c) - for c in md['index_columns']] - names = [None if n is None or i_no_name.match(n) else n - for n in names] - df.index.names = names - if md.get('column_indexes', False): - names = 
[(c['name'] if isinstance(c, dict) else c) - for c in md['column_indexes']] - names = [None if n is None or i_no_name.match(n) else n - for n in names] - if len(names) > 1: - df.columns = pd.MultiIndex.from_tuples( - [ast.literal_eval(c) for c in df.columns if c not in df.index.names], - names=names - ) - else: - df.columns.names = names - return df, arrs + ex = None + # TODO: this condition is too simple, should also depend on + # number of selected/available columns + if ex is not None and (len(rgs) > worker_threads * 2 or cnum < len(rgs)): + # parallel over row-groups + for i, rg in enumerate(rgs): + # This is the same as per rg iteration + ass = views.setdefault(i, {}) + ex.submit(self.read_row_group_file, rg=rg, columns=columns, index=None, categories=None, + assign=ass, partition_meta=self.partition_meta, + infile=infile, ex=None) + ex.go() + else: + # no parallel or parallel over columns + for i, rg in enumerate(rgs): + # This is the same as per rg iteration + ass = views.setdefault(i, {}) + self.read_row_group_file(rg, columns, None, None, + assign=ass, partition_meta=self.partition_meta, + infile=infile, ex=ex) + if ex is not None: + ex.go() + # out = {} + # # simple concat, no evolution for now + # for k in views[0]: + # if k.endswith("-offsets"): + # out[k] = concat_and_add([views[_][k] for _ in range(len(views))]) + # elif k.endswith("-index"): + # out[k] = concat_and_add([views[_][k] for _ in range(len(views))], offset=False) + # else: + # out[k] = simple_concat([views[_][k] for _ in range(len(views))]) + return views def count(self, filters=None, row_filter=False): """ Total number of rows @@ -861,6 +666,8 @@ def info(self): "row_groups": len(self.row_groups)} def check_categories(self, cats): + if cats in [None, [], ()]: + return {} categ = self.categories if not self.has_pandas_metadata: return cats or {} @@ -872,22 +679,20 @@ def check_categories(self, cats): if isinstance(cats, dict): return cats out = {k: v for k, v in categ.items() if k in cats} - out.update({c: pd.RangeIndex(0, 2**14) for c in cats if c not in categ}) + out.update({c: None for c in cats if c not in categ}) return out @property def has_pandas_metadata(self): - if self._pdm: - return True - if self.fmd.key_value_metadata is None: - return False - return bool(self.key_value_metadata.get('pandas', False)) + return bool(self.pandas_metadata) @property def pandas_metadata(self): if self._pdm is None: - if self.has_pandas_metadata: - self._pdm = json_decoder()(self.key_value_metadata['pandas']) + b = [kv.value for kv in self.fmd.key_value_metadata or [] + if kv.key in (b"pandas", "pandas")] + if b: + self._pdm = json_decoder()(b) else: self._pdm = {} return self._pdm @@ -938,74 +743,6 @@ def categories(self): else: return {} - def _dtypes(self, categories=None): - """ Implied types of the columns in the schema """ - import pandas as pd - if self._base_dtype is None: - if self.has_pandas_metadata: - md = self.pandas_metadata['columns'] - md = {c['name']: c for c in md} - tz = {k: v["metadata"]['timezone'] for k, v in md.items() - if v.get('metadata', {}) and v.get('metadata', {}).get('timezone', None)} - else: - tz = None - md = None - self.tz = tz - - dtype = OrderedDict((name, (converted_types.typemap(f, md=md) - if f.num_children in [None, 0] else np.dtype("O"))) - for name, f in self.schema.root["children"].items() - if getattr(f, 'isflat', False) is False) - for i, (col, dt) in enumerate(dtype.copy().items()): - # int and bool columns produce masked pandas types, no need to - # promote types here - if 
dt.kind == "M": - if self.pandas_metadata and PANDAS_VERSION.major >= 2: - # get original resolution when pandas supports non-ns - dt = md[col]["numpy_type"] - if tz is not None and tz.get(col, False): - z = dataframe.tz_to_dt_tz(tz[col]) - dt_series = pd.Series([], dtype=dt) - if PANDAS_VERSION.major >= 2 and dt_series.dt.tz is not None: - dt = dt_series.dt.tz_convert(z).dtype - else: - dt = dt_series.dt.tz_localize(z).dtype - dtype[col] = dt - elif dt in converted_types.nullable: - if self.pandas_metadata: - tt = md.get(col, {}).get("numpy_type") - if tt and ("int" in tt or "bool" in tt): - continue - # uint/int/bool columns that may have nulls become nullable - # skip is pandas_metadata gives original types - num_nulls = 0 - for rg in self.row_groups: - if rg[3] == 0: - continue - st = rg[1][i][3].get(12) - if st is None: - num_nulls = True - break - if st.get(3): - num_nulls = True - break - if num_nulls: - if self.pandas_nulls: - dtype[col] = converted_types.nullable[dt] - else: - dtype[col] = np.float64() - elif dt == 'S12': - dtype[col] = 'M8[ns]' - self._base_dtype = dtype - dtype = self._base_dtype.copy() - categories = self.check_categories(categories) - for field in categories: - dtype[field] = 'category' - for cat in self.cats: - dtype[cat] = "category" - self.dtypes = dtype - return dtype - def __getstate__(self): if self.fmd.row_groups is None: self.fmd.row_groups = [] @@ -1035,32 +772,6 @@ def __str__(self): __repr__ = __str__ -def _pre_allocate(size, columns, categories, index, cs, dt, tz=None, columns_dtype=None): - index = [index] if isinstance(index, str) else (index or []) - cols = [c for c in columns if c not in index] - categories = categories or {} - cats = cs.copy() - if isinstance(categories, dict): - cats.update(categories) - - def get_type(name, index=False): - if name in categories: - return 'category' - t = dt[name] - if index and isinstance(t, pd.core.arrays.masked.BaseMaskedDtype): - return "int64" - return t - - dtypes = [get_type(c) for c in cols] - index_types = [get_type(i, index=True) for i in index] - cols.extend(cs) - dtypes.extend(['category'] * len(cs)) - df, views = dataframe.empty(dtypes, size, cols=cols, index_names=index, - index_types=index_types, cats=cats, timezones=tz, - columns_dtype=columns_dtype) - return df, views - - def paths_to_cats(paths, partition_meta=None): """ Extract categorical fields and labels from hive- or drill-style paths. diff --git a/fastparquet/cencoding.pyx b/fastparquet/cencoding.pyx index 64f54174..be96a840 100644 --- a/fastparquet/cencoding.pyx +++ b/fastparquet/cencoding.pyx @@ -13,15 +13,16 @@ import cython import numpy as np +cimport numpy as np cdef extern from "string.h": - void *memcpy(void *dest, const void *src, size_t n) + void *memcpy(void *dest, const void *src, size_t n) noexcept nogil from cpython cimport ( - PyBytes_FromStringAndSize, PyBytes_GET_SIZE, PyUnicode_DecodeUTF8, + PyBytes_FromStringAndSize, PyBytes_GET_SIZE, PyUnicode_DecodeUTF8 ) from libc.stdint cimport int8_t, uint8_t, uint32_t, int32_t, uint64_t, int64_t -cpdef void read_rle(NumpyIO file_obj, int32_t header, int32_t bit_width, NumpyIO o, int32_t itemsize=4): +cpdef void read_rle(NumpyIO file_obj, int32_t header, int32_t bit_width, NumpyIO o, int32_t itemsize=4) noexcept nogil: """Read a run-length encoded run from the given fo with the given header and bit_width. 
The count is determined from the header and the width is used to grab the @@ -52,7 +53,7 @@ cpdef void read_rle(NumpyIO file_obj, int32_t header, int32_t bit_width, NumpyIO file_obj.loc += inptr - file_obj.get_pointer() -cpdef int32_t width_from_max_int(int64_t value): +cpdef int32_t width_from_max_int(int64_t value) noexcept nogil: """Convert the value specified to a bit_width.""" cdef int32_t i for i in range(0, 64): @@ -61,17 +62,16 @@ cpdef int32_t width_from_max_int(int64_t value): value >>= 1 -cdef int32_t _mask_for_bits(int32_t i): +cdef int32_t _mask_for_bits(int32_t i) noexcept nogil: """Generate a mask to grab `i` bits from an int value.""" return (1 << i) - 1 -cpdef void read_bitpacked1(NumpyIO file_obj, int32_t count, NumpyIO o): +cpdef void read_bitpacked1(NumpyIO file_obj, int32_t count, NumpyIO o) noexcept nogil: # implementation of np.unpackbits with output array. Output is int8 array cdef: char * inptr = file_obj.get_pointer() char * outptr = o.get_pointer() - char * endptr unsigned char data int32_t counter, i, startcount=count if count > o.nbytes - o.loc: @@ -126,7 +126,7 @@ cpdef void write_bitpacked1(NumpyIO file_obj, int32_t count, NumpyIO o): o.loc += (count + 7) // 8 -cpdef void read_bitpacked(NumpyIO file_obj, int32_t header, int32_t width, NumpyIO o, int32_t itemsize=4): +cpdef void read_bitpacked(NumpyIO file_obj, int32_t header, int32_t width, NumpyIO o, int32_t itemsize=4) noexcept nogil: """ Read values packed into width-bits each (which can be >8) """ @@ -144,7 +144,7 @@ cpdef void read_bitpacked(NumpyIO file_obj, int32_t header, int32_t width, Numpy return endptr = (o.nbytes - o.loc) + outptr - itemsize mask = _mask_for_bits(width) - data = 0xff & inptr[0] + data = 0xff & (inptr[0]) inptr += 1 while count: if right > 8: @@ -169,14 +169,14 @@ cpdef void read_bitpacked(NumpyIO file_obj, int32_t header, int32_t width, Numpy file_obj.loc += inptr - file_obj.get_pointer() -cpdef uint64_t read_unsigned_var_int(NumpyIO file_obj): +cpdef uint64_t read_unsigned_var_int(NumpyIO file_obj) noexcept nogil: """Read a value using the unsigned, variable int encoding. file-obj is a NumpyIO of bytes; avoids struct to allow numba-jit """ cdef uint64_t result = 0 cdef int32_t shift = 0 cdef char byte - cdef char * inptr = file_obj.get_pointer() + cdef char * inptr = file_obj.ptr + file_obj.loc # file_obj.get_pointer() while True: byte = inptr[0] @@ -185,12 +185,12 @@ cpdef uint64_t read_unsigned_var_int(NumpyIO file_obj): if (byte & 0x80) == 0: break shift += 7 - file_obj.loc += inptr - file_obj.get_pointer() + file_obj.loc += inptr - (file_obj.ptr + file_obj.loc) return result -cpdef void read_rle_bit_packed_hybrid(NumpyIO io_obj, int32_t width, uint32_t length, NumpyIO o, - int32_t itemsize=4): +def read_rle_bit_packed_hybrid(NumpyIO io_obj, int32_t width, uint32_t length, NumpyIO o, + int32_t itemsize=4): """Read values from `io_obj` using the rel/bit-packed hybrid encoding. If length is not specified, then a 32-bit int is read first to grab the @@ -202,27 +202,31 @@ cpdef void read_rle_bit_packed_hybrid(NumpyIO io_obj, int32_t width, uint32_t le at .tell(). 
""" cdef int32_t start, header - if length is False: - length = io_obj.read_int() - start = io_obj.loc - while io_obj.loc - start < length and o.loc < o.nbytes: - header = read_unsigned_var_int(io_obj) - if header & 1 == 0: - read_rle(io_obj, header, width, o, itemsize) - else: - read_bitpacked(io_obj, header, width, o, itemsize) + with nogil: + if length is False: + length = io_obj.read_int() + start = io_obj.loc + while io_obj.loc - start < length and o.loc < o.nbytes: + header = read_unsigned_var_int(io_obj) + if header & 1 == 0: + read_rle(io_obj, header, width, o, itemsize) + else: + read_bitpacked(io_obj, header, width, o, itemsize) cdef void delta_read_bitpacked(NumpyIO file_obj, uint8_t bitwidth, - NumpyIO o, uint64_t count, uint8_t longval=0): + NumpyIO o, uint64_t count, uint8_t longval=0) noexcept nogil: cdef: uint64_t data = 0 int8_t left = 0 int8_t right = 0 - uint64_t mask = 0XFFFFFFFFFFFFFFFF >> (64 - bitwidth) + uint64_t mask = 0XFFFFFFFFFFFFFFFF + mask = mask >> (64 - bitwidth) while count > 0: if (left - right) < bitwidth: - data = data | (file_obj.read_byte() << left) + # data = data | (file_obj.read_byte() << left) + data = data | (file_obj.ptr[file_obj.loc] << left) + file_obj.loc += 1 left += 8 elif right > 8: data >>= 8 @@ -247,43 +251,44 @@ cpdef void delta_binary_unpack(NumpyIO file_obj, NumpyIO o, uint8_t longval=0): const uint8_t[:] bitwidths uint8_t bitwidth values_per_miniblock = block_size // miniblock_per_block - while True: - min_delta = zigzag_long(read_unsigned_var_int(file_obj)) - bitwidths = file_obj.read(miniblock_per_block) - for i in range(miniblock_per_block): - bitwidth = bitwidths[i] - if bitwidth: - temp = o.loc - if count > 1: - # no more diffs if on last value - delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, longval) - o.loc = temp - for j in range(values_per_miniblock): - if longval: - temp = o.read_long() - o.loc -= 8 - o.write_long(value) - else: - temp = o.read_int() - o.loc -= 4 - o.write_int(value) - value += min_delta + temp - count -= 1 - if count <= 0: - return - else: - for j in range(values_per_miniblock): - if longval: - o.write_long(value) - else: - o.write_int(value) - value += min_delta - count -= 1 - if count <= 0: - return - - -cpdef void encode_unsigned_varint(uint64_t x, NumpyIO o): # pragma: no cover + with nogil: + while True: + min_delta = zigzag_long(read_unsigned_var_int(file_obj)) + bitwidths = file_obj.read(miniblock_per_block) + for i in range(miniblock_per_block): + bitwidth = bitwidths[i] + if bitwidth: + temp = o.loc + if count > 1: + # no more diffs if on last value + delta_read_bitpacked(file_obj, bitwidth, o, values_per_miniblock, longval) + o.loc = temp + for j in range(values_per_miniblock): + if longval: + temp = o.read_long() + o.loc -= 8 + o.write_long(value) + else: + temp = o.read_int() + o.loc -= 4 + o.write_int(value) + value += min_delta + temp + count -= 1 + if count <= 0: + return + else: + for j in range(values_per_miniblock): + if longval: + o.write_long(value) + else: + o.write_int(value) + value += min_delta + count -= 1 + if count <= 0: + return + + +cpdef void encode_unsigned_varint(uint64_t x, NumpyIO o) noexcept nogil: while x > 127: o.write_byte((x & 0x7F) | 0x80) x >>= 7 @@ -342,14 +347,14 @@ cdef class NumpyIO(object): self.ptr = &data[0] self.nbytes = data.shape[0] - cdef char* get_pointer(self): + cdef char* get_pointer(self) noexcept nogil: return self.ptr + self.loc @property def len(self): return self.nbytes - cpdef const uint8_t[:] read(self, int32_t x=-1): + cpdef 
const uint8_t[:] read(self, int32_t x=-1) noexcept nogil: cdef const uint8_t[:] out if x < 1: x = self.nbytes - self.loc @@ -357,13 +362,13 @@ cdef class NumpyIO(object): self.loc += x return out - cpdef uint8_t read_byte(self): + cpdef uint8_t read_byte(self) noexcept nogil: cdef char out out = self.ptr[self.loc] self.loc += 1 return out - cpdef int32_t read_int(self): + cpdef int32_t read_int(self) noexcept nogil: cdef int32_t i if self.nbytes - self.loc < 4: return 0 @@ -371,30 +376,30 @@ cdef class NumpyIO(object): self.loc += 4 return i - cpdef void write(self, const char[::1] d): + cpdef void write(self, const char[::1] d) noexcept nogil: memcpy(self.ptr[self.loc], &d[0], d.shape[0]) self.loc += d.shape[0] - cpdef void write_byte(self, uint8_t b): + cpdef void write_byte(self, uint8_t b) noexcept nogil: if self.loc >= self.nbytes: # ignore attempt to write past end of buffer return self.ptr[self.loc] = b self.loc += 1 - cpdef void write_int(self, int32_t i): + cpdef void write_int(self, int32_t i) noexcept nogil: if self.nbytes - self.loc < 4: return ( self.get_pointer())[0] = i self.loc += 4 - cdef void write_long(self, int64_t i): + cdef void write_long(self, int64_t i) noexcept nogil: if self.nbytes - self.loc < 8: return ( self.get_pointer())[0] = i self.loc += 8 - cdef int64_t read_long(self): + cdef int64_t read_long(self) noexcept nogil: cdef int64_t i if self.nbytes - self.loc < 8: return 0 @@ -402,15 +407,15 @@ cdef class NumpyIO(object): self.loc += 8 return i - cdef void write_many(self, char b, int32_t count): + cdef void write_many(self, char b, int32_t count) noexcept nogil: cdef int32_t i for i in range(count): self.write_byte(b) - cpdef int32_t tell(self): + cpdef int32_t tell(self) noexcept nogil: return self.loc - cpdef uint32_t seek(self, int32_t loc, int32_t whence=0): + cpdef uint32_t seek(self, int32_t loc, int32_t whence=0) noexcept nogil: if whence == 0: self.loc = loc elif whence == 1: @@ -422,7 +427,7 @@ cdef class NumpyIO(object): return self.loc @cython.wraparound(False) - cpdef const uint8_t[:] so_far(self): + cpdef const uint8_t[:] so_far(self) noexcept nogil: """ In write mode, the data we have gathered until now """ return self.data[:self.loc] @@ -497,7 +502,7 @@ def _assemble_objects(object[:] assign, const uint8_t[:] defi, const uint8_t[:] cdef int64_t nat = -9223372036854775808 -cpdef void time_shift(const int64_t[::1] data, int32_t factor=1000): +cpdef void time_shift(const int64_t[::1] data, int32_t factor=1000) noexcept nogil: cdef int32_t i cdef int64_t * ptr cdef int64_t value @@ -508,93 +513,87 @@ cpdef void time_shift(const int64_t[::1] data, int32_t factor=1000): ptr += 1 -cdef int32_t zigzag_int(uint64_t n): +cdef int32_t zigzag_int(uint64_t n) noexcept nogil: return (n >> 1) ^ -(n & 1) -cdef int64_t zigzag_long(uint64_t n): +cdef int64_t zigzag_long(uint64_t n) noexcept nogil: return (n >> 1) ^ -(n & 1) -cdef uint64_t long_zigzag(int64_t n): +cdef uint64_t long_zigzag(int64_t n) noexcept nogil: return (n << 1) ^ (n >> 63) -cpdef dict read_thrift(NumpyIO data): +cdef dict read_thrift(NumpyIO data): cdef char byte, id = 0, bit cdef int32_t size cdef dict out = {} - cdef bint hasi64 = 0 - cdef bint hasi32 = 0 - cdef list i32 = None while True: - byte = data.read_byte() + # byte = data.read_byte() + byte = data.ptr[data.loc] + data.loc += 1 + if byte == 0: break id += (byte & 0b11110000) >> 4 bit = byte & 0b00001111 - if bit == 5: + if bit == 6: out[id] = zigzag_long(read_unsigned_var_int(data)) - hasi32 = True - if i32 is None: - i32 = list() 
- i32.append(id) - elif bit == 6: + elif bit == 5: out[id] = zigzag_long(read_unsigned_var_int(data)) - hasi64 = True elif bit == 7: - out[id] = data.get_pointer()[0] + out[id] = (data.ptr + data.loc)[0] data.seek(8, 1) elif bit == 8: size = read_unsigned_var_int(data) - out[id] = PyBytes_FromStringAndSize(data.get_pointer(), size) + out[id] = PyBytes_FromStringAndSize((data.ptr + data.loc), size) data.seek(size, 1) elif bit == 9: out[id] = read_list(data) elif bit == 12: out[id] = read_thrift(data) + elif bit == 4: + out[id] = zigzag_long(read_unsigned_var_int(data)) elif bit == 1: out[id] = True elif bit == 2: out[id] = False - elif bit == 4: - # I16 - out[id] = zigzag_long(read_unsigned_var_int(data)) elif bit == 3: # I8 - out[id] = data.read_byte() + # out[id] = data.read_byte() + out[id] = data.ptr[data.loc] + data.loc = 1 else: print("Corrupted thrift data at ", data.tell(), ": ", id, bit) - if hasi32: - if hasi64: - out["i32list"] = i32 - else: - out["i32"] = 1 return out cdef list read_list(NumpyIO data): cdef unsigned char byte, typ cdef int32_t size, bsize, _ - byte = data.read_byte() + # byte = data.read_byte() + byte = data.ptr[data.loc] + data.loc += 1 + if byte >= 0xf0: # 0b11110000 size = read_unsigned_var_int(data) else: size = ((byte & 0xf0) >> 4) - out = [] typ = byte & 0x0f # 0b00001111 + cdef list out = [None] * size if typ == 5 or typ == 6: for _ in range(size): - out.append(zigzag_long(read_unsigned_var_int(data))) + out[_] = zigzag_long(read_unsigned_var_int(data)) elif typ == 8: for _ in range(size): # all parquet list types contain str, not bytes bsize = read_unsigned_var_int(data) - out.append(PyUnicode_DecodeUTF8(data.get_pointer(), bsize, "ignore")) - data.seek(bsize, 1) + out[_] = PyUnicode_DecodeUTF8(data.ptr + data.loc, bsize, "ignore") + data.loc += bsize else: for _ in range(size): - out.append(read_thrift(data)) + out[_] = read_thrift(data) return out @@ -833,7 +832,7 @@ cdef class ThriftObject: cpdef _asdict(self): """Create dict version with field names instead of integers""" cdef str k - cdef out = {} + cdef dict out = {} for k in self.spec: if k in self.children: lower = getattr(self, k) @@ -873,7 +872,7 @@ cdef class ThriftObject: @staticmethod def from_fields(thrift_name,bint i32=0, list i32list=None, **kwargs): - cdef spec = specs[thrift_name] + cdef dict spec = specs[thrift_name] cdef int i cdef str k cdef dict out = {} @@ -1126,3 +1125,187 @@ cdef dict children = { # if bit: # children[o.__name__] = bit # + + +def value_counts(uint8_t[::1] values, uint64_t[::1] out): + # good enough when there are no mixed refs/defs; can be called repeatedly to fill out + cdef uint64_t i + with nogil: + for i in range(values.shape[0]): + out[values[i]] += 1 + + +def make_offsets_and_masks_no_nulls( + uint8_t[::1] reps, # repetition levels + uint8_t[::1] defs, # definition levels + list offsets, # contains uint64 np arrays + uint64_t[::1] ocounts # offsets counter of length offsets, same length as offsets + ): + # when we have nested required ragged lists + # regular arrays would be a special case of this + cdef: + uint64_t loffs = len(offsets) # == max_def + uint64_t i, j # counters + int64_t[::1] temp # for unbundling + (int64_t*)[256] offset_ptrs # max number of levels allowed here is 256 + + # unbundle mutable inputs + for i in range(loffs): + temp = offsets[i] # checks type and dtype + offset_ptrs[i] = &temp[0] + + # run + with nogil: + for i in range(reps.shape[0]): + for j in range(reps[i], defs[i] + 1): + if j < loffs: + offset_ptrs[j][ocounts[j]] = 
ocounts[j + 1] + ocounts[j] += 1 + for j in range(loffs): + # last offset + offset_ptrs[j][ocounts[j]] = ocounts[j + 1] + + +def make_offsets_and_masks_no_reps( + uint8_t[::1] defs, # definition levels + list offsets, # contains uint64 np arrays + uint64_t[::1] ocounts # offsets counter of length offsets, same length as offsets + ): + # where we have nested nullable structs, but no lists + cdef: + uint64_t loffs = len(offsets) # == max_def + uint64_t i # counters8 + uint8_t d + int64_t[::1] temp # for unbundling + (int64_t*)[256] offset_ptrs # max number of levels allowed here is 256 + + # unbundle mutable inputs + for i in range(loffs): + temp = offsets[i] # checks type and dtype + offset_ptrs[i] = &temp[0] + + # run + with nogil: + for i in range(defs.shape[0]): + d = defs[i] + if d < loffs: + offset_ptrs[d][i] = -1 + else: + offset_ptrs[d][i] = ocounts[d + 1] + ocounts[d] += 1 + + +def one_level_optional( + uint8_t[::1] defs, + int64_t[::1] inds, + uint64_t count0 = 0, # first index value + uint8_t max_def = 1 # level that means "real value" + ): + # nullable simple type; equivalent: + # inds[defs == 1] = np.arange(count0, len(values) + count0) + # inds[defs == 0] = -1 + # this can be parallel, by passing count0 (values so far), which we always can know + cdef uint64_t i + with nogil: + for i in range(defs.shape[0]): + if defs[i] == max_def: + inds[i] = count0 + count0 += 1 + else: + inds[i] = -1 + return count0 + + +def make_offsets_and_masks( + uint8_t[::1] reps, # repetition levels + uint8_t[::1] defs, # definition levels + list offsets, # contains uint64 np arrays + uint8_t[::1] rep_map, # rep value -> offset index mapping + uint8_t[::1] rep_flags, # of offsets list, 0: offset, 1: is index, 2: not used + uint64_t[::1] ocounts, # offsets counter of length offsets, len(offsets) + 1 + ): + # general case + cdef: + uint64_t loffs = len(offsets) # == max_def + uint64_t i, j # counters + uint8_t r, d + int64_t[::1] temp # for unbundling + (int64_t*)[256] offset_ptrs # max number of levels allowed here is 256 + + # unbundle mutable inputs + for i in range(loffs): + temp = offsets[i] # checks type and dtype + offset_ptrs[i] = &temp[0] + + # run + with nogil: + for i in range(reps.shape[0]): + r = rep_map[reps[i]] + d = defs[i] + for j in range(r, d + 1): + if j < loffs: + if (rep_flags[j] == 1) & (j == d): + offset_ptrs[j][ocounts[j]] = -1 + elif rep_flags[j] != 2: # a value of 2 means do not use + offset_ptrs[j][ocounts[j]] = ocounts[j + 1] + ocounts[j] += 1 + + +def parse_plain_strings(uint8_t[::1] data, uint64_t[::1] offsets, uint64_t nvalues): + """Extract strings into compact form and offsets, like arrow would""" + # may need delta-string decoder, if we ever see such data + # TODO: replace np python calls with CAPI PyArray_SimpleNew? 
+ # https://stackoverflow.com/a/53162659/3821154 + out = np.empty(data.shape[0] - (4 * nvalues), dtype="uint8") + cdef uint8_t[::1] o = out + cdef uint64_t i, offset, tot + cdef uint32_t size + offset = 0 + tot = 0 + with nogil: + for i in range(nvalues): + offsets[i] = tot + size = ((&data[offset]))[0] + offset += 4 + o[tot: tot + size] = data[offset: offset + size] + tot += size + offset += size + offsets[i + 1] = tot + return out + + +def check_arange(int64_t[::1] arr): + """Is this like a range (with NULLs)""" + cdef int64_t val + cdef int64_t last = -1 + cdef uint8_t out = 1 + + with nogil: + for val in arr: + if val < 0: + continue + if val - last != 1: + out = 0 + break + last = val + return bool(out) + + +def filter_rg_cols(ThriftObject rg, list cols): + if cols is None: + return rg.columns + + cdef list out = [] + cdef set columns = set(cols) + cdef str name + cdef dict col + cdef list internals = rg.columns + cdef int i = 0 # enumerator + + for col in rg[1]: + # column.meta_data.path_in_schema + name = ".".join(col[3][3]) + if name in columns: + out.append(internals[i]) + i += 1 + return out diff --git a/fastparquet/compression.py b/fastparquet/compression.py index 01188d70..cf23c95b 100644 --- a/fastparquet/compression.py +++ b/fastparquet/compression.py @@ -59,12 +59,6 @@ def lz4_decomp(data, size): decompressions['LZ4_RAW'] = lz4_decomp compressions['ZSTD'] = cramjam.zstd.compress decompressions['ZSTD'] = cramjam.zstd.decompress -decom_into = { - "GZIP": cramjam.gzip.decompress_into, - "SNAPPY": cramjam.snappy.decompress_raw_into, - "ZSTD": cramjam.zstd.decompress_into, - "BROTLI": cramjam.brotli.decompress_into -} compressions = {k.upper(): v for k, v in compressions.items()} decompressions = {k.upper(): v for k, v in decompressions.items()} @@ -106,9 +100,4 @@ def decompress_data(data, uncompressed_size, algorithm='gzip'): "Decompression '%s' not available. Options: %s" % (algorithm.upper(), sorted(decompressions)) ) - if algorithm.upper() in decom_into: - # ensures writable buffer from cramjam - x = np.empty(uncompressed_size, dtype='uint8') - decom_into[algorithm.upper()](np.frombuffer(data, dtype=np.uint8), x) - return x return decompressions[algorithm.upper()](data, uncompressed_size) diff --git a/fastparquet/converted_types.py b/fastparquet/converted_types.py index f7876621..b7ec8df4 100644 --- a/fastparquet/converted_types.py +++ b/fastparquet/converted_types.py @@ -1,4 +1,3 @@ -# -#- coding: utf-8 -#- """ Deal with parquet logical types (aka converted types), higher-order things built from primitive types. @@ -8,7 +7,6 @@ import logging import numpy as np -import pandas as pd from fastparquet import parquet_thrift from fastparquet.cencoding import time_shift @@ -63,28 +61,6 @@ def tobson(x): parquet_thrift.ConvertedType.TIME_MICROS: np.dtype(' 0: + o.resize(cnt, refcheck=False) def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, - selfmade=False, assign=None, row_filter=False): + selfmade=False, assign=None, row_filter=False, ex=None): """ Read a row group and return as a dict of arrays @@ -597,57 +654,34 @@ def read_row_group_arrays(file, rg, columns, categories, schema_helper, cats, will be pandas Categorical objects: the codes and the category labels are arrays. 
""" - out = assign - remains = set(_ for _ in out if not _.endswith("-catdef") and not _ + "-catdef" in out) - maps = {} - - for column in rg.columns: - - if (_is_list_like(schema_helper, column) or - _is_map_like(schema_helper, column)): - name = ".".join(column.meta_data.path_in_schema[:-2]) + # TODO: batch reads, since we know all the offsets, and that this is a single file + # maybe could extend to supra-group too. + # see fsspec.parquet + for column in encoding.filter_rg_cols(rg, columns): + if ex is None: + read_col(column, schema_helper, file, use_cat=False, + assign=assign) else: - name = ".".join(column.meta_data.path_in_schema) - if name not in columns or name in cats: - continue - remains.discard(name) - - read_col(column, schema_helper, file, use_cat=name+'-catdef' in out, - selfmade=selfmade, assign=out[name], - catdef=out.get(name+'-catdef', None), - row_filter=row_filter) - - if _is_map_like(schema_helper, column): - # TODO: could be done in fast loop in _assemble_objects? - if name not in maps: - maps[name] = out[name].copy() - else: - if column.meta_data.path_in_schema[0] == 'key': - key, value = out[name], maps[name] - else: - value, key = out[name], maps[name] - out[name][:] = [dict(zip(k, v)) if k is not None else None - for k, v in zip(key, value)] - del maps[name] - for k in remains: - out[k][:] = None + ex.submit( + read_col, column, schema_helper, file, + use_cat=False, assign=assign + ) def read_row_group(file, rg, columns, categories, schema_helper, cats, selfmade=False, index=None, assign=None, - scheme='hive', partition_meta=None, row_filter=False): + scheme='hive', partition_meta=None, row_filter=False, + ex=None): """ Access row-group in a file and read some columns into a data-frame. """ partition_meta = partition_meta or {} - if assign is None: - raise RuntimeError('Going with pre-allocation!') read_row_group_arrays(file, rg, columns, categories, schema_helper, - cats, selfmade, assign=assign, row_filter=row_filter) + cats, selfmade, assign=assign, row_filter=row_filter, ex=ex) for cat in cats: if cat not in assign: - # do no need to have partition columns in output + # do not need to have partition columns in output continue if scheme == 'hive': partitions = [s.split("=") for s in rg.columns[0].file_path.split("/")] @@ -656,4 +690,5 @@ def read_row_group(file, rg, columns, categories, schema_helper, cats, rg.columns[0].file_path.split('/')[:-1])] key, val = [p for p in partitions if p[0] == cat][0] val = val_to_num(val, meta=partition_meta.get(key)) + # TODO: this is a perfect IndexedArray assign[cat][:] = cats[cat].index(val) diff --git a/fastparquet/dataframe.py b/fastparquet/dataframe.py deleted file mode 100644 index 1e2aa583..00000000 --- a/fastparquet/dataframe.py +++ /dev/null @@ -1,267 +0,0 @@ -import re -from collections import OrderedDict -from packaging.version import Version -import numpy as np -from pandas import ( - Categorical, DataFrame, Series, - CategoricalIndex, RangeIndex, Index, MultiIndex, - DatetimeIndex, CategoricalDtype, - DatetimeTZDtype -) -from pandas.core.arrays.masked import BaseMaskedDtype -import warnings - -from fastparquet.util import PANDAS_VERSION - - -class Dummy(object): - pass - - -def empty(types, size, cats=None, cols=None, index_types=None, index_names=None, - timezones=None, columns_dtype=None): - """ - Create empty DataFrame to assign into - - In the simplest case, will return a Pandas dataframe of the given size, - with columns of the given names and types. 
The second return value `views` - is a dictionary of numpy arrays into which you can assign values that - show up in the dataframe. - - For categorical columns, you get two views to assign into: if the - column name is "col", you get both "col" (the category codes) and - "col-catdef" (the category labels). - - For a single categorical index, you should use the `.set_categories` - method of the appropriate "-catdef" columns, passing an Index of values - - ``views['index-catdef'].set_categories(pd.Index(newvalues), fastpath=True)`` - - Multi-indexes work a lot like categoricals, even if the types of each - index are not themselves categories, and will also have "-catdef" entries - in the views. However, these will be Dummy instances, providing only a - ``.set_categories`` method, to be used as above. - - Parameters - ---------- - types: like np record structure, 'i4,u2,f4,f2,f4,M8,m8', or using tuples - applies to non-categorical columns. If there are only categorical - columns, an empty string of None will do. - size: int - Number of rows to allocate - cats: dict {col: labels} - Location and labels for categorical columns, e.g., {1: ['mary', 'mo]} - will create column index 1 (inserted amongst the numerical columns) - with two possible values. If labels is an integers, `{'col': 5}`, - will generate temporary labels using range. If None, or column name - is missing, will assume 16-bit integers (a reasonable default). - cols: list of labels - assigned column names, including categorical ones. - index_types: list of str - For one of more index columns, make them have this type. See general - description, above, for caveats about multi-indexing. If None, the - index will be the default RangeIndex. - index_names: list of str - Names of the index column(s), if using - timezones: dict {col: timezone_str} - for timestamp type columns, apply this timezone to the pandas series; - the numpy view will be UTC. - file_has_columns: bool, default False - for files that are filtered but had columns before - - Returns - ------- - - dataframe with correct shape and data-types - - list of numpy views, in order, of the columns of the dataframe. Assign - to this. 
- """ - views = {} - timezones = timezones or {} - - if isinstance(types, str): - types = types.split(',') - cols = cols if cols is not None else range(len(types)) - - def cat(col): - if cats is None or col not in cats: - return RangeIndex(0, 2**14) - elif isinstance(cats[col], int): - return RangeIndex(0, cats[col]) - else: # explicit labels list - return cats[col] - - df = OrderedDict() - for t, col in zip(types, cols): - if str(t) == 'category': - df[str(col)] = Categorical.from_codes([], categories=cat(col)) - elif isinstance(t, BaseMaskedDtype): - # pandas masked types - arr_type = t.construct_array_type() - df[str(col)] = arr_type( - values=np.empty(0, dtype=t.numpy_dtype), - mask=np.empty(0, dtype=np.bool_), - copy=False - ) - else: - if hasattr(t, 'base'): - # funky pandas not-dtype - t = t.base - if ("M" in str(t) or "time" in str(t)) and "[" not in str(t): - t = str(t) + "[ns]" - d = np.empty(0, dtype=t) - if d.dtype.kind == "M" and str(col) in timezones: - try: - z = tz_to_dt_tz(timezones[str(col)]) - d = Series(d).dt.tz_localize(z) - except: - warnings.warn("Inferring time-zone from %s in column %s " - "failed, using time-zone-agnostic" - "" % (timezones[str(col)], col)) - df[str(col)] = d - - columns = Index(df.keys(), dtype=columns_dtype) if columns_dtype is not None else None - df = DataFrame(df, columns=columns) - if not index_types: - index = RangeIndex(size) - elif len(index_types) == 1: - t, col = index_types[0], index_names[0] - if col is None: - raise ValueError('If using an index, must give an index name') - if str(t) == 'category': - # https://github.com/dask/fastparquet/issues/576#issuecomment-805579337 - temp = Categorical.from_codes([], categories=cat(col)) - vals = np.zeros(size, dtype=temp.codes.dtype) - c = Categorical.from_codes(vals, dtype=temp.dtype) - index = CategoricalIndex(c) - - views[col] = vals - views[col+'-catdef'] = index._data - else: - if hasattr(t, 'base'): - # funky pandas not-dtype - t = t.base - # Initialize datetime index to zero: uninitialized data might fail - # validation due to being an out-of-bounds datetime. xref - # https://github.com/dask/fastparquet/issues/778 - dtype = np.dtype(t) - d = np.zeros(size, dtype=dtype) if dtype.kind == "M" else np.empty(size, dtype=dtype) - if d.dtype.kind == "M" and str(col) in timezones: - # 1) create the DatetimeIndex in UTC as no datetime conversion is needed and - # it works with d uninitialised data (no NonExistentTimeError or AmbiguousTimeError) - # 2) convert to timezone (if UTC=noop, if None=remove tz, if other=change tz) - index = DatetimeIndex(d, tz="UTC").tz_convert( - tz_to_dt_tz(timezones[str(col)])) - else: - index = Index(d) - views[col] = d - else: - index = MultiIndex([[]], [[]]) - # index = MultiIndex.from_arrays(indexes) - index._levels = list() - index._labels = list() - index._codes = list() - index._names = list(index_names) - for i, col in enumerate(index_names): - index._levels.append(Index([None])) - - def set_cats(values, i=i, col=col, **kwargs): - values.name = col - if index._levels[i][0] is None: - index._levels[i] = values - elif not index._levels[i].equals(values): - raise RuntimeError("Different dictionaries encountered" - " while building categorical") - - x = Dummy() - x._set_categories = set_cats - x._multiindex = True - - d = np.zeros(size, dtype=int) - if PANDAS_VERSION >= Version("0.24.0"): - index._codes = list(index._codes) + [d] - else: - index._labels.append(d) - views[col] = d - views[col+'-catdef'] = x - - # Patch our blocks with desired-length arrays. 
Kids: don't try this at home. - mgr = df._mgr - for block in mgr.blocks: - bvalues = block.values - shape = list(bvalues.shape) - shape[-1] = size - - if isinstance(bvalues, Categorical): - code = np.full(fill_value=-1, shape=shape, dtype=bvalues.codes.dtype) - - values = Categorical.from_codes(codes=code, dtype=bvalues.dtype) - - elif isinstance(bvalues.dtype, DatetimeTZDtype): - dt = "M8[ns]" if PANDAS_VERSION.major < 2 else f'M8[{bvalues.dtype.unit}]' - values = np.zeros(shape=shape, dtype=dt) - values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype) - else: - if not isinstance(bvalues, np.ndarray): - # e.g. DatetimeLikeBlock backed by DatetimeArray/TimedeltaArray - if bvalues.dtype.kind == "m": - dt = "m8[ns]" if PANDAS_VERSION.major < 2 else bvalues.dtype - values = np.zeros(shape=shape, dtype=dt) - values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype) - elif bvalues.dtype.kind == "M": - dt = "M8[ns]" if PANDAS_VERSION.major < 2 else bvalues.dtype - values = np.zeros(shape=shape, dtype=dt) - values = type(bvalues)._from_sequence(values.view("int64"), copy=False, dtype=bvalues.dtype) - elif str(bvalues.dtype)[0] in {"I", "U"} or str(bvalues.dtype) == "boolean": - arr_type = bvalues.dtype.construct_array_type() - values = arr_type( - values=np.empty(size, dtype=bvalues.dtype.numpy_dtype), - mask=np.zeros(size, dtype=np.bool_) - ) - else: - raise NotImplementedError - else: - values = np.empty(shape=shape, dtype=bvalues.dtype) - - block.values = values - - mgr.axes[-1] = index - - # create views - for block in df._mgr.blocks: - dtype = block.dtype - inds = block.mgr_locs.indexer - if isinstance(inds, slice): - inds = list(range(inds.start, inds.stop, inds.step)) - for i, ind in enumerate(inds): - col = df.columns[ind] - if isinstance(dtype, CategoricalDtype): - views[col] = block.values._codes - views[col+'-catdef'] = block.values - elif getattr(block.dtype, 'tz', None): - arr = np.asarray(block.values, dtype='M8[ns]') - if len(arr.shape) > 1: - # pandas >= 1.3 does this for some reason - arr = arr.squeeze(axis=0) - views[col] = arr - elif str(dtype)[0] in {"I", "U"} or str(dtype) == "boolean": - views[col] = block.values - else: - views[col] = block.values[i] - - if index_names: - df.index.names = [ - None if re.match(r'__index_level_\d+__', n) else n - for n in index_names - ] - return df, views - - -def tz_to_dt_tz(z): - if ":" in z: - import datetime - hours, mins = z.split(":", 1) - sign = z.startswith("-") - z = int(hours) * 3600 - z += (1, -1)[sign] * int(mins) * 60 - z = datetime.timezone(datetime.timedelta(seconds=z)) - return z diff --git a/fastparquet/encoding.py b/fastparquet/encoding.py index 8e69c344..ca77e6fb 100755 --- a/fastparquet/encoding.py +++ b/fastparquet/encoding.py @@ -1,7 +1,6 @@ """encoding.py - methods for reading parquet encoded data blocks.""" import numpy as np from fastparquet.cencoding import read_bitpacked1, NumpyIO -from fastparquet.speedups import unpack_byte_array from fastparquet import parquet_thrift @@ -39,3 +38,48 @@ def read_plain(raw_bytes, type_, count, width=0, utf=False, stat=False): else: return np.array([bytes(raw_bytes)], dtype='O') return unpack_byte_array(raw_bytes, count, utf=utf) + + +def byte_stream_unsplit8(arr): + assert arr.dtype == "f8" + out = np.empty_like(arr) + view1 = out.view('uint8') + view2 = arr.view("uint8") + l = len(arr) + for i in range(8): + view1[i::8] = view2[i*l: (i+1)*l] + return out + + +def byte_stream_split8(arr): + assert arr.dtype == 
"f8" + out = np.empty_like(arr) + view1 = out.view('uint8') + view2 = arr.view("uint8") + l = len(arr) + for i in range(8): + view1[i*l: (i+1)*l] = view2[i::8] + return out + + + +def byte_stream_unsplit4(arr): + assert arr.dtype == "f4" + out = np.empty_like(arr) + view1 = out.view('uint8') + view2 = arr.view("uint8") + l = len(arr) + for i in range(4): + view1[i::4] = view2[i*l: (i+1)*l] + return out + + +def byte_stream_split4(arr): + assert arr.dtype == "f4" + out = np.empty_like(arr) + view1 = out.view('uint8') + view2 = arr.view("uint8") + l = len(arr) + for i in range(4): + view1[i*l: (i+1)*l] = view2[i::4] + return out diff --git a/fastparquet/json.py b/fastparquet/json.py index a9fb9b8e..a6a7156c 100644 --- a/fastparquet/json.py +++ b/fastparquet/json.py @@ -5,6 +5,7 @@ from typing import Optional logger = logging.getLogger("parquet") +env = os.getenv("FASTPARQUET_JSON_CODEC", "") class JsonCodecError(Exception): @@ -110,7 +111,6 @@ def _get_specific_codec(codec): def _get_cached_codec(): """Return the requested or first available json encoder/decoder implementation.""" - env = os.getenv("FASTPARQUET_JSON_CODEC", "") # return the cached codec instance only if the env variable didn't change if _codec_cache.env == env: return _codec_cache.instance diff --git a/fastparquet/schema.py b/fastparquet/schema.py index 83a43afd..89b4f15a 100755 --- a/fastparquet/schema.py +++ b/fastparquet/schema.py @@ -4,16 +4,23 @@ from fastparquet import parquet_thrift -def schema_tree(schema, i=0): +def schema_tree(schema, i=0, paths={}, path=[]): root = schema[i] + if i : + path = path + [root.name] + paths[".".join(path)] = root root["children"] = OrderedDict() - while len(root["children"]) < root.num_children: + while len(root["children"]) < root[5]: # ,num_children i += 1 s = schema[i] - root["children"][s.name] = s - if s.num_children not in [None, 0]: - i = schema_tree(schema, i) - if root.num_children: + s["parent"] = root + name = s[4] + root["children"][name] = s + if s[5] not in [None, 0]: + i = schema_tree(schema, i, paths, path) + else: + paths[".".join(path + [name])] = s + if root[5]: return i else: return i + 1 @@ -54,25 +61,6 @@ def schema_to_text(root, indent=[]): return text -def flatten(schema, root, name_parts=[]): - if not hasattr(schema, 'children'): - return - if schema is not root: - name_parts = name_parts + [schema.name] - # root["children"].pop('.'.join(name_parts), None) - for name, item in schema["children"].copy().items(): - if schema.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED: - continue - if len(getattr(item, 'children', [])) == 0: - root["children"]['.'.join(name_parts + [name])] = item - elif item.converted_type in [parquet_thrift.ConvertedType.LIST, - parquet_thrift.ConvertedType.MAP]: - root["children"]['.'.join(name_parts + [name])] = item - else: - flatten(item, root, name_parts) - item["isflat"] = True - - class SchemaHelper(object): """Utility providing convenience methods for schema_elements.""" @@ -81,15 +69,16 @@ def __init__(self, schema_elements): self.schema_elements = schema_elements for se in schema_elements: try: - se.name = se.name.decode() + se[4] = se[4].decode() except AttributeError: pass # already a str self.root = schema_elements[0] - self.schema_elements_by_name = dict( - [(se.name, se) for se in schema_elements]) - schema_tree(schema_elements) + # repeats loop above doing decoding, could meld + self.schema_elements_by_name = { + se[4]: se for se in schema_elements} + self.tree = {} + schema_tree(schema_elements, 
paths=self.tree) self._text = None - flatten(self.root, self.root) @property def text(self): @@ -107,17 +96,16 @@ def __str__(self): return self.text def __repr__(self): - return "".format( - len(self.schema_elements)) + return f"" def schema_element(self, name): """Get the schema element with the given name or path""" - root = self.root - if isinstance(name, str): - name = name.split('.') - for part in name: - root = root["children"][part] - return root + if not isinstance(name, str): + name = ".".join(name) + return self.tree[name] + + def __getitem__(self, item): + return self.schema_element(item) def is_required(self, name): """Return true if the schema element with the given name is required.""" @@ -154,50 +142,3 @@ def max_definition_level(self, parts): if element.repetition_type != parquet_thrift.FieldRepetitionType.REQUIRED: max_level += 1 return max_level - - -def _is_list_like(helper, column): - if len(column.meta_data.path_in_schema) < 3: - return False - se = helper.schema_element( - column.meta_data.path_in_schema[:-2]) - ct = se.converted_type - if ct != parquet_thrift.ConvertedType.LIST: - return False - if len(se["children"]) > 1: - return False - se2 = list(se["children"].values())[0] - if len(se2["children"]) > 1: - return False - if se2.repetition_type != parquet_thrift.FieldRepetitionType.REPEATED: - return False - se3 = list(se2["children"].values())[0] - if se3.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED: - return False - return True - - -def _is_map_like(helper, column): - if len(column.meta_data.path_in_schema) < 3: - return False - se = helper.schema_element( - column.meta_data.path_in_schema[:-2]) - ct = se.converted_type - if ct != parquet_thrift.ConvertedType.MAP: - return False - if len(se["children"]) > 1: - return False - se2 = list(se["children"].values())[0] - if len(se2["children"]) != 2: - return False - if se2.repetition_type != parquet_thrift.FieldRepetitionType.REPEATED: - return False - if set(se2["children"]) != {'key', 'value'}: - return False - se3 = se2["children"]['key'] - if se3.repetition_type != parquet_thrift.FieldRepetitionType.REQUIRED: - return False - se3 = se2["children"]['value'] - if se3.repetition_type == parquet_thrift.FieldRepetitionType.REPEATED: - return False - return True diff --git a/fastparquet/speedups.pyx b/fastparquet/speedups.pyx deleted file mode 100644 index aaaabaf8..00000000 --- a/fastparquet/speedups.pyx +++ /dev/null @@ -1,118 +0,0 @@ -""" -Native accelerators for Parquet encoding and decoding. -""" -# cython: profile=False -# cython: linetrace=False -# cython: binding=False -# cython: language_level=3 -# cython: initializedcheck=False -# cython: boundscheck=False -# cython: wraparound=False -# cython: overflowcheck=False -# cython: initializedcheck=False -# cython: cdivision=True -# cython: always_allow_keywords=False - -cdef extern from "string.h": - void *memcpy(void *dest, const void *src, size_t n) - -from cpython cimport (PyUnicode_AsUTF8String, PyUnicode_DecodeUTF8, - PyBytes_CheckExact, PyBytes_FromStringAndSize, - PyBytes_GET_SIZE, PyBytes_AS_STRING) -from cpython.unicode cimport PyUnicode_DecodeUTF8 - -import numpy as np -cimport numpy as np -import cython - - -_obj_dtype = np.dtype('object') - - -def array_encode_utf8(inp): - """ - utf-8 encode all elements of a 1d ndarray of "object" dtype. - A new ndarray of bytes objects is returned. 
- """ - # TODO: combine with pack_byte_array as is done for unpack - cdef: - Py_ssize_t i, n - np.ndarray[object, ndim=1] arr - np.ndarray[object] result - - arr = np.array(inp, copy=False) - - n = arr.shape[0] - # TODO: why not inplace? - result = np.empty(n, dtype=object) - for i in range(n): - # Fast utf-8 encoding, avoiding method call and codec lookup indirection - result[i] = PyUnicode_AsUTF8String(arr[i]) - - return result - - -def pack_byte_array(list items): - """ - Pack a variable length byte array column. - A bytes object is returned. - """ - cdef: - Py_ssize_t i, n, itemlen, total_size - unsigned char *start - unsigned char *data - object val, out - - # Strategy: compute the total output size and allocate it in one go. - n = len(items) - total_size = 0 - for i in range(n): - val = items[i] - if not PyBytes_CheckExact(val): - raise TypeError("expected list of bytes") - total_size += 4 + PyBytes_GET_SIZE(val) - - out = PyBytes_FromStringAndSize(NULL, total_size) - start = data = PyBytes_AS_STRING(out) - - # Copy data to output. - for i in range(n): - val = items[i] - # `itemlen` should be >= 0, so no signed extension issues - itemlen = PyBytes_GET_SIZE(val) - ( data)[0] = itemlen - data += 4 - memcpy(data, PyBytes_AS_STRING(val), itemlen) - data += itemlen - - assert (data - start) == total_size - return out - - -@cython.boundscheck(False) -def unpack_byte_array(const unsigned char[::1] raw_bytes, Py_ssize_t n, const char utf=False): - """ - Unpack a variable length byte array column. - An array of bytes objects is returned. - """ - cdef: - Py_ssize_t i = 0 - char* ptr = &raw_bytes[0] - int itemlen, bytecount - np.ndarray[object, ndim=1, mode="c"] out = np.empty(n, dtype="object") - - assert out is not None - bytecount = raw_bytes.shape[0] - while i < n and bytecount > 0: - - itemlen = ( ptr)[0] - ptr += 4 - if utf: - out[i] = PyUnicode_DecodeUTF8(ptr, itemlen, "ignore") - else: - out[i] = PyBytes_FromStringAndSize(ptr, itemlen) - ptr += itemlen - bytecount -= 4 + itemlen - i += 1 - - return out diff --git a/fastparquet/test/test_api.py b/fastparquet/test/test_api.py index 62cf749c..97a28213 100644 --- a/fastparquet/test/test_api.py +++ b/fastparquet/test/test_api.py @@ -1546,3 +1546,22 @@ def test_read_a_non_pandas_parquet_file(tempdir): assert parquet_file.count() == 2 assert parquet_file.head(1).equals(pd.DataFrame({"foo": [0], "bar": ["a"]})) + + +def test_categorical_roundrtip(tmpdir): + # GH#920 + df = pd.Series(["a", "b", "c"]).rename("cat").astype("category").to_frame() + + fn = f"{tmpdir}/cat.parquet" + data = [] + df.to_parquet(fn, engine="fastparquet") + pf = ParquetFile(fn) + breakpoint() + df_ = pd.read_parquet(fn, engine="pyarrow") + {'field_name': 'cat', 'metadata': {'num_categories': 3, 'ordered': False}, 'name': 'cat', 'numpy_type': 'int8', + 'pandas_type': 'categorical'} + {'field_name': 'cat', 'metadata': {'num_categories': 3, 'ordered': False}, 'name': 'cat', 'numpy_type': 'int8', + 'pandas_type': 'categorical'} + + + assert df_['cat'].dtype == "category" diff --git a/fastparquet/test/test_dataframe.py b/fastparquet/test/test_dataframe.py deleted file mode 100644 index 24da294c..00000000 --- a/fastparquet/test/test_dataframe.py +++ /dev/null @@ -1,141 +0,0 @@ -import warnings -from unittest import mock - -import numpy as np -import pandas as pd -import pytest -from numpy import empty as np_empty -from pandas.testing import assert_frame_equal - -from fastparquet.dataframe import empty - -DatetimeTZDtype = pd.DatetimeTZDtype - - -def test_empty(): - n = 100 - 
df, views = empty('category', size=n, cols=['c']) - assert df.shape == (n, 1) - assert df.dtypes.tolist() == ['category'] - assert views['c'].dtype == 'int16' - - df, views = empty('category', size=n, cols=['c'], cats={'c': 2**20}) - assert df.shape == (n, 1) - assert df.dtypes.tolist() == ['category'] - assert views['c'].dtype == 'int32' - - df, views = empty('category', size=n, cols=['c'], - cats={'c': ['one', 'two']}) - views['c'][0] = 1 - assert df.c[:2].tolist() == ['two', np.nan] - - df, views = empty('i4,i8,f8,f8,O', size=n, - cols=['i4', 'i8', 'f8_1', 'f8_2', 'O']) - assert df.shape == (n, 5) - assert len(views) == 5 - - -def test_no_cats(): - df, views = empty('category', size=10, cols=['c'], - cats={'c': []}) - assert (views["c"] == -1).all() - - -def test_empty_tz_utc(): - with warnings.catch_warnings(): - warnings.simplefilter("error") - empty([DatetimeTZDtype(unit="ns", tz="UTC")], 10, cols=['a'], - timezones={'a': 'UTC'}) - - -# non regression test for https://github.com/dask/fastparquet/issues/532 -def np_empty_mock(shape, dtype): - """mock numpy empty to return an initialised array with all hours in 2020 if shape is 365 and dtype.kind is M. - The objective is to simulate a numpy.empty that returns an uninitialized array with random content that - can cause issues when tz_localize is applied with a timezone with DST""" - import numpy - dtype = numpy.dtype(dtype) - if shape == 8784 and dtype.kind == "M": - a = numpy.arange(start="2020-01-01", stop="2021-01-01", dtype="M8[h]").astype(dtype) - else: - a = np_empty(shape, dtype) - return a - - -@mock.patch("numpy.empty", np_empty_mock) -def test_empty_tz_nonutc(): - df, views = empty(types=[DatetimeTZDtype(unit="ns", tz="CET")], size=8784, cols=['a'], - timezones={'a': 'CET', 'index': 'CET'}, index_types=["datetime64[ns]"], index_names=["index"]) - assert df.index.tz.zone == "CET" - assert df.a.dtype.tz.zone == "CET" - - -# non-regression test for https://github.com/dask/fastparquet/issues/778 -def test_empty_valid_timestamp(): - df, views = empty( - "i4", - size=100, - cols=["a"], - index_types=["datetime64[ms]"], - index_names=["timestamp"], - ) - assert isinstance(df.index, pd.DatetimeIndex) - - -def test_timestamps(): - z = 'US/Eastern' - - # single column - df, views = empty('M8', 100, cols=['t']) - assert df.t.dt.tz is None - views['t'].dtype.kind == "M" - - df, views = empty('M8', 100, cols=['t'], timezones={'t': z}) - assert df.t.dt.tz.zone == z - views['t'].dtype.kind == "M" - - # one time column, one normal - df, views = empty('M8,i', 100, cols=['t', 'i'], timezones={'t': z}) - assert df.t.dt.tz.zone == z - views['t'].dtype.kind == "M" - views['i'].dtype.kind == 'i' - - # no effect of timezones= on non-time column - df, views = empty('M8,i', 100, cols=['t', 'i'], timezones={'t': z, 'i': z}) - assert df.t.dt.tz.zone == z - assert df.i.dtype.kind == 'i' - views['t'].dtype.kind == "M" - views['i'].dtype.kind == 'i' - - # multi-timezones - z2 = 'US/Central' - df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z, - 't2': z}) - assert df.t1.dt.tz.zone == z - assert df.t2.dt.tz.zone == z - - df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z}) - assert df.t1.dt.tz.zone == z - assert df.t2.dt.tz is None - - df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z, - 't2': 'UTC'}) - assert df.t1.dt.tz.zone == z - assert str(df.t2.dt.tz) == 'UTC' - - df, views = empty('M8,M8', 100, cols=['t1', 't2'], timezones={'t1': z, - 't2': z2}) - assert df.t1.dt.tz.zone == z - assert 
df.t2.dt.tz.zone == z2 - - -def test_pandas_hive_serialization(tmpdir): - parquet_dir = tmpdir.join("test.par") - column = "data" - df = pd.DataFrame( - columns=[column], data=[("42",), ("",), ("0",), ("1",), ("0.0",)] - ) - df.to_parquet(parquet_dir, file_scheme="hive", row_group_offsets=[0, 2, 4], engine='fastparquet') - - df_ = pd.read_parquet(parquet_dir, engine='fastparquet') - assert_frame_equal(df, df_) diff --git a/fastparquet/test/test_nest.py b/fastparquet/test/test_nest.py new file mode 100644 index 00000000..e461abca --- /dev/null +++ b/fastparquet/test/test_nest.py @@ -0,0 +1,29 @@ +import os + +import numpy as np + +import fastparquet + +data = os.path.join( + os.path.dirname(os.path.dirname( + os.path.dirname(__file__)) + ), + "TEST-DATA" +) + + +def test_short(): + pf = fastparquet.ParquetFile(os.path.join(data, "output_table.parquet")) + out = pf.to_numpy() + expected = { + 'foo.with.strings-data': [0, 1, -1], + 'foo.with.strings-cats': ["hey", "there"], + 'foo.with.ints-data': [1, 2, 3], + 'foo.with.lists.list-offsets': [0, 1, 2, 3], + 'foo.with.lists.list.element-data': [0, 0, 0], + 'foo.with.lists.list.element-cats': [0] + } + final = {k: list(v) if isinstance(v, np.ndarray) else v + for k, v in out[0].items()} + + assert final == expected diff --git a/fastparquet/util.py b/fastparquet/util.py index 6c10dc5b..053565ae 100644 --- a/fastparquet/util.py +++ b/fastparquet/util.py @@ -1,4 +1,5 @@ from collections import defaultdict +import contextlib import copy from packaging.version import Version from functools import lru_cache @@ -11,7 +12,6 @@ import zoneinfo import numpy as np -import pandas as pd import fsspec @@ -19,7 +19,7 @@ from fastparquet.cencoding import ThriftObject from fastparquet import __version__ -PANDAS_VERSION = Version(pd.__version__) + created_by = f"fastparquet-python version {__version__} (build 0)" @@ -37,7 +37,7 @@ def default_mkdirs(f): def path_string(o): - if isinstance(o, pd.Timestamp): + if hasattr(o, "isoformat"): return o.isoformat() return str(o) @@ -54,18 +54,13 @@ def default_remove(paths): def val_from_meta(x, meta): - try: - if meta['pandas_type'] == 'categorical': - return x - t = np.dtype(meta['numpy_type']) - if t == "bool": - return x in [True, "true", "True", 't', "T", 1, "1"] - return np.dtype(t).type(x) - except ValueError: - if meta['numpy_type'] == 'datetime64[ns]': - return pd.to_datetime(x, format=PATH_DATE_FMT) - else: - raise + # may raise ValueError + if meta['pandas_type'] == 'categorical': + return x + t = np.dtype(meta['numpy_type']) + if t == "bool": + return x in [True, "true", "True", 't', "T", 1, "1"] + return np.dtype(t).type(x) def val_to_num(x, meta=None): @@ -96,13 +91,11 @@ def _val_to_num(x): except: pass try: - return pd.Timestamp(x) + return np.timedelta64(x) except: pass try: - # TODO: determine the valid usecases for this, then try to limit the set - # ofstrings which may get inadvertently converted to timedeltas - return pd.Timedelta(x) + return np.datetime64(x) except: return x @@ -136,30 +129,6 @@ def check_column_names(columns, *args): "" % (missing, arg, columns)) -def reset_row_idx(data: pd.DataFrame) -> pd.DataFrame: - """Reset row (multi-)index as column(s) of the DataFrame. - - Multi-index are stored in columns, one per index level. 
- - Parameters - ---------- - data : dataframe - - Returns - ------- - dataframe - """ - if isinstance(data.index, pd.MultiIndex): - for name, cats, codes in zip(data.index.names, data.index.levels, - data.index.codes): - data = data.assign(**{name: pd.Categorical.from_codes(codes, - cats)}) - data.reset_index(drop=True) - else: - data = data.reset_index() - return data - - def metadata_from_many(file_list, verify_schema=False, open_with=default_open, root=False, fs=None): """ @@ -183,73 +152,36 @@ def metadata_from_many(file_list, verify_schema=False, open_with=default_open, basepath: the root path that other paths are relative to fmd: metadata thrift structure """ + # TODO: evolution belongs here, unless we stick strictly to rowgroup-wise iteration from fastparquet import api - legacy = True if all(isinstance(pf, api.ParquetFile) for pf in file_list): pfs = file_list file_list = [pf.fn for pf in pfs] elif all(not isinstance(pf, api.ParquetFile) for pf in file_list): - if verify_schema or fs is None or len(file_list) < 3: - pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list] + f0 = file_list[0] + pf0 = api.ParquetFile(f0, fs=fs) + if pf0.file_scheme not in ['empty', 'simple']: + # set of directories, revert + pfs = [pf0] + [api.ParquetFile(fn, fs=fs) for fn in file_list[1:]] else: - # activate new code path here - f0 = file_list[0] - pf0 = api.ParquetFile(f0, open_with=open_with) - if pf0.file_scheme not in ['empty', 'simple']: - # set of directories, revert - pfs = [pf0] + [api.ParquetFile(fn, open_with=open_with) for fn in file_list[1:]] - else: - # permits concurrent fetch of footers; needs fsspec >= 2021.6 - size = int(1.4 * pf0._head_size) - pieces = fs.cat(file_list[1:], start=-size) - sizes = {path: int.from_bytes(piece[-8:-4], "little") + 8 for - path, piece in pieces.items()} - not_bigenough = [path for path, s in sizes.items() if s > size] - if not_bigenough: - new_pieces = fs.cat(not_bigenough, start=-max(sizes.values())) - pieces.update(new_pieces) - pieces = {k: _get_fmd(v) for k, v in pieces.items()} - pieces = [(fn, pieces[fn]) for fn in file_list[1:]] # recover ordering - legacy = False + # permits concurrent fetch of footers; needs fsspec >= 2021.6 + size = int(1.4 * pf0._head_size) + pieces = fs.cat(file_list[1:], start=-size) + sizes = {path: int.from_bytes(piece[-8:-4], "little") + 8 for + path, piece in pieces.items()} + not_bigenough = [path for path, s in sizes.items() if s > size] + if not_bigenough: + new_pieces = fs.cat(not_bigenough, start=-max(sizes.values())) + pieces.update(new_pieces) + pieces = {k: _get_fmd(v) for k, v in pieces.items()} + pieces = [(fn, pieces[fn]) for fn in file_list[1:]] # recover ordering + legacy = False else: raise ValueError("Merge requires all ParquetFile instances or none") basepath, file_list = analyse_paths(file_list, root=root) - if legacy: - # legacy code path - if verify_schema: - for pf in pfs[1:]: - if pf._schema != pfs[0]._schema: - raise ValueError('Incompatible schemas') - - fmd = copy.copy(pfs[0].fmd) # we inherit "created by" field - rgs = [] - - for pf, fn in zip(pfs, file_list): - if pf.file_scheme not in ['simple', 'empty']: - for rg in pf.row_groups: - rg = copy.copy(rg) - rg.columns = [copy.copy(c) for c in rg.columns] - for chunk in rg.columns: - chunk.file_path = '/'.join( - [fn, chunk.file_path if isinstance(chunk.file_path, str) else chunk.file_path.decode()] - ) - rgs.append(rg) - - else: - for rg in pf.row_groups: - rg = copy.copy(rg) - rg.columns = [copy.copy(c) for c in rg.columns] - for 
chunk in rg.columns: - chunk.file_path = fn - rgs.append(rg) - - fmd.row_groups = rgs - fmd.num_rows = sum(rg.num_rows for rg in fmd.row_groups) - return basepath, fmd - for rg in pf0.fmd.row_groups: # chunks of first file, which would have file_path=None rg.columns[0].file_path = f0[len(basepath):].lstrip("/") @@ -379,13 +311,6 @@ def analyse_paths(file_list, root=False): return '/'.join(basepath), out_list # use '/'.join() instead of join_path to be consistent with split('/') -def infer_dtype(column): - try: - return pd.api.types.infer_dtype(column, skipna=False) - except AttributeError: - return pd.lib.infer_dtype(column) - - def groupby_types(iterable): groups = defaultdict(list) for x in iterable: @@ -407,42 +332,12 @@ def get_column_metadata(column, name, object_dtype=None): if object_dtype in inferred_dtypes and dtype == "object": inferred_dtype = inferred_dtypes.get(object_dtype, "mixed") else: - inferred_dtype = infer_dtype(column) + inferred_dtype = "object" if str(dtype) == "bool": # pandas accidentally calls this "boolean" inferred_dtype = "bool" - if isinstance(dtype, pd.CategoricalDtype): - extra_metadata = { - 'num_categories': len(column.cat.categories), - 'ordered': column.cat.ordered, - } - dtype = column.cat.codes.dtype - elif isinstance(dtype, pd.DatetimeTZDtype): - if isinstance(dtype.tz, zoneinfo.ZoneInfo): - extra_metadata = {'timezone': dtype.tz.key} - else: - try: - stz = str(dtype.tz) - if "UTC" in stz and ":" in stz: - extra_metadata = {'timezone': stz.strip("UTC")} - elif len(str(stz)) == 3: # like "UTC", "CET", ... - extra_metadata = {'timezone': str(stz)} - elif getattr(dtype.tz, "zone", False): - extra_metadata = {'timezone': dtype.tz.zone} - elif "pytz" not in stz: - pd.Series([pd.to_datetime('now', utc=True)]).dt.tz_localize(stz) - extra_metadata = {'timezone': stz} - elif "Offset" in stz: - extra_metadata = {'timezone': f"{dtype.tz._minutes // 60:+03}:00"} - else: - raise KeyError - except Exception as e: - raise ValueError("Time-zone information could not be serialised: " - "%s, please use another" % str(dtype.tz)) from e - else: - extra_metadata = None - + extra_metadata = None if isinstance(name, tuple): name = str(name) elif not isinstance(name, str): @@ -470,16 +365,7 @@ def get_column_metadata(column, name, object_dtype=None): def get_numpy_type(dtype): - if isinstance(dtype, pd.CategoricalDtype): - return 'category' - elif "Int" in str(dtype): - return str(dtype).lower() - elif str(dtype) == "boolean": - return "bool" - elif str(dtype) == "string": - return "object" - else: - return str(dtype) + return str(dtype) def get_file_scheme(paths): @@ -555,3 +441,84 @@ def get_fs(fn, open_with, mkdirs): mkdirs = mkdirs or (lambda d: fs.mkdirs(d, exist_ok=True)) return fs, fn, open_with, mkdirs + +def simple_concat(*arrs): + # 3x faster than np.concatenate (2.4x with casting="no") + if len(arrs) == 1: + return arrs[0] + tot = sum(len(arr) for arr in arrs) + out = np.empty(tot, dtype=arrs[0].dtype) + off = 0 + for arr in arrs: + out[off:off+len(arr)] = arr + off += len(arr) + return out + + +def concat_and_add(*arrs, offset=True): + if len(arrs) == 1: + return arrs[0] + tot = sum(len(arr) - offset for arr in arrs) + offset + out = np.empty(tot, dtype=arrs[0].dtype) + off = 0 + for arr in arrs: + # TODO: test this! 
+ out[off:off+len(arr)] = arr + out[off] + off += len(arr) - offset + return out + + +class Task: + """Just holds a function and arguments for later call""" + def __init__(self, func, *args, **kwargs): + self.func = func + self.args = args + self.kwargs = kwargs + + def __call__(self): + self.func(*self.args, **self.kwargs) + + +class ThreadPool: + """Simplistic fire&forget pool of X threads""" + + def __init__(self, num_workers, poll_time=0.0003): + self.num_workers = num_workers + self.tasks = [] + self.ntask = 0 + self.th = set() + self.done = [] + self.poll = poll_time + + def run_worker(self): + while True: + try: + self.tasks.pop(0)() + except IndexError: + raise SystemExit + finally: + self.done.append(None) + + def wait(self): + import time + while len(self.done) < self.ntask: + time.sleep(self.poll) + self.th.clear() + self.done.clear() + self.tasks.clear() + self.ntask = 0 + + def submit(self, func, *args, **kwargs): + self.tasks.append(Task(func, *args, **kwargs)) + + def go(self, wait=True): + import _thread + self.th = {_thread.start_new_thread(self.run_worker, ()) + for _ in range(self.num_workers)} + self.ntask = len(self.tasks) + if wait: + self.wait() + + def run_tasks(self, tasks: list[callable], wait=True): + self.tasks.extend(tasks) + self.go(wait) diff --git a/fastparquet/wrappers.py b/fastparquet/wrappers.py new file mode 100644 index 00000000..1f4ad3bc --- /dev/null +++ b/fastparquet/wrappers.py @@ -0,0 +1,104 @@ +import numpy as np + + +class IndexedNullable: + + def __init__(self, index, data): + self.index = index + self.data = data + + def __getitem__(self, item): + if isinstance(item, int): + ind = self.index[item] + return self.data[ind] if ind > 0 else None + elif isinstance(item, (np.ndarray, slice)): + item = np.atleast_1d(item) + return IndexedNullable(self.index[item], self.data) + else: + raise TypeError + + def to_masked(self): + data = np.empty(len(self.index), dtype=self.data.dtype) + mask = self.index >= 0 + data[mask] = self.data + return MaskedNullable(mask, data) + + def __len__(self): + return len(self.data) + + +class MaskedNullable: + + def __init__(self, mask, data) -> None: + self.mask = mask + self.data = data + + def __getitem__(self, item): + if isinstance(item, int): + m = self.mask[item] + return self.data[item] if m else None + elif isinstance(item, (np.ndarray, slice)): + return MaskedNullable(self.mask[item], self.data[item]) + else: + raise TypeError + + def to_indexed(self): + data = self.data[self.mask] + index = np.empty(len(data), dtype="int64") + index[self.mask == 0] = -1 + index[self.mask] = np.arange(len(data)) # could collect uniques + return IndexedNullable(index, data) + + def __len__(self): + return len(self.data) + + +class String: + def __init__(self, offsets, data) -> None: + # TODO: make this start/stop so we can take from length-data encoding? + self.offsets = offsets + self.data = data + + def __getitem__(self, item): + if isinstance(item, int): + return self.data[self.offsets[item]: self.offsets[item + 1]].tobytes().decode() + elif isinstance(item, slice): + assert item.step is None + if item.stop is None or item.stop == -1: + stop = None + else: + stop = item.stop + 1 + return String(self.offsets[item.start:stop], self.data) + elif isinstance(item, np.ndarray): + # completely repacks the data + # or make indexed/masked array? But what if they are then + # indexed? 
+ raise NotImplementedError + else: + raise TypeError + + def __len__(self): + return len(self.offsets) - 1 + + +class Record: + def __init__(self, fields: list=None, contents: list=None, data: dict=None): + if data is None: + data = {f: c for f, c in zip(fields, contents)} + else: + if fields is not None or data is not None: + raise ValueError + self.data = data + + def __getitem__(self, item): + if isinstance(item, str): + return self.data[item] + elif isinstance(item, int): + return {f: c[item] for f, c in self.data.items()} + else: + return Record(data={f: c[item] for f, c in self.data}) + + def __len__(self): + if self.data: + return len(list(self.data.values())[0]) + return 0 diff --git a/fastparquet/writer.py b/fastparquet/writer.py index b023eb2e..f2e541c6 100644 --- a/fastparquet/writer.py +++ b/fastparquet/writer.py @@ -1,13 +1,9 @@ -import ast from copy import copy -import itertools import json import os import struct import numpy as np -import pandas as pd -from pandas.core.arrays.masked import BaseMaskedDtype from fastparquet.util import join_path @@ -18,27 +14,18 @@ from fastparquet.json import json_encoder from fastparquet.util import (default_open, default_mkdirs, check_column_names, created_by, get_column_metadata, - norm_col_name, path_string, reset_row_idx, get_fs, + norm_col_name, path_string, get_fs, update_custom_metadata) -from fastparquet.speedups import array_encode_utf8, pack_byte_array from fastparquet.cencoding import NumpyIO, ThriftObject, from_buffer from decimal import Decimal MARKER = b'PAR1' ROW_GROUP_SIZE = 50_000_000 -NaT = np.timedelta64(None).tobytes() # require numpy version >= 1.7 +NaT = np.timedelta64(None).tobytes() nat = np.datetime64('NaT').view('int64') typemap = { # primitive type, converted type, bit width 'boolean': (parquet_thrift.Type.BOOLEAN, None, 1), - 'Int32': (parquet_thrift.Type.INT32, None, 32), - 'Int64': (parquet_thrift.Type.INT64, None, 64), - 'Int8': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.INT_8, 8), - 'Int16': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.INT_16, 16), - 'UInt8': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.UINT_8, 8), - 'UInt16': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.UINT_16, 16), - 'UInt32': (parquet_thrift.Type.INT32, parquet_thrift.ConvertedType.UINT_32, 32), - 'UInt64': (parquet_thrift.Type.INT64, parquet_thrift.ConvertedType.UINT_64, 64), 'bool': (parquet_thrift.Type.BOOLEAN, None, 1), 'int32': (parquet_thrift.Type.INT32, None, 32), 'int64': (parquet_thrift.Type.INT64, None, 64), @@ -61,18 +48,6 @@ parquet_thrift.Type.FLOAT: np.float32, parquet_thrift.Type.DOUBLE: np.float64} -pdoptional_to_numpy_typemap = { - pd.Int8Dtype(): np.int8, - pd.Int16Dtype(): np.int16, - pd.Int32Dtype(): np.int32, - pd.Int64Dtype(): np.int64, - pd.UInt8Dtype(): np.uint8, - pd.UInt16Dtype(): np.uint16, - pd.UInt32Dtype(): np.uint32, - pd.UInt64Dtype(): np.uint64, - pd.BooleanDtype(): bool -} - def find_type(data, fixed_text=None, object_encoding=None, times='int64', is_index:bool=None): @@ -341,7 +316,7 @@ def infer_object_encoding(data): } for i in data: try: - if i is None or i is pd.NA or i is pd.NaT or i is np.nan or pd.isna(i): + if i is None or i is np.nan: continue except (ValueError, TypeError): pass @@ -439,16 +414,12 @@ def make_definitions(data, no_nulls, datapage_version=1): def _rows_per_page(data, selement, has_nulls=True, page_size=None): page_size = page_size or MAX_PAGE_SIZE - if isinstance(data.dtype, pd.CategoricalDtype): - bytes_per_element = 
data.cat.codes.dtype.itemsize - elif selement.type == parquet_thrift.Type.BOOLEAN: + if selement.type == parquet_thrift.Type.BOOLEAN: bytes_per_element = 0.125 elif selement.type == parquet_thrift.Type.INT64: bytes_per_element = 8 elif selement.type == parquet_thrift.Type.INT32: bytes_per_element = 4 - elif isinstance(data.dtype, BaseMaskedDtype) and data.dtype in pdoptional_to_numpy_typemap: - bytes_per_element = np.dtype(pdoptional_to_numpy_typemap[data.dtype]).itemsize elif data.dtype == "object" or str(data.dtype) == "string": dd = data.iloc[:1000] d2 = dd[dd.notnull()] @@ -512,53 +483,32 @@ def write_column(f, data0, selement, compression=None, datapage_version=None, data_page_offset = column_chunk_start # column global stats - if isinstance(data0.dtype, pd.CategoricalDtype) and stats: - try: - dnnu = data0.unique().as_ordered() - max, min = dnnu.max(), dnnu.min() - if pd.isna(max): - stats = False - else: - if selement.type == parquet_thrift.Type.BYTE_ARRAY: - if selement.converted_type is not None: - max = encode['PLAIN'](pd.Series([max]), selement)[4:] - min = encode['PLAIN'](pd.Series([min]), selement)[4:] - else: - max = encode['PLAIN'](pd.Series([max]), selement) - min = encode['PLAIN'](pd.Series([min]), selement) - except (TypeError, ValueError): - stats = False - elif stats: + if stats: try: max, min = data0.max(), data0.min() - if pd.isna(max): + if np.isna(max): stats = False else: if selement.type == parquet_thrift.Type.BYTE_ARRAY: if selement.converted_type is not None: # max = max.encode("utf8") ? - max = encode['PLAIN'](pd.Series([max], name=name), selement)[4:] - min = encode['PLAIN'](pd.Series([min], name=name), selement)[4:] + max = encode['PLAIN']([max], selement)[4:] + min = encode['PLAIN']([min], selement)[4:] else: - max = encode['PLAIN'](pd.Series([max], name=name, dtype=data0.dtype), selement) - min = encode['PLAIN'](pd.Series([min], name=name, dtype=data0.dtype), selement) + max = encode['PLAIN']([max], selement) + min = encode['PLAIN']([min], selement) except (TypeError, ValueError): stats = False for row_start, row_end in zip(row_offsets[:-1], row_offsets[1:]): data = data0.iloc[row_start:row_end] if has_nulls: - if isinstance(data.dtype, pd.CategoricalDtype): - num_nulls = (data.cat.codes == -1).sum() - else: - num_nulls = len(data) - data.count() + um_nulls = len(data) - data.count() definition_data, data = make_definitions(data, num_nulls == 0, datapage_version=datapage_version) # make_definitions returns `data` with all nulls dropped # the null-stripped `data` can be converted from Optional Types to # their numpy counterparts - if isinstance(data.dtype, BaseMaskedDtype) and data.dtype in pdoptional_to_numpy_typemap: - data = data.astype(pdoptional_to_numpy_typemap[data.dtype], copy=False) - if data.dtype.kind == "O" and not isinstance(data.dtype, pd.CategoricalDtype): + if data.dtype.kind == "O": try: if selement.type == parquet_thrift.Type.INT64: data = data.astype("int64", copy=False) @@ -580,37 +530,6 @@ def write_column(f, data0, selement, compression=None, datapage_version=None, # No nested field handling (encode those as J/BSON) repetition_data = b"" - if isinstance(data.dtype, pd.CategoricalDtype): - if first_page: - # make "index page" - dict_page_offset = column_chunk_start - dph = parquet_thrift.DictionaryPageHeader( - num_values=check_32(len(data.cat.categories)), - encoding=parquet_thrift.Encoding.PLAIN, - i32=1 - ) - bdata = encode['PLAIN'](pd.Series(data.cat.categories), selement) - l0 = len(bdata) - if compression and compression.upper() != 
"UNCOMPRESSED": - bdata = compress_data(bdata, compression) - l1 = len(bdata) - else: - l1 = l0 - diff += l0 - l1 - ph = parquet_thrift.PageHeader( - type=parquet_thrift.PageType.DICTIONARY_PAGE, - uncompressed_page_size=check_32(l0), - compressed_page_size=check_32(l1), - dictionary_page_header=dph, crc=None, i32=1) - - write_thrift(f, ph) - f.write(bdata) - data_page_offset = f.tell() - ncats = len(data.cat.categories) - dcat = data.cat.categories.dtype - cats = True - encoding = "RLE_DICTIONARY" - data = data.cat.codes if str(data0.dtype) in ['int8', 'int16', 'uint8', 'uint16']: # PLAIN encoding must be upcast to parquet primitive data = data.astype('int32') @@ -727,7 +646,7 @@ def write_column(f, data0, selement, compression=None, datapage_version=None, total_compressed_size=compressed_size, i32list=[1, 4] ) - if cats: + if False: # cats: kvm.append( parquet_thrift.KeyValue(key='num_categories', value=str(ncats))) kvm.append( @@ -749,17 +668,10 @@ def make_row_group(f, data, schema, compression=None, stats=True): rows = len(data) if rows == 0: return - if isinstance(data.columns, pd.MultiIndex): - if any(not isinstance(c, (bytes, str)) for c in itertools.chain(*data.columns.values)): - raise ValueError('Column names must be multi-index, str or bytes:', - {c: type(c) for c in data.columns - if not isinstance(c, (bytes, str))}) - - else: - if any(not isinstance(c, (bytes, str)) for c in data): - raise ValueError('Column names must be multi-index, str or bytes:', - {c: type(c) for c in data.columns - if not isinstance(c, (bytes, str))}) + if any(not isinstance(c, (bytes, str)) for c in data): + raise ValueError('Column names must be multi-index, str or bytes:', + {c: type(c) for c in data.columns + if not isinstance(c, (bytes, str))}) cols = [] for column in schema: @@ -770,14 +682,7 @@ def make_row_group(f, data, schema, compression=None, stats=True): comp = compression.get('_default', None) else: comp = compression - if isinstance(data.columns, pd.MultiIndex): - try: - name = ast.literal_eval(column.name) - except ValueError: - name = column.name - coldata = data[name] - else: - coldata = data[column.name] + coldata = data[column.name] if isinstance(stats, int): st = stats elif stats == "auto": @@ -833,27 +738,11 @@ def make_metadata(data, has_nulls=True, ignore_columns=None, fixed_text=None, raise ValueError('Cannot create parquet dataset with duplicate' ' column names (%s)' % data.columns) index_cols_orig = None - if isinstance(data.columns, pd.MultiIndex): - if isinstance(index_cols, list) and index_cols != []: - index_cols_orig = copy(index_cols) - # TODO: for loop required to manage row multi-index. 
- name = index_cols[0][0] - index_cols = [{'field_name': name, - 'metadata': None, - 'name': name, - 'numpy_type': 'object', - 'pandas_type': 'mixed-integer'}] - ci = [ - get_column_metadata(ser, n) - for ser, n - in zip(data.columns.levels, data.columns.names) - ] - else: - ci = [{'name': data.columns.name, - 'field_name': data.columns.name, - 'pandas_type': 'mixed-integer', - 'numpy_type': str(cols_dtype), - 'metadata': None}] + ci = [{'name': data.columns.name, + 'field_name': data.columns.name, + 'pandas_type': 'mixed-integer', + 'numpy_type': str(cols_dtype), + 'metadata': None}] if not isinstance(index_cols, list): start = index_cols.start stop = index_cols.stop @@ -870,7 +759,7 @@ def make_metadata(data, has_nulls=True, ignore_columns=None, fixed_text=None, 'column_indexes': ci, 'creator': {'library': 'fastparquet', 'version': __version__}, - 'pandas_version': pd.__version__} + } root = parquet_thrift.SchemaElement(name=b'schema', num_children=0, i32=True) @@ -896,14 +785,9 @@ def make_metadata(data, has_nulls=True, ignore_columns=None, fixed_text=None, get_column_metadata(data[column], column, object_dtype=oencoding)) fixed = None if fixed_text is None else fixed_text.get(column, None) is_index = (column in index_cols_orig) if index_cols_orig else None - if isinstance(data[column].dtype, pd.CategoricalDtype): - se, type = find_type(data[column].cat.categories, fixed_text=fixed, - object_encoding=oencoding, is_index=is_index) - se.name = column - else: - se, type = find_type(data[column], fixed_text=fixed, - object_encoding=oencoding, times=times, - is_index=is_index) + se, type = find_type(data[column], fixed_text=fixed, + object_encoding=oencoding, times=times, + is_index=is_index) col_has_nulls = has_nulls if has_nulls is None: se.repetition_type = data[column].dtype == "O" @@ -954,8 +838,7 @@ def write_simple(fn, data, fmd, row_group_offsets=None, compression=None, if False, never do; and if a list of str, do it only for those specified columns. """ - if isinstance(data, pd.DataFrame): - data = iter_dataframe(data, row_group_offsets) + data = iter_dataframe(data, row_group_offsets) mode = 'rb+' if append else 'wb' if hasattr(fn, "write"): of = fn @@ -1043,8 +926,7 @@ def write_multi(dn, data, fmd, row_group_offsets=None, compression=None, mkdirs(dn) else: i_offset = find_max_part(fmd.row_groups) - if isinstance(data, pd.DataFrame): - data = iter_dataframe(data, row_group_offsets) + data = iter_dataframe(data, row_group_offsets) rg_list = fmd.row_groups for i, row_group in enumerate(data): part = 'part.%i.parquet' % (i + i_offset) @@ -1259,9 +1141,6 @@ def write(filename, data, row_group_offsets=None, partition_on = [partition_on] if append: pf = ParquetFile(filename, open_with=open_with) - if pf._get_index(): - # Format dataframe (manage row index). - data = reset_row_idx(data) if file_scheme == 'simple': # Case 'simple' if pf.file_scheme not in ['simple', 'empty']: @@ -1283,18 +1162,7 @@ def write(filename, data, row_group_offsets=None, # Case 'append=False'. # Define 'index_cols' to be recorded in metadata. cols_dtype = data.columns.dtype - if (write_index or write_index is None - and not isinstance(data.index, pd.RangeIndex)): - # Keep name(s) of index to metadata. 
- cols = set(data) - data = reset_row_idx(data) - index_cols = [c for c in data if c not in cols] - elif write_index is None and isinstance(data.index, pd.RangeIndex): - # write_index=None, range to metadata - index_cols = data.index - else: - # write_index=False - index_cols = [] + index_cols = [] # Initialize common metadata. if str(has_nulls) == 'infer': has_nulls = None @@ -1558,16 +1426,13 @@ def sort_key(row_group) -> int: # 2nd step (from new and existing data). # Identify row groups from existing data with same partition values as # those in new data. - partition_values_in_new = pd.unique(data.loc[:,defined_partitions] + partition_values_in_new = set(data.loc[:,defined_partitions] .astype(str).agg('/'.join, axis=1)) rgs_to_remove = filter(lambda rg : (partitions(rg, True) in partition_values_in_new), pf.row_groups) # 3rd step (on new data). # Format new data so that it can be written to disk. - if pf._get_index(): - # Reset index of pandas dataframe. - data = reset_row_idx(data) # 4th step: write new data, remove previously existing row groups, # sort row groups and write updated metadata. pf.write_row_groups(data, row_group_offsets=row_group_offsets, diff --git a/setup.py b/setup.py index 3fb7749c..189bca37 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,6 @@ def finalize_options(self): extra = {} else: modules_to_build = { - 'fastparquet.speedups': ['fastparquet/speedups.pyx'], 'fastparquet.cencoding': ['fastparquet/cencoding.pyx'] } try: diff --git a/test-data/deep_ragged.parq b/test-data/deep_ragged.parq new file mode 100644 index 00000000..f97a5c3b Binary files /dev/null and b/test-data/deep_ragged.parq differ diff --git a/test-data/deep_ragged_null.parq b/test-data/deep_ragged_null.parq new file mode 100644 index 00000000..ec4c0f0b Binary files /dev/null and b/test-data/deep_ragged_null.parq differ diff --git a/test-data/deep_struct.parq b/test-data/deep_struct.parq new file mode 100644 index 00000000..6310f1ca Binary files /dev/null and b/test-data/deep_struct.parq differ diff --git a/test-data/dict_str.parq b/test-data/dict_str.parq new file mode 100644 index 00000000..fb88f8ac Binary files /dev/null and b/test-data/dict_str.parq differ diff --git a/test-data/nested.parq b/test-data/nested.parq index 379abc6e..d281eaad 100644 Binary files a/test-data/nested.parq and b/test-data/nested.parq differ diff --git a/test-data/null_ragged_null.parq b/test-data/null_ragged_null.parq new file mode 100644 index 00000000..b7535cad Binary files /dev/null and b/test-data/null_ragged_null.parq differ diff --git a/test-data/output_table.parquet b/test-data/output_table.parquet new file mode 100644 index 00000000..e5806387 Binary files /dev/null and b/test-data/output_table.parquet differ diff --git a/test-data/plain_str.parq b/test-data/plain_str.parq new file mode 100644 index 00000000..f18449c4 Binary files /dev/null and b/test-data/plain_str.parq differ diff --git a/test-data/ragged.parq b/test-data/ragged.parq new file mode 100644 index 00000000..fe65d9d4 Binary files /dev/null and b/test-data/ragged.parq differ diff --git a/test-data/ragged_null.parq b/test-data/ragged_null.parq new file mode 100644 index 00000000..499d74ec Binary files /dev/null and b/test-data/ragged_null.parq differ diff --git a/test-data/simple_optional.parq b/test-data/simple_optional.parq new file mode 100644 index 00000000..f3816258 Binary files /dev/null and b/test-data/simple_optional.parq differ diff --git a/test-data/single.parq b/test-data/single.parq new file mode 100644 index 00000000..ff2dfe76 Binary 
files /dev/null and b/test-data/single.parq differ
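A minimal round-trip sketch for the BYTE_STREAM_SPLIT helpers added to fastparquet/encoding.py above. It assumes the functions are importable at module level exactly as written in the hunk; the input array is illustrative.

import numpy as np
from fastparquet.encoding import byte_stream_split8, byte_stream_unsplit8

a = np.linspace(0.0, 1.0, 16)            # contiguous float64 values
packed = byte_stream_split8(a)           # groups byte 0 of every value, then byte 1, ...
restored = byte_stream_unsplit8(packed)  # inverse shuffle back to the native layout
assert np.array_equal(restored, a)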
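A tiny usage sketch for simple_concat from the fastparquet/util.py hunk above (the plain, non-offset concatenation); the arrays are made up for illustration.

import numpy as np
from fastparquet.util import simple_concat

parts = [np.arange(3), np.arange(3, 5)]
out = simple_concat(*parts)              # one allocation, then copy each chunk into place
assert out.tolist() == [0, 1, 2, 3, 4]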
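A usage sketch for the fire-and-forget Task/ThreadPool pair added to fastparquet/util.py (api.py imports the same pair for concurrent reads). The pool is deliberately simplistic, so treat this as illustrative only; the work items here are assumptions.

from fastparquet.util import Task, ThreadPool

out = {}
tasks = [Task(out.__setitem__, i, i * i) for i in range(8)]

pool = ThreadPool(num_workers=4)
pool.run_tasks(tasks)        # queue the tasks, start the workers, poll until the queue drains
print(sorted(out.items()))   # side effects only: Task return values are discarded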
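A small sketch of the MaskedNullable wrapper from the new fastparquet/wrappers.py; the mask and values are assumptions, the class is as added above.

import numpy as np
from fastparquet.wrappers import MaskedNullable

vals = MaskedNullable(np.array([True, False, True]), np.array([10, 0, 30]))
assert vals[0] == 10
assert vals[1] is None       # a masked-out position reads back as None
assert len(vals) == 3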
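Likewise for the offsets/data String wrapper in fastparquet/wrappers.py; the byte buffer and offsets below are assumptions for illustration.

import numpy as np
from fastparquet.wrappers import String

data = np.frombuffer(b"heytherefolks", dtype="uint8")
offsets = np.array([0, 3, 8, 13])        # encodes "hey", "there", "folks"

s = String(offsets, data)
assert len(s) == 3
assert s[1] == "there"                   # decodes one value on demand
assert s[0:2][1] == "there"              # slicing re-uses the shared byte buffer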