Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 25 additions & 64 deletions src/bw_processing/array_creation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,45 @@ def chunked(iterable, chunk_size):
return iter(lambda: list(itertools.islice(iterable, chunk_size)), [])


def create_chunked(iterable, dtype, ncols=None, bucket_size=500):
    """Create a numpy array from an iterable of indeterminate length.

    Needed when we can't determine the length of the iterable ahead of time
    (e.g. for a generator or a database cursor), so can't create the complete
    array in memory in one step.

    Builds one array per chunk of at most ``bucket_size`` rows until
    ``iterable`` is exhausted, then concatenates the chunks along axis 0.

    Pass ``ncols`` for a plain 2D array; omit it for a 1D structured array.

    Args:
        iterable: Iterable of data used to populate the array.
        dtype: Numpy dtype of the created array.
        ncols: Number of columns; if None, a 1D array is created (the
            structured-array case).
        bucket_size: Maximum number of rows in each intermediate array.

    Returns:
        The created array. Returns a zero-length array if ``iterable`` has
        no data.
    """
    # Take a single iterator up front: ``islice`` on a re-iterable object
    # such as a list would restart from the first row on every call and
    # the chunking loop would never terminate.
    rows = iter(iterable)
    arrays = []
    while True:
        chunk = list(itertools.islice(rows, bucket_size))
        if not chunk:
            break
        # Size each intermediate array to its chunk so no oversized bucket
        # buffer is kept alive (as a view) until the final concatenation.
        shape = (len(chunk),) if ncols is None else (len(chunk), ncols)
        array = np.zeros(shape, dtype=dtype)
        for i, row in enumerate(chunk):
            array[i] = row
        arrays.append(array)

    if arrays:
        # Rows are stacked along axis 0 for both the 1D and the 2D case.
        return np.concatenate(arrays, axis=0)

    # Empty iterable - create zero-length array.
    # Needed because we return iterators for SQL databases
    # but don't know if e.g. sometimes a database has no biosphere exchanges.
    empty_shape = (0,) if ncols is None else (0, ncols)
    return np.zeros(empty_shape, dtype=dtype)


def create_chunked_structured_array(iterable, dtype, bucket_size=20000):
    """Create a 1D numpy structured array from an iterable of unknown length.

    Thin wrapper around ``create_chunked`` kept so that the older name and
    its larger default ``bucket_size`` remain available.

    Args:
        iterable: Iterable of data used to populate the array.
        dtype: Numpy dtype of the created array.
        bucket_size: Maximum number of rows in each intermediate array.

    Returns:
        The created array. Returns a zero-length array if ``iterable`` has
        no data.
    """
    return create_chunked(iterable, dtype, ncols=None, bucket_size=bucket_size)


def create_structured_array(iterable, dtype, nrows=None, sort=False, sort_fields=None):
Expand All @@ -80,7 +79,7 @@ def create_structured_array(iterable, dtype, nrows=None, sort=False, sort_fields
array[i] = tuple(row)

else:
array = create_chunked_structured_array(iterable, dtype)
array = create_chunked(iterable, dtype)

if sort:
sort_fields = sort_fields or ()
Expand All @@ -93,44 +92,6 @@ def create_structured_array(iterable, dtype, nrows=None, sort=False, sort_fields
return array


def create_chunked_array(iterable, ncols, dtype=np.float32, bucket_size=500):
    """Create a 2D numpy array from an iterable of indeterminate length.

    Needed when we can't determine the length of the iterable ahead of time
    (e.g. for a generator or a database cursor), so can't create the complete
    array in memory in one step.

    Builds one array per chunk of at most ``bucket_size`` rows until
    ``iterable`` is exhausted, then concatenates the chunks along axis 0.

    Args:
        iterable: Iterable of data used to populate the array.
        ncols: Number of columns in the created array.
        dtype: Numpy dtype of the created array.
        bucket_size: Maximum number of rows in each intermediate array.

    Returns:
        The created array, with one row per item of ``iterable`` and
        ``ncols`` columns. Returns a ``(0, ncols)`` array if ``iterable``
        has no data.
    """
    # Take a single iterator up front: ``islice`` on a re-iterable object
    # such as a list would restart from the first row on every call and
    # the chunking loop would never terminate.
    rows = iter(iterable)
    arrays = []
    while True:
        chunk = list(itertools.islice(rows, bucket_size))
        if not chunk:
            break
        # Size each intermediate array to its chunk, so the last (partial)
        # chunk needs no trimming.
        array = np.zeros((len(chunk), ncols), dtype=dtype)
        for i, row in enumerate(chunk):
            array[i, :] = row
        arrays.append(array)

    if arrays:
        # Chunks hold consecutive rows, so they must be joined along axis 0.
        # ``np.hstack`` joins 2D arrays along axis 1, which either produced
        # a (bucket_size, ncols * n) array or raised ValueError when the
        # last chunk was shorter than the others.
        return np.concatenate(arrays, axis=0)

    return np.zeros((0, ncols), dtype=dtype)


def create_array(iterable, nrows=None, dtype=np.float32):
"""Create a numpy array from the data in ``iterable`` and return it.

Expand All @@ -156,6 +117,6 @@ def create_array(iterable, nrows=None, dtype=np.float32):
ncols, data = get_ncols(iterable)
except StopIteration:
return np.zeros((0, 0), dtype=dtype)
array = create_chunked_array(data, ncols, dtype)
array = create_chunked(data, dtype, ncols=ncols)

return array
63 changes: 62 additions & 1 deletion tests/test_array_creation.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import numpy as np

from bw_processing.array_creation import chunked, create_array
from bw_processing.array_creation import (
chunked,
create_array,
create_chunked,
)


def test_chunked():
Expand Down Expand Up @@ -34,3 +38,60 @@ def test_create_array_nonempty_generator():
assert result.shape == (5, 2)
assert np.allclose(result[:, 0], [0, 1, 2, 3, 4])
assert np.allclose(result[:, 1], [0, 2, 4, 6, 8])


# --- create_chunked (plain 2D) ---

def test_create_chunked_under_one_bucket():
    """Ten rows with a 500-row bucket come back as a (10, 3) array."""
    n_rows, n_cols = 10, 3
    rows = (np.full(n_cols, float(k)) for k in range(n_rows))
    built = create_chunked(rows, np.float32, ncols=n_cols, bucket_size=500)
    assert built.shape == (n_rows, n_cols)
    assert np.allclose(built[7], [7, 7, 7])


def test_create_chunked_multiple_full_buckets():
    """Two exactly-full buckets are joined row-wise into (1000, 3)."""
    # Regression: hstack used to join on axis=1, giving
    # (bucket_size, ncols * n) instead of (total, ncols).
    total = 1000
    rows = (np.full(3, float(k)) for k in range(total))
    built = create_chunked(rows, np.float32, ncols=3, bucket_size=500)
    assert built.shape == (total, 3)
    assert np.allclose(built[0], [0, 0, 0])
    assert np.allclose(built[999], [999, 999, 999])


def test_create_chunked_full_plus_partial_bucket():
    """One full bucket followed by a partial one yields (600, 3)."""
    # Regression: hstack used to raise ValueError when a full and a
    # partial chunk were concatenated.
    total = 600
    rows = (np.full(3, float(k)) for k in range(total))
    built = create_chunked(rows, np.float32, ncols=3, bucket_size=500)
    assert built.shape == (total, 3)
    assert np.allclose(built[499], [499, 499, 499])
    assert np.allclose(built[599], [599, 599, 599])


def test_create_chunked_empty_plain():
    """An exhausted iterator with ``ncols=4`` yields a (0, 4) array."""
    nothing = iter(())
    built = create_chunked(nothing, np.float32, ncols=4, bucket_size=500)
    assert built.shape == (0, 4)


# --- create_chunked (structured 1D) ---

# Structured dtype used by the structured-array tests in this module:
# two int32 fields ("row", "col") followed by one float32 field ("amount").
SIMPLE_DTYPE = [("row", np.int32), ("col", np.int32), ("amount", np.float32)]


def test_create_chunked_structured_under_one_bucket():
    """Ten records with a 20000-row bucket come back as a (10,) array."""
    n_rows = 10
    records = ((k, k + 1, float(k)) for k in range(n_rows))
    built = create_chunked(records, SIMPLE_DTYPE, bucket_size=20000)
    assert built.shape == (n_rows,)
    assert built["row"].tolist() == list(range(n_rows))


def test_create_chunked_structured_multiple_buckets():
    """500 records over 200-row buckets are joined into one (500,) array."""
    total = 500
    records = ((k, k + 1, float(k)) for k in range(total))
    built = create_chunked(records, SIMPLE_DTYPE, bucket_size=200)
    assert built.shape == (total,)
    row_field = built["row"]
    assert row_field[0] == 0
    assert row_field[499] == 499


def test_create_chunked_structured_empty():
    """An exhausted iterator without ``ncols`` yields a (0,) array."""
    nothing = iter(())
    built = create_chunked(nothing, SIMPLE_DTYPE)
    assert built.shape == (0,)
Loading