Dynamic Time Warping#
This section covers works related to Dynamic Time Warping for time series.
Dynamic Time Warping (DTW) [Sakoe and Chiba, 1978] is a similarity measure
between time series.
Consider two time series
tslearn tip
In tslearn
, such time series would be represented as arrays of
respective
shapes (n, p)
and (m, p)
and DTW can be computed using the following code:
from tslearn.metrics import dtw, dtw_path
dtw_score = dtw(x, x_prime)
# Or, if the path is also
# an important information:
path, score = dtw_path(x, x_prime)
Optimization Problem#
In the following, a path
DTW between
where
is a sequence of index pairs with and andfor all
, is related to as follows:
In what follows, we will denote
The following image exhibits the DTW path (in white) for a given pair of time
series, on top of the cross-similarity matrix that stores
Show code cell source
%config InlineBackend.figure_format = 'svg'
import matplotlib.pyplot as plt
import numpy
import warnings
warnings.filterwarnings('ignore')
plt.ion()
def plot_constraints(title, sz):
for pos in range(sz):
plt.plot([-.5, sz - .5], [pos + .5, pos + .5],
color='w', linestyle='-', linewidth=3)
plt.plot([pos + .5, pos + .5], [-.5, sz - .5],
color='w', linestyle='-', linewidth=3)
plt.xticks([])
plt.yticks([])
plt.gca().axis("off")
plt.title(title)
plt.tight_layout()
def plot_path(s_y1, s_y2, similarity_matrix, path):
sz = s_y1.shape[0]
plt.figure(1, figsize=(4, 4))
# definitions for the axes
left, bottom = 0.01, 0.1
w_ts = h_ts = 0.2
left_h = left + w_ts + 0.02
width = height = 0.65
bottom_h = bottom + height + 0.02
rect_s_y = [left, bottom, w_ts, height]
rect_gram = [left_h, bottom, width, height]
rect_s_x = [left_h, bottom_h, width, h_ts]
ax_gram = plt.axes(rect_gram)
ax_s_x = plt.axes(rect_s_x)
ax_s_y = plt.axes(rect_s_y)
ax_gram.imshow(similarity_matrix[::-1], origin='lower')
ax_gram.axis("off")
ax_gram.autoscale(False)
ax_gram.plot([j for (i, j) in path], [sz - i - 1 for (i, j) in path], "w-",
linewidth=3.)
ax_s_x.plot(numpy.arange(sz), s_y2, "b-", linewidth=3.)
ax_s_x.axis("off")
ax_s_x.set_xlim((0, sz - 1))
ax_s_y.plot(- s_y1[::-1], numpy.arange(sz), "b-", linewidth=3.)
ax_s_y.axis("off")
ax_s_y.set_ylim((0, sz - 1))
plt.tight_layout()
plt.show()
import numpy
from scipy.spatial.distance import cdist
from tslearn import metrics
s_x = numpy.loadtxt("../data/sample_series.txt")
s_y1 = numpy.concatenate((s_x, s_x)).reshape((-1, 1))
s_y2 = numpy.concatenate((s_x, s_x[::-1])).reshape((-1, 1))
path, sim = metrics.dtw_path(s_y1, s_y2)
mat = cdist(s_y1, s_y2)
plot_path(s_y1, s_y2, mat, path)
Algorithmic Solution#
There exists an
def dtw(x, x_prime, q=2):
for i in 1..n:
for j in 1..m:
dist = d(x[i], x_prime[j]) ** q
if i == 1 and j == 1:
gamma[i, j] = dist
else:
gamma[i, j] = dist + min(gamma[i-1, j] if i > 1
else inf,
gamma[i, j-1] if j > 1
else inf,
gamma[i-1, j-1] if (i > 1 and j > 1)
else inf)
return (gamma[n, m]) ** (1. / q)
The basic idea behind this algorithm is that there exists a recurrence relationship between partial DTW computations.
More precisely, if we denote by
and
comes from the constraints on admissible paths : the last element on an admissible path needs to match the last elements of the series; comes from the contiguity conditions on the admissible paths: all admissible paths that match with need to go through one of these 3 possible ancestors: , or .
The dynamic programming algorithm presented above relies on this recurrence formula and stores intermediate computations for efficiency.
Dot product notation
Dynamic Time Warping can also be formalized using the following notation:
where
Properties#
Dynamic Time Warping holds the following properties:
Suppose
is a time series that is constant except for a motif that occurs at some point in the series, and let us denote by a copy of in which the motif is temporally shifted by timestamps, then .
However, mathematically speaking, DTW is not a valid metric since it satisfies neither the triangular inequality nor the identity of indiscernibles.
Setting Additional Constraints#
The set of temporal deformations to which DTW is invariant can be reduced by imposing additional constraints on the set of acceptable paths. Such constraints typically consist in forcing paths to stay close to the diagonal.
The Sakoe-Chiba band is parametrized by a radius
Show code cell source
from tslearn.metrics import sakoe_chiba_mask
sz = 10
m = sakoe_chiba_mask(sz, sz, radius=3)
m[m == numpy.inf] = 1.
numpy.fill_diagonal(m, .5)
plt.figure()
plt.imshow(m, cmap="gray")
plot_constraints("Sakoe-Chiba mask of radius $r=3$.", sz)
tslearn tip
A typical call to DTW with Sakoe-Chiba band constraint in
tslearn
would be:
from tslearn.metrics import dtw
cost = dtw(
x, x_prime,
global_constraint="sakoe_chiba",
sakoe_chiba_radius=3
)
The Itakura parallelogram sets a maximum slope
Show code cell source
from tslearn.metrics import itakura_mask
sz = 10
m = itakura_mask(sz, sz, max_slope=2.)
m[m == numpy.inf] = 1.
numpy.fill_diagonal(m, .5)
plt.figure()
plt.imshow(m, cmap="gray")
plot_constraints("Itakura parallelogram mask of maximum slope $s=2$.", sz)
tslearn tip
A typical call to DTW with Itakura parallelogram constraint in
tslearn
would be:
from tslearn.metrics import dtw
cost = dtw(
x, x_prime,
global_constraint="itakura",
itakura_max_slope=2.
)
Computing Barycenters#
As illustrated in our introductory example, the ability to compute barycenters
for a given similarity measure is necessary for its use in some methods such as
A barycenter of a set of samples
When
However, here we are rather interested in computing barycenters with respect to the Dynamic Time Warping similarity measure. Optimizing this quantity can be done through the DTW Barycenter Averaging (DBA) algorithm presented in [Petitjean et al., 2011]. This algorithm repeats the following two steps until convergence:
Compute Dynamic Time Warping and associated matching
between the current barycenter and each of the time series in the set. Based on that, update, for each temporal position of the barycenter, a list of the points in time series from the set that are matched to this timestamp through DTW ;Update each element in the barycenter as the average of all its associated points (as stored at the previous step).
This algorithm is illustrated in the animation below (do not hesitate to slow it down using the “-” button) in which black segments figure assignments of time series elements to barycenter elements (here a barycenter of 2 time series is performed and the barycenter is represented in purple).
Show code cell source
from IPython.display import HTML
from celluloid import Camera
import warnings
from sklearn.exceptions import ConvergenceWarning
from tslearn.barycenters import _set_weights, euclidean_barycenter
from tslearn.metrics import dtw_path
from tslearn.utils import ts_size, to_time_series_dataset
def _mm_assignment(X, barycenter, weights, metric_params=None):
"""Computes item assignement based on DTW alignments and return cost as a
bonus.
Parameters
----------
X : numpy.array of shape (n, sz, d)
Time-series to be averaged
barycenter : numpy.array of shape (barycenter_size, d)
Barycenter as computed at the current step of the algorithm.
weights: array
Weights of each X[i]. Must be the same size as len(X).
metric_params: dict or None (default: None)
DTW constraint parameters to be used.
See :ref:`tslearn.metrics.dtw_path <fun-tslearn.metrics.dtw_path>` for
a list of accepted parameters
If None, no constraint is used for DTW computations.
Returns
-------
list of index pairs
Warping paths
float
Current alignment cost
"""
if metric_params is None:
metric_params = {}
n = X.shape[0]
cost = 0.
list_p_k = []
for i in range(n):
path, dist_i = dtw_path(barycenter, X[i], **metric_params)
cost += dist_i ** 2 * weights[i]
list_p_k.append(path)
cost /= weights.sum()
return list_p_k, cost
def _subgradient_valence_warping(list_p_k, barycenter_size, weights):
"""Compute Valence and Warping matrices from paths.
Valence matrices are denoted :math:`V^{(k)}` and Warping matrices are
:math:`W^{(k)}` in [1]_.
This function returns a list of :math:`V^{(k)}` diagonals (as a vector)
and a list of :math:`W^{(k)}` matrices.
Parameters
----------
list_p_k : list of index pairs
Warping paths
barycenter_size : int
Size of the barycenter to generate.
weights: array
Weights of each X[i]. Must be the same size as len(X).
Returns
-------
list of numpy.array of shape (barycenter_size, )
list of weighted :math:`V^{(k)}` diagonals (as a vector)
list of numpy.array of shape (barycenter_size, sz_k)
list of weighted :math:`W^{(k)}` matrices
References
----------
.. [1] D. Schultz and B. Jain. Nonsmooth Analysis and Subgradient Methods
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
list_v_k = []
list_w_k = []
for k, p_k in enumerate(list_p_k):
sz_k = p_k[-1][1] + 1
w_k = numpy.zeros((barycenter_size, sz_k))
for i, j in p_k:
w_k[i, j] = 1.
list_w_k.append(w_k * weights[k])
list_v_k.append(w_k.sum(axis=1) * weights[k])
return list_v_k, list_w_k
def _mm_valence_warping(list_p_k, barycenter_size, weights):
"""Compute Valence and Warping matrices from paths.
Valence matrices are denoted :math:`V^{(k)}` and Warping matrices are
:math:`W^{(k)}` in [1]_.
This function returns the sum of :math:`V^{(k)}` diagonals (as a vector)
and a list of :math:`W^{(k)}` matrices.
Parameters
----------
list_p_k : list of index pairs
Warping paths
barycenter_size : int
Size of the barycenter to generate.
weights: array
Weights of each X[i]. Must be the same size as len(X).
Returns
-------
numpy.array of shape (barycenter_size, )
sum of weighted :math:`V^{(k)}` diagonals (as a vector)
list of numpy.array of shape (barycenter_size, sz_k)
list of weighted :math:`W^{(k)}` matrices
References
----------
.. [1] D. Schultz and B. Jain. Nonsmooth Analysis and Subgradient Methods
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
list_v_k, list_w_k = _subgradient_valence_warping(
list_p_k=list_p_k,
barycenter_size=barycenter_size,
weights=weights)
diag_sum_v_k = numpy.zeros(list_v_k[0].shape)
for v_k in list_v_k:
diag_sum_v_k += v_k
return diag_sum_v_k, list_w_k
def _mm_update_barycenter(X, diag_sum_v_k, list_w_k):
"""Update barycenters using the formula from Algorithm 2 in [1]_.
Parameters
----------
X : numpy.array of shape (n, sz, d)
Time-series to be averaged
diag_sum_v_k : numpy.array of shape (barycenter_size, )
sum of weighted :math:`V^{(k)}` diagonals (as a vector)
list_w_k : list of numpy.array of shape (barycenter_size, sz_k)
list of weighted :math:`W^{(k)}` matrices
Returns
-------
numpy.array of shape (barycenter_size, d)
Updated barycenter
References
----------
.. [1] D. Schultz and B. Jain. Nonsmooth Analysis and Subgradient Methods
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
d = X.shape[2]
barycenter_size = diag_sum_v_k.shape[0]
sum_w_x = numpy.zeros((barycenter_size, d))
for k, (w_k, x_k) in enumerate(zip(list_w_k, X)):
sum_w_x += w_k.dot(x_k[:ts_size(x_k)])
barycenter = numpy.diag(1. / diag_sum_v_k).dot(sum_w_x)
return barycenter
def dtw_barycenter_averaging(X, barycenter_size=None,
init_barycenter=None,
max_iter=30, tol=1e-5, weights=None,
metric_params=None,
verbose=False):
"""DTW Barycenter Averaging (DBA) method estimated through
Expectation-Maximization algorithm.
DBA was originally presented in [1]_.
This implementation is based on a idea from [2]_ (Majorize-Minimize Mean
Algorithm).
Parameters
----------
X : array-like, shape=(n_ts, sz, d)
Time series dataset.
barycenter_size : int or None (default: None)
Size of the barycenter to generate. If None, the size of the barycenter
is that of the data provided at fit
time or that of the initial barycenter if specified.
init_barycenter : array or None (default: None)
Initial barycenter to start from for the optimization process.
max_iter : int (default: 30)
Number of iterations of the Expectation-Maximization optimization
procedure.
tol : float (default: 1e-5)
Tolerance to use for early stopping: if the decrease in cost is lower
than this value, the
Expectation-Maximization procedure stops.
weights: None or array
Weights of each X[i]. Must be the same size as len(X).
If None, uniform weights are used.
metric_params: dict or None (default: None)
DTW constraint parameters to be used.
See :ref:`tslearn.metrics.dtw_path <fun-tslearn.metrics.dtw_path>` for
a list of accepted parameters
If None, no constraint is used for DTW computations.
verbose : boolean (default: False)
Whether to print information about the cost at each iteration or not.
Returns
-------
numpy.array of shape (barycenter_size, d) or (sz, d) if barycenter_size \
is None
DBA barycenter of the provided time series dataset.
float
Associated inertia
list of numpy.array of shape (barycenter_size, d) or (sz, d) if \
barycenter_size is None
list of DBA barycenters along iterations.
list of list of alignment paths
list of alignment paths along iterations.
References
----------
.. [1] F. Petitjean, A. Ketterlin & P. Gancarski. A global averaging method
for dynamic time warping, with applications to clustering. Pattern
Recognition, Elsevier, 2011, Vol. 44, Num. 3, pp. 678-693
.. [2] D. Schultz and B. Jain. Nonsmooth Analysis and Subgradient Methods
for Averaging in Dynamic Time Warping Spaces.
Pattern Recognition, 74, 340-358.
"""
X_ = to_time_series_dataset(X)
if barycenter_size is None:
barycenter_size = X_.shape[1]
weights = _set_weights(weights, X_.shape[0])
if init_barycenter is None:
barycenter = euclidean_barycenter(X_)
else:
barycenter_size = init_barycenter.shape[0]
barycenter = init_barycenter
cost_prev, cost = numpy.inf, numpy.inf
list_barycenters = [barycenter]
list_assignments = []
for it in range(max_iter):
list_p_k, cost = _mm_assignment(X_, barycenter, weights, metric_params)
list_assignments.append(list_p_k)
diag_sum_v_k, list_w_k = _mm_valence_warping(list_p_k, barycenter_size,
weights)
if verbose:
print("[DBA] epoch %d, cost: %.3f" % (it + 1, cost))
barycenter = _mm_update_barycenter(X_, diag_sum_v_k, list_w_k)
list_barycenters.append(barycenter)
if abs(cost_prev - cost) < tol:
break
elif cost_prev < cost:
warnings.warn("DBA loss is increasing while it should not be. "
"Stopping optimization.", ConvergenceWarning)
break
else:
cost_prev = cost
return barycenter, cost, list_barycenters, list_assignments
def plot_and_snap(X, bar, assign, step, camera):
if assign is not None:
for idx in range(n_ts):
for ti, tj in assign[idx]:
plt.plot([ti, tj], [bar[ti, 0], X[idx, tj, 0]], 'k-',
alpha=.5)
plt.plot(numpy.arange(sz), X[0].ravel(), "r-", marker="o")
plt.plot(numpy.arange(sz), X[1].ravel(), "b-", marker="o")
plt.plot(numpy.arange(sz), bar.ravel(), "-", marker="o", color="purple")
plt.ylim([X.min() - .1, X.max() + .1])
plt.xticks([])
plt.yticks([])
plt.tight_layout()
camera.snap()
# Main code
from tslearn.datasets import CachedDatasets
from tslearn.preprocessing import TimeSeriesScalerMinMax, TimeSeriesResampler
X_train, y_train, X_test, y_test = CachedDatasets().load_dataset("Trace")
sz, d = X_train.shape[1:]
n_ts = 2
X_out = numpy.empty((n_ts, sz, d))
X_out[0] = X_train[y_train == 1][0]
X_out[1] = X_train[y_train == 2][0]
X_out = TimeSeriesScalerMinMax().fit_transform(X_out)
sz = 30
X_out = TimeSeriesResampler(sz=sz).fit_transform(X_out)
_, _, list_barycenters, list_assignments = dtw_barycenter_averaging(X_out)
# For visualization purposes
X_out[0] += 1.
X_out[1] -= 1.
assignments_i = None
bar_i = None
step = 0
fig = plt.figure()
camera = Camera(fig)
for i in range(len(list_assignments)):
# Update barycenter
bar_i = list_barycenters[i]
plot_and_snap(X_out, bar_i, assignments_i, step, camera)
step += 1
# Update assignments
assignments_i = list_assignments[i]
plot_and_snap(X_out, bar_i, assignments_i, step, camera)
step += 1
anim = camera.animate()
plt.close()
HTML(anim.to_jshtml())
Some other alignment-based metrics
Though DTW is probably the most well-known alignment-based similarity measure for time series, it is not the only one. Some other similarity measures rely on local matches rather than matching the whole series. This is the case of Longest Common Sub-Sequence (LCSS) and Longest Common Substring (LCS) algorithms.
Also, the problem of aligning heterogeneous time series (i.e. time series whose elements either do not lie in the same ambient space or cannot be compared directly) has received some attention. A first similarity measure in this context is Canonical Time Warping [Zhou and Torre, 2009] which finds both a common embedding space for features of both time series (using Canonical Correlation Analysis) and an optimal alignment path à la DTW. More recent works on this problem aim at either mapping one series’ features to the other series’ feature space [Vayer et al., 2020] or matching self-similarity matrices [Cohen et al., 2020].
References#
- CLT+20
Samuel Cohen, Giulia Luise, Alexander Terenin, Brandon Amos, and Marc Peter Deisenroth. Aligning time series on incomparable spaces. 2020. arXiv:2006.12648.
- PKGanccarski11
François Petitjean, Alain Ketterlin, and Pierre Gançarski. A global averaging method for dynamic time warping, with applications to clustering. Elsevier Pattern Recognition, 44(3):678 – 693, 2011.
- SC78
Hiroaki Sakoe and Seibi Chiba. Dynamic programming algorithm optimization for spoken word recognition. IEEE Transactions on Acoustics, Speech and Signal Processing, 26(1):43–49, 1978.
- VCC+20
Titouan Vayer, Laetitia Chapel, Nicolas Courty, Rémi Flamary, Yann Soullard, and Romain Tavenard. Time series alignment with global invariances. 2020. arXiv:2002.03848.
- ZT09
Feng Zhou and Fernando Torre. Canonical Time Warping for alignment of human behavior. In Neural Information Processing Systems, 2286–2294. 2009.