Source code for fsfc.generic.LFSBSS

import numpy as np
from sklearn.cluster import KMeans
from sklearn.metrics import euclidean_distances

from fsfc.base import BaseEstimator, ClusterMixin

[docs]class LFSBSS(BaseEstimator, ClusterMixin): """ Localised Feature Selection, Based on Scattered Separability. Selects features and simultaneously builds clustering in an iterative way. Every cluster has it's local set of selected features, and we project input data to a subspace defined by it to predict cluster of a point. This implementation doesn't take into account importance of overlay of clusters and unassigned points for the sake of performance. Based on the article `"Localized feature selection for clustering." <>`_. Algorithm builds clustering in the following way: 1. Find initial clustering using k-Means algorithm 2. For each cluster: 1. Find feature which can be dropped to improve the Scatter Separability Score 2. Recompute clusters without this feature 3. Find cluster that is the most similar to the current one 4. Compute normalized value of Scatter Separability Score for two clusters - current and new 5. If scores were improved, drop the feature and update clustering 3. Repeat step 2 until no changes were made 4. Return found clusters Algorithm predicts clusters for new points in the following way: 1. Project points to the feature subspace of each cluster 2. Find the cluster whose center is the closest to projected point Parameters ---------- clusters: int Number of clusters to find max_iterations: int (default 100) Maximal number of iterations of the algorithm """ def __init__(self, clusters, max_iterations=100): self.clusters = clusters self.labels_ = None self.features_ = None self.means_ = None self.vars_ = None self.max_iterations = max_iterations
[docs] def fit(self, x): """ Fit algorithm to dataset, find clusters and set of features for every cluster Parameters ---------- x: ndarray The dataset Returns ------- self: LFSBSS Returns itself to support chaining """ n_samples, n_features = x.shape # Build initial clustering k_means = KMeans(n_clusters=self.clusters) features = [set(range(n_features)) for _ in range(self.clusters)] clusters, means = self._find_clusters_and_means(k_means, x) for it in range(self.max_iterations): was_changed = False for i in range(self.clusters): cluster = clusters[i] mean = means[i] this_features = features[i] if len(this_features) == 1: # Can't drop anything continue # Find a feature which we can drop and have the highest scatter separability score max_score = None new_features = None for feature in this_features: without_feature = list(this_features - {feature}) score = self._compute_score(x, means, cluster, mean, without_feature) if max_score is None or score > max_score: max_score = score new_features = without_feature if new_features is None: # Nothing to remove from this cluster continue # Repartition dataset using new features, find a cluster that is the most similar to a current one new_x = x[:, new_features] new_clusters, new_means = self._find_clusters_and_means(k_means, new_x) # Use Jaccard difference as the measure of similarity max_score = None most_similar = None new_mean = None for j in range(self.clusters): new_cluster = clusters[j] score = LFSBSS._jaccard_score(new_cluster, cluster) if max_score is None or score > max_score: max_score = score most_similar = new_cluster new_mean = new_means[i] if most_similar is None or new_mean is None: # Nothing to select continue # Compute normalized value of scatter separability nv_old = self._compute_score(x, means, cluster, mean, list(this_features)) * \ self._compute_score(x, means, cluster, mean, new_features) nv_new = self._compute_score(x, means, most_similar, mean, list(this_features)) * \ self._compute_score(new_x, new_means, most_similar, new_mean, range(len(new_features))) if nv_new >= nv_old: # It's better to drop this feature was_changed = True features[i] = set(new_features) clusters[i] = most_similar # means[i] = new_mean if not was_changed: break self.features_ = features self.means_ = means self.vars_ = [None] * x.shape[0] # Compute variances for clusters for (idx, cluster) in enumerate(clusters): feature = np.array(list(features[idx]))[:, np.newaxis] self.vars_[idx] = np.var(x[cluster, feature]) self.labels_ = [None] * x.shape[0] for (idx, cluster) in enumerate(clusters): for sample in cluster: self.labels_[sample] = idx for i in range(x.shape[0]): if self.labels_[i] is None: self.labels_[i] = self.predict(x[i]) return self
[docs] def predict(self, x): """ Predict clusters for one sample Parameters ---------- x: ndarray Samples to predict Returns ------- label: int Predicted cluster """ # Find the closest cluster to samples # To do it, project x to appropriate subspace, find distance to mean value and norm by variance min_score = None closest = None for i in range(self.clusters): projection = x[:, self.features_[i]] norm = euclidean_distances(projection, self.means_[i]) score = norm / self.vars_[i] if min_score is None or score < min_score: min_score = score closest = i return closest
def _find_clusters_and_means(self, k_means, x): initial = k_means.fit_predict(x) clusters = [[] for _ in range(self.clusters)] means = k_means.cluster_centers_.copy() for (idx, c) in enumerate(initial): clusters[c].append(idx) return clusters, means @staticmethod def _compute_score(x, means, cluster, mean, features): cluster = np.array(cluster) features = np.array(features) means = means[:, features] x = x[:, features] mean = mean[features] total_mean = np.mean(x, axis=0) cluster_values = x[cluster, :] cluster_diff = cluster_values - mean in_cluster = LFSBSS._tensor_product_sum(cluster_diff) means_diff = means - total_mean between_cluster = LFSBSS._tensor_product_sum(means_diff) try: separability = np.trace(np.linalg.inv(in_cluster).dot(between_cluster)) except np.linalg.LinAlgError as _: separability = np.trace(between_cluster) / np.trace(in_cluster) return separability / cluster_values.shape[0] @staticmethod def _tensor_product_sum(x): res = np.zeros([x.shape[1], x.shape[1]]) for i in range(x.shape[0]): res = np.add(res, np.tensordot(x[i], x[i], axes=0)) return res @staticmethod def _jaccard_score(a, b): a = set(a) b = set(b) return 1.0 * len(a.intersection(b)) / len(a.union(b))