Source code for numcodecs.fixedscaleoffset

import numpy as np


from .abc import Codec
from .compat import ensure_ndarray, ndarray_copy


[docs]class FixedScaleOffset(Codec): """Simplified version of the scale-offset filter available in HDF5. Applies the transformation `(x - offset) * scale` to all chunks. Results are rounded to the nearest integer but are not packed according to the minimum number of bits. Parameters ---------- offset : float Value to subtract from data. scale : int Value to multiply by data. dtype : dtype Data type to use for decoded data. astype : dtype, optional Data type to use for encoded data. Notes ----- If `astype` is an integer data type, please ensure that it is sufficiently large to store encoded values. No checks are made and data may become corrupted due to integer overflow if `astype` is too small. Examples -------- >>> import numcodecs >>> import numpy as np >>> x = np.linspace(1000, 1001, 10, dtype='f8') >>> x array([1000. , 1000.11111111, 1000.22222222, 1000.33333333, 1000.44444444, 1000.55555556, 1000.66666667, 1000.77777778, 1000.88888889, 1001. ]) >>> codec = numcodecs.FixedScaleOffset(offset=1000, scale=10, dtype='f8', astype='u1') >>> y1 = codec.encode(x) >>> y1 array([ 0, 1, 2, 3, 4, 6, 7, 8, 9, 10], dtype=uint8) >>> z1 = codec.decode(y1) >>> z1 array([1000. , 1000.1, 1000.2, 1000.3, 1000.4, 1000.6, 1000.7, 1000.8, 1000.9, 1001. ]) >>> codec = numcodecs.FixedScaleOffset(offset=1000, scale=10**2, dtype='f8', astype='u1') >>> y2 = codec.encode(x) >>> y2 array([ 0, 11, 22, 33, 44, 56, 67, 78, 89, 100], dtype=uint8) >>> z2 = codec.decode(y2) >>> z2 array([1000. , 1000.11, 1000.22, 1000.33, 1000.44, 1000.56, 1000.67, 1000.78, 1000.89, 1001. ]) >>> codec = numcodecs.FixedScaleOffset(offset=1000, scale=10**3, dtype='f8', astype='u2') >>> y3 = codec.encode(x) >>> y3 array([ 0, 111, 222, 333, 444, 556, 667, 778, 889, 1000], dtype=uint16) >>> z3 = codec.decode(y3) >>> z3 array([1000. , 1000.111, 1000.222, 1000.333, 1000.444, 1000.556, 1000.667, 1000.778, 1000.889, 1001. ]) See Also -------- numcodecs.quantize.Quantize """ codec_id = 'fixedscaleoffset' def __init__(self, offset, scale, dtype, astype=None): self.offset = offset self.scale = scale self.dtype = np.dtype(dtype) if astype is None: self.astype = self.dtype else: self.astype = np.dtype(astype) if self.dtype == object or self.astype == object: raise ValueError('object arrays are not supported')
[docs] def encode(self, buf): # normalise input arr = ensure_ndarray(buf).view(self.dtype) # flatten to simplify implementation arr = arr.reshape(-1, order='A') # compute scale offset enc = (arr - self.offset) * self.scale # round to nearest integer enc = np.around(enc) # convert dtype enc = enc.astype(self.astype, copy=False) return enc
[docs] def decode(self, buf, out=None): # interpret buffer as numpy array enc = ensure_ndarray(buf).view(self.astype) # flatten to simplify implementation enc = enc.reshape(-1, order='A') # decode scale offset dec = (enc / self.scale) + self.offset # convert dtype dec = dec.astype(self.dtype, copy=False) # handle output return ndarray_copy(dec, out)
[docs] def get_config(self): # override to handle encoding dtypes return dict( id=self.codec_id, scale=self.scale, offset=self.offset, dtype=self.dtype.str, astype=self.astype.str )
def __repr__(self): r = '%s(scale=%s, offset=%s, dtype=%r' % \ (type(self).__name__, self.scale, self.offset, self.dtype.str) if self.astype != self.dtype: r += ', astype=%r' % self.astype.str r += ')' return r