biglist#

(Generated on Feb 01 2024 for biglist version 0.8.9.)

The package biglist provides persisted, out-of-memory Python data structures that implement the Seq interface (a simplified Sequence) with the capabilities of concurrent and distributed reading and writing. The main use case is sequentially processing large amounts of data that cannot fit in memory.

Currently, two kinds of “biglists” are provided, namely Biglist and ParquetBiglist.

Biglist manages writing and reading. Here, “writing” refers to adding data to this facility, to be managed by it. The class manages its data files in addition to meta info. Writing is append-only; updating existing data is not supported. Appending can be conducted by a number of distributed workers.

ParquetBiglist defines a kind of “external biglist”. When given the paths to a set of pre-existing data files in the Apache Parquet format, this class provides a rich set of facilities for reading the data. (Potentially confusingly, Biglist can save data in the Parquet format, among others; such a Biglist is not a ParquetBiglist, which supports reading only.)

Biglist and ParquetBiglist share the same core API for reading. Although random element access is supported, it is not optimized and is not the target usage pattern. Random element access can be very inefficient. The intended way of data consumption is by iteration, which can be done collectively by distributed workers.

Persistence can be on local disk or in cloud storage. Thanks to the package upathlib, the implementation as well as the end-user API is agnostic to the location of storage.

In support of ParquetBiglist, some utilities are provided for reading and writing Parquet data files. These utilities are useful in their own right.

Additional utilities provide mechanisms for “slicing and dicing” a biglist, as well as “chaining up” a series of biglists. These utilities work not only for biglist, but also for any Seq.

Installation#

To install biglist and use local disk for persistence, simply do

python3 -m pip install biglist

If you need to use Google Cloud Storage for persistence, do

python3 -m pip install "biglist[gcs]"

Creating a Biglist#

Create a new Biglist object via the classmethod new():

>>> from biglist import Biglist
>>> mylist = Biglist.new(batch_size=100)

then add data to it, for example,

>>> for x in range(10_023):
...     mylist.append(x)

This saves a new data file for every 100 elements accumulated. In the end, there are 23 elements in a memory buffer that are not yet persisted to disk. The code has no way to know whether we will append more elements soon, hence it does not save this partial batch. Supposing we’re done adding data, we call flush() to persist the content of the buffer to disk:

>>> mylist.flush()

If, after a while, we decide to append more data to mylist, we just call append() again. We can continue to add more data as long as the disk has space. New data files will be saved. The smaller file containing 23 elements will stay there among larger files with no problem.

Now let’s take a look at the biglist object:

>>> mylist  
<Biglist at '/tmp/19f88a17-3e78-430f-aad0-a35d39485f80' with 10023 elements in 101 data file(s)>
>>> len(mylist)
10023
>>> mylist.path  
LocalUpath('/tmp/19f88a17-3e78-430f-aad0-a35d39485f80')
>>> mylist.num_data_files
101

The data have been saved in the directory /tmp/19f88a17-3e78-430f-aad0-a35d39485f80, which is a temporary one because we did not tell new() where to save data. When the object mylist gets garbage collected, this directory will be deleted automatically. This has its uses, but often we want to save the data for future use. In that case, just pass a currently non-existent directory to new(), for example,

>>> yourlist = Biglist.new('/tmp/project/data/store-a', batch_size=10_000)

Later, initiate a Biglist object for reading the existing dataset:

>>> yourlist = Biglist('/tmp/project/data/store-a')

If we want to persist the data in Google Cloud Storage, we would specify a path in the 'gs://bucket-name/path/to/data' format.
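
For example (the bucket name and prefix below are hypothetical):

>>> remotelist = Biglist.new('gs://my-bucket/project/data/store-a', batch_size=10_000)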

The Seq protocol and FileReader class#

Before going further with Biglist, let’s digress a bit and introduce a few helper facilities.

BiglistBase (and its subclasses Biglist and ParquetBiglist) could have implemented the Sequence interface in the standard library. However, that interface contains a few methods that are potentially hugely inefficient for Biglist, and hence are not supposed to be used on a Biglist. These methods include __contains__, count, and index. Each of them requires iterating over the entire dataset to answer a question about one particular data item; for Biglist, that would mean loading and unloading each of a possibly large number of data files. Biglist does not want to give users the illusion that these methods can be used freely and lightly.

For this reason, the protocol Seq is defined, which has three methods: __len__(), __iter__(), and __getitem__(). Therefore, classes that implement this protocol are Sized, Iterable, and support random element access by index. Most classes in the biglist package implement this protocol rather than the standard Sequence.
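
For illustration, here is a minimal (made-up) class that satisfies the protocol:

>>> from biglist import Seq
>>> class Squares:
...     def __init__(self, n):
...         self.n = n
...     def __len__(self):
...         return self.n
...     def __getitem__(self, idx):
...         return idx * idx
...     def __iter__(self):
...         return (i * i for i in range(self.n))
>>> issubclass(Squares, Seq)
True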

Because a biglist manages any number of data files, a basic operation concerns reading one data file. Each subclass of BiglistBase implements its file-reading class as a subclass of FileReader. FileReader implements the Seq protocol, hence the data items in one data file can be used like a list. Importantly, a FileReader instance does not load the data file upon initialization. At that moment, the instance can be pickled. This lends itself to uses in multiprocessing. This point of the design will be showcased later.

The centerpiece of a biglist is a sequence of data files in persistence, or, correspondingly, a sequence of FileReaders in memory. The property BiglistBase.files returns a FileSeq that manages the FileReader objects of the biglist. Besides implementing the Seq protocol, FileSeq provides ways for distributed workers to consume the FileReaders.

Finally, BiglistBase implements the Seq protocol for its data items across the data files.

To sum up, a BiglistBase is a Seq of data items across data files; BiglistBase.files is a FileSeq, which in turn is a Seq of FileReaders; a FileReader is a Seq of data items in one data file.
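
In code, the relationship looks roughly like this (using the mylist object from above):

>>> files = mylist.files            # a FileSeq: a Seq of FileReader objects
>>> len(files)
101
>>> reader = files[0]               # a FileReader: a Seq of the items in one data file
>>> len(reader)
100
>>> len(mylist)                     # the biglist: a Seq of all items across the files
10023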

Reading a Biglist#

Random element access#

We can access any element of a Biglist like we do a list:

>>> mylist[18]
18
>>> mylist[-3]
10020

Biglist does not support slicing directly. However, the class Slicer wraps a Seq and enables element access by a single index, by a slice, or by a list of indices:

>>> from biglist import Slicer
>>> v = Slicer(mylist)
>>> len(v)
10023
>>> v  
<Slicer into 10023/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>> v[83]
83
>>> v[100:104]  
<Slicer into 4/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>>

Note that slicing the slicer does not return a list of values. Instead, it returns another Slicer object, which, naturally, can be used the same way, including slicing further.

A Slicer object is an Iterable (in fact, it is a Seq), hence we can gather all of its elements in a list:

>>> list(v[100:104])
[100, 101, 102, 103]

Slicer provides a convenience method collect() to do the same:

>>> v[100:104].collect()
[100, 101, 102, 103]

A few more examples:

>>> v[-8:].collect()
[10015, 10016, 10017, 10018, 10019, 10020, 10021, 10022]
>>> v[-8::2].collect()
[10015, 10017, 10019, 10021]
>>> v[[1, 83, 250, -2]]  
<Slicer into 4/10023 of <Biglist at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>>
>>> v[[1, 83, 250, -2]].collect()
[1, 83, 250, 10021]
>>> v[[1, 83, 250, -2]][-3:].collect()
[83, 250, 10021]

Iteration#

Don’t be carried away by the many easy and flexible ways of random access. Random element access for Biglist is inefficient. The reason is that it needs to load a data file that contains the element of interest. If the biglist has many data files and we are “jumping around” randomly, we waste a lot of time loading entire files just to access a few data elements in them. (However, consecutive random accesses to elements residing in the same file will not load the file repeatedly.)

The preferred way to consume the data of a Biglist is to iterate over it. For example,

>>> for i, x in enumerate(mylist):
...     print(x)
...     if i > 4:
...         break
0
1
2
3
4
5

Conceptually, this loads each data file in turn and yields the elements in each file. The implementation “pre-loads” a few files in background threads to speed up the iteration.
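
Conceptually (setting the background pre-loading aside), the iteration above is equivalent to this sketch:

for file_reader in mylist.files:
    for x in file_reader:
        ... use x ...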

Reading from a Biglist in multiple processes#

To collectively consume a Biglist object from multiple processes, we can distribute FileReaders to the processes. The FileReaders of mylist are accessed via its property files, which returns a FileSeq:

>>> files = mylist.files
>>> files  
<BiglistFileSeq at '/tmp/dc260854-8041-40e8-801c-34084451d7a3' with 10023 elements in 101 data file(s)>
>>> len(files)
101
>>> files.num_data_files
101
>>> files.num_data_items
10023
>>> files[0]  
<BiglistFileReader for '/tmp/cfb39dc0-94bb-4557-a056-c7cea20ea653/store/1669667946.647939_46eb97f6-bdf3-45d2-809c-b90c613d69c7_100.pickle_zstd'>

A FileReader object is light-weight. Upon initialization, it has not loaded the file yet—it merely records the file path along with the function that will be used to load the file. In addition, FileReader objects are friendly to pickling, hence lend themselves to multiprocessing code. Let’s design a small experiment to consume this dataset in multiple processes:

>>> def worker(file_reader):
...     total = 0
...     for x in file_reader:
...         total += x
...     return total
>>> from concurrent.futures import ProcessPoolExecutor
>>> total = 0
>>> with ProcessPoolExecutor(5) as pool:  
...     tasks = [pool.submit(worker, fr) for fr in mylist.files]  
...     for t in tasks:  
...         total += t.result()  
>>> total  
50225253

What is the expected result?

>>> sum(mylist)
50225253

Sure enough, this verifies that the entire biglist is consumed by the processes collectively.

If the file loading is the bottleneck of the task, we can use threads in place of processes.
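
For instance, a thread-based variant of the experiment above (reusing the worker function) would look like this:

>>> from concurrent.futures import ThreadPoolExecutor
>>> total = 0
>>> with ThreadPoolExecutor(5) as pool:
...     tasks = [pool.submit(worker, fr) for fr in mylist.files]
...     for t in tasks:
...         total += t.result()
>>> total
50225253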

Similarly, it is possible to read mylist from multiple machines if mylist is stored in the cloud. Since a FileReader object is pickle-able, it works just fine if we pickle it and send it to another machine, provided the file path that is contained in the FileReader object is in the cloud, hence accessible from the other machine. We need a mechanism to distribute these FileReader objects to machines. For that, check out Multiplexer from upathlib.
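
As a quick illustration of the pickling point (the round trip here happens within one machine, but the same idea applies across machines when the file path is in the cloud):

>>> import pickle
>>> fr = mylist.files[0]
>>> blob = pickle.dumps(fr)        # pickles fine because no data has been loaded yet
>>> fr2 = pickle.loads(blob)
>>> sum(fr2) == sum(mylist.files[0])
True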

Writing to a Biglist in multiple workers#

The flip side of distributed reading is distributed writing. If we have a biglist on the local disk, we can append to it from multiple processes or threads. If we have a biglist in the cloud, we can append to it from multiple machines. Let’s use multiprocessing to demo the idea.

First, we create a new Biglist at a storage location of our choosing:

>>> from upathlib import LocalUpath
>>> path = LocalUpath('/tmp/a/b/c/d')
>>> path.rmrf()
0
>>> yourlist = Biglist.new(path, batch_size=6)

In each worker process, we will open this biglist with Biglist(path) and append data to it. Because the dataset now has a presence on disk, Biglist(path) will not complain that it does not exist.

>>> yourlist.info
{'storage_format': 'pickle-zstd', 'storage_version': 3, 'batch_size': 6, 'data_files_info': []}
>>> yourlist.path
LocalUpath('/tmp/a/b/c/d')
>>> len(yourlist)
0

Then we can tell workers, “here is the location, add data to it.” Let’s design a simple worker:

>>> def worker(path, idx):
...     yourlist = Biglist(path)
...     for i in range(idx):
...         yourlist.append(100 * idx + i)
...     yourlist.flush()

From the main process, let’s instruct the workers to write data to the same Biglist:

>>> import multiprocessing
>>> with ProcessPoolExecutor(10, mp_context=multiprocessing.get_context('spawn')) as pool:  
...     tasks = [pool.submit(worker, path, idx) for idx in range(10)]  
...     for t in tasks:  
...         _ = t.result()  

Let’s see what we’ve got:

>>> yourlist.reload()  
>>> len(yourlist)  
45
>>> yourlist.num_data_files  
12
>>> list(yourlist)  
[400, 401, 402, 403, 500, 501, 502, 503, 504, 600, 601, 602, 603, 604, 605, 700, 701, 702, 703, 704, 705, 706, 900, 901, 902, 903, 904, 905, 906, 907, 908, 100, 800, 801, 802, 803, 804, 805, 806, 807, 200, 201, 300, 301, 302]
>>>

Does this look right? It took me a moment to realize that idx = 0 did not append anything. So, the data elements are in the 100, 200, …, 900 ranges; that looks right. But the order of things is confusing.

Well, in a distributed setting, there’s no guarantee of order. It’s not a problem that numbers in the 800 range come after those in the 900 range.

We can get more insight if we dive down to the file level:

>>> for f in yourlist.files:  
...     print(f)  
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_4.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.973651_63e4ca6d-4e44-49e1-a035-6d60a88f7789_5.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.975576_f59ab2f0-be9c-477d-a95b-70d3dfc00d94_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.982828_3219d2d1-50e2-4b41-b595-2c6df4e63d3c_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.983024_674e57de-66ed-4e3b-bb73-1db36c13fd6f_1.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.985425_78eec966-8139-4401-955a-7b81fb8b47b9_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073410.985555_752b4975-fbf3-4172-9063-711722a83abc_3.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.012161_3a7620f5-b040-4cec-9018-e8bd537ea98d_1.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.034502_4a340751-fa1c-412e-8f49-13f2ae83fc3a_6.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.035010_32c58dbe-e3a2-4ba1-9ffe-32c127df11a6_2.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.067370_20a0e926-7a5d-46a1-805d-86d16c346852_2.pickle_zstd'>
<BiglistFileReader for '/tmp/a/b/c/d/store/20230129073411.119890_89ae31bc-7c48-488d-8dd1-e22212773d79_3.pickle_zstd'>

The file names do not appear to be totally random. They follow a pattern that facilitates ordering, and they encode some useful info. In fact, the part before the first underscore is the date and time of file creation, with microsecond resolution. This is followed by a uuid.uuid4 random string. When we iterate over the Biglist object, files are read in the order of their paths, hence in the order of creation time. The number in the file name before the suffix is the number of elements in the file.
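
For illustration, a small sketch that takes one of the file names above apart (this is not a biglist API, just string manipulation):

name = '20230129073410.975576_f59ab2f0-be9c-477d-a95b-70d3dfc00d94_6.pickle_zstd'
created, uid, rest = name.split('_', 2)   # creation time, uuid4 string, the remainder
num_items = int(rest.split('.', 1)[0])    # element count before the suffix
# created == '20230129073410.975576', num_items == 6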

We can get similar info in a more readable format:

>>> for v in yourlist.files.data_files_info:  
...     print(v)  
['/tmp/a/b/c/d/store/20230129073410.971439_f84d0cf3-e2c4-40a7-acf2-a09296ff73bc_4.pickle_zstd', 4, 4]
['/tmp/a/b/c/d/store/20230129073410.973651_63e4ca6d-4e44-49e1-a035-6d60a88f7789_5.pickle_zstd', 5, 9]
['/tmp/a/b/c/d/store/20230129073410.975576_f59ab2f0-be9c-477d-a95b-70d3dfc00d94_6.pickle_zstd', 6, 15]
['/tmp/a/b/c/d/store/20230129073410.982828_3219d2d1-50e2-4b41-b595-2c6df4e63d3c_6.pickle_zstd', 6, 21]
['/tmp/a/b/c/d/store/20230129073410.983024_674e57de-66ed-4e3b-bb73-1db36c13fd6f_1.pickle_zstd', 1, 22]
['/tmp/a/b/c/d/store/20230129073410.985425_78eec966-8139-4401-955a-7b81fb8b47b9_6.pickle_zstd', 6, 28]
['/tmp/a/b/c/d/store/20230129073410.985555_752b4975-fbf3-4172-9063-711722a83abc_3.pickle_zstd', 3, 31]
['/tmp/a/b/c/d/store/20230129073411.012161_3a7620f5-b040-4cec-9018-e8bd537ea98d_1.pickle_zstd', 1, 32]
['/tmp/a/b/c/d/store/20230129073411.034502_4a340751-fa1c-412e-8f49-13f2ae83fc3a_6.pickle_zstd', 6, 38]
['/tmp/a/b/c/d/store/20230129073411.035010_32c58dbe-e3a2-4ba1-9ffe-32c127df11a6_2.pickle_zstd', 2, 40]
['/tmp/a/b/c/d/store/20230129073411.067370_20a0e926-7a5d-46a1-805d-86d16c346852_2.pickle_zstd', 2, 42]
['/tmp/a/b/c/d/store/20230129073411.119890_89ae31bc-7c48-488d-8dd1-e22212773d79_3.pickle_zstd', 3, 45]

The values for each entry are the file path, the number of elements in the file, and the cumulative number of elements. The cumulative count is the basis for random access—Biglist uses it to figure out which file contains the element at a specified index.
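
To illustrate the idea, here is a hedged sketch (not the actual Biglist internals) of how such cumulative counts support index lookup:

import bisect

def locate(data_files_info, idx):
    # Each entry is [file_path, num_items_in_file, cumulative_num_items].
    cumulative = [entry[2] for entry in data_files_info]
    ifile = bisect.bisect_right(cumulative, idx)          # which file holds element `idx`
    start = 0 if ifile == 0 else cumulative[ifile - 1]    # number of items before that file
    return ifile, idx - start                             # (file index, index within the file)

# locate(yourlist.files.data_files_info, 10) would give (2, 1) for the listing above.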

Creating a ParquetBiglist#

Apache Parquet is a popular file format in the “big data” domain. Many tools save large amounts of data in this format, often in a large number of files, sometimes in nested directories.

ParquetBiglist takes such data files as pre-existing, read-only, external data, and provides an API to read the data in various ways. This is analogous to, for example, the “external table” concept in BigQuery.

Let’s create a couple of small Parquet files to demonstrate this API.

>>> from upathlib import LocalUpath
>>> import random
>>> from biglist import write_arrays_to_parquet
>>>
>>> path = LocalUpath('/tmp/a/b/c/e')
>>> path.rmrf()
0
>>> year = list(range(1970, 2021))
>>> make = ['honda'] * len(year)
>>> sales = list(range(123, 123 + len(make)))
>>> write_arrays_to_parquet([make, year, sales], path / 'honda.parquet', names=['make', 'year', 'sales'], row_group_size=10)
>>>
>>> year = list(range(1960, 2021))
>>> make = ['ford'] * len(year)
>>> sales = list(range(234, 234 + len(make)))
>>> write_arrays_to_parquet([make, year, sales], path / 'ford.parquet', names=['make', 'year', 'sales'], row_group_size=10)

Now we want to treat the contents of honda.parquet and ford.parquet combined as one dataset, and use biglist tools to read it.

>>> from biglist import ParquetBiglist
>>> car_data = ParquetBiglist.new(path)
>>> car_data  
<ParquetBiglist at '/tmp/edd9cefb-179b-46d2-8946-7dc8ae1bdc50' with 112 records in 2 data file(s) stored at ['/tmp/a/b/c/e']>
>>> car_data.path  
LocalUpath('/tmp/edd9cefb-179b-46d2-8946-7dc8ae1bdc50')
>>> len(car_data)
112
>>> car_data.num_data_files
2
>>> list(car_data.files)
[<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>, <ParquetFileReader for '/tmp/a/b/c/e/honda.parquet'>]

What ParquetBiglist.new() does is read the metadata of each file in the directory, recursively, and save relevant info to facilitate reading the data later. The location given by car_data.path is the directory where ParquetBiglist saves its meta info, not where the actual data are. As is the case with Biglist, this directory is a temporary one, which will be deleted once the object car_data goes away. If we wanted to keep the directory for future use, we should have specified a location when calling new().
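
For example, to keep the meta info at a location of our choosing (the destination below is hypothetical; data_path is the first argument and path the second, see the API reference):

>>> car_data2 = ParquetBiglist.new(path, '/tmp/project/meta/car-data')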

Reading a ParquetBiglist#

The fundamental reading API is the same between Biglist and ParquetBiglist: random access, slicing/dicing using Slicer, iteration, distributed reading via its files()—these are all used the same way.

However, the structures of the data files are very different between Biglist and ParquetBiglist. For Biglist, each data file contains a straight Python list, whose elements are whatever was passed into Biglist.append(). For ParquetBiglist, each data file is in a sophisticated columnar format, which is publicly documented. A variety of ways are provided to get data out of the Parquet format; some favor convenience, others favor efficiency. Let’s see some examples.

A row perspective#

>>> for i, x in enumerate(car_data):
...     print(x)
...     if i > 5:
...         break
{'make': 'ford', 'year': 1960, 'sales': 234}
{'make': 'ford', 'year': 1961, 'sales': 235}
{'make': 'ford', 'year': 1962, 'sales': 236}
{'make': 'ford', 'year': 1963, 'sales': 237}
{'make': 'ford', 'year': 1964, 'sales': 238}
{'make': 'ford', 'year': 1965, 'sales': 239}
{'make': 'ford', 'year': 1966, 'sales': 240}

This is the most basic iteration, Biglist-style, one row (or “record”) at a time. When there are multiple columns, each row is presented as a dict with column names as keys.

Reading a Parquet data file is performed by ParquetFileReader.

>>> f0 = car_data.files[0]
>>> f0
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> f0.path
LocalUpath('/tmp/a/b/c/e/ford.parquet')

First of all, a FileReader object is a Seq, providing a row-based view into the data:

>>> len(f0)
61
>>> f0[2]
{'make': 'ford', 'year': 1962, 'sales': 236}
>>> f0[-10]
{'make': 'ford', 'year': 2011, 'sales': 285}
>>> Slicer(f0)[-3:].collect()
[{'make': 'ford', 'year': 2018, 'sales': 292}, {'make': 'ford', 'year': 2019, 'sales': 293}, {'make': 'ford', 'year': 2020, 'sales': 294}]
>>> for i, x in enumerate(f0):
...     print(x)
...     if i > 5:
...         break
{'make': 'ford', 'year': 1960, 'sales': 234}
{'make': 'ford', 'year': 1961, 'sales': 235}
{'make': 'ford', 'year': 1962, 'sales': 236}
{'make': 'ford', 'year': 1963, 'sales': 237}
{'make': 'ford', 'year': 1964, 'sales': 238}
{'make': 'ford', 'year': 1965, 'sales': 239}
{'make': 'ford', 'year': 1966, 'sales': 240}

ParquetFileReader uses pyarrow to read the Parquet files. The values above are nice and simple Python types, but they are not the original pyarrow types; they have undergone a conversion. This conversion can be toggled by the property ParquetFileReader.scalar_as_py:

>>> f0[8]
{'make': 'ford', 'year': 1968, 'sales': 242}
>>> f0.scalar_as_py = False
>>> f0[8]
{'make': <pyarrow.StringScalar: 'ford'>, 'year': <pyarrow.Int64Scalar: 1968>, 'sales': <pyarrow.Int64Scalar: 242>}
>>> f0.scalar_as_py = True

A Parquet file consists of one or more “row groups”. Each row-group is a batch of rows stored column-wise. We can get info about the row-groups, or even retrieve a row-group as the unit of processing:

>>> f0.num_row_groups
7
>>> f0.metadata  
<pyarrow._parquet.FileMetaData object at 0x7...>
  created_by: parquet-cpp-arrow version 1...
  num_columns: 3
  num_rows: 61
  num_row_groups: 7
  format_version: 2.6
  serialized_size: 2375
>>> f0.metadata.row_group(1)  
<pyarrow._parquet.RowGroupMetaData object at 0x7...>
  num_columns: 3
  num_rows: 10
  total_byte_size: 408
  sorting_columns: ()
>>> f0.metadata.row_group(0)  
<pyarrow._parquet.RowGroupMetaData object at 0x7...>
  num_columns: 3
  num_rows: 10
  total_byte_size: 408
  sorting_columns: ()
>>> rg = f0.row_group(0)
>>> rg
<ParquetBatchData with 10 rows, 3 columns>

(We have specified row_group_size=10 in the call to write_arrays_to_parquet() for demonstration. In practice, a row-group tends to be much larger.)

A ParquetBatchData object is again a Seq. All of our row access tools are available:

>>> rg.num_rows
10
>>> len(rg)
10
>>> rg.num_columns
3
>>> rg[3]
{'make': 'ford', 'year': 1963, 'sales': 237}
>>> rg[-2]
{'make': 'ford', 'year': 1968, 'sales': 242}
>>> Slicer(rg)[4:7].collect()
[{'make': 'ford', 'year': 1964, 'sales': 238}, {'make': 'ford', 'year': 1965, 'sales': 239}, {'make': 'ford', 'year': 1966, 'sales': 240}]
>>> rg.scalar_as_py = False
>>> rg[3]
{'make': <pyarrow.StringScalar: 'ford'>, 'year': <pyarrow.Int64Scalar: 1963>, 'sales': <pyarrow.Int64Scalar: 237>}
>>> rg.scalar_as_py = True

When we request a specific row, ParquetFileReader will load the row-group that contains the row of interest. It does not load the entire file. However, we can get greedy and ask for the whole data in one go:

>>> f0
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> f0.data()
<ParquetBatchData with 61 rows, 3 columns>

This, again, is a ParquetBatchData object. All the familiar row access tools are at our disposal.

Finally, if the file is large, we may choose to iterate over it by batches instead of by rows:

>>> for batch in f0.iter_batches(batch_size=10):
...     print(batch)
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 10 rows, 3 columns>
<ParquetBatchData with 1 rows, 3 columns>

The batches are again ParquetBatchData objects. At the core of a ParquetBatchData is a pyarrow.Table or pyarrow.RecordBatch. ParquetBatchData is friendly to pickling and, I suppose, pickling pyarrow objects is quite efficient. So, the batches could be useful in multiprocessing code.
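
For instance, a sketch of distributing the batches to worker processes (this assumes the code lives in a script so that sum_sales is importable by the worker processes):

from concurrent.futures import ProcessPoolExecutor

def sum_sales(batch):
    # A ParquetBatchData is a Seq of rows; each row is a dict keyed by column name.
    return sum(row['sales'] for row in batch)

def total_sales(file_reader):
    with ProcessPoolExecutor() as pool:
        # Each ParquetBatchData is pickled and sent to a worker process.
        return sum(pool.map(sum_sales, file_reader.iter_batches(batch_size=10)))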

A column perspective#

Parquet is a columnar format. If we only need a subset of the columns, we should say so, so that the unneeded columns will not be loaded from disk (or cloud, as the case may be).

Both ParquetFileReader and ParquetBatchData provide the method columns() to return a new object with only the selected columns. For ParquetFileReader, if data have not been loaded, reading via the new object will load only the selected columns. For ParquetBatchData, its data are already in memory, hence column selection produces a subset of the in-memory data.

>>> f0.column_names
['make', 'year', 'sales']
>>> cols = f0.columns(['year', 'sales'])
>>> cols
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> cols.num_columns
2
>>> cols.column_names
['year', 'sales']

ParquetFileReader.columns() returns another ParquetFileReader, whereas ParquetBatchData.columns() returns another ParquetBatchData:

>>> rg
<ParquetBatchData with 10 rows, 3 columns>
>>> rg.column_names
['make', 'year', 'sales']
>>> rgcols = rg.columns(['make', 'year'])
>>> rgcols.column_names
['make', 'year']
>>> len(rgcols)
10
>>> rgcols[5]
{'make': 'ford', 'year': 1965}

It’s an interesting case when there’s only one column:

>>> f0
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> sales = f0.columns(['sales'])
>>> sales
<ParquetFileReader for '/tmp/a/b/c/e/ford.parquet'>
>>> sales.column_names
['sales']
>>> len(sales)
61
>>> sales[3]
237
>>> list(sales)  
[234, 235, 236, 237, 238, 239, ..., 291, 292, 293, 294]
>>> Slicer(sales)[:8].collect()
[234, 235, 236, 237, 238, 239, 240, 241]

Notice the type of the values (rows) returned from the element access methods: it’s not dict. Because there’s only one column whose name is known, there is no need to carry that info with every row. Also note that the values have been converted to Python builtin types. The original pyarrow values will not look as nice:

>>> sales.scalar_as_py = False
>>> Slicer(sales)[:8].collect()
[<pyarrow.Int64Scalar: 234>, <pyarrow.Int64Scalar: 235>, <pyarrow.Int64Scalar: 236>, <pyarrow.Int64Scalar: 237>, <pyarrow.Int64Scalar: 238>, <pyarrow.Int64Scalar: 239>, <pyarrow.Int64Scalar: 240>, <pyarrow.Int64Scalar: 241>]
>>> sales.scalar_as_py = True

Both ParquetFileReader and ParquetBatchData have another method called column(), which retrieves a single column and returns a pyarrow.Array or pyarrow.ChunkedArray. For example,

>>> sales2 = f0.column('sales')
>>> sales2  
<pyarrow.lib.ChunkedArray object at 0x...>
[
  [
    234,
    235,
    236,
    237,
    238,
    ...
    290,
    291,
    292,
    293,
    294
  ]
]

ParquetFileReader.column() returns a pyarrow.ChunkedArray, whereas ParquetBatchData.column() returns either a pyarrow.ChunkedArray or a pyarrow.Array.

Performance considerations#

While some biglist facilities shown here provide convenience and API elegance, it may be a safe bet to use pyarrow facilities directly if ultimate performance is a requirement.

We have seen ParquetFileReader.scalar_as_py (and ParquetBatchData.scalar_as_py); it’s worthwhile to experiment whether that conversion impacts performance in a particular context.
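
A rough way to check this in a given context is something like the following (illustrative only; the numbers depend entirely on the data and the environment):

import time

def time_iteration(reader):
    t0 = time.perf_counter()
    for _ in reader:
        pass
    return time.perf_counter() - t0

f0.scalar_as_py = True
with_conversion = time_iteration(f0)
f0.scalar_as_py = False
without_conversion = time_iteration(f0)
f0.scalar_as_py = True
print(with_conversion, without_conversion)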

There are several ways to get to a pyarrow object quickly and proceed with it. A newly initiated ParquetFileReader has not loaded any data yet. Its property file initiates a pyarrow.parquet.ParquetFile object (reading meta data during initiation) and returns it. We may take it and go all the way down the pyarrow path:

>>> f1 = car_data.files[1]
>>> f1._data is None
True
>>> file = f1.file
>>> file  
<pyarrow.parquet.core.ParquetFile object at 0x7...>
>>> f1._file
<pyarrow.parquet.core.ParquetFile object at 0x7...>

We have seen that ParquetFileReader.row_group() and ParquetFileReader.iter_batches() both return ParquetBatchData objects. In contrast to ParquetFileReader, which is “lazy” in terms of data loading, a ParquetBatchData already has its data in memory. ParquetFileReader has another method, namely data(), that eagerly loads the entire data of the file and wraps it in a ParquetBatchData object:

>>> data = f1.data()
>>> data
<ParquetBatchData with 51 rows, 3 columns>

The pyarrow data wrapped in ParquetBatchData can be acquired easily:

>>> padata = data.data()
>>> padata
pyarrow.Table
make: string
year: int64
sales: int64
----
make: [["honda","honda","honda","honda","honda",...,"honda","honda","honda","honda","honda"]]
year: [[1970,1971,1972,1973,1974,...,2016,2017,2018,2019,2020]]
sales: [[123,124,125,126,127,...,169,170,171,172,173]]
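
From here on, regular pyarrow operations apply; for example (standard pyarrow.Table attributes and methods):

>>> padata.num_rows
51
>>> padata.column_names
['make', 'year', 'sales']
>>> padata.column('sales').to_pylist()[:3]
[123, 124, 125]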

Finally, we have seen that ParquetFileReader.column() and ParquetBatchData.column()—the single-column selectors—return a pyarrow object. It is either a pyarrow.Array or a pyarrow.ChunkedArray.

Other utilities#

Chain#

Chain takes a series of Seqs and returns a combined Seq without data copy. For example,

>>> from biglist import Chain
>>> numbers = list(range(10))
>>> car_data  
<ParquetBiglist at '/tmp/edd9cefb-179b-46d2-8946-7dc8ae1bdc50' with 112 records in 2 data file(s) stored at ['/tmp/a/b/c/e']>
>>> combined = Chain(numbers, car_data)
>>> combined[3]
3
>>> combined[9]
9
>>> combined[10]
{'make': 'ford', 'year': 1960, 'sales': 234}
>>>
>>> car_data[0]
{'make': 'ford', 'year': 1960, 'sales': 234}

Slicer#

Slicer takes any Seq and provides __getitem__() that accepts a single index, or a slice, or a list of indices. A single-index access will return the requested element; the other two scenarios return a new Slicer via a zero-copy operation. To get all the elements out of a Slicer, either iterate it or call its method collect().

We should emphasize that Chain and Slicer work with any Seq, hence they are useful independent of the other biglist classes.
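
For example, the two compose on plain Python lists just as well:

>>> from biglist import Chain, Slicer
>>> Slicer(Chain([1, 2, 3], [4, 5, 6]))[::2].collect()
[1, 3, 5]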

Reading Parquet files#

The function read_parquet_file() is provided to read a single Parquet file independent of ParquetBiglist. It returns a ParquetFileReader. All the facilities of this class, as demonstrated above, are ready for use:

>>> [v.path for v in car_data.files]
[LocalUpath('/tmp/a/b/c/e/ford.parquet'), LocalUpath('/tmp/a/b/c/e/honda.parquet')]
>>>
>>> from biglist import read_parquet_file
>>> ff = read_parquet_file(car_data.files[1].path)
>>> ff
<ParquetFileReader for '/tmp/a/b/c/e/honda.parquet'>
>>> len(ff)
51
>>> ff.column_names
['make', 'year', 'sales']
>>> ff[3]
{'make': 'honda', 'year': 1973, 'sales': 126}
>>> Slicer(ff.columns(['year', 'sales']))[10:16].collect()
[{'year': 1980, 'sales': 133}, {'year': 1981, 'sales': 134}, {'year': 1982, 'sales': 135}, {'year': 1983, 'sales': 136}, {'year': 1984, 'sales': 137}, {'year': 1985, 'sales': 138}]
>>> ff.num_row_groups
6
>>> ff.row_group(3).column('sales')  
<pyarrow.lib.ChunkedArray object at 0x7...>
[
  [
    153,
    154,
    155,
    156,
    157,
    158,
    159,
    160,
    161,
    162
  ]
]

Writing Parquet files#

The function write_arrays_to_parquet() is provided to write data columns to a single Parquet file.

>>> from uuid import uuid4
>>> from biglist import write_arrays_to_parquet, read_parquet_file
>>> import random
>>> from upathlib import LocalUpath
>>> N = 10000
>>> path = LocalUpath('/tmp/a/b/c/d')
>>> path.rmrf()
1
>>> write_arrays_to_parquet([[random.randint(0, 10000) for _ in range(N)], [str(uuid4()) for _ in range(N)]], path / 'data.parquet', names=['key', 'value'])
>>> f = read_parquet_file(path / 'data.parquet')
>>> f
<ParquetFileReader for '/tmp/a/b/c/d/data.parquet'>
>>> len(f)
10000
>>> f.metadata   
<pyarrow._parquet.FileMetaData object at 0x7...>
  created_by: parquet-cpp-arrow version 1...
  num_columns: 2
  num_rows: 10000
  num_row_groups: 1
  format_version: 2.6
  serialized_size: 609
>>> f.metadata.schema  
<pyarrow._parquet.ParquetSchema object at 0x7...>
required group field_id=-1 schema {
  optional int64 field_id=-1 key;
  optional binary field_id=-1 value (String);
}

>>>

Similarly, write_pylist_to_parquet() writes data rows to a Parquet file:

>>> from biglist import write_pylist_to_parquet
>>> data = [{'name': str(uuid4()), 'age': random.randint(1, 100), 'income': {'employer': str(uuid4()), 'amount': random.randint(10000, 100000)}} for _ in range(100)]
>>> f = LocalUpath('/tmp/test/data.parquet')
>>> f.rmrf()  
0
>>> write_pylist_to_parquet(data, f)
>>> ff = read_parquet_file(f)
>>> ff[0]  
{'name': '066ced72-fd33-492a-9180-39eeca541b1a', 'age': 75, 'income': {'amount': 17840, 'employer': 'bfc176a0-5257-4913-bd1e-3c4d51885e0c'}}
>>> ff[11]  
{'name': 'a239af28-41ff-4215-b560-9c45db15478e', 'age': 12, 'income': {'amount': 17488, 'employer': 'e97f70c9-1659-4fa6-9123-eb39779d00d6'}}
>>>

API reference#

biglist._util.Element = ~Element#

This type variable is used to annotate the type of a data element.

class biglist.Seq[source]#

Bases: Protocol[Element]

The protocol Seq is simpler and broader than the standard Sequence. The former requires/provides only __len__, __getitem__, and __iter__, whereas the latter adds __contains__, __reversed__, index and count to these three. Although the extra methods can be implemented using the three basic methods, they could be massively inefficient in particular cases, and that is the case in the applications targeted by biglist. For this reason, the classes defined in this package implement the protocol Seq rather than Sequence, to prevent the illusion that methods __contains__, etc., are usable.

A class that implements this protocol is sized, iterable, and subscriptable by an int index. This is a subset of the methods provided by Sequence. In particular, Sequence implements this protocol, hence is considered a subclass of Seq for type checking purposes:

>>> from biglist import Seq
>>> from collections.abc import Sequence
>>> issubclass(Sequence, Seq)
True

The built-in dict and tuple also implement the Seq protocol.

The type parameter Element indicates the type of each data element.

__len__() int[source]#
__getitem__(index: int) Element[source]#
__iter__() Iterator[Element][source]#
class biglist.FileReader[source]#

Bases: Seq[Element]

A FileReader is a “lazy” loader of a data file. It keeps track of the path of a data file along with a loader function, but performs the loading only when needed. In particular, upon initiation of a FileReader object, file loading has not happened, and the object is lightweight and friendly to pickling.

Once data have been loaded, this class provides various ways to navigate the data. At a minimum, the Seq API is implemented.

With loaded data and associated facilities, this object may no longer be pickle-able, depending on the specifics of a subclass.

One use case of this class is to pass around FileReader objects (that are initiated but not loaded) in multiprocessing code for concurrent data processing.

This class is generic with a parameter indicating the type of the elements in the data sequence contained in the file. For example you can write:

def func(file_reader: FileReader[int]):
    ...
abstract load() None[source]#

This method eagerly loads all the data from the file into memory.

Once this method has been called, subsequent data consumption should all draw upon this in-memory copy. However, if the data file is large, and especially if only part of the data is of interest, calling this method may not be the best approach. This all depends on the specifics of the subclass.

A subclass may allow consuming the data by loading parts of it in an “as-needed” or “streaming” fashion. In that approach, __getitem__() and __iter__() do not require this method to be called (although they may take advantage of the in-memory data if this method has been called).

class biglist.Slicer[source]#

Bases: Seq[Element]

This class wraps a Seq and enables element access by slice or index array, in addition to single integer.

A Slicer object makes “zero-copy”—it holds a reference to the underlying Seq and keeps track of the indices of the selected elements. A Slicer object may be sliced again in a repeated “zoom in” fashion. Actual data elements are retrieved from the underlying Seq only when a single element is accessed or iteration is performed. In other words, until an actual data element needs to be returned, it’s all operations on the indices.

__init__(list_: Seq[Element], range_: None | range | Seq[int] = None)[source]#

This provides a “slice” of, or “window” into, list_.

The selection of elements is represented by the optional range_, which is either a range such as range(3, 8), or a list of indices such as [1, 3, 5, 6]. If range_ is None, the “window” covers the entire list_. A common practice is to create a Slicer object without range_, and then access a slice of it, for example, Slicer(obj)[3:8] rather than Slicer(obj, range(3, 8)).

During the use of this object, the underlying list_ must remain unchanged. Otherwise perplexing and surprising things may happen.

__len__() int[source]#

Number of elements in the current window or “slice”.

__getitem__(idx: int | slice | Seq[int])[source]#

Element access by a single index, slice, or an index array. Negative index and standard slice syntax work as expected.

Single-index access returns the requested data element. Slice and index-array accesses return a new Slicer object, which, naturally, can be sliced again, like

>>> x = list(range(30))
>>> Slicer(x)[[1, 3, 5, 6, 7, 8, 9, 13, 14]][::2][-2]
9
__iter__() Iterator[Element][source]#

Iterate over the elements in the current window or “slice”.

property raw: Seq[Element]#

Return the underlying data Seq, that is, the list_ that was passed into __init__().

property range: None | range | Seq[int]#

Return the parameter range_ that was provided to __init__(), representing the selection of items in the underlying Seq.

collect() list[Element][source]#

Return a list containing the elements in the current window. This is equivalent to list(self).

This is often used to materialize a small slice as a list, because a slice is still a Slicer object, which does not directly reveal the data items. For example,

>>> x = list(range(30))
>>> Slicer(x)[3:11]
<Slicer into 8/30 of [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]>
>>> Slicer(x)[3:11].collect()
[3, 4, 5, 6, 7, 8, 9, 10]

(A list is used for illustration. In reality, list supports slicing directly, hence would not need Slicer.)

Warning

Do not call this on “big” data!

class biglist.Chain[source]#

Bases: Seq[Element]

This class tracks a series of Seq objects to provide random element access and iteration on the series as a whole, with zero-copy.

This class is in contrast with the standard itertools.chain, which takes iterables.

__init__(list_: Seq[Element], *lists: Seq[Element])[source]#
__len__() int[source]#
__getitem__(idx: int) Element[source]#
__iter__() Iterator[Element][source]#
property raw: tuple[Seq[Element], ...]#

Return the underlying list of Seqs.

A member Seq could be a Slicer. The current method does not follow a Slicer to its “raw” component, because that could represent a different set of elements than the Slicer object does.

class biglist._biglist.BiglistBase[source]#

Bases: Seq[Element]

This base class contains code mainly concerning reading. The subclass Biglist adds functionalities for writing. Another subclass ParquetBiglist is read-only. Here, “reading” and “read-only” refer to the data files. This class always needs to write meta info about the data files. In addition, the subclass Biglist also creates and manages the data files, whereas ParquetBiglist provides methods to read existing data files, treating them as read-only.

This class is generic with a parameter indicating the type of the data items, but this is useful only for the subclass Biglist. For the subclass ParquetBiglist, this parameter is essentially Any because the data items (or rows) in Parquet files are composite and flexible.

classmethod get_temp_path() Upath[source]#

If the user does not specify path when calling new() (in a subclass), this method is used to determine a temporary directory.

This implementation returns a temporary location in the local file system.

Subclasses may want to customize this if they prefer other ways to find temporary locations. For example, they may want to use a temporary location in a cloud storage.

classmethod new(path: str | Path | Upath | None = None, *, keep_files: bool | None = None, init_info: dict | None = None, **kwargs) BiglistBase[source]#

Create a new object of this class (of a subclass, to be precise) and then add data to it.

Parameters:
path

A directory in which this BiglistBase will save whatever it needs to save.

The directory must be non-existent. It is not necessary to pre-create the parent directory of this path.

This path can be either on local disk or in a cloud storage.

If not specified, BiglistBase.get_temp_path() is called to determine a temporary path.

The subclass Biglist saves both data and meta-info in this path. The subclass ParquetBiglist saves meta-info only.

keep_files

If not specified, the default behavior is the following:

  • If path is None, then this is False—the temporary directory will be deleted when this BiglistBase object goes away.

  • If path is not None, i.e. user has deliberately specified a location, then this is True—files saved by this BiglistBase object will stay.

User can pass in True or False explicitly to override the default behavior.

init_info

Initial info that should be written into the info file before __init__ is called. This is in addition to whatever this method internally decides to write.

The info file info.json is written before __init__() is called. In __init__(), this file is read into self.info.

This parameter can be used to write some high-level info that __init__ needs.

If the info is not needed in __init__, then user can always add it to self.info after the object has been instantiated, hence saving general info in info.json is not the intended use of this parameter.

User rarely needs to use this parameter. It is mainly used by the internals of the method new of subclasses.

**kwargs

additional arguments are passed on to __init__().

Notes

A BiglistBase object construction is in either of the two modes below:

  1. create a new BiglistBase to store new data.

  2. create a BiglistBase object pointing to storage of existing data, which was created by a previous call to new().

In case 1, one has called new(). In case 2, one has called BiglistBase(...) (i.e. __init__()).

Some settings are applicable only in mode 1, because in mode 2 they can’t be changed and, if needed, one should only use the values already set in mode 1. Such settings can be parameters to new() but should not be parameters to __init__(). Examples include storage_format and batch_size for the subclass Biglist. These settings typically should be taken care of in new(), before and/or after the object has been created by a call to __init__() within new().

__init__() should be defined in such a way that it works both for a barebone object that is created in this new(), and for a fleshed-out object that already has data in persistence.

Some settings may be applicable to an existing BiglistBase object, e.g., they control styles of display and not intrinsic attributes persisted along with the BiglistBase. Such settings should be parameters to __init__() but not to new(). If provided in a call to new(), these parameters will be passed on to __init__().

Subclass authors should keep these considerations in mind.

__init__(path: str | Path | Upath)[source]#
Parameters:
path

Directory that contains files written by an instance of this class.

path: Upath#

Root directory of the storage space for this object.

info: dict#

Various meta info.

__len__() int[source]#

Number of data items in this biglist.

This is an alias to num_data_items().

destroy(*, concurrent=True) None[source]#
__getitem__(idx: int) Element[source]#

Access a data item by its index; negative index works as expected.

__iter__() Iterator[Element][source]#

Iterate over all the elements.

When there are multiple data files, as the data in one file is being yielded, the next file(s) may be pre-loaded in background threads. For this reason, although the following is equivalent in the final result:

for file in self.files:
    for item in file:
        ... use item ...

it could be less efficient than iterating over self directly, as in

for item in self:
    ... use item ...
property num_data_files: int#
property num_data_items: int#
abstract property files: FileSeq[FileReader[Element]]#
class biglist.Biglist[source]#

Bases: BiglistBase[Element]

registered_storage_formats = {'json': <class 'upathlib.serializer.JsonSerializer'>, 'parquet': <class 'biglist._biglist.ParquetSerializer'>, 'pickle': <class 'upathlib.serializer.PickleSerializer'>, 'pickle-zstd': <class 'upathlib.serializer.ZstdPickleSerializer'>}#
DEFAULT_STORAGE_FORMAT = 'pickle-zstd'#
classmethod register_storage_format(name: str, serializer: type[Serializer]) None[source]#

Register a new serializer to handle data file dumping and loading.

This class has a few serializers registered out of the box. They should be adequate for most applications.

Parameters:
name

Name of the format to be associated with the new serializer.

After registering the new serializer with name “xyz”, one can use storage_format='xyz' in calls to new(). When reading the object back from persistence, make sure this registry is also in place so that the correct deserializer can be found.

serializer

A subclass of upathlib.serializer.Serializer.

Although this class needs to provide the Serializer API, it is possible to write data files in text mode. The registered ‘json’ format does that.
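
As a hedged sketch of registering a custom format (this assumes, without verifying here, that the Serializer API consists of classmethods serialize() and deserialize() mapping between a Python list and bytes; check the upathlib documentation before relying on it):

from upathlib.serializer import Serializer
import json

class JsonLinesSerializer(Serializer):
    # Hypothetical custom format: one JSON document per line.
    @classmethod
    def serialize(cls, x, **kwargs):
        return '\n'.join(json.dumps(row) for row in x).encode()

    @classmethod
    def deserialize(cls, y, **kwargs):
        return [json.loads(line) for line in y.decode().split('\n')]

Biglist.register_storage_format('json-lines', JsonLinesSerializer)
# then: Biglist.new(..., storage_format='json-lines')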

classmethod new(path: str | Path | Upath | None = None, *, batch_size: int | None = None, storage_format: str | None = None, serialize_kwargs: dict | None = None, deserialize_kwargs: dict | None = None, init_info: dict | None = None, **kwargs) Self[source]#
Parameters:
path

Passed on to BiglistBase.new().

batch_size

Max number of data elements in each persisted data file.

There’s no good default value for this parameter, although one is provided (currently the default is 1000), because the code of new() doesn’t know the typical size of the data elements. The user is recommended to specify a value for this parameter.

In choosing a value for batch_size, the most important consideration is the size of each data file, which is determined by the typical size of the data elements as well as batch_size, which is the upper bound of the number of elements in each file.

There are several considerations about the data file sizes:

  • It should not be so small that the file reading/writing is a large overhead relative to actual processing of the data. This is especially important when path is cloud storage.

  • It should not be so large that it is “unwieldy”, e.g. approaching 1GB.

  • When iterating (__iter__()) over a Biglist object, there can be up to (by default) 4 files’ worth of data in memory at any time, where 4 is self._n_read_threads plus 1.

  • When calling append() or extend() on a Biglist object at high speed, there can be up to (by default) 9 times batch_size data elements in memory at any time, where 9 is self._n_write_threads plus 1. See _flush() and Dumper.

Another consideration is the access pattern of elements in the Biglist. If there is a lot of “jumping around” with random element access, large data files will lead to very wasteful file loading, because to read any element, its hosting file must be read into memory. (After all, if the application is heavy on random access, then Biglist is not the right tool.)

The performance of iteration is not expected to be highly sensitive to the value of batch_size, as long as it is in a reasonable range.

A rule of thumb: it is recommended to keep the persisted files between 32 and 128 MB in size. (Note: no benchmark was performed to back this recommendation.)
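
As a back-of-the-envelope illustration (the numbers below are assumptions, not measurements):

approx_element_bytes = 1_000               # assume each element serializes to about 1 KB
target_file_bytes = 64 * 1024 * 1024       # aim for roughly 64 MB per data file
batch_size = target_file_bytes // approx_element_bytes   # about 67,000 elements per file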

storage_format

This should be a key in registered_storage_formats. If not specified, DEFAULT_STORAGE_FORMAT is used.

serialize_kwargs

Additional keyword arguments to the serialization function.

deserialize_kwargs

Additional keyword arguments to the deserialization function.

serialize_kwargs and deserialize_kwargs are rarely needed. One use case is schema when storage format is “parquet”. See ParquetSerializer.

serialize_kwargs and deserialize_kwargs, if not None, will be saved in the “info.json” file, hence they must be JSON serializable, meaning they need to be the few simple native Python types that are supported by the standard json library. (However, the few formats “natively” supported by Biglist may get special treatment to relax this requirement.) If this is not possible, the solution is to define a custom serialization class and register it with register_storage_format().

**kwargs

additional arguments are passed on to BiglistBase.new().

Returns:
Biglist

A new Biglist object.

__init__(*args, **kwargs)[source]#

Please see the base class for additional documentation.

keep_files: bool#

Indicates whether the persisted files should be kept or deleted when the object is garbage-collected.

property batch_size: int#

The max number of data items in one data file.

property data_path: Upath#
property storage_format: str#

The value of storage_format used in new(), either user-specified or the default value.

property storage_version: int#

The internal format used in persistence. This is a read-only attribute for information only.

__len__() int[source]#

Number of data items in this biglist.

If data is being appended to this biglist, then this method only includes the items that have been “flushed” to storage. Data items in the internal memory buffer are not counted. The buffer is empty upon calling _flush() (internally and automatically) or flush() (explicitly by user).

Changed in version 0.7.4: In previous versions, this count includes items that are not yet flushed.

__getitem__(idx: int) Element[source]#

Access a data item by its index; negative index works as expected.

Items not yet “flushed” are not accessible by this method. They are considered “invisible” to this method. Similarly, negative idx operates in the range of flushed items only.

Changed in version 0.7.4: In previous versions, the accessible items include those that are not yet flushed.

__iter__() Iterator[Element][source]#

Iterate over all the elements.

Items that are not yet “flushed” are invisible to this iteration.

Changed in version 0.7.4: In previous versions, this iteration includes those items that are not yet flushed.

append(x: Element) None[source]#

Append a single element to the Biglist.

In implementation, this appends to an in-memory buffer. Once the buffer size reaches batch_size, the buffer’s content will be persisted as a new data file, and the buffer will re-start empty. In other words, whenever the buffer is non-empty, its content is not yet persisted.

You can append data to a common biglist from multiple processes. In the processes, use independent Biglist objects that point to the same “path”. Each of the objects will maintain its own in-memory buffer and save its own files once the buffer fills up. Remember to flush() at the end of work in each process.

extend(x: Iterable[Element]) None[source]#

This simply calls append() repeatedly.

make_file_name(buffer_len: int, extra: str = '') str[source]#

This method constructs the file name of a data file. If you need to customize this method for any reason, you should do it via extra and keep the other patterns unchanged. The string extra will appear between other fixed patterns in the file name.

One possible use case is this: in distributed writing, you want files written by different workers to be distinguishable by the file names. Do something like this:

def worker(datapath: str, worker_id: str, ...):
    out = Biglist(datapath)
    _make_file_name = out.make_file_name
    out.make_file_name = lambda buffer_len: _make_file_name(buffer_len, worker_id)
    ...
flush(*, lock_timeout=300, raise_on_write_error: bool = True) None[source]#

_flush() is called automatically whenever the “append buffer” is full, so as to persist the data and empty the buffer. (The capacity of this buffer is equal to self.batch_size.) However, if this buffer is only partially filled when the user is done adding elements to the biglist, the data in the buffer will not be persisted. This is the first reason that the user should call flush() when they are done adding data (via append() or extend()).

Although _flush() creates new data files, it does not update the “meta info file” (info.json in the root of self.path) to include the new data files; it only updates the in-memory self.info. This is for efficiency reasons, because updating info.json involves locking.

Updating info.json to include new data files (created due to append() and extend()) is performed by flush(). This is the second reason that the user should call flush() at the end of their data-writing session, regardless of whether all the new data have been persisted in data files. (They would have been if their count happened to be a multiple of self.batch_size.)

If there are multiple workers adding data to this biglist at the same time (from multiple processes or machines), data added by one worker will not be visible to another worker until the writing worker calls flush() and the reading worker calls reload().

Further, the user should assume that data not yet persisted (i.e. still in the “append buffer”) are not visible to data reading via __getitem__() or __iter__() and not included in __len__(), even within the same worker. In common use cases, we do not start reading data until we’re done adding data to the biglist (at least “for now”), hence this is not a big issue.

In summary, call flush() when

  • You are done adding data (for this “session”)

  • or you need to start reading data

flush() has overhead. You should call it only in the two situations above. Do not call it frequently “just to be safe”.

After a call to flush(), there’s no problem adding more elements again via append() or extend(). Data files created by flush() with fewer than batch_size elements will stay as they are among larger files. This is a legitimate case in parallel or distributed writing, or writing in multiple sessions.

reload() None[source]#

Reload the meta info.

This is used in this scenario: suppose we have this object pointing to a biglist on the local disk; another object in another process is appending data to the same biglist (that is, it points to the same storage location); after a while, the meta info file on the disk has been modified by the other process, hence the current object is outdated; calling this method will bring it up to date. The same idea applies if the storage is in the cloud and another machine is appending data to the same remote biglist.

Creating a new object pointing to the same storage location would achieve the same effect.

property files#
class biglist.BiglistFileReader[source]#

Bases: FileReader[Element]

__init__(path: str | Path | Upath, loader: Callable[[Upath], Any])[source]#
Parameters:
path

Path of a data file.

loader

A function that will be used to load the data file. This must be pickle-able. Usually this is the bound method load of a subclass of upathlib.serializer.Serializer. If you customize this, please see the doc of FileReader.

load() None[source]#

This method eagerly loads all the data from the file into memory.

Once this method has been called, subsequent data consumption should all draw upon this in-memory copy. However, if the data file is large, and especially if only part of the data is of interest, calling this method may not be the best approach. This all depends on the specifics of the subclass.

A subclass may allow consuming the data by loading parts of it in an “as-needed” or “streaming” fashion. In that approach, __getitem__() and __iter__() do not require this method to be called (although they may take advantage of the in-memory data if this method has been called).

data() list[Element][source]#

Return the data loaded from the file.

__len__() int[source]#
__getitem__(idx: int) Element[source]#
__iter__() Iterator[Element][source]#
class biglist.ParquetBiglist[source]#

Bases: BiglistBase

ParquetBiglist defines a kind of “external biglist”, that is, it points to pre-existing Parquet files and provides facilities to read them.

As long as you use a ParquetBiglist object for reading, it is assumed that the dataset (all of its data files) has not changed since the object was created by new().

classmethod new(data_path: str | Path | Upath | Sequence[str | Path | Upath], path: str | Path | Upath | None = None, *, suffix: str = '.parquet', **kwargs) ParquetBiglist[source]#

This classmethod gathers info of the specified data files and saves the info to facilitate reading the data files. The data files remain “external” to the ParquetBiglist object; the “data” persisted and managed by the ParquetBiglist object are the meta info about the Parquet data files.

If the number of data files is small, it’s feasible to create a temporary object of this class (by leaving path at the default value None) “on-the-fly” for one-time use.

Parameters:
path

Passed on to BiglistBase.new().

data_path

Parquet file(s) or folder(s) containing Parquet files.

If this is a single path, then it’s either a Parquet file or a directory. If this is a list, each element is either a Parquet file or a directory; there can be a mix of files and directories. Directories are traversed recursively for Parquet files. The paths can be local, or in the cloud, or a mix of both.

Once the info of all the Parquet files is gathered, their order is fixed as far as this ParquetBiglist is concerned. The data sequence represented by this ParquetBiglist follows this order of the files. The order is determined as follows:

The order of the entries in data_path is preserved; if any entry is a directory, the files therein (recursively) are sorted by the string value of each file’s full path.

suffix

Only files with this suffix will be included. To include all files, use suffix='*'.

**kwargs

Additional arguments are passed on to __init__().
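
For illustration, a minimal sketch (the data paths are hypothetical):

from biglist import ParquetBiglist

# Gather meta info about pre-existing Parquet files; the files themselves are not copied.
mydata = ParquetBiglist.new(['/data/2024-01/', '/data/extra/part-0.parquet'])
print(len(mydata))     # total number of rows across all the referenced files
for row in mydata:     # iterate over rows in the fixed file order described above
    ...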

__init__(*args, **kwargs)[source]#

Please see doc of the base class.

keep_files: bool#

Indicates whether the meta info persisted by this object should be kept or deleted when this object is garbage-collected.

This does not affect the external Parquet data files.

property storage_version: int#
property files#
class biglist.ParquetFileReader[source]#

Bases: FileReader

classmethod get_gcsfs(*, good_for_seconds=600) GcsFileSystem[source]#

Obtain a pyarrow.fs.GcsFileSystem object with credentials provided explicitly, so that GCP’s default credential-inference process (which involves env vars, file reading, etc.) will not be triggered.

This is provided under the (unverified) assumption that the default credential-inference process has high overhead.

classmethod load_file(path: Upath) ParquetFile[source]#

This reads meta info and constructs a pyarrow.parquet.ParquetFile object. This does not load the entire file. See load() for eager loading.

Parameters:
path

Path of the file.

__init__(path: str | Path | Upath)[source]#
Parameters:
path

Path of a Parquet file.

property scalar_as_py: bool#

scalar_as_py controls whether the values returned by __getitem__() (or indirectly by __iter__()) are converted from a pyarrow.Scalar type such as pyarrow.lib.StringScalar to a Python builtin type such as str.

This property can be toggled at any time; the new value stays in effect until it is toggled again.

Getter:

Returns this property’s value.

Setter:

Sets this property’s value.
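
For instance, with a hypothetical reader over a file that has a single string column:

reader.scalar_as_py = True
print(reader[0])       # a Python str
reader.scalar_as_py = False
print(reader[0])       # a pyarrow scalar, e.g. pyarrow.lib.StringScalar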

__len__() int[source]#
load() None[source]#

Eagerly read the whole file into memory as a table.

property file: ParquetFile#

Return a pyarrow.parquet.ParquetFile object.

Upon instantiation of a ParquetFileReader object, the file is not read at all. When this property is requested, the file is accessed to construct a pyarrow.parquet.ParquetFile object.

property metadata: FileMetaData#
property num_rows: int#
property num_row_groups: int#
property num_columns: int#
property column_names: list[str]#
data() ParquetBatchData[source]#

Return the entire data in the file.

__getitem__(idx: int)[source]#

Get one row (or “record”).

If the object has a single column, then return its value in the specified row. If the object has multiple columns, return a dict with column names as keys. The values are converted to Python builtin types if scalar_as_py is True.

Parameters:
idx

Row index in this file. Negative value counts from the end as expected.

__iter__()[source]#

Iterate over rows. The type of yielded individual elements is the same as the return of __getitem__().

iter_batches(batch_size=10000) Iterator[ParquetBatchData][source]#
row_group(idx: int) ParquetBatchData[source]#
Parameters:
idx

Index of the row group of interest.

columns(cols: Sequence[str]) ParquetFileReader[source]#

Return a new ParquetFileReader object that will only load the specified columns.

The columns of interest have to be within currently available columns. In other words, a series of calls to this method would incrementally narrow down the selection of columns. (Note this returns a new ParquetFileReader, hence one can call columns() again on the returned object.)

This method “slices” the data by columns, in contrast to other data access methods that select rows.

Parameters:
cols

Names of the columns to select.

Examples

>>> obj = ParquetFileReader('file_path')  
>>> obj1 = obj.columns(['a', 'b', 'c'])  
>>> print(obj1[2])  
>>> obj2 = obj1.columns(['b', 'c'])  
>>> print(obj2[3])  
>>> obj3 = obj.columns(['d'])  
>>> obj3 = obj.columns(['d'])  
>>> for v in obj3:  
...     print(v)  
column(idx_or_name: int | str) ChunkedArray[source]#

Select a single column.

Note: while columns() returns a new ParquetFileReader, column() returns a pyarrow.ChunkedArray.
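
A brief sketch of the difference, with a hypothetical reader:

sub = reader.columns(['a', 'b'])   # a new ParquetFileReader; rows are dicts with keys 'a' and 'b'
arr = reader.column('a')           # a pyarrow.ChunkedArray holding the values of column 'a'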

class biglist.ParquetBatchData[source]#

Bases: Seq

ParquetBatchData wraps a pyarrow.Table or pyarrow.RecordBatch. The data is already in memory; this class does not involve file reading.

ParquetFileReader.data() and ParquetFileReader.iter_batches() both return or yield ParquetBatchData. In addition, the method columns() of this class returns a new object of this class.

Objects of this class can be pickled.
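
For example, batch-wise consumption might look like the following sketch, with a hypothetical ParquetFileReader named reader:

for batch in reader.iter_batches(batch_size=1000):   # each batch is a ParquetBatchData
    print(len(batch))       # number of rows in this batch
    for row in batch:       # rows have the same format as ParquetFileReader.__getitem__()
        ...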

__init__(data: Table | RecordBatch)[source]#
scalar_as_py#

Indicate whether scalar values should be converted to Python types from pyarrow types.

data() Table | RecordBatch[source]#

Return the underlying pyarrow data.

__len__() int[source]#
__getitem__(idx: int)[source]#

Get one row (or “record”).

If the object has a single column, then return its value in the specified row. If the object has multiple columns, return a dict with column names as keys. The values are converted to Python builtin types if scalar_as_py is True.

Parameters:
idx

Row index in this batch. Negative value counts from the end as expected.

__iter__()[source]#

Iterate over rows. The type of the yielded individual elements is the same as the return of __getitem__().

columns(cols: Sequence[str]) ParquetBatchData[source]#

Return a new ParquetBatchData object that only contains the specified columns.

The columns of interest have to be within currently available columns. In other words, a series of calls to this method would incrementally narrow down the selection of columns. (Note this returns a new ParquetBatchData, hence one can call columns() again on the returned object.)

This method “slices” the data by columns, in contrast to other data access methods that select rows.

Parameters:
cols

Names of the columns to select.

Examples

>>> obj = ParquetBatchData(parquet_table)  
>>> obj1 = obj.columns(['a', 'b', 'c'])  
>>> print(obj1[2])  
>>> obj2 = obj1.columns(['b', 'c'])  
>>> print(obj2[3])  
>>> obj3 = obj.columns(['d'])  
>>> obj3 = obj.columns(['d'])  
>>> for v in obj3:  
...     print(v)  
column(idx_or_name: int | str) Array | ChunkedArray[source]#

Select a single column specified by name or index.

If self._data is a pyarrow.Table, return a pyarrow.ChunkedArray. If self._data is a pyarrow.RecordBatch, return a pyarrow.Array.

biglist.make_parquet_schema(fields_spec: Iterable[Sequence])[source]#

This function constructs a pyarrow schema from a spec that is expressed by simple Python types that can be json-serialized.

fields_spec is a list or tuple, each element of which is accepted by make_parquet_field().

This function is motivated by the needs of ParquetSerializer. When biglist.Biglist uses a “storage-format” that takes options (such as ‘parquet’), these options can be passed into biglist.Biglist.new() (via serialize_kwargs and deserialize_kwargs) and saved in “info.json”. However, this requires the options to be json-serializable; therefore, the argument schema to ParquetSerializer.serialize() cannot be used by this mechanism. As an alternative, users can use the argument schema_spec; this argument can be saved in “info.json”, and it is handled by this function.
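
As a sketch, a spec for an int64 column 'id' and a nullable string column 'name' (the field names are made up) could be turned into a pyarrow schema like this:

from biglist import make_parquet_schema

schema = make_parquet_schema([
    ('id', 'int64', False),     # field name, type spec, nullable
    ('name', 'string', True),
])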

biglist.make_parquet_field(field_spec: Sequence)[source]#

field_spec is a list or tuple with 2, 3, or 4 elements. The first element is the name of the field. The second element is the spec of the type, to be passed to the function make_parquet_type(). Additional elements are the optional nullable and metadata arguments to the function pyarrow.field().

biglist.make_parquet_type(type_spec: str | Sequence)[source]#

type_spec is a spec of arguments to one of pyarrow’s data type factory functions.

For simple types, this may be just the type name (or function name), e.g. 'bool_', 'string', 'float64'.

For type functions expecting arguments, this is a list or tuple with the type name followed by other arguments, for example,

('time32', 's')
('decimal128', 5, -3)

For compound types (types constructed by other types), this is a “recursive” structure, such as

('list_', 'int64')
('list_', ('time32', 's'), 5)

where the second element is the spec for the member type, or

('map_', 'string', ('list_', 'int64'), True)

where the second and third elements are specs for the key type and value type, respectively, and the fourth element is the optional argument keys_sorted to pyarrow.map_(). Below is an example of a struct type:

('struct', [('name', 'string', False),
            ('age', 'uint8', True),
            ('income', ('struct', (('currency', 'string'), ('amount', 'uint64'))), False)])

Here, the second element is the list of fields in the struct. Each field is expressed by a spec that is taken by make_parquet_field().
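
For example, mirroring the specs above, the following calls would construct pyarrow types:

from biglist import make_parquet_type

t1 = make_parquet_type('string')                                 # pyarrow.string()
t2 = make_parquet_type(('decimal128', 5, -3))                    # pyarrow.decimal128(5, -3)
t3 = make_parquet_type(('map_', 'string', ('list_', 'int64')))   # map of string to list of int64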

biglist.read_parquet_file(path: str | Path | Upath) ParquetFileReader[source]#
Parameters:
path

Path of the file.

biglist.write_arrays_to_parquet(data: Sequence[Array | ChunkedArray | Iterable], path: str | Path | Upath, *, names: Sequence[str], **kwargs) None[source]#
Parameters:
path

Path of the file to create and write to.

data

A list of data arrays.

names

List of names for the arrays in data.

**kwargs

Passed on to pyarrow.parquet.write_table().
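
A minimal sketch (the output path is hypothetical):

import pyarrow as pa
from biglist import write_arrays_to_parquet

write_arrays_to_parquet(
    [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])],   # one array per column
    '/tmp/demo.parquet',
    names=['id', 'tag'],
)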

biglist.write_pylist_to_parquet(data: Sequence, path: str | Path | Upath, *, schema=None, schema_spec=None, metadata=None, **kwargs)[source]#
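
Presumably the row-oriented counterpart of write_arrays_to_parquet(): data is a sequence of dicts, one per row (compare ParquetSerializer.serialize() below). A minimal sketch with a hypothetical path:

from biglist import write_pylist_to_parquet

write_pylist_to_parquet(
    [{'id': 1, 'tag': 'a'}, {'id': 2, 'tag': 'b'}],      # one dict per row
    '/tmp/demo_rows.parquet',
    schema_spec=[('id', 'int64'), ('tag', 'string')],
)
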
class biglist._biglist.ParquetSerializer[source]#

Bases: Serializer

classmethod serialize(x: list[dict], schema: Schema | None = None, schema_spec: Sequence | None = None, metadata=None, **kwargs)[source]#

x is a list of data items. Each item is a dict. In the output Parquet file, each item is a “row”.

The content of the item dict should follow a regular pattern; not every structure is supported. The data x must be acceptable to pyarrow.Table.from_pylist. If unsure, use a list with a couple of data elements and experiment with pyarrow.Table.from_pylist directly.

When using storage_format='parquet' for Biglist, each data element is a dict with a consistent structure that is acceptable to pyarrow.Table.from_pylist. When reading the Biglist, the original Python data elements are returned. (A record read out may not be exactly equal to the original that was written, in that elements that were missing in a record when written may have been filled in with None when read back out.) In other words, the reading is not like that of ParquetBiglist. You can always create a separate ParquetBiglist for the data files of the Biglist in order to use Parquet-style data reading. The data files are valid Parquet files.

If neither schema nor schema_spec is specified, then the data schema is auto-inferred based on the first element of x. If this does not work, you can specify either schema or schema_spec. The advantage of schema_spec is that it consists of json-serializable Python types, hence it can be passed into Biglist.new() via serialize_kwargs and saved in the “info.json” of the biglist.

If schema_spec is not flexible or powerful enough for your use case, then you may have to use schema.
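
For example, a Biglist that stores its data files in Parquet format with a json-serializable schema spec might be set up as in the following sketch (the path and the fields are made up):

from biglist import Biglist

mylist = Biglist.new(
    '/tmp/parquet-backed-biglist',
    storage_format='parquet',
    serialize_kwargs={'schema_spec': [('id', 'int64'), ('name', 'string', True)]},
)
mylist.append({'id': 1, 'name': 'alice'})
mylist.flush()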

classmethod deserialize(y: bytes, **kwargs)[source]#
