Source code for opendap_protocol.protocol

# Copyright (c) 2020, MeteoSwiss
# Authors: Philipp Falke <philipp.falke@meteoswiss.ch>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
A pure Python implementation of the OPeNDAP server protocol.

This module allows you to serve arbitrary data structures through the web
framework of your choice as OPeNDAP data objects. It implements just the bare
minimum of the DAP 2.0 protocol: DDS, DAS, and DODS responses and slicing. Array
data needs to be supplied as :class:`numpy.ndarray`.

The classes defined here allow the user to construct a data model in a flexible
way, by describing the data hierarchy using data types defined by DAP.

This library only implements the server side encoding. It is tested to serve
clients using the netCDF4 library. PyDAP client libraries are not supported.
"""

import re
from dataclasses import dataclass

import dask.array as da
import numpy as np

INDENT = '    '
SLICE_CONSTRAINT_RE = r'\[([\d,\W]+)\]$'


[docs]@dataclass class Config: DASK_ENCODE_CHUNK_SIZE: int = 20e6
[docs]class DAPError(Exception): pass
[docs]class DAPObject: """A generic DAP object class. """ def __init__(self, name='', parent=None, *args, **kwargs): try: self.name = '_'.join(name.split(' ')) except AttributeError: self.name = name self.children = [] self.parent = parent self.data = None self._parse_args(args, kwargs)
[docs] def dds(self, constraint=''): if meets_constraint(constraint, self.data_path): yield self.ddshead() for obj in self.children: obj.parent = self for stmt in obj.dds(constraint=constraint): yield stmt yield self.ddstail() return
[docs] def das(self, constraint=''): if meets_constraint(constraint, self.data_path): yield self.dashead() for obj in self.children: obj.parent = self for stmt in obj.das(constraint=constraint): yield stmt yield self.dastail() return
[docs] def dods(self, constraint=''): if meets_constraint(constraint, self.data_path): for stmt in self.dds(constraint=constraint): yield stmt.encode() yield b'\n' if meets_constraint(constraint, self.data_path): for stmt in self.dods_data(constraint=constraint): yield stmt return
[docs] def dods_data(self, constraint=''): if meets_constraint(constraint, self.data_path): for obj in self.children: for stmt in obj.dods_data(constraint=constraint): yield stmt return
[docs] def append(self, *obj): for o in obj: o.parent = self self.children.append(o)
@property def indent(self): if self.__class__.__name__ == 'Dataset': return '' else: return self.parent.indent + INDENT @property def data_path(self): if self.__class__.__name__ == 'Dataset': return '' else: if self.parent.__class__.__name__ == 'Dataset': return self.name else: return '.'.join([self.parent.data_path, self.name])
[docs] def ddshead(self): return '{indent}{obj} {{\n'.format(indent=self.indent, obj=self.__class__.__name__)
[docs] def ddstail(self): return '{indent}}} {name};\n'.format(indent=self.indent, name=self.name)
[docs] def dashead(self): name = self.name if isinstance(self, Dataset): name = 'Attributes' return '{indent}{name} {{\n'.format(indent=self.indent, name=name)
[docs] def dastail(self): return '{indent}}}\n'.format(indent=self.indent)
def _parse_args(self, args, kwargs): pass
[docs]class DAPAtom(DAPObject): """A class for handling DAP atomic variables. """ str = None def __init__(self, value=None, name=None, parent=None): self._val = value super(DAPAtom, self).__init__(name=name, parent=parent) def __str__(self): return self.__class__.__name__ def __repr__(self): return self.__class__.__name__
[docs] @classmethod def byteorder(cls): return np.dtype(cls.dtype.__name__).byteorder
[docs] @classmethod def subclasses(cls): """Return a list of subclasses. """ return cls.__subclasses__()
[docs] @classmethod def type_from_np(cls, nptype): """Return the appropriate DAP type for a given numpy dtype. :param nptype: A :class:`numpy.dtpye` object :returns: A subclass of :class:`DAPAtom` """ # Handle special cases first if nptype == np.int8: return Int16 elif nptype == np.uint8: return Byte elif nptype == np.int64: return Int32 elif nptype == np.uint64: return UInt32 else: # then handle the rest for subclass in cls.subclasses(): if subclass.dtype == nptype: return subclass
[docs] def das(self, constraint=''): if meets_constraint(constraint, self.data_path): yield self.dashead() for obj in self.children: for stmt in obj.das(constraint=constraint): yield stmt yield self.dastail()
[docs] def dds(self, constraint=''): if meets_constraint(constraint, self.data_path): yield '{indent}{dtype} {name};\n'.format(indent=self.indent, dtype=self.__str__(), name=self.name)
[docs] def dods_data(self, constraint=''): if meets_constraint(constraint, self.data_path): yield from dods_encode(self._val, self)
[docs]class Byte(DAPAtom): dtype = np.ubyte str = 'B'
[docs]class Int16(DAPAtom): dtype = np.int16 str = '>i4'
[docs]class UInt16(DAPAtom): dtype = np.uint16 str = '>u4'
[docs]class Int32(DAPAtom): dtype = np.int32 str = '>i4'
[docs]class UInt32(DAPAtom): dtype = np.uint32 str = '>u4'
[docs]class Float32(DAPAtom): dtype = np.float32 str = '>f4'
[docs]class Float64(DAPAtom): dtype = np.float64 str = '>f8'
[docs]class String(DAPAtom): dtype = np.str_ str = 'S'
[docs] def dods_data(self, constraint=''): if meets_constraint(constraint, self.data_path): yield from dods_encode(self._val.encode('ascii'), self)
[docs]class URL(String): dtype = np.str_ str = 'S'
[docs]class Structure(DAPObject): """Class representing a DAP structure. """ pass
[docs]class Dataset(Structure): """Class representing a DAP dataset. """
[docs] def dods_data(self, constraint=''): yield b'Data:\r\n' for obj in self.children: for stmt in obj.dods_data(constraint=constraint): yield stmt
[docs]class Sequence(DAPObject): """Class representing a DAP sequence. """ start_of_inst = b'\x5a\x00\x00\x00' end_of_seq = b'\xa5\x00\x00\x00' def __init__(self, *args, **kwargs): super(Sequence, self).__init__(*args, **kwargs) self.schema = None
[docs] def append(self, *item): for it in item: if it.validates(self.schema): it.parent = self self.children.append(it) else: raise DAPError('Item does not validate against the schema.')
[docs] def add_schema(self, schema): schema.parent = self self.schema = schema
[docs] def das(self, constraint=''): if meets_constraint(constraint, self.data_path): yield self.dashead() for item in self.schema.children: item.parent = self for stmt in item.das(constraint=constraint): yield stmt yield self.dastail()
[docs] def dds(self, constraint=''): if meets_constraint(constraint, self.data_path): yield self.ddshead() for item in self.schema.children: item.parent = self for stmt in item.dds(constraint=constraint): yield stmt yield self.ddstail()
[docs] def dods_data(self, constraint=''): if meets_constraint(constraint, self.data_path): for obj in self.children: yield self.start_of_inst for stmt in obj.dods_data(constraint=constraint): yield stmt yield self.end_of_seq return
[docs]class SequenceInstance(DAPObject): """Class representing a data item that will be added to a sequence. """ @property def data_path(self): return self.parent.data_path
[docs] def validates(self, schema): """Validate the sequence instance against a sequence schema :param schema: A :class:`SequenceSchema` instance. """ # TODO: Implement validataion return True
[docs] def dods_data(self, constraint=''): for obj in self.children: for stmt in obj.dods_data(constraint=constraint): yield stmt return
[docs]class SequenceSchema(DAPObject): """Class holding a schema against which SequenceItems are validated. """ pass
[docs]class DAPDataObject(DAPObject): """A generic class for typed non-atomic objects holding actual data (i.e. Array and Grid). """ def _parse_args(self, args, kwargs): self.data = kwargs.get('data', None) if 'dtype' in kwargs: self.dtype = kwargs['dtype'] else: if isinstance(self.data, str): self.dtype = String else: self.dtype = Float64 if 'dimensions' in kwargs: self.dimensions = kwargs['dimensions'] else: self.dimensions = None
[docs] def dods_data(self, constraint=''): if meets_constraint(constraint, self.data_path): slices = parse_slice_constraint(constraint) yield from dods_encode(self.data[slices], self.dtype) if self.dimensions is not None: for i, dim in enumerate(self.dimensions): sl = slices[i] if i < len(slices) else ... yield from dods_encode(dim.data[sl], dim.dtype)
[docs]class Grid(DAPDataObject):
[docs] def dds(self, constraint=''): if meets_constraint(constraint, self.data_path): slices = parse_slice_constraint(constraint) yield self.ddshead() yield self.indent + ' Array:\n' yield self.indent + INDENT + \ '{dtype} {name}'.format(dtype=self.dtype(), name=self.name) for i, dim in enumerate(self.dimensions): sl = slices[i] if i < len(slices) else ... yield '[{dimname} = {dimlen}]'.format( dimname=dim.name, dimlen=int(np.prod(dim.data[sl].shape))) yield ';\n' yield self.indent + ' Maps:\n' for i, dim in enumerate(self.dimensions): orig_parent = dim.parent dim.parent = self sl = slices[i] if i < len(slices) else ... for stmt in dim.dds(constraint='', slicing=sl): yield stmt dim.parent = orig_parent yield self.ddstail()
[docs]class Array(DAPDataObject):
[docs] def dds(self, constraint='', slicing=None): if meets_constraint(constraint, self.data_path): # Check for slice if slicing is None: slices = parse_slice_constraint(constraint) else: slices = slicing yield self.indent + \ '{dtype} {name}[{name} = {length}];\n' \ .format(dtype=self.dtype(), name=self.name, length=int(np.prod(self.data[slices].shape)))
[docs]class Attribute(DAPObject): def __init__(self, value=None, name=None, dtype=None): self.value = value self.dtype = dtype super(Attribute, self).__init__(name=name)
[docs] def das(self, constraint=''): if self.dtype == String: d = '"' else: d = '' yield '{indent}{dtype} {name} {d}{value}{d};\n' \ .format(indent=self.indent, dtype=self.dtype(), name=self.name, value=self.value, d=d)
[docs] def dds(self, *args, **kwargs): yield ''
[docs]def dods_encode(data, dtype): """This is the fast XDR conversion. A 100x100 array takes around 40 micro- seconds. This is a speedup of factor 100. """ is_scalar = False if not hasattr(data, 'shape'): # if not hasattr(data, '__len__'): data = np.asarray(data) is_scalar = True length = np.prod(data.shape) packed_length = b'' if not is_scalar: packed_length = length.astype('<i4').byteswap().tobytes() * 2 yield packed_length if isinstance(data, da.Array): # Encode in chunks of a defined size if we work with dask.Array chunk_size = int(Config.DASK_ENCODE_CHUNK_SIZE / data.dtype.itemsize) serialize_data = data.ravel().rechunk(chunk_size) for block in serialize_data.blocks: yield block.astype(dtype.str).compute().tobytes() else: # Make sure we always encode an array or we will get wrong results data = np.asarray(data) yield data.astype(dtype.str).tobytes()
[docs]def parse_slice_constraint(constraint): """Parses the slicing part of a constraint expression. :param constraint: A complete constraint string as received through DAP request. :returns: A tuple of slices that can be used for accessing a subdomain of a dataset. """ slice_split = re.split(SLICE_CONSTRAINT_RE, constraint) if len(slice_split) == 3: slice_str = slice_split[1].replace('][', ',') return tuple(parse_slice(s) for s in slice_str.split(',')) else: return ...,
[docs]def parse_slice(token): """Parse a single slice string :param token: A string containing a number [3], a range [3:7] or a colon [:] :returns: An integer for simple numbers, or a slice object """ try: return int(token) except ValueError: if token == ':': return ... elif ':' in token: rng = [int(s) for s in token.split(':')] # The DAP protocol uses slicing including the last index. # [0:20] in DAP translates to [0:21] in Python. rng[1] += 1 return slice(*rng)
[docs]def meets_constraint(constraint_expr, data_path): """Parse the constraint expression and check if data_path meets the criteria. :param constraint_expr: (string) A DAP constraint string :param data_path: (string) Path of a DAP object within the dataset :returns: a boolean """ if constraint_expr == '': return True constraint = constraint_expr.split(',') for constr in constraint: if constr.startswith(data_path): return True return False
[docs]def set_dask_encoding_chunk_size(chunk_size: int): """Set the maximum chunk size used to encode ``dask.Array``s to XDR. :param chunk_size: (int) Encoding chunk size in Bytes :returns: None """ chunk_size = int(chunk_size) if chunk_size > 0: Config.DASK_ENCODE_CHUNK_SIZE = chunk_size else: raise ValueError('Encoding chunk size needs to be greather than 0.')