# -*- coding: utf-8 -*-
"""A collection of methods for combining detectors
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause
import numpy as np
from sklearn.utils import check_array
from sklearn.utils import column_or_1d
from sklearn.utils.validation import check_is_fitted
from pyod.utils.utility import standardizer
from .base import BaseAggregator
from .score_comb import average, maximization, median
class SimpleDetectorAggregator(BaseAggregator):
    """A collection of simple detector combination methods.

    The raw outlier scores of every base detector are collected into one
    score matrix (one column per detector), optionally standardized, and
    then collapsed into a single score per sample with the selected
    combination method.

    Parameters
    ----------
    base_estimators : list, length must be greater than 1
        Base unsupervised outlier detectors from PyOD. (Note: requires fit
        and decision_function methods)

    method : str, optional (default='average')
        Combination method: {'average', 'maximization', 'median'}.
        ``weights`` is only taken into account by 'average'.

    contamination : float in (0., 0.5], optional (default=0.1)
        The amount of contamination of the data set,
        i.e. the proportion of outliers in the data set. Used when fitting
        to define the threshold on the decision function.

    standardization : bool, optional (default=True)
        If True, perform standardization first to convert
        prediction score to zero mean and unit variance.
        See http://scikit-learn.org/stable/auto_examples/preprocessing/plot_scaling_importance.html

    weights : array-like of shape (n_detectors,) or (1, n_detectors), \
            optional (default=None)
        Detector weights. They are rescaled so that they sum up to the
        number of detectors. If None, every detector gets weight 1.

    pre_fitted : bool, optional (default=False)
        Whether the base detectors are trained. If True, `fit`
        process may be skipped.

    Attributes
    ----------
    decision_scores_ : numpy array of shape (n_samples,)
        The outlier scores of the training data.
        The higher, the more abnormal. Outliers tend to have higher
        scores. This value is available once the detector is fitted.

    threshold_ : float
        The threshold is based on ``contamination``. It is the
        ``n_samples * contamination`` most abnormal samples in
        ``decision_scores_``. The threshold is calculated for generating
        binary outlier labels.

    labels_ : int, either 0 or 1
        The binary labels of the training data. 0 stands for inliers
        and 1 for outliers/anomalies. It is generated by applying
        ``threshold_`` on ``decision_scores_``.
    """

    def __init__(self, base_estimators, method='average', contamination=0.1,
                 standardization=True, weights=None, pre_fitted=False):
        # NOTE(review): ``self.n_base_estimators_`` used below is presumably
        # set by the base class constructor -- confirm in BaseAggregator.
        super(SimpleDetectorAggregator, self).__init__(
            base_estimators=base_estimators, pre_fitted=pre_fitted)

        # validate input parameters
        if method not in ['average', 'maximization', 'median']:
            raise ValueError("{method} is not a valid parameter.".format(
                method=method))
        self.method = method

        if not (0. < contamination <= 0.5):
            raise ValueError("contamination must be in (0, 0.5], "
                             "got: %f" % contamination)
        self.contamination = contamination

        self.standardization = standardization

        if weights is None:
            self.weights = np.ones([1, self.n_base_estimators_])
        else:
            weights = np.asarray(weights)
            # Accept the documented (1, n_detectors) layout as well:
            # column_or_1d only understands 1-d input and column vectors.
            if weights.ndim == 2 and weights.shape[0] == 1:
                weights = weights.ravel()
            weights = column_or_1d(weights).reshape(1, -1)

            # Explicit exception instead of ``assert`` so that the check
            # is not stripped when Python runs with -O.
            if weights.shape[1] != self.n_base_estimators_:
                raise ValueError(
                    "weights must contain one value per base estimator, "
                    "got {n_weights} weights for {n_estimators} "
                    "estimators.".format(
                        n_weights=weights.shape[1],
                        n_estimators=self.n_base_estimators_))

            weight_sum = np.sum(weights)
            if weight_sum == 0:
                # The rescaling below would divide by zero.
                raise ValueError("weights must not sum up to zero.")

            # adjust probability by a factor for integrity: after the
            # rescaling the weights sum up to the number of detectors
            adjust_factor = weights.shape[1] / weight_sum
            self.weights = weights * adjust_factor

    def fit(self, X, y=None):
        """Fit detector. y is optional for unsupervised methods.

        Unless ``pre_fitted`` is True, every base detector is fitted on
        ``X`` (and ``y``). Afterwards the combined scores of ``X`` are
        stored in ``decision_scores_`` and post-processed.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels).

        Returns
        -------
        self : object
            The fitted aggregator.
        """
        # Validate inputs X and y
        X = check_array(X)
        self._set_n_classes(y)

        if self.pre_fitted:
            print("Training skipped")
        else:
            for clf in self.base_estimators:
                clf.fit(X, y)
                clf.fitted_ = True

        self.decision_scores_ = self._create_scores(X)
        # NOTE(review): presumably derives ``threshold_`` and ``labels_``
        # from ``decision_scores_`` -- defined outside this class.
        self._process_decision_scores()
        return self

    def _create_scores(self, X):
        """Internal function to generate and combine scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        agg_score : numpy array of shape (n_samples,)
            Aggregated scores.

        Raises
        ------
        ValueError
            If a base estimator has no ``decision_function`` or if
            ``self.method`` is not a supported combination method.
        """
        # one column of raw scores per base detector
        all_scores = np.zeros([X.shape[0], self.n_base_estimators_])

        for i, clf in enumerate(self.base_estimators):
            if not hasattr(clf, 'decision_function'):
                raise ValueError(
                    "{clf} does not have decision_function.".format(clf=clf))
            all_scores[:, i] = clf.decision_function(X)

        if self.standardization:
            all_scores = standardizer(all_scores)

        if self.method == 'average':
            # the only combination method that uses the detector weights
            agg_score = average(all_scores, estimator_weights=self.weights)
        elif self.method == 'maximization':
            agg_score = maximization(all_scores)
        elif self.method == 'median':
            agg_score = median(all_scores)
        else:
            # Only reachable when ``method`` was changed after __init__;
            # fail with a clear message instead of an UnboundLocalError.
            raise ValueError("{method} is not a valid parameter.".format(
                method=self.method))

        return agg_score

    def decision_function(self, X):
        """Predict raw anomaly scores of X using the fitted detector.

        The anomaly score of an input sample is computed based on the fitted
        detector. For consistency, outliers are assigned with
        higher anomaly scores.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples. Sparse matrices are accepted only
            if they are supported by the base estimator.

        Returns
        -------
        anomaly_scores : numpy array of shape (n_samples,)
            The anomaly score of the input samples.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._create_scores(X)

    def predict(self, X):
        """Predict if a particular sample is an outlier or not.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        outlier_labels : numpy array of shape (n_samples,)
            For each observation, tells whether or not
            it should be considered as an outlier according to the
            fitted model. 0 stands for inliers and 1 for outliers.
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._detector_predict(X)

    def predict_proba(self, X, proba_method='linear'):
        """Predict the probability of a sample being outlier. Two approaches
        are possible:

        1. simply use Min-max conversion to linearly transform the outlier
           scores into the range of [0,1]. The model must be
           fitted first.
        2. use unifying scores, see :cite:`kriegel2011interpreting`.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        proba_method : str, optional (default='linear')
            Probability conversion method. It must be one of
            'linear' or 'unify'.

        Returns
        -------
        outlier_probability : numpy array
            For each observation, the outlier probability, ranging
            in [0,1].
        """
        check_is_fitted(self, ['decision_scores_', 'threshold_', 'labels_'])
        X = check_array(X)
        return self._detector_predict_proba(X, proba_method)

    def fit_predict(self, X, y=None):
        """Fit estimator and predict on X. y is optional for unsupervised
        methods.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,), optional (default=None)
            The ground truth of the input samples (labels).

        Returns
        -------
        labels : numpy array of shape (n_samples,)
            Class labels for each data sample.
        """
        # Forward y so that it reaches the base detectors exactly as it
        # does when fit() is called directly.
        self.fit(X, y)
        return self.predict(X)