Skip to content
128 changes: 112 additions & 16 deletions src/ASDF.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module ASDF

using ChunkCodecLibBlosc: BloscCodec, BloscEncodeOptions
using ChunkCodecLibBzip2: BZ2Codec, BZ2EncodeOptions
using ChunkCodecLibLz4: LZ4FrameCodec, LZ4FrameEncodeOptions
using ChunkCodecLibLz4: LZ4BlockCodec, LZ4FrameCodec, LZ4BlockEncodeOptions, LZ4FrameEncodeOptions
using ChunkCodecLibZlib: ZlibCodec, ZlibEncodeOptions
using ChunkCodecLibZstd: ZstdCodec, ZstdEncodeOptions, decode, encode
using CodecXz: XzCompressor, XzDecompressor
Expand Down Expand Up @@ -53,6 +53,7 @@ struct BlockHeader
used_size::UInt64
data_size::UInt64
checksum::AbstractVector{UInt8} # length 16
validate_checksum::Bool
end

mutable struct LazyBlockHeaders
Expand Down Expand Up @@ -90,6 +91,8 @@ big2native_U8(bytes::AbstractVector{UInt8}) = bytes[1]
# Big-endian decoders: the most significant byte comes first in `bytes`.

# Combine bytes 1 and 2 into a UInt16.
function big2native_U16(bytes::AbstractVector{UInt8})
    hi = UInt16(bytes[1])
    lo = bytes[2]
    return (hi << 8) | lo
end

# Combine the UInt16 halves read from bytes 1:2 and 3:4 into a UInt32.
function big2native_U32(bytes::AbstractVector{UInt8})
    hi = big2native_U16(@view bytes[1:2])
    lo = big2native_U16(@view bytes[3:4])
    return (UInt32(hi) << 16) | lo
end

# Combine the UInt32 halves read from bytes 1:4 and 5:8 into a UInt64.
function big2native_U64(bytes::AbstractVector{UInt8})
    hi = big2native_U32(@view bytes[1:4])
    lo = big2native_U32(@view bytes[5:8])
    return (UInt64(hi) << 32) | lo
end
# Decode the first four bytes of `bytes` as a little-endian UInt32 (least
# significant byte first). Used for the lz4.block store_size prefix.
function little2native_U32(bytes::AbstractVector{UInt8})
    b0 = UInt32(bytes[1])
    b1 = UInt32(bytes[2])
    b2 = UInt32(bytes[3])
    b3 = UInt32(bytes[4])
    return b0 | (b1 << 8) | (b2 << 16) | (b3 << 24)
end

# Serialise a single byte as a one-element byte vector.
function native2big_U8(val::UInt8)
    return [val]
end

# Serialise a UInt16 as two bytes, most significant byte first.
function native2big_U16(val::UInt16)
    hi = (val >>> 8) % UInt8
    lo = val % UInt8
    return UInt8[hi, lo]
end
Expand All @@ -111,7 +114,7 @@ native2big_U16(val::Integer) = native2big_U16(UInt16(val))
# Convenience methods for arbitrary integers: convert to the fixed-width unsigned
# type, then forward to the method for that type (not visible in this view).
function native2big_U32(val::Integer)
    return native2big_U32(UInt32(val))
end

function native2big_U64(val::Integer)
    return native2big_U64(UInt64(val))
end

function read_block_header(io::IO, position::Int64)
function read_block_header(io::IO, position::Int64; validate_checksum::Bool)
# Read block header
max_header_size = 6 + 48
header = Array{UInt8}(undef, max_header_size)
Expand Down Expand Up @@ -145,14 +148,14 @@ function read_block_header(io::IO, position::Int64)
error("ASDF file header incorrectly specifies amount of space to use")
end

return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum)
return BlockHeader(io, position, token, header_size, flags, compression, allocated_size, used_size, data_size, checksum, validate_checksum)
end

function find_all_blocks(io::IO, pos::Int64=Int64(0))
function find_all_blocks(io::IO, pos::Int64=Int64(0); validate_checksum::Bool)
headers = BlockHeader[]
pos = find_next_block(io, pos)
while pos !== nothing
header = read_block_header(io, pos)
header = read_block_header(io, pos; validate_checksum)
push!(headers, header)
pos = Int64(header.position + 6 + header.header_size + header.allocated_size)
pos = find_next_block(io, pos)
Expand All @@ -170,7 +173,7 @@ function read_block(header::BlockHeader)
end

# Check checksum
if any(!iszero, header.checksum)
if header.validate_checksum && any(!iszero, header.checksum)
actual_checksum = md5(data)
if any(actual_checksum != header.checksum)
error("Checksum mismatch in ASDF file header")
Expand All @@ -184,13 +187,13 @@ function read_block(header::BlockHeader)
# do nothing, the block is uncompressed
elseif compression == C_Xz
data = transcode(XzDecompressor, data)
elseif compression == C_Lz4
data = decode_Lz4(data)
else
if compression == C_Blosc
codec = BloscCodec()
elseif compression == C_Bzip2
codec = BZ2Codec()
elseif compression == C_Lz4
codec = LZ4FrameCodec()
elseif compression == C_Zlib
codec = ZlibCodec()
elseif compression == C_Zstd
Expand All @@ -209,6 +212,46 @@ function read_block(header::BlockHeader)
return data
end

function decode_Lz4(data)
if (# LZ4 Frame magic bytes 04 22 4D 18
length(data) >= 4 &&
data[1] == 0x04 && data[2] == 0x22 &&
data[3] == 0x4D && data[4] == 0x18)
return decode(LZ4FrameCodec(), data)
else
# If the data was originally created from Python's ASDF, then it will be in block instead of frame layout,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also produce the block layout? Should we? Can Python handle the frame layout?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding me, will add that in. Looks like Python can handle the frame layout, but only via a plug-in: https://github.com/asdf-format/asdf-compression

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in e012b21

There is a layout option for NDArrayWrapper now to flip between frame and block. Does that seem like a reasonable place to control things? I guess it doesn't really make sense for other compression schemes, so I just set the default layout as default

From a maintainability point of view, I don't love that there is now a hand-rolled LZ4-specific encode and decode path to accommodate Python's asdf scheme. There's also the matter of compatibility with LZ4 frame support on the Python side, but since that's still an experimental plugin, maybe that can be a problem for future us to deal with.

I am now squarely out of my comfort zone and would gladly accept any suggestions for simplifying things, haha. Thanks again for taking a look!

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is only for lz4 then I would call it lz4_layout, with values :frame and :block. I am sure that other compression schemes will also want to have options in the future, e.g. specifying the compression level.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, just renamed here e5b6bd4

# where each chunk is:
#
# [4 bytes, big-endian] compressed chunk size (the ASDF envelope)
# [4 bytes, little-endian] uncompressed chunk size (lz4.block store_size=True prefix)
# [N bytes] raw LZ4 block payload
#
# lz4.block.compress() defaults to store_size=True, which prepends the
# uncompressed size as a little-endian uint32. LZ4BlockCodec expects only
# the raw block, so both the outer BE envelope and the inner LE prefix must
# be stripped, with the LE value used as the uncompressed_size hint.

out = UInt8[]
pos = 1

while pos <= length(data)
# Outer ASDF envelope: big-endian compressed chunk size
compressed_chunk_size = Int(big2native_U32(@view data[pos:pos+3]))
pos += 4
# Inner lz4.block store_size=True prefix: little-endian uncompressed size
uncompressed_chunk_size = Int(little2native_U32(@view data[pos:pos+3]))
pos += 4
# Raw LZ4 block payload (compressed_chunk_size includes the 4-byte LE prefix)
payload_len = compressed_chunk_size - 4
payload = @view data[pos:pos+payload_len-1]
pos += payload_len
append!(out, decode(LZ4BlockCodec(), payload; max_size = uncompressed_chunk_size, size_hint = uncompressed_chunk_size))
end

return out
end
end

################################################################################

"""
Expand Down Expand Up @@ -561,11 +604,25 @@ end

################################################################################

asdf_constructors = copy(YAML.default_yaml_constructors)
asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"]
asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"]
function load_file(filename::AbstractString; extensions = false, validate_checksum = true)
asdf_constructors = copy(YAML.default_yaml_constructors)
asdf_constructors["tag:stsci.edu:asdf/core/asdf-1.1.0"] = asdf_constructors["tag:yaml.org,2002:map"]
asdf_constructors["tag:stsci.edu:asdf/core/software-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"]
asdf_constructors["tag:stsci.edu:asdf/core/extension_metadata-1.0.0"] = asdf_constructors["tag:yaml.org,2002:map"]

if extensions
# Use fallbacks for now
asdf_constructors[nothing] = (constructor, node) -> begin
if node isa YAML.MappingNode
return YAML.construct_mapping(constructor, node)
elseif node isa YAML.SequenceNode
return YAML.construct_sequence(constructor, node)
else
return YAML.construct_scalar(constructor, node)
end
end
end

function load_file(filename::AbstractString)
io = open(filename, "r")
lazy_block_headers = LazyBlockHeaders()
construct_yaml_ndarray = make_construct_yaml_ndarray(lazy_block_headers)
Expand All @@ -574,12 +631,13 @@ function load_file(filename::AbstractString)

asdf_constructors′ = copy(asdf_constructors)
asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.0.0"] = construct_yaml_ndarray
asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-1.1.0"] = construct_yaml_ndarray
asdf_constructors′["tag:stsci.edu:asdf/core/ndarray-chunk-1.0.0"] = construct_yaml_ndarray_chunk
asdf_constructors′["tag:stsci.edu:asdf/core/chunked-ndarray-1.0.0"] = construct_yaml_chunked_ndarray

metadata = YAML.load(io, asdf_constructors′)
# lazy_block_headers.block_headers = find_all_blocks(io, position(io))
lazy_block_headers.block_headers = find_all_blocks(io)
lazy_block_headers.block_headers = find_all_blocks(io; validate_checksum)
return ASDFFile(filename, metadata, lazy_block_headers)
end

Expand All @@ -603,9 +661,10 @@ struct NDArrayWrapper
array::AbstractArray
compression::Compression
inline::Bool
lz4_layout::Symbol
end
function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false)
return NDArrayWrapper(array, compression, inline)
function NDArrayWrapper(array::AbstractArray; compression::Compression=C_Bzip2, inline::Bool=false, lz4_layout::Symbol=:block)
return NDArrayWrapper(array, compression, inline, lz4_layout)
end
# Zero-argument indexing (`wrapper[]`) returns the wrapped array.
function Base.getindex(val::NDArrayWrapper)
    return val.array
end

Expand Down Expand Up @@ -655,6 +714,40 @@ function YAML._print(io::IO, val::NDArrayWrapper, level::Int=0, ignore_level::Bo
YAML._print(io, ndarray, level, ignore_level)
end

"""
    encode_Lz4_block(input::AbstractVector{UInt8}; chunk_size::Int = 1024 * 1024 * 8)

Compress `input` in consecutive chunks of at most `chunk_size` bytes using the raw
LZ4 block codec and return all encoded chunks concatenated in a `Vector{UInt8}`.

Each chunk is laid out as:

    [4 bytes, big-endian]    compressed chunk size (includes the 4-byte prefix below)
    [4 bytes, little-endian] uncompressed chunk size
    [N bytes]                raw LZ4 block payload

The little-endian prefix is written by hand because `LZ4BlockEncodeOptions` does not
prepend the uncompressed size; it mirrors Python's
`lz4.block.compress(store_size=True)` behaviour.

An empty `input` produces an empty result.

# Throws
- `ArgumentError` if `chunk_size` is not in `1:typemax(UInt32)`.
"""
function encode_Lz4_block(input::AbstractVector{UInt8}; chunk_size::Int = 1024 * 1024 * 8)
    # A non-positive chunk size would never advance `offset` (endless loop), and the
    # uncompressed chunk size has to fit into the 4-byte prefix.
    if !(1 <= chunk_size <= typemax(UInt32))
        throw(ArgumentError(
            "chunk_size must be in 1:$(typemax(UInt32)), got $(chunk_size)",
        ))
    end

    out = UInt8[]
    offset = firstindex(input)
    stop = lastindex(input)
    while offset <= stop
        # Written this way so that `offset + chunk_size` cannot overflow
        chunk_end = offset + min(chunk_size - 1, stop - offset)
        chunk = @view input[offset:chunk_end]

        # Compress the raw chunk with the LZ4 block codec
        compressed_payload = encode(LZ4BlockEncodeOptions(), chunk)
        uncompressed_size = UInt32(length(chunk))
        # LE prefix + raw payload
        compressed_chunk_size = UInt32(4 + length(compressed_payload))

        # Outer ASDF envelope: big-endian compressed chunk size
        append!(out, native2big_U32(compressed_chunk_size))

        # Inner store_size prefix: little-endian uncompressed size
        push!(
            out,
            (uncompressed_size >>> 0) % UInt8,
            (uncompressed_size >>> 8) % UInt8,
            (uncompressed_size >>> 16) % UInt8,
            (uncompressed_size >>> 24) % UInt8,
        )

        # Raw LZ4 block payload
        append!(out, compressed_payload)
        offset = chunk_end + 1
    end

    return out
end

function write_file(filename::AbstractString, document::Dict{Any,Any})
# Set up block descriptors
global blocks
Expand Down Expand Up @@ -723,12 +816,15 @@ function write_file(filename::AbstractString, document::Dict{Any,Any})
# TODO: Don't copy input
input = input isa Vector ? input : Vector(input)
data = transcode(XzCompressor, input)
elseif array.compression == C_Lz4 && array.lz4_layout == :block
data = encode_Lz4_block(input)
#data = encode(LZ4BlockEncodeOptions(), input) # Not compatible with Python asdf
else
if array.compression == C_Blosc
encode_options = BloscEncodeOptions(; clevel=9, doshuffle=2, typesize=sizeof(eltype(array.array)), compressor="zstd")
elseif array.compression == C_Bzip2
encode_options = BZ2EncodeOptions(; blockSize100k=9)
elseif array.compression == C_Lz4
elseif array.compression == C_Lz4 && array.lz4_layout == :frame
encode_options = LZ4FrameEncodeOptions(; compressionLevel=12, blockSizeID=7)
elseif array.compression == C_Zlib
encode_options = ZlibEncodeOptions(; level=9)
Expand Down
2 changes: 1 addition & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const init_code = quote
))
write(io, data_bytes)
seekstart(io)
return ASDF.read_block_header(io, Int64(0))
return ASDF.read_block_header(io, Int64(0); validate_checksum = true)
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/test-blocks.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
function test_read_block_header(msg; kwargs...)
io = make_raw_header(; kwargs...) |> IOBuffer
@test_throws msg ASDF.read_block_header(io, Int64(0))
@test_throws msg ASDF.read_block_header(io, Int64(0); validate_checksum = true)
end

function test_read_block(msg; kwargs...)
Expand Down
120 changes: 120 additions & 0 deletions test/test-read_fallbacks.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# Write a minimal ASDF document to `temp.asdf` inside `dir`, using `body` as the
# YAML tree under the `!core/asdf-1.1.0` tag, and return the file path.
function write_asdf(dir, body)
    path = joinpath(dir, "temp.asdf")
    document = """
        #ASDF 1.0.0
        #ASDF_STANDARD 1.2.0
        # This is an ASDF file <https://asdf-standard.readthedocs.io/>
        %YAML 1.1
        %TAG ! tag:stsci.edu:asdf/
        ---
        !core/asdf-1.1.0
        $(body)
        ...
        """
    write(path, document)
    return path
end

# Write `tag` as the body of an ASDF file in a fresh temporary directory and
# return the result of `ASDF.load_file` on it; `kwargs` are forwarded to `load_file`.
function load_tag(tag; kwargs...)
    return mktempdir() do dir
        asdf_path = write_asdf(dir, tag)
        ASDF.load_file(asdf_path; kwargs...)
    end
end

@testset "unknown mapping" begin
    # `width`/`height` must be nested under `custom_obj` for the assertions below
    tag_unknown_mapping = """
        known_key: hello
        custom_obj: !<tag:example.org:mylib/widget-1.0.0>
          width: 42
          height: 7
        """

    # The sequence and scalar documents are defined locally: variables created in
    # other `@testset` blocks are not visible here, and referencing an undefined
    # name would let `@test_throws Exception` pass on an `UndefVarError` instead of
    # on the unknown tag.
    tag_unknown_sequence = """
        known_key: hello
        custom_list: !<tag:example.org:mylib/series-1.0.0>
        - alpha
        - beta
        - gamma
        """
    tag_unknown_scalar = """
        known_key: hello
        custom_value: !<tag:example.org:mylib/quantity-1.0.0> 3.14
        """

    # The default `extensions = false` case
    @test_throws Exception load_tag(tag_unknown_mapping; extensions = false)
    @test_throws Exception load_tag(tag_unknown_sequence; extensions = false)
    @test_throws Exception load_tag(tag_unknown_scalar; extensions = false)

    # Should fall back to an `AbstractDict`
    af = load_tag(tag_unknown_mapping; extensions = true)
    obj = af.metadata["custom_obj"]
    @test obj isa AbstractDict
    @test obj["width"] == 42
    @test obj["height"] == 7

    # Known key still parsed as normal
    @test af.metadata["known_key"] == "hello"

    # And loading the file multiple times does not mutate any shared/global state
    af1 = load_tag(tag_unknown_mapping; extensions = true)
    af2 = load_tag(tag_unknown_mapping; extensions = true)
    @test af1.metadata["custom_obj"]["width"] == af2.metadata["custom_obj"]["width"]
end

@testset "unknown sequence" begin
    tag_unknown_sequence = """
        known_key: hello
        custom_list: !<tag:example.org:mylib/series-1.0.0>
        - alpha
        - beta
        - gamma
        """

    # With `extensions = true` the unknown-tagged sequence should load as an
    # `AbstractVector`
    loaded = load_tag(tag_unknown_sequence; extensions = true)
    items = loaded.metadata["custom_list"]

    @test items isa AbstractVector
    @test length(items) == 3
    for (i, expected) in enumerate(("alpha", "beta", "gamma"))
        @test items[i] == expected
    end
end

@testset "unknown scalar" begin
    tag_unknown_scalar = """
        known_key: hello
        custom_value: !<tag:example.org:mylib/quantity-1.0.0> 3.14
        """

    # With `extensions = true` the unknown-tagged scalar should come back as its
    # raw text, i.e. an `AbstractString`
    loaded = load_tag(tag_unknown_scalar; extensions = true)
    scalar = loaded.metadata["custom_value"]

    @test scalar isa AbstractString
    @test scalar == "3.14"
end

@testset "unknown all" begin
    # `width`/`height` must be nested under `mapping_node`; otherwise they become
    # top-level keys and the mapping assertions below cannot hold.
    tag_unknown_all = """
        known_key: hello
        mapping_node: !<tag:example.org:mylib/widget-1.0.0>
          width: 42
          height: 7
        sequence_node: !<tag:example.org:mylib/series-1.0.0>
        - alpha
        - beta
        scalar_node: !<tag:example.org:mylib/quantity-1.0.0> 3.14
        """

    # Fallbacks should also work if all unknowns are present in the same file.
    af = load_tag(tag_unknown_all; extensions = true)
    md = af.metadata

    # Mapping branch
    @test md["mapping_node"] isa AbstractDict
    @test md["mapping_node"]["width"] == 42

    # Sequence branch
    @test md["sequence_node"] isa AbstractVector
    @test md["sequence_node"][1] == "alpha"

    # Scalar branch
    @test md["scalar_node"] isa AbstractString
    @test md["scalar_node"] == "3.14"

    # Known key unaffected
    @test md["known_key"] == "hello"
end
Loading
Loading