diff --git a/src/bw_processing/array_creation.py b/src/bw_processing/array_creation.py index cddcc2a..e9ccd8a 100644 --- a/src/bw_processing/array_creation.py +++ b/src/bw_processing/array_creation.py @@ -21,46 +21,45 @@ def chunked(iterable, chunk_size): return iter(lambda: list(itertools.islice(iterable, chunk_size)), []) -def create_chunked_structured_array(iterable, dtype, bucket_size=20000): - """Create a numpy structured array from an iterable of indeterminate length. +def create_chunked(iterable, dtype, ncols=None, bucket_size=500): + """Create a numpy array from an iterable of indeterminate length. + + Needed when we can't determine the length of the iterable ahead of time + (e.g. for a generator or a database cursor), so can't create the complete + array in memory in one step. - Needed when we can't determine the length of the iterable ahead of time (e.g. for a generator or a database cursor), so can't create the complete array in memory in on step + Creates a list of arrays with ``bucket_size`` rows until ``iterable`` is + exhausted, then concatenates them along axis 0. - Creates a list of arrays with ``bucket_size`` rows until ``iterable`` is exhausted, then concatenates them. + Pass ``ncols`` for a plain 2D array; omit it for a 1D structured array. Args: iterable: Iterable of data used to populate the array. - dtype: Numpy dtype of the created array - format_function: If provided, this function will be called on each row of ``iterable`` before insertion in the array. + dtype: Numpy dtype of the created array. + ncols: Number of columns; if None, a 1D structured array is created. bucket_size: Number of rows in each intermediate array. - Returns:. - Returns the created array. Will return a zero-length array if ``iterable`` has no data. - + Returns: + The created array. Returns a zero-length array if ``iterable`` has no data. """ + bucket_shape = (bucket_size,) if ncols is None else (bucket_size, ncols) + empty_shape = (0,) if ncols is None else (0, ncols) arrays = [] - array = np.zeros(bucket_size, dtype=dtype) - + array = np.zeros(bucket_shape, dtype=dtype) for chunk in chunked(iterable, bucket_size): for i, row in enumerate(chunk): array[i] = row if i < bucket_size - 1: - array = array[: i + 1] - arrays.append(array) + # .copy() releases the oversized bucket buffer immediately rather + # than keeping it alive as a view until the final concatenation. + arrays.append(array[: i + 1].copy()) else: arrays.append(array) - array = np.zeros(bucket_size, dtype=dtype) - - # Empty iterable - create zero-length array + array = np.zeros(bucket_shape, dtype=dtype) + # Empty iterable - create zero-length array. # Needed because we return iterators for SQL databases - # but don't know if e.g. sometime a database has - # no biosphere exchanges - if arrays: - array = np.hstack(arrays) - else: - array = np.zeros(0, dtype=dtype) - - return array + # but don't know if e.g. sometimes a database has no biosphere exchanges. + return np.concatenate(arrays, axis=0) if arrays else np.zeros(empty_shape, dtype=dtype) def create_structured_array(iterable, dtype, nrows=None, sort=False, sort_fields=None): @@ -80,7 +79,7 @@ def create_structured_array(iterable, dtype, nrows=None, sort=False, sort_fields array[i] = tuple(row) else: - array = create_chunked_structured_array(iterable, dtype) + array = create_chunked(iterable, dtype) if sort: sort_fields = sort_fields or () @@ -93,44 +92,6 @@ def create_structured_array(iterable, dtype, nrows=None, sort=False, sort_fields return array -def create_chunked_array(iterable, ncols, dtype=np.float32, bucket_size=500): - """Create a numpy array from an iterable of indeterminate length. - - Needed when we can't determine the length of the iterable ahead of time (e.g. for a generator or a database cursor), so can't create the complete array in memory in on step - - Creates a list of arrays with ``bucket_size`` rows until ``iterable`` is exhausted, then concatenates them. - - Args: - iterable: Iterable of data used to populate the array. - ncols: Number of columns in the created array. - dtype: Numpy dtype of the created array - bucket_size: Number of rows in each intermediate array. - - Returns:. - Returns the created array. Will return a zero-length array if ``iterable`` has no data. - - """ - arrays = [] - array = np.zeros((bucket_size, ncols), dtype=dtype) - - for chunk in chunked(iterable, bucket_size): - for i, row in enumerate(chunk): - array[i, :] = row - if i < bucket_size - 1: - array = array[: i + 1, :] - arrays.append(array) - else: - arrays.append(array) - array = np.zeros((bucket_size, ncols), dtype=dtype) - - if arrays: - array = np.hstack(arrays) - else: - array = np.zeros((0, ncols), dtype=dtype) - - return array - - def create_array(iterable, nrows=None, dtype=np.float32): """Create a numpy array data ``iterable``. Returns a filepath of a created file (if ``filepath`` is provided, or the array. @@ -156,6 +117,6 @@ def create_array(iterable, nrows=None, dtype=np.float32): ncols, data = get_ncols(iterable) except StopIteration: return np.zeros((0, 0), dtype=dtype) - array = create_chunked_array(data, ncols, dtype) + array = create_chunked(data, dtype, ncols=ncols) return array diff --git a/tests/test_array_creation.py b/tests/test_array_creation.py index d6b5424..44ab1d5 100644 --- a/tests/test_array_creation.py +++ b/tests/test_array_creation.py @@ -1,6 +1,10 @@ import numpy as np -from bw_processing.array_creation import chunked, create_array +from bw_processing.array_creation import ( + chunked, + create_array, + create_chunked, +) def test_chunked(): @@ -34,3 +38,60 @@ def test_create_array_nonempty_generator(): assert result.shape == (5, 2) assert np.allclose(result[:, 0], [0, 1, 2, 3, 4]) assert np.allclose(result[:, 1], [0, 2, 4, 6, 8]) + + +# --- create_chunked (plain 2D) --- + +def test_create_chunked_under_one_bucket(): + data = [np.ones(3) * i for i in range(10)] + result = create_chunked(iter(data), np.float32, ncols=3, bucket_size=500) + assert result.shape == (10, 3) + assert np.allclose(result[7], [7, 7, 7]) + + +def test_create_chunked_multiple_full_buckets(): + # Previously hstack joined on axis=1 → (bucket_size, ncols*n) instead of (total, ncols) + data = [np.ones(3) * i for i in range(1000)] + result = create_chunked(iter(data), np.float32, ncols=3, bucket_size=500) + assert result.shape == (1000, 3) + assert np.allclose(result[0], [0, 0, 0]) + assert np.allclose(result[999], [999, 999, 999]) + + +def test_create_chunked_full_plus_partial_bucket(): + # Previously hstack raised ValueError when a full and partial chunk were concatenated + data = [np.ones(3) * i for i in range(600)] + result = create_chunked(iter(data), np.float32, ncols=3, bucket_size=500) + assert result.shape == (600, 3) + assert np.allclose(result[499], [499, 499, 499]) + assert np.allclose(result[599], [599, 599, 599]) + + +def test_create_chunked_empty_plain(): + result = create_chunked(iter([]), np.float32, ncols=4, bucket_size=500) + assert result.shape == (0, 4) + + +# --- create_chunked (structured 1D) --- + +SIMPLE_DTYPE = [("row", np.int32), ("col", np.int32), ("amount", np.float32)] + + +def test_create_chunked_structured_under_one_bucket(): + data = [(i, i + 1, float(i)) for i in range(10)] + result = create_chunked(iter(data), SIMPLE_DTYPE, bucket_size=20000) + assert result.shape == (10,) + assert list(result["row"]) == list(range(10)) + + +def test_create_chunked_structured_multiple_buckets(): + data = [(i, i + 1, float(i)) for i in range(500)] + result = create_chunked(iter(data), SIMPLE_DTYPE, bucket_size=200) + assert result.shape == (500,) + assert result["row"][0] == 0 + assert result["row"][499] == 499 + + +def test_create_chunked_structured_empty(): + result = create_chunked(iter([]), SIMPLE_DTYPE) + assert result.shape == (0,)