"""
Time handling
"""
from __future__ import annotations
from functools import partial
from typing import TYPE_CHECKING, Protocol
import cftime
import numpy as np
if TYPE_CHECKING:
from collections.abc import Iterable
from typing import Any
import xarray as xr
MONTHS_PER_YEAR: int = 12
"""Months per year"""
[docs]class NonUniqueYearMonths(ValueError):
"""
Raised when the user tries to convert to year-month with non-unique values
This happens when the datetime values lead to year-month values that are
not unique
"""
def __init__(
self, unique_vals: Iterable[tuple[int, int]], counts: Iterable[int]
) -> None:
"""
Initialise the error
Parameters
----------
unique_vals
Unique values. In each tuple, the first value is the year and the
second is the month.
counts
Counts of the number of time each unique value appeared in the
original array
"""
non_unique = list((v, c) for v, c in zip(unique_vals, counts) if c > 1)
error_msg = (
"Your year-month axis is not unique. "
f"Year-month values with a count > 1: {non_unique}"
)
super().__init__(error_msg)
[docs]def convert_year_month_to_time(
inp: xr.Dataset,
day: int = 1,
**kwargs: Any,
) -> xr.Dataset:
"""
Convert year and month co-ordinates into a time axis
This is a facade to :func:`convert_to_time`
Parameters
----------
inp
Data to convert
day
Day of the month to assume in output
**kwargs
Passed to intialiser of :class:`cftime.datetime`
Returns
-------
Data with time axis
"""
return convert_to_time(
inp,
time_coords=("year", "month"),
cftime_converter=partial(cftime.datetime, day=day, **kwargs),
)
[docs]class CftimeConverter(Protocol): # pylint: disable=too-few-public-methods
"""
Callable that supports converting stacked time co-ordinates to :obj:`cftime.datetime`
"""
def __call__(
self,
*args: np.float_ | np.int_,
) -> cftime.datetime:
"""
Convert input values to an :obj:`cftime.datetime`
"""
[docs]def convert_to_time(
inp: xr.Dataset,
time_coords: tuple[str, ...],
cftime_converter: CftimeConverter,
) -> xr.Dataset:
"""
Convert some co-ordinates representing time into a time axis
Parameters
----------
inp
Data to convert
time_coords
Co-ordinates from which to create the time axis
cftime_converter
Callable that converts the stacked time co-ordinates to
:obj:`cftime.datetime`
Returns
-------
Data with time axis
"""
inp = inp.stack(time=time_coords)
times = inp["time"].to_numpy()
inp = inp.drop_vars(("time", *time_coords)).assign_coords(
{"time": [cftime_converter(*t) for t in times]}
)
return inp
[docs]def split_time_to_year_month(
inp: xr.Dataset,
time_axis: str = "time",
) -> xr.Dataset:
"""
Convert the time dimension to year and month without stacking
This means there is still a single time dimension in the output, but there
is now also accompanying year and month information
Parameters
----------
inp
Data to convert
Returns
-------
Data with year and month information for the time axis
Raises
------
NonUniqueYearMonths
The years and months are not unique
"""
out = inp.assign_coords(
{
"month": inp[time_axis].dt.month,
"year": inp[time_axis].dt.year,
}
).set_index({time_axis: ("year", "month")})
# Could be updated when https://github.com/pydata/xarray/issues/7104 is
# closed
unique_vals, counts = np.unique(out[time_axis].values, return_counts=True)
if (counts > 1).any():
raise NonUniqueYearMonths(unique_vals, counts)
return out
[docs]def convert_time_to_year_month(
inp: xr.Dataset,
time_axis: str = "time",
) -> xr.Dataset:
"""
Convert the time dimension to year and month co-ordinates
Parameters
----------
inp
Data to convert
Returns
-------
Data with year and month co-ordinates
"""
return split_time_to_year_month(
inp=inp,
time_axis=time_axis,
).unstack(time_axis)
[docs]def get_start_of_next_month(y: int, m: int) -> cftime.datetime:
"""
Get start of next month
Parameters
----------
y
Year
m
Month
Returns
-------
Start of next month
"""
if m == MONTHS_PER_YEAR:
m_out = 1
y_out = y + 1
else:
m_out = m + 1
y_out = y
# This may need to be refactored to allow the cftime_converter to be
# injected, same idea as `convert_to_time`
return cftime.datetime(y_out, m_out, 1)