"""Locally Selective Combination of Parallel Outlier Ensembles (LSCP).
Implemented on PyOD library (https://github.com/yzhao062/pyod).
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause
import numpy as np
from sklearn.utils import check_array
from sklearn.utils import column_or_1d
from sklearn.utils.validation import check_is_fitted
from pyod.models.lscp import LSCP as PyOD_LSCP
from .base import BaseAggregator
class LSCP(BaseAggregator):
    """Locally Selective Combination in Parallel Outlier Ensembles (LSCP).

    LSCP is an unsupervised parallel outlier detection ensemble which selects
    competent detectors in the local region of a test instance. This
    implementation uses an Average of Maximum strategy. First, a
    heterogeneous list of base detectors is fit to the training data; then a
    pseudo ground truth for each training instance is generated by taking
    the maximum outlier score.

    For each test instance:

    1) The local region is defined to be the set of nearest training points
       in randomly sampled feature subspaces which occur more frequently
       than a defined threshold over multiple iterations.

    2) Using the local region, a local pseudo ground truth is defined and
       the Pearson correlation is calculated between each base detector's
       training outlier scores and the pseudo ground truth.

    3) A histogram is built out of Pearson correlation scores; detectors in
       the largest bin are selected as competent base detectors for the
       given test instance.

    4) The average outlier score of the selected competent detectors is
       taken to be the final score.

    See :cite:`zhao2019lscp` for details.

    Parameters
    ----------
    base_estimators : list, length must be greater than 1
        Base unsupervised outlier detectors from PyOD. (Note: requires fit
        and decision_function methods)

    local_region_size : int, optional (default=30)
        Number of training points to consider in each iteration of the local
        region generation process (30 by default).

    local_max_features : float in (0.5, 1.), optional (default=1.0)
        Maximum proportion of number of features to consider when defining
        the local region (1.0 by default).

    n_bins : int, optional (default=10)
        Number of bins of the correlation histogram used when selecting
        the competent detectors.

    random_state : RandomState, optional (default=None)
        A random number generator instance to define the state of the random
        permutations generator.

    contamination : float in (0., 0.5], optional (default=0.1)
        The amount of contamination of the data set, i.e.
        the proportion of outliers in the data set. Used when fitting to
        define the threshold on the decision function (0.1 by default).

    pre_fitted : bool, optional (default=False)
        Whether the base estimators are trained. It is forwarded to the
        parent aggregator. Note that ``fit`` in this class always builds and
        fits a new underlying PyOD LSCP detector regardless of this flag.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.

    detector_ : pyod.models.lscp.LSCP
        The underlying PyOD detector created and fitted by ``fit``.
    """

    def __init__(self, base_estimators, local_region_size=30,
                 local_max_features=1.0, n_bins=10,
                 random_state=None, contamination=0.1, pre_fitted=False):
        super(LSCP, self).__init__(base_estimators=base_estimators,
                                   pre_fitted=pre_fitted)

        # The accepted interval is half-open: 0.5 itself is allowed.
        if not (0. < contamination <= 0.5):
            raise ValueError("contamination must be in (0, 0.5], "
                             "got: %f" % contamination)

        self.contamination = contamination
        self.base_estimators = base_estimators
        self.local_region_size = local_region_size
        self.local_max_features = local_max_features
        self.n_bins = n_bins
        self.random_state = random_state

    def fit(self, X, y=None):
        """Fit detector. y is optional for unsupervised methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels). It is only
            passed to the inherited ``_set_n_classes`` helper; the
            underlying detector is fitted on ``X`` alone.

        Returns
        -------
        self : object
            The fitted estimator.
        """
        # Validate inputs X and y
        X = check_array(X)
        self._set_n_classes(y)

        # A fresh PyOD LSCP is created on every call, so refitting discards
        # any previously fitted underlying detector.
        self.detector_ = PyOD_LSCP(
            detector_list=self.base_estimators,
            local_region_size=self.local_region_size,
            local_max_features=self.local_max_features,
            n_bins=self.n_bins,
            random_state=self.random_state,
            contamination=self.contamination)
        self.detector_.fit(X)

        self.decision_scores_ = self.detector_.decision_scores_
        # Inherited helper; presumably derives ``threshold_`` and ``labels_``
        # from ``decision_scores_`` (both are required by the fitted checks
        # in the prediction methods) -- confirm in BaseAggregator.
        self._process_decision_scores()
        return self

    def decision_function(self, X):
        """Predict raw anomaly scores of X using the fitted detector.

        The anomaly score of an input sample is computed based on the fitted
        detector. For consistency, outliers are assigned with
        higher anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self.detector_.decision_function(X)

    def predict(self, X):
        """Predict if a particular sample is an outlier or not.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._detector_predict(X)

    def predict_proba(self, X, proba_method='linear'):
        """Predict the probability of a sample being outlier.

        Two approaches are possible:

        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0,1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        proba_method : str, optional (default='linear')
            Probability conversion method. It must be one of
            'linear' or 'unify'.

        Returns
        -------
        outlier_probability : numpy array of shape (n_samples,)
            For each observation, the outlier probability according to
            the fitted model, ranging in [0,1].
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._detector_predict_proba(X, proba_method)

    def fit_predict(self, X, y=None):
        """Fit estimator and predict on X. y is optional for unsupervised
        methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels).

        Returns
        -------
        labels : numpy array of shape (n_samples,)
            Class labels for each data sample.
        """
        # Forward y so that fit_predict(X, y) behaves exactly like
        # fit(X, y) followed by predict(X).
        self.fit(X, y)
        return self.predict(X)