Source code for numcodecs.checksum32

import zlib


import numpy as np
import struct


from .abc import Codec
from .compat import ensure_contiguous_ndarray, ndarray_copy
from .jenkins import jenkins_lookup3


class Checksum32(Codec):

    # override in sub-class
    checksum = None

    def encode(self, buf):
        arr = ensure_contiguous_ndarray(buf).view('u1')
        checksum = self.checksum(arr) & 0xffffffff
        enc = np.empty(arr.nbytes + 4, dtype='u1')
        enc[:4].view('<u4')[0] = checksum
        ndarray_copy(arr, enc[4:])
        return enc

    def decode(self, buf, out=None):
        arr = ensure_contiguous_ndarray(buf).view('u1')
        expect = arr[:4].view('<u4')[0]
        checksum = self.checksum(arr[4:]) & 0xffffffff
        if expect != checksum:
            raise RuntimeError('checksum failed')
        return ndarray_copy(arr[4:], out)


[docs]class CRC32(Checksum32): codec_id = 'crc32' checksum = zlib.crc32
[docs]class Adler32(Checksum32): codec_id = 'adler32' checksum = zlib.adler32
[docs]class JenkinsLookup3(Checksum32): """Bob Jenkin's lookup3 checksum with 32-bit output This is the HDF5 implementation. https://github.com/HDFGroup/hdf5/blob/577c192518598c7e2945683655feffcdbdf5a91b/src/H5checksum.c#L378-L472 With this codec, the checksum is concatenated on the end of the data bytes when encoded. At decode time, the checksum is performed on the data portion and compared with the four-byte checksum, raising RuntimeError if inconsistent. Attributes: initval: initial seed passed to the hash algorithm, default: 0 prefix: bytes prepended to the buffer before evaluating the hash, default: None """ checksum = jenkins_lookup3 codec_id = "jenkins_lookup3" def __init__(self, initval: int = 0, prefix=None): self.initval = initval if prefix is None: self.prefix = None else: self.prefix = np.frombuffer(prefix, dtype='uint8')
[docs] def encode(self, buf): """Return buffer plus 4-byte Bob Jenkin's lookup3 checksum""" buf = ensure_contiguous_ndarray(buf).ravel().view('uint8') if self.prefix is None: val = jenkins_lookup3(buf, self.initval) else: val = jenkins_lookup3(np.hstack((self.prefix, buf)), self.initval) return buf.tobytes() + struct.pack("<I", val)
[docs] def decode(self, buf, out=None): """Check Bob Jenkin's lookup3 checksum, and return buffer without it""" b = ensure_contiguous_ndarray(buf).view('uint8') if self.prefix is None: val = jenkins_lookup3(b[:-4], self.initval) else: val = jenkins_lookup3(np.hstack((self.prefix, b[:-4])), self.initval) found = b[-4:].view("<u4")[0] if val != found: raise RuntimeError( f"The Bob Jenkin's lookup3 checksum of the data ({val}) did not" f" match the expected checksum ({found}).\n" "This could be a sign that the data has been corrupted." ) if out is not None: out.view("uint8")[:] = b[:-4] return out return memoryview(b[:-4])