from __future__ import annotations
import itertools
import logging
from collections.abc import Iterable, Iterator, Sequence
from multiprocessing.util import Finalize
import pyarrow
from cloudly.upathlib import LocalUpath, PathType, Upath, resolve_path
from pyarrow.fs import FileSystem, GcsFileSystem
from pyarrow.parquet import FileMetaData, ParquetFile
try:
from cloudly.gcp.auth import get_credentials
except ImportError:
pass
from ._util import FileReader, Seq, locate_idx_in_chunked_seq
# If data is in Google Cloud Storage, `pyarrow.fs.GcsFileSystem` accepts "access_token"
# and "credential_token_expiration". These can be obtained via
# a "google.oauth2.service_account.Credentials" object, e.g.
#
# cred = google.oauth2.service_account.Credentials.from_service_info(
# info_json, scopes=['https://www.googleapis.com/auth/cloud-platform'])
# or
# cred = google.auth.default(scopes=['https://www.googleapis.com/auth/cloud-platform'])
#
# auth_req = google.auth.transport.requests.Request()
# cred.refresh(auth_req)
# # now `cred` has `token` and `expiry`; expiration appears to be in a few hours
#
# gcs = pyarrow.fs.GcsFileSystem(access_token=cred.token, credential_token_expiration=cred.expiry)
# pfile = pyarrow.parquet.ParquetFile(gcs.open_input_file('bucket-name/path/to/file.parquet'))
logger = logging.getLogger(__name__)
[docs]
class ParquetFileReader(FileReader):
[docs]
@classmethod
def get_gcsfs(cls, *, good_for_seconds=600) -> GcsFileSystem:
"""
Obtain a `pyarrow.fs.GcsFileSystem`_ object with credentials given so that
the GCP default process of inferring credentials (which involves
env vars and file reading etc) will not be triggered.
This is provided under the (un-verified) assumption that the
default credential inference process is a high overhead.
"""
cred, renewed = get_credentials(
valid_for_seconds=good_for_seconds,
return_state=True,
)
if renewed or getattr(cls, '_GCSFS', None) is None:
fs = GcsFileSystem(
access_token=cred.token,
credential_token_expiration=cred.expiry,
)
cls._GCSFS = fs
return cls._GCSFS
[docs]
@classmethod
def load_file(cls, path: Upath) -> ParquetFile:
"""
This reads *meta* info and constructs a ``pyarrow.parquet.ParquetFile`` object.
This does not load the entire file.
See :meth:`load` for eager loading.
Parameters
----------
path
Path of the file.
"""
ff, pp = FileSystem.from_uri(str(path))
if isinstance(ff, GcsFileSystem):
ff = cls.get_gcsfs()
file = ParquetFile(pp, filesystem=ff)
Finalize(file, file.reader.close)
# NOTE: can not use
#
# Finalize(file, file.close, kwargs={'force': True})
#
# because the instance method `file.close` can't be used as the callback---the
# object `file` is no long available at that time.
#
# See https://github.com/apache/arrow/issues/35318
return file
[docs]
def __init__(self, path: PathType):
"""
Parameters
----------
path
Path of a Parquet file.
"""
self.path: Upath = resolve_path(path)
self._reset()
def _reset(self):
self._file: ParquetFile | None = None
self._data: ParquetBatchData | None = None
self._row_groups_num_rows = None
self._row_groups_num_rows_cumsum = None
self._row_groups: None | list[ParquetBatchData] = None
self._column_names = None
self._columns = {}
self._getitem_last_row_group = None
self._scalar_as_py = None
self.scalar_as_py = True
def __getstate__(self):
return (self.path,)
def __setstate__(self, data):
self.path = data[0]
self._reset()
@property
def scalar_as_py(self) -> bool:
"""
``scalar_as_py`` controls whether the values returned by :meth:`__getitem__`
(or indirectly by :meth:`__iter__`) are converted from a `pyarrow.Scalar`_ type
such as `pyarrow.lib.StringScalar`_ to a Python builtin type such as ``str``.
This property can be toggled anytime to take effect until it is toggled again.
:getter: Returns this property's value.
:setter: Sets this property's value.
"""
if self._scalar_as_py is None:
self._scalar_as_py = True
return self._scalar_as_py
@scalar_as_py.setter
def scalar_as_py(self, value: bool):
self._scalar_as_py = bool(value)
if self._data is not None:
self._data.scalar_as_py = self._scalar_as_py
if self._row_groups:
for r in self._row_groups:
if r is not None:
r.scalar_as_py = self._scalar_as_py
[docs]
def __len__(self) -> int:
return self.num_rows
[docs]
def load(self) -> None:
"""Eagerly read the whole file into memory as a table."""
if self._data is None:
self._data = ParquetBatchData(
self.file.read(columns=self._column_names, use_threads=True),
)
self._data.scalar_as_py = self.scalar_as_py
if self.num_row_groups == 1:
assert self._row_groups is None
self._row_groups = [self._data]
@property
def file(self) -> ParquetFile:
"""Return a `pyarrow.parquet.ParquetFile`_ object.
Upon initiation of a :class:`ParquetFileReader` object,
the file is not read at all. When this property is requested,
the file is accessed to construct a `pyarrow.parquet.ParquetFile`_ object.
"""
if self._file is None:
self._file = self.load_file(self.path)
return self._file
@property
def metadata(self) -> FileMetaData:
return self.file.metadata
@property
def num_rows(self) -> int:
return self.metadata.num_rows
@property
def num_row_groups(self) -> int:
return self.metadata.num_row_groups
@property
def num_columns(self) -> int:
if self._column_names:
return len(self._column_names)
return self.metadata.num_columns
@property
def column_names(self) -> list[str]:
if self._column_names:
return self._column_names
return self.metadata.schema.names
[docs]
def data(self) -> ParquetBatchData:
"""Return the entire data in the file."""
self.load()
return self._data
def _locate_row_group_for_item(self, idx: int):
# Assuming user is checking neighboring items,
# then the requested item may be in the same row-group
# as the item requested last time.
if self._row_groups_num_rows is None:
meta = self.metadata
self._row_groups_num_rows = [
meta.row_group(i).num_rows for i in range(self.num_row_groups)
]
self._row_groups_num_rows_cumsum = list(
itertools.accumulate(self._row_groups_num_rows)
)
igrp, idx_in_grp, group_info = locate_idx_in_chunked_seq(
idx, self._row_groups_num_rows_cumsum, self._getitem_last_row_group
)
self._getitem_last_row_group = group_info
return igrp, idx_in_grp
[docs]
def __getitem__(self, idx: int):
"""
Get one row (or "record").
If the object has a single column, then return its value in the specified row.
If the object has multiple columns, return a dict with column names as keys.
The values are converted to Python builtin types if :data:`scalar_as_py`
is ``True``.
Parameters
----------
idx
Row index in this file.
Negative value counts from the end as expected.
"""
if idx < 0:
idx = self.num_rows + idx
if idx < 0 or idx >= self.num_rows:
raise IndexError(idx)
if self._data is not None:
return self._data[idx]
igrp, idx_in_row_group = self._locate_row_group_for_item(idx)
row_group = self.row_group(igrp)
return row_group[idx_in_row_group]
[docs]
def __iter__(self):
"""
Iterate over rows.
The type of yielded individual elements is the same as the return of :meth:`__getitem__`.
"""
if self._data is None:
for batch in self.iter_batches():
yield from batch
else:
yield from self._data
[docs]
def iter_batches(self, batch_size=10_000) -> Iterator[ParquetBatchData]:
if self._data is None:
for batch in self.file.iter_batches(
batch_size=batch_size,
columns=self._column_names,
use_threads=True,
):
z = ParquetBatchData(batch)
z.scalar_as_py = self.scalar_as_py
yield z
else:
for batch in self._data.data().to_batches(batch_size):
z = ParquetBatchData(batch)
z.scalar_as_py = self.scalar_as_py
yield z
[docs]
def row_group(self, idx: int) -> ParquetBatchData:
"""
Parameters
----------
idx
Index of the row group of interest.
"""
assert 0 <= idx < self.num_row_groups
if self._row_groups is None:
self._row_groups = [None] * self.num_row_groups
if self._row_groups[idx] is None:
z = ParquetBatchData(
self.file.read_row_group(idx, columns=self._column_names),
)
z.scalar_as_py = self.scalar_as_py
self._row_groups[idx] = z
if self.num_row_groups == 1:
assert self._data is None
self._data = self._row_groups[0]
return self._row_groups[idx]
[docs]
def columns(self, cols: Sequence[str]) -> ParquetFileReader:
"""
Return a new :class:`ParquetFileReader` object that will only load
the specified columns.
The columns of interest have to be within currently available columns.
In other words, a series of calls to this method would incrementally
narrow down the selection of columns. (Note this returns a new
:class:`ParquetFileReader`, hence one can call :meth:`columns` again on the
returned object.)
This method "slices" the data by columns, in contrast to other
data access methods that select rows.
Parameters
----------
cols
Names of the columns to select.
Examples
--------
>>> obj = ParquetFileReader('file_path') # doctest: +SKIP
>>> obj1 = obj.columns(['a', 'b', 'c']) # doctest: +SKIP
>>> print(obj1[2]) # doctest: +SKIP
>>> obj2 = obj1.columns(['b', 'c']) # doctest: +SKIP
>>> print(obj2[3]) # doctest: +SKIP
>>> obj3 = obj.columns(['d']) # doctest: +SKIP
>>> for v in obj: # doctest: +SKIP
>>> print(v) # doctest: +SKIP
"""
assert len(set(cols)) == len(cols) # no repeat values
if self._column_names:
if all(col in self._column_names for col in cols):
if len(cols) == len(self._column_names):
return self
else:
cc = [col for col in cols if col not in self._column_names]
raise ValueError(
f'cannot select the columns {cc} because they are not in existing set of columns'
)
obj = self.__class__(self.path)
obj.scalar_as_py = self.scalar_as_py
obj._file = self._file
obj._row_groups_num_rows = self._row_groups_num_rows
obj._row_groups_num_rows_cumsum = self._row_groups_num_rows_cumsum
if self._row_groups:
obj._row_groups = [
None if v is None else v.columns(cols) for v in self._row_groups
]
if self._data is not None:
obj._data = self._data.columns(cols)
# TODO: also carry over `self._columns`?
obj._column_names = cols
return obj
[docs]
def column(self, idx_or_name: int | str) -> pyarrow.ChunkedArray:
"""Select a single column.
Note: while :meth:`columns` returns a new :class:`ParquetFileReader`,
:meth:`column` returns a `pyarrow.ChunkedArray`_.
"""
z = self._columns.get(idx_or_name)
if z is not None:
return z
if self._data is not None:
return self._data.column(idx_or_name)
if isinstance(idx_or_name, int):
idx = idx_or_name
name = self.column_names[idx]
else:
name = idx_or_name
idx = self.column_names.index(name)
z = self.file.read(columns=[name]).column(name)
self._columns[idx] = z
self._columns[name] = z
return z
[docs]
class ParquetBatchData(Seq):
"""
``ParquetBatchData`` wraps a `pyarrow.Table`_ or `pyarrow.RecordBatch`_.
The data is already in memory; this class does not involve file reading.
:meth:`ParquetFileReader.data` and :meth:`ParquetFileReader.iter_batches` both
return or yield ParquetBatchData.
In addition, the method :meth:`columns` of this class returns a new object
of this class.
Objects of this class can be pickled.
"""
[docs]
def __init__(
self,
data: pyarrow.Table | pyarrow.RecordBatch,
):
# `self.scalar_as_py` may be toggled anytime
# and have its effect right away.
self._data = data
self.scalar_as_py = True
"""Indicate whether scalar values should be converted to Python types from `pyarrow`_ types."""
self.num_rows = data.num_rows
self.num_columns = data.num_columns
self.column_names = data.schema.names
def __repr__(self):
return '<{} with {} rows, {} columns>'.format(
self.__class__.__name__,
self.num_rows,
self.num_columns,
)
def __str__(self):
return self.__repr__()
[docs]
def data(self) -> pyarrow.Table | pyarrow.RecordBatch:
"""Return the underlying `pyarrow`_ data."""
return self._data
[docs]
def __len__(self) -> int:
return self.num_rows
[docs]
def __getitem__(self, idx: int):
"""
Get one row (or "record").
If the object has a single column, then return its value in the specified row.
If the object has multiple columns, return a dict with column names as keys.
The values are converted to Python builtin types if :data:`scalar_as_py`
is ``True``.
Parameters
----------
idx
Row index in this batch.
Negative value counts from the end as expected.
"""
if idx < 0:
idx = self.num_rows + idx
if idx < 0 or idx >= self.num_rows:
raise IndexError(idx)
if self.num_columns == 1:
z = self._data.column(0)[idx]
if self.scalar_as_py:
return z.as_py()
return z
z = {col: self._data.column(col)[idx] for col in self.column_names}
if self.scalar_as_py:
return {k: v.as_py() for k, v in z.items()}
return z
[docs]
def __iter__(self):
"""
Iterate over rows.
The type of yielded individual elements is the same as :meth:`__getitem__`.
"""
if self.num_columns == 1:
if self.scalar_as_py:
yield from (v.as_py() for v in self._data.column(0))
else:
yield from self._data.column(0)
else:
names = self.column_names
if self.scalar_as_py:
for row in zip(*self._data.columns):
yield dict(zip(names, (v.as_py() for v in row)))
else:
for row in zip(*self._data.columns):
yield dict(zip(names, row))
[docs]
def columns(self, cols: Sequence[str]) -> ParquetBatchData:
"""
Return a new :class:`ParquetBatchData` object that only contains
the specified columns.
The columns of interest have to be within currently available columns.
In other words, a series of calls to this method would incrementally
narrow down the selection of columns.
(Note this returns a new :class:`ParquetBatchData`,
hence one can call :meth:`columns` again on the returned object.)
This method "slices" the data by columns, in contrast to other
data access methods that select rows.
Parameters
----------
cols
Names of the columns to select.
Examples
--------
>>> obj = ParquetBatchData(parquet_table) # doctest: +SKIP
>>> obj1 = obj.columns(['a', 'b', 'c']) # doctest: +SKIP
>>> print(obj1[2]) # doctest: +SKIP
>>> obj2 = obj1.columns(['b', 'c']) # doctest: +SKIP
>>> print(obj2[3]) # doctest: +SKIP
>>> obj3 = obj.columns(['d']) # doctest: +SKIP
>>> for v in obj: # doctest: +SKIP
>>> print(v) # doctest: +SKIP
"""
assert len(set(cols)) == len(cols) # no repeat values
if all(col in self.column_names for col in cols):
if len(cols) == len(self.column_names):
return self
else:
cc = [col for col in cols if col not in self.column_names]
raise ValueError(
f'cannot select the columns {cc} because they are not in existing set of columns'
)
z = self.__class__(self._data.select(cols))
z.scalar_as_py = self.scalar_as_py
return z
[docs]
def column(self, idx_or_name: int | str) -> pyarrow.Array | pyarrow.ChunkedArray:
"""
Select a single column specified by name or index.
If ``self._data`` is `pyarrow.Table`_, return `pyarrow.ChunkedArray`_.
If ``self._data`` is `pyarrow.RecordBatch`_, return `pyarrow.Array`_.
"""
return self._data.column(idx_or_name)
[docs]
def read_parquet_file(path: PathType) -> ParquetFileReader:
"""
Parameters
----------
path
Path of the file.
"""
return ParquetFileReader(path)
[docs]
def make_parquet_type(type_spec: str | Sequence):
"""
``type_spec`` is a spec of arguments to one of pyarrow's data type
`factory functions <https://arrow.apache.org/docs/python/api/datatypes.html#factory-functions>`_.
For simple types, this may be just the type name (or function name), e.g. ``'bool_'``, ``'string'``, ``'float64'``.
For type functions expecting arguments, this is a list or tuple with the type name followed by other arguments,
for example,
::
('time32', 's')
('decimal128', 5, -3)
For compound types (types constructed by other types), this is a "recursive" structure, such as
::
('list_', 'int64')
('list_', ('time32', 's'), 5)
where the second element is the spec for the member type, or
::
('map_', 'string', ('list_', 'int64'), True)
where the second and third elements are specs for the key type and value type, respectively,
and the fourth element is the optional argument ``keys_sorted`` to
`pyarrow.map_() <https://arrow.apache.org/docs/python/generated/pyarrow.map_.html#pyarrow.map_>`_.
Below is an example of a struct type::
('struct', [('name', 'string', False), ('age', 'uint8', True), ('income', ('struct', (('currency', 'string'), ('amount', 'uint64'))), False)])
Here, the second element is the list of fields in the struct.
Each field is expressed by a spec that is taken by :meth:`make_parquet_field`.
"""
if isinstance(type_spec, str):
type_name = type_spec
args = ()
else:
type_name = type_spec[0]
args = type_spec[1:]
if type_name in ('string', 'float64', 'bool_', 'int8', 'int64', 'uint8', 'uint64'):
assert not args
return getattr(pyarrow, type_name)()
if type_name == 'list_':
if len(args) > 2:
raise ValueError(f"'pyarrow.list_' expects 1 or 2 args, got `{args}`")
return pyarrow.list_(make_parquet_type(args[0]), *args[1:])
if type_name in ('map_', 'dictionary'):
if len(args) > 3:
raise ValueError(f"'pyarrow.{type_name}' expects 2 or 3 args, got `{args}`")
return getattr(pyarrow, type_name)(
make_parquet_type(args[0]),
make_parquet_type(args[1]),
*args[2:],
)
if type_name == 'struct':
assert len(args) == 1
return pyarrow.struct((make_parquet_field(v) for v in args[0]))
if type_name == 'large_list':
assert len(args) == 1
return pyarrow.large_list(make_parquet_type(args[0]))
if type_name in (
'int16',
'int32',
'uint16',
'uint32',
'float32',
'date32',
'date64',
'month_day_nano_interval',
'utf8',
'large_binary',
'large_string',
'large_utf8',
'null',
):
assert not args
return getattr(pyarrow, type_name)()
if type_name in ('time32', 'time64', 'duration'):
assert len(args) == 1
elif type_name in ('timestamp', 'decimal128'):
assert len(args) in (1, 2)
elif type_name in ('binary',):
assert len(args) <= 1
else:
raise ValueError(f"unknown pyarrow type '{type_name}'")
return getattr(pyarrow, type_name)(*args)
[docs]
def make_parquet_field(field_spec: Sequence):
"""
``filed_spec`` is a list or tuple with 2, 3, or 4 elements.
The first element is the name of the field.
The second element is the spec of the type, to be passed to function :func:`make_parquet_type`.
Additional elements are the optional ``nullable`` and ``metadata`` to the function
`pyarrow.field() <https://arrow.apache.org/docs/python/generated/pyarrow.field.html#pyarrow.field>`_.
"""
field_name = field_spec[0]
type_spec = field_spec[1]
assert len(field_spec) <= 4 # two optional elements are `nullable` and `metadata`.
return pyarrow.field(field_name, make_parquet_type(type_spec), *field_spec[3:])
[docs]
def make_parquet_schema(fields_spec: Iterable[Sequence]):
"""
This function constructs a pyarrow schema that is expressed by simple Python types
that can be json-serialized.
``fields_spec`` is a list or tuple, each of its elements accepted by :func:`make_parquet_field`.
This function is motivated by the need of :class:`~biglist._biglist.ParquetSerializer`.
When :class:`biglist.Biglist` uses a "storage-format" that takes options (such as 'parquet'),
these options can be passed into :func:`biglist.Biglist.new` (via ``serialize_kwargs`` and ``deserialize_kwargs``) and saved in "info.json".
However, this requires the options to be json-serializable.
Therefore, the argument ``schema`` to :meth:`ParquetSerializer.serialize() <biglist._biglist.ParquetSerializer.serialize>`
can not be used by this mechanism.
As an alternative, user can use the argument ``schema_spec``;
this argument can be saved in "info.json", and it is handled by this function.
"""
return pyarrow.schema((make_parquet_field(v) for v in fields_spec))
def write_parquet_table(
table: pyarrow.Table,
path: PathType,
**kwargs,
) -> None:
"""
If the file already exists, it will be overwritten.
Parameters
----------
path
Path of the file to create and write to.
table
pyarrow Table object.
**kwargs
Passed on to `pyarrow.parquet.write_table() <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html>`_.
"""
path = resolve_path(path)
if isinstance(path, LocalUpath):
path.parent.path.mkdir(exist_ok=True, parents=True)
ff, pp = FileSystem.from_uri(str(path))
if isinstance(ff, GcsFileSystem):
ff = ParquetFileReader.get_gcsfs()
pyarrow.parquet.write_table(table, ff.open_output_stream(pp), **kwargs)
[docs]
def write_arrays_to_parquet(
data: Sequence[pyarrow.Array | pyarrow.ChunkedArray | Iterable],
path: PathType,
*,
names: Sequence[str],
**kwargs,
) -> None:
"""
Parameters
----------
path
Path of the file to create and write to.
data
A list of data arrays.
names
List of names for the arrays in ``data``.
**kwargs
Passed on to `pyarrow.parquet.write_table() <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html>`_.
"""
assert len(names) == len(data)
arrays = [
a if isinstance(a, (pyarrow.Array, pyarrow.ChunkedArray)) else pyarrow.array(a)
for a in data
]
table = pyarrow.Table.from_arrays(arrays, names=names)
return write_parquet_table(table, path, **kwargs)
[docs]
def write_pylist_to_parquet(
data: Sequence,
path: PathType,
*,
schema=None,
schema_spec=None,
metadata=None,
**kwargs,
):
if schema is not None:
assert schema_spec is None
elif schema_spec is not None:
assert schema is None
schema = make_parquet_schema(schema_spec)
table = pyarrow.Table.from_pylist(data, schema=schema, metadata=metadata)
return write_parquet_table(table, path, **kwargs)