diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4287b6c9..6cd96367 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,6 +12,8 @@ jobs: build_wheels: name: Build and test on ${{ matrix.os }}${{ matrix.numpy-version && format(' (numpy {0})', matrix.numpy-version) || '' }} runs-on: ${{ matrix.os }} + env: + CMAKE_GENERATOR: Ninja strategy: matrix: os: [ubuntu-latest, windows-latest, macos-latest] @@ -30,15 +32,65 @@ jobs: with: python-version: ${{ matrix.python-version }} + - name: Install sccache (Windows) + if: runner.os == 'Windows' + run: choco install sccache --yes + + - name: Cache sccache (Windows) + if: runner.os == 'Windows' + uses: actions/cache@v4 + with: + path: C:\Users\runneradmin\AppData\Local\sccache + key: sccache-${{ runner.os }}-${{ github.sha }} + restore-keys: | + sccache-${{ runner.os }}- + + - name: Cache pip (Windows) + if: runner.os == 'Windows' + uses: actions/cache@v4 + with: + path: C:\Users\runneradmin\AppData\Local\pip\Cache + key: pip-${{ runner.os }}-${{ hashFiles('pyproject.toml') }} + restore-keys: | + pip-${{ runner.os }}- + - name: Install Ninja uses: seanmiddleditch/gha-setup-ninja@master + - name: Add LLVM to PATH (Windows) + if: runner.os == 'Windows' + run: echo "C:\\Program Files\\LLVM\\bin" >> $env:GITHUB_PATH + - name: Install specific numpy version if: matrix.numpy-version run: pip install "numpy==${{ matrix.numpy-version }}.*" - - name: Build + - name: Build (Windows) + if: runner.os == 'Windows' + id: build_windows + run: pip install -e .[test] + env: + CMAKE_C_COMPILER_LAUNCHER: sccache + CMAKE_CXX_COMPILER_LAUNCHER: sccache + SCCACHE_DIR: C:\Users\runneradmin\AppData\Local\sccache + CC: clang-cl + CXX: clang-cl + CMAKE_BUILD_PARALLEL_LEVEL: 8 + SKBUILD_PARALLEL_LEVEL: 8 + + - name: Build (non-Windows) + if: runner.os != 'Windows' + id: build_non_windows run: pip install -e .[test] - - name: Test + - name: Test (Windows) + if: runner.os == 'Windows' + run: python -m pytest -m "not heavy and (network or not network)" +# env: +# BLOSC_NTHREADS: "1" +# NUMEXPR_NUM_THREADS: "1" +# OMP_NUM_THREADS: "1" + + - name: Test (non-Windows) + if: runner.os != 'Windows' run: python -m pytest -m "not heavy and (network or not network)" diff --git a/.github/workflows/cibuildwheels.yml b/.github/workflows/cibuildwheels.yml index f4bb64eb..3444946a 100644 --- a/.github/workflows/cibuildwheels.yml +++ b/.github/workflows/cibuildwheels.yml @@ -17,6 +17,11 @@ env: # Skip PyPy wheels for now (numexpr needs some adjustments first) # musllinux takes too long to build, and it's not worth it for now CIBW_SKIP: "pp* *musllinux* *-win32" + # Use explicit generator/compiler env vars; CMAKE_ARGS with spaces is not split on Windows. + CIBW_ENVIRONMENT_WINDOWS: >- + CMAKE_GENERATOR=Ninja + CC=clang-cl + CXX=clang-cl jobs: @@ -77,6 +82,10 @@ jobs: id: ninja uses: turtlesec-no/get-ninja@main + - name: Add LLVM to PATH (Windows) + if: ${{ matrix.os == 'windows-latest' }} + run: echo "C:\\Program Files\\LLVM\\bin" >> $env:GITHUB_PATH + - name: Install MSVC amd64 uses: ilammy/msvc-dev-cmd@v1 with: diff --git a/.github/workflows/wasm.yml b/.github/workflows/wasm.yml index ab008eea..8f98cbe3 100644 --- a/.github/workflows/wasm.yml +++ b/.github/workflows/wasm.yml @@ -21,6 +21,7 @@ jobs: env: CIBW_BUILD: ${{ matrix.cibw_build }} CMAKE_ARGS: "-DWITH_OPTIM=OFF" + DEACTIVATE_OPENZL: "1" CIBW_TEST_COMMAND: "pytest {project}/tests/ndarray/test_reductions.py" strategy: matrix: diff --git a/CMakeLists.txt b/CMakeLists.txt index 097ae709..b57452f3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,5 +1,22 @@ cmake_minimum_required(VERSION 3.15.0) + +if(WIN32) + cmake_policy(SET CMP0091 NEW) + set(CMAKE_MSVC_RUNTIME_LIBRARY "MultiThreaded$<$:Debug>DLL" CACHE STRING "" FORCE) +endif() + + +if(WIN32 AND CMAKE_GENERATOR MATCHES "Visual Studio") + if(NOT DEFINED CMAKE_GENERATOR_TOOLSET) + set(CMAKE_GENERATOR_TOOLSET "ClangCL" CACHE STRING "Use ClangCL toolset for C99/C11 support on Windows." FORCE) + endif() +endif() + project(python-blosc2) + +if(WIN32 AND NOT CMAKE_C_COMPILER_ID STREQUAL "Clang") + message(FATAL_ERROR "Windows builds require clang-cl. Set CC/CXX to clang-cl or configure CMake with -T ClangCL.") +endif() # Specifying Python version below is tricky, but if you don't specify the minimum version here, # it would not consider python3 when looking for the executable. This is problematic since Fedora # does not include a python symbolic link to python3. @@ -23,11 +40,55 @@ add_custom_command( "${CMAKE_CURRENT_SOURCE_DIR}/src/blosc2/blosc2_ext.pyx" --output-file blosc2_ext.c DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/src/blosc2/blosc2_ext.pyx" VERBATIM) + # ...and add it to the target Python_add_library(blosc2_ext MODULE blosc2_ext.c WITH_SOABI) + # We need to link against NumPy target_link_libraries(blosc2_ext PRIVATE Python::NumPy) +# Fetch and build miniexpr library +include(FetchContent) + +set(CMAKE_POSITION_INDEPENDENT_CODE ON) +set(MINIEXPR_BUILD_SHARED OFF CACHE BOOL "Build miniexpr shared library" FORCE) +set(MINIEXPR_BUILD_TESTS OFF CACHE BOOL "Build miniexpr tests" FORCE) +set(MINIEXPR_BUILD_EXAMPLES OFF CACHE BOOL "Build miniexpr examples" FORCE) +set(MINIEXPR_BUILD_BENCH OFF CACHE BOOL "Build miniexpr benchmarks" FORCE) + +FetchContent_Declare(miniexpr + GIT_REPOSITORY https://github.com/Blosc/miniexpr.git + GIT_TAG ndim # latest me_compile_nd()/me_eval_nd() APIs + # In case you want to use a local copy of miniexpr for development, uncomment the line below + # SOURCE_DIR "/Users/faltet/blosc/miniexpr" +) +FetchContent_MakeAvailable(miniexpr) + +# Link against miniexpr static library +target_link_libraries(blosc2_ext PRIVATE miniexpr_static) + +target_compile_features(blosc2_ext PRIVATE c_std_11) +if(WIN32 AND CMAKE_C_COMPILER_ID STREQUAL "Clang") + execute_process( + COMMAND "${CMAKE_C_COMPILER}" -print-resource-dir + OUTPUT_VARIABLE _clang_resource_dir + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET + ) + if(_clang_resource_dir) + if(CMAKE_SIZEOF_VOID_P EQUAL 8) + set(_clang_builtins "${_clang_resource_dir}/lib/windows/clang_rt.builtins-x86_64.lib") + else() + set(_clang_builtins "${_clang_resource_dir}/lib/windows/clang_rt.builtins-i386.lib") + endif() + if(EXISTS "${_clang_builtins}") + target_link_libraries(blosc2_ext PRIVATE "${_clang_builtins}") + endif() + unset(_clang_builtins) + endif() + unset(_clang_resource_dir) +endif() + if(DEFINED ENV{USE_SYSTEM_BLOSC2}) set(USE_SYSTEM_BLOSC2 ON) endif() @@ -44,13 +105,20 @@ else() set(BUILD_EXAMPLES OFF CACHE BOOL "Build C-Blosc2 examples") set(BUILD_BENCHMARKS OFF CACHE BOOL "Build C-Blosc2 benchmarks") set(BUILD_FUZZERS OFF CACHE BOOL "Build C-Blosc2 fuzzers") + if(DEFINED ENV{DEACTIVATE_OPENZL}) + set(DEACTIVATE_OPENZL ON CACHE BOOL "Do not include support for the OpenZL library.") + endif() set(CMAKE_POSITION_INDEPENDENT_CODE ON) # we want the binaries of the C-Blosc2 library to go into the wheels set(BLOSC_INSTALL ON) include(FetchContent) FetchContent_Declare(blosc2 GIT_REPOSITORY https://github.com/Blosc/c-blosc2 - GIT_TAG 5a2b0ed9c4d801230c118fbc5811817055b5a3f5 # v2.22.0 + GIT_TAG 011c9e537f28299c536294d842e1a3d0e41db24f # openzl_plugin + miniexpr + # GIT_TAG main + GIT_SHALLOW TRUE # fetch only the latest commit (only works with a branch in GIT_TAG) + # in case you want to use a local copy of c-blosc2 for development, uncomment the line below + # SOURCE_DIR "/Users/faltet/blosc/c-blosc2" ) FetchContent_MakeAvailable(blosc2) include_directories("${blosc2_SOURCE_DIR}/include") diff --git a/README.rst b/README.rst index 3d442419..7480d64f 100644 --- a/README.rst +++ b/README.rst @@ -53,6 +53,17 @@ Conda users can install from conda-forge: conda install -c conda-forge python-blosc2 +Windows note +============ + +When building from source on Windows, clang-cl is required (OpenZL depends on C11 support). +Make sure LLVM is on PATH and use the Ninja generator, for example:: + + CMAKE_GENERATOR=Ninja + CC=clang-cl + CXX=clang-cl + pip install -e . + Documentation ============= diff --git a/README_DEVELOPERS.md b/README_DEVELOPERS.md index 5c2ee0cc..f28a8eca 100644 --- a/README_DEVELOPERS.md +++ b/README_DEVELOPERS.md @@ -22,6 +22,16 @@ You are done! pip install . # add -e for editable mode ``` +On Windows, clang-cl is required (OpenZL depends on C11 support). Make sure LLVM +is on PATH and build with Ninja, for example: + +```bash +CMAKE_GENERATOR=Ninja \ +CC=clang-cl \ +CXX=clang-cl \ +pip install -e . +``` + There are situations where you may want to build the C-Blosc2 library separately, for example, when debugging issues in the C library. In that case, let's assume you have the C-Blosc2 library installed in `/usr/local`: ```bash @@ -38,6 +48,31 @@ LD_LIBRARY_PATH=/usr/local/lib pytest That's it! You can now proceed to the testing section. +### Speeding up local builds (sccache + Ninja) + +If you do frequent local rebuilds, sccache can significantly speed up C/C++ rebuilds. + +```bash +brew install sccache ninja +``` + +Then run: + +```bash +CMAKE_C_COMPILER_LAUNCHER=sccache \ +SKBUILD_BUILD_DIR=build \ +pip install -e . --no-build-isolation +``` + +Using `SKBUILD_BUILD_DIR` keeps a stable build directory between runs, which +improves incremental rebuilds and sccache hit rates. + +Check cache stats with: + +```bash +sccache --show-stats +``` + ## Testing We are using pytest for testing. You can run the tests by executing diff --git a/bench/ndarray/miniexpr-eval.py b/bench/ndarray/miniexpr-eval.py new file mode 100644 index 00000000..c7b89035 --- /dev/null +++ b/bench/ndarray/miniexpr-eval.py @@ -0,0 +1,47 @@ +from time import time +import blosc2 +import numpy as np +import numexpr as ne + +N = 10_000 +# dtype= np.int32 +dtype= np.float32 +# dtype= np.float64 +cparams = blosc2.CParams(codec=blosc2.Codec.BLOSCLZ, clevel=1) + +t0 = time() +# a = blosc2.ones((N, N), dtype=dtype, cparams=cparams) +# a = blosc2.arange(np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams) +a = blosc2.linspace(0., 1., np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams) +print(f"Time to create data: {(time() - t0) * 1000 :.4f} ms") +t0 = time() +b = a.copy() +c = a.copy() +print(f"Time to copy data: {(time() - t0) * 1000 :.4f} ms") + +t0 = time() +res = (2 * a**2 - 3 * b + c + 1.2).compute(cparams=cparams) +t = time() - t0 +print(f"Time to evaluate: {t * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(a.nbytes * 4 / 1e9) / t:.2f}") +# print(res.info) + +na = a[:] +nb = b[:] +nc = c[:] + +t0 = time() +nres = 2 * na**2 - 3 * nb + nc + 1.2 +nt = time() - t0 +print(f"Time to evaluate with NumPy: {nt * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(na.nbytes * 4 / 1e9) / nt:.2f}") +print(f"Speedup Blosc2 vs NumPy: {nt / t:.2f}x") +np.testing.assert_allclose(res, nres, rtol=1e-5) + +t0 = time() +neres = ne.evaluate("2 * na**2 - 3 * nb + nc + 1.2") +net = time() - t0 +print(f"Time to evaluate with NumExpr: {net * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(na.nbytes * 4 / 1e9) / net:.2f}") +print(f"Speedup Blosc2 vs NumExpr: {net / t:.2f}x") +np.testing.assert_allclose(res, neres, rtol=1e-5) diff --git a/bench/ndarray/miniexpr-reduct-sum-multi.py b/bench/ndarray/miniexpr-reduct-sum-multi.py new file mode 100644 index 00000000..badd9647 --- /dev/null +++ b/bench/ndarray/miniexpr-reduct-sum-multi.py @@ -0,0 +1,49 @@ +from time import time +import blosc2 +import numpy as np +import numexpr as ne + +N = 10_000 +dtype= np.float32 +cparams = blosc2.CParams(codec=blosc2.Codec.BLOSCLZ, clevel=1) + +t0 = time() +#a = blosc2.ones((N, N), dtype=dtype, cparams=cparams) +#a = blosc2.arange(np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams) +a = blosc2.linspace(0., 1., np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams) +#rng = np.random.default_rng(1234) +#a = rng.integers(0, 2, size=(N, N), dtype=dtype) +#a = blosc2.asarray(a, cparams=cparams, urlpath="a.b2nd", mode="w") +print(f"Time to create data: {(time() - t0) * 1000 :.4f} ms") +t0 = time() +b = a.copy() +c = a.copy() +print(f"Time to copy data: {(time() - t0) * 1000 :.4f} ms") + +t0 = time() +res = blosc2.sum(2 * a**2 - 3 * b + c + 1.2) +t = time() - t0 +print(f"Time to evaluate: {t * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(a.nbytes * 3 / 1e9) / t:.2f}") +print("Result:", res, "Mean:", res / (N * N)) + +na = a[:] +nb = b[:] +nc = c[:] + +t0 = time() +nres = np.sum(2 * na**2 - 3 * nb + nc + 1.2) +nt = time() - t0 +print(f"Time to evaluate with NumPy: {nt * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(na.nbytes * 3 / 1e9) / nt:.2f}") +print("Result:", nres, "Mean:", nres / (N * N)) +print(f"Speedup Blosc2 vs NumPy: {nt / t:.2f}x") +assert np.allclose(res, nres) + +t0 = time() +neres = ne.evaluate("sum(2 * na**2 - 3 * nb + nc + 1.2)") +net = time() - t0 +print(f"Time to evaluate with NumExpr: {net * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(na.nbytes * 3 / 1e9) / net:.2f}") +print("Result:", neres, "Mean:", neres / (N * N)) +print(f"Speedup Blosc2 vs NumExpr: {net / t:.2f}x") diff --git a/bench/ndarray/miniexpr-reduct-sum.py b/bench/ndarray/miniexpr-reduct-sum.py new file mode 100644 index 00000000..e8a76119 --- /dev/null +++ b/bench/ndarray/miniexpr-reduct-sum.py @@ -0,0 +1,42 @@ +from time import time +import blosc2 +import numpy as np +import numexpr as ne + +N = 10_000 +# dtype= np.int32 +dtype= np.float32 +# dtype= np.float64 +cparams = blosc2.CParams(codec=blosc2.Codec.BLOSCLZ, clevel=1) + +t0 = time() +# a = blosc2.ones((N, N), dtype=dtype, cparams=cparams) +# a = blosc2.arange(np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams) +a = blosc2.linspace(0., 1., np.prod((N, N)), shape=(N, N), dtype=dtype, cparams=cparams) +print(f"Time to create data: {(time() - t0) * 1000 :.4f} ms") + +t0 = time() +res = blosc2.sum(a) +t = time() - t0 +print(f"Time to evaluate: {t * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(a.nbytes / 1e9) / t:.2f}") +print("Result:", res, "Mean:", res / (N * N)) + +na = a[:] + +t0 = time() +nres = np.sum(na) +nt = time() - t0 +print(f"Time to evaluate with NumPy: {nt * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(na.nbytes / 1e9) / nt:.2f}") +print("Result:", nres, "Mean:", nres / (N * N)) +print(f"Speedup Blosc2 vs NumPy: {nt / t:.2f}x") +assert np.allclose(res, nres) + +t0 = time() +neres = ne.evaluate("sum(na)") +net = time() - t0 +print(f"Time to evaluate with NumExpr: {net * 1000 :.4f} ms", end=" ") +print(f"Speed (GB/s): {(na.nbytes / 1e9) / net:.2f}") +print("Result:", neres, "Mean:", neres / (N * N)) +print(f"Speedup Blosc2 vs NumExpr: {net / t:.2f}x") diff --git a/doc/reference/classes.rst b/doc/reference/classes.rst index 1d9de69f..cca8c7e0 100644 --- a/doc/reference/classes.rst +++ b/doc/reference/classes.rst @@ -54,3 +54,4 @@ Other Classes Storage Tuner URLPath + FPAccuracy diff --git a/doc/reference/misc.rst b/doc/reference/misc.rst index 8e1d34f4..279ed79a 100644 --- a/doc/reference/misc.rst +++ b/doc/reference/misc.rst @@ -57,6 +57,7 @@ This page documents the miscellaneous members of the ``blosc2`` module that do n SpecialValue, SplitMode, Tuner, + FPAccuracy, compute_chunks_blocks, get_slice_nchunks, remove_urlpath, diff --git a/doc/reference/reduction_functions.rst b/doc/reference/reduction_functions.rst index d9b61834..45f27abe 100644 --- a/doc/reference/reduction_functions.rst +++ b/doc/reference/reduction_functions.rst @@ -3,7 +3,7 @@ Reduction Functions Contrarily to lazy functions, reduction functions are evaluated eagerly, and the result is always a NumPy array (although this can be converted internally into an :ref:`NDArray ` if you pass any :func:`blosc2.empty` arguments in ``kwargs``). -Reduction operations can be used with any of :ref:`NDArray `, :ref:`C2Array `, :ref:`NDField ` and :ref:`LazyExpr `. Again, although these can be part of a :ref:`LazyExpr `, you must be aware that they are not lazy, but will be evaluated eagerly during the construction of a LazyExpr instance (this might change in the future). +Reduction operations can be used with any of :ref:`NDArray `, :ref:`C2Array `, :ref:`NDField ` and :ref:`LazyExpr `. Again, although these can be part of a :ref:`LazyExpr `, you must be aware that they are not lazy, but will be evaluated eagerly during the construction of a LazyExpr instance (this might change in the future). When the input is a :ref:`LazyExpr`, reductions accept ``fp_accuracy`` to control floating-point accuracy, and it is forwarded to :func:`LazyExpr.compute`. .. currentmodule:: blosc2 diff --git a/pyproject.toml b/pyproject.toml index 93073710..86cf1999 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -65,7 +65,8 @@ dev = [ test = [ "pytest", "psutil; platform_machine != 'wasm32'", - "torch; platform_machine != 'wasm32'", + # torch is optional because it is quite large (but will still be used if found) + # "torch; platform_machine != 'wasm32'", ] doc = [ "sphinx>=8", diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index 12a2a908..9a5488de 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -113,6 +113,21 @@ class Tuner(Enum): BTUNE = 32 +class FPAccuracy(Enum): + """ + Floating point accuracy modes for Blosc2 computing with lazy expressions. + + This is only relevant when using floating point dtypes with miniexpr. + """ + + #: Use 1.0 ULPs (Units in the Last Place) for floating point functions + HIGH = 1 + #: Use 3.5 ULPs (Units in the Last Place) for floating point functions + MEDIUM = 2 + #: Use default accuracy. This is MEDIUM, which should be enough for most applications. + DEFAULT = MEDIUM + + from .blosc2_ext import ( DEFINED_CODECS_STOP, EXTENDED_HEADER_LENGTH, diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx index 91276883..cd4e5697 100644 --- a/src/blosc2/blosc2_ext.pyx +++ b/src/blosc2/blosc2_ext.pyx @@ -23,12 +23,13 @@ from cpython cimport ( PyBytes_FromStringAndSize, PyObject_GetBuffer, ) +from cpython.ref cimport Py_INCREF, Py_DECREF from cpython.pycapsule cimport PyCapsule_GetPointer, PyCapsule_New from cython.operator cimport dereference from libc.stdint cimport uintptr_t -from libc.stdlib cimport free, malloc, realloc +from libc.stdlib cimport free, malloc, realloc, calloc from libc.stdlib cimport abs as c_abs -from libc.string cimport memcpy, strcpy, strdup, strlen +from libc.string cimport memcpy, memset, strcpy, strdup, strlen from libcpp cimport bool as c_bool from enum import Enum @@ -53,6 +54,8 @@ cdef extern from "": ctypedef unsigned int uint32_t ctypedef unsigned long long uint64_t +cdef extern from "": + int printf(const char *format, ...) nogil cdef extern from "blosc2.h": @@ -179,7 +182,7 @@ cdef extern from "blosc2.h": int blosc2_free_resources() int blosc2_cbuffer_sizes(const void* cbuffer, int32_t* nbytes, - int32_t* cbytes, int32_t* blocksize) + int32_t* cbytes, int32_t* blocksize) nogil int blosc1_cbuffer_validate(const void* cbuffer, size_t cbytes, size_t* nbytes) @@ -206,6 +209,7 @@ cdef extern from "blosc2.h": uint8_t* ttmp size_t ttmp_nbytes blosc2_context* ctx + c_bool output_is_disposable ctypedef struct blosc2_postfilter_params: void *user_data @@ -257,7 +261,7 @@ cdef extern from "blosc2.h": blosc2_context* blosc2_create_cctx(blosc2_cparams cparams) nogil - blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) + blosc2_context* blosc2_create_dctx(blosc2_dparams dparams) nogil void blosc2_free_ctx(blosc2_context * context) nogil @@ -280,7 +284,7 @@ cdef extern from "blosc2.h": int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize, int start, int nitems, void* dest, - int32_t destsize) + int32_t destsize) nogil @@ -374,9 +378,9 @@ cdef extern from "blosc2.h": int blosc2_schunk_decompress_chunk(blosc2_schunk *schunk, int64_t nchunk, void *dest, int32_t nbytes) int blosc2_schunk_get_chunk(blosc2_schunk *schunk, int64_t nchunk, uint8_t ** chunk, - c_bool *needs_free) + c_bool *needs_free) nogil int blosc2_schunk_get_lazychunk(blosc2_schunk *schunk, int64_t nchunk, uint8_t ** chunk, - c_bool *needs_free) + c_bool *needs_free) nogil int blosc2_schunk_get_slice_buffer(blosc2_schunk *schunk, int64_t start, int64_t stop, void *buffer) int blosc2_schunk_set_slice_buffer(blosc2_schunk *schunk, int64_t start, int64_t stop, void *buffer) int blosc2_schunk_get_cparams(blosc2_schunk *schunk, blosc2_cparams** cparams) @@ -518,13 +522,106 @@ cdef extern from "b2nd.h": int64_t *buffershape, int64_t buffersize) int b2nd_from_schunk(blosc2_schunk *schunk, b2nd_array_t **array) - void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index) + void blosc2_unidim_to_multidim(uint8_t ndim, int64_t *shape, int64_t i, int64_t *index) nogil int b2nd_copy_buffer2(int8_t ndim, int32_t itemsize, const void *src, const int64_t *src_pad_shape, const int64_t *src_start, const int64_t *src_stop, void *dst, const int64_t *dst_pad_shape, - const int64_t *dst_start); + const int64_t *dst_start) + + +# miniexpr C API declarations +cdef extern from "miniexpr.h": + ctypedef enum me_dtype: + ME_AUTO, + ME_BOOL + ME_INT8 + ME_INT16 + ME_INT32 + ME_INT64 + ME_UINT8 + ME_UINT16 + ME_UINT32 + ME_UINT64 + ME_FLOAT32 + ME_FLOAT64 + ME_COMPLEX64 + ME_COMPLEX128 + + # typedef struct me_variable + ctypedef struct me_variable: + const char *name + me_dtype dtype + const void *address + int type + void *context + + ctypedef struct me_expr: + int type + double value + const double *bound + const void *function + void *output + int nitems + me_dtype dtype + me_dtype input_dtype + void *bytecode + int ncode + void *parameters[1] + + int me_compile(const char *expression, const me_variable *variables, + int var_count, me_dtype dtype, int *error, me_expr **out) + + int me_compile_nd(const char *expression, const me_variable *variables, + int var_count, me_dtype dtype, int ndims, + const int64_t *shape, const int32_t *chunkshape, + const int32_t *blockshape, int *error, me_expr **out) + + ctypedef enum me_compile_status: + ME_COMPILE_SUCCESS + ME_COMPILE_ERR_OOM + ME_COMPILE_ERR_PARSE + ME_COMPILE_ERR_INVALID_ARG + ME_COMPILE_ERR_COMPLEX_UNSUPPORTED + ME_COMPILE_ERR_REDUCTION_INVALID + ME_COMPILE_ERR_VAR_MIXED + ME_COMPILE_ERR_VAR_UNSPECIFIED + ME_COMPILE_ERR_INVALID_ARG_TYPE + ME_COMPILE_ERR_MIXED_TYPE_NESTED + + ctypedef enum me_simd_ulp_mode: + ME_SIMD_ULP_DEFAULT + ME_SIMD_ULP_1 + ME_SIMD_ULP_3_5 + + ctypedef struct me_eval_params: + c_bool disable_simd + me_simd_ulp_mode simd_ulp_mode + + int me_eval(const me_expr *expr, const void **vars_block, + int n_vars, void *output_block, int chunk_nitems, + const me_eval_params *params) nogil + + int me_eval_nd(const me_expr *expr, const void **vars_block, + int n_vars, void *output_block, int block_nitems, + int64_t nchunk, int64_t nblock, const me_eval_params *params) nogil + + int me_nd_valid_nitems(const me_expr *expr, int64_t nchunk, int64_t nblock, int64_t *valid_nitems) nogil + + void me_print(const me_expr *n) nogil + void me_free(me_expr *n) nogil + + +cdef extern from "miniexpr_numpy.h": + me_dtype me_dtype_from_numpy(int numpy_type_num) + +cdef extern from "pythread.h": + ctypedef void* PyThread_type_lock + PyThread_type_lock PyThread_allocate_lock() nogil + int PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) nogil + void PyThread_release_lock(PyThread_type_lock lock) nogil + void PyThread_free_lock(PyThread_type_lock lock) nogil ctypedef struct user_filters_udata: @@ -547,6 +644,16 @@ ctypedef struct udf_udata: int64_t chunks_in_array[B2ND_MAX_DIM] int64_t blocks_in_chunk[B2ND_MAX_DIM] +ctypedef struct me_udata: + b2nd_array_t** inputs + int ninputs + me_eval_params* eval_params + b2nd_array_t* array + void* aux_reduc_ptr + int64_t chunks_in_array[B2ND_MAX_DIM] + int64_t blocks_in_chunk[B2ND_MAX_DIM] + me_expr* miniexpr_handle + MAX_TYPESIZE = BLOSC2_MAXTYPESIZE MAX_BUFFERSIZE = BLOSC2_MAX_BUFFERSIZE MAX_BLOCKSIZE = BLOSC2_MAXBLOCKSIZE @@ -567,9 +674,14 @@ cdef _check_comp_length(comp_name, comp_len): blosc2_init() +cdef PyThread_type_lock chunk_cache_lock = PyThread_allocate_lock() +if chunk_cache_lock == NULL: + raise MemoryError("Could not allocate chunk cache lock") @atexit.register def destroy(): + if chunk_cache_lock != NULL: + PyThread_free_lock(chunk_cache_lock) blosc2_destroy() @@ -770,7 +882,8 @@ cdef _check_cparams(blosc2_cparams *cparams): if ufilters[i] and cparams.filters[i] in blosc2.ufilters_registry.keys(): raise ValueError("Cannot use multi-threading with user defined Python filters") - if cparams.prefilter != NULL: + if cparams.prefilter != NULL and cparams.prefilter != miniexpr_prefilter: + # Note: miniexpr_prefilter uses miniexpr C API which is thread-friendly, raise ValueError("`nthreads` must be 1 when a prefilter is set") cdef _check_dparams(blosc2_dparams* dparams, blosc2_cparams* cparams=NULL): @@ -1368,6 +1481,7 @@ cdef class SChunk: cdef int size cdef int32_t len_chunk = (buf.len + BLOSC2_MAX_OVERHEAD) cdef uint8_t* chunk = malloc(len_chunk) + self.schunk.current_nchunk = nchunk # prefilter needs this value to be set if RELEASEGIL: with nogil: # No need to create another cctx @@ -1406,6 +1520,7 @@ cdef class SChunk: cdef int size cdef int32_t len_chunk = (buf.len + BLOSC2_MAX_OVERHEAD) cdef uint8_t* chunk = malloc(len_chunk) + self.schunk.current_nchunk = nchunk # prefilter needs this value to be set if RELEASEGIL: with nogil: size = blosc2_compress_ctx(self.schunk.cctx, buf.buf, buf.len, chunk, len_chunk) @@ -1428,6 +1543,22 @@ cdef class SChunk: raise RuntimeError("Could not update the desired chunk") return rc + # This is used internally for prefiltering + def _prefilter_data(self, nchunk, data, chunk_data): + cdef Py_buffer buf + PyObject_GetBuffer(data, &buf, PyBUF_SIMPLE) + cdef Py_buffer chunk_buf + PyObject_GetBuffer(chunk_data, &chunk_buf, PyBUF_SIMPLE) + self.schunk.current_nchunk = nchunk # prefilter needs this value to be set + cdef int size = blosc2_compress_ctx(self.schunk.cctx, buf.buf, buf.len, chunk_buf.buf, chunk_buf.len) + PyBuffer_Release(&buf) + PyBuffer_Release(&chunk_buf) + if size < 0: + raise RuntimeError("Could not compress the data") + elif size == 0: + raise RuntimeError("The result could not fit ") + return size + def get_slice(self, start=0, stop=None, out=None): cdef int64_t nitems = self.schunk.nbytes // self.schunk.typesize start, stop, _ = slice(start, stop, 1).indices(nitems) @@ -1616,7 +1747,7 @@ cdef class SChunk: cdef blosc2_cparams* cparams = self.schunk.storage.cparams cparams.prefilter = general_filler - cdef blosc2_prefilter_params* preparams = malloc(sizeof(blosc2_prefilter_params)) + cdef blosc2_prefilter_params* preparams = calloc(1, sizeof(blosc2_prefilter_params)) cdef filler_udata* fill_udata = malloc(sizeof(filler_udata)) fill_udata.py_func = malloc(strlen(func_id) + 1) strcpy(fill_udata.py_func, func_id) @@ -1649,7 +1780,7 @@ cdef class SChunk: cdef blosc2_cparams* cparams = self.schunk.storage.cparams cparams.prefilter = general_prefilter - cdef blosc2_prefilter_params* preparams = malloc(sizeof(blosc2_prefilter_params)) + cdef blosc2_prefilter_params* preparams = calloc(1, sizeof(blosc2_prefilter_params)) cdef user_filters_udata* pref_udata = malloc(sizeof(user_filters_udata)) pref_udata.py_func = malloc(strlen(func_id) + 1) strcpy(pref_udata.py_func, func_id) @@ -1661,24 +1792,54 @@ cdef class SChunk: cparams.preparams = preparams _check_cparams(cparams) - blosc2_free_ctx(self.schunk.cctx) + if self.schunk.cctx != NULL: + # Freeing NULL context can lead to segmentation fault + blosc2_free_ctx(self.schunk.cctx) self.schunk.cctx = blosc2_create_cctx(dereference(cparams)) if self.schunk.cctx == NULL: raise RuntimeError("Could not create compression context") cpdef remove_prefilter(self, func_name, _new_ctx=True): - if func_name is not None: + cdef udf_udata* udf_data + cdef user_filters_udata* udata + + if func_name is not None and func_name in blosc2.prefilter_funcs: del blosc2.prefilter_funcs[func_name] - # From Python the preparams->udata with always have the field py_func - cdef user_filters_udata * udata = self.schunk.storage.cparams.preparams.user_data - free(udata.py_func) - free(self.schunk.storage.cparams.preparams.user_data) - free(self.schunk.storage.cparams.preparams) + # Clean up the miniexpr handle if this is a miniexpr_prefilter + if self.schunk.storage.cparams.prefilter == miniexpr_prefilter: + if self.schunk.storage.cparams.preparams != NULL: + me_data = self.schunk.storage.cparams.preparams.user_data + if me_data != NULL: + if me_data.inputs != NULL: + for i in range(me_data.ninputs): + if me_data.inputs[i].chunk_cache.data != NULL: + free(me_data.inputs[i].chunk_cache.data) + me_data.inputs[i].chunk_cache.data = NULL + me_data.inputs[i].chunk_cache.nchunk = -1 + free(me_data.inputs) + if me_data.miniexpr_handle != NULL: # XXX do we really need the conditional? + me_free(me_data.miniexpr_handle) + if me_data.eval_params != NULL: + free(me_data.eval_params) + free(me_data) + elif self.schunk.storage.cparams.prefilter != NULL: + # From Python the preparams->udata with always have the field py_func + if self.schunk.storage.cparams.preparams != NULL: + udata = self.schunk.storage.cparams.preparams.user_data + if udata != NULL: + if udata.py_func != NULL: + free(udata.py_func) + free(udata) + + if self.schunk.storage.cparams.preparams != NULL: + free(self.schunk.storage.cparams.preparams) self.schunk.storage.cparams.preparams = NULL self.schunk.storage.cparams.prefilter = NULL - blosc2_free_ctx(self.schunk.cctx) + if self.schunk.cctx != NULL: + # Freeing NULL context can lead to segmentation fault + blosc2_free_ctx(self.schunk.cctx) if _new_ctx: self.schunk.cctx = blosc2_create_cctx(dereference(self.schunk.storage.cparams)) if self.schunk.cctx == NULL: @@ -1741,6 +1902,134 @@ cdef int general_filler(blosc2_prefilter_params *params): return 0 +# Auxiliary function for miniexpr as a prefilter +# Only meant for (input and output) arrays that are blosc2.NDArray objects. +cdef int aux_miniexpr(me_udata *udata, int64_t nchunk, int32_t nblock, + c_bool is_postfilter, uint8_t *params_output, int32_t typesize) nogil: + # Declare all C variables at the beginning + cdef int64_t chunk_ndim[B2ND_MAX_DIM] + cdef int64_t block_ndim[B2ND_MAX_DIM] + cdef int64_t start_ndim[B2ND_MAX_DIM] + cdef int64_t stop_ndim[B2ND_MAX_DIM] + cdef int64_t buffershape[B2ND_MAX_DIM] + + cdef b2nd_array_t* ndarr + cdef int rc + cdef void** input_buffers = malloc(udata.ninputs * sizeof(uint8_t*)) + cdef float *buf + cdef uint8_t* src + cdef uint8_t* chunk + cdef c_bool needs_free + cdef int32_t chunk_nbytes, chunk_cbytes, block_nbytes + cdef int start, blocknitems, expected_blocknitems + cdef int64_t valid_nitems + cdef int32_t input_typesize + cdef blosc2_context* dctx + expected_blocknitems = -1 + valid_nitems = 0 + + cdef me_expr* miniexpr_handle = udata.miniexpr_handle + cdef void* aux_reduc_ptr + + if miniexpr_handle == NULL: + raise ValueError("miniexpr: handle not assigned") + + # Query valid (unpadded) items for this block + rc = me_nd_valid_nitems(miniexpr_handle, nchunk, nblock, &valid_nitems) + if rc != 0: + raise RuntimeError(f"miniexpr: invalid block; error code: {rc}") + if valid_nitems <= 0: + # Nothing to compute for this block. + # For reductions, keep aux_reduc neutral values untouched. + if udata.aux_reduc_ptr == NULL: + memset(params_output, 0, udata.array.blocknitems * typesize) + free(input_buffers) + return 0 + for i in range(udata.ninputs): + ndarr = udata.inputs[i] + input_buffers[i] = malloc(ndarr.sc.blocksize) + if ndarr.sc.storage.urlpath == NULL: + src = ndarr.sc.data[nchunk] + else: + # We need to get the chunk from disk/network + if ndarr.chunk_cache.nchunk != nchunk: + PyThread_acquire_lock(chunk_cache_lock, 1) + # We need to check again, as another thread may have updated the cache already + if ndarr.chunk_cache.nchunk != nchunk: + if ndarr.chunk_cache.data != NULL: + free(ndarr.chunk_cache.data) + ndarr.chunk_cache.data = NULL + rc = blosc2_schunk_get_chunk(ndarr.sc, nchunk, &chunk, &needs_free) + if rc < 0: + PyThread_release_lock(chunk_cache_lock) + raise ValueError("miniexpr: error getting chunk") + if not needs_free: + src = malloc(rc) + if src == NULL: + PyThread_release_lock(chunk_cache_lock) + raise MemoryError("miniexpr: cannot allocate chunk copy") + memcpy(src, chunk, rc) + else: + src = chunk + ndarr.chunk_cache.data = src + ndarr.chunk_cache.nchunk = nchunk + PyThread_release_lock(chunk_cache_lock) + src = ndarr.chunk_cache.data + rc = blosc2_cbuffer_sizes(src, &chunk_nbytes, &chunk_cbytes, &block_nbytes) + if rc < 0: + raise ValueError("miniexpr: error getting cbuffer sizes") + input_typesize = ndarr.sc.typesize + blocknitems = block_nbytes // input_typesize + if expected_blocknitems == -1: + expected_blocknitems = blocknitems + elif blocknitems != expected_blocknitems: + raise ValueError("miniexpr: inconsistent block element counts across inputs") + start = nblock * blocknitems + # This is needed for thread safety, but adds a pretty low overhead (< 400ns on a modern CPU) + # In the future, perhaps one can create a specific (serial) context just for + # blosc2_getitem_ctx, but this is probably never going to be necessary. + dctx = blosc2_create_dctx(BLOSC2_DPARAMS_DEFAULTS) + # Unsafe, but it works for special arrays (e.g. blosc2.ones), and can be used for profiling + # dctx = ndarr.sc.dctx + if valid_nitems > blocknitems: + raise ValueError("miniexpr: valid items exceed padded block size") + rc = blosc2_getitem_ctx(dctx, src, chunk_cbytes, start, blocknitems, + input_buffers[i], block_nbytes) + blosc2_free_ctx(dctx) + if rc < 0: + raise ValueError("miniexpr: error decompressing the chunk") + # For reduction operations, we need to track which block we're processing + # The linear_block_index should be based on the INPUT array structure, not the output array + # Get the first input array's chunk and block structure + cdef b2nd_array_t* first_input = udata.inputs[0] + cdef int nblocks_per_chunk = 1 + for i in range(first_input.ndim): + nblocks_per_chunk *= udata.blocks_in_chunk[i] + # Calculate the global linear block index: nchunk * blocks_per_chunk + nblock + # This works because blocks never span chunks (chunks are padded to block boundaries) + cdef int64_t linear_block_index = nchunk * nblocks_per_chunk + nblock + cdef uintptr_t offset_bytes = typesize * linear_block_index + + # Call thread-safe miniexpr C API + if udata.aux_reduc_ptr == NULL: + aux_reduc_ptr = params_output + else: + # Reduction operation: evaluate only valid items into a single output element. + # NOTE: miniexpr handles scalar outputs in me_eval_nd without touching tail bytes. + aux_reduc_ptr = ( udata.aux_reduc_ptr + offset_bytes) + rc = me_eval_nd(miniexpr_handle, input_buffers, udata.ninputs, + aux_reduc_ptr, blocknitems, nchunk, nblock, udata.eval_params) + if rc != 0: + raise RuntimeError(f"miniexpr: issues during evaluation; error code: {rc}") + + # Free resources + for i in range(udata.ninputs): + free(input_buffers[i]) + free(input_buffers) + + return 0 + + # Aux function for prefilter and postfilter udf cdef int aux_udf(udf_udata *udata, int64_t nchunk, int32_t nblock, c_bool is_postfilter, uint8_t *params_output, int32_t typesize): @@ -1814,6 +2103,11 @@ cdef int aux_udf(udf_udata *udata, int64_t nchunk, int32_t nblock, return 0 +cdef int miniexpr_prefilter(blosc2_prefilter_params *params): + return aux_miniexpr( params.user_data, params.nchunk, params.nblock, False, + params.output, params.output_typesize) + + cdef int general_udf_prefilter(blosc2_prefilter_params *params): cdef udf_udata *udata = params.user_data return aux_udf(udata, params.nchunk, params.nblock, False, params.output, params.output_typesize) @@ -2361,6 +2655,10 @@ cdef class NDArray: self.array = PyCapsule_GetPointer(array, "b2nd_array_t*") self.base = base # add reference to base if NDArray is a view + @property + def c_array(self): + return self.array + @property def shape(self) -> tuple[int]: return tuple([self.array.shape[i] for i in range(self.array.ndim)]) @@ -2607,11 +2905,11 @@ cdef class NDArray: def as_ffi_ptr(self): return PyCapsule_New(self.array, "b2nd_array_t*", NULL) - cdef udf_udata *_fill_udf_udata(self, func_id, inputs_id): + cdef udf_udata *_fill_udf_udata(self, func_id, inputs): cdef udf_udata *udata = malloc(sizeof(udf_udata)) udata.py_func = malloc(strlen(func_id) + 1) strcpy(udata.py_func, func_id) - udata.inputs_id = inputs_id + udata.inputs_id = id(inputs) udata.output_cdtype = np.dtype(self.dtype).num udata.array = self.array # Save these in udf_udata to avoid computing them for each block @@ -2621,6 +2919,92 @@ cdef class NDArray: return udata + cdef me_udata *_fill_me_udata(self, inputs, fp_accuracy, aux_reduc): + cdef me_udata *udata = malloc(sizeof(me_udata)) + operands = list(inputs.values()) + ninputs = len(operands) + cdef b2nd_array_t** inputs_ = malloc(ninputs * sizeof(b2nd_array_t*)) + for i, operand in enumerate(operands): + inputs_[i] = operand.c_array + inputs_[i].chunk_cache.nchunk = -1 + inputs_[i].chunk_cache.data = NULL + udata.inputs = inputs_ + udata.ninputs = ninputs + cdef me_eval_params* eval_params = malloc(sizeof(me_eval_params)) + eval_params.disable_simd = False + eval_params.simd_ulp_mode = ME_SIMD_ULP_3_5 if fp_accuracy == blosc2.FPAccuracy.MEDIUM else ME_SIMD_ULP_1 + udata.eval_params = eval_params + udata.array = self.array + cdef void* aux_reduc_ptr = NULL + if aux_reduc is not None: + if not isinstance(aux_reduc, np.ndarray): + raise TypeError("aux_reduc must be a NumPy array") + aux_reduc_ptr = np.PyArray_DATA( aux_reduc) + udata.aux_reduc_ptr = aux_reduc_ptr + # Save these in udf_udata to avoid computing them for each block + for i in range(self.array.ndim): + udata.chunks_in_array[i] = udata.array.extshape[i] // udata.array.chunkshape[i] + udata.blocks_in_chunk[i] = udata.array.extchunkshape[i] // udata.array.blockshape[i] + + return udata + + def _set_pref_expr(self, expression, inputs, fp_accuracy, aux_reduc=None): + # Set prefilter for miniexpr + cdef blosc2_cparams* cparams = self.array.sc.storage.cparams + cparams.prefilter = miniexpr_prefilter + + cdef me_udata* udata = self._fill_me_udata(inputs, fp_accuracy, aux_reduc) + + # Get the compiled expression handle for multi-threading + cdef Py_ssize_t n = len(inputs) + cdef me_variable* variables = malloc(sizeof(me_variable) * n) + if variables == NULL: + raise MemoryError() + cdef me_variable *var + for i, (k, v) in enumerate(inputs.items()): + var = &variables[i] + var_name = k.encode("utf-8") if isinstance(k, str) else k + var.name = malloc(strlen(var_name) + 1) + strcpy(var.name, var_name) + var.dtype = me_dtype_from_numpy(v.dtype.num) + var.address = NULL # chunked compile: addresses provided later + var.type = 0 # auto-set to ME_VARIABLE inside compiler + var.context = NULL + + cdef int error = 0 + expression = expression.encode("utf-8") if isinstance(expression, str) else expression + cdef me_dtype = me_dtype_from_numpy(self.dtype.num) + cdef me_expr *out_expr + cdef int ndims = self.array.ndim + cdef int64_t* shape = &self.array.shape[0] + cdef int32_t* chunkshape = &self.array.chunkshape[0] + cdef int32_t* blockshape = &self.array.blockshape[0] + cdef int rc = me_compile_nd(expression, variables, n, me_dtype, ndims, + shape, chunkshape, blockshape, &error, &out_expr) + if rc == ME_COMPILE_ERR_INVALID_ARG_TYPE: + raise TypeError(f"miniexpr does not support operand or output dtype: {expression}") + if rc != ME_COMPILE_SUCCESS: + raise NotImplementedError(f"Cannot compile expression: {expression}") + udata.miniexpr_handle = out_expr + + # Free resources + for i in range(len(inputs)): + free(variables[i].name) + free(variables) + + cdef blosc2_prefilter_params* preparams = calloc(1, sizeof(blosc2_prefilter_params)) + preparams.user_data = udata + preparams.output_is_disposable = False if aux_reduc is None else True + cparams.preparams = preparams + _check_cparams(cparams) + + if self.array.sc.cctx != NULL: + # Freeing NULL context can lead to segmentation fault + blosc2_free_ctx(self.array.sc.cctx) + self.array.sc.cctx = blosc2_create_cctx(dereference(cparams)) + if self.array.sc.cctx == NULL: + raise RuntimeError("Could not create compression context") + def _set_pref_udf(self, func, inputs_id): if self.array.sc.storage.cparams.nthreads > 1: raise AttributeError("compress `nthreads` must be 1 when assigning a prefilter") @@ -2633,7 +3017,7 @@ cdef class NDArray: cdef blosc2_cparams* cparams = self.array.sc.storage.cparams cparams.prefilter = general_udf_prefilter - cdef blosc2_prefilter_params* preparams = malloc(sizeof(blosc2_prefilter_params)) + cdef blosc2_prefilter_params* preparams = calloc(1, sizeof(blosc2_prefilter_params)) preparams.user_data = self._fill_udf_udata(func_id, inputs_id) cparams.preparams = preparams _check_cparams(cparams) @@ -2661,7 +3045,9 @@ cdef class NDArray: dparams.postparams = postparams _check_dparams(dparams, self.array.sc.storage.cparams) - blosc2_free_ctx(self.array.sc.dctx) + if self.array.sc.dctx != NULL: + # Freeing NULL context can lead to segmentation fault + blosc2_free_ctx(self.array.sc.dctx) self.array.sc.dctx = blosc2_create_dctx(dereference(dparams)) if self.array.sc.dctx == NULL: raise RuntimeError("Could not create decompression context") diff --git a/src/blosc2/core.py b/src/blosc2/core.py index 4b348c79..15f58ed3 100644 --- a/src/blosc2/core.py +++ b/src/blosc2/core.py @@ -1547,12 +1547,14 @@ def compute_chunks_blocks( # noqa: C901 # min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 4 elif platform.system() == "Darwin" and "arm" in platform.machine(): # For Apple Silicon, experiments say we can use 4x the L1 size - min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 4 + # min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 4 + # However, let's adjust for several operands in cache, so let's use just L1 + min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 1 elif "l1_data_cache_size" in blosc2.cpu_info and isinstance( blosc2.cpu_info["l1_data_cache_size"], int ): - # For other archs, we don't have hints; be conservative and use 2x the L1 size - min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 2 + # For other archs, we don't have hints; be conservative and use 1x the L1 size + min_blocksize = blosc2.cpu_info["l1_data_cache_size"] * 1 if blocksize < min_blocksize: blocksize = min_blocksize diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 7613c9f2..c68ad3b6 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -91,6 +91,15 @@ safe_numpy_globals["matrix_transpose"] = np.transpose safe_numpy_globals["vecdot"] = npvecdot +# Set this to False if miniexpr should not be tried out +try_miniexpr = True +if blosc2.IS_WASM: + try_miniexpr = False +if sys.platform == "win32": + # Although miniexpr has support for windows, the integration with Blosc2 + # still has some rough edges. + try_miniexpr = False + def ne_evaluate(expression, local_dict=None, **kwargs): """Safely evaluate expressions using numexpr when possible, falling back to numpy.""" @@ -293,7 +302,12 @@ def sort(self, order: str | list[str] | None = None) -> blosc2.LazyArray: pass @abstractmethod - def compute(self, item: slice | list[slice] | None = None, **kwargs: Any) -> blosc2.NDArray: + def compute( + self, + item: slice | list[slice] | None = None, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs: Any, + ) -> blosc2.NDArray: """ Return a :ref:`NDArray` containing the evaluation of the :ref:`LazyArray`. @@ -304,9 +318,14 @@ def compute(self, item: slice | list[slice] | None = None, **kwargs: Any) -> blo the evaluated result. This difference between slicing operands and slicing the final expression is important when reductions or a where clause are used in the expression. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy to be used during computation. + By default, :ref:`blosc2.FPAccuracy.DEFAULT` is used. + kwargs: Any, optional Keyword arguments that are supported by the :func:`empty` constructor. These arguments will be set in the resulting :ref:`NDArray`. + Additionally, the following special kwargs are supported: Returns ------- @@ -1216,12 +1235,25 @@ def fast_eval( # noqa: C901 :ref:`NDArray` or np.ndarray The output array. """ + global try_miniexpr + + # Use a local copy so we don't modify the global + use_miniexpr = try_miniexpr + + # Disable miniexpr for UDFs (callable expressions) + if callable(expression): + use_miniexpr = False + out = kwargs.pop("_output", None) ne_args: dict = kwargs.pop("_ne_args", {}) if ne_args is None: ne_args = {} + fp_accuracy = kwargs.pop("fp_accuracy", blosc2.FPAccuracy.DEFAULT) dtype = kwargs.pop("dtype", None) where: dict | None = kwargs.pop("_where_args", None) + if where is not None: + # miniexpr does not support where(); use the regular path. + use_miniexpr = False if isinstance(out, blosc2.NDArray): # If 'out' has been passed, and is a NDArray, use it as the base array basearr = out @@ -1261,6 +1293,46 @@ def fast_eval( # noqa: C901 # WebAssembly does not support threading, so we cannot use the iter_disk option iter_disk = False + # Check whether we can use miniexpr + if use_miniexpr: + # Require aligned NDArray operands with identical chunk/block grid. + same_shape = all(hasattr(op, "shape") and op.shape == shape for op in operands.values()) + same_chunks = all(hasattr(op, "chunks") and op.chunks == chunks for op in operands.values()) + same_blocks = all(hasattr(op, "blocks") and op.blocks == blocks for op in operands.values()) + if not (same_shape and same_chunks and same_blocks): + use_miniexpr = False + if not (all_ndarray and out is None): + use_miniexpr = False + + if use_miniexpr: + cparams = kwargs.pop("cparams", blosc2.CParams()) + # All values will be overwritten, so we can use an uninitialized array + res_eval = blosc2.uninit(shape, dtype, chunks=chunks, blocks=blocks, cparams=cparams, **kwargs) + try: + res_eval._set_pref_expr(expression, operands, fp_accuracy=fp_accuracy) + print("expr->miniexpr:", expression, fp_accuracy) + # Data to compress is fetched from operands, so it can be uninitialized here + data = np.empty(res_eval.schunk.chunksize, dtype=np.uint8) + # Exercise prefilter for each chunk + for nchunk in range(res_eval.schunk.nchunks): + res_eval.schunk.update_data(nchunk, data, copy=False) + except Exception: + use_miniexpr = False + finally: + res_eval.schunk.remove_prefilter("miniexpr") + global iter_chunks + # Ensure any background reading thread is closed + iter_chunks = None + + if not use_miniexpr: + # If miniexpr failed, fallback to regular evaluation + # (continue to the manual chunked evaluation below) + pass + else: + if getitem: + return res_eval[()] + return res_eval + chunk_operands = {} # Check which chunks intersect with _slice all_chunks = get_intersecting_chunks((), shape, chunks) # if _slice is (), returns all chunks @@ -1450,7 +1522,10 @@ def slices_eval( # noqa: C901 # Typically, we enter here when using UDFs, and out is a NumPy array. # Use operands to get the shape and chunks # operand will be a 'fake' NDArray just to get the necessary chunking information + fp_accuracy = kwargs.pop("fp_accuracy", None) temp = blosc2.empty(shape, dtype=dtype) + if fp_accuracy is not None: + kwargs["fp_accuracy"] = fp_accuracy chunks = temp.chunks del temp @@ -1535,7 +1610,10 @@ def slices_eval( # noqa: C901 if "chunks" in kwargs and (where is not None and len(where) < 2 and len(shape_) > 1): # Remove the chunks argument if the where condition is not a tuple with two elements kwargs.pop("chunks") + fp_accuracy = kwargs.pop("fp_accuracy", None) out = blosc2.empty(shape_, dtype=dtype_, **kwargs) + if fp_accuracy is not None: + kwargs["fp_accuracy"] = fp_accuracy # Check if the in out partitions are well-behaved (i.e. no padding) behaved = blosc2.are_partitions_behaved(out.shape, out.chunks, out.blocks) # Evaluate the expression using chunks of operands @@ -1744,6 +1822,8 @@ def infer_reduction_dtype(dtype, operation): dtype, np.float32 if dtype in (np.float32, np.complex64) else blosc2.DEFAULT_FLOAT ) if operation in {ReduceOp.SUM, ReduceOp.PROD}: + if np.issubdtype(dtype, np.bool_): + return np.int64 if np.issubdtype(dtype, np.unsignedinteger): return np.result_type(dtype, np.uint64) return np.result_type(dtype, np.int64 if np.issubdtype(dtype, np.integer) else my_float) @@ -1808,13 +1888,20 @@ def reduce_slices( # noqa: C901 :ref:`NDArray` or np.ndarray The resulting output array. """ + global try_miniexpr + + # Use a local copy so we don't modify the global + use_miniexpr = try_miniexpr # & False + out = kwargs.pop("_output", None) res_out_ = None # temporary required to store max/min for argmax/argmin ne_args: dict = kwargs.pop("_ne_args", {}) if ne_args is None: ne_args = {} + fp_accuracy = kwargs.pop("fp_accuracy", blosc2.FPAccuracy.DEFAULT) where: dict | None = kwargs.pop("_where_args", None) reduce_op = reduce_args.pop("op") + reduce_op_str = reduce_args.pop("op_str", None) axis = reduce_args["axis"] keepdims = reduce_args["keepdims"] dtype = reduce_args.get("dtype", None) @@ -1861,7 +1948,10 @@ def reduce_slices( # noqa: C901 # Note: we could have expr = blosc2.lazyexpr('numpy_array + 1') (i.e. no choice for chunks) blosc2_arrs = tuple(o for o in operands.values() if hasattr(o, "chunks")) fast_path = False + all_ndarray = False + any_persisted = False chunks = None + blocks = None if blosc2_arrs: # fast path only relevant if there are blosc2 arrays operand = max(blosc2_arrs, key=lambda x: len(x.shape)) @@ -1871,9 +1961,11 @@ def reduce_slices( # noqa: C901 same_chunks = all(operand.chunks == o.chunks for o in operands.values() if hasattr(o, "chunks")) same_blocks = all(operand.blocks == o.blocks for o in operands.values() if hasattr(o, "blocks")) fast_path = same_shape and same_chunks and same_blocks and (0 not in operand.chunks) - aligned, iter_disk = dict.fromkeys(operands.keys(), False), False + aligned = dict.fromkeys(operands.keys(), False) + iter_disk = False if fast_path: chunks = operand.chunks + blocks = operand.blocks # Check that all operands are NDArray for fast path all_ndarray = all( isinstance(value, blosc2.NDArray) and value.shape != () for value in operands.values() @@ -1907,6 +1999,85 @@ def reduce_slices( # noqa: C901 chunks = temp.chunks del temp + # miniexpr reduction path only supported for some cases so far + if not (fast_path and all_ndarray and reduced_shape == ()): + use_miniexpr = False + + # Some reductions are not supported yet in miniexpr + if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN): + use_miniexpr = False + + # Check whether we can use miniexpr + if use_miniexpr and isinstance(expression, str): + has_complex = any( + isinstance(op, blosc2.NDArray) and blosc2.isdtype(op.dtype, "complex floating") + for op in operands.values() + ) + if has_complex and any(tok in expression for tok in ("!=", "==", "<=", ">=", "<", ">")): + use_miniexpr = False + if where is not None and len(where) != 2: + use_miniexpr = False + + if use_miniexpr: + # Experiments say that not splitting is best (at least on Apple Silicon M4 Pro) + cparams = kwargs.pop("cparams", blosc2.CParams(splitmode=blosc2.SplitMode.NEVER_SPLIT)) + # Create a fake NDArray just to drive the miniexpr evaluation (values won't be used) + res_eval = blosc2.uninit(shape, dtype, chunks=chunks, blocks=blocks, cparams=cparams, **kwargs) + # Compute the number of blocks in the result + nblocks = res_eval.nbytes // res_eval.blocksize + # Initialize aux_reduc based on the reduction operation + # Padding blocks won't be written, so initial values matter for the final reduction + if reduce_op == ReduceOp.SUM or reduce_op == ReduceOp.ANY: + aux_reduc = np.zeros(nblocks, dtype=dtype) + elif reduce_op == ReduceOp.PROD or reduce_op == ReduceOp.ALL: + aux_reduc = np.ones(nblocks, dtype=dtype) + elif reduce_op == ReduceOp.MIN: + if np.issubdtype(dtype, np.integer): + aux_reduc = np.full(nblocks, np.iinfo(dtype).max, dtype=dtype) + else: + aux_reduc = np.full(nblocks, np.inf, dtype=dtype) + elif reduce_op == ReduceOp.MAX: + if np.issubdtype(dtype, np.integer): + aux_reduc = np.full(nblocks, np.iinfo(dtype).min, dtype=dtype) + else: + aux_reduc = np.full(nblocks, -np.inf, dtype=dtype) + else: + # For other operations, zeros should be safe + aux_reduc = np.zeros(nblocks, dtype=dtype) + try: + if where is not None: + expression_miniexpr = f"{reduce_op_str}(where({expression}, _where_x, _where_y))" + else: + expression_miniexpr = f"{reduce_op_str}({expression})" + res_eval._set_pref_expr(expression_miniexpr, operands, fp_accuracy, aux_reduc) + print("expr->miniexpr:", expression, reduce_op, fp_accuracy) + # Data won't even try to be compressed, so buffers can be unitialized and reused + data = np.empty(res_eval.schunk.chunksize, dtype=np.uint8) + chunk_data = np.empty(res_eval.schunk.chunksize + blosc2.MAX_OVERHEAD, dtype=np.uint8) + # Exercise prefilter for each chunk + for nchunk in range(res_eval.schunk.nchunks): + res_eval.schunk._prefilter_data(nchunk, data, chunk_data) + except Exception: + use_miniexpr = False + finally: + res_eval.schunk.remove_prefilter("miniexpr") + global iter_chunks + # Ensure any background reading thread is closed + iter_chunks = None + + if not use_miniexpr: + # If miniexpr failed, fallback to regular evaluation + # (continue to the manual chunked evaluation below) + pass + else: + if reduce_op == ReduceOp.ANY: + result = np.any(aux_reduc, **reduce_args) + elif reduce_op == ReduceOp.ALL: + result = np.all(aux_reduc, **reduce_args) + else: + result = reduce_op.value.reduce(aux_reduc, **reduce_args) + return result + # Iterate over the operands and get the chunks chunk_operands = {} # Check which chunks intersect with _slice @@ -2129,7 +2300,7 @@ def convert_none_out(dtype, reduce_op, reduced_shape): return out if isinstance(out, tuple) else (out, None) -def chunked_eval( # noqa: C901 +def chunked_eval( expression: str | Callable[[tuple, np.ndarray, tuple[int]], None], operands: dict, item=(), **kwargs ): """ @@ -2223,10 +2394,9 @@ def chunked_eval( # noqa: C901 return slices_eval(expression, operands, getitem=getitem, _slice=item, shape=shape, **kwargs) finally: - # Deactivate cache for NDField instances - for op in operands: - if isinstance(operands[op], blosc2.NDField): - operands[op].ndarr.keep_last_read = False + global iter_chunks + # Ensure any background reading thread is closed + iter_chunks = None def fuse_operands(operands1, operands2): @@ -2685,23 +2855,39 @@ def where(self, value1=None, value2=None): new_expr._dtype = dtype return new_expr - def sum(self, axis=None, dtype=None, keepdims=False, **kwargs): + def sum( + self, + axis=None, + dtype=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.SUM, + "op_str": "sum", "axis": axis, "dtype": dtype, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) - - def prod(self, axis=None, dtype=None, keepdims=False, **kwargs): + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def prod( + self, + axis=None, + dtype=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.PROD, + "op_str": "prod", "axis": axis, "dtype": dtype, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) def get_num_elements(self, axis, item): if hasattr(self, "_where_args") and len(self._where_args) == 1: @@ -2723,9 +2909,22 @@ def get_num_elements(self, axis, item): axis = tuple(a if a >= 0 else a + len(shape) for a in axis) # handle negative indexing return math.prod([shape[i] for i in axis]) - def mean(self, axis=None, dtype=None, keepdims=False, **kwargs): + def mean( + self, + axis=None, + dtype=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): item = kwargs.pop("item", ()) - total_sum = self.sum(axis=axis, dtype=dtype, keepdims=keepdims, item=item) + total_sum = self.sum( + axis=axis, + dtype=dtype, + keepdims=keepdims, + item=item, + fp_accuracy=fp_accuracy, + ) num_elements = self.get_num_elements(axis, item) if num_elements == 0: raise ValueError("mean of an empty array is not defined") @@ -2738,17 +2937,25 @@ def mean(self, axis=None, dtype=None, keepdims=False, **kwargs): out = blosc2.asarray(out, **kwargs) return out - def std(self, axis=None, dtype=None, keepdims=False, ddof=0, **kwargs): + def std( + self, + axis=None, + dtype=None, + keepdims=False, + ddof=0, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): item = kwargs.pop("item", ()) if item == (): # fast path - mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True) + mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True, fp_accuracy=fp_accuracy) expr = (self - mean_value) ** 2 else: - mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True, item=item) + mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True, item=item, fp_accuracy=fp_accuracy) # TODO: Not optimal because we load the whole slice in memory. Would have to write # a bespoke std function that executed within slice_eval to avoid this probably. expr = (self.slice(item) - mean_value) ** 2 - out = expr.mean(axis=axis, dtype=dtype, keepdims=keepdims) + out = expr.mean(axis=axis, dtype=dtype, keepdims=keepdims, fp_accuracy=fp_accuracy) if ddof != 0: num_elements = self.get_num_elements(axis, item) out = np.sqrt(out * num_elements / (num_elements - ddof)) @@ -2762,17 +2969,25 @@ def std(self, axis=None, dtype=None, keepdims=False, ddof=0, **kwargs): out = blosc2.asarray(out, **kwargs) return out - def var(self, axis=None, dtype=None, keepdims=False, ddof=0, **kwargs): + def var( + self, + axis=None, + dtype=None, + keepdims=False, + ddof=0, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): item = kwargs.pop("item", ()) if item == (): # fast path - mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True) + mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True, fp_accuracy=fp_accuracy) expr = (self - mean_value) ** 2 else: - mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True, item=item) + mean_value = self.mean(axis=axis, dtype=dtype, keepdims=True, item=item, fp_accuracy=fp_accuracy) # TODO: Not optimal because we load the whole slice in memory. Would have to write # a bespoke var function that executed within slice_eval to avoid this probably. expr = (self.slice(item) - mean_value) ** 2 - out = expr.mean(axis=axis, dtype=dtype, keepdims=keepdims) + out = expr.mean(axis=axis, dtype=dtype, keepdims=keepdims, fp_accuracy=fp_accuracy) if ddof != 0: num_elements = self.get_num_elements(axis, item) out = out * num_elements / (num_elements - ddof) @@ -2784,53 +2999,93 @@ def var(self, axis=None, dtype=None, keepdims=False, ddof=0, **kwargs): out = blosc2.asarray(out, **kwargs) return out - def min(self, axis=None, keepdims=False, **kwargs): + def min( + self, + axis=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.MIN, + "op_str": "min", "axis": axis, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) - - def max(self, axis=None, keepdims=False, **kwargs): + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def max( + self, + axis=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.MAX, + "op_str": "max", "axis": axis, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) - - def any(self, axis=None, keepdims=False, **kwargs): + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def any( + self, + axis=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.ANY, + "op_str": "any", "axis": axis, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) - - def all(self, axis=None, keepdims=False, **kwargs): + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def all( + self, + axis=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.ALL, + "op_str": "all", "axis": axis, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) - - def argmax(self, axis=None, keepdims=False, **kwargs): + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def argmax( + self, + axis=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.ARGMAX, "axis": axis, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) - - def argmin(self, axis=None, keepdims=False, **kwargs): + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def argmin( + self, + axis=None, + keepdims=False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): reduce_args = { "op": ReduceOp.ARGMIN, "axis": axis, "keepdims": keepdims, } - return self.compute(_reduce_args=reduce_args, **kwargs) + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) def _eval_constructor(self, expression, constructor, operands): """Evaluate a constructor function inside a string expression.""" @@ -2989,7 +3244,9 @@ def sort(self, order: str | list[str] | None = None) -> blosc2.LazyArray: lazy_expr._order = order return lazy_expr - def compute(self, item=(), **kwargs) -> blosc2.NDArray: + def compute( + self, item=(), fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, **kwargs + ) -> blosc2.NDArray: # When NumPy ufuncs are called, the user may add an `out` parameter to kwargs if "out" in kwargs: # use provided out preferentially kwargs["_output"] = kwargs.pop("out") @@ -3002,6 +3259,7 @@ def compute(self, item=(), **kwargs) -> blosc2.NDArray: kwargs["_ne_args"] = self._ne_args if hasattr(self, "_where_args"): kwargs["_where_args"] = self._where_args + kwargs.setdefault("fp_accuracy", fp_accuracy) kwargs["dtype"] = self.dtype kwargs["shape"] = self.shape if hasattr(self, "_indices"): @@ -3020,7 +3278,15 @@ def compute(self, item=(), **kwargs) -> blosc2.NDArray: and not isinstance(result, blosc2.NDArray) ): # Get rid of all the extra kwargs that are not accepted by blosc2.asarray - kwargs_not_accepted = {"_where_args", "_indices", "_order", "_ne_args", "dtype", "shape"} + kwargs_not_accepted = { + "_where_args", + "_indices", + "_order", + "_ne_args", + "dtype", + "shape", + "fp_accuracy", + } kwargs = {key: value for key, value in kwargs.items() if key not in kwargs_not_accepted} result = blosc2.asarray(result, **kwargs) return result @@ -3299,7 +3565,7 @@ def sort(self, order: str | list[str] | None = None) -> blosc2.LazyArray: lazy_expr._order = order return lazy_expr - def compute(self, item=(), **kwargs): + def compute(self, item=(), fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, **kwargs): # Get kwargs if kwargs is None: kwargs = {} @@ -3363,7 +3629,9 @@ def compute(self, item=(), **kwargs): # # Register a prefilter for eval # res_eval._set_pref_udf(self.func, id(self.inputs)) + # This line would NOT allocate physical RAM on any modern OS: # aux = np.empty(res_eval.shape, res_eval.dtype) + # Physical allocation happens here (when writing): # res_eval[...] = aux # res_eval.schunk.remove_prefilter(self.func.__name__) # res_eval.schunk.cparams.nthreads = self._cnthreads diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index 41367ea6..c6c0c10f 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -505,6 +505,9 @@ def sum( If set to True, the reduced axes are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the input array. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. kwargs: dict, optional Additional keyword arguments supported by the :func:`empty` constructor. @@ -600,6 +603,9 @@ def std( If set to True, the reduced axes are left in the result as dimensions with size one. This ensures that the result will broadcast correctly against the input array. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. kwargs: dict, optional Additional keyword arguments that are supported by the :func:`empty` constructor. @@ -732,6 +738,9 @@ def min( If set to True, the axes which are reduced are left in the result as dimensions with size one. With this option, the result will broadcast correctly against the input array. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. kwargs: dict, optional Keyword arguments that are supported by the :func:`empty` constructor. @@ -863,6 +872,9 @@ def argmin( keepdims: bool If True, reduced axis included in the result as singleton dimension. Otherwise, axis not included in the result. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. Returns ------- @@ -890,6 +902,9 @@ def argmax( keepdims: bool If True, reduced axis included in the result as singleton dimension. Otherwise, axis not included in the result. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. Returns ------- @@ -3773,7 +3788,7 @@ def ndim(self) -> int: @property def size(self) -> int: - """The size (in bytes) for this container.""" + """The size (in elements) for this container.""" return super().size @property diff --git a/tests/conftest.py b/tests/conftest.py index 35768f59..e4a505dd 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,6 +9,7 @@ import sys import pytest +import requests import blosc2 @@ -36,15 +37,11 @@ def cat2_context(): yield c2params -# This is to avoid sporadic failures in the CI when reaching network, -# but this makes the tests to stuck in local. Perhaps move this to -# every test module that needs it? -# def pytest_runtest_call(item): -# try: -# item.runtest() -# except requests.ConnectTimeout: -# pytest.skip("Skipping test due to sporadic requests.ConnectTimeout") -# except requests.ReadTimeout: -# pytest.skip("Skipping test due to sporadic requests.ReadTimeout") -# except requests.Timeout: -# pytest.skip("Skipping test due to sporadic requests.Timeout") +def pytest_runtest_call(item): + # Skip network-marked tests on transient request failures to keep CI stable. + if item.get_closest_marker("network") is None: + return + try: + item.runtest() + except requests.exceptions.RequestException as exc: + pytest.skip(f"Skipping network test due to request failure: {exc}") diff --git a/tests/ndarray/test_elementwise_funcs.py b/tests/ndarray/test_elementwise_funcs.py index 0a03a4a2..fdce9239 100644 --- a/tests/ndarray/test_elementwise_funcs.py +++ b/tests/ndarray/test_elementwise_funcs.py @@ -3,7 +3,6 @@ import numpy as np import pytest -import torch import blosc2 @@ -311,9 +310,10 @@ def test_unary_funcs(np_func, blosc_func, dtype, shape, chunkshape): @pytest.mark.parametrize(("np_func", "blosc_func"), UNARY_FUNC_PAIRS) @pytest.mark.parametrize("dtype", STR_DTYPES) @pytest.mark.parametrize("shape", [(10,), (20, 20)]) -@pytest.mark.parametrize("xp", [torch]) -def test_unfuncs_proxy(np_func, blosc_func, dtype, shape, xp): - _test_unary_func_proxy(np_func, blosc_func, dtype, shape, xp) +def test_unary_funcs_torch_proxy(np_func, blosc_func, dtype, shape): + """Test unary functions with torch tensors as input (via proxy).""" + torch = pytest.importorskip("torch") + _test_unary_func_proxy(np_func, blosc_func, dtype, shape, torch) @pytest.mark.heavy @@ -334,9 +334,10 @@ def test_binary_funcs(np_func, blosc_func, dtype, shape, chunkshape): @pytest.mark.parametrize(("np_func", "blosc_func"), BINARY_FUNC_PAIRS) @pytest.mark.parametrize("dtype", STR_DTYPES) @pytest.mark.parametrize(("shape", "chunkshape"), SHAPES_CHUNKS) -@pytest.mark.parametrize("xp", [torch]) -def test_binfuncs_proxy(np_func, blosc_func, dtype, shape, chunkshape, xp): - _test_binary_func_proxy(np_func, blosc_func, dtype, shape, chunkshape, xp) +def test_binary_funcs_torch_proxy(np_func, blosc_func, dtype, shape, chunkshape): + """Test binary functions with torch tensors as input (via proxy).""" + torch = pytest.importorskip("torch") + _test_binary_func_proxy(np_func, blosc_func, dtype, shape, chunkshape, torch) @pytest.mark.heavy diff --git a/tests/ndarray/test_lazyexpr.py b/tests/ndarray/test_lazyexpr.py index 88136602..c92d53be 100644 --- a/tests/ndarray/test_lazyexpr.py +++ b/tests/ndarray/test_lazyexpr.py @@ -10,12 +10,20 @@ import numpy as np import pytest -import torch import blosc2 from blosc2.lazyexpr import ne_evaluate from blosc2.utils import get_chunks_idx, npvecdot +# Conditionally import torch for proxy tests +try: + import torch + + PROXY_TEST_XP = [torch, np] +except ImportError: + torch = None + PROXY_TEST_XP = [np] + NITEMS_SMALL = 100 NITEMS = 1000 @@ -172,7 +180,10 @@ def test_simple_expression(array_fixture): expr = a1 + a2 - a3 * a4 nres = ne_evaluate("na1 + na2 - na3 * na4") res = expr.compute(cparams=blosc2.CParams()) - np.testing.assert_allclose(res[:], nres) + if na1.dtype == np.float32: + np.testing.assert_allclose(res[:], nres, rtol=1e-6, atol=1e-6) + else: + np.testing.assert_allclose(res[:], nres) # Mix Proxy and NDArray operands @@ -197,10 +208,16 @@ def test_iXXX(array_fixture): expr **= 2.3 # __ipow__ res = expr.compute() if not blosc2.IS_WASM: - nres = ne_evaluate("(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7) ** 2.3") + expr_str = "(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7) ** 2.3" else: - nres = ne_evaluate("(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7)") - np.testing.assert_allclose(res[:], nres) + expr_str = "(((((na1 ** 3 + na2 ** 2 + na3 ** 3 - na4 + 3) + 5) - 15) * 2) / 7)" + if na1.dtype == np.float32: + with np.errstate(invalid="ignore"): + nres = eval(expr_str, {"np": np}, {"na1": na1, "na2": na2, "na3": na3, "na4": na4}) + np.testing.assert_allclose(res[:], nres, rtol=1e-5, atol=1e-6) + else: + nres = ne_evaluate(expr_str) + np.testing.assert_allclose(res[:], nres) def test_complex_evaluate(array_fixture): @@ -209,7 +226,10 @@ def test_complex_evaluate(array_fixture): expr += 2 nres = ne_evaluate("tan(na1) * (sin(na2) * sin(na2) + cos(na3)) + (sqrt(na4) * 2) + 2") res = expr.compute() - np.testing.assert_allclose(res[:], nres) + if na1.dtype == np.float32: + np.testing.assert_allclose(res[:], nres, rtol=1e-5) + else: + np.testing.assert_allclose(res[:], nres) def test_complex_getitem(array_fixture): @@ -218,7 +238,10 @@ def test_complex_getitem(array_fixture): expr += 2 nres = ne_evaluate("tan(na1) * (sin(na2) * sin(na2) + cos(na3)) + (sqrt(na4) * 2) + 2") res = expr[:] - np.testing.assert_allclose(res, nres) + if na1.dtype == np.float32: + np.testing.assert_allclose(res[:], nres, rtol=1e-5) + else: + np.testing.assert_allclose(res[:], nres) def test_complex_getitem_slice(array_fixture): @@ -236,8 +259,11 @@ def test_func_expression(array_fixture): expr = (a1 + a2) * a3 - a4 expr = blosc2.sin(expr) + blosc2.cos(expr) nres = ne_evaluate("sin((na1 + na2) * na3 - na4) + cos((na1 + na2) * na3 - na4)") - res = expr.compute(storage={}) - np.testing.assert_allclose(res[:], nres) + res = expr.compute() + if na1.dtype == np.float32: + np.testing.assert_allclose(res[:], nres, rtol=1e-5) + else: + np.testing.assert_allclose(res[:], nres) def test_expression_with_constants(array_fixture): @@ -245,7 +271,28 @@ def test_expression_with_constants(array_fixture): # Test with operands with same chunks and blocks expr = a1 + 2 - a3 * 3.14 nres = ne_evaluate("na1 + 2 - na3 * 3.14") - np.testing.assert_allclose(expr[:], nres) + res = expr.compute() + if na1.dtype == np.float32: + np.testing.assert_allclose(res[:], nres, rtol=1e-5, atol=1e-6) + else: + np.testing.assert_allclose(res[:], nres) + + +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.parametrize("accuracy", [blosc2.FPAccuracy.MEDIUM, blosc2.FPAccuracy.HIGH]) +def test_fp_accuracy(accuracy, dtype): + a1 = blosc2.linspace(0, 10, NITEMS, dtype=dtype, chunks=(1000,), blocks=(500,)) + a2 = blosc2.linspace(0, 10, NITEMS, dtype=dtype, chunks=(1000,), blocks=(500,)) + a3 = blosc2.linspace(0, 10, NITEMS, dtype=dtype, chunks=(1000,), blocks=(500,)) + expr = blosc2.sin(a1) ** 2 - blosc2.cos(a2) ** 2 + blosc2.sqrt(a3) + res = expr.compute(fp_accuracy=accuracy) + na1 = a1[:] + na2 = a2[:] + na3 = a3[:] + nres = eval("np.sin(na1) ** 2 - np.cos(na2) ** 2 + np.sqrt(na3)") + # print("res dtypes:", res.dtype, nres.dtype) + tol = 1e-6 if a1.dtype == "float32" else 1e-15 + np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) @pytest.mark.parametrize("compare_expressions", [True, False]) @@ -312,9 +359,9 @@ def test_functions(function, dtype_fixture, shape_fixture): expr_string = f"{function}(na1)" res_numexpr = ne_evaluate(expr_string) # Compare the results - np.testing.assert_allclose(res_lazyexpr[:], res_numexpr) - np.testing.assert_allclose(expr.slice(slice(0, 10, 1)), res_numexpr[:10]) # slice test - np.testing.assert_allclose(expr[:10], res_numexpr[:10]) # getitem test + np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=1e-5) + np.testing.assert_allclose(expr.slice(slice(0, 10, 1)), res_numexpr[:10], rtol=1e-5) # slice test + np.testing.assert_allclose(expr[:10], res_numexpr[:10], rtol=1e-5) # getitem test # For some reason real and imag are not supported by numpy's assert_allclose # (TypeError: bad operand type for abs(): 'LazyExpr' and segfaults are observed) @@ -324,7 +371,7 @@ def test_functions(function, dtype_fixture, shape_fixture): # Using numpy functions expr = eval(f"np.{function}(a1)", {"a1": a1, "np": np}) # Compare the results - np.testing.assert_allclose(expr[()], res_numexpr) + np.testing.assert_allclose(expr[()], res_numexpr, rtol=1e-5) # In combination with other operands na2 = np.linspace(0, 10, nelems, dtype=dtype_fixture).reshape(shape_fixture) @@ -338,7 +385,11 @@ def test_functions(function, dtype_fixture, shape_fixture): expr_string = f"na1 + {function}(na2)" res_numexpr = ne_evaluate(expr_string) # Compare the results - np.testing.assert_allclose(res_lazyexpr[:], res_numexpr) + if function == "tan": + # tan in miniexpr has not a lot of precision for values that are close to 0 + np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=5e-4) + else: + np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=1e-5) # Functions of the form np.function(a1 + a2) expr = eval(f"np.{function}(a1 + a2)", {"a1": a1, "a2": a2, "np": np}) @@ -346,7 +397,7 @@ def test_functions(function, dtype_fixture, shape_fixture): expr_string = f"{function}(na1 + na2)" res_numexpr = ne_evaluate(expr_string) # Compare the results - np.testing.assert_allclose(expr[()], res_numexpr) + np.testing.assert_allclose(expr[()], res_numexpr, rtol=1e-5) @pytest.mark.parametrize( @@ -669,17 +720,18 @@ def test_save_functions(function, dtype_fixture, shape_fixture): expr_string = f"{function}(na1)" res_numexpr = ne_evaluate(expr_string) # Compare the results - np.testing.assert_allclose(res_lazyexpr[:], res_numexpr) + rtol = 1e-6 if dtype_fixture == np.float32 else 1e-15 + np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=rtol) expr_string = f"blosc2.{function}(a1)" expr = eval(expr_string, {"a1": a1, "blosc2": blosc2}) expr.save(urlpath=urlpath_save) res_lazyexpr = expr.compute() - np.testing.assert_allclose(res_lazyexpr[:], res_numexpr) + np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=rtol) expr = blosc2.open(urlpath_save) res_lazyexpr = expr.compute() - np.testing.assert_allclose(res_lazyexpr[:], res_numexpr) + np.testing.assert_allclose(res_lazyexpr[:], res_numexpr, rtol=rtol) for urlpath in [urlpath_op, urlpath_save]: blosc2.remove_urlpath(urlpath) @@ -1843,7 +1895,7 @@ def test_lazyexpr_2args(): @pytest.mark.parametrize( "xp", - [torch, np], + PROXY_TEST_XP, ) @pytest.mark.parametrize( "dtype", diff --git a/tests/ndarray/test_lazyudf.py b/tests/ndarray/test_lazyudf.py index 29b44e39..4a66a19d 100644 --- a/tests/ndarray/test_lazyudf.py +++ b/tests/ndarray/test_lazyudf.py @@ -21,7 +21,13 @@ def udf1p(inputs_tuple, output, offset): if blosc2._HAS_NUMBA: import numba - @numba.jit(parallel=True) + # We should avoid parallel=True here because makes the complete test suite crash + # in test_save_ludf. I am not sure why, but it might be some interference with + # a previous test, leaving the threading state in a bad way. + # But all the examples and benchmarks seem to work with parallel=True. + # XXX Investigate more. + # @numba.jit(parallel=True) + @numba.jit(nopython=True) def udf1p_numba(inputs_tuple, output, offset): x = inputs_tuple[0] output[:] = x + 1 diff --git a/tests/ndarray/test_linalg.py b/tests/ndarray/test_linalg.py index 9c1a110d..33ff45d6 100644 --- a/tests/ndarray/test_linalg.py +++ b/tests/ndarray/test_linalg.py @@ -3,12 +3,20 @@ import numpy as np import pytest -import torch import blosc2 from blosc2.lazyexpr import linalg_funcs from blosc2.utils import npvecdot +# Conditionally import torch for proxy tests +try: + import torch + + PROXY_TEST_XP = [torch, np] +except ImportError: + torch = None + PROXY_TEST_XP = [np] + @pytest.mark.parametrize( ("ashape", "achunks", "ablocks"), @@ -826,7 +834,7 @@ def test_diagonal(shape, chunkshape, offset): @pytest.mark.parametrize( "xp", - [torch, np], + PROXY_TEST_XP, ) @pytest.mark.parametrize( "dtype", diff --git a/tests/ndarray/test_reductions.py b/tests/ndarray/test_reductions.py index e1bbfb22..d1031093 100644 --- a/tests/ndarray/test_reductions.py +++ b/tests/ndarray/test_reductions.py @@ -65,6 +65,45 @@ def test_reduce_bool(array_fixture, reduce_op): np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) +# @pytest.mark.parametrize("reduce_op", ["sum"]) +@pytest.mark.parametrize("reduce_op", ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin"]) +def test_reduce_where(array_fixture, reduce_op): + a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture + if reduce_op == "prod": + # To avoid overflow, create a1 and a2 with small values + na1 = np.linspace(0, 0.1, np.prod(a1.shape), dtype=np.float32).reshape(a1.shape) + a1 = blosc2.asarray(na1) + na2 = np.linspace(0, 0.5, np.prod(a1.shape), dtype=np.float32).reshape(a1.shape) + a2 = blosc2.asarray(na2) + expr = a1 + a2 - 0.2 + nres = eval("na1 + na2 - .2") + else: + expr = blosc2.where(a1 < a2, a2, a1) + nres = eval("np.where(na1 < na2, na2, na1)") + res = getattr(expr, reduce_op)() + nres = getattr(nres, reduce_op)() + # print("res:", res, nres, type(res), type(nres)) + tol = 1e-15 if a1.dtype == "float64" else 1e-6 + np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) + + +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.parametrize("accuracy", [blosc2.FPAccuracy.MEDIUM, blosc2.FPAccuracy.HIGH]) +def test_fp_accuracy(accuracy, dtype): + a1 = blosc2.linspace(0, 10, NITEMS, dtype=dtype, chunks=(1000,), blocks=(500,)) + a2 = blosc2.linspace(0, 10, NITEMS, dtype=dtype, chunks=(1000,), blocks=(500,)) + a3 = blosc2.linspace(0, 10, NITEMS, dtype=dtype, chunks=(1000,), blocks=(500,)) + expr = blosc2.sin(a1) ** 2 - blosc2.cos(a2) ** 2 + blosc2.sqrt(a3) + res = expr.sum(fp_accuracy=accuracy) + na1 = a1[:] + na2 = a2[:] + na3 = a3[:] + nres = eval("np.sin(na1) ** 2 - np.cos(na2) ** 2 + np.sqrt(na3)").sum() + # print("res:", res, nres, type(res), type(nres)) + tol = 1e-6 if a1.dtype == "float32" else 1e-15 + np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) + + @pytest.mark.parametrize( "reduce_op", ["sum", "prod", "mean", "std", "var", "min", "max", "any", "all", "argmax", "argmin"] ) @@ -571,4 +610,4 @@ def test_reduce_string(): d = blosc2.lazyexpr("sl + c.sum() + a.std()", operands={"a": a, "c": c, "sl": a.slice((1, 1))}) sum = d.compute()[()] npsum = npa[1, 1] + np.sum(npc) + np.std(npa) - assert np.allclose(sum, npsum) + np.testing.assert_allclose(sum, npsum) diff --git a/tests/ndarray/test_setitem.py b/tests/ndarray/test_setitem.py index a2145b90..71137779 100644 --- a/tests/ndarray/test_setitem.py +++ b/tests/ndarray/test_setitem.py @@ -8,7 +8,6 @@ import numpy as np import pytest -import torch import blosc2 @@ -45,14 +44,6 @@ def test_setitem(shape, chunks, blocks, slices, dtype): nparray[slices] = val np.testing.assert_almost_equal(a[...], nparray) - # Object called via SimpleProxy - slice_shape = a[slices].shape - dtype_ = {np.float32: torch.float32, np.int32: torch.int32, np.float64: torch.float64}[dtype] - val = torch.ones(slice_shape, dtype=dtype_) - a[slices] = val - nparray[slices] = val - np.testing.assert_almost_equal(a[...], nparray) - # blosc2.NDArray if np.prod(slice_shape) == 1 or len(slice_shape) != len(blocks): chunks = None @@ -64,6 +55,22 @@ def test_setitem(shape, chunks, blocks, slices, dtype): np.testing.assert_almost_equal(a[...], nparray) +@pytest.mark.parametrize(argnames, argvalues) +def test_setitem_torch_proxy(shape, chunks, blocks, slices, dtype): + torch = pytest.importorskip("torch") + size = int(np.prod(shape)) + nparray = np.arange(size, dtype=dtype).reshape(shape) + a = blosc2.frombuffer(bytes(nparray), nparray.shape, dtype=dtype, chunks=chunks, blocks=blocks) + + # Object called via SimpleProxy (torch tensor) + slice_shape = a[slices].shape + dtype_ = {np.float32: torch.float32, np.int32: torch.int32, np.float64: torch.float64}[dtype] + val = torch.ones(slice_shape, dtype=dtype_) + a[slices] = val + nparray[slices] = val + np.testing.assert_almost_equal(a[...], nparray) + + @pytest.mark.parametrize( ("shape", "slices"), [