dask: Cached first, second and last data values by davidhassell · Pull Request #494 · NCAS-CMS/cf-python
@@ -365,7 +365,7 @@ def __init__(
             except (AttributeError, TypeError):
                 pass
             else:
-                self._set_dask(array, copy=copy, delete_source=False)
+                self._set_dask(array, copy=copy, conform=False)
         else:
             self._del_dask(None)

@@ -478,7 +478,7 @@ def __init__(
         self._Units = units

         # Store the dask array
-        self._set_dask(array, delete_source=False)
+        self._set_dask(array, conform=False)

         # Override the data type
         if dtype is not None:

@@ -1107,9 +1107,8 @@ def __setitem__(self, indices, value):
             shifts = [-shift for shift in shifts]
             self.roll(shift=shifts, axis=roll_axes, inplace=True)

-            # Remove a source array, on the grounds that we can't
-            # guarantee its consistency with the updated dask array.
-            self._del_Array(None)
+            # Remove elements made invalid by updating the `dask` array
+            self._conform_after_dask_update()

         return
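With `__setitem__` now ending in `_conform_after_dask_update()`, any in-place assignment discards the cached element values as a side effect. A sketch of the intended behaviour (values invented; assumes cf-python as patched by this PR):

    import cf

    d = cf.Data([273.15, 274.15, 275.15], "K")
    d.first_element()  # 273.15: computed via dask, then cached in d._custom
    d[0] = 280.0       # __setitem__ calls _conform_after_dask_update()
    d.first_element()  # 280.0: the cache was dropped, so this is recomputed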
@@ -1227,12 +1226,32 @@ def __keepdims_indexing__(self):
     def __keepdims_indexing__(self, value):
         self._custom["__keepdims_indexing__"] = bool(value)

-    def _set_dask(self, array, copy=False, delete_source=True):
+    def _conform_after_dask_update(self):
+        """Remove elements made invalid by updating the `dask` array.
+
+        Removes or modifies components that can't be guaranteed to be
+        consistent with an updated `dask` array:
+
+        * Deletes a source array.
+        * Deletes cached element values.
+
+        .. versionadded:: TODODASKVER
+
+        :Returns:
+
+            `None`
+
+        """
+        self._del_Array(None)
+        self._del_cached_elements()
+
+    def _set_dask(self, array, copy=False, conform=True):
         """Set the dask array.

         .. versionadded:: TODODASKVER

-        .. seealso:: `to_dask_array`, `_del_dask`
+        .. seealso:: `to_dask_array`, `_conform_after_dask_update`,
+                     `_del_dask`

         :Parameters:

@@ -1243,10 +1262,10 @@ def _set_dask(self, array, copy=False, delete_source=True):
                 If True then copy *array* before setting it. By
                 default it is not copied.

-            delete_source: `bool`, optional
-                If False then do not delete a source array, if one
-                exists, after setting the new dask array. By default
-                a source array is deleted.
+            conform: `bool`, optional
+                If True, the default, then remove elements made
+                invalid by updating the `dask` array. See
+                `_conform_after_dask_update` for details.

         :Returns:

@@ -1255,35 +1274,40 @@ def _set_dask(self, array, copy=False, delete_source=True):
         """
         if array is NotImplemented:
             logger.warning(
-                "NotImplemented has been set in the place of a dask array"
-                "\n\n"
-                "This could occur if any sort of exception is raised "
-                "by a function that is run on chunks (via, for "
-                "instance, da.map_blocks or "
-                "dask.array.core.elemwise). Such a function could get "
-                "run at definition time in order to ascertain "
-                "suitability (such as data type casting, "
-                "broadcasting, etc.). Note that the exception may be "
-                "difficult to diagnose, as dask will have silently "
-                "trapped it and returned NotImplemented (for "
-                "instance, see dask.array.core.elemwise). Print "
-                "statements in a local copy of dask are possibly the "
-                "way to go if the cause of the error is not obvious."
+                "NotImplemented has been set in the place of a dask array."
             )
+            # This could occur if any sort of exception is raised by a
+            # function that is run on chunks (such as `cf_where`). Such
+            # a function could get run at definition time in order to
+            # ascertain suitability (such as data type casting,
+            # broadcasting, etc.). Note that the exception may be
+            # difficult to diagnose, as dask will have silently trapped
+            # it and returned NotImplemented (for instance, see
+            # `dask.array.core.elemwise`). Print statements in a local
+            # copy of dask are possibly the way to go if the cause of
+            # the error is not obvious.

         if copy:
             array = array.copy()

         self._custom["dask"] = array

-        if delete_source:
-            # Remove a source array, on the grounds that we can't
-            # guarantee its consistency with the new dask array.
-            self._del_Array(None)
+        if conform:
+            # Remove elements made invalid by updating the `dask`
+            # array
+            self._conform_after_dask_update()
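For reference, here is a minimal, self-contained sketch of the invalidation pattern that `_set_dask` and `_conform_after_dask_update` implement between them. `TinyData` is an illustrative stand-in, not the real `cf.Data` internals; only the `_custom` dictionary and the two method names are taken from the diff above:

    import dask.array as da

    class TinyData:
        # Toy stand-in for cf.Data: all state lives in a `_custom` dict
        def __init__(self, array):
            self._custom = {}
            self._set_dask(da.asanyarray(array))

        def _conform_after_dask_update(self):
            # Drop anything that may now disagree with the dask array
            for key in ("first_element", "second_element", "last_element"):
                self._custom.pop(key, None)

        def _set_dask(self, array, copy=False, conform=True):
            if copy:
                array = array.copy()
            self._custom["dask"] = array
            if conform:
                # The values may have changed, so cached elements are suspect
                self._conform_after_dask_update()

    d = TinyData([1, 2, 3])
    d._custom["first_element"] = 1  # pretend an element has been cached

    # A value-preserving graph change may safely skip conforming
    d._set_dask(d._custom["dask"].rechunk(1), conform=False)
    assert d._custom["first_element"] == 1

    # A value-changing update conforms by default, discarding the cache
    d._set_dask(d._custom["dask"] + 1)
    assert "first_element" not in d._custom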
-    def _del_dask(self, default=ValueError(), delete_source=True):
+    def _del_dask(self, default=ValueError(), conform=True):
         """Remove the dask array.

         .. versionadded:: TODODASKVER

-        .. seealso:: `_set_dask`, `to_dask_array`
+        .. seealso:: `to_dask_array`, `_conform_after_dask_update`,
+                     `_set_dask`

         :Parameters:

@@ -1293,9 +1317,10 @@ def _del_dask(self, default=ValueError(), delete_source=True):

             {{default Exception}}

-            delete_source: `bool`, optional
-                If False then do not delete a compressed source
-                array, if one exists.
+            conform: `bool`, optional
+                If True, the default, then remove elements made
+                invalid by updating the `dask` array. See
+                `_conform_after_dask_update` for details.

         :Returns:

@@ -1325,14 +1350,71 @@ def _del_dask(self, default=ValueError(), delete_source=True):
                 default, f"{self.__class__.__name__!r} has no dask array"
             )

-        if delete_source:
-            # Remove a source array, on the grounds that we can't
-            # guarantee its consistency with any future new dask
-            # array.
-            self._del_Array(None)
+        if conform:
+            # Remove elements made invalid by deleting the `dask`
+            # array
+            self._conform_after_dask_update()

         return out

+    def _del_cached_elements(self):
+        """Delete any cached element values.
+
+        Updates *data* in-place to remove the cached element values
+        ``'first_element'``, ``'second_element'`` and
+        ``'last_element'``.
+
+        .. note:: By default, `_del_cached_elements` is run whenever
+                  the `_set_dask` and `_del_dask` methods are used. If
+                  the `dask` array is updated or changed without using
+                  the default behaviour of either of these two methods,
+                  and there is any chance that the cached values might
+                  be inconsistent with the new data, then
+                  `_del_cached_elements` must be called explicitly to
+                  ensure consistency.
+
+        .. versionadded:: TODODASKVER
+
+        .. seealso:: `_del_dask`, `_set_cached_elements`, `_set_dask`
+
+        :Returns:
+
+            `None`
+
+        """
+        custom = self._custom
+        for element in ("first_element", "second_element", "last_element"):
+            custom.pop(element, None)

+    def _set_cached_elements(self, elements):
+        """Cache selected element values.
+
+        Updates *data* in-place to store the given element values
+        within its ``custom`` dictionary.
+
+        .. versionadded:: TODODASKVER
+
+        .. seealso:: `_del_cached_elements`
+
+        :Parameters:
+
+            elements: `dict`
+                Zero or more element values to be cached, each keyed
+                by a unique identifier to allow unambiguous retrieval.
+                Existing cached elements not specified by *elements*
+                will not be removed.
+
+        :Returns:
+
+            `None`
+
+        **Examples**
+
+        >>> d._set_cached_elements({'first_element': 273.15})
+
+        """
+        self._custom.update(elements)
     @_inplace_enabled(default=False)
     def diff(self, axis=-1, n=1, inplace=False):
         """Calculate the n-th discrete difference along the given axis.

@@ -2181,7 +2263,7 @@ def persist(self, inplace=False):
         dx = self.to_dask_array()
         dx = dx.persist()
-        d._set_dask(dx, delete_source=False)
+        d._set_dask(dx, conform=False)

         return d
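Note that `persist` computes and holds the chunks in memory without changing any data values, so previously cached elements remain valid and the call site can pass `conform=False` rather than discard them needlessly; the same reasoning presumably applies to the `rechunk`, `harden_mask` and `soften_mask` call sites below.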
@@ -2751,7 +2833,7 @@ def rechunk(
         dx = d.to_dask_array()
         dx = dx.rechunk(chunks, threshold, block_size_limit, balance)
-        d._set_dask(dx, delete_source=False)
+        d._set_dask(dx, conform=False)

         return d
@@ -7518,7 +7600,7 @@ def harden_mask(self):
         """
         dx = self.to_dask_array()
         dx = dx.map_blocks(cf_harden_mask, dtype=self.dtype)
-        self._set_dask(dx, delete_source=False)
+        self._set_dask(dx, conform=False)
         self.hardmask = True

     def has_calendar(self):

@@ -7615,7 +7697,7 @@ def soften_mask(self):
         """
         dx = self.to_dask_array()
         dx = dx.map_blocks(cf_soften_mask, dtype=self.dtype)
-        self._set_dask(dx, delete_source=False)
+        self._set_dask(dx, conform=False)
         self.hardmask = False

     @_inplace_enabled(default=False)

@@ -7673,7 +7755,7 @@ def filled(self, fill_value=None, inplace=False):

         return d

-    def first_element(self, verbose=None):
+    def first_element(self):
         """Return the first element of the data as a scalar.

         If the value is deemed too expensive to compute then a

@@ -7705,16 +7787,14 @@ def first_element(self, verbose=None):
         masked

         """
-        if self.can_compute():
-            return super().first_element()
-
-        raise ValueError(
-            "First element of the data is considered too expensive "
-            "to compute. Consider setting the 'force_compute' attribute, or "
-            "setting the log level to 'DEBUG'."
-        )
+        try:
+            return self._custom["first_element"]
+        except KeyError:
+            item = super().first_element()
+            self._set_cached_elements({"first_element": item})
+            return item
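This removes the old guard, under which `first_element` raised a `ValueError` whenever `can_compute()` was False: the element is now always computed on first access and then memoised, so repeated calls cost a dictionary lookup rather than a dask computation. `second_element` and `last_element` below are rewritten to the same read-through pattern.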
-    def second_element(self, verbose=None):
+    def second_element(self):
         """Return the second element of the data as a scalar.

         If the value is deemed too expensive to compute then a

@@ -7746,14 +7826,12 @@ def second_element(self, verbose=None):
         masked

         """
-        if self.can_compute():
-            return super().second_element()
-
-        raise ValueError(
-            "Second element of the data is considered too expensive "
-            "to compute. Consider setting the 'force_compute' atribute, or "
-            "setting the log level to 'DEBUG'."
-        )
+        try:
+            return self._custom["second_element"]
+        except KeyError:
+            item = super().second_element()
+            self._set_cached_elements({"second_element": item})
+            return item

     def last_element(self):
         """Return the last element of the data as a scalar.

@@ -7787,14 +7865,12 @@ def last_element(self):
         masked

         """
-        if self.can_compute():
-            return super().last_element()
-
-        raise ValueError(
-            "First element of the data is considered too expensive "
-            "to compute. Consider setting the 'force_compute' attribute, or "
-            "setting the log level to 'DEBUG'."
-        )
+        try:
+            return self._custom["last_element"]
+        except KeyError:
+            item = super().last_element()
+            self._set_cached_elements({"last_element": item})
+            return item

     def flat(self, ignore_masked=True):
         """Return a flat iterator over elements of the data array.