import numbers
import numpy as np
from scipy import linalg
from scipy.sparse.linalg import svds
from sklearn.decomposition._base import _BasePCA
from sklearn.decomposition._pca import _infer_dimension
from sklearn.exceptions import NotFittedError
from sklearn.kernel_ridge import KernelRidge
from sklearn.linear_model._base import LinearModel
from sklearn.metrics.pairwise import pairwise_kernels
from sklearn.utils import check_random_state
from sklearn.utils._arpack import _init_arpack_v0
from sklearn.utils.extmath import randomized_svd, stable_cumsum, svd_flip
from sklearn.utils.validation import _check_n_features, check_is_fitted, validate_data
from ..preprocessing import KernelNormalizer
from ..utils import check_krr_fit, pcovr_kernel
class KernelPCovR(_BasePCA, LinearModel):
r"""Kernel Principal Covariates Regression, as described in [Helfrecht2020]_
determines a latent-space projection :math:`\mathbf{T}` which minimizes a combined
loss in supervised and unsupervised tasks in the reproducing kernel Hilbert space
(RKHS).
This projection is determined by the eigendecomposition of a modified gram matrix
:math:`\mathbf{\tilde{K}}`
.. math::
\mathbf{\tilde{K}} = \alpha \mathbf{K} +
(1 - \alpha) \mathbf{\hat{Y}}\mathbf{\hat{Y}}^T
where :math:`\alpha` is a mixing parameter,
:math:`\mathbf{K}` is the input kernel of shape :math:`(n_{samples}, n_{samples})`
and :math:`\mathbf{\hat{Y}}` is the target matrix of shape
:math:`(n_{samples}, n_{properties})`.
Parameters
----------
mixing : float, default=0.5
mixing parameter, as described in PCovR as :math:`{\alpha}`
    n_components : int, float or str, default=None
        Number of components to keep.
        If n_components is not set, all components are kept::

            n_components == n_samples
svd_solver : {'auto', 'full', 'arpack', 'randomized'}, default='auto'
If auto :
The solver is selected by a default policy based on `X.shape` and
`n_components`: if the input data is larger than 500x500 and the
number of components to extract is lower than 80% of the smallest
dimension of the data, then the more efficient 'randomized'
method is enabled. Otherwise the exact full SVD is computed and
optionally truncated afterwards.
If full :
run exact full SVD calling the standard LAPACK solver via
`scipy.linalg.svd` and select the components by postprocessing
If arpack :
run SVD truncated to n_components calling ARPACK solver via
`scipy.sparse.linalg.svds`. It requires strictly
0 < n_components < min(X.shape)
If randomized :
run randomized SVD by the method of Halko et al.
regressor : {instance of `sklearn.kernel_ridge.KernelRidge`, `precomputed`, None}, default=None
The regressor to use for computing
the property predictions :math:`\hat{\mathbf{Y}}`.
A pre-fitted regressor may be provided.
If the regressor is not `None`, its kernel parameters
(`kernel`, `gamma`, `degree`, `coef0`, and `kernel_params`)
must be identical to those passed directly to `KernelPCovR`.
If `precomputed`, we assume that the `y` passed to the `fit` function
is the regressed form of the targets :math:`{\mathbf{\hat{Y}}}`.
kernel : "linear" | "poly" | "rbf" | "sigmoid" | "cosine" | "precomputed"
Kernel. Default="linear".
gamma : float, default=None
Kernel coefficient for rbf, poly and sigmoid kernels. Ignored by other
kernels.
degree : int, default=3
Degree for poly kernels. Ignored by other kernels.
coef0 : float, default=1
Independent term in poly and sigmoid kernels.
Ignored by other kernels.
kernel_params : mapping of str to any, default=None
Parameters (keyword arguments) and values for kernel passed as
callable object. Ignored by other kernels.
center : bool, default=False
Whether to center any computed kernels
fit_inverse_transform : bool, default=False
Learn the inverse transform for non-precomputed kernels.
(i.e. learn to find the pre-image of a point)
tol : float, default=1e-12
Tolerance for singular values computed by svd_solver == 'arpack'
and for matrix inversions.
        Must be in the range [0.0, infinity).
n_jobs : int, default=None
The number of parallel jobs to run.
:obj:`None` means 1 unless in a :obj:`joblib.parallel_backend` context.
``-1`` means using all processors.
iterated_power : int or 'auto', default='auto'
Number of iterations for the power method computed by
svd_solver == 'randomized'.
        Must be in the range [0, infinity).
random_state : int, :class:`numpy.random.RandomState` instance or None, default=None
Used when the 'arpack' or 'randomized' solvers are used. Pass an int
for reproducible results across multiple function calls.

    Attributes
    ----------
    pt__: numpy.ndarray of size :math:`({n_{components}, n_{samples}})`
        pseudo-inverse of the latent-space projection, which
        can be used to construct projectors from the latent space
pkt_: numpy.ndarray of size :math:`({n_{samples}, n_{components}})`
the projector, or weights, from the input kernel :math:`\mathbf{K}`
to the latent-space projection :math:`\mathbf{T}`
pky_: numpy.ndarray of size :math:`({n_{samples}, n_{properties}})`
the projector, or weights, from the input kernel :math:`\mathbf{K}`
to the properties :math:`\mathbf{Y}`
pty_: numpy.ndarray of size :math:`({n_{components}, n_{properties}})`
the projector, or weights, from the latent-space projection
:math:`\mathbf{T}` to the properties :math:`\mathbf{Y}`
ptx_: numpy.ndarray of size :math:`({n_{components}, n_{features}})`
the projector, or weights, from the latent-space projection
:math:`\mathbf{T}` to the feature matrix :math:`\mathbf{X}`
X_fit_: numpy.ndarray of shape (n_samples, n_features)
The data used to fit the model. This attribute is used to build kernels
from new data.

    Examples
--------
>>> import numpy as np
>>> from skmatter.decomposition import KernelPCovR
>>> from skmatter.preprocessing import StandardFlexibleScaler as SFS
>>> from sklearn.kernel_ridge import KernelRidge
>>> X = np.array([[-1, 1, -3, 1], [1, -2, 1, 2], [-2, 0, -2, -2], [1, 0, 2, -1]])
>>> X = SFS().fit_transform(X)
>>> Y = np.array([[0, -5], [-1, 1], [1, -5], [-3, 2]])
>>> Y = SFS(column_wise=True).fit_transform(Y)
>>> kpcovr = KernelPCovR(
... mixing=0.1,
... n_components=2,
... regressor=KernelRidge(kernel="rbf", gamma=1),
... kernel="rbf",
... gamma=1,
... )
>>> kpcovr.fit(X, Y)
KernelPCovR(gamma=1, kernel='rbf', mixing=0.1, n_components=2,
regressor=KernelRidge(gamma=1, kernel='rbf'))
>>> kpcovr.transform(X)
array([[-0.61261285, -0.18937908],
[ 0.45242098, 0.25453465],
[-0.77871824, 0.04847559],
[ 0.91186937, -0.21211816]])
>>> kpcovr.predict(X)
array([[ 0.5100212 , -0.99488463],
[-0.18992219, 0.82064368],
[ 1.11923584, -1.04798016],
[-1.5635827 , 1.11078662]])
>>> round(kpcovr.score(X, Y), 5)
np.float64(-0.52039)
""" # NoQa: E501
def __init__(
self,
mixing=0.5,
n_components=None,
svd_solver="auto",
regressor=None,
kernel="linear",
gamma=None,
degree=3,
coef0=1,
kernel_params=None,
center=False,
fit_inverse_transform=False,
tol=1e-12,
n_jobs=None,
iterated_power="auto",
random_state=None,
):
self.mixing = mixing
self.n_components = n_components
self.svd_solver = svd_solver
self.tol = tol
self.iterated_power = iterated_power
self.random_state = random_state
self.center = center
self.kernel = kernel
self.gamma = gamma
self.degree = degree
self.coef0 = coef0
self.kernel_params = kernel_params
self.n_jobs = n_jobs
self.fit_inverse_transform = fit_inverse_transform
self.regressor = regressor
def _get_kernel(self, X, Y=None):
if callable(self.kernel):
params = self.kernel_params or {}
else:
params = {"gamma": self.gamma, "degree": self.degree, "coef0": self.coef0}
return pairwise_kernels(
X, Y, metric=self.kernel, filter_params=True, n_jobs=self.n_jobs, **params
)
def _fit(self, K, Yhat, W):
"""Fit the model with the computed kernel and approximated properties."""
K_tilde = pcovr_kernel(mixing=self.mixing, X=K, Y=Yhat, kernel="precomputed")
if self._fit_svd_solver == "full":
_, S, Vt = self._decompose_full(K_tilde)
elif self._fit_svd_solver in ["arpack", "randomized"]:
_, S, Vt = self._decompose_truncated(K_tilde)
else:
            raise ValueError(
                "Unrecognized svd_solver='{0}'".format(self._fit_svd_solver)
            )
U = Vt.T
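        # Projector from the input kernel to the latent space:
        # P = mixing * I + (1 - mixing) * W @ Yhat.T, and
        # pkt_ = P @ U @ diag(1 / sqrt(S)) (singular values below tol are dropped),
        # so that T = K @ pkt_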
P = (self.mixing * np.eye(K.shape[0])) + (1.0 - self.mixing) * (W @ Yhat.T)
S_inv = np.array([1.0 / s if s > self.tol else 0.0 for s in S])
self.pkt_ = P @ U @ np.sqrt(np.diagflat(S_inv))
T = K @ self.pkt_
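        # Pseudo-inverse of T, of shape (n_components, n_samples); used in `fit`
        # to build the projectors from the latent space back to K, Y, and X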
self.pt__ = np.linalg.lstsq(T, np.eye(T.shape[0]), rcond=self.tol)[0]
def fit(self, X, Y, W=None):
r"""Fit the model with X and Y.
Parameters
----------
X : numpy.ndarray, shape (n_samples, n_features)
Training data, where n_samples is the number of samples and
n_features is the number of features.
It is suggested that :math:`\mathbf{X}` be centered by its column-
means and scaled. If features are related, the matrix should be scaled
to have unit variance, otherwise :math:`\mathbf{X}` should be
scaled so that each feature has a variance of 1 / n_features.
        Y : numpy.ndarray, shape (n_samples, n_properties)
            Training data, where n_samples is the number of samples and
            n_properties is the number of properties.
            It is suggested that :math:`\mathbf{Y}` be centered by its column-
            means and scaled. If features are related, the matrix should be scaled
            to have unit variance, otherwise :math:`\mathbf{Y}` should be
            scaled so that each feature has a variance of 1 / n_features.
W : numpy.ndarray, shape (n_samples, n_properties)
Regression weights, optional when regressor=`precomputed`. If not
passed, it is assumed that `W = np.linalg.lstsq(K, Y, self.tol)[0]`

        Returns
        -------
        self : object
            Returns the instance itself.
"""
if self.regressor not in ["precomputed", None] and not isinstance(
self.regressor, KernelRidge
):
raise ValueError("Regressor must be an instance of `KernelRidge`")
X, Y = validate_data(self, X, Y, y_numeric=True, multi_output=True)
self.X_fit_ = X.copy()
if self.n_components is None:
if self.svd_solver != "arpack":
self.n_components_ = X.shape[0]
else:
self.n_components_ = X.shape[0] - 1
else:
self.n_components_ = self.n_components
K = self._get_kernel(X)
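        # Optionally center the training kernel; the fitted centerer is reused to
        # transform kernels built from new samples (e.g., in `predict` and `score`)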
if self.center:
self.centerer_ = KernelNormalizer()
K = self.centerer_.fit_transform(K)
self.n_samples_in_, self.n_features_in_ = X.shape
if self.regressor != "precomputed":
if self.regressor is None:
regressor = KernelRidge(
kernel=self.kernel,
gamma=self.gamma,
degree=self.degree,
coef0=self.coef0,
kernel_params=self.kernel_params,
)
else:
regressor = self.regressor
kernel_attrs = ["kernel", "gamma", "degree", "coef0", "kernel_params"]
if not all(
[
getattr(self, attr) == getattr(regressor, attr)
for attr in kernel_attrs
]
):
raise ValueError(
"Kernel parameter mismatch: the regressor has kernel "
"parameters {%s} and KernelPCovR was initialized with kernel "
"parameters {%s}"
% (
", ".join(
[
"%s: %r" % (attr, getattr(regressor, attr))
for attr in kernel_attrs
]
),
", ".join(
[
"%s: %r" % (attr, getattr(self, attr))
for attr in kernel_attrs
]
),
)
)
# Check if regressor is fitted; if not, fit with precomputed K
# to avoid needing to compute the kernel a second time
self.regressor_ = check_krr_fit(regressor, K, X, Y)
W = self.regressor_.dual_coef_.reshape(self.n_samples_in_, -1)
# Use this instead of `self.regressor_.predict(K)`
# so that we can handle the case of the pre-fitted regressor
Yhat = K @ W
# When we have an unfitted regressor,
# we fit it with a precomputed K
# so we must subsequently "reset" it so that
# it will work on the particular X
# of the KPCovR call. The dual coefficients are kept.
# Can be bypassed if the regressor is pre-fitted.
try:
check_is_fitted(regressor)
except NotFittedError:
self.regressor_.set_params(**regressor.get_params())
self.regressor_.X_fit_ = self.X_fit_
_check_n_features(self.regressor_, self.X_fit_, reset=True)
else:
Yhat = Y.copy()
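            # With regressor="precomputed", the passed Y is already the regressed
            # target Yhat; if no weights are given, recover W by least squares so
            # that K @ W approximates Yhat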
if W is None:
W = np.linalg.lstsq(K, Yhat, self.tol)[0]
# Handle svd_solver
self._fit_svd_solver = self.svd_solver
if self._fit_svd_solver == "auto":
# Small problem or self.n_components_ == 'mle', just call full PCA
if (
max(self.n_samples_in_, self.n_features_in_) <= 500
or self.n_components_ == "mle"
):
self._fit_svd_solver = "full"
elif self.n_components_ >= 1 and self.n_components_ < 0.8 * max(
self.n_samples_in_, self.n_features_in_
):
self._fit_svd_solver = "randomized"
# This is also the case of self.n_components_ in (0,1)
else:
self._fit_svd_solver = "full"
self._fit(K, Yhat, W)
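        # Build the remaining projectors from the latent space T: ptk_ maps T back to
        # the kernel, pty_ maps T to the properties, and (if requested) ptx_ maps T
        # back to the original features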
self.ptk_ = self.pt__ @ K
self.pty_ = self.pt__ @ Y
if self.fit_inverse_transform:
self.ptx_ = self.pt__ @ X
self.pky_ = self.pkt_ @ self.pty_
self.components_ = self.pkt_.T # for sklearn compatibility
return self
def predict(self, X=None):
"""Predicts the property values"""
check_is_fitted(self, ["pky_", "pty_"])
X = validate_data(self, X, reset=False)
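        # Kernel between the new samples and the training samples, centered
        # consistently with the training kernel when centering was requested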
K = self._get_kernel(X, self.X_fit_)
if self.center:
K = self.centerer_.transform(K)
return K @ self.pky_
def score(self, X, y):
r"""Computes the (negative) loss values for KernelPCovR on the given predictor
and response variables. The loss in :math:`\mathbf{K}`, as explained in
[Helfrecht2020]_ does not correspond to a traditional Gram loss
:math:`\mathbf{K} - \mathbf{TT}^T`. Indicating the kernel between set A and B as
:math:`\mathbf{K}_{AB}`, the projection of set A as :math:`\mathbf{T}_A`, and
with N and V as the train and validation/test set, one obtains
.. math::
\ell=\frac{\operatorname{Tr}\left[\mathbf{K}_{VV} - 2
\mathbf{K}_{VN} \mathbf{T}_N
(\mathbf{T}_N^T \mathbf{T}_N)^{-1} \mathbf{T}_V^T
+\mathbf{T}_V(\mathbf{T}_N^T \mathbf{T}_N)^{-1} \mathbf{T}_N^T
\mathbf{K}_{NN} \mathbf{T}_N (\mathbf{T}_N^T \mathbf{T}_N)^{-1}
\mathbf{T}_V^T\right]}{\operatorname{Tr}(\mathbf{K}_{VV})}
The negative loss is returned for easier use in sklearn pipelines, e.g., a grid
search, where methods named 'score' are meant to be maximized.

        Parameters
        ----------
        X : numpy.ndarray
            independent (predictor) variable
        y : numpy.ndarray
            dependent (response) variable

        Returns
        -------
        L : float
            Negative sum of the KPCA and KRR losses, with the KPCA loss determined by
            the reconstruction of the kernel
"""
check_is_fitted(self, ["pkt_", "X_fit_"])
X = validate_data(self, X, reset=False)
K_NN = self._get_kernel(self.X_fit_, self.X_fit_)
K_VN = self._get_kernel(X, self.X_fit_)
K_VV = self._get_kernel(X)
if self.center:
K_NN = self.centerer_.transform(K_NN)
K_VN = self.centerer_.transform(K_VN)
K_VV = self.centerer_.transform(K_VV)
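        # Regression (KRR) loss: squared prediction error on y, normalized by ||y||^2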
ypred = K_VN @ self.pky_
Lkrr = np.linalg.norm(y - ypred) ** 2 / np.linalg.norm(y) ** 2
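        # Projection (KPCA) loss: how well the validation kernel K_VV is reconstructed
        # from the latent projections of the train (t_n) and validation (t_v) kernels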
t_n = K_NN @ self.pkt_
t_v = K_VN @ self.pkt_
w = (
t_n
@ np.linalg.lstsq(t_n.T @ t_n, np.eye(t_n.shape[1]), rcond=self.tol)[0]
@ t_v.T
)
Lkpca = np.trace(K_VV - 2 * K_VN @ w + w.T @ K_VV @ w) / np.trace(K_VV)
return -sum([Lkpca, Lkrr])
def _decompose_truncated(self, mat):
if not 1 <= self.n_components_ <= self.n_samples_in_:
raise ValueError(
"n_components=%r must be between 1 and "
"n_samples=%r with "
"svd_solver='%s'"
% (
self.n_components_,
self.n_samples_in_,
self.svd_solver,
)
)
elif not isinstance(self.n_components_, numbers.Integral):
raise ValueError(
"n_components=%r must be of type int "
"when greater than or equal to 1, was of type=%r"
% (self.n_components_, type(self.n_components_))
)
elif self.svd_solver == "arpack" and self.n_components_ == self.n_samples_in_:
raise ValueError(
"n_components=%r must be strictly less than "
"n_samples=%r with "
"svd_solver='%s'"
% (
self.n_components_,
self.n_samples_in_,
self.svd_solver,
)
)
random_state = check_random_state(self.random_state)
if self._fit_svd_solver == "arpack":
v0 = _init_arpack_v0(min(mat.shape), random_state)
U, S, Vt = svds(mat, k=self.n_components_, tol=self.tol, v0=v0)
# svds doesn't abide by scipy.linalg.svd/randomized_svd
# conventions, so reverse its outputs.
S = S[::-1]
# flip eigenvectors' sign to enforce deterministic output
U, Vt = svd_flip(U[:, ::-1], Vt[::-1])
# We have already eliminated all other solvers, so this must be "randomized"
else:
# sign flipping is done inside
U, S, Vt = randomized_svd(
mat,
n_components=self.n_components_,
n_iter=self.iterated_power,
flip_sign=True,
random_state=random_state,
)
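        # Discard components with singular values below the tolerance by zeroing the
        # singular values and the corresponding singular vectors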
U[:, S < self.tol] = 0.0
Vt[S < self.tol] = 0.0
S[S < self.tol] = 0.0
return U, S, Vt
def _decompose_full(self, mat):
if self.n_components_ != "mle":
if not (0 <= self.n_components_ <= self.n_samples_in_):
raise ValueError(
"n_components=%r must be between 1 and "
"n_samples=%r with "
"svd_solver='%s'"
% (
self.n_components_,
self.n_samples_in_,
self.svd_solver,
)
)
elif self.n_components_ >= 1:
if not isinstance(self.n_components_, numbers.Integral):
raise ValueError(
"n_components=%r must be of type int "
"when greater than or equal to 1, "
"was of type=%r"
% (self.n_components_, type(self.n_components_))
)
U, S, Vt = linalg.svd(mat, full_matrices=False)
U[:, S < self.tol] = 0.0
Vt[S < self.tol] = 0.0
S[S < self.tol] = 0.0
# flip eigenvectors' sign to enforce deterministic output
U, Vt = svd_flip(U, Vt)
# Get variance explained by singular values
explained_variance_ = (S**2) / (self.n_samples_in_ - 1)
total_var = explained_variance_.sum()
explained_variance_ratio_ = explained_variance_ / total_var
# Postprocess the number of components required
if self.n_components_ == "mle":
self.n_components_ = _infer_dimension(
explained_variance_, self.n_samples_in_
)
elif 0 < self.n_components_ < 1.0:
            # number of components for which the cumulative explained
            # variance percentage exceeds the desired threshold;
            # side='right' ensures that the selected number of components
            # explains strictly more variance than the float passed as
            # n_components. More discussion in scikit-learn issue #15669.
ratio_cumsum = stable_cumsum(explained_variance_ratio_)
self.n_components_ = (
np.searchsorted(ratio_cumsum, self.n_components_, side="right") + 1
)
return (
U[:, : self.n_components_],
S[: self.n_components_],
Vt[: self.n_components_],
)