# MIT License: Copyright (c) 2022, Marie Roald.
# See the LICENSE file in the root directory for full license text.
from abc import ABC, abstractmethod
try:
import condat_tv
HAS_TV = True
except ImportError:
HAS_TV = False
import numpy as np
import tensorly as tl
from scipy.optimize import bisect
from scipy.sparse import csr_matrix, bmat
from scipy.sparse.linalg import spsolve
from ._doc_utils import InheritableDocstrings, copy_ancestor_docstring
from ._unimodal_regression import unimodal_regression
from ._utils import get_svd
[docs]
class ADMMPenalty(ABC, metaclass=InheritableDocstrings):
"""Base class for all regularizers and constraints.
Parameters
----------
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def __init__(self, aux_init="random_uniform", dual_init="random_uniform"):
self.aux_init = aux_init
self.dual_init = dual_init
[docs]
def init_aux(self, matrices, rank, mode, random_state=None):
"""Initialize the auxiliary variables
Initialization schemes
* ``"random_uniform"``: The elements of the auxiliary variables
are drawn from a uniform distribution between 0 and 1.
* ``"random_standard_normal"``: The elements of the auxiliary
variables are drawn from a standard normal distribution.
* ``"zeros"``: The elements of the auxiliary variables are
initialized as zero.
* tl.tensor(ndim=2) : Pre-computed auxiliary variables (mode=0 or mode=2)
* list of tl.tensor(ndim=2): Pre-computed auxiliary variables (mode=1)
Parameters
----------
matrices : list of tensor(ndim=2) or tensor(ndim=3)
The data matrices represented by the coupled matrix factorization
these auxiliary variables correspond to.
rank : int
Rank of the decomposition.
mode : int
The mode represented by the factor matrices that these
auxiliary variables correspond to.
random_state : RandomState
TensorLy random state.
Returns
-------
tl.tensor(ndim=2) or list of tl.tensor(ndim=2)
"""
random_state = tl.check_random_state(random_state)
if not isinstance(rank, int):
raise TypeError("Rank must be int, not {}".format(type(rank)))
if not isinstance(mode, int):
raise TypeError("Mode must be int, not {}".format(type(mode)))
elif mode not in [0, 1, 2]:
raise ValueError("Mode must be 0, 1, or 2.")
if (
not isinstance(self.aux_init, str)
and not tl.is_tensor(self.aux_init)
and not isinstance(self.aux_init, list)
):
raise TypeError(
"self.aux_init must be a tensor, a list of tensors or a string specifiying init method, not {}".format(
type(self.aux_init)
)
)
# Initialize using given aux variables
if mode in {0, 2} and tl.is_tensor(self.aux_init):
length_, rank_ = tl.shape(self.aux_init)
I = len(matrices)
K = tl.shape(matrices[0])[1]
if rank != rank_ or (mode == 0 and length_ != I):
raise ValueError(
"Invalid shape for pre-specified auxiliary variable for mode 0"
"\nShould have shape {}, but has shape {}".format((I, rank), (length_, rank_))
)
elif rank != rank_ or (mode == 2 and length_ != K):
raise ValueError(
"Invalid shape for pre-specified auxiliary variable for mode 2"
"\nShould have shape {}, but has shape {}".format((K, rank), (length_, rank_))
)
return self.aux_init
elif mode in {0, 2} and isinstance(self.aux_init, list):
raise TypeError("Cannot use list of matrices to initialize auxiliary matrices for mode 0 or 2.")
elif mode == 1 and isinstance(self.aux_init, list):
J_is = (tl.shape(matrix)[0] for matrix in matrices)
shapes = ((J_i, rank) for J_i in J_is)
if any(tl.shape(aux) != shape for aux, shape in zip(self.aux_init, shapes)):
raise ValueError(
"Invalid shape for at least one of matrices in the auxiliary variable list for mode 1."
)
elif len(self.aux_init) != len(matrices):
raise ValueError(
"Different number of pre-specified auxiliary factor matrices for mode 1 "
"than the number of coupled matrices."
)
return self.aux_init
elif mode == 1 and tl.is_tensor(self.aux_init):
raise TypeError(
"Cannot use a tensor (matrix) to initialize auxiliary matrices for mode 1. Must be a list instead."
)
# Generate initialization based on option
if self.aux_init == "random_uniform":
if mode == 0:
return tl.tensor(random_state.uniform(size=(len(matrices), rank)))
elif mode == 1:
return [tl.tensor(random_state.uniform(size=(matrix.shape[0], rank))) for matrix in matrices]
elif mode == 2:
return tl.tensor(random_state.uniform(size=(matrices[0].shape[1], rank)))
elif self.aux_init == "random_standard_normal":
if mode == 0:
return tl.tensor(random_state.standard_normal(size=(len(matrices), rank)))
elif mode == 1:
return [tl.tensor(random_state.standard_normal(size=(matrix.shape[0], rank))) for matrix in matrices]
elif mode == 2:
return tl.tensor(random_state.standard_normal(size=(matrices[0].shape[1], rank)))
elif self.aux_init == "zeros":
if mode == 0:
return tl.zeros((len(matrices), rank))
elif mode == 1:
return [tl.zeros((matrix.shape[0], rank)) for matrix in matrices]
elif mode == 2:
return tl.zeros((matrices[0].shape[1], rank))
else:
raise ValueError("Unknown aux init: {}".format(self.aux_init))
[docs]
def init_dual(self, matrices, rank, mode, random_state=None):
"""Initialize the dual variables
Initialization schemes
* ``"random_uniform"``: The elements of the dual variables
are drawn from a uniform distribution between 0 and 1.
* ``"random_standard_normal"``: The elements of the dual
variables are drawn from a standard normal distribution.
* ``"zeros"``: The elements of the dual variables are
initialized as zero.
* tl.tensor(ndim=2) : Pre-computed dual variables (mode=0 or mode=2)
* list of tl.tensor(ndim=2): Pre-computed dual variables (mode=1)
Parameters
----------
matrices : list of tensor(ndim=2) or tensor(ndim=3)
The data matrices represented by the coupled matrix factorization
these dual variables correspond to.
rank : int
Rank of the decomposition.
mode : int
The mode represented by the factor matrices that these
dual variables correspond to.
random_state : RandomState
TensorLy random state.
Returns
-------
tl.tensor(ndim=2) or list of tl.tensor(ndim=2)
"""
random_state = tl.check_random_state(random_state)
if not isinstance(rank, int):
raise TypeError("Rank must be int, not {}".format(type(rank)))
if not isinstance(mode, int):
raise TypeError("Mode must be int, not {}".format(type(mode)))
elif mode not in [0, 1, 2]:
raise ValueError("Mode must be 0, 1, or 2.")
if (
not isinstance(self.dual_init, str)
and not tl.is_tensor(self.dual_init)
and not isinstance(self.dual_init, list)
):
raise TypeError(
"self.dual_init must be a tensor, a list of tensors or a string specifiying init method, not {}".format(
type(self.dual_init)
)
)
# Initialize using given dual variables
if mode in {0, 2} and tl.is_tensor(self.dual_init):
length_, rank_ = tl.shape(self.dual_init)
I = len(matrices)
K = tl.shape(matrices[0])[1]
if rank != rank_ or (mode == 0 and length_ != I):
raise ValueError(
"Invalid shape for pre-specified auxiliary variable for mode 0"
"\nShould have shape {}, but has shape {}".format((I, rank), (length_, rank_))
)
elif rank != rank_ or (mode == 2 and length_ != K):
raise ValueError(
"Invalid shape for pre-specified auxiliary variable for mode 2"
"\nShould have shape {}, but has shape {}".format((K, rank), (length_, rank_))
)
return self.dual_init
elif mode in {0, 2} and isinstance(self.dual_init, list):
raise TypeError("Cannot use list of matrices to initialize auxiliary matrices for mode 0 or 2.")
elif mode == 1 and isinstance(self.dual_init, list):
J_is = (tl.shape(matrix)[0] for matrix in matrices)
shapes = ((J_i, rank) for J_i in J_is)
if any(tl.shape(dual) != shape for dual, shape in zip(self.dual_init, shapes)):
raise ValueError(
"Invalid shape for at least one of matrices in the auxiliary variable list for mode 1."
)
elif len(self.dual_init) != len(matrices):
raise ValueError(
"Different number of pre-specified auxiliary factor matrices for mode 1 "
"than the number of coupled matrices."
)
return self.dual_init
elif mode == 1 and tl.is_tensor(self.dual_init):
raise TypeError(
"Cannot use a tensor (matrix) to initialize auxiliary matrices for mode 1. Must be a list instead."
)
# Generate initialization based on option
if self.dual_init == "random_uniform":
if mode == 0:
return tl.tensor(random_state.uniform(size=(len(matrices), rank)))
elif mode == 1:
return [tl.tensor(random_state.uniform(size=(matrix.shape[0], rank))) for matrix in matrices]
elif mode == 2:
return tl.tensor(random_state.uniform(size=(matrices[0].shape[1], rank)))
elif self.dual_init == "random_standard_normal":
if mode == 0:
return tl.tensor(random_state.standard_normal(size=(len(matrices), rank)))
elif mode == 1:
return [tl.tensor(random_state.standard_normal(size=(matrix.shape[0], rank))) for matrix in matrices]
elif mode == 2:
return tl.tensor(random_state.standard_normal(size=(matrices[0].shape[1], rank)))
elif self.dual_init == "zeros":
if mode == 0:
return tl.zeros((len(matrices), rank))
elif mode == 1:
return [tl.zeros((matrix.shape[0], rank)) for matrix in matrices]
elif mode == 2:
return tl.zeros((matrices[0].shape[1], rank))
else:
raise ValueError("Unknown aux init: {}".format(self.dual_init))
[docs]
@abstractmethod
def penalty(self, x): # pragma: nocover
"""Compute the penalty for the given factor matrix or list of factor matrices."""
raise NotImplementedError
[docs]
def subtract_from_auxes(self, auxes, duals):
"""Compute (aux - dual) for each auxiliary- and dual-factor matrix for mode=1.
For some penalties, the aux is not a list of factor matrices but rather some
other parametrization of a list of factor matrices. This function is used so
the AO-ADMM procedure can work with any auxiliary-variable parametrization
seamlessly.
Parameters
----------
auxes : list of tl.tensor(ndim=2)
Auxiliary variables
duals : list of tl.tensor(ndim=2)
Dual variables (or other variable to subtract from the auxes)
Returns
-------
list of tl.tensor(ndim=2)
The list of differences
"""
return [self.subtract_from_aux(aux, dual) for aux, dual in zip(auxes, duals)]
[docs]
def subtract_from_aux(self, aux, dual):
"""Compute (aux - dual) for mode=0 and mode=2.
For some penalties, the aux is not a factor matrix but rather some other
parametrization of a matrix. This function is used so the AO-ADMM procedure
can work with any auxiliary-variable parametrization seamlessly.
Parameters
----------
aux : tl.tensor(ndim=2)
Auxiliary variables
dual : tl.tensor(ndim=2)
Dual variables (or other variable to subtract from the auxes)
Returns
-------
tl.tensor(ndim=2)
The list of differences
"""
return aux - dual
[docs]
def aux_as_matrix(self, aux):
"""Convert an auxiliary variable to a matrix (mode=0 and mode=2).
This is an identity function that just returns its input. However,
it is required for AO-ADMM to seamlessly work when the auxiliary variable
is a parametrization of a matrix.
Parameters
----------
aux : tl.tensor(ndim=2)
Returns
-------
tl.tensor(ndim=2)
"""
return aux
[docs]
def auxes_as_matrices(self, auxes):
"""Convert a list of auxiliary variables to a list of matrices (mode=1).
This is an identity function that just returns its input. However,
it is required for AO-ADMM to seamlessly work when the auxiliary variable
is a parametrization of a matrix.
Parameters
----------
auxes : list of tl.tensor(ndim=2)
Returns
-------
list of tl.tensor(ndim=2)
"""
return [self.aux_as_matrix(aux) for aux in auxes]
def _auto_add_param_to_repr(self, param):
if param.startswith("_"):
return False
elif param in {"aux_init", "dual_init"}:
return False
return True
def __repr__(self):
param_strings = [
f"{key}={repr(value)}" for key, value in self.__dict__.items() if self._auto_add_param_to_repr(key)
]
if isinstance(self.aux_init, str):
param_strings.append(f"aux_init='{self.aux_init}'")
else:
param_strings.append("aux_init=given_init")
if isinstance(self.dual_init, str):
param_strings.append(f"dual_init='{self.dual_init}'")
else:
param_strings.append("dual_init=given_init")
params = ", ".join(param_strings)
return f"<'{self.__module__}.{type(self).__name__}' with {params})>"
[docs]
class MatricesPenalty(ADMMPenalty):
"""Base class for penalties that are applied to a list of factor matrices simultaneously."""
[docs]
@abstractmethod
def factor_matrices_update(self, factor_matrices, feasibility_penalties, auxes): # pragma: nocover
"""Update all factor matrices in given list according to this penalty.
Parameters
----------
factor_matrices : list of tl.tensor(ndim=2)
List of factor matrix to update.
feasibility_penalties : list of floats
Penalty parameters for the feasibility gap of the different factor matrices.
auxes : list of tl.tensor(ndim=2)
List of auxiliary matrices, each element corresponds to the auxiliary factor matrix
for the same element in ``factor_matrices``.
"""
raise NotImplementedError
[docs]
class MatrixPenalty(MatricesPenalty):
"""Base class for penalties that can be applied to a single factor matrix at a time."""
[docs]
def factor_matrices_update(self, factor_matrices, feasibility_penalties, auxes):
"""Update all factor matrices in given list according to this penalty.
Parameters
----------
factor_matrices : list of tl.tensor(ndim=2)
List of factor matrix to update.
feasibility_penalties : list of floats
Penalty parameters for the feasibility gap of the different factor matrices.
auxes : list of tl.tensor(ndim=2)
List of auxiliary matrices, each element corresponds to the auxiliary factor matrix
for the same element in ``factor_matrices``.
"""
return [
self.factor_matrix_update(fm, feasibility_penalty, aux)
for fm, feasibility_penalty, aux in zip(factor_matrices, feasibility_penalties, auxes)
]
[docs]
@abstractmethod
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux): # pragma: nocover
"""Update a factor matrix according to this penalty.
Parameters
----------
factor_matrix : tl.tensor(ndim=2)
Factor matrix to update.
feasibility_penalty : float
Penalty parameter for infeasible solutions.
aux : tl.tensor(ndim=2)
Auxiliary matrix that correspond to the factor matrix supplied to ``factor_matrix``.
"""
raise NotImplementedError
[docs]
class RowVectorPenalty(MatrixPenalty):
"""Base class for penalties that can be applied to one row of a factor matrix at a time."""
[docs]
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
"""Update a factor matrix according to this penalty.
Parameters
----------
factor_matrix : tl.tensor(ndim=2)
Factor matrix to update.
feasibility_penalty : float
Penalty parameter for infeasible solutions.
aux : tl.tensor(ndim=2)
Auxiliary matrix that correspond to the factor matrix supplied to ``factor_matrix``.
"""
out = tl.zeros(factor_matrix.shape)
for row, factor_matrix_row in enumerate(factor_matrix):
out[row] = self.factor_matrix_row_update(factor_matrix_row, feasibility_penalty, aux[row])
return out
[docs]
@abstractmethod
def factor_matrix_row_update(self, factor_matrix_row, feasibility_penalty, aux_row): # pragma: nocover
"""Update a single row of a factor matrix according to this constraint.
Parameters
----------
factor_matrix_row : tl.tensor(ndim=1)
Vector (first order tensor) that corresponds to the single row in the factor matrix
we wish to update.
feasibility_penalty : float
Penalty parameter for infeasible solutions.
aux_row : tl.tensor(ndim=1)
Vector (first order tensor) that corresponds to the row in the auxiliary matrix that
correspond to the row supplied to ``factor_matrix_row``.
"""
raise NotImplementedError
[docs]
class HardConstraintMixin(metaclass=InheritableDocstrings):
"""Mixin for hard constraints."""
[docs]
def penalty(self, x):
"""Returns 0 as there is no penalty for hard constraints.
Hard constraints are always penalised with 0 even when the components are infeasible.
Slightly infeasible components would otherwise result in an infinite penalty because
the penalty function of hard constraints is 0 for feasible solutions and infinity for
infeasible solutions. An infinite penalty would stop all convergence checking and not
provide any information on the quality of the components. To ensure that the hard
constraints are sufficiently imposed, it is recommended to examine the feasibility gap
instead of the penalty and ensure that the feasibility gap is low.
Parameters
----------
x : tl.tensor(ndim=2) or list of tl.tensor(ndim=2)
Factor matrix or list of factor matrices.
"""
return 0
[docs]
class NonNegativity(HardConstraintMixin, RowVectorPenalty):
r"""Impose non-negative values for the factor.
The non-negativity constraint works element-wise, constraining the elements of a factor
to satisfy :math:`0 \leq x`, where :math:`x` represents a factor element
Parameters
----------
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
[docs]
@copy_ancestor_docstring
def factor_matrix_row_update(self, factor_matrix_row, feasibility_penalty, aux_row):
return tl.clip(factor_matrix_row, 0, float("inf"))
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
return tl.clip(factor_matrix, 0, float("inf"))
[docs]
class Box(HardConstraintMixin, RowVectorPenalty):
r"""Set minimum and maximum value for the factor.
A box constraint works element-wise, constraining the elements of a factor
to satisfy :math:`l \leq x \leq u`, where :math:`x` represents the element
of the factor and :math:`l` and :math:`u` are the lower and upper bound on
the factor elements, respectively.
Parameters
----------
min_val : float or None
Lower bound on the factor elements.
max_val : float or None
Upper bound on the factor elements
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def __init__(self, min_val, max_val, aux_init="random_uniform", dual_init="random_uniform"):
super().__init__(aux_init=aux_init, dual_init=dual_init)
self.min_val = min_val
self.max_val = max_val
[docs]
@copy_ancestor_docstring
def factor_matrix_row_update(self, factor_matrix_row, feasibility_penalty, aux_row):
return tl.clip(factor_matrix_row, self.min_val, self.max_val)
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
return tl.clip(factor_matrix, self.min_val, self.max_val)
[docs]
class L1Penalty(RowVectorPenalty):
r"""Add L1 (LASSO) regularization on the factor elements.
The L1 penalty is frequently used to obtain sparse components. That is,
components with many zero-valued elements. To accomplish this, the L1 penalty
adds a penalty term on the form :math:`\sum_{i r} \gamma |a_{ir}|`, where
:math:`a_{ir}` is the :math:`(i,r)`-th element of the factor matrix.
Parameters
----------
reg_strength : float
The regularization strength, :math:`\gamma` in the equation above
non_negativity : bool
If ``True``, then non-negativity is also imposed on the factor elements.
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def __init__(self, reg_strength, non_negativity=False, aux_init="random_uniform", dual_init="random_uniform"):
super().__init__(aux_init=aux_init, dual_init=dual_init)
if reg_strength < 0:
raise ValueError("Regularization strength must be nonnegative.")
self.reg_strength = reg_strength
self.non_negativity = non_negativity
[docs]
@copy_ancestor_docstring
def factor_matrix_row_update(self, factor_matrix_row, feasibility_penalty, aux_row):
if self.non_negativity:
return tl.clip(factor_matrix_row - self.reg_strength / feasibility_penalty, 0, float("inf"))
sign = tl.sign(factor_matrix_row)
return sign * tl.clip(tl.abs(factor_matrix_row) - self.reg_strength / feasibility_penalty, 0, float("inf"))
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
if self.non_negativity:
return tl.clip(factor_matrix - self.reg_strength / feasibility_penalty, 0, float("inf"))
sign = tl.sign(factor_matrix)
return sign * tl.clip(tl.abs(factor_matrix) - self.reg_strength / feasibility_penalty, 0, float("inf"))
[docs]
@copy_ancestor_docstring
def penalty(self, x):
if tl.is_tensor(x):
return tl.sum(tl.abs(x)) * self.reg_strength
return sum(tl.sum(tl.abs(xi)) for xi in x) * self.reg_strength
[docs]
class GeneralizedL2Penalty(MatrixPenalty):
r"""Penalty on the form :math:`\mathbf{x}^\mathsf{T} \mathbf{Mx}`, where :math:`\mathbf{M}` is any symmetric positive semidefinite matrix.
The generalized L2 penalty adds a squared (semi-)norm penalty on the form
.. math::
g(\mathbf{x}) = \mathbf{x}^\mathsf{T} \mathbf{Mx}
where :math:`\mathbf{M}` is a symmetric positive semidefinite matrix and :math:`\mathbf{x}`
is a vector. This penalty is imposed for all columns of the factor matrix (or matrices for
the :math:`\mathbf{B}^{(i)}`-s). Note that the regular L2-penalty (or Ridge penalty) is a special
case of the generalised L2 penalty, which we obtain by setting :math:`\mathbf{M}=\mathbf{I}`.
Also, this penalty is a squared seminorm penalty since
.. math::
g(\mathbf{x}) = \mathbf{x}^\mathsf{T} \mathbf{Mx} = \| \mathbf{Lx} \|_2^2,
where :math:`\mathbf{L}` is a Cholesky factorization of :math:`\mathbf{M}`. However, the
formulation with :math:`\mathbf{M}` is more practical than the formulation with :math:`\mathbf{L}`,
since
.. math::
\mathbf{M} = \mathbf{L}^\mathsf{T}\mathbf{L}
is easy to compute with :math:`\mathbf{L}` known, but not wise-versa (e.g. if
:math:`\mathbf{M}` is indefinite).
**Graph Laplacian penalty**
A special case of the generalized L2 penalty is the graph Laplacian penalty. This penalty
is on the form
.. math::
g(\mathbf{x}) = \sum_{m=1}^M \sum_{n=1}^N w_{mn} (x_m - x_n)^2.
That is, squared differences between the different component vector elements are penalised.
Graph laplacian penalties are useful, for example, in image processing, where :math:`w_{mn}`
is a high number for vector elements that represent pixels that are close to each other and
a low number for vector elements that represent pixels that are far apart.
To transform the graph Laplacian penalty into a generalised L2 penalty, we consider the
component vector elements as nodes in a graph and :math:`w_{mn}` as the edge weight between
node :math:`m` and node :math:`m`. Then, we set :math:`\mathbf{M}` equal to the graph Laplacian
of this graph. That is
.. math::
m_{mn} = \begin{cases}
-w_{mn} & m \neq n \\
\sum_m & m = n
\end{cases}.
**The proximal operator:**
The proximal operator of the generalised L2 penalty is obtained by solving
.. math::
\text{prox}_{\mathbf{x}^\mathsf{T} \mathbf{Mx}}(\mathbf{x})
= \left(\mathbf{M} + 0.5\rho\mathbf{I}\right)^{-1}0.5\rho\mathbf{x},
where :math:`\rho` is the scale parameter. There are several ways to solve this equation.
One of which is via the SVD. Let :math:`\mathbf{M} = \mathbf{USU}^\mathsf{T}`, then, the
proximal operator is given by
.. math::
\text{prox}_{\mathbf{x}^\mathsf{T} \mathbf{Mx}}(\mathbf{x})
= (\mathbf{U}(\mathbf{S} + 0.5\rho\mathbf{I})^{-1}\mathbf{U}^\mathsf{T}) 0.5 \rho \mathbf{x}.
This operation is fast once :math:`\mathbf{U}` and :math:`\mathbf{S}` are known, since solving the diagonal
system :math:`(\mathbf{S} + 0.5\rho\mathbf{I})` is fast.
Parameters
----------
norm_matrix : tl.tensor(ndim=2)
The :math:`\mathbf{M}`-matrix above
svd : str
String that specifies which SVD algorithm to use. Valid strings are the keys of ``tensorly.SVD_FUNS``.
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
validate : bool (default=True)
Enable naive validation of the norm matrix.
Examples
--------
This example creates a generalised L2 penalty for a simple Laplacian penalty on
the form :math:`\sum_{n} (x_n - x_{n-1})^2`.
>>> import numpy as np
>>> import tensorly as tl
>>> from matcouply.penalties import GeneralizedL2Penalty
>>> num_elements = 30
>>> M = 2 * np.eye(num_elements) - np.eye(num_elements, k=1) - np.eye(num_elements, k=-1)
>>> M[0, 0] = 1
>>> M[-1, -1] = 1
>>> M = tl.tensor(M)
>>> penalty = GeneralizedL2Penalty(M)
This penalty can now be added to ``matcouply.decomposition.cmf_aoadmm`` via the ``regs``-parameter.
Alternatively, if the ``generalized_l2_penalty``-argument of ``matcouply.decomposition.cmf_aoadmm`` is
used, then a ``GeneralizedL2Penalty`` is added with ``method="svd"``.
"""
def __init__(
self,
norm_matrix,
svd="truncated_svd",
aux_init="random_uniform",
dual_init="random_uniform",
validate=True,
):
super().__init__(aux_init, dual_init)
self.norm_matrix = norm_matrix
self.svd = svd # Useful for the __repr__
self.validate = validate # Useful for the __repr__
sign_matrix = -tl.ones(tl.shape(norm_matrix))
sign_matrix = sign_matrix + 2 * tl.eye(tl.shape(norm_matrix)[0])
if validate and (
not tl.all(tl.transpose(norm_matrix) == norm_matrix)
# FIXME: Validate eigenvalues also when/if tensorly gets eigvals function
):
raise ValueError("The norm matrix should be symmetric positive semidefinite")
if validate and tl.get_backend() == "numpy":
eigvals = np.linalg.eigvals(norm_matrix)
if np.any(eigvals < -1e-14):
raise ValueError("The norm matrix should be symmetric positive semidefinite")
self._U, self._s, _ = self.svd_fun(norm_matrix) # Ignore Vh since norm matrix is symmetric
@property
def svd_fun(self):
return get_svd(self.svd)
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
s_aug = self._s + 0.5 * feasibility_penalty
tmp = 0.5 * feasibility_penalty * factor_matrix
tmp = tl.dot(tl.transpose(self._U), tmp)
tmp = tl.dot((self._U * (1 / s_aug)), tmp)
return tmp
def _penalty(self, x):
return tl.trace(tl.dot(tl.dot(tl.transpose(x), self.norm_matrix), x))
[docs]
@copy_ancestor_docstring
def penalty(self, x):
if tl.is_tensor(x):
return self._penalty(x)
else:
return sum(self._penalty(xi) for xi in x)
[docs]
class TotalVariationPenalty(MatrixPenalty):
r"""Impose piecewise constant components
Total variation regularization imposes piecewise constant components by
obtaining components whose derivative is sparse. This sparsity is obtained
using an L1 penalty. That is
.. math::
g(\mathbf{x}) = \alpha \|\nabla \mathbf{x}\|_1 = \alpha \sum_{n=1}^{N} |x_n - x_{n-1}|,
where :math:`\alpha` is a regularization coefficient that controls the sparsity
level of the gradient and :math:`\nabla` is the finite difference operator.
:math:`\mathbf{x}` is a column vector of a factor matrix, and all column vectors
are penalised equally.
The total variation penalty is compatible with the L1 penalty in the sense that
it is easy to compute the proximal operator of
.. math::
g(\mathbf{x}) = \alpha \sum_{n=2}^{N} |x_n - x_{n-1}| + \sum_{n=1}^N \| x_n \|_1.
Specifically, if we first evaluate the proximal operator for the total variation penalty,
followed by the proximal operator of the L1 penalty, then that is equivalent to
evaluating the proximal operator of the sum of a TV and an L1 penalty :cite:p:`friedman2007pathwise`.
To evaluate the proximal operator, we use the improved direct total variation algorithm
by Laurent Condat :cite:p:`condat2013direct` (C code of the improved version is available
here: https://lcondat.github.io/publications.html).
Parameters
----------
reg_strength : float (> 0)
The strength of the total variation regularization (:math:`\alpha` above)
l1_strength : float (>= 0)
The strength of the L1 penalty (:math:`\beta` above)
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
Note
----
The C-code this penalty is based in has a CeCILL lisence, which is compatible with GPL, but
not MIT. This penalty is therefore not available with the default installation of
MatCoupLy. To use this penalty, you need to install the GPL-lisenced ``condat-tv``.
Note
----
This penalty is only available with the numpy backend, since it is based on
an external C-library.
"""
def __init__(
self,
reg_strength,
l1_strength=0,
aux_init="random_uniform",
dual_init="random_uniform",
):
if not HAS_TV:
raise ModuleNotFoundError(
"Cannot use total variation penalty without the ``condat_tv`` package (GPL-3 lisenced). "
"Install with ``pip install condat_tv``."
)
if reg_strength <= 0:
raise ValueError("The TV regularization strength must be positive.")
if l1_strength < 0:
raise ValueError("The L1 regularization strength must be non-negative.")
super().__init__(aux_init, dual_init)
self.reg_strength = reg_strength
self.l1_strength = l1_strength
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
X = tl.transpose(
condat_tv.tv_denoise_matrix(tl.transpose(factor_matrix), self.reg_strength * 2 / feasibility_penalty)
)
if self.l1_strength:
return tl.sign(X) * tl.clip(tl.abs(X) - self.l1_strength / feasibility_penalty, 0, float("inf"))
else:
return X
def _penalty(self, x):
penalty = self.reg_strength * tl.sum(tl.abs(np.diff(x, axis=0)))
if self.l1_strength:
penalty = penalty + self.l1_strength * tl.sum(tl.abs(x))
return penalty
[docs]
@copy_ancestor_docstring
def penalty(self, x):
if tl.is_tensor(x):
return self._penalty(x)
else:
return sum(self._penalty(xi) for xi in x)
[docs]
class L2Ball(HardConstraintMixin, MatrixPenalty):
r"""Ensure that the L2-norm of component vectors are less than a given scalar.
This is a hard constraint on the L2-norm of the component vectors given by
.. math::
\|\mathbf{x}\|_2 \leq r,
where :math:`\mathbf{x}` is a column vector for a factor matrix and
:math:`r` is a positive constant.
The L2-ball constraint is compatible with the non-negativity constraint.
Parameters
----------
norm_bound : float (> 0)
Maximum L2-norm of the component vectors (:math:`r` above)
non_negativity : float
If true, then non-negativity is imposed too
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
Note
----
**Proof of compatibility with non-negativity constraints**
The compatibility with non-negativity constraints can be obtained with the
standard projection onto convex sets (POCS) algorithm, which states that the
projection onto a convex set that is the intersection of two other convex sets
:math:`\mathcal{C} = \mathcal{C}_1 \cap \mathcal{C}_2`,
:math:`\mathcal{P}_\mathcal{C}` is given by
.. math::
\mathcal{P}_\mathcal{C} = \prod_{n=1}^\infty \mathcal{P}_{\mathcal{C}_1} \mathcal{P}_{\mathcal{C}_2},
where :math:`\mathcal{P}_{\mathcal{C}_1}` and :math:`\mathcal{P}_{\mathcal{C}_2}`
are the projections onto :math:`\mathcal{C}_1` and :math:`\mathcal{C}_2`,
respectively. In other words, to project only :math:`\mathcal{C}`, we can
alternatingly project onto :math:`\mathcal{C}_1` and :math:`\mathcal{C}_2`.
We can now use this relation to prove that the projection onto the intersection
of the L2 ball and the non-negative orthant can be obtained by first projecting
onto the L2 ball followed by a projection onto the non-negative orthant. Consider
any point :math:`\mathbf{x} \in \mathbb{R}^N`. The projection onto the L2 ball
of radius :math:`r` is given by:
.. math::
\mathbf{x}^{(0.5)} = \frac{\min(\|\mathbf{x}\|_2, r) \mathbf{x}}{\|\mathbf{x}\|_2}.
If any entries in :math:`\mathbf{x}` are negative, then their sign will not change.
Next, we project :math:`\mathbf{x}^{(0.5)}` onto the non-negative orthant:
.. math::
x_n^{(1)} = \max(x_n^{(0.5)}, 0).
This operation has the property that :math:`\|\mathbf{x}^{(1)}\|_2 \leq \|\mathbf{x}^{(0.5)}\|_2 \leq`.
Thus, any subsequent projection either onto the L2-ball of radius :math:`r` or
the non-negative orthant will not change :math:`\mathbf{x}^{(1)}`, which means that
:math:`\mathbf{x}^{(1)}` is the projection of :math:`\mathbf{x}` onto the intersection
of the L2 ball of radius :math:`r` and the non-negative orthant.
"""
def __init__(self, norm_bound, non_negativity=False, aux_init="random_uniform", dual_init="random_uniform"):
super().__init__(aux_init, dual_init)
self.norm_bound = norm_bound
self.non_negativity = non_negativity
if norm_bound <= 0:
raise ValueError("The norm bound must be positive.")
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
if self.non_negativity:
factor_matrix = tl.clip(factor_matrix, 0, float("inf"))
column_norms = tl.sqrt(tl.sum(factor_matrix**2, axis=0))
column_norms = tl.clip(column_norms, self.norm_bound, float("inf"))
return factor_matrix * self.norm_bound / column_norms
[docs]
class UnitSimplex(HardConstraintMixin, MatrixPenalty):
"""Constrain the component-vectors so they are non-negative and sum to 1.
This is a hard constraint which is useful when the component-vectors represent
probabilities.
Parameters
----------
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def _compute_lagrange_multiplier(self, factor_matrix_column):
"""Compute lagrange multipliers for the equality constraint: sum(x) = 1 with x >= 0.
Parameters
----------
factor_matrix_column : ndarray
Single column of a factor matrix
Returns
lagrange_multiplier : float
The single lagrange multiplier for the simplex constraint.
"""
# Inspired by https://math.stackexchange.com/questions/2402504/orthogonal-projection-onto-the-unit-simplex
# But using bisection instead of Newton's method, since Newton's method requires a C2 function,
# and this is only a C0 function.
# 0 = ∑_i[x_i] − 1 = ∑_i[min((yi−μ), 0)] - 1
min_val = tl.min(factor_matrix_column) - 1
max_val = tl.max(factor_matrix_column)
# Add a little buffer to the tolerance to account for floating point errors
min_val -= 1e-5
min_val = min(0.9 * min_val, 1.1 * min_val)
max_val += 1e-5
max_val = max(0.9 * max_val, 1.1 * max_val)
def f(multiplier):
return tl.sum(tl.clip(factor_matrix_column - multiplier, 0, None)) - 1
return bisect(f, min_val, max_val)
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
output_factor_matrix = tl.zeros(tl.shape(factor_matrix))
for r in range(tl.shape(factor_matrix)[1]):
lagrange_multiplier = self._compute_lagrange_multiplier(factor_matrix[:, r])
output_factor_matrix[:, r] = tl.clip(factor_matrix[:, r] - lagrange_multiplier, 0, None)
return output_factor_matrix
[docs]
class Unimodality(HardConstraintMixin, MatrixPenalty):
r"""Constrain the component-vectors so they are unimodal.
Unimodal vectors :math:`\mathbf{u} \in \mathbb{R}^n` have the property that
.. math::
u_1 \leq u_2 \leq ... \leq u_{t-1} \leq u_t \geq u_{t+1} \geq ... \geq u_{n-1} \geq u_n
Projecting a general vector into the set of unimodal vectors (called unimodal regression) requires solving
a set of isotonic regression problems (i.e. projections onto monotincally increasing or decreasing vectors).
Two isotonic regression problems for each element in the vector. However, there is an incremental algorithm
for fitting isotonic regression problems called *prefix isotonic regression* :cite:p:`stout2008unimodal`,
which can be used to solve unimodal problems in linear time :cite:p:`stout2008unimodal`.
Parameters
----------
non_negativity : bool
If True, then the components will also be non-negative
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def __init__(self, non_negativity=False, aux_init="random_uniform", dual_init="random_uniform"):
if tl.get_backend() != "numpy":
raise RuntimeError("Unimodality is only supported with the Numpy backend")
super().__init__(aux_init, dual_init)
self.non_negativity = non_negativity
[docs]
@copy_ancestor_docstring
def factor_matrix_update(self, factor_matrix, feasibility_penalty, aux):
return unimodal_regression(factor_matrix, non_negativity=self.non_negativity)
[docs]
class Parafac2(MatricesPenalty):
r"""Impose the PARAFAC2 constraint on the uncoupled factor matrices.
The PARAFAC2 constraint can only be imposed on the uncoupled :math:`\mathbf{B}^{(i)}`-matrices, and
states that
.. math::
{\mathbf{B}^{(i_1)}}^{\mathsf{T}}\mathbf{B}^{(i_1)} = {\mathbf{B}^{(i_2)}}^{\mathsf{T}}\mathbf{B}^{(i_2)},
for any :math:`i_1` and :math:`i_2`. This constraint ensures uniqueness on the components so
long as the number of coupled matrices are sufficiently large. A sufficent condition is that
there are :math:`R(R+1)(R+2)(R+3)/24` matrices, where :math:`R` is the rank of the
decomposition :cite:p:`harshman1996uniqueness`. However, the true number of datasets required
for uniqueness is typically lower, and it is conjectured that uniquenes for any :math:`R` holds
in practice whenever there are four or more matrices :cite:p:`kiers1999parafac2`.
**Parametrization of matrix-collections that satisfy the PARAFAC2 constraint**
The standard way of parametrizing collections of matrices that satisfy the PARAFAC2 constraint
is due to Kiers et al. :cite:p:`kiers1999parafac2`. If :math:`\{\mathbf{B}^{(i)}\}_{i=1}^I` satsifies
the PARAFAC2 constraint, then there exists a matrix :math:`\mathbf{\Delta} \in \mathbb{R}^{R \times R}`
(the coordinate matrix) and a collection of orthonormal matrices :math:`\{\mathbf{P}^{(i)}\}_{i=1}^I`
(the orthogonal basis matrices) such that
.. math::
\mathbf{B}^{(i)} = \mathbf{P}^{(i)} \mathbf{\Delta}.
For this implementation, we use the above parametrization of the auxiliary variables, which
is a tuple whose first element is a list of orthogonal basis matrices and second element
is the coordinate matrix.
**The proximal operator**
To evaluate the proximal operator, we use the projection scheme presented in
:cite:p:`roald2021admm,roald2021parafac2`. Specifically, we project with a coordinate descent
scheme, where we first update the basis matrices and then update the coordinate matrix. It
has been observed that only one iteration of this coordinate descent scheme is sufficient
for fitting PARAFAC2 models with AO-ADMM :cite:p:`roald2021admm,roald2021parafac2`.
To project :math:`\{\mathbf{X}^{(i)}\}_{i=1}^I` onto the set of collections of matrices that
satisfy the PARAFAC2 constraint, we first update the orthogonal basis matrices by
.. math::
\mathbf{P}^{(i)} = \mathbf{U}^{(i)} {\mathbf{V}^{(i)}}^\mathsf{T}
where :math:`\mathbf{U}^{(i)}` and :math:`\mathbf{V}^{(i)}` contain the left and right singular vectors
of :math:`\mathbf{X}^{(i)} \mathbf{\Delta}^\mathsf{T}`.
Then we update the coordinate matrix by
.. math::
\mathbf{\Delta}
= \frac{1}{\sum_{i=1}^I \rho_i}\sum_{i=1}^I \rho_i {\mathbf{P}^{(i)}}^\mathsf{T}\mathbf{X}^{(i)},
where :math:`\rho_i` is the feasibility penalty (which parameterizes the norm of the projection)
for the :math:`i`-th factor matrix.
Parameters
----------
svd : str
String that specifies which SVD algorithm to use. Valid strings are the keys of ``tensorly.SVD_FUNS``.
n_iter : int
Number of iterations for the coordinate descent scheme
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def __init__(
self,
svd="truncated_svd",
n_iter=1,
update_basis_matrices=True,
update_coordinate_matrix=True,
aux_init="random_uniform",
dual_init="random_uniform",
):
self.svd = svd
self.aux_init = aux_init
self.dual_init = dual_init
self.update_basis_matrices = update_basis_matrices
self.update_coordinate_matrix = update_coordinate_matrix
self.n_iter = n_iter
@property
def svd_fun(self):
return get_svd(self.svd)
[docs]
def init_aux(self, matrices, rank, mode, random_state=None):
r"""Initialize the auxiliary variables
For all initialization schemes, the orthogonal basis matrices are initialized
using the first :math:`R` rows of an identity matrix.
**Coordinate matrix initialization schemes**
* ``"random_uniform"``: The elements of the coordinate matrix
are drawn from a uniform distribution between 0 and 1.
* ``"random_standard_normal"``: The elements of the coordinate matrix
are drawn from a standard normal distribution.
* ``"zeros"``: The elements of the coordinate matrix are
initialized as zero.
* tl.tensor(ndim=2) : Pre-computed coordinate matrix (mode=0 or mode=2)
* list of tl.tensor(ndim=2): Pre-computed coordinate matrix (mode=1)
Parameters
----------
matrices : list of tensor(ndim=2) or tensor(ndim=3)
The data matrices represented by the coupled matrix factorization
these auxiliary variables correspond to.
rank : int
Rank of the decomposition.
mode : int
The mode represented by the factor matrices that these
auxiliary variables correspond to.
random_state : RandomState
TensorLy random state.
Returns
-------
tuple
Tuple whose first element is the :math:`R \times R` coordinate matrix
and second element is the list of the orthogonal basis matrices.
"""
if not isinstance(self.aux_init, (str, tuple)):
raise TypeError(
"Parafac2 auxiliary variables must be initialized using either a string"
" or a tuple (containing the orthogonal basis matrices and the coordinate matrix)."
)
if not isinstance(rank, int):
raise TypeError("Rank must be int, not {}".format(type(rank)))
if not isinstance(mode, int):
raise TypeError("Mode must be int, not {}".format(type(mode)))
if mode != 1:
raise ValueError("PARAFAC2 constraint can only be imposed with mode=1")
if self.aux_init == "random_uniform":
coordinate_matrix = tl.tensor(random_state.uniform(size=(rank, rank)))
basis_matrices = [tl.eye(M.shape[0], rank) for M in matrices]
return basis_matrices, coordinate_matrix
if self.aux_init == "random_standard_normal":
coordinate_matrix = tl.tensor(random_state.standard_normal(size=(rank, rank)))
basis_matrices = [tl.eye(M.shape[0], rank) for M in matrices]
return basis_matrices, coordinate_matrix
if self.aux_init == "zeros":
coordinate_matrix = tl.zeros((rank, rank))
basis_matrices = [tl.eye(M.shape[0], rank) for M in matrices]
return basis_matrices, coordinate_matrix
elif isinstance(self.aux_init, tuple):
basis_matrices, coordinate_matrix = self.aux_init
if not isinstance(basis_matrices, list) or not tl.is_tensor(coordinate_matrix):
raise TypeError(
"If self.aux_init is a tuple, then its first element must be a list of basis matrices "
"and second element the coordinate matrix."
)
if not len(tl.shape(coordinate_matrix)) == 2:
raise ValueError(
"The coordinate matrix must have two modes, not {}".format(len(tl.shape(coordinate_matrix)))
)
if (
tl.shape(coordinate_matrix)[0] != tl.shape(coordinate_matrix)[1]
or tl.shape(coordinate_matrix)[0] != rank
):
raise ValueError(
"The coordinate matrix must be rank x rank, with rank={}, not {}".format(
rank, tl.shape(coordinate_matrix)
)
)
for matrix, basis_matrix in zip(matrices, basis_matrices):
if not tl.is_tensor(basis_matrix):
raise TypeError("Each basis matrix must be a tensorly tensor")
if not len(tl.shape(basis_matrix)) == 2:
raise ValueError(
"Each basis matrix must be tensor with two modes, not {}".format(len(tl.shape(basis_matrix)))
)
if tl.shape(matrix)[0] != tl.shape(basis_matrix)[0] or tl.shape(basis_matrix)[1] != rank:
raise ValueError(
"The i-th basis matrix must have shape J_i x rank, where J_i is the number of "
"rows in the i-th matrix."
)
cross_product = tl.dot(tl.transpose(basis_matrix), basis_matrix)
if not tl.sum((cross_product - tl.eye(rank)) ** 2) < 1e-8:
raise ValueError("The basis matrices must be orthogonal")
if len(basis_matrices) != len(matrices):
raise ValueError("There must be as many basis matrices as there are matrices")
return self.aux_init
else:
raise ValueError(f"Unknown aux init: {self.aux_init}")
[docs]
@copy_ancestor_docstring
def factor_matrices_update(self, factor_matrices, feasibility_penalties, auxes):
basis_matrices, coordinate_matrix = auxes
R = tl.shape(coordinate_matrix)[0]
for it in range(self.n_iter):
# Update orthogonal basis matrices
if self.update_basis_matrices:
basis_matrices = [] # To prevent inplace editing of basis matrices
for fm in factor_matrices:
U, s, Vh = self.svd_fun(tl.matmul(fm, tl.transpose(coordinate_matrix)), n_eigenvecs=R)
basis_matrices.append(tl.matmul(U, Vh))
if self.update_coordinate_matrix:
# Project all factor matrices onto the space spanned by the orthogonal basis matrices
# and compute weighted mean
coordinate_matrix = 0
for fm, basis_matrix, feasibility_penalty in zip(
factor_matrices, basis_matrices, feasibility_penalties
):
coordinate_matrix += feasibility_penalty * basis_matrix.T @ fm
coordinate_matrix /= sum(feasibility_penalties)
if (not self.update_coordinate_matrix) or (not self.update_basis_matrices):
break
return basis_matrices, coordinate_matrix
[docs]
def subtract_from_aux(self, aux, dual):
"""Raises TypeError since the PARAFAC2 constraint only works with mode=1."""
raise TypeError("The PARAFAC2 constraint cannot shift a single factor matrix.")
[docs]
def subtract_from_auxes(self, auxes, duals):
r"""Compute (aux - dual) for each auxiliary- and dual-factor matrix for mode=1.
Computing the difference between the auxiliary variables and the dual variables
is an essential part of ADMM. However, the auxiliary variables is not a
list of factor matrices but rather a coordinate matrix and a collection of
orthogonal basis matrices, so this difference cannot be computed by simply subtracting
one from the other. First auxiliary matrices must be computed by multiplying
the basis matrices with the coordinate matrix and then the difference can
be computed.
Parameters
----------
auxes : tuple
Tuple whose first element is the :math:`R \times R` coordinate matrix
and second element is the list of the orthogonal basis matrices.
duals : list of tl.tensor(ndim=2)
Dual variables (or other variable to subtract from the auxes)
Returns
-------
list of tl.tensor(ndim=2)
The list of differences
"""
P_is, coord_mat = auxes
return [tl.dot(P_i, coord_mat) - dual for P_i, dual in zip(P_is, duals)]
[docs]
def aux_as_matrix(self, aux):
"""Raises TypeError since the PARAFAC2 constraint only works with mode=1."""
raise TypeError("The PARAFAC2 constraint cannot convert a single aux to a matrix")
[docs]
def auxes_as_matrices(self, auxes):
"""Convert a the auxiliary variables into a list of matrices (mode=1).
This function computes the list of matrices parametrized by the coordinate
matrix and orthogonal basis matrices by multiplying them together.
Parameters
----------
auxes : tuple
Tuple whose first element is the :math:`R \times R` coordinate matrix
and second element is the list of the orthogonal basis matrices.
Returns
-------
list of tl.tensor(ndim=2)
"""
P_is, coord_mat = auxes
return [tl.dot(P_i, coord_mat) for P_i in P_is]
[docs]
def penalty(self, x):
"""Returns 0 as there is no penalty for hard constraints.
Hard constraints are always penalised with 0 even when the components are infeasible.
Slightly infeasible components would otherwise result in an infinite penalty because
the penalty function of hard constraints is 0 for feasible solutions and infinity for
infeasible solutions. An infinite penalty would stop all convergence checking and not
provide any information on the quality of the components. To ensure that the hard
constraints are sufficiently imposed, it is recommended to examine the feasibility gap
instead of the penalty and ensure that the feasibility gap is low.
Parameters
----------
x : list of tl.tensor(ndim=2)
List of factor matrices.
"""
if not isinstance(x, list):
raise TypeError("Cannot compute PARAFAC2 penalty of other types than a list of tensors")
return 0
[docs]
class TemporalSmoothnessPenalty(MatricesPenalty):
r"""Impose temporal smoothness on the uncoupled factor matrices. Formally, this means adding
the following penalty term:
.. math::
\lambda_B \sum_{k=2}^K \lVert \M{B}_k - \M{B}_{k-1} \rVert^2_F .
More information as well as experiments can be found in :cite:p:`chatzis2023timeaware`.
**The proximal operator**
Taking the partial derivative of the objective function with respect to each auxiliary
variable results in a tridiagonal system of equations. More specifically:
.. math::
\begin{flalign}
& \bigl( 2\lambda_{\M{B}} + \rho_{\M{B}_1}\bigr) \M{Z}_{\M{B}_1} -2\lambda_{\M{B}}\M{Z}_{\M{B}_2} = \rho_{\M{B}_1} \bigr( \M{B}_1 + \M{\mu}_{\M{B}_1} \bigl) \notag \\
& \bigl(4\lambda_{\M{B}}+\rho_{\M{B}_k}\bigr)\M{Z}_{\M{B}_k}-2\lambda_{\M{B}} \bigl(\M{Z}_{\M{B}_{k-1}}+\M{Z}_{\M{B}_{k+1}} \bigr) =\rho_{\M{B}_k} \bigr(\M{B}_k+\M{\mu}_{\M{B}_k}\bigl), \qquad k = 2,\ldots,K-1 \notag \\
& \vdots \notag \\
& \bigl( 2\lambda_{\M{B}} + \rho_{\M{B}_K}\bigr) \M{Z}_{\M{B}_K} -2\lambda_{\M{B}}\M{Z}_{\M{B}_{K-1}} = \rho_{\M{B}_K} \bigr( \M{B}_K + \,{\mu}_{\M{B}_K} \bigl) \notag
\end{flalign}
We then form a tridiagonal matrix :math:`\M{A}` and solve the system of equations using Thomas algorithm.
Keep in mind that once we apply a norm-based penalty in the factor matrices, we need to also penalize the
norm of all other factor matrices :cite:p:`roald2021parafac2` to overcome the scaling ambiguity of PARAFAC2.
This can be done, for example, by adding ridge penalties on the norms of the other two factor matrices.
Parameters
----------
smoothness_l : float
Hyperparemeter controlling the strength of the smoothness penalty.
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def __init__(self, smoothness_l, aux_init="random_uniform", dual_init="random_uniform"):
super().__init__(aux_init=aux_init, dual_init=dual_init)
self.smoothness_l = smoothness_l
def _get_laplace_coef(self, i, I):
if i == 0 or i == I - 1:
return 2 * self.smoothness_l
else:
return 4 * self.smoothness_l
[docs]
@copy_ancestor_docstring
def factor_matrices_update(self, factor_matrices, feasibility_penalties, auxes):
B_is = factor_matrices
I = len(factor_matrices) # noqa: E741
rhos = feasibility_penalties
rhs = [rhos[i] * factor_matrices[i] for i in range(I)]
# Construct matrix A to peform thomas algorithm on
A = (
tl.diag(tl.tensor([self._get_laplace_coef(i, I) + rhos[i] for i, rho in enumerate(rhos)]), k=0)
- tl.diag(tl.ones(I - 1) * 2 * self.smoothness_l, k=1)
- tl.diag(tl.ones(I - 1) * 2 * self.smoothness_l, k=-1)
)
# Peform GE
for k in range(1, A.shape[-1]):
m = A[k, k - 1] / A[k - 1, k - 1]
A[k, :] = A[k, :] - m * A[k - 1, :]
rhs[k] = rhs[k] - m * rhs[k - 1] # Also update the respective rhs!
# Back-substitution
new_ZBks = [tl.zeros(B_is[i].shape) for i in range(I)]
new_ZBks[-1] = rhs[-1] / A[-1, -1]
q = new_ZBks[-1]
for i in range(A.shape[-1] - 2, -1, -1):
q = (rhs[i] - A[i, i + 1] * q) / A[i, i]
new_ZBks[i] = q
return new_ZBks
[docs]
def penalty(self, x):
penalty = 0
for x1, x2 in zip(x[:-1], x[1:]):
penalty += tl.sum((x1 - x2) ** 2)
return self.smoothness_l * penalty
[docs]
class LDSPenalty(MatricesPenalty):
r"""Impose Linear Dynamical System Constraint (LDS) on the evolving factor matrices:
.. math::
\lambda_B \sum_{k=2}^K \lVert \mathbf{B}_k - \mathbf{H} \mathbf{B}_{k-1} \rVert^2_F .
More information as well as experiments can be found in :cite:p:`chatzis2025dcmf`.
Keep in mind that once we apply a norm-based penalty in the factor matrices, we need to also penalize the
norm of all other factor matrices :cite:p:`roald2021parafac2` to overcome the scaling ambiguity of PARAFAC2.
This can be done, for example, by adding ridge penalties on the norms of the other two factor matrices.
Parameters
----------
smoothness_l : float
Hyperparemeter controlling the strength of the LDS penalty.
H : tl.tensor(ndim=2)
The transition matrix assumed on the factor matrices.
aux_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
dual_init : {"random_uniform", "random_standard_normal", "zeros", tl.tensor(ndim=2), list of tl.tensor(ndim=2)}
Initialisation method for the auxiliary variables
"""
def __init__(
self,
smoothness_l,
H,
aux_init="random_uniform",
dual_init="random_uniform",
):
super().__init__(aux_init=aux_init, dual_init=dual_init)
self.smoothness_l = smoothness_l
self.H = H
[docs]
@copy_ancestor_docstring
def factor_matrices_update(self, factor_matrices, feasibility_penalties, auxes):
B_is = factor_matrices
rhos = feasibility_penalties
J = B_is[0].shape[-2]
K = len(B_is)
HtH = self.H.T @ self.H
rhs = np.vstack([rhos[i] * factor_matrices[i] for i in range(len(B_is))])
# Build the sparse block matrix A
diagonal_blocks = []
off_diagonal_terms = [] # Store off-diagonal connections
# Step 1: Construct diagonal and off-diagonal blocks
for k in range(K):
# Diagonal block
if k == 0:
A_diag = 2 * self.smoothness_l * HtH + rhos[k] * np.eye(J)
elif k == K - 1:
A_diag = (2 * self.smoothness_l + rhos[k]) * np.eye(J)
else:
A_diag = 2 * self.smoothness_l * HtH + rhos[k] * np.eye(J) + 2 * self.smoothness_l * np.eye(J)
diagonal_blocks.append(csr_matrix(A_diag))
# Off-diagonal terms
if k > 0:
off_diag = -2 * self.smoothness_l * self.H
off_diagonal_terms.append(((k, k - 1), csr_matrix(off_diag)))
if k < K - 1:
off_diag = -2 * self.smoothness_l * self.H.T
off_diagonal_terms.append(((k, k + 1), csr_matrix(off_diag)))
# Step 2: Build the sparse matrix A
block_matrix = [[None for _ in range(K)] for _ in range(K)]
# Fill diagonal blocks
for i, diag in enumerate(diagonal_blocks):
block_matrix[i][i] = diag
# Fill off-diagonal blocks
for (i, j), off_diag in off_diagonal_terms:
block_matrix[i][j] = off_diag
# Convert to sparse block matrix
A_sparse = bmat(block_matrix, format="csr")
# Step 3: Solve the sparse system
Z_vec = spsolve(A_sparse, rhs)
# Step 4: Reshape the solution back into matrices
Z_B = [Z_vec[k * J : (k + 1) * J, :] for k in range(K)]
return Z_B
[docs]
def penalty(self, x):
penalty = 0
for x1, x2 in zip(x[:-1], x[1:]):
penalty += np.sum((self.H @ x1 - x2) ** 2)
return self.smoothness_l * penalty