"""
=================
ispline
=================
Implements :class:`Isplines`, which are monotonic spline functions that are
defined in terms of :class:`Msplines`. Also implements :class:`Isplines_total`
for the weighted sum of a :class:`Isplines` family.
See `Ramsay (1988)`_ for details about these splines, and also note the
corrections in the `Praat manual`_ to the errors in the I-spline formula
by `Ramsay (1988)`_.
.. _`Ramsay (1988)`: https://www.jstor.org/stable/2245395
.. _`Praat manual`: http://www.fon.hum.uva.nl/praat/manual/spline.html
"""
import numpy
[docs]
class Isplines_total:
r"""Evaluate the weighted sum of an I-spline family (see `Ramsay (1988)`_).
Parameters
----------
order : int
Sets :attr:`Isplines_total.order`.
mesh : array-like
Sets :attr:`Isplines_total.mesh`.
x : numpy.ndarray
Sets :attr:`Isplines_total.x`.
Attributes
----------
order : int
See :attr:`Isplines.order`.
mesh : numpy.ndarray
See :attr:`Isplines.mesh`.
n : int
See :attr:`Isplines.n`.
lower : float
See :attr:`Isplines.lower`.
upper : float
See :attr:`Isplines.upper`.
Note
----
Evaluates the full interpolating curve from the I-splines. When
:math:`x` falls within the lower :math:`L` and upper :math:`U`
bounds of the range covered by the I-splines (:math:`L \le x \le U`),
then this curve is defined as:
.. math::
I_{\rm{total}}\left(x\right)
=
w_{\rm{lower}} + \sum_i w_i I_i\left(x\right).
When :math:`x` is outside the range of the mesh covered by the splines,
the values are linearly extrapolated from first derivative at the
bounds. Specifically, if :math:`x < L` then:
.. math::
I_{\rm{total}}\left(x\right)
=
I_{\rm{total}}\left(L\right) +
\left(x - L\right)
\left.\frac{\partial I_{\rm{total}}\left(y\right)}
{\partial y}\right\rvert_{y=L},
and if :math:`x > U` then:
.. math::
I_{\rm{total}}\left(x\right)
=
I_{\rm{total}}\left(U\right) +
\left(x - U\right)
\left.\frac{\partial I_{\rm{total}}\left(y\right)}
{\partial y}\right\rvert_{y=U}.
Note also that:
.. math::
I_{\rm{total}}\left(L\right) &=& w_{\rm{lower}}, \\
I_{\rm{total}}\left(U\right) &=& w_{\rm{lower}} + \sum_i w_i
Example
-------
Short examples to demonstrate and test :class:`Isplines_total`:
.. plot::
:context: reset
>>> import itertools
>>> import numpy
>>> import pandas as pd
>>> import scipy.optimize
>>> from dms_variants.ispline import Isplines_total
>>> order = 3
>>> mesh = [0.0, 0.3, 0.5, 0.6, 1.0]
>>> x = numpy.array([0, 0.2, 0.3, 0.4, 0.8, 0.99999])
>>> isplines_total = Isplines_total(order, mesh, x)
>>> weights = numpy.array([1.2, 2, 1.2, 1.2, 3, 0]) / 6
>>> numpy.round(isplines_total.Itotal(weights, w_lower=0), 2)
array([0. , 0.38, 0.54, 0.66, 1.21, 1.43])
Now calculate using some points that require linear extrapolation
outside the mesh and also have a nonzero `w_lower`:
>>> x2 = numpy.array([-0.5, -0.25, 0, 0.01, 1.0, 1.5])
>>> isplines_total2 = Isplines_total(order, mesh, x2)
>>> numpy.round(isplines_total2.Itotal(weights, w_lower=1), 3)
array([0. , 0.5 , 1. , 1.02 , 2.433, 2.433])
Test :meth:`Isplines_total.dItotal_dx`:
>>> x_deriv = numpy.array([-0.5, -0.25, 0, 0.01, 0.5, 0.7, 1.0, 1.5])
>>> for xval in x_deriv:
... xval = numpy.array([xval])
... def func(xval):
... return Isplines_total(order, mesh, xval).Itotal(weights, 0)
... def dfunc(xval):
... return Isplines_total(order, mesh, xval).dItotal_dx(weights)
... err = scipy.optimize.check_grad(func, dfunc, xval)
... if err > 1e-5:
... raise ValueError(f"excess err {err} for {xval}")
>>> (isplines_total.dItotal_dw_lower() == numpy.ones(x.shape)).all()
True
Test :meth:`Isplines_total.dItotal_dweights`:
>>> isplines_total3 = Isplines_total(order, mesh, x_deriv)
>>> wl = 1.5
>>> (isplines_total3.dItotal_dweights(weights, wl).shape ==
... (len(x_deriv), len(weights)))
True
>>> weightslist = list(weights)
>>> for ix, iw in itertools.product(range(len(x_deriv)),
... range(len(weights))):
... w = numpy.array([weightslist[iw]])
... def func(w):
... iweights = numpy.array(weightslist[: iw] +
... list(w) +
... weightslist[iw + 1:])
... return isplines_total3.Itotal(iweights, wl)[ix]
... def dfunc(w):
... iweights = numpy.array(weightslist[: iw] +
... list(w) +
... weightslist[iw + 1:])
... return isplines_total3.dItotal_dweights(iweights, wl)[ix,
... iw]
... err = scipy.optimize.check_grad(func, dfunc, w)
... if err > 1e-6:
... raise ValueError(f"excess err {err} for {ix, iw}")
Plot the total of the I-spline family shown in Fig. 1 of
`Ramsay (1988)`_, adding some linear extrapolation outside the
mesh range:
>>> xplot = numpy.linspace(-0.2, 1.2, 1000)
>>> isplines_totalplot = Isplines_total(order, mesh, xplot)
>>> df = pd.DataFrame({'x': xplot,
... 'Itotal': isplines_totalplot.Itotal(weights, 0)})
>>> _ = df.plot(x='x', y='Itotal')
.. _`Ramsay (1988)`: https://www.jstor.org/stable/2245395
"""
def __init__(self, order, mesh, x):
"""See main class docstring."""
if not (isinstance(order, int) and order >= 1):
raise ValueError(f"`order` not int >= 1: {order}")
self.order = order
self.mesh = numpy.array(mesh, dtype="float")
if self.mesh.ndim != 1:
raise ValueError(f"`mesh` not array-like of dimension 1: {mesh}")
if len(self.mesh) < 2:
raise ValueError(f"`mesh` not length >= 2: {mesh}")
if not numpy.array_equal(self.mesh, numpy.unique(self.mesh)):
raise ValueError(f"`mesh` elements not unique and sorted: {mesh}")
self.lower = self.mesh[0]
self.upper = self.mesh[-1]
assert self.lower < self.upper
self.n = len(self.mesh) - 2 + self.order
self._x = x.copy()
self._x.flags.writeable = False
# indices of `x` in, above, or below I-spline range
self._index = {
"lower": numpy.flatnonzero(self.x < self.lower),
"upper": numpy.flatnonzero(self.x > self.upper),
"in": numpy.flatnonzero((self.x >= self.lower) & (self.x <= self.upper)),
}
# values of x in each range
self._x_byrange = {
rangename: self.x[index] for rangename, index in self._index.items()
}
# Isplines for each range: for lower and upper it is value at bound
self._isplines = {
"in": Isplines(self.order, self.mesh, self._x_byrange["in"]),
"lower": Isplines(self.order, self.mesh, numpy.array([self.lower])),
"upper": Isplines(self.order, self.mesh, numpy.array([self.upper])),
}
# for caching values
self._cache = {}
self._max_cache_size = 100
@property
def x(self):
"""numpy.ndarray: Points at which spline is evaluated."""
return self._x
[docs]
def Itotal(self, weights, w_lower):
r"""Weighted sum of spline family at points :attr:`Isplines_total.x`.
Parameters
----------
weights : array-like
Nonnegative weights :math:`w_i` of members :math:`I_i` of spline
family, should be of length equal to :attr:`Isplines.n`.
w_lower : float
The value at the lower bound :math:`L` of the spline range,
:math:`w_{\rm{lower}}`.
Returns
-------
numpy.ndarray
:math:`I_{\rm{total}}` for each point in :attr:`Isplines_total.x`.
"""
args = (tuple(weights), w_lower, "Itotal")
if args not in self._cache:
if len(self._cache) > self._max_cache_size:
self._cache = {}
self._cache[args] = self._calculate_Itotal_or_dItotal(*args)
return self._cache[args]
def _calculate_Itotal_or_dItotal(self, weights, w_lower, quantity):
"""Calculate :meth:`Isplines.Itotal` or derivatives.
Parameters have same meaning as for :meth:`Isplines.Itotal`
except for `quantity`, which should be
- 'Itotal' to compute :meth:`Isplines.Itotal`
- 'dItotal_dx' to compute :meth:`Isplines.dItotal_dx`
- 'dItotal_dweights` to compute :meth:`Isplines.dItotal_dweights`
Also, `weights` must be hashable (e.g., a tuple).
"""
# check validity of `weights`
if len(weights) != self.n:
raise ValueError(f"invalid length of `weights`: {weights}")
if any(weight < 0 for weight in weights):
raise ValueError(f"`weights` not all non-negative: {weights}")
# compute return values for each category of indices
returnvals = {}
if quantity == "Itotal":
returnshape = len(self.x)
if len(self._index["in"]):
returnvals["in"] = (
numpy.sum(
[
self._isplines["in"].I(i) * weights[i - 1]
for i in range(1, self.n + 1)
],
axis=0,
)
+ w_lower
)
# values of Itotal at limits
Itotal_limits = {"lower": w_lower, "upper": w_lower + sum(weights)}
for name, limit in [("lower", self.lower), ("upper", self.upper)]:
if not len(self._index[name]):
continue
returnvals[name] = Itotal_limits[name] + (
self._x_byrange[name] - limit
) * sum(
self._isplines[name].dI_dx(i) * weights[i - 1]
for i in range(1, self.n + 1)
)
elif quantity == "dItotal_dx":
returnshape = len(self.x)
if len(self._index["in"]):
returnvals["in"] = numpy.sum(
[
self._isplines["in"].dI_dx(i) * weights[i - 1]
for i in range(1, self.n + 1)
],
axis=0,
)
for name in ["lower", "upper"]:
if not len(self._index[name]):
continue
returnvals[name] = sum(
self._isplines[name].dI_dx(i) * weights[i - 1]
for i in range(1, self.n + 1)
)
elif quantity == "dItotal_dweights":
returnshape = (len(self.x), len(weights))
if len(self._index["in"]):
returnvals["in"] = (
numpy.vstack(
[self._isplines["in"].I(i) for i in range(1, self.n + 1)]
)
).transpose()
# values of I at limits
I_limits = {"lower": 0.0, "upper": 1.0}
for name, limit in [("lower", self.lower), ("upper", self.upper)]:
if not len(self._index[name]):
continue
returnvals[name] = numpy.vstack(
[
I_limits[name]
+ (self._x_byrange[name] - limit)
* self._isplines[name].dI_dx(i)
for i in range(1, self.n + 1)
]
).transpose()
else:
raise ValueError(f"invalid `quantity` {quantity}")
# reconstruct single return value from indices and returnvalues
returnval = numpy.full(returnshape, fill_value=numpy.nan)
for name, name_index in self._index.items():
if len(name_index):
returnval[name_index] = returnvals[name]
assert not numpy.isnan(returnval).any()
returnval.flags.writeable = False
return returnval
[docs]
def dItotal_dx(self, weights):
r"""Deriv :meth:`Isplines_total.Itotal` by :attr:`Isplines_total.x`.
Note
----
Derivatives calculated from equations in :meth:`Isplines_total.Itotal`:
.. math::
\frac{\partial I_{\rm{total}}\left(x\right)}{\partial x}
=
\begin{cases}
\sum_i w_i \frac{\partial I_i\left(x\right)}{\partial x}
& \rm{if\;} L \le x \le U, \\
\left.\frac{\partial I_{\rm{total}}\left(y\right)}
{\partial y}\right\rvert_{y=L}
& \rm{if\;} x < L, \\
\left.\frac{\partial I_{\rm{total}}\left(y\right)}
{\partial y}\right\rvert_{y=U}
& \rm{otherwise}.
\end{cases}
Note that
.. math::
\left.\frac{\partial I_{\rm{total}}\left(y\right)}
{\partial y}\right\rvert_{y=L}
&=&
\sum_i w_i \left.\frac{\partial I_i\left(y\right)}{\partial y}
\right\rvert_{y=L}
\\
\left.\frac{\partial I_{\rm{total}}\left(y\right)}
{\partial y}\right\rvert_{y=U}
&=&
\sum_i w_i \left.\frac{\partial I_i\left(y\right)}{\partial y}
\right\rvert_{y=U}
Parameters
----------
weights : array-like
Same meaning as for :meth:`Isplines_total.Itotal`.
Returns
-------
numpy.ndarray
Derivative :math:`\frac{\partial I_{\rm{total}}}{\partial x}`
for each point in :attr:`Isplines_total.x`.
"""
args = (tuple(weights), None, "dItotal_dx")
if args not in self._cache:
if len(self._cache) > self._max_cache_size:
self._cache = {}
self._cache[args] = self._calculate_Itotal_or_dItotal(*args)
return self._cache[args]
[docs]
def dItotal_dweights(self, weights, w_lower):
r"""Derivative of :meth:`Isplines_total.Itotal` by :math:`w_i`.
Parameters
----------
weights : array-like
Same meaning as for :meth:`Isplines.Itotal`.
w_lower : float
Same meaning as for :meth:`Isplines.Itotal`.
Returns
-------
numpy.ndarray
Array is of shape `(len(x), len(weights))`, and element
`ix, iweight` gives derivative with respect to weight
`weights[iweight]` at element `[ix]` of :attr:`Isplines_total.x`.
Note
----
The derivative is:
.. math::
\frac{\partial I_{\rm{total}}\left(x\right)}{\partial w_i}
=
\begin{cases}
I_i\left(x\right)
& \rm{if\;} L \le x \le U, \\
I_i\left(L\right) + \left(x-L\right)
\left.\frac{\partial I_i\left(y\right)}{\partial y}\right\vert_{y=L}
& \rm{if\;} x < L, \\
I_i\left(U\right) + \left(x-U\right)
\left.\frac{\partial I_i\left(y\right)}{\partial y}\right\vert_{y=U}
& \rm{if\;} x > U.
\end{cases}
Note that:
.. math::
I_i\left(L\right) &=& 0 \\
I_i\left(U\right) &=& 1.
"""
return self._calculate_Itotal_or_dItotal(
tuple(weights), w_lower, "dItotal_dweights"
)
[docs]
def dItotal_dw_lower(self):
r"""Deriv of :meth:`Isplines_total.Itotal` by :math:`w_{\rm{lower}}`.
Returns
-------
numpy.ndarray
:math:`\frac{\partial{I_{\rm{total}}}}{\partial w_{\rm{lower}}}`,
which is just one for all :attr:`Isplines_total.x`.
"""
res = numpy.ones(self.x.shape, dtype="float")
res.flags.writeable = False
return res
[docs]
class Isplines:
r"""Implements I-splines (see `Ramsay (1988)`_).
Parameters
----------
order : int
Sets :attr:`Isplines.order`.
mesh : array-like
Sets :attr:`Isplines.mesh`.
x : numpy.ndarray
Sets :attr:`Isplines.x`.
Attributes
----------
order : int
Order of spline, :math:`k` in notation of `Ramsay (1988)`_. Note that
the degree of the I-spline is equal to :math:`k`, while the
associated M-spline has order :math:`k` but degree :math:`k - 1`.
mesh : numpy.ndarray
Mesh sequence, :math:`\xi_1 < \ldots < \xi_q` in the notation
of `Ramsay (1988)`_. This class implements **fixed** mesh sequences.
n : int
Number of members in spline, denoted as :math:`n` in `Ramsay (1988)`_.
Related to number of points :math:`q` in the mesh and the order
:math:`k` by :math:`n = q - 2 + k`.
lower : float
Lower end of interval spanned by the splines (first point in mesh).
upper : float
Upper end of interval spanned by the splines (last point in mesh).
Note
----
The methods of this class cache their results and return immutable
numpy arrays. Do **not** make these arrays mutable and change their
values, as this will lead to invalid caching.
Example
-------
Short examples to demonstrate and test :class:`Isplines`:
.. plot::
:context: reset
>>> import itertools
>>> import numpy
>>> import pandas as pd
>>> import scipy.optimize
>>> from dms_variants.ispline import Isplines
>>> order = 3
>>> mesh = [0.0, 0.3, 0.5, 0.6, 1.0]
>>> x = numpy.array([0, 0.2, 0.3, 0.4, 0.8, 0.99999])
>>> isplines = Isplines(order, mesh, x)
>>> isplines.order
3
>>> isplines.mesh
array([0. , 0.3, 0.5, 0.6, 1. ])
>>> isplines.n
6
>>> isplines.lower
0.0
>>> isplines.upper
1.0
Evaluate the I-splines at some selected points:
>>> for i in range(1, isplines.n + 1):
... print(f"I{i}: {numpy.round(isplines.I(i), 2)}")
... # doctest: +NORMALIZE_WHITESPACE
I1: [0. 0.96 1. 1. 1. 1. ]
I2: [0. 0.52 0.84 0.98 1. 1. ]
I3: [0. 0.09 0.3 0.66 1. 1. ]
I4: [0. 0. 0. 0.02 0.94 1. ]
I5: [0. 0. 0. 0. 0.58 1. ]
I6: [0. 0. 0. 0. 0.13 1. ]
Check that gradients are correct for :meth:`Isplines.dI_dx`:
>>> for i, xval in itertools.product(range(1, isplines.n + 1), x):
... xval = numpy.array([xval])
... def func(xval):
... return Isplines(order, mesh, xval).I(i)
... def dfunc(xval):
... return Isplines(order, mesh, xval).dI_dx(i)
... err = scipy.optimize.check_grad(func, dfunc, xval)
... if err > 1e-5:
... raise ValueError(f"excess err {err} for {i}, {xval}")
Plot the I-splines in Fig. 1 of `Ramsay (1988)`_:
>>> xplot = numpy.linspace(0, 1, 1000)
>>> isplines_xplot = Isplines(order, mesh, xplot)
>>> data = {'x': xplot}
>>> for i in range(1, isplines.n + 1):
... data[f"I{i}"] = isplines_xplot.I(i)
>>> df = pd.DataFrame(data)
>>> _ = df.plot(x='x')
.. _`Ramsay (1988)`: https://www.jstor.org/stable/2245395
"""
def __init__(self, order, mesh, x):
"""See main class docstring."""
if not (isinstance(order, int) and order >= 1):
raise ValueError(f"`order` not int >= 1: {order}")
self.order = order
self.mesh = numpy.array(mesh, dtype="float")
if self.mesh.ndim != 1:
raise ValueError(f"`mesh` not array-like of dimension 1: {mesh}")
if len(self.mesh) < 2:
raise ValueError(f"`mesh` not length >= 2: {mesh}")
if not numpy.array_equal(self.mesh, numpy.unique(self.mesh)):
raise ValueError(f"`mesh` elements not unique and sorted: {mesh}")
self.lower = self.mesh[0]
self.upper = self.mesh[-1]
assert self.lower < self.upper
self.n = len(self.mesh) - 2 + self.order
if not (isinstance(x, numpy.ndarray) and x.ndim == 1):
raise ValueError("`x` is not numpy.ndarray of dimension 1")
if (x < self.lower).any() or (x > self.upper).any():
raise ValueError(f"`x` outside {self.lower} and {self.upper}: {x}")
self._x = x.copy()
self._x.flags.writeable = False
self._msplines = Msplines(order + 1, mesh, self.x)
# for caching values
self._cache = {}
self._max_cache_size = 100
@property
def x(self):
"""numpy.ndarray: Points at which spline is evaluated."""
return self._x
[docs]
def I(self, i): # noqa: E743,E741
r"""Evaluate spline :math:`I_i` at point(s) :attr:`Isplines.x`.
Parameters
----------
i : int
Spline member :math:`I_i`, where :math:`1 \le i \le`
:attr:`Isplines.n`.
Returns
-------
numpy.ndarray
The values of the I-spline at each point in :attr:`Isplines.x`.
Note
----
The spline is evaluated using the formula given in the
`Praat manual`_, which corrects some errors in the formula
provided by `Ramsay (1988)`_:
.. math::
I_i\left(x\right)
=
\begin{cases}
0 & \rm{if\;} i > j, \\
1 & \rm{if\;} i < j - k, \\
\sum_{m=i+1}^j \left(t_{m+k+1} - t_m\right)
M_m\left(x \mid k + 1\right) / \left(k + 1 \right)
& \rm{otherwise},
\end{cases}
where :math:`j` is the index such that :math:`t_j \le x < t_{j+1}`
(the :math:`\left\{t_j\right\}` are the :attr:`Msplines.knots` for a
M-spline of order :math:`k + 1`) and :math:`k` is
:attr:`Isplines.order`.
.. _`Ramsay (1988)`: https://www.jstor.org/stable/2245395
.. _`Praat manual`: http://www.fon.hum.uva.nl/praat/manual/spline.html
"""
args = (i, "I")
if args not in self._cache:
if len(self._cache) > self._max_cache_size:
self._cache = {}
self._cache[args] = self._calculate_I_or_dI(*args)
return self._cache[args]
@property
def j(self):
"""numpy.ndarray: :math:`j` as defined in :meth:`Isplines.I`."""
if not hasattr(self, "_j"):
self._j = numpy.searchsorted(self._msplines.knots, self.x, "right")
assert (1 <= self._j).all() and (self._j <= len(self._msplines.knots)).all()
assert self.x.shape == self._j.shape
return self._j
@property
def _sum_terms_I(self):
"""numpy.ndarray: sum terms for :meth:`Isplines.I`.
Row `m - 1` has summation term for `m`.
"""
if not hasattr(self, "_sum_terms_I_val"):
k = self.order
self._sum_terms_I_val = numpy.vstack(
[
(self._msplines.knots[m + k] - self._msplines.knots[m - 1])
* self._msplines.M(m, k + 1)
/ (k + 1)
for m in range(1, self._msplines.n + 1)
]
)
assert self._sum_terms_I_val.shape == (self._msplines.n, len(self.x))
return self._sum_terms_I_val
@property
def _sum_terms_dI_dx(self):
"""numpy.ndarray: sum terms for :meth:`Isplines.dI_dx`.
Row `m - 1` has summation term for `m`.
"""
if not hasattr(self, "_sum_terms_dI_dx_val"):
k = self.order
self._sum_terms_dI_dx_val = numpy.vstack(
[
(self._msplines.knots[m + k] - self._msplines.knots[m - 1])
* self._msplines.dM_dx(m, k + 1)
/ (k + 1)
for m in range(1, self._msplines.n + 1)
]
)
assert self._sum_terms_dI_dx_val.shape == (self._msplines.n, len(self.x))
return self._sum_terms_dI_dx_val
def _calculate_I_or_dI(self, i, quantity):
"""Calculate :meth:`Isplines.I` or :meth:`Isplines.dI_dx`.
Parameters
----------
i : int
Same meaning as for :meth:`Isplines.I`.
quantity : {'I', 'dI'}
Calculate :meth:`Isplines.I` or :meth:`Isplines.dI_dx`?
Returns
-------
numpy.ndarray
The return value of :meth:`Isplines.I` or :meth:`Isplines.dI_dx`.
Note
----
Most calculations for :meth:`Isplines.I` and :meth:`Isplines.dI_dx`
are the same, so this method implements both.
"""
if quantity == "I":
sum_terms = self._sum_terms_I
i_lt_jminusk = 1.0
elif quantity == "dI":
sum_terms = self._sum_terms_dI_dx
i_lt_jminusk = 0.0
else:
raise ValueError(f"invalid `quantity` {quantity}")
if not (1 <= i <= self.n):
raise ValueError(f"invalid spline member `i` of {i}")
k = self.order
# create `binary_terms` where entry (m - 1, x) is 1 if and only if
# the corresponding `sum_terms` entry is part of the sum.
binary_terms = numpy.vstack(
[
numpy.zeros(len(self.x)) if m < i + 1 else (m <= self.j).astype(int)
for m in range(1, self._msplines.n + 1)
]
)
assert binary_terms.shape == sum_terms.shape
# compute sums from `sum_terms` and `binary_terms`
sums = numpy.sum(sum_terms * binary_terms, axis=0)
assert sums.shape == self.x.shape
# return value with sums, 0, or 1
res = numpy.where(
i > self.j, 0.0, numpy.where(i < self.j - k, i_lt_jminusk, sums)
)
res.flags.writeable = False
return res
[docs]
def dI_dx(self, i):
r"""Derivative of :meth:`Isplines.I` by :attr:`Isplines.x`.
Parameters
----------
i : int
Same meaning as for :meth:`Isplines.I`.
Returns
-------
numpy.ndarray
Derivative of I-spline with respect to :attr:`Isplines.x`.
Note
----
The derivative is calculated from the equation in :meth:`Isplines.I`:
.. math::
\frac{\partial I_i\left(x\right)}{\partial x}
=
\begin{cases}
0 & \rm{if\;} i > j \rm{\; or \;} i < j - k, \\
\sum_{m=i+1}^j\left(t_{m+k+1} - t_m\right)
\frac{\partial M_m\left(x \mid k+1\right)}{\partial x}
\frac{1}{k + 1}
& \rm{otherwise}.
\end{cases}
"""
args = (i, "dI")
if args not in self._cache:
if len(self._cache) > self._max_cache_size:
self._cache = {}
self._cache[args] = self._calculate_I_or_dI(*args)
return self._cache[args]
[docs]
class Msplines:
r"""Implements M-splines (see `Ramsay (1988)`_).
Parameters
----------
order : int
Sets :attr:`Msplines.order`.
mesh : array-like
Sets :attr:`Msplines.mesh`.
x : numpy.ndarray
Sets :attr:`Msplines.x`.
Attributes
----------
order : int
Order of spline, :math:`k` in notation of `Ramsay (1988)`_.
Polynomials are of degree :math:`k - 1`.
mesh : numpy.ndarray
Mesh sequence, :math:`\xi_1 < \ldots < \xi_q` in the notation
of `Ramsay (1988)`_. This class implements **fixed** mesh sequences.
n : int
Number of members in spline, denoted as :math:`n` in `Ramsay (1988)`_.
Related to number of points :math:`q` in the mesh and the order
:math:`k` by :math:`n = q - 2 + k`.
knots : numpy.ndarray
The knot sequence, :math:`t_1, \ldots, t_{n + k}` in the notation of
`Ramsay (1988)`_.
lower : float
Lower end of interval spanned by the splines (first point in mesh).
upper : float
Upper end of interval spanned by the splines (last point in mesh).
Note
----
The methods of this class cache their results and return immutable
numpy arrays. Do **not** make those arrays mutable and change their
values as this will lead to invalid caching.
Example
-------
Demonstrate and test :class:`Msplines`:
.. plot::
:context: reset
>>> import functools
>>> import itertools
>>> import numpy
>>> import pandas as pd
>>> import scipy.optimize
>>> from dms_variants.ispline import Msplines
>>> order = 3
>>> mesh = [0.0, 0.3, 0.5, 0.6, 1.0]
>>> x = numpy.array([0, 0.2, 0.3, 0.4, 0.8, 0.99999])
>>> msplines = Msplines(order, mesh, x)
>>> msplines.order
3
>>> msplines.mesh
array([0. , 0.3, 0.5, 0.6, 1. ])
>>> msplines.n
6
>>> msplines.knots
array([0. , 0. , 0. , 0.3, 0.5, 0.6, 1. , 1. , 1. ])
>>> msplines.lower
0.0
>>> msplines.upper
1.0
Evaluate the M-splines at some selected points:
>>> for i in range(1, msplines.n + 1):
... print(f"M{i}: {numpy.round(msplines.M(i), 2)}")
... # doctest: +NORMALIZE_WHITESPACE
M1: [10. 1.11 0. 0. 0. 0. ]
M2: [0. 3.73 2.4 0.6 0. 0. ]
M3: [0. 1.33 3. 3.67 0. 0. ]
M4: [0. 0. 0. 0.71 0.86 0. ]
M5: [0. 0. 0. 0. 3.3 0. ]
M6: [0. 0. 0. 0. 1.88 7.5 ]
Check that the gradients are correct:
>>> for i, xval in itertools.product(range(1, msplines.n + 1), x):
... xval = numpy.array([xval])
... def func(xval):
... return Msplines(order, mesh, xval).M(i)
... def dfunc(xval):
... return Msplines(order, mesh, xval).dM_dx(i)
... err = scipy.optimize.check_grad(func, dfunc, xval)
... if err > 1e-5:
... raise ValueError(f"excess err {err} for {i}, {xval}")
Plot the M-splines in in Fig. 1 of `Ramsay (1988)`_:
>>> xplot = numpy.linspace(0, 1, 1000, endpoint=False)
>>> msplines_plot = Msplines(order, mesh, xplot)
>>> data = {'x': xplot}
>>> for i in range(1, msplines_plot.n + 1):
... data[f"M{i}"] = msplines_plot.M(i)
>>> df = pd.DataFrame(data)
>>> _ = df.plot(x='x')
.. _`Ramsay (1988)`: https://www.jstor.org/stable/2245395
"""
def __init__(self, order, mesh, x):
"""See main class docstring."""
if not (isinstance(order, int) and order >= 1):
raise ValueError(f"`order` not int >= 1: {order}")
self.order = order
self.mesh = numpy.array(mesh, dtype="float")
if self.mesh.ndim != 1:
raise ValueError(f"`mesh` not array-like of dimension 1: {mesh}")
if len(self.mesh) < 2:
raise ValueError(f"`mesh` not length >= 2: {mesh}")
if not numpy.array_equal(self.mesh, numpy.unique(self.mesh)):
raise ValueError(f"`mesh` elements not unique and sorted: {mesh}")
self.lower = self.mesh[0]
self.upper = self.mesh[-1]
assert self.lower < self.upper
self.knots = numpy.array(
[self.lower] * self.order
+ list(self.mesh[1:-1])
+ [self.upper] * self.order,
dtype="float",
)
self.n = len(self.knots) - self.order
assert self.n == len(self.mesh) - 2 + self.order
if not (isinstance(x, numpy.ndarray) and x.ndim == 1):
raise ValueError("`x` is not numpy.ndarray of dimension 1")
if (x < self.lower).any() or (x > self.upper).any():
raise ValueError(f"`x` outside {self.lower} and {self.upper}: {x}")
self._x = x.copy()
self._x.flags.writeable = False
self._ti_le_x_lt_tiplusk_cache = {}
# for caching values
self._M_cache = {}
self._dM_cache = {}
self._max_cache_size = 100
def _ti_le_x_lt_tiplusk(self, ti, tiplusk):
r"""Indices where :math:`t_i \le x \le t_{i+k}`.
Parameters
----------
ti : float
:math:`t_i`
tiplusk : float
:math:`t_{i+k}`
Returns
-------
numpy.ndarray
Array of booleans of same length as :attr:`Msplines.x` indicating
if :math:`t_i \le x \le t_{i+k}`.
"""
key = (ti, tiplusk)
if key not in self._ti_le_x_lt_tiplusk_cache:
val = (ti <= self.x) & (self.x < tiplusk)
val.flags.writeable = False
assert val.dtype == bool
if len(self._ti_le_x_lt_tiplusk_cache) > self._max_cache_size:
self._ti_le_x_lt_tiplusk_cache = {}
self._ti_le_x_lt_tiplusk_cache[key] = val
return self._ti_le_x_lt_tiplusk_cache[key]
@property
def x(self):
"""numpy.ndarray: Points at which spline is evaluated."""
return self._x
[docs]
def M(self, i, k=None, invalid_i="raise"):
r"""Evaluate spline :math:`M_i` at point(s) :attr:`Msplines.x`.
Parameters
----------
i : int
Spline member :math:`M_i`, where :math:`1 \le i \le`
:attr:`Msplines.n`.
k : int or None
Order of spline. If `None`, assumed to be :attr:`Msplines.order`.
invalid_i : {'raise', 'zero'}
If `i` is invalid, do we raise an error or return 0?
Returns
-------
numpy.ndarray
The values of the M-spline at each point in :attr:`Msplines.x`.
Note
----
The spline is evaluated using the recursive relationship given by
`Ramsay (1988) <https://www.jstor.org/stable/2245395>`_:
.. math::
M_i\left(x \mid k=1\right)
&=&
\begin{cases}
1 / \left(t_{i+1} - t_i\right), & \rm{if\;} t_i \le x < t_{i+1} \\
0, & \rm{otherwise}
\end{cases} \\
M_i\left(x \mid k > 1\right) &=&
\begin{cases}
\frac{k\left[\left(x - t_i\right) M_i\left(x \mid k-1\right) +
\left(t_{i+k} -x\right) M_{i+1}\left(x \mid k-1\right)
\right]}
{\left(k - 1\right)\left(t_{i + k} - t_i\right)},
& \rm{if\;} t_i \le x < t_{i+k} \\
0, & \rm{otherwise}
\end{cases}
"""
args = (i, k, invalid_i)
if args not in self._M_cache:
if len(self._M_cache) > self._max_cache_size:
self._M_cache = {}
self._M_cache[args] = self._calculate_M(*args)
return self._M_cache[args]
def _calculate_M(self, i, k, invalid_i):
"""Calculate :meth:`Msplines.M` with result caching."""
if not (1 <= i <= self.n):
if invalid_i == "raise":
raise ValueError(f"invalid spline member `i` of {i}")
elif invalid_i == "zero":
return 0
else:
raise ValueError(f"invalid `invalid_i` of {invalid_i}")
if k is None:
k = self.order
if not 1 <= k <= self.order:
raise ValueError(f"invalid spline order `k` of {k}")
tiplusk = self.knots[i + k - 1]
ti = self.knots[i - 1]
if tiplusk == ti:
return 0
boolindex = self._ti_le_x_lt_tiplusk(ti, tiplusk)
if k == 1:
res = numpy.where(boolindex, 1.0 / (tiplusk - ti), 0.0)
res.flags.writeable = False
return res
else:
assert k > 1
res = numpy.where(
boolindex,
(
k
* (
(self.x - ti) * self.M(i, k - 1)
+ (tiplusk - self.x) * self.M(i + 1, k - 1, invalid_i="zero")
)
/ ((k - 1) * (tiplusk - ti))
),
0.0,
)
res.flags.writeable = False
return res
[docs]
def dM_dx(self, i, k=None, invalid_i="raise"):
r"""Derivative of :meth:`Msplines.M` by to :attr:`Msplines.x`.
Parameters
----------
i : int
Same as for :meth:`Msplines.M`.
k : int or None
Same as for :meth:`Msplines.M`.
invalid_i : {'raise', 'zero'}
Same as for :meth:`Msplines.M`.
Returns
-------
numpy.ndarray
Derivative of M-spline with respect to :attr:`Msplines.x`.
Note
----
The derivative is calculated from the equation in :meth:`Msplines.M`:
.. math::
\frac{\partial M_i\left(x \mid k=1\right)}{\partial x} &=& 0
\\
\frac{\partial M_i\left(x \mid k > 1\right)}{\partial x}
&=&
\begin{cases}
\frac{k\left[\left(x - t_i\right)
\frac{\partial M_i\left(x \mid k-1\right)}{\partial x}
+
M_i\left(x \mid k-1\right)
+
\left(t_{i+k} -x\right)
\frac{\partial M_{i+1}\left(x \mid k-1\right)}
{\partial x}
-
M_{i+1}\left(x \mid k-1\right)
\right]}
{\left(k - 1\right)\left(t_{i + k} - t_i\right)},
& \rm{if\;} t_i \le x < t_{i+1} \\
0, & \rm{otherwise}
\end{cases}
"""
args = (i, k, invalid_i)
if args not in self._dM_cache:
if len(self._dM_cache) > self._max_cache_size:
self._dM_cache = {}
self._dM_cache[args] = self._calculate_dM_dx(*args)
return self._dM_cache[args]
def _calculate_dM_dx(self, i, k, invalid_i):
"""Calculate :meth:`Msplines.dM_dx` with results caching."""
if not (1 <= i <= self.n):
if invalid_i == "raise":
raise ValueError(f"invalid spline member `i` of {i}")
elif invalid_i == "zero":
return 0
else:
raise ValueError(f"invalid `invalid_i` of {invalid_i}")
if k is None:
k = self.order
if not 1 <= k <= self.order:
raise ValueError(f"invalid spline order `k` of {k}")
tiplusk = self.knots[i + k - 1]
ti = self.knots[i - 1]
if tiplusk == ti or k == 1:
return 0
else:
assert k > 1
boolindex = self._ti_le_x_lt_tiplusk(ti, tiplusk)
res = numpy.where(
boolindex,
(
k
* (
(self.x - ti) * self.dM_dx(i, k - 1)
+ self.M(i, k - 1)
+ (tiplusk - self.x)
* self.dM_dx(i + 1, k - 1, invalid_i="zero")
- self.M(i + 1, k - 1, invalid_i="zero")
)
/ ((k - 1) * (tiplusk - ti))
),
0.0,
)
res.flags.writeable = False
return res
if __name__ == "__main__":
import doctest
doctest.testmod()