diff --git a/src/ASDF.jl b/src/ASDF.jl index f2894b6..17d031c 100644 --- a/src/ASDF.jl +++ b/src/ASDF.jl @@ -2,7 +2,7 @@ module ASDF using ChunkCodecLibBlosc: BloscCodec, BloscEncodeOptions using ChunkCodecLibBzip2: BZ2Codec, BZ2EncodeOptions -using ChunkCodecLibLz4: LZ4FrameCodec, LZ4FrameEncodeOptions +using ChunkCodecLibLz4: LZ4BlockCodec, LZ4FrameCodec, LZ4BlockEncodeOptions, LZ4FrameEncodeOptions using ChunkCodecLibZlib: ZlibCodec, ZlibEncodeOptions using ChunkCodecLibZstd: ZstdCodec, ZstdEncodeOptions, decode, encode using CodecXz: XzCompressor, XzDecompressor @@ -53,6 +53,7 @@ struct BlockHeader used_size::UInt64 data_size::UInt64 checksum::AbstractVector{UInt8} # length 16 + validate_checksum::Bool end mutable struct LazyBlockHeaders @@ -90,6 +91,8 @@ big2native_U8(bytes::AbstractVector{UInt8}) = bytes[1] big2native_U16(bytes::AbstractVector{UInt8}) = (UInt16(bytes[1]) << 8) | bytes[2] big2native_U32(bytes::AbstractVector{UInt8}) = (UInt32(big2native_U16(@view bytes[1:2])) << 16) | big2native_U16(@view bytes[3:4]) big2native_U64(bytes::AbstractVector{UInt8}) = (UInt64(big2native_U32(@view bytes[1:4])) << 32) | big2native_U32(@view bytes[5:8]) +# Read a 4-byte little-endian UInt32 (used for lz4.block store_size prefix) +little2native_U32(bytes::AbstractVector{UInt8}) = (UInt32(bytes[1])) | (UInt32(bytes[2]) << 8) | (UInt32(bytes[3]) << 16) | (UInt32(bytes[4]) << 24) native2big_U8(val::UInt8) = UInt8[val] native2big_U16(val::UInt16) = UInt8[(val >>> 0x08) & 0xff, (val >>> 0x00) & 0xff] @@ -111,7 +114,7 @@ native2big_U16(val::Integer) = native2big_U16(UInt16(val)) native2big_U32(val::Integer) = native2big_U32(UInt32(val)) native2big_U64(val::Integer) = native2big_U64(UInt64(val)) -function read_block_header(io::IO, position::Int64) +function read_block_header(io::IO, position::Int64; validate_checksum::Bool) # Read block header max_header_size = 6 + 48 header = Array{UInt8}(undef, max_header_size) @@ -145,14 +148,14 @@ function read_block_header(io::IO, position::Int64) error("ASDF file header incorrectly specifies amount of space to use") end - return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum) + return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum, validate_checksum) end -function find_all_blocks(io::IO, pos::Int64=Int64(0)) +function find_all_blocks(io::IO, pos::Int64=Int64(0); validate_checksum::Bool) headers = BlockHeader[] pos = find_next_block(io, pos) while pos !== nothing - header = read_block_header(io, pos) + header = read_block_header(io, pos; validate_checksum) push!(headers, header) pos = Int64(header.position + 6 + header.header_size + header.allocated_size) pos = find_next_block(io, pos) @@ -170,7 +173,7 @@ function read_block(header::BlockHeader) end # Check checksum - if any(!iszero, header.checksum) + if header.validate_checksum && any(!iszero, header.checksum) actual_checksum = md5(data) if any(actual_checksum != header.checksum) error("Checksum mismatch in ASDF file header") @@ -184,13 +187,13 @@ function read_block(header::BlockHeader) # do nothing, the block is uncompressed elseif compression == C_Xz data = transcode(XzDecompressor, data) + elseif compression == C_Lz4 + data = decode_Lz4(data) else if compression == C_Blosc codec = BloscCodec() elseif compression == C_Bzip2 codec = BZ2Codec() - elseif compression == C_Lz4 - codec = LZ4FrameCodec() elseif compression == C_Zlib codec = ZlibCodec() elseif compression == C_Zstd @@ -209,6 +212,46 @@ function read_block(header::BlockHeader) return data end +function decode_Lz4(data) + if (# LZ4 Frame magic bytes 04 22 4D 18 + length(data) >= 4 && + data[1] == 0x04 && data[2] == 0x22 && + data[3] == 0x4D && data[4] == 0x18) + return decode(LZ4FrameCodec(), data) + else + # If the data was originally created from Python's ASDF, then it will be in block instead of frame layout, + # where each chunk is: + # + # [4 bytes, big-endian] compressed chunk size (the ASDF envelope) + # [4 bytes, little-endian] uncompressed chunk size (lz4.block store_size=True prefix) + # [N bytes] raw LZ4 block payload + # + # lz4.block.compress() defaults to store_size=True, which prepends the + # uncompressed size as a little-endian uint32. LZ4BlockCodec expects only + # the raw block, so both the outer BE envelope and the inner LE prefix must + # be stripped, with the LE value used as the uncompressed_size hint. + + out = UInt8[] + pos = 1 + + while pos <= length(data) + # Outer ASDF envelope: big-endian compressed chunk size + compressed_chunk_size = Int(big2native_U32(@view data[pos:pos+3])) + pos += 4 + # Inner lz4.block store_size=True prefix: little-endian uncompressed size + uncompressed_chunk_size = Int(little2native_U32(@view data[pos:pos+3])) + pos += 4 + # Raw LZ4 block payload (compressed_chunk_size includes the 4-byte LE prefix) + payload_len = compressed_chunk_size - 4 + payload = @view data[pos:pos+payload_len-1] + pos += payload_len + append!(out, decode(LZ4BlockCodec(), payload; max_size = uncompressed_chunk_size, size_hint = uncompressed_chunk_size)) + end + + return out + end +end + ################################################################################ """ @@ -561,11 +604,25 @@ end ################################################################################ -asdf_constructors = copy(YAML.default_yaml_constructors) -asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"] -asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"] +function load_file(filename::AbstractString; extensions = false, validate_checksum = true) + asdf_constructors = copy(YAML.default_yaml_constructors) + asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"] + asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"] + asdf_constructors["tag:stsci.edu:asdf/core/extension_metadata-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"] + + if extensions + # Use fallbacks for now + asdf_constructors[nothing] = (constructor, node) -> begin + if node isa YAML.MappingNode + return YAML.construct_mapping(constructor, node) + elseif node isa YAML.SequenceNode + return YAML.construct_sequence(constructor, node) + else + return YAML.construct_scalar(constructor, node) + end + end + end -function load_file(filename::AbstractString) io = open(filename, "r") lazy_block_headers = LazyBlockHeaders() construct_yaml_ndarray = make_construct_yaml_ndarray(lazy_block_headers) @@ -574,12 +631,13 @@ function load_file(filename::AbstractString) asdf_constructors′ = copy(asdf_constructors) asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.0.0"] = construct_yaml_ndarray + asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.1.0"] = construct_yaml_ndarray asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-chunk-1.0.0"] = construct_yaml_ndarray_chunk asdf_constructors′["tag:stsci.edu:asdf/core/chunked-ndarray-1.0.0"] = construct_yaml_chunked_ndarray metadata = YAML.load(io, asdf_constructors′) # lazy_block_headers.block_headers = find_all_blocks(io, position(io)) - lazy_block_headers.block_headers = find_all_blocks(io) + lazy_block_headers.block_headers = find_all_blocks(io; validate_checksum) return ASDFFile(filename, metadata, lazy_block_headers) end @@ -603,9 +661,10 @@ struct NDArrayWrapper array::AbstractArray compression::Compression inline::Bool + lz4_layout::Symbol end -function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false) - return NDArrayWrapper(array, compression, inline) +function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false, lz4_layout::Symbol=:block) + return NDArrayWrapper(array, compression, inline, lz4_layout) end Base.getindex(val::NDArrayWrapper) = val.array @@ -655,6 +714,40 @@ function YAML._print(io::IO, val::NDArrayWrapper, level::Int=0, ignore_level::Bo YAML._print(io, ndarray, level, ignore_level) end +function encode_Lz4_block(input::AbstractVector{UInt8}; chunk_size::Int = 1024 * 1024 * 8) + out = UInt8[] + offset = 1 + while offset <= length(input) + chunk_end = min(offset + chunk_size - 1, length(input)) + chunk = @view input[offset:chunk_end] + + # Compress the raw chunk with LZ4 block codec + # LZ4BlockEncodeOptions does NOT prepend the uncompressed size, + # so we must prepend the LE uint32 ourselves to match Python's + # lz4.block.compress(store_size=True) behaviour. + compressed_payload = encode(LZ4BlockEncodeOptions(), chunk) + uncompressed_size = UInt32(length(chunk)) + compressed_chunk_size = UInt32(4 + length(compressed_payload)) # LE prefix + raw payload + + # Outer ASDF envelope: big-endian compressed chunk size (includes the 4-byte LE prefix) + append!(out, native2big_U32(compressed_chunk_size)) + + # Inner lz4.block store_size=True prefix: little-endian uncompressed size + append!(out, [ + (uncompressed_size >>> 0x00) & 0xff, + (uncompressed_size >>> 0x08) & 0xff, + (uncompressed_size >>> 0x10) & 0xff, + (uncompressed_size >>> 0x18) & 0xff, + ]) + + # Raw LZ4 block payload + append!(out, compressed_payload) + offset = chunk_end + 1 + end + + return out +end + function write_file(filename::AbstractString, document::Dict{Any,Any}) # Set up block descriptors global blocks @@ -723,12 +816,15 @@ function write_file(filename::AbstractString, document::Dict{Any,Any}) # TODO: Don't copy input input = input isa Vector ? input : Vector(input) data = transcode(XzCompressor, input) + elseif array.compression == C_Lz4 && array.lz4_layout == :block + data = encode_Lz4_block(input) + #data = encode(LZ4BlockEncodeOptions(), input) # Not compatible with Python asdf else if array.compression == C_Blosc encode_options = BloscEncodeOptions(; clevel=9, doshuffle=2, typesize=sizeof(eltype(array.array)), compressor="zstd") elseif array.compression == C_Bzip2 encode_options = BZ2EncodeOptions(; blockSize100k=9) - elseif array.compression == C_Lz4 + elseif array.compression == C_Lz4 && array.lz4_layout == :frame encode_options = LZ4FrameEncodeOptions(; compressionLevel=12, blockSizeID=7) elseif array.compression == C_Zlib encode_options = ZlibEncodeOptions(; level=9) diff --git a/test/runtests.jl b/test/runtests.jl index 145877a..28d5d15 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -62,7 +62,7 @@ const init_code = quote )) write(io, data_bytes) seekstart(io) - return ASDF.read_block_header(io, Int64(0)) + return ASDF.read_block_header(io, Int64(0); validate_checksum = true) end end diff --git a/test/test-blocks.jl b/test/test-blocks.jl index 3f529bf..8ae339e 100644 --- a/test/test-blocks.jl +++ b/test/test-blocks.jl @@ -1,6 +1,6 @@ function test_read_block_header(msg; kwargs...) io = make_raw_header(; kwargs...) |> IOBuffer - @test_throws msg ASDF.read_block_header(io, Int64(0)) + @test_throws msg ASDF.read_block_header(io, Int64(0); validate_checksum = true) end function test_read_block(msg; kwargs...) diff --git a/test/test-read_fallbacks.jl b/test/test-read_fallbacks.jl new file mode 100644 index 0000000..d30cbe7 --- /dev/null +++ b/test/test-read_fallbacks.jl @@ -0,0 +1,120 @@ +function write_asdf(dir, body) + path = joinpath(dir, "temp.asdf") + open(path, "w") do io + print(io, + """ + #ASDF 1.0.0 + #ASDF_STANDARD 1.2.0 + # This is an ASDF file + %YAML 1.1 + %TAG ! tag:stsci.edu:asdf/ + --- + !core/asdf-1.1.0 + $(body) + ... + """ + ) + end + return path +end + +load_tag(tag; kwargs...) = mktempdir() do dir + path = write_asdf(dir, tag) + af = ASDF.load_file(path; kwargs...) +end + +@testset "unknown mapping" begin + tag_unknown_mapping = """ + known_key: hello + custom_obj: ! + width: 42 + height: 7 + """ + + # The default `extensions = false` case + @test_throws Exception load_tag(tag_unknown_mapping; extensions = false) + @test_throws Exception load_tag(tag_unknown_sequence; extensions = false) + @test_throws Exception load_tag(tag_unknown_scalar; extensions = false) + + # Should fall back to an `AbstractDict` + af = load_tag(tag_unknown_mapping; extensions = true) + obj = af.metadata["custom_obj"] + @test obj isa AbstractDict + @test obj["width"] == 42 + @test obj["height"] == 7 + + # Known key still parsed as normal + @test af.metadata["known_key"] == "hello" + + # And loading the file multiple times does not mutate any shared/global state + af1 = load_tag(tag_unknown_mapping; extensions = true) + af2 = load_tag(tag_unknown_mapping; extensions = true) + @test af1.metadata["custom_obj"]["width"] == af2.metadata["custom_obj"]["width"] +end + +@testset "unknown sequence" begin + tag_unknown_sequence = """ + known_key: hello + custom_list: ! + - alpha + - beta + - gamma + """ + + # Should fall back to an `AbstractVector` + af = load_tag(tag_unknown_sequence; extensions = true) + list = af.metadata["custom_list"] + + @test list isa AbstractVector + @test length(list) == 3 + @test list[1] == "alpha" + @test list[2] == "beta" + @test list[3] == "gamma" + +end + +@testset "unknown scalar" begin + tag_unknown_scalar = """ + known_key: hello + custom_value: ! 3.14 + """ + + # Should fall back to an `AbstractString` + af = load_tag(tag_unknown_scalar; extensions = true) + value = af.metadata["custom_value"] + + @test value isa AbstractString + @test value == "3.14" +end + +@testset "unknown all" begin + tag_unknown_all = """ + known_key: hello + mapping_node: ! + width: 42 + height: 7 + sequence_node: ! + - alpha + - beta + scalar_node: ! 3.14 + """ + + # Fallbacks should also work if all unknowns are present in the same file. + af = load_tag(tag_unknown_all; extensions = true) + md = af.metadata + + # Mapping branch + @test md["mapping_node"] isa AbstractDict + @test md["mapping_node"]["width"] == 42 + + # Sequence branch + @test md["sequence_node"] isa AbstractVector + @test md["sequence_node"][1] == "alpha" + + # Scalar branch + @test md["scalar_node"] isa AbstractString + @test md["scalar_node"] == "3.14" + + # Known key unaffected + @test md["known_key"] == "hello" +end diff --git a/test/test-write.jl b/test/test-write.jl index 57d935a..4960107 100644 --- a/test/test-write.jl +++ b/test/test-write.jl @@ -10,10 +10,11 @@ "element1" => ASDF.NDArrayWrapper(array; compression=ASDF.C_None), "element2" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Blosc), "element3" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Bzip2), - "element4" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4), - "element5" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Xz), - "element6" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zlib), - "element7" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zstd), + "element4" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4, lz4_layout=:block), + "element5" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Lz4, lz4_layout=:frame), + "element6" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Xz), + "element7" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zlib), + "element8" => ASDF.NDArrayWrapper(array; compression=ASDF.C_Zstd), ), ) ASDF.write_file(filename, doc) @@ -33,7 +34,7 @@ @test size(data2′) == size(data2) @test data2′ == data2 - for n in 1:7 + for n in 1:8 element = doc["group"]["element$n"][] element′ = doc′.metadata["group"]["element$n"][] @test eltype(element′) == eltype(element)