# -*- coding: utf-8 -*-
"""Base class for core models
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause
import warnings
from collections import defaultdict
from abc import ABC, abstractmethod
import numpy as np
from numpy import percentile
from scipy.special import erf
from sklearn.preprocessing import MinMaxScaler
from sklearn.utils import column_or_1d
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.multiclass import check_classification_targets
from .sklearn_base import _pprint
from inspect import signature
class BaseAggregator(ABC):
    """Abstract class for all combination classes.

    Parameters
    ----------
    base_estimators: list, length must be greater than 1
        A list of base estimators. Certain methods must be present, e.g.,
        `fit` and `predict`.

    pre_fitted: bool, optional (default=False)
        Whether the base estimators are trained. If True, `fit`
        process may be skipped.
    """

    @abstractmethod
    def __init__(self, base_estimators, pre_fitted=False):
        # NOTE: kept as an assert so callers keep seeing AssertionError;
        # be aware that it is stripped when Python runs with -O.
        assert (isinstance(base_estimators, (list)))
        if len(base_estimators) < 2:
            raise ValueError('At least 2 estimators are required')
        self.base_estimators = base_estimators
        self.n_base_estimators_ = len(self.base_estimators)
        self.pre_fitted = pre_fitted

    @abstractmethod
    def fit(self, X, y=None):
        """Fit estimator. y is optional for unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels).

        Returns
        -------
        self
        """
        pass

    # todo: make sure fit then predict is equivalent to fit_predict
    @abstractmethod
    def fit_predict(self, X, y=None):
        """Fit estimator and predict on X. y is optional for unsupervised
        methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels).

        Returns
        -------
        labels : numpy array of shape (n_samples,)
            Class labels for each data sample.
        """
        pass

    @abstractmethod
    def predict(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        labels : numpy array of shape (n_samples,)
            Class labels for each data sample.
        """
        pass

    @abstractmethod
    def predict_proba(self, X):
        """Return probability estimates for the test data X.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        p : numpy array of shape (n_samples,)
            The class probabilities of the input samples.
            Classes are ordered by lexicographic order.
        """
        pass

    def _process_decision_scores(self):
        """Internal function to calculate key attributes for outlier detection
        combination tasks.

        Reads ``self.decision_scores_`` and ``self.contamination``; neither
        is set by this base class, so both must exist before calling.

        - threshold_: used to decide the binary label
        - labels_: binary labels of training data

        Returns
        -------
        self
        """
        # The threshold is the (1 - contamination) percentile of the
        # training scores; anything strictly above it is labelled 1.
        self.threshold_ = percentile(self.decision_scores_,
                                     100 * (1 - self.contamination))
        self.labels_ = (self.decision_scores_ > self.threshold_).astype(
            'int').ravel()

        # calculate for predict_proba()
        self._mu = np.mean(self.decision_scores_)
        self._sigma = np.std(self.decision_scores_)

        return self

    def _detector_predict(self, X):
        """Internal function to predict if a particular sample is an
        outlier or not.

        Relies on ``self.decision_function``, which is not defined in this
        base class.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])

        pred_score = self.decision_function(X)
        return (pred_score > self.threshold_).astype('int').ravel()

    def _detector_predict_proba(self, X, proba_method='linear'):
        """Predict the probability of a sample being outlier. Two approaches
        are possible:

        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0,1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.

        Relies on ``self.decision_function`` and ``self._classes``, which
        are not set in ``__init__`` of this base class.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        proba_method : str, optional (default='linear')
            Probability conversion method. It must be one of
            'linear' or 'unify'.

        Returns
        -------
        probs : numpy array of shape (n_samples, n_classes)
            Column 1 holds the outlier probability, ranging in [0,1];
            column 0 holds its complement. Any further columns are left
            at zero.

        Raises
        ------
        ValueError
            If `proba_method` is neither 'linear' nor 'unify'.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        train_scores = self.decision_scores_
        test_scores = self.decision_function(X)

        probs = np.zeros([X.shape[0], int(self._classes)])
        if proba_method == 'linear':
            # Scale with the training range, then clip test scores that
            # fall outside of it.
            scaler = MinMaxScaler().fit(train_scores.reshape(-1, 1))
            probs[:, 1] = scaler.transform(
                test_scores.reshape(-1, 1)).ravel().clip(0, 1)
        elif proba_method == 'unify':
            # turn output into probability
            pre_erf_score = (test_scores - self._mu) / (
                    self._sigma * np.sqrt(2))
            erf_score = erf(pre_erf_score)
            probs[:, 1] = erf_score.clip(0, 1).ravel()
        else:
            # Single formatted message instead of a two-argument exception,
            # which rendered as a tuple.
            raise ValueError(
                '%s is not a valid probability conversion method'
                % (proba_method,))

        probs[:, 0] = 1 - probs[:, 1]
        return probs

    def _set_n_classes(self, y):
        """Set the number of classes if `y` is presented.

        Parameters
        ----------
        y : numpy array of shape (n_samples,)
            Ground truth.

        Returns
        -------
        self
        """
        self._classes = 2  # default as binary classification
        if y is not None:
            check_classification_targets(y)
            self._classes = len(np.unique(y))

        return self

    def _set_weights(self, weights):
        """Internal function to set estimator weights.

        With ``weights=None`` every estimator gets weight 1. Otherwise the
        given weights are rescaled so that they sum to the number of base
        estimators, and stored with shape (1, n_estimators).

        Parameters
        ----------
        weights : numpy array of shape (n_estimators,)
            Estimator weights. May be used after the alignment.

        Returns
        -------
        self
        """
        if weights is None:
            self.weights = np.ones([1, self.n_base_estimators_])
            return self

        self.weights = column_or_1d(weights).reshape(1, -1)
        # NOTE: kept as an assert so callers keep seeing AssertionError;
        # be aware that it is stripped when Python runs with -O.
        assert (self.weights.shape[1] == self.n_base_estimators_)

        # adjust probability by a factor for integrity (added to 1);
        # computed from the validated array rather than the raw argument
        adjust_factor = self.weights.shape[1] / np.sum(self.weights)
        self.weights = self.weights * adjust_factor
        return self

    def __len__(self):
        """Returns the number of estimators in the ensemble."""
        return len(self.base_estimators)

    def __getitem__(self, index):
        """Returns the index'th estimator in the ensemble."""
        return self.base_estimators[index]

    def __iter__(self):
        """Returns iterator over estimators in the ensemble."""
        return iter(self.base_estimators)

    @classmethod
    def _get_param_names(cls):
        # noinspection PyPep8
        """Get parameter names for the estimator

        See http://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
        and sklearn/base.py for more information.

        Returns
        -------
        names : list of str
            Sorted names of the constructor parameters, excluding 'self'
            and any ``**kwargs`` parameter.

        Raises
        ------
        RuntimeError
            If the constructor declares ``*args``.
        """
        # fetch the constructor or the original constructor before
        # deprecation wrapping if any
        init = getattr(cls.__init__, 'deprecated_original', cls.__init__)
        if init is object.__init__:
            # No explicit constructor to introspect
            return []

        # introspect the constructor arguments to find the model parameters
        # to represent
        init_signature = signature(init)
        # Consider the constructor parameters excluding 'self'
        parameters = [p for p in init_signature.parameters.values()
                      if p.name != 'self' and p.kind != p.VAR_KEYWORD]
        for p in parameters:
            if p.kind == p.VAR_POSITIONAL:
                raise RuntimeError("scikit-learn estimators should always "
                                   "specify their parameters in the signature"
                                   " of their __init__ (no varargs)."
                                   " %s with constructor %s doesn't "
                                   " follow this convention."
                                   % (cls, init_signature))
        # Extract and sort argument names excluding 'self'
        return sorted([p.name for p in parameters])

    # noinspection PyPep8
    def get_params(self, deep=True):
        """Get parameters for this estimator.

        See http://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
        and sklearn/base.py for more information.

        Parameters
        ----------
        deep : boolean, optional
            If True, will return the parameters for this estimator and
            contained subobjects that are estimators.

        Returns
        -------
        params : mapping of string to any
            Parameter names mapped to their values.
        """
        out = dict()
        for key in self._get_param_names():
            # Deprecation warnings must always fire here so deprecated
            # params can be detected. The filter is installed inside
            # catch_warnings, which restores the filter list on exit, so
            # the caller's own warning configuration is left untouched.
            with warnings.catch_warnings(record=True) as w:
                warnings.simplefilter("always", DeprecationWarning)
                value = getattr(self, key, None)
            if len(w) and w[0].category is DeprecationWarning:
                # if the parameter is deprecated, don't show it
                continue

            # XXX: should we rather test if instance of estimator?
            if deep and hasattr(value, 'get_params'):
                deep_items = value.get_params().items()
                out.update((key + '__' + k, val) for k, val in deep_items)
            out[key] = value
        return out

    def set_params(self, **params):
        # noinspection PyPep8
        """Set the parameters of this estimator.

        The method works on simple estimators as well as on nested objects
        (such as pipelines). The latter have parameters of the form
        ``<component>__<parameter>`` so that it's possible to update each
        component of a nested object.

        See http://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
        and sklearn/base.py for more information.

        Returns
        -------
        self : object

        Raises
        ------
        ValueError
            If a key (the part before ``__``) is not a known parameter.
        """
        if not params:
            # Simple optimization to gain speed (inspect is slow)
            return self
        valid_params = self.get_params(deep=True)

        nested_params = defaultdict(dict)  # grouped by prefix
        for key, value in params.items():
            key, delim, sub_key = key.partition('__')
            if key not in valid_params:
                raise ValueError('Invalid parameter %s for estimator %s. '
                                 'Check the list of available parameters '
                                 'with `estimator.get_params().keys()`.' %
                                 (key, self))

            if delim:
                nested_params[key][sub_key] = value
            else:
                setattr(self, key, value)

        # Forward the grouped ``<component>__<parameter>`` values to the
        # corresponding sub-objects.
        for key, sub_params in nested_params.items():
            valid_params[key].set_params(**sub_params)

        return self

    def __repr__(self):
        # noinspection PyPep8
        """
        See http://scikit-learn.org/stable/modules/generated/sklearn.base.BaseEstimator.html
        and sklearn/base.py for more information.
        """
        class_name = self.__class__.__name__
        return '%s(%s)' % (class_name, _pprint(self.get_params(deep=False),
                                               offset=len(class_name), ),)