Source code for combo.models.classifier_des

# -*- coding: utf-8 -*-
"""Dynamic Classifier Selection (DES) is an established combination framework
for classification tasks.
"""
# Author: Yue Zhao <zhaoy@cmu.edu>
# License: BSD 2 clause

import warnings
import numpy as np

from sklearn.neighbors import KDTree
from sklearn.metrics import accuracy_score
from sklearn.utils import check_array
from sklearn.utils import check_X_y
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.multiclass import check_classification_targets

from pyod.utils.utility import check_parameter
from pyod.utils.utility import argmaxn

from .base import BaseAggregator
from ..utils.utility import score_to_proba
from .classifier_comb import average
from .classifier_comb import majority_vote


class DES_LA(BaseAggregator):
    """Dynamic Ensemble Selection (DES) is an established combination
    framework for classification tasks. The technique is based on Dynamic
    Classifier Selection (DCS) proposed by Ho et al. in 1994
    :cite:`ho1994decision`. The motivation behind this approach is that base
    classifiers often make distinct errors and offer a degree of
    complementarity. Consequently, selectively combining base classifiers
    can result in a performance improvement over generic ensembles which use
    the majority vote of all base classifiers.

    Compared with DCS, DES uses a group of best classifiers to conduct a
    second-phase combination, rather than only the single best classifier.
    The version implemented in this class is DES_LA, which uses local
    accuracy as the metric for evaluating base classifier performance.
    `predict` uses (weighted) majority vote and `predict_proba` uses
    (weighted) average. See :cite:`ko2008dynamic` for details.

    Parameters
    ----------
    base_estimators : list or numpy array (n_estimators,)
        A list of base classifiers.

    local_region_size : int, optional (default=30)
        Number of training points to consider in each iteration of the local
        region generation process (30 by default).

    n_selected_clfs : int, optional (default=None)
        Number of selected base classifiers in the second-phase combination.
        If None, set it to 1/2 * n_base_estimators.

    use_weights : bool, optional (default=False)
        If True, use the classifiers' performance on the local region as
        their weight.

    threshold : float in (0, 1), optional (default=None)
        Cut-off value to convert scores into binary labels.

    pre_fitted : bool, optional (default=False)
        Whether the base classifiers are trained. If True, the `fit`
        process may be skipped.
    """

    def __init__(self, base_estimators, local_region_size=30,
                 n_selected_clfs=None, use_weights=False, threshold=None,
                 pre_fitted=None):

        super(DES_LA, self).__init__(
            base_estimators=base_estimators, pre_fitted=pre_fitted)

        # validate input parameters
        if not isinstance(local_region_size, int):
            raise ValueError('local_region_size must be an integer variable')
        check_parameter(local_region_size, low=2, include_left=True,
                        param_name='local_region_size')
        self.local_region_size = local_region_size

        # by default, select half of the base classifiers for the
        # second-phase combination
        if n_selected_clfs is None:
            self.n_selected_clfs = int(self.n_base_estimators_ * 0.5)
        else:
            if not isinstance(n_selected_clfs, int):
                raise ValueError('n_selected_clfs must be an integer variable')
            check_parameter(n_selected_clfs, low=1,
                            high=self.n_base_estimators_, include_left=True,
                            include_right=True, param_name='n_selected_clfs')
            self.n_selected_clfs = n_selected_clfs

        self.use_weights = use_weights

        if threshold is not None:
            warnings.warn(
                "DES does not support threshold setting option. "
                "Please set the threshold in classifiers directly.")

        if pre_fitted is not None:
            warnings.warn("DES does not support pre_fitted option.")
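A minimal construction sketch (illustrative, not part of the module): `base_estimators` can be any list of scikit-learn-style classifiers that implement `fit`, `predict`, and `predict_proba`; the estimators chosen below are arbitrary.

from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from combo.models.classifier_des import DES_LA

# four illustrative base classifiers; any sklearn-style estimators work
base_clfs = [DecisionTreeClassifier(), LogisticRegression(max_iter=1000),
             KNeighborsClassifier(), DecisionTreeClassifier(max_depth=3)]

# n_selected_clfs=None keeps half of the base classifiers (here, 2)
clf = DES_LA(base_clfs, local_region_size=30, use_weights=True)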
    def fit(self, X, y):
        """Fit classifier.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,)
            The ground truth of the input samples (labels).
        """

        # Validate inputs X and y
        X, y = check_X_y(X, y)
        X = check_array(X)
        check_classification_targets(y)
        self._classes = len(np.unique(y))
        n_samples = X.shape[0]

        # save the train ground truth for evaluation purpose
        self.y_train_ = y

        # build KDTree out of training subspace for local region search
        self.tree_ = KDTree(X)

        self.y_train_predicted_ = np.zeros(
            [n_samples, self.n_base_estimators_])

        # train all base classifiers on X, and record their predictions
        # on the training samples for later local accuracy evaluation
        for i, clf in enumerate(self.base_estimators):
            clf.fit(X, y)
            self.y_train_predicted_[:, i] = clf.predict(X)
            clf.fitted_ = True

        self.fitted_ = True
        return self
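For intuition, `fit` stores a `KDTree` so the local region of a query point can be retrieved cheaply at prediction time. A standalone sketch of that lookup, with made-up data:

import numpy as np
from sklearn.neighbors import KDTree

X_train = np.random.RandomState(42).rand(100, 4)  # toy training data
tree = KDTree(X_train)

# indices of the 30 nearest training points to one query point,
# i.e. the "local region" DES_LA evaluates classifiers on
_, ind = tree.query(X_train[:1, :], k=30)
print(ind.shape)  # (1, 30)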
    def predict(self, X):
        """Predict the class labels for the provided data.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        labels : numpy array of shape (n_samples,)
            Class labels for each data sample.
        """
        return self._predict_internal(X, predict_proba=False)
    def predict_proba(self, X):
        """Return probability estimates for the test data X.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        Returns
        -------
        p : numpy array of shape (n_samples, n_classes)
            The class probabilities of the input samples.
            Classes are ordered by lexicographic order.
        """
        return self._predict_internal(X, predict_proba=True)
    def _predict_internal(self, X, predict_proba):
        """Internal function for predict and predict_proba.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        predict_proba : bool
            If True, return probability estimates; otherwise return
            class labels.

        Returns
        -------
        y_predicted : numpy array of shape (n_samples,) or
            (n_samples, n_classes)
            Class labels or class probabilities for each data sample,
            depending on `predict_proba`.
        """
        check_is_fitted(self, ['fitted_'])
        X = check_array(X)
        n_samples = X.shape[0]

        # Find the local region (nearest training neighbors)
        # for all test instances
        _, ind_arr = self.tree_.query(X, k=self.local_region_size)

        if predict_proba:
            y_predicted = np.zeros([n_samples, self._classes])
        else:
            y_predicted = np.zeros([n_samples, ])

        # For each test sample
        for i in range(n_samples):
            test_sample = X[i, :].reshape(1, -1)
            train_inds = ind_arr[i, :]

            # ground truth of the local region
            y_train_sample = self.y_train_[train_inds]

            # evaluate each base classifier by its accuracy
            # on the local region
            clf_performance = np.zeros([self.n_base_estimators_, ])
            for j, clf in enumerate(self.base_estimators):
                y_train_clf = self.y_train_predicted_[train_inds, j]
                clf_performance[j] = accuracy_score(y_train_sample,
                                                    y_train_clf)

            # get the indices of the best performing classifiers
            select_clf_inds = argmaxn(clf_performance,
                                      n=self.n_selected_clfs)
            select_clf_weights = clf_performance[select_clf_inds].reshape(
                1, len(select_clf_inds))

            all_scores = np.zeros([1, len(select_clf_inds)])
            all_proba = np.zeros([1, self._classes, len(select_clf_inds)])

            # collect the predictions of the selected classifiers
            for k, clf_ind in enumerate(select_clf_inds):
                clf = self.base_estimators[clf_ind]
                if predict_proba:
                    all_proba[:, :, k] = clf.predict_proba(test_sample)
                else:
                    all_scores[:, k] = clf.predict(test_sample)

            # combine the selected classifiers by (weighted) average
            # or (weighted) majority vote
            if predict_proba:
                if self.use_weights:
                    y_predicted[i] = np.mean(all_proba * select_clf_weights,
                                             axis=2)
                else:
                    y_predicted[i] = np.mean(all_proba, axis=2)
            else:
                if self.use_weights:
                    y_predicted[i] = majority_vote(
                        all_scores, n_classes=self._classes,
                        weights=select_clf_weights)
                else:
                    y_predicted[i] = majority_vote(
                        all_scores, n_classes=self._classes)

        if predict_proba:
            # re-normalize so each row sums to one
            return score_to_proba(y_predicted)
        else:
            return y_predicted
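The selection step above boils down to ranking classifiers by accuracy on the local region and keeping the top `n_selected_clfs`. A small standalone sketch of that ranking, using pyod's `argmaxn` with toy numbers:

import numpy as np
from pyod.utils.utility import argmaxn
from sklearn.metrics import accuracy_score

y_local = np.array([0, 1, 1, 0])   # ground truth in the local region
preds = np.array([[0, 1, 1, 1],    # classifier 0: 3/4 correct
                  [1, 0, 0, 1],    # classifier 1: 0/4 correct
                  [0, 1, 1, 0]])   # classifier 2: 4/4 correct

local_acc = np.array([accuracy_score(y_local, p) for p in preds])
print(argmaxn(local_acc, n=2))  # indices of the two most accurate classifiers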
    def fit_predict(self, X, y):
        """Fit estimator and predict on X.

        Parameters
        ----------
        X : numpy array of shape (n_samples, n_features)
            The input samples.

        y : numpy array of shape (n_samples,)
            The ground truth of the input samples (labels).

        Returns
        -------
        labels : numpy array of shape (n_samples,)
            Class labels for each data sample.
        """
        raise NotImplementedError(
            'fit_predict should not be used in supervised learning models.')
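Putting it together, a hedged end-to-end sketch on a toy dataset (the dataset and base classifier choices below are illustrative, not part of the module):

import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from combo.models.classifier_des import DES_LA

X, y = make_classification(n_samples=400, n_features=10, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

clf = DES_LA([DecisionTreeClassifier(random_state=42),
              LogisticRegression(max_iter=1000)],
             local_region_size=30)
clf.fit(X_train, y_train)

labels = clf.predict(X_test)       # (weighted) majority vote
proba = clf.predict_proba(X_test)  # (weighted) average, rows sum to 1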