biglist#
(Generated on Feb 01 2024 for biglist version 0.8.9.)
The package biglist provides persisted, out-of-memory Python data structures
that implement the Seq
interface (a simplified Sequence
) with the capabilities of
concurrent and distributed reading and writing.
The main use case is sequentially processing large amounts of data that cannot fit in memory.
Currently, two kinds of “biglists” are provided, namely Biglist
and ParquetBiglist
.
Biglist
manages writing and reading.
Here, “writing” refers to adding data to this facility, to be managed by it.
The class manages its data files in addition to meta info.
Writing is append-only; updating existing data is not supported.
Appending can be conducted by a number of distributed workers.
ParquetBiglist
defines a kind of “external biglist”. When given the paths to a set of
pre-existing data files in the Apache Parquet format,
this class provides a rich set of facilities for reading the data.
(Potentially confusingly, Biglist
can save data in the Parquet format, among others; such a Biglist
is not a ParquetBiglist
, which supports reading only.)
Biglist
and ParquetBiglist
share the same core API for reading.
Although random element access is supported, it is not optimized
and is not the target usage pattern; it can be very inefficient.
The intended way of data consumption is by iteration, which
can be done collectively by distributed workers.
Persistence can be on local disk or in cloud storage. Thanks to the package upathlib, the implementation as well as the end-user API is agnostic to the location of storage.
In support of ParquetBiglist
, some utilities are provided for reading and writing Parquet data files.
These utilities are useful in their own right.
Additional utilities provide mechanisms for “slicing and dicing” a biglist,
as well as “chaining up” a series of biglists. These utilities work not only for biglist,
but also for any Seq
.
Installation#
To install biglist
and use local disk for persistence, simply do
python3 -m pip install biglist
If you need to use Google Cloud Storage for persistence, do
python3 -m pip install "biglist[gcs]"
Creating a Biglist#
Create a new Biglist
object via the classmethod new()
:
>>> from biglist import Biglist
>>> mylist = Biglist.new(batch_size=100)
then add data to it, for example,
>>> for x in range(10_023):
... mylist.append(x)
This saves a new data file for every 100 elements
accumulated. In the end, there are 23 elements in a memory buffer that are
not yet persisted to disk. The code has no way to know whether we will append
more elements soon, hence it does not save this partial batch.
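The batching behavior can be sketched with a toy class (purely illustrative; `ToyBiglist` is not part of the package and ignores all persistence details):

```python
class ToyBiglist:
    """Illustrative sketch of append/flush batching; not the real Biglist."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.buffer = []   # in-memory elements, not yet persisted
        self.files = []    # each entry stands in for one persisted data file

    def append(self, x):
        self.buffer.append(x)
        if len(self.buffer) == self.batch_size:
            self._save_batch()

    def _save_batch(self):
        self.files.append(list(self.buffer))  # "write a data file"
        self.buffer.clear()

    def flush(self):
        if self.buffer:
            self._save_batch()


mylist = ToyBiglist(batch_size=100)
for x in range(10_023):
    mylist.append(x)
print(len(mylist.files), len(mylist.buffer))  # 100 full files, 23 buffered
mylist.flush()
print(len(mylist.files), len(mylist.buffer))  # 101 files, empty buffer
```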
Once we are done adding data, we call flush()
to persist
the content of the buffer to disk:
>>> mylist.flush()
If, after a while, we decide to append more data to mylist
, we just call append()
again.
We can continue to add more data as long as the disk has space.
New data files will be saved. The smaller file containing 23 elements will stay there
among larger files with no problem.
Now let’s take a look at the biglist object:
>>> mylist
<Biglist at '/tmp/19f88a17-3e78-430f-aad0-a35d39485f80' with 10023 elements in 101 data file(s)>
>>> len(mylist)
10023
>>> mylist.path
LocalUpath('/tmp/19f88a17-3e78-430f-aad0-a35d39485f80')
>>> mylist.num_data_files
101
The data have been saved in the directory /tmp/19f88a17-3e78-430f-aad0-a35d39485f80
,
which is a temporary one because we did not tell new()
where to save data.
When the object mylist
gets garbage collected, this directory will be deleted automatically.
This has its uses, but often we want to save the data for future use. In that case, just pass
a currently non-existent directory to new()
, for example,
>>> yourlist = Biglist.new('/tmp/project/data/store-a', batch_size=10_000)
Later, initiate a Biglist
object for reading the existing dataset:
>>> yourlist = Biglist('/tmp/project/data/store-a')
If we want to persist the data in Google Cloud Storage, we would specify a path in the
'gs://bucket-name/path/to/data'
format.
The Seq protocol and FileReader class#
Before going further with Biglist, let’s digress a bit and introduce a few helper facilities.
BiglistBase
(and its subclasses Biglist
and ParquetBiglist
)
could have implemented the Sequence
interface in the standard library.
However, that interface contains a few methods that are potentially hugely inefficient for Biglist,
and hence are not supposed to be used on a Biglist.
These methods include __contains__
, count
, and index
.
These methods require iterating over the entire dataset to answer a question about a single data item.
For Biglist, this would mean loading and unloading each of a possibly large number of data files.
Biglist does not want to give users the illusion that these methods can be used freely and lightly.
For this reason, the protocol Seq
is defined, which has three methods:
__len__()
, __iter__()
, and __getitem__()
.
Therefore, classes that implement this protocol are
Sized,
Iterable,
and support random element access by index. Most classes in the biglist
package
implement this protocol rather than the standard Sequence
.
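For illustration, a minimal class satisfying the three methods might look like this (a hypothetical example, not a class from the biglist package):

```python
class Squares:
    """A tiny container that is sized, iterable, and int-indexable,
    i.e. it provides the three methods required by the Seq protocol."""

    def __init__(self, n):
        self._n = n

    def __len__(self):
        return self._n

    def __getitem__(self, i):
        if not 0 <= i < self._n:
            raise IndexError(i)
        return i * i

    def __iter__(self):
        for i in range(self._n):
            yield self[i]


s = Squares(5)
print(len(s), s[3], list(s))  # 5 9 [0, 1, 4, 9, 16]
```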
Because a biglist manages any number of data files, a basic operation concerns reading one data file.
Each subclass of BiglistBase
implements its file-reading class as a subclass of
FileReader
. FileReader implements the Seq
protocol, hence
the data items in one data file can be used like a list.
Importantly, a FileReader instance does not load the data file upon initialization.
At that moment, the instance can be pickled. This lends itself to uses in multiprocessing.
This point of the design will be showcased later.
The centerpiece of a biglist is a sequence of data files in persistent storage, or correspondingly,
a sequence of FileReaders in memory. The property files()
of BiglistBase
returns a FileSeq
to manage the FileReader objects of the biglist.
Besides implementing the Seq
protocol, FileSeq provides ways for distributed workers
to consume the FileReaders.
Finally, BiglistBase
implements the Seq
protocol for its data items across the data files.
To sum up, a BiglistBase is a Seq of data items across data files; BiglistBase.files is a FileSeq, which in turn is a Seq of FileReaders; a FileReader is a Seq of data items in one data file.
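These three levels can be sketched with plain lists standing in for the real classes (illustrative only; lists here play the roles of FileReader and FileSeq):

```python
# Each "data file" is a Seq of data items; a plain list plays that role here.
file1 = [0, 1, 2]
file2 = [3, 4]

# The "FileSeq" level: a Seq of FileReaders.
file_seq = [file1, file2]

# The "BiglistBase" level: a Seq of data items across all data files.
all_items = [x for fr in file_seq for x in fr]
print(all_items)  # [0, 1, 2, 3, 4]
```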
Reading a Biglist#
Random element access#
We can access any element of a Biglist
like we do a list:
>>> mylist[18]
18
>>> mylist[-3]
10020
Biglist does not support slicing directly.
However, the class Slicer
wraps a Seq
and enables element access by a single index, by a slice, or by a list of indices:
>>> from biglist import Slicer
>>> v = Slicer(mylist)
>>> len(v)
10023
>>> v
<Slicer into 10023/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>> v[83]
83
>>> v[100:104]
<Slicer into 4/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>>
Note that slicing the slicer does not return a list of values.
Instead, it returns another Slicer
object, which, naturally, can be used the same way,
including slicing further.
A Slicer
object is an Iterable
(in fact, it is a Seq
),
hence we can gather all of its elements in a list:
>>> list(v[100:104])
[100, 101, 102, 103]
Slicer
provides a convenience method collect()
to do the same:
>>> v[100:104].collect()
[100, 101, 102, 103]
A few more examples:
>>> v[-8:].collect()
[10015, 10016, 10017, 10018, 10019, 10020, 10021, 10022]
>>> v[-8::2].collect()
[10015, 10017, 10019, 10021]
>>> v[[1, 83, 250, -2]]
<Slicer into 4/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>> v[[1, 83, 250, -2]].collect()
[1, 83, 250, 10021]
>>> v[[1, 83, 250, -2]][-3:].collect()
[83, 250, 10021]
Iteration#
Don’t be carried away by the many easy and flexible ways of random access.
Random element access for Biglist
is inefficient.
The reason is that it needs to load a data file that contains the element of interest.
If the biglist has many data files and we are “jumping around” randomly,
we waste a lot of time loading entire files just to access a few of their elements.
(However, consecutive random accesses to elements residing in the same file will not load the file
repeatedly.)
The preferred way to consume the data of a Biglist
is to iterate over it. For example,
>>> for i, x in enumerate(mylist):
... print(x)
... if i > 4:
... break
0
1
2
3
4
5
Conceptually, this loads each data file in turn and yields the elements in each file. The implementation “pre-loads” a few files in background threads to speed up the iteration.
Reading from a Biglist in multiple processes#
To collectively consume a Biglist
object from multiple processes,
we can distribute FileReader
s to the processes.
The FileReaders of mylist
are accessed via its property files()
, which returns a FileSeq
:
>>> files = mylist.files
>>> files
<BiglistFileSeq at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>
>>> len(files)
101
>>> files.num_data_files
101
>>> files.num_data_items
10023
>>> files[0]
<BiglistFileReader for '/tmp/cfb39dc0-94bb-4557-a056-c7cea20ea653/store/1669667946.647939_46eb97f6-bdf3-45d2-809c-b90c613d69c7_100.pickle_zstd'>
A FileReader
object is lightweight. Upon initialization, it has not loaded the file yet; it merely records the file path along with the function that will be used to load the file.
In addition, FileReader objects are friendly to pickling, hence lend themselves to multiprocessing code.
Let’s design a small experiment to consume this dataset in multiple processes:
>>> def worker(file_reader):
... total = 0
... for x in file_reader:
... total += x
... return total
>>> from concurrent.futures import ProcessPoolExecutor
>>> total = 0
>>> with ProcessPoolExecutor(5) as pool:
... tasks = [pool.submit(worker, fr) for fr in mylist.files]
... for t in tasks:
... total += t.result()
>>> total
50225253
What is the expected result?
>>> sum(mylist)
50225253
Sure enough, this verifies that the entire biglist is consumed by the processes collectively.
If the file loading is the bottleneck of the task, we can use threads in place of processes.
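For instance, a thread-based version of the experiment might look like the following sketch; plain lists stand in for the FileReader objects so the snippet is self-contained (with a real biglist, we would submit the elements of mylist.files exactly as before):

```python
from concurrent.futures import ThreadPoolExecutor

# Ten stand-in "file readers", each holding 100 consecutive integers.
file_readers = [list(range(i * 100, (i + 1) * 100)) for i in range(10)]

def worker(file_reader):
    # Iterate one "file" and sum its elements.
    return sum(file_reader)

with ThreadPoolExecutor(5) as pool:
    total = sum(pool.map(worker, file_readers))

print(total)  # 499500, i.e. sum(range(1000))
```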
Similarly, it is possible to read mylist
from multiple machines if mylist
is stored
in the cloud. Since a FileReader object is pickle-able, it works just fine if we pickle it
and send it to another machine, provided the file path that is contained in the FileReader object
is in the cloud, hence accessible from the other machine.
We need a mechanism to distribute these FileReader objects to machines.
For that, check out Multiplexer
from upathlib
.
Writing to a Biglist in multiple workers#
The flip side of distributed reading is distributed writing. If we have a biglist on the local disk, we can append to it from multiple processes or threads. If we have a biglist in the cloud, we can append to it from multiple machines. Let’s use multiprocessing to demo the idea.
First, we create a new Biglist
at a storage location of our choosing:
>>> from upathlib import LocalUpath
>>> path = LocalUpath('/tmp/a/b/c/d')
>>> path.rmrf()
0
>>> yourlist = Biglist.new(path, batch_size=6)
In each worker process, we will open this biglist by Biglist(path)
and append data to it.
Now that the biglist has a presence on disk, Biglist(path)
will not complain that the dataset does not exist.
>>> yourlist.info
{'storage_format': 'pickle-zstd', 'storage_version': 3, 'batch_size': 6, 'data_files_info': []}
>>> yourlist.path
LocalUpath('/tmp/a/b/c/d')
>>> len(yourlist)
0
Then we can tell workers, “here is the location, add data to it.” Let’s design a simple worker:
>>> def worker(path, idx):
... yourlist = Biglist(path)
... for i in range(idx):
... yourlist.append(100 * idx + i)
... yourlist.flush()
From the main process, let’s instruct the workers to write data to the same Biglist
:
>>> import multiprocessing
>>> with ProcessPoolExecutor(10, mp_context=multiprocessing.get_context('spawn')) as pool:
... tasks = [pool.submit(worker, path, idx) for idx in range(10)]
... for t in tasks:
... _ = t.result()
Let’s see what we’ve got:
>>> yourlist.reload()
>>> len(yourlist)
45
>>> yourlist.num_data_files
12
>>> list(yourlist)
[400, 401, 402, 403, 500, 501, 502, 503, 504, 600, 601, 602, 603, 604, 605, 700, 701, 702, 703, 704, 705, 706, 900, 901, 902, 903, 904, 905, 906, 907, 908, 100, 800, 801, 802, 803, 804, 805, 806, 807, 200, 201, 300, 301, 302]
>>>
Does this look right? It took me a moment to realize that idx = 0
did not append anything.
So, the data elements are in the 100, 200, …, 900 ranges; that looks right.
But the order of things is confusing.
Well, in a distributed setting, there’s no guarantee of order. It’s not a problem that numbers in the 800 range come after those in the 900 range.
We can get more insight if we dive down to the file level:
>>> for f in yourlist.files:
... print(f)
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_1.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.973651_63e4ca6d-4e44-49e1-a035-6d60a88f7789_.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.975576_f59ab2f0-be9c-477d-a95b-70d3dfc00d94_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.982828_3219d2d1-50e2-4b41-b595-2c6df4e63d3c_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.983024_674e57de-66ed-4e3b-bb73-1db36c13fd6f_1.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.985425_78eec966-8139-4401-955a-7b81fb8b47b9_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.985555_752b4975-fbf3-4172-9063-711722a83abc_3.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.012161_3a7620f5-b040-4cec-9018-e8bd537ea98d_1.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.034502_4a340751-fa1c-412e-8f49-13f2ae83fc3a_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.035010_32c58dbe-e3a2-4ba1-9ffe-32c127df11a6_2.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.067370_20a0e926-7a5d-46a1-805d-86d16c346852_2.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.119890_89ae31bc-7c48-488d-8dd1-e22212773d79_3.pickle_zstd'>
The file names do not appear to be totally random. They follow a pattern that facilitates ordering, and they encode some useful info.
In fact, the part before the first underscore is the date and time of file creation, with microsecond resolution.
This is followed by a uuid.uuid4 random string.
When we iterate the Biglist
object, files are read in the order of their paths, hence in the order of creation time.
The number in the file name before the suffix is the number of elements in the file.
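As a quick sketch, we can decode one of the file names above by hand (the naming scheme is an internal detail of biglist and may change between versions):

```python
# One of the file names printed above.
name = '20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_1.pickle_zstd'

stem = name.rsplit('.pickle_zstd', 1)[0]  # drop the storage-format suffix
created, uid, count = stem.split('_')     # the uuid4 part contains no underscores

print(created)     # '20230129073410.971439' -- creation time, microsecond resolution
print(uid)         # 'f84d0cf3-e2c4-40a7-acf2-a09296ff73bc' -- a uuid4 string
print(int(count))  # 1 -- number of elements stored in this file
```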
We can get similar info in a more readable format:
>>> for v in yourlist.files.data_files_info:
... print(v)
['/tmp/a/b/c/d/store/20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_4.pickle_zstd', 4, 4]
['/tmp/a/b/c/d/store/20230129073410.973651_63e4ca6d-4e44-49e1-a035-6d60a88f7789_5.pickle_zstd', 5, 9]
['/tmp/a/b/c/d/store/20230129073410.975576_f59ab2f0-be9c-477d-a95b-70d3dfc00d94_6.pickle_zstd', 6, 15]
['/tmp/a/b/c/d/store/20230129073410.982828_3219d2d1-50e2-4b41-b595-2c6df4e63d3c_6.pickle_zstd', 6, 21]
['/tmp/a/b/c/d/store/20230129073410.983024_674e57de-66ed-4e3b-bb73-1db36c13fd6f_1.pickle_zstd', 1, 22]
['/tmp/a/b/c/d/store/20230129073410.985425_78eec966-8139-4401-955a-7b81fb8b47b9_6.pickle_zstd', 6, 28]
['/tmp/a/b/c/d/store/20230129073410.985555_752b4975-fbf3-4172-9063-711722a83abc_3.pickle_zstd', 3, 31]
['/tmp/a/b/c/d/store/20230129073411.012161_3a7620f5-b040-4cec-9018-e8bd537ea98d_1.pickle_zstd', 1, 32]
['/tmp/a/b/c/d/store/20230129073411.034502_4a340751-fa1c-412e-8f49-13f2ae83fc3a_6.pickle_zstd', 6, 38]
['/tmp/a/b/c/d/store/20230129073411.035010_32c58dbe-e3a2-4ba1-9ffe-32c127df11a6_2.pickle_zstd', 2, 40]
['/tmp/a/b/c/d/store/20230129073411.067370_20a0e926-7a5d-46a1-805d-86d16c346852_2.pickle_zstd', 2, 42]
['/tmp/a/b/c/d/store/20230129073411.119890_89ae31bc-7c48-488d-8dd1-e22212773d79_3.pickle_zstd', 3, 45]
The values for each entry are the file path, the number of elements in the file, and the cumulative number of elements.
The cumulative count is the basis for random access: Biglist
uses it to
figure out which file contains the element at a specified index.
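The index arithmetic can be sketched with the standard bisect module (illustrative only, not the actual implementation):

```python
import bisect

# Cumulative element counts per file, as in the last column above.
cumulative = [4, 9, 15, 21, 22, 28, 31, 32, 38, 40, 42, 45]

def locate(i):
    """Return (file_index, index_within_file) for global index i."""
    ifile = bisect.bisect_right(cumulative, i)  # first file whose cumulative count exceeds i
    offset = cumulative[ifile - 1] if ifile > 0 else 0
    return ifile, i - offset

print(locate(0))   # (0, 0) -- first element of the first file
print(locate(21))  # (4, 0) -- the single element of the fifth file
print(locate(44))  # (11, 2) -- last element, in the last file
```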
Creating a ParquetBiglist#
Apache Parquet is a popular file format in the “big data” domain. Many tools save large amounts of data in this format, often in a large number of files, sometimes in nested directories.
ParquetBiglist
takes such data files as pre-existing, read-only, external data,
and provides an API to read the data in various ways.
This is analogous to, for example, the “external table” concept in BigQuery.
Let’s create a couple of small Parquet files to demonstrate this API.
>>> from upathlib import LocalUpath
>>> import random
>>> from biglist import write_arrays_to_parquet
>>>
>>> path = LocalUpath('/tmp/a/b/c/e')
>>> path.rmrf()
0
>>> year = list(range(1970, 2021))
>>> make = ['honda'] * len(year)
>>> sales = list(range(123, 123 + len(make)))
>>> write_arrays_to_parquet([make, year, sales], path / 'honda.parquet', names=['make', 'year', 'sales'], row_group_size=10)
>>>
>>> year = list(range(1960, 2021))
>>> make = ['ford'] * len(year)
>>> sales = list(range(234, 234 + len(make)))
>>> write_arrays_to_parquet([make, year, sales], path / 'ford.parquet', names=['make', 'year', 'sales'], row_group_size=10)
Now we want to treat the contents of honda.parquet
and ford.parquet
combined as one dataset, and
use biglist
tools to read it.
>>> from biglist import ParquetBiglist
>>> car_data = ParquetBiglist.new(path)
>>> car_data
<ParquetBiglist at '/tmp/edd9cefb-179b-46d2-8946-7dc8ae1bdc50' with 112 records in 2 data file(s) stored at ['/tmp/a/b/c/e']>
>>> car_data.path
LocalUpath('/tmp/edd9cefb-179b-46d2-8946-7dc8ae1bdc50')
>>> len(car_data)
112
>>> car_data.num_data_files
2
>>> list(car_data.files)
[<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>, <ParquetFileReader for '/tmp/a/b/c/e/honda.parquet'>]
What ParquetBiglist.new()
does is read the metadata of each file in the directory, recursively,
and save relevant info to facilitate reading later.
The location given by car_data.path
is the directory where ParquetBiglist
saves its meta info,
and not where the actual data are.
As is the case with Biglist
, this directory is a temporary one, which will be deleted once the object
car_data
goes away. If we wanted to keep the directory for future use, we should have specified a location
when calling new()
.
Reading a ParquetBiglist#
The fundamental reading API is the same between Biglist
and ParquetBiglist
:
random access, slicing/dicing using Slicer
, iteration,
distributed reading via its files()
—these are all used the same way.
However, the structures of the data files are very different between Biglist
and ParquetBiglist
.
For Biglist, each data file contains a straight Python list, whose elements are whatever has been
passed into Biglist.append()
.
For ParquetBiglist, each data file is in a sophisticated columnar format, which is publicly documented.
A variety of ways are provided to get data out of the Parquet format;
some favor convenience, others favor efficiency. Let’s see some examples.
A row perspective#
>>> for i, x in enumerate(car_data):
... print(x)
... if i > 5:
... break
{'make': 'ford', 'year': 1960, 'sales': 234}
{'make': 'ford', 'year': 1961, 'sales': 235}
{'make': 'ford', 'year': 1962, 'sales': 236}
{'make': 'ford', 'year': 1963, 'sales': 237}
{'make': 'ford', 'year': 1964, 'sales': 238}
{'make': 'ford', 'year': 1965, 'sales': 239}
{'make': 'ford', 'year': 1966, 'sales': 240}
This is the most basic iteration, Biglist
-style, one row (or “record”) at a time.
When there are multiple columns, each row is presented as a dict with column names as keys.
Reading a Parquet data file is performed by ParquetFileReader
.
>>> f0 = car_data.files[0]
>>> f0
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> f0.path
LocalUpath('/tmp/a/b/c/e/ford.parquet')
First of all, a FileReader
object is a Seq
, providing a row-based view into the data:
>>> len(f0)
61
>>> f0[2]
{'make': 'ford', 'year': 1962, 'sales': 236}
>>> f0[-10]
{'make': 'ford', 'year': 2011, 'sales': 285}
>>> Slicer(f0)[-3:].collect()
[{'make': 'ford', 'year': 2018, 'sales': 292}, {'make': 'ford', 'year': 2019, 'sales': 293}, {'make': 'ford', 'year': 2020, 'sales': 294}]
>>> for i, x in enumerate(f0):
... print(x)
... if i > 5:
... break
{'make': 'ford', 'year': 1960, 'sales': 234}
{'make': 'ford', 'year': 1961, 'sales': 235}
{'make': 'ford', 'year': 1962, 'sales': 236}
{'make': 'ford', 'year': 1963, 'sales': 237}
{'make': 'ford', 'year': 1964, 'sales': 238}
{'make': 'ford', 'year': 1965, 'sales': 239}
{'make': 'ford', 'year': 1966, 'sales': 240}
ParquetFileReader
uses pyarrow to read the Parquet files.
The values above are nice and simple Python types, but they are not the original
pyarrow types;
they have undergone a conversion. This conversion can be toggled by the property
ParquetFileReader.scalar_as_py
:
>>> f0[8]
{'make': 'ford', 'year': 1968, 'sales': 242}
>>> f0.scalar_as_py = False
>>> f0[8]
{'make': <pyarrow.StringScalar: 'ford'>, 'year': <pyarrow.Int64Scalar: 1968>, 'sales': <pyarrow.Int64Scalar: 242>}
>>> f0.scalar_as_py = True
A Parquet file consists of one or more “row groups”. Each row-group is a batch of rows stored column-wise. We can get info about the row-groups, or even retrieve a row-group as the unit of processing:
>>> f0.num_row_groups
7
>>> f0.metadata
<pyarrow._parquet.FileMetaData object at 0x7...>
created_by: parquet-cpp-arrow version 1...
num_columns: 3
num_rows: 61
num_row_groups: 7
format_version: 2.6
serialized_size: 2375
>>> f0.metadata.row_group(1)
<pyarrow._parquet.RowGroupMetaData object at 0x7...>
num_columns: 3
num_rows: 10
total_byte_size: 408
sorting_columns: ()
>>> f0.metadata.row_group(0)
<pyarrow._parquet.RowGroupMetaData object at 0x7...>
num_columns: 3
num_rows: 10
total_byte_size: 408
sorting_columns: ()
>>> rg = f0.row_group(0)
>>> rg
<ParquetBatchData with 10 rows, 3 columns>
(We have specified row_group_size=10
in the call to write_arrays_to_parquet()
for demonstration.
In practice, a row-group tends to be much larger.)
A ParquetBatchData
object is again a Seq
.
All of our row access tools are available:
>>> rg.num_rows
10
>>> len(rg)
10
>>> rg.num_columns
3
>>> rg[3]
{'make': 'ford', 'year': 1963, 'sales': 237}
>>> rg[-2]
{'make': 'ford', 'year': 1968, 'sales': 242}
>>> Slicer(rg)[4:7].collect()
[{'make': 'ford', 'year': 1964, 'sales': 238}, {'make': 'ford', 'year': 1965, 'sales': 239}, {'make': 'ford', 'year': 1966, 'sales': 240}]
>>> rg.scalar_as_py = False
>>> rg[3]
{'make': <pyarrow.StringScalar: 'ford'>, 'year': <pyarrow.Int64Scalar: 1963>, 'sales': <pyarrow.Int64Scalar: 237>}
>>> rg.scalar_as_py = True
When we request a specific row, ParquetFileReader
will load the row-group that contains the row of interest.
It does not load the entire file.
However, we can get greedy and ask for the whole data in one go:
>>> f0
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> f0.data()
<ParquetBatchData with 61 rows, 3 columns>
This, again, is a ParquetBatchData
object. All the familiar row access tools are at our disposal.
Finally, if the file is large, we may choose to iterate over it by batches instead of by rows:
>>> for batch in f0.iter_batches(batch_size=10):
... print(batch)
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 1 rows, 3 columns>
The batches are again ParquetBatchData
objects.
At the core of a ParquetBatchData is
a pyarrow.Table
or a pyarrow.RecordBatch.
ParquetBatchData is friendly to pickling and,
I suppose, pickling pyarrow objects is very efficient.
So, the batches could be useful in multiprocessing code.
A column perspective#
Parquet is a columnar format. If we only need a subset of the columns, we should say so, so that the unneeded columns will not be loaded from disk (or the cloud, as the case may be).
Both ParquetFileReader
and ParquetBatchData
provide the method columns()
to return a new object
with only the selected columns.
For ParquetFileReader, if data have not been loaded, reading the new object will load only the selected columns.
For ParquetBatchData, the data are already in memory, hence column selection produces a data subset.
>>> f0.column_names
['make', 'year', 'sales']
>>> cols = f0.columns(['year', 'sales'])
>>> cols
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> cols.num_columns
2
>>> cols.column_names
['year', 'sales']
ParquetFileReader.columns()
returns another ParquetFileReader
, whereas
ParquetBatchData.columns()
returns another ParquetBatchData
:
>>> rg
<ParquetBatchData with 10 rows, 3 columns>
>>> rg.column_names
['make', 'year', 'sales']
>>> rgcols = rg.columns(['make', 'year'])
>>> rgcols.column_names
['make', 'year']
>>> len(rgcols)
10
>>> rgcols[5]
{'make': 'ford', 'year': 1965}
It’s an interesting case when there’s only one column:
>>> f0
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> sales = f0.columns(['sales'])
>>> sales
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> sales.column_names
['sales']
>>> len(sales)
61
>>> sales[3]
237
>>> list(sales)
[234, 235, 236, 237, 238, 239, ..., 291, 292, 293, 294]
>>> Slicer(sales)[:8].collect()
[234, 235, 236, 237, 238, 239, 240, 241]
Notice the type of the values (rows) returned from the element access methods: it’s not dict
.
Because there’s only one column whose name is known, there is no need to carry that info with every row.
Also note that the values have been converted to Python builtin types.
The original pyarrow values will not look as nice:
>>> sales.scalar_as_py = False
>>> Slicer(sales)[:8].collect()
[<pyarrow.Int64Scalar: 234>, <pyarrow.Int64Scalar: 235>, <pyarrow.Int64Scalar: 236>, <pyarrow.Int64Scalar: 237>, <pyarrow.Int64Scalar: 238>, <pyarrow.Int64Scalar: 239>, <pyarrow.Int64Scalar: 240>, <pyarrow.Int64Scalar: 241>]
>>> sales.scalar_as_py = True
Both ParquetFileReader
and ParquetBatchData
have another method, column()
, which retrieves a single column
and returns a
pyarrow.Array or
pyarrow.ChunkedArray. For example,
>>> sales2 = f0.column('sales')
>>> sales2
<pyarrow.lib.ChunkedArray object at 0x...>
[
[
234,
235,
236,
237,
238,
...
290,
291,
292,
293,
294
]
]
ParquetFileReader.column()
returns a
pyarrow.ChunkedArray, whereas
ParquetBatchData.column()
returns either a
pyarrow.ChunkedArray or a
pyarrow.Array.
Performance considerations#
While some biglist
facilities shown here provide convenience and API elegance,
it may be a safe bet to use pyarrow facilities directly if ultimate performance is a requirement.
We have seen ParquetFileReader.scalar_as_py
(and ParquetBatchData.scalar_as_py
); it’s worthwhile to experiment whether that conversion impacts performance in a particular context.
There are several ways to get to a pyarrow object quickly and proceed with it.
A newly initialized ParquetFileReader
has not loaded any data yet.
Its property file
creates a
pyarrow.parquet.ParquetFile object (reading metadata during creation)
and returns it. We may take it and go all the way down the pyarrow path:
>>> f1 = car_data.files[1]
>>> f1._data is None
True
>>> file = f1.file
>>> file
<pyarrow.parquet.core.ParquetFile object at 0x7...>
>>> f1._file
<pyarrow.parquet.core.ParquetFile object at 0x7...>
We have seen that ParquetFileReader.row_group()
and ParquetFileReader.iter_batches()
both
return ParquetBatchData
objects. In contrast to ParquetFileReader
, which is “lazy” in terms of data loading,
a ParquetBatchData already has its data in memory. ParquetFileReader has another method,
namely data()
, that
eagerly loads the entire data of the file and wraps it in a ParquetBatchData object:
>>> data = f1.data()
>>> data
<ParquetBatchData with 51 rows, 3 columns>
The pyarrow data wrapped in ParquetBatchData
can be acquired easily:
>>> padata = data.data()
>>> padata
pyarrow.Table
make: string
year: int64
sales: int64
----
make: [["honda","honda","honda","honda","honda",...,"honda","honda","honda","honda","honda"]]
year: [[1970,1971,1972,1973,1974,...,2016,2017,2018,2019,2020]]
sales: [[123,124,125,126,127,...,169,170,171,172,173]]
Finally, we have seen that ParquetFileReader.column()
and ParquetBatchData.column()
—the single-column selectors—return
a pyarrow object. It is either a
pyarrow.Array or a
pyarrow.ChunkedArray.
Other utilities#
Chain#
Chain
takes a series of Seq
s and presents them as one combined Seq, without copying data.
For example,
>>> from biglist import Chain
>>> numbers = list(range(10))
>>> car_data
<ParquetBiglist at '/tmp/edd9cefb-179b-46d2-8946-7dc8ae1bdc50' with 112 records in 2 data file(s) stored at ['/tmp/a/b/c/e']>
>>> combined = Chain(numbers, car_data)
>>> combined[3]
3
>>> combined[9]
9
>>> combined[10]
{'make': 'ford', 'year': 1960, 'sales': 234}
>>>
>>> car_data[0]
{'make': 'ford', 'year': 1960, 'sales': 234}
Slicer#
Slicer
takes any Seq
and provides __getitem__()
that accepts
a single index, or a slice, or a list of indices. A single-index access will return
the requested element; the other two scenarios return a new Slicer via a zero-copy operation.
To get all the elements out of a Slicer, either iterate it or call its method collect()
.
We should emphasize that Chain
and Slicer
work with any Seq
,
hence they are useful independent of the other biglist
classes.
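The zero-copy idea behind both utilities can be sketched in a few lines of plain Python (illustrative; these are not the actual implementations):

```python
# A minimal "chain": index arithmetic across several sequences, no data copy.
def chain_getitem(seqs, i):
    for s in seqs:
        if i < len(s):
            return s[i]
        i -= len(s)
    raise IndexError(i)

a, b = ['x', 'y'], [10, 20, 30]
print(chain_getitem([a, b], 1))  # 'y'
print(chain_getitem([a, b], 3))  # 20

# A minimal "slicer": composing a range with a slice yields another range
# (a view over indices), so no elements are materialized until collected.
indices = range(10_023)
sliced = indices[100:104]        # still a range, not a list
print(list(sliced))              # [100, 101, 102, 103]
```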
Reading Parquet files#
The function read_parquet_file()
is provided to read a single Parquet file independent of
ParquetBiglist
. It returns a ParquetFileReader
. All the facilities of this class,
as demonstrated above, are ready for use:
>>> [v.path for v in car_data.files]
[LocalUpath('/tmp/a/b/c/e/ford.parquet'), LocalUpath('/tmp/a/b/c/e/honda.parquet')]
>>>
>>> from biglist import read_parquet_file
>>> ff = read_parquet_file(car_data.files[1].path)
>>> ff
<ParquetFileReader for '/tmp/a/b/c/e/honda.parquet'>
>>> len(ff)
51
>>> ff.column_names
['make', 'year', 'sales']
>>> ff[3]
{'make': 'honda', 'year': 1973, 'sales': 126}
>>> Slicer(ff.columns(['year', 'sales']))[10:16].collect()
[{'year': 1980, 'sales': 133}, {'year': 1981, 'sales': 134}, {'year': 1982, 'sales': 135}, {'year': 1983, 'sales': 136}, {'year': 1984, 'sales': 137}, {'year': 1985, 'sales': 138}]
>>> ff.num_row_groups
6
>>> ff.row_group(3).column('sales')
<pyarrow.lib.ChunkedArray object at 0x7...>
[
[
153,
154,
155,
156,
157,
158,
159,
160,
161,
162
]
]
Writing Parquet files#
The function write_arrays_to_parquet()
is provided to write data columns to a single Parquet file.
>>> from uuid import uuid4
>>> from biglist import write_arrays_to_parquet, read_parquet_file
>>> import random
>>> from upathlib import LocalUpath
>>> N = 10000
>>> path = LocalUpath('/tmp/a/b/c/d')
>>> path.rmrf()
1
>>> write_arrays_to_parquet([[random.randint(0, 10000) for _ in range(N)], [str(uuid4()) for _ in range(N)]], path / 'data.parquet', names=['key', 'value'])
>>> f = read_parquet_file(path / 'data.parquet')
>>> f
<ParquetFileReader for '/tmp/a/b/c/d/data.parquet'>
>>> len(f)
10000
>>> f.metadata
<pyarrow._parquet.FileMetaData object at 0x7...>
created_by: parquet-cpp-arrow version 1...
num_columns: 2
num_rows: 10000
num_row_groups: 1
format_version: 2.6
serialized_size: 609
>>> f.metadata.schema
<pyarrow._parquet.ParquetSchema object at 0x7...>
required group field_id=-1 schema {
optional int64 field_id=-1 key;
optional binary field_id=-1 value (String);
}
>>>
Similarly, write_pylist_to_parquet()
writes data rows to a Parquet file:
>>> from biglist import write_pylist_to_parquet
>>> data = [{'name': str(uuid4()), 'age': random.randint(1, 100), 'income': {'employer': str(uuid4()), 'amount': random.randint(10000, 100000)}} for _ in range(100)]
>>> f = LocalUpath('/tmp/test/data.parquet')
>>> f.rmrf()
0
>>> write_pylist_to_parquet(data, f)
>>> ff = read_parquet_file(f)
>>> ff[0]
{'name': '066ced72-fd33-492a-9180-39eeca541b1a', 'age': 75, 'income': {'amount': 17840, 'employer': 'bfc176a0-5257-4913-bd1e-3c4d51885e0c'}}
>>> ff[11]
{'name': 'a239af28-41ff-4215-b560-9c45db15478e', 'age': 12, 'income': {'amount': 17488, 'employer': 'e97f70c9-1659-4fa6-9123-eb39779d00d6'}}
>>>
API reference#
- biglist._util.Element = ~Element#
This type variable is used to annotate the type of a data element.
- class biglist.Seq[source]#
Bases:
Protocol
[Element
]The protocol
Seq
is simpler and broader than the standardSequence
. The former requires/provides only__len__
,__getitem__
, and__iter__
, whereas the latter adds__contains__
,__reversed__
,index
andcount
to these three. Although the extra methods can be implemented using the three basic methods, they could be massively inefficient in particular cases, and that is the case in the applications targeted bybiglist
. For this reason, the classes defined in this package implement the protocolSeq
rather thanSequence
, to prevent the illusion that methods__contains__
, etc., are usable.A class that implements this protocol is sized, iterable, and subscriptable by an int index. This is a subset of the methods provided by
Sequence
. In particular,Sequence
implements this protocol, hence is considered a subclass ofSeq
for type checking purposes:>>> from biglist import Seq >>> from collections.abc import Sequence >>> issubclass(Sequence, Seq) True
The built-in dict and tuple also implement the
Seq
protocol.The type parameter
Element
indicates the type of each data element.
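The relationship between `Seq` and `Sequence` can be reproduced with the standard typing machinery. Below is a minimal sketch of such a protocol (`MiniSeq` is a hypothetical stand-in written for illustration, not the actual `biglist.Seq` source):

```python
from collections.abc import Sequence
from typing import Iterator, Protocol, TypeVar, runtime_checkable

Element = TypeVar("Element", covariant=True)

@runtime_checkable
class MiniSeq(Protocol[Element]):
    """Sketch of a Seq-like protocol: sized, iterable, int-subscriptable."""

    def __len__(self) -> int: ...
    def __getitem__(self, idx: int) -> Element: ...
    def __iter__(self) -> Iterator[Element]: ...

# Sequence provides all three methods, so it satisfies the protocol:
print(issubclass(Sequence, MiniSeq))  # True
print(issubclass(list, MiniSeq))      # True
```

Because the protocol lists only methods, `runtime_checkable` allows these `issubclass` checks at runtime.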
- class biglist.FileReader[source]#
-
A
FileReader
is a “lazy” loader of a data file. It keeps track of the path of a data file along with a loader function, but performs the loading only when needed. In particular, upon initiation of aFileReader
object, file loading has not happened, and the object is lightweight and friendly to pickling.Once data have been loaded, this class provides various ways to navigate the data. At a minimum, the
Seq
API is implemented.With loaded data and associated facilities, this object may no longer be pickle-able, depending on the specifics of a subclass.
One use case of this class is to pass around
FileReader
objects (that are initiated but not loaded) in multiprocessing code for concurrent data processing.This class is generic with a parameter indicating the type of the elements in the data sequence contained in the file. For example you can write:
def func(file_reader: FileReader[int]): ...
- abstract load() None [source]#
This method eagerly loads all the data from the file into memory.
Once this method has been called, subsequent data consumption should all draw upon this in-memory copy. However, if the data file is large, and especially if only part of the data is of interest, calling this method may not be the best approach. This all depends on the specifics of the subclass.
A subclass may allow consuming the data while loading parts of it in an “as-needed” or “streaming” fashion. In that approach,
__getitem__()
and__iter__()
do not require this method to be called (although they may take advantage of the in-memory data if this method has been called).
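The lazy-loading pattern described above can be sketched in plain Python. The class and loader below are illustrative inventions, not the biglist implementation:

```python
from typing import Any, Callable, Iterator

class LazyFileReader:
    """Holds a path and a loader; defers the actual read until first access."""

    def __init__(self, path: str, loader: Callable[[str], list]):
        # Before load(), the object holds only these two attributes, so it
        # stays lightweight and (with a picklable loader) pickles cheaply.
        self.path = path
        self.loader = loader
        self._data = None

    def load(self) -> None:
        if self._data is None:
            self._data = self.loader(self.path)

    def __len__(self) -> int:
        self.load()
        return len(self._data)

    def __getitem__(self, idx: int) -> Any:
        self.load()
        return self._data[idx]

    def __iter__(self) -> Iterator:
        self.load()
        return iter(self._data)

calls = []

def demo_loader(path: str) -> list:
    # stand-in for e.g. a serializer's load method; records each invocation
    calls.append(path)
    return [10, 20, 30]

reader = LazyFileReader("/tmp/part-0.data", demo_loader)
print(len(calls))     # 0: nothing read yet
print(reader[1])      # 20: first access triggers the load
print(len(calls))     # 1: loaded exactly once, then reused
```

Because nothing is read at construction time, many such readers can be created and shipped to worker processes before any I/O happens.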
- class biglist.Slicer[source]#
-
This class wraps a
Seq
and enables element access by slice or index array, in addition to a single integer.A
Slicer
object is “zero-copy”—it holds a reference to the underlyingSeq
and keeps track of indices of the selected elements. ASlicer
object may be sliced again in a repeated “zoom in” fashion. Actual data elements are retrieved from the underlyingSeq
only when a single element is accessed or iteration is performed. In other words, until an actual data element needs to be returned, it’s all operations on the indices.- __init__(list_: Seq[Element], range_: None | range | Seq[int] = None)[source]#
This provides a “slice” of, or “window” into,
list_
.The selection of elements is represented by the optional
range_
, which is either a range such asrange(3, 8)
, or a list of indices such as[1, 3, 5, 6]
. Ifrange_
isNone
, the “window” covers the entirelist_
. A common practice is to create aSlicer
object withoutrange_
, and then access a slice of it, for example,Slicer(obj)[3:8]
rather thanSlicer(obj, range(3,8))
.During the use of this object, the underlying
list_
must remain unchanged. Otherwise perplexing and surprising things may happen.
- __getitem__(idx: int | slice | Seq[int])[source]#
Element access by a single index, slice, or an index array. Negative index and standard slice syntax work as expected.
Single-index access returns the requested data element. Slice and index-array accesses return a new
Slicer
object, which, naturally, can be sliced again, like>>> x = list(range(30)) >>> Slicer(x)[[1, 3, 5, 6, 7, 8, 9, 13, 14]][::2][-2] 9
- property raw: Seq[Element]#
Return the underlying data
Seq
, that is, thelist_
that was passed into__init__()
.
- property range: None | range | Seq[int]#
Return the parameter
range_
that was provided to__init__()
, representing the selection of items in the underlyingSeq
.
- collect() list[Element] [source]#
Return a list containing the elements in the current window. This is equivalent to
list(self)
.This is often used to substantiate a small slice as a list, because a slice is still a
Slicer
object, which does not directly reveal the data items. For example,>>> x = list(range(30)) >>> Slicer(x)[3:11] <Slicer into 8/30 of [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> >>> Slicer(x)[3:11].collect() [3, 4, 5, 6, 7, 8, 9, 10]
(A list is used for illustration. In reality, list supports slicing directly, hence would not need
Slicer
.)Warning
Do not call this on “big” data!
- class biglist.Chain[source]#
-
This class tracks a series of
Seq
objects to provide random element access and iteration on the series as a whole, with zero-copy.This class is in contrast with the standard itertools.chain, which takes iterables.
- class biglist._biglist.BiglistBase[source]#
-
This base class contains code mainly concerning reading. The subclass
Biglist
adds functionalities for writing. Another subclassParquetBiglist
is read-only. Here, “reading” and “read-only” refer to the data files. This class always needs to write meta info about the data files. In addition, the subclassBiglist
also creates and manages the data files, whereasParquetBiglist
provides methods to read existing data files, treating them as read-only.This class is generic with a parameter indicating the type of the data items, but this is useful only for the subclass
Biglist
. For the subclassParquetBiglist
, this parameter is essentiallyAny
because the data items (or rows) in Parquet files are composite and flexible.- classmethod get_temp_path() Upath [source]#
If user does not specify
path
when callingnew()
(in a subclass), this method is used to determine a temporary directory.This implementation returns a temporary location in the local file system.
Subclasses may want to customize this if they prefer other ways to find temporary locations. For example, they may want to use a temporary location in a cloud storage.
- classmethod new(path: str | Path | Upath | None = None, *, keep_files: bool | None = None, init_info: dict | None = None, **kwargs) BiglistBase [source]#
Create a new object of this class (of a subclass, to be precise) and then add data to it.
- Parameters:
- path
A directory in which this
BiglistBase
will save whatever it needs to save.The directory must be non-existent. It is not necessary to pre-create the parent directory of this path.
This path can be either on local disk or in a cloud storage.
If not specified,
BiglistBase.get_temp_path()
is called to determine a temporary path.The subclass
Biglist
saves both data and meta-info in this path. The subclassParquetBiglist
saves meta-info only.- keep_files
If not specified, the default behavior is the following:
If
path
isNone
, then this isFalse
—the temporary directory will be deleted when thisBiglistBase
object goes away.If
path
is notNone
, i.e. user has deliberately specified a location, then this isTrue
—files saved by thisBiglistBase
object will stay.
User can pass in
True
orFalse
explicitly to override the default behavior.- init_info
Initial info that should be written into the info file before
__init__
is called. This is in addition to whatever this method internally decides to write.The info file info.json is written before
__init__()
is called. In__init__()
, this file is read intoself.info
.This parameter can be used to write some high-level info that
__init__
needs.If the info is not needed in
__init__
, then user can always add it toself.info
after the object has been instantiated, hence saving general info ininfo.json
is not the intended use of this parameter.User rarely needs to use this parameter. It is mainly used by the internals of the method
new
of subclasses.- **kwargs
additional arguments are passed on to
__init__()
.
Notes
A
BiglistBase
object is constructed in one of the two modes below:(a) create a new
BiglistBase
to store new data.(b) create a
BiglistBase
object pointing to storage of existing data, which was created by a previous call tonew()
.
In case (a), one has called
new()
. In case (b), one has calledBiglistBase(..)
(i.e.__init__()
Some settings are applicable only in mode (a), because in mode (b) they cannot be changed; the values already set in mode (a) must be used. Such settings can be parameters to
new()
but should not be parameters to__init__()
. Examples includestorage_format
andbatch_size
for the subclassBiglist
. These settings typically should be taken care of innew()
, before and/or after the object has been created by a call to__init__()
withinnew()
.__init__()
should be defined in such a way that it works for both a barebone object that is created in thisnew()
, as well as a fleshed out object that already has data in persistence.Some settings may be applicable to an existing
BiglistBase
object, e.g., they control styles of display and not intrinsic attributes persisted along with the BiglistBase. Such settings should be parameters to__init__()
but not tonew()
. If provided in a call tonew()
, these parameters will be passed on to__init__()
.Subclass authors should keep these considerations in mind.
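The division of labor between new() and __init__() described above can be sketched roughly as follows. All names here are hypothetical, written only to illustrate the pattern:

```python
import json
import pathlib
import tempfile

class MiniList:
    @classmethod
    def new(cls, path=None, *, batch_size=1000, **kwargs):
        # Mode (a): creation-time settings are decided here and persisted,
        # so __init__() never needs them as parameters.
        if path is None:
            path = tempfile.mkdtemp()
        root = pathlib.Path(path)
        root.mkdir(parents=True, exist_ok=True)
        (root / "info.json").write_text(json.dumps({"batch_size": batch_size}))
        return cls(root, **kwargs)

    def __init__(self, path):
        # Mode (b): works for a freshly created store and a pre-existing one
        # alike, because it only reads what new() persisted.
        self.path = pathlib.Path(path)
        self.info = json.loads((self.path / "info.json").read_text())

obj = MiniList.new(batch_size=500)   # mode (a): create storage, then __init__
again = MiniList(obj.path)           # mode (b): open existing storage
print(again.info["batch_size"])      # 500
```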
- __init__(path: str | Path | Upath)[source]#
- Parameters:
- path
Directory that contains files written by an instance of this class.
- path: Upath#
Root directory of the storage space for this object.
- info: dict#
Various meta info.
- __len__() int [source]#
Number of data items in this biglist.
This is an alias to
num_data_items()
.
- __getitem__(idx: int) Element [source]#
Access a data item by its index; negative index works as expected.
- __iter__() Iterator[Element] [source]#
Iterate over all the elements.
When there are multiple data files, as the data in one file is being yielded, the next file(s) may be pre-loaded in background threads. For this reason, although the following is equivalent in the final result:
for file in self.files: for item in file: ... use item ...
it could be less efficient than iterating over self directly, as in
for item in self: ... use item ...
- property num_data_files: int#
- property num_data_items: int#
- abstract property files: FileSeq[FileReader[Element]]#
- class biglist.Biglist[source]#
Bases:
BiglistBase
[Element
]- registered_storage_formats = {'json': <class 'upathlib.serializer.JsonSerializer'>, 'parquet': <class 'biglist._biglist.ParquetSerializer'>, 'pickle': <class 'upathlib.serializer.PickleSerializer'>, 'pickle-zstd': <class 'upathlib.serializer.ZstdPickleSerializer'>}#
- DEFAULT_STORAGE_FORMAT = 'pickle-zstd'#
- classmethod register_storage_format(name: str, serializer: type[Serializer]) None [source]#
Register a new serializer to handle data file dumping and loading.
This class has a few serializers registered out of the box. They should be adequate for most applications.
- Parameters:
- name
Name of the format to be associated with the new serializer.
After registering the new serializer with name “xyz”, one can use
storage_format='xyz'
in calls tonew()
. When reading the object back from persistence, make sure this registry is also in place so that the correct deserializer can be found.- serializer
A subclass of upathlib.serializer.Serializer.
Although this class needs to provide the
Serializer
API, it is possible to write data files in text mode. The registered ‘json’ format does that.
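The registry mechanism amounts to a class-level mapping from format names to serializer classes. A rough stdlib-only sketch of that pattern (the classes here are toys, not the upathlib `Serializer` API):

```python
import json

class JsonDemoSerializer:
    @classmethod
    def dumps(cls, obj) -> bytes:
        return json.dumps(obj).encode()

    @classmethod
    def loads(cls, blob: bytes):
        return json.loads(blob.decode())

class MiniStore:
    registered_storage_formats = {"json": JsonDemoSerializer}
    DEFAULT_STORAGE_FORMAT = "json"

    @classmethod
    def register_storage_format(cls, name: str, serializer: type) -> None:
        # Later reads must re-register the same name so that the matching
        # deserializer can be found.
        cls.registered_storage_formats[name] = serializer

class UpperJson(JsonDemoSerializer):
    """Toy custom format: uppercases string payloads on write."""

    @classmethod
    def dumps(cls, obj) -> bytes:
        return super().dumps([s.upper() for s in obj])

MiniStore.register_storage_format("upper-json", UpperJson)
ser = MiniStore.registered_storage_formats["upper-json"]
print(ser.loads(ser.dumps(["a", "b"])))   # ['A', 'B']
```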
- classmethod new(path: str | Path | Upath | None = None, *, batch_size: int | None = None, storage_format: str | None = None, serialize_kwargs: dict | None = None, deserialize_kwargs: dict | None = None, init_info: dict | None = None, **kwargs) Self [source]#
- Parameters:
- path
Passed on to
BiglistBase.new()
.- batch_size
Max number of data elements in each persisted data file.
There’s no good default value for this parameter, although one is provided (currently the default is 1000), because the code of
new()
doesn’t know the typical size of the data elements. User is recommended to specify the value of this parameter.In choosing a value for
batch_size
, the most important consideration is the size of each data file, which is determined by the typical size of the data elements as well asbatch_size
, which is the upper bound of the number of elements in each file.There are several considerations about the data file sizes:
It should not be so small that the file reading/writing is a large overhead relative to actual processing of the data. This is especially important when
path
is cloud storage.It should not be so large that it is “unwieldy”, e.g. approaching 1GB.
When
__iter__()
ating over aBiglist
object, there can be up to (by default) 4 files-worth of data in memory at any time, where 4 isself._n_read_threads
plus 1.When
append()
ing orextend()
ing to aBiglist
object at high speed, there can be up to (by default) 9 timesbatch_size
data elements in memory at any time, where 9 isself._n_write_threads
plus 1. See_flush()
andDumper
.
Another consideration is access pattern of elements in the
Biglist
. If there are many “jumping around” with random element access, large data files will lead to very wasteful file loading, because to read any element, its hosting file must be read into memory. (After all, if the application is heavy on random access, thenBiglist
is not the right tool.)The performance of iteration is not expected to be highly sensitive to the value of
batch_size
, as long as it is in a reasonable range.A rule of thumb: it is recommended to keep the persisted files between 32-128MB in size. (Note: no benchmark was performed to back this recommendation.)
- storage_format
This should be a key in
registered_storage_formats
. If not specified,DEFAULT_STORAGE_FORMAT
is used.- serialize_kwargs
Additional keyword arguments to the serialization function.
- deserialize_kwargs
Additional keyword arguments to the deserialization function.
serialize_kwargs
anddeserialize_kwargs
are rarely needed. One use case isschema
when storage format is “parquet”. SeeParquetSerializer
.serialize_kwargs
anddeserialize_kwargs
, if notNone
, will be saved in the “info.json” file, hence they must be JSON serializable, meaning they need to be the few simple native Python types that are supported by the standardjson
library. (However, the few formats “natively” supported by Biglist may get special treatment to relax this requirement.) If this is not possible, the solution is to define a custom serialization class and register it withregister_storage_format()
.- **kwargs
additional arguments are passed on to
BiglistBase.new()
.
- Returns:
- Biglist
A new
Biglist
object.
- keep_files: bool#
Indicates whether the persisted files should be kept or deleted when the object is garbage-collected.
- property batch_size: int#
The max number of data items in one data file.
- property data_path: Upath#
- property storage_format: str#
The value of
storage_format
used innew()
, either user-specified or the default value.
- property storage_version: int#
The internal format used in persistence. This is a read-only attribute for information only.
- __len__() int [source]#
Number of data items in this biglist.
If data is being appended to this biglist, then this method only includes the items that have been “flushed” to storage. Data items in the internal memory buffer are not counted. The buffer is empty upon calling
_flush()
(internally and automatically) orflush()
(explicitly by user).Changed in version 0.7.4: In previous versions, this count includes items that are not yet flushed.
- __getitem__(idx: int) Element [source]#
Access a data item by its index; negative index works as expected.
Items not yet “flushed” are not accessible by this method. They are considered “invisible” to this method. Similarly, negative
idx
operates in the range of flushed items only.Changed in version 0.7.4: In previous versions, the accessible items include those that are not yet flushed.
- __iter__() Iterator[Element] [source]#
Iterate over all the elements.
Items that are not yet “flushed” are invisible to this iteration.
Changed in version 0.7.4: In previous versions, this iteration includes those items that are not yet flushed.
- append(x: Element) None [source]#
Append a single element to the
Biglist
.In implementation, this appends to an in-memory buffer. Once the buffer size reaches
batch_size
, the buffer’s content will be persisted as a new data file, and the buffer will re-start empty. In other words, whenever the buffer is non-empty, its content is not yet persisted.You can append data to a common biglist from multiple processes. In the processes, use independent
Biglist
objects that point to the same “path”. Each of the objects will maintain its own in-memory buffer and save its own files once the buffer fills up. Remember toflush()
at the end of work in each process.
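The buffer-then-persist behavior of append() can be sketched with a toy in-memory store. This is illustrative only; Biglist persists batches to data files (possibly in the cloud) and handles concurrency:

```python
class MiniBuffer:
    """Accumulate appends; 'persist' a batch whenever the buffer fills."""

    def __init__(self, batch_size: int):
        self.batch_size = batch_size
        self._buffer = []
        self.persisted_batches = []   # stand-in for data files on disk

    def append(self, x) -> None:
        self._buffer.append(x)
        if len(self._buffer) >= self.batch_size:
            self._persist()

    def extend(self, xs) -> None:
        for x in xs:
            self.append(x)

    def flush(self) -> None:
        # Persist the possibly partial last batch; without this call,
        # anything still sitting in the buffer would never be saved.
        if self._buffer:
            self._persist()

    def _persist(self) -> None:
        self.persisted_batches.append(self._buffer)
        self._buffer = []

b = MiniBuffer(batch_size=3)
b.extend(range(7))
print([len(batch) for batch in b.persisted_batches])  # [3, 3]
b.flush()
print([len(batch) for batch in b.persisted_batches])  # [3, 3, 1]
```

The last, smaller batch after flush() mirrors how a Biglist data file with fewer than batch_size elements legitimately stays among larger files.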
- make_file_name(buffer_len: int, extra: str = '') str [source]#
This method constructs the file name of a data file. If you need to customize this method for any reason, you should do it via
extra
and keep the other patterns unchanged. The stringextra
will appear between other fixed patterns in the file name.One possible use case is this: in distributed writing, you want files written by different workers to be distinguishable by the file names. Do something like this:
def worker(datapath: str, worker_id: str, ...): out = Biglist(datapath) _make_file_name = out.make_file_name out.make_file_name = lambda buffer_len: _make_file_name(buffer_len, worker_id) ...
- flush(*, lock_timeout=300, raise_on_write_error: bool = True) None [source]#
_flush()
is called automatically whenever the “append buffer” is full, so as to persist the data and empty the buffer. (The capacity of this buffer is equal toself.batch_size
.) However, if this buffer is only partially filled when the user is done adding elements to the biglist, the data in the buffer will not be persisted. This is the first reason that user should callflush
when they are done adding data (viaappend()
orextend()
).Although
_flush()
creates new data files, it does not update the “meta info file” (info.json
in the root ofself.path
) to include the new data files; it only updates the in-memoryself.info
. This is for efficiency reasons, because updatinginfo.json
involves locking.Updating
info.json
to include new data files (created due toappend()
andextend()
) is performed byflush()
. This is the second reason that user should callflush()
at the end of their data writing session, regardless of whether all the new data have been persisted in data files. (They would be if their count happens to be a multiple ofself.batch_size
.)If there are multiple workers adding data to this biglist at the same time (from multiple processes or machines), data added by one worker will not be visible to another worker until the writing worker calls
flush()
and the reading worker callsreload()
.Further, user should assume that data not yet persisted (i.e. still in “append buffer”) are not visible to data reading via
__getitem__()
or__iter__()
and not included in__len__()
, even to the same worker. In common use cases, we do not start reading data until we’re done adding data to the biglist (at least “for now”), hence this is not a big issue.In summary, call
flush()
whenYou are done adding data (for this “session”)
or you need to start reading data
flush()
has overhead. You should call it only in the two situations above. Do not call it frequently “just to be safe”.After a call to
flush()
, there’s no problem to add more elements again byappend()
orextend()
. Data files created byflush()
with less thanbatch_size
elements will stay as is among larger files. This is a legitimate case in parallel or distributed writing, or writing in multiple sessions.
- reload() None [source]#
Reload the meta info.
This is used in this scenario: suppose we have this object pointing to a biglist on the local disk; another object in another process is appending data to the same biglist (that is, it points to the same storage location); then after a while, the meta info file on the disk has been modified by the other process, hence the current object is outdated; calling this method will bring it up to date. The same idea applies if the storage is in the cloud, and another machine is appending data to the same remote biglist.
Creating a new object pointing to the same storage location would achieve the same effect.
- property files#
- class biglist.BiglistFileReader[source]#
Bases:
FileReader
[Element
]- __init__(path: str | Path | Upath, loader: Callable[[Upath], Any])[source]#
- Parameters:
- path
Path of a data file.
- loader
A function that will be used to load the data file. This must be pickle-able. Usually this is the bound method
load
of a subclass ofupathlib.serializer.Serializer
. If you customize this, please see the doc ofFileReader
.
- load() None [source]#
This method eagerly loads all the data from the file into memory.
Once this method has been called, subsequent data consumption should all draw upon this in-memory copy. However, if the data file is large, and especially if only part of the data is of interest, calling this method may not be the best approach. This all depends on the specifics of the subclass.
A subclass may allow consuming the data while loading parts of it in an “as-needed” or “streaming” fashion. In that approach,
__getitem__()
and__iter__()
do not require this method to be called (although they may take advantage of the in-memory data if this method has been called).
- class biglist.ParquetBiglist[source]#
Bases:
BiglistBase
ParquetBiglist
defines a kind of “external biglist”, that is, it points to pre-existing Parquet files and provides facilities to read them.As long as you use a ParquetBiglist object to read, it is assumed that the dataset (all the data files) has not changed since the object was created by
new()
.- classmethod new(data_path: str | Path | Upath | Sequence[str | Path | Upath], path: str | Path | Upath | None = None, *, suffix: str = '.parquet', **kwargs) ParquetBiglist [source]#
This classmethod gathers info of the specified data files and saves the info to facilitate reading the data files. The data files remain “external” to the
ParquetBiglist
object; the “data” persisted and managed by the ParquetBiglist object are the meta info about the Parquet data files.If the number of data files is small, it’s feasible to create a temporary object of this class (by leaving
path
at the default valueNone
) “on-the-fly” for one-time use.- Parameters:
- path
Passed on to
BiglistBase.new()
ofBiglistBase
.- data_path
Parquet file(s) or folder(s) containing Parquet files.
If this is a single path, then it’s either a Parquet file or a directory. If this is a list, each element is either a Parquet file or a directory; there can be a mix of files and directories. Directories are traversed recursively for Parquet files. The paths can be local, or in the cloud, or a mix of both.
Once the info of all Parquet files are gathered, their order is fixed as far as this
ParquetBiglist
is concerned. The data sequence represented by this ParquetBiglist follows this order of the files. The order is determined as follows:The order of the entries in
data_path
is preserved; if any entry is a directory, the files therein (recursively) are sorted by the string value of each file’s full path.- suffix
Only files with this suffix will be included. To include all files, use
suffix='*'
.- **kwargs
additional arguments are passed on to
__init__()
.
- keep_files: bool#
Indicates whether the meta info persisted by this object should be kept or deleted when this object is garbage-collected.
This does not affect the external Parquet data files.
- property storage_version: int#
- property files#
- class biglist.ParquetFileReader[source]#
Bases:
FileReader
- classmethod get_gcsfs(*, good_for_seconds=600) GcsFileSystem [source]#
Obtain a pyarrow.fs.GcsFileSystem object with credentials given so that the GCP default process of inferring credentials (which involves env vars and file reading etc) will not be triggered.
This is provided under the (unverified) assumption that the default credential-inference process incurs high overhead.
- classmethod load_file(path: Upath) ParquetFile [source]#
This reads meta info and constructs a
pyarrow.parquet.ParquetFile
object. This does not load the entire file. Seeload()
for eager loading.- Parameters:
- path
Path of the file.
- property scalar_as_py: bool#
scalar_as_py
controls whether the values returned by__getitem__()
(or indirectly by__iter__()
) are converted from a pyarrow.Scalar type such as pyarrow.lib.StringScalar to a Python builtin type such asstr
.This property can be toggled anytime to take effect until it is toggled again.
- Getter:
Returns this property’s value.
- Setter:
Sets this property’s value.
- property file: ParquetFile#
Return a pyarrow.parquet.ParquetFile object.
Upon initiation of a
ParquetFileReader
object, the file is not read at all. When this property is requested, the file is accessed to construct a pyarrow.parquet.ParquetFile object.
- property metadata: FileMetaData#
- property num_rows: int#
- property num_row_groups: int#
- property num_columns: int#
- property column_names: list[str]#
- data() ParquetBatchData [source]#
Return the entire data in the file.
- __getitem__(idx: int)[source]#
Get one row (or “record”).
If the object has a single column, then return its value in the specified row. If the object has multiple columns, return a dict with column names as keys. The values are converted to Python builtin types if
scalar_as_py
isTrue
.- Parameters:
- idx
Row index in this file. Negative value counts from the end as expected.
- __iter__()[source]#
Iterate over rows. The type of yielded individual elements is the same as the return of
__getitem__()
.
- iter_batches(batch_size=10000) Iterator[ParquetBatchData] [source]#
- row_group(idx: int) ParquetBatchData [source]#
- Parameters:
- idx
Index of the row group of interest.
- columns(cols: Sequence[str]) ParquetFileReader [source]#
Return a new
ParquetFileReader
object that will only load the specified columns.The columns of interest have to be within currently available columns. In other words, a series of calls to this method would incrementally narrow down the selection of columns. (Note this returns a new
ParquetFileReader
, hence one can callcolumns()
again on the returned object.)This method “slices” the data by columns, in contrast to other data access methods that select rows.
- Parameters:
- cols
Names of the columns to select.
Examples
>>> obj = ParquetFileReader('file_path') >>> obj1 = obj.columns(['a', 'b', 'c']) >>> print(obj1[2]) >>> obj2 = obj1.columns(['b', 'c']) >>> print(obj2[3]) >>> obj3 = obj.columns(['d']) >>> for v in obj: >>> print(v)
- column(idx_or_name: int | str) ChunkedArray [source]#
Select a single column.
Note: while
columns()
returns a newParquetFileReader
,column()
returns a pyarrow.ChunkedArray.
- class biglist.ParquetBatchData[source]#
Bases:
Seq
ParquetBatchData
wraps a pyarrow.Table or pyarrow.RecordBatch. The data is already in memory; this class does not involve file reading.ParquetFileReader.data()
andParquetFileReader.iter_batches()
both return or yield ParquetBatchData. In addition, the methodcolumns()
of this class returns a new object of this class.Objects of this class can be pickled.
- scalar_as_py#
Indicate whether scalar values should be converted to Python types from pyarrow types.
- __getitem__(idx: int)[source]#
Get one row (or “record”).
If the object has a single column, then return its value in the specified row. If the object has multiple columns, return a dict with column names as keys. The values are converted to Python builtin types if
scalar_as_py
isTrue
.- Parameters:
- idx
Row index in this batch. Negative value counts from the end as expected.
- __iter__()[source]#
Iterate over rows. The type of yielded individual elements is the same as
__getitem__()
.
- columns(cols: Sequence[str]) ParquetBatchData [source]#
Return a new
ParquetBatchData
object that only contains the specified columns.The columns of interest have to be within currently available columns. In other words, a series of calls to this method would incrementally narrow down the selection of columns. (Note this returns a new
ParquetBatchData
, hence one can callcolumns()
again on the returned object.)This method “slices” the data by columns, in contrast to other data access methods that select rows.
- Parameters:
- cols
Names of the columns to select.
Examples
>>> obj = ParquetBatchData(parquet_table) >>> obj1 = obj.columns(['a', 'b', 'c']) >>> print(obj1[2]) >>> obj2 = obj1.columns(['b', 'c']) >>> print(obj2[3]) >>> obj3 = obj.columns(['d']) >>> for v in obj: >>> print(v)
- column(idx_or_name: int | str) Array | ChunkedArray [source]#
Select a single column specified by name or index.
If
self._data
is pyarrow.Table, return pyarrow.ChunkedArray. Ifself._data
is pyarrow.RecordBatch, return pyarrow.Array.
- biglist.make_parquet_schema(fields_spec: Iterable[Sequence])[source]#
This function constructs a pyarrow schema that is expressed by simple Python types that can be json-serialized.
fields_spec
is a list or tuple, each of its elements accepted bymake_parquet_field()
.This function is motivated by the need of
ParquetSerializer
. Whenbiglist.Biglist
uses a “storage-format” that takes options (such as ‘parquet’), these options can be passed intobiglist.Biglist.new()
(viaserialize_kwargs
anddeserialize_kwargs
) and saved in “info.json”. However, this requires the options to be json-serializable. Therefore, the argumentschema
toParquetSerializer.serialize()
can not be used by this mechanism. As an alternative, user can use the argumentschema_spec
; this argument can be saved in “info.json”, and it is handled by this function.
- biglist.make_parquet_field(field_spec: Sequence)[source]#
field_spec
is a list or tuple with 2, 3, or 4 elements. The first element is the name of the field. The second element is the spec of the type, to be passed to functionmake_parquet_type()
. Additional elements are the optionalnullable
andmetadata
to the function pyarrow.field().
- biglist.make_parquet_type(type_spec: str | Sequence)[source]#
type_spec is a spec of arguments to one of pyarrow’s data type factory functions.
For simple types, this may be just the type name (or function name), e.g. 'bool_', 'string', 'float64'.
For type functions expecting arguments, this is a list or tuple with the type name followed by the other arguments, for example:
('time32', 's')
('decimal128', 5, -3)
For compound types (types constructed from other types), this is a “recursive” structure, such as
('list_', 'int64')
('list_', ('time32', 's'), 5)
where the second element is the spec for the member type, or
('map_', 'string', ('list_', 'int64'), True)
where the second and third elements are specs for the key type and value type, respectively, and the fourth element is the optional argument keys_sorted to pyarrow.map_(). Below is an example of a struct type:
('struct', [('name', 'string', False), ('age', 'uint8', True), ('income', ('struct', (('currency', 'string'), ('amount', 'uint64'))), False)])
Here, the second element is the list of fields in the struct. Each field is expressed by a spec that is taken by make_parquet_field().
- biglist.read_parquet_file(path: str | Path | Upath) ParquetFileReader [source]#
- Parameters:
- path
Path of the file.
- biglist.write_arrays_to_parquet(data: Sequence[Array | ChunkedArray | Iterable], path: str | Path | Upath, *, names: Sequence[str], **kwargs) None [source]#
- Parameters:
- path
Path of the file to create and write to.
- data
A list of data arrays.
- names
List of names for the arrays in data.
- **kwargs
Passed on to pyarrow.parquet.write_table().
- biglist.write_pylist_to_parquet(data: Sequence, path: str | Path | Upath, *, schema=None, schema_spec=None, metadata=None, **kwargs)[source]#
- class biglist._biglist.ParquetSerializer[source]#
Bases:
Serializer
- classmethod serialize(x: list[dict], schema: Schema | None = None, schema_spec: Sequence | None = None, metadata=None, **kwargs)[source]#
x is a list of data items. Each item is a dict. In the output Parquet file, each item is a “row”.
The content of the item dict should follow a regular pattern. Not every structure is supported. The data x must be acceptable to pyarrow.Table.from_pylist. If unsure, use a list with a couple data elements and experiment with pyarrow.Table.from_pylist directly.
When using storage_format='parquet' for Biglist, each data element is a dict with a consistent structure that is acceptable to pyarrow.Table.from_pylist. When reading the Biglist, the original Python data elements are returned. (A record read out may not be exactly equal to the original that was written, in that elements that were missing in a record when written may have been filled in with None when read back out.) In other words, the reading is not like that of ParquetBiglist. You can always create a separate ParquetBiglist for the data files of the Biglist in order to use Parquet-style data reading. The data files are valid Parquet files.
If neither schema nor schema_spec is specified, then the data schema is auto-inferred based on the first element of x. If this does not work, you can specify either schema or schema_spec. The advantage of schema_spec is that it consists of json-serializable Python types, hence can be passed into Biglist.new() via serialize_kwargs and saved in “info.json” of the biglist.
If schema_spec is not flexible or powerful enough for your use case, then you may have to use schema.