Source code for numcodecs.checksum32
import struct
import zlib
from collections.abc import Callable
from contextlib import suppress
from types import ModuleType
from typing import TYPE_CHECKING, Literal, Optional
import numpy as np
from .abc import Codec
from .compat import ensure_contiguous_ndarray, ndarray_copy
from .jenkins import jenkins_lookup3
# Optional dependency: the name stays None when the crc32c package cannot
# be imported, and is rebound to the module when it can.
_crc32c: Optional[ModuleType] = None
with suppress(ImportError):
    import crc32c as _crc32c # type: ignore[no-redef]

if TYPE_CHECKING: # pragma: no cover
    # Needed only for the annotation on Checksum32.checksum.
    from typing_extensions import Buffer

# The two accepted positions of the checksum within an encoded buffer.
CHECKSUM_LOCATION = Literal['start', 'end']
class Checksum32(Codec):
    """Base class for codecs that attach a 32-bit checksum to a buffer.

    Sub-classes provide ``codec_id`` and a ``checksum`` callable. The
    checksum is stored as a little-endian unsigned 32-bit integer placed
    either before or after the payload bytes.

    Parameters
    ----------
    location : 'start' or 'end', optional
        Where to place the checksum in the encoded buffer. When omitted,
        the class-level ``location`` is used.

    Raises
    ------
    ValueError
        If the resulting location is neither 'start' nor 'end'.
    """

    # override in sub-class
    checksum: Callable[["Buffer", int], int] | None = None
    location: CHECKSUM_LOCATION = 'start'

    def __init__(self, location: CHECKSUM_LOCATION | None = None):
        if location is not None:
            self.location = location
        if self.location not in ['start', 'end']:
            raise ValueError(f"Invalid checksum location: {self.location}")

    def encode(self, buf):
        """Return a new uint8 array holding the payload plus its checksum.

        The output is four bytes longer than the input; the checksum
        occupies the first or last four bytes depending on ``location``.
        """
        arr = ensure_contiguous_ndarray(buf).view('u1')
        # Mask to 32 bits so the value fits the '<u4' slot below.
        checksum = self.checksum(arr) & 0xFFFFFFFF
        enc = np.empty(arr.nbytes + 4, dtype='u1')
        if self.location == 'start':
            checksum_view = enc[:4]
            payload_view = enc[4:]
        else:
            checksum_view = enc[-4:]
            payload_view = enc[:-4]
        # Both views alias ``enc``, so writing through them fills the output.
        checksum_view.view('<u4')[0] = checksum
        ndarray_copy(arr, payload_view)
        return enc

    def decode(self, buf, out=None):
        """Verify the stored checksum and return the payload without it.

        Parameters
        ----------
        buf : buffer-like
            Encoded data: payload plus a four-byte checksum.
        out : buffer-like, optional
            Destination handed to ``ndarray_copy`` together with the payload.

        Raises
        ------
        ValueError
            If ``buf`` holds fewer than four bytes.
        RuntimeError
            If the stored and computed checksums differ.
        """
        arr = ensure_contiguous_ndarray(buf).view('u1')
        # Measure the size in bytes, after conversion: ``len(buf)`` counts
        # items along the first axis and would wrongly reject e.g. an
        # 8-byte buffer passed as two uint32 values.
        if arr.nbytes < 4:
            raise ValueError("Input buffer is too short to contain a 32-bit checksum.")
        if out is not None:
            ensure_contiguous_ndarray(out)  # check that out is a valid ndarray

        if self.location == 'start':
            checksum_view = arr[:4]
            payload_view = arr[4:]
        else:
            checksum_view = arr[-4:]
            payload_view = arr[:-4]
        expect = checksum_view.view('<u4')[0]
        checksum = self.checksum(payload_view) & 0xFFFFFFFF
        if expect != checksum:
            raise RuntimeError(
                f"Stored and computed {self.codec_id} checksum do not match. Stored: {expect}. Computed: {checksum}."
            )
        return ndarray_copy(payload_view, out)
[docs]
class CRC32(Checksum32):
    """Codec that adds a CRC-32 checksum (``zlib.crc32``) to the buffer.

    Parameters
    ----------
    location : 'start' or 'end'
        Where to place the checksum in the buffer.
    """

    codec_id = 'crc32'
    location = 'start'
    checksum = zlib.crc32
[docs]
class Adler32(Checksum32):
    """Codec that adds an Adler-32 checksum (``zlib.adler32``) to the buffer.

    Parameters
    ----------
    location : 'start' or 'end'
        Where to place the checksum in the buffer.
    """

    codec_id = 'adler32'
    location = 'start'
    checksum = zlib.adler32
[docs]
class JenkinsLookup3(Checksum32):
    """Bob Jenkins's lookup3 checksum with 32-bit output

    This is the HDF5 implementation.
    https://github.com/HDFGroup/hdf5/blob/577c192518598c7e2945683655feffcdbdf5a91b/src/H5checksum.c#L378-L472

    With this codec, the checksum is concatenated on the end of the data
    bytes when encoded. At decode time, the checksum is performed on
    the data portion and compared with the four-byte checksum, raising
    RuntimeError if inconsistent.

    Parameters
    ----------
    initval : int
        initial seed passed to the hash algorithm, default: 0
    prefix : bytes-like, optional
        bytes prepended to the buffer before evaluating the hash,
        default: None
    """

    checksum = jenkins_lookup3
    codec_id = "jenkins_lookup3"

    def __init__(self, initval: int = 0, prefix=None):
        self.initval = initval
        if prefix is None:
            self.prefix = None
        else:
            # Keep the prefix as a uint8 array so it can be stacked in
            # front of the payload before hashing.
            self.prefix = np.frombuffer(prefix, dtype='uint8')

    def _lookup3(self, payload):
        """Hash *payload*, preceded by the configured prefix when one is set."""
        if self.prefix is None:
            return jenkins_lookup3(payload, self.initval)
        return jenkins_lookup3(np.hstack((self.prefix, payload)), self.initval)

    def encode(self, buf):
        """Return buffer plus 4-byte Bob Jenkins's lookup3 checksum"""
        buf = ensure_contiguous_ndarray(buf).ravel().view('uint8')
        val = self._lookup3(buf)
        # The checksum is appended as a little-endian unsigned 32-bit value.
        return buf.tobytes() + struct.pack("<I", val)

    def decode(self, buf, out=None):
        """Check Bob Jenkins's lookup3 checksum, and return buffer without it

        Raises
        ------
        ValueError
            If ``buf`` holds fewer than four bytes.
        RuntimeError
            If the stored and computed checksums differ.
        """
        b = ensure_contiguous_ndarray(buf).view('uint8')
        # Reject truncated input up front, with the same error as
        # Checksum32.decode, rather than failing inside the '<u4' view.
        if b.nbytes < 4:
            raise ValueError("Input buffer is too short to contain a 32-bit checksum.")
        payload = b[:-4]
        val = self._lookup3(payload)
        found = b[-4:].view("<u4")[0]
        if val != found:
            raise RuntimeError(
                f"The Bob Jenkin's lookup3 checksum of the data ({val}) did not"
                f" match the expected checksum ({found}).\n"
                "This could be a sign that the data has been corrupted."
            )
        if out is not None:
            out.view("uint8")[:] = payload
            return out
        return memoryview(payload)
# CRC32C is only defined when the optional crc32c module was imported.
if _crc32c is not None:

    class CRC32C(Checksum32):
        """Codec that adds a CRC-32C checksum to the buffer.

        Parameters
        ----------
        location : 'start' or 'end'
            Where to place the checksum in the buffer.
        """

        codec_id = 'crc32c'
        location = 'end'
        checksum = _crc32c.crc32c  # type: ignore[union-attr]