Source code for olliepy.RegressionErrorAnalysisReport

import time
from itertools import product
from typing import List, Dict, Tuple, Union

import pandas as pd
from scipy.spatial.distance import cosine
from scipy.stats import ks_2samp, wasserstein_distance
from sklearn.preprocessing import LabelEncoder
from typeguard import typechecked

from .Report import Report


def validate_create_report_attributes(enable_patterns_report: bool,
                                      patterns_report_group_by_categorical_features: Union[str, List[str]],
                                      patterns_report_group_by_numerical_features: Union[str, List[str]],
                                      patterns_report_number_of_bins: Union[int, List[int]],
                                      enable_parallel_coordinates_plot: bool,
                                      cosine_similarity_threshold: float,
                                      parallel_coordinates_q1_threshold: float,
                                      parallel_coordinates_q2_threshold: float,
                                      parallel_coordinates_features: Union[str, List[str]],
                                      categorical_features: List[str],
                                      numerical_features: List[str],
                                      all_features: List[str]):
    if type(enable_patterns_report) is not bool:
        raise TypeError('provided enable_patterns_report is not valid. enable_patterns_report has to be a bool')

    if type(patterns_report_group_by_categorical_features) is str \
            and patterns_report_group_by_categorical_features != 'all':
        raise AttributeError('''provided patterns_report_group_by_categorical_features is not valid.
            patterns_report_group_by_categorical_features has to be "all" if the provided value is a string''')

    if type(patterns_report_group_by_numerical_features) is str \
            and patterns_report_group_by_numerical_features != 'all':
        raise AttributeError('''provided patterns_report_group_by_numerical_features is not valid.
            patterns_report_group_by_numerical_features has to be "all" if the provided value is a string''')

    if type(patterns_report_group_by_categorical_features) is list \
            and len(patterns_report_group_by_categorical_features) > 0:
        unknown_features = [feature for feature in patterns_report_group_by_categorical_features if
                            feature not in categorical_features]
        if len(unknown_features) > 0:
            raise AttributeError(f'''provided patterns_report_group_by_categorical_features is not valid.
            these features {unknown_features} do not exist in the categorical features''')

    if type(patterns_report_group_by_numerical_features) is list \
            and len(patterns_report_group_by_numerical_features) > 0:
        unknown_features = [feature for feature in patterns_report_group_by_numerical_features if
                            feature not in numerical_features]
        if len(unknown_features) > 0:
            raise AttributeError(f'''provided patterns_report_group_by_numerical_features is not valid.
            these features {unknown_features} do not exist in the numerical features''')

    if type(patterns_report_number_of_bins) is list \
            and type(patterns_report_group_by_numerical_features) is str:
        raise AttributeError('''provided patterns_report_number_of_bins is not valid.
        patterns_report_number_of_bins can be a list of ints if a list of numerical features were provided in patterns_report_group_by_numerical_features''')

    if type(patterns_report_number_of_bins) is list \
            and type(patterns_report_group_by_numerical_features) is list:
        if len(patterns_report_number_of_bins) != len(patterns_report_group_by_numerical_features):
            raise AttributeError('''provided patterns_report_number_of_bins is not valid.
            patterns_report_number_of_bins list length has to be equal to the number of features provided in patterns_report_group_by_numerical_features''')

    if type(enable_parallel_coordinates_plot) is not bool:
        raise TypeError(
            'provided enable_parallel_coordinates_plot is not valid. enable_parallel_coordinates_plot has to be a bool')

    if type(cosine_similarity_threshold) is not float:
        raise TypeError(
            'provided cosine_similarity_threshold is not valid. cosine_similarity_threshold has to be a float')

    if cosine_similarity_threshold <= 0.0 or cosine_similarity_threshold >= 1.0:
        raise AttributeError(
            'provided cosine_similarity_threshold is not valid. cosine_similarity_threshold has to be between 0.0 and 1.0')

    if type(parallel_coordinates_q1_threshold) is not float:
        raise TypeError(
            'provided parallel_coordinates_q1_threshold is not valid. parallel_coordinates_q1_threshold has to be a float')

    if type(parallel_coordinates_q2_threshold) is not float:
        raise TypeError(
            'provided parallel_coordinates_q2_threshold is not valid. parallel_coordinates_q2_threshold has to be a float')

    if parallel_coordinates_q1_threshold <= 0.0 or parallel_coordinates_q1_threshold >= 1.0:
        raise AttributeError(
            'provided parallel_coordinates_q1_threshold is not valid. parallel_coordinates_q1_threshold has to be between 0.0 and 1.0')

    if parallel_coordinates_q2_threshold <= 0.0 or parallel_coordinates_q2_threshold >= 1.0:
        raise AttributeError(
            'provided parallel_coordinates_q2_threshold is not valid. parallel_coordinates_q2_threshold has to be between 0.0 and 1.0')

    if parallel_coordinates_q2_threshold <= parallel_coordinates_q1_threshold:
        raise AttributeError('''provided parallel_coordinates_q1_threshold and parallel_coordinates_q2_threshold are not valid.
                parallel_coordinates_q2_threshold has to greater than parallel_coordinates_q1_threshold''')

    if type(parallel_coordinates_features) is str and parallel_coordinates_features != 'auto':
        raise AttributeError('''provided parallel_coordinates_features is not valid.
                parallel_coordinates_features has to be "auto" if the provided value is a string''')

    if type(parallel_coordinates_features) is list and len(parallel_coordinates_features) > 0:
        unknown_features = [feature for feature in parallel_coordinates_features if feature not in all_features]
        if len(unknown_features) > 0:
            raise AttributeError(f'''provided parallel_coordinates_features is not valid.
            these features {unknown_features} do not exist in the dataframe''')

    if type(parallel_coordinates_features) is list and len(parallel_coordinates_features) < 2:
        raise AttributeError(f'''provided parallel_coordinates_features is not valid.
            parallel_coordinates_features has to contain at least two features to plot''')


def validate_attributes(train_df, test_df, target_feature_name, error_column_name,
                        error_classes, acceptable_error_class, numerical_features, categorical_features):
    if type(train_df) is not pd.DataFrame:
        raise TypeError('provided train_df is not valid. train_df has to be a pandas dataframe')

    if type(test_df) is not pd.DataFrame:
        raise TypeError('provided test_df is not valid. test_df has to be a pandas dataframe')

    train_columns = train_df.columns.to_list()
    test_columns = test_df.columns.to_list()

    if type(target_feature_name) is not str:
        raise TypeError(f'''provided target_feature_name is not valid.
                            \ntarget_feature_name ({target_feature_name}) has to be a str''')

    if target_feature_name not in train_columns:
        raise AttributeError(f'provided target_feature_name ({target_feature_name}) is not train_df')

    if target_feature_name not in test_columns:
        raise AttributeError(f'provided target_feature_name ({target_feature_name}) is not test_df')

    if type(error_column_name) is not str:
        raise TypeError(f'''provided error_column_name is not valid.
                            \ntest_error_column_name ({error_column_name}) has to be a str''')

    if error_column_name not in train_columns:
        raise AttributeError(f'provided error_column_name ({error_column_name}) is not train_df')

    if error_column_name not in test_columns:
        raise AttributeError(f'provided error_column_name ({error_column_name}) is not test_df')

    if acceptable_error_class is not None and type(acceptable_error_class) is not str:
        raise TypeError(f'''provided acceptable_error_class is not valid.
                            \nacceptable_error_class ({acceptable_error_class}) has to be a str or None''')

    if acceptable_error_class is not None and acceptable_error_class not in error_classes:
        raise AttributeError(f'''provided acceptable_error_class is not valid.
                            \n{acceptable_error_class} has to be defined in error_classes''')

    if numerical_features is None and categorical_features is None:
        raise AttributeError('''both numerical_features and categorical_features are not defined.
                                \nyou need to provide one of them or both in order to proceed.''')


def _cosine_similarity(vector_a, vector_b):
    return 1.0 - cosine(vector_a, vector_b)


[docs]@typechecked class RegressionErrorAnalysisReport(Report): """ RegressionErrorAnalysisReport creates a report that analyzes the error in regression problems. Attributes ---------- title : str the title of the report output_directory : str the directory where the report folder will be created train_df : pd.DataFrame the training pandas dataframe of the regression problem which should include the target feature test_df : pd.DataFrame the testing pandas dataframe of the regression problem which should include the target feature and the error column in order to calculate the error class target_feature_name : str the name of the regression target feature error_column_name : str the name of the calculated error column 'Prediction - Target' (see example on github for more information) error_classes : Dict[str, Tuple] a dictionary containing the definition of the error classes that will be created. The key is the error_class name and the value is the minimum (inclusive) and maximum (exclusive) which will be used to calculate the error_class of the test observations. For example: error_classes = { 'EXTREME_UNDER_ESTIMATION': (-8.0, -4.0), returns 'EXTREME_UNDER_ESTIMATION' if -8.0 <= error < -4.0 'HIGH_UNDER_ESTIMATION': (-4.0, -3.0), returns 'HIGH_UNDER_ESTIMATION' if -4.0 <= error < -3.0 'MEDIUM_UNDER_ESTIMATION': (-3.0, -1.0), returns 'MEDIUM_UNDER_ESTIMATION' if -3.0 <= error < -1.0 'LOW_UNDER_ESTIMATION': (-1.0, -0.5), returns 'LOW_UNDER_ESTIMATION' if -1.0 <= error < -0.5 'ACCEPTABLE': (-0.5, 0.5), returns 'ACCEPTABLE' if -0.5 <= error < 0.5 'OVER_ESTIMATING': (0.5, 3.0) } returns 'OVER_ESTIMATING' if -0.5 <= error < 3.0 acceptable_error_class: str the name of the acceptable error class that was defined in error_classes numerical_features : List[str] default=None a list of the numerical features to be included in the report categorical_features : List[str] default=None a list of the categorical features to be included in the report subtitle : str default=None an optional subtitle to describe your report report_folder_name : str default=None the name of the folder that will contain all the generated report files. If not set, the title of the report will be used. encryption_secret : str default=None the 16 characters secret that will be used to encrypt the generated report data. If it is not set, the generated data won't be encrypted. generate_encryption_secret : bool default=False the encryption_secret will be generated and its value returned as output. you can also view encryption_secret to get the generated secret. Methods ------- create_report() creates the error analysis report """ def __init__(self, title: str, output_directory: str, train_df: pd.DataFrame, test_df: pd.DataFrame, target_feature_name: str, error_column_name: str, error_classes: Dict[str, Tuple[float, float]], acceptable_error_class: str, numerical_features: List[str] = None, categorical_features: List[str] = None, subtitle: str = None, report_folder_name: str = None, encryption_secret: str = None, generate_encryption_secret: bool = False): super().__init__(title, output_directory, subtitle, report_folder_name, encryption_secret, generate_encryption_secret) validate_attributes(train_df, test_df, target_feature_name, error_column_name, error_classes, acceptable_error_class, numerical_features, categorical_features) self.train_df = train_df.copy() self.test_df = test_df.copy() self.target_feature_name = target_feature_name self.error_column_name = error_column_name self.error_classes = error_classes.copy() self.acceptable_error_class = acceptable_error_class self.numerical_features = numerical_features[:] self.categorical_features = categorical_features[:] self._training_data_name = 'Training data' self._testing_data_name = 'Testing data' self._error_class_col_name = 'ERROR_CLASS' self._primary_datasets = [self._training_data_name, self.acceptable_error_class] self._secondary_datasets = [self._testing_data_name] self._secondary_datasets.extend(list(self.error_classes.keys())) self._template_name = 'regression-error-analysis-report'
[docs] @typechecked def create_report(self, enable_patterns_report: bool = True, patterns_report_group_by_categorical_features: Union[str, List[str]] = 'all', patterns_report_group_by_numerical_features: Union[str, List[str]] = 'all', patterns_report_number_of_bins: Union[int, List[int]] = 10, enable_parallel_coordinates_plot: bool = True, cosine_similarity_threshold: float = 0.8, parallel_coordinates_q1_threshold: float = 0.25, parallel_coordinates_q2_threshold: float = 0.75, parallel_coordinates_features: Union[str, List[str]] = 'auto') -> None: """ Creates a report using the user defined data and the data calculated based on the error. :param enable_patterns_report: enables the patterns report. default: True :param patterns_report_group_by_categorical_features: categorical features to use in the patterns report. default: 'all' :param patterns_report_group_by_numerical_features: numerical features to use in the patterns report. default: 'all' :param patterns_report_number_of_bins: number of bins to use for each provided numerical feature or one number of bins to use for all provided numerical features. default: 10 :param enable_parallel_coordinates_plot: enables the parallel coordinates plot. default: True :param cosine_similarity_threshold: The cosine similarity threshold to decide if the categorical distribution of the primary and secondary datasets are similar. :param parallel_coordinates_q1_threshold: the first quantile threshold to be used if parallel_coordinates_features == 'auto'. default: 0.25 :param parallel_coordinates_q2_threshold: the second quantile threshold to be used if parallel_coordinates_features == 'auto'. default: 0.75 :param parallel_coordinates_features: The list of features to display on the parallel coordinates plot. default: 'auto' - If parallel_coordinates_features is set to 'auto', OlliePy will select the features with a distribution shift based on 3 thresholds: - cosine_similarity_threshold to be used to select categorical features if the cosine_similarity is lower than the threshold. - parallel_coordinates_q1_threshold and parallel_coordinates_q2_threshold which are two quantile values. if primary_quantile_1 >= secondary_quantile_2 or secondary_quantile_1 >= primary_quantile_2 then the numerical feature is selected and will be added to the plot. :return: None """ self.report_data['report'] = {} validate_create_report_attributes(enable_patterns_report, patterns_report_group_by_categorical_features, patterns_report_group_by_numerical_features, patterns_report_number_of_bins, enable_parallel_coordinates_plot, cosine_similarity_threshold, parallel_coordinates_q1_threshold, parallel_coordinates_q2_threshold, parallel_coordinates_features, self.categorical_features, self.numerical_features, self.train_df.columns.tolist()) tic = time.perf_counter() self._add_user_defined_data() self._add_error_class_to_test_df() self._add_datasets() self._add_statistical_tests(cosine_similarity_threshold) if self.categorical_features is not None and len(self.categorical_features) > 0: self._add_categorical_count_plot() if enable_parallel_coordinates_plot: self._add_parallel_coordinates_plot(cosine_similarity_threshold, parallel_coordinates_q1_threshold, parallel_coordinates_q2_threshold, parallel_coordinates_features) if enable_patterns_report: self._find_and_add_all_secondary_datasets_patterns(patterns_report_group_by_categorical_features, patterns_report_group_by_numerical_features, patterns_report_number_of_bins) toc = time.perf_counter() print(f"The report was created in {toc - tic:0.4f} seconds") if self.encryption_secret: print(f'Your encryption secret is {self.encryption_secret}')
def _add_user_defined_data(self) -> None: """ Adds user defined data to the report. :return: None """ self._update_report({'primaryDatasets': self._primary_datasets}) self._update_report({'secondaryDatasets': self._secondary_datasets}) if self.numerical_features: if self.target_feature_name not in self.numerical_features: self.numerical_features.append(self.target_feature_name) self._update_report({'numericalFeatures': self.numerical_features}) if self.categorical_features: self._update_report({'categoricalFeatures': self.categorical_features}) self._update_report({'targetFeature': self.target_feature_name}) def _add_error_class_to_test_df(self) -> None: """ adds the error class to each observation in the test set (test_df) based on the error classes provided by the user. :return: None """ def add_error_class(error: float) -> str: for error_class, min_max in self.error_classes.items(): minimum, maximum = min_max if minimum <= error < maximum: return error_class return 'UNDEFINED_ERROR_CLASS' self.test_df[self._error_class_col_name] = self.test_df[self.error_column_name].apply(add_error_class) def _add_datasets(self) -> None: """ Adds datasets to reports (info, stats, numerical data). :return: None """ datasets_dict = {} def add_dataset(df: pd.DataFrame, dataset_name: str) -> None: """ Adds a dataset stats and data to the datasets_dict. :param df: pd.DataFrame, the selected dataset dataframe :param dataset_name: str, the dataset name :return: None """ stats = {} data = {} if self.numerical_features is not None and len(self.numerical_features) > 0: for feature in self.numerical_features: stats[feature] = { 'min': df.loc[:, feature].min(), 'mean': df.loc[:, feature].mean(), 'std': df.loc[:, feature].std(), 'median': df.loc[:, feature].median(), 'max': df.loc[:, feature].max(), 'count': int(df.loc[:, feature].count()), 'missingCount': int(df.loc[:, feature].isna().sum()), } data[feature] = df.loc[:, feature].values.tolist() if self.categorical_features is not None and len(self.categorical_features) > 0: for feature in self.categorical_features: stats[feature] = { 'uniqueCount': int(df.loc[:, feature].nunique()), 'missingCount': int(df.loc[:, feature].isna().sum()) } dataset_dict = {dataset_name: { 'info': { 'name': dataset_name, 'numberOfRows': df.shape[0], 'minError': df.loc[:, self.error_column_name].min(), 'meanError': df.loc[:, self.error_column_name].mean(), 'stdError': df.loc[:, self.error_column_name].std(), 'medianError': df.loc[:, self.error_column_name].median(), 'maxError': df.loc[:, self.error_column_name].max(), 'errors': df.loc[:, self.error_column_name].tolist(), 'stats': stats }, 'data': data }} datasets_dict.update(dataset_dict) add_dataset(self.train_df, self._training_data_name) add_dataset(self.test_df, self._testing_data_name) for error_class_name in self.error_classes.keys(): selected_df = self.test_df.loc[self.test_df[self._error_class_col_name] == error_class_name, :] add_dataset(selected_df, error_class_name) self._update_report({'datasets': datasets_dict}) def _count_categories_and_merge_count_dataframes(self, feature_name: str, primary_dataset: str, secondary_dataset: str, normalize=False) -> pd.DataFrame: """ It counts the different categories (of the provided feature) for the primary and secondary dataset then merge the count dataframes into a single dataframe that contains all the categories. It also fills missing values with 0. :param feature_name: the feature name :param primary_dataset: the primary dataset name :param secondary_dataset: the secondary dataset name :param normalize: whether to normalizr the categorical count, default:False :return: the merged dataframe """ if primary_dataset == self._training_data_name: primary_count_df = self.train_df.loc[:, feature_name].value_counts(normalize=normalize) else: primary_count_df = self.test_df.loc[ self.test_df[self._error_class_col_name] == primary_dataset, feature_name].value_counts( normalize=normalize) if secondary_dataset == self._testing_data_name: secondary_count_df = self.test_df.loc[:, feature_name].value_counts(normalize=normalize) else: secondary_count_df = self.test_df.loc[ self.test_df[self._error_class_col_name] == secondary_dataset, feature_name].value_counts( normalize=normalize) primary_count_df = primary_count_df.reset_index() \ .rename({feature_name: primary_dataset, 'index': feature_name}, axis=1) secondary_count_df = secondary_count_df.reset_index() \ .rename({feature_name: secondary_dataset, 'index': feature_name}, axis=1) merged_cat_count = primary_count_df.merge(secondary_count_df, on=feature_name, how='outer').fillna( 0).sort_values(by=primary_dataset, ascending=False) return merged_cat_count def _add_categorical_count_plot(self) -> None: """ Add the categorical count plots (stacked bar plot) data to the report :return: None """ def add_categorical_count_data(feature_dictionary: Dict, feature_name: str, primary_dataset: str, secondary_dataset: str) -> None: """ Calculate the value counts for each dataset and for that particular categorical feature. Then groups the value_counts() dataframes afterwards it computes the data needed for the stacked bar plot in plotly. :param feature_dictionary: the feature dictionary that will be added the categorical count plot data :param feature_name: the feature name :param primary_dataset: the primary dataset name :param secondary_dataset: the secondary dataset name :return: None """ merged_cat_count = self._count_categories_and_merge_count_dataframes(feature_name, primary_dataset, secondary_dataset, normalize=False) key = f'{primary_dataset}_{secondary_dataset}' title = f'{primary_dataset} vs {secondary_dataset}' categories = merged_cat_count.loc[:, feature_name].tolist() primary_data = merged_cat_count.loc[:, primary_dataset].tolist() secondary_data = merged_cat_count.loc[:, secondary_dataset].tolist() feature_dictionary.update({key: { 'title': title, 'categories': categories, 'series': [ { 'name': primary_dataset, 'color': '#8180FF', 'data': primary_data }, { 'name': secondary_dataset, 'color': '#FF938D', 'data': secondary_data } ] }}) categorical_count_dict = {} for feature in self.categorical_features: feature_dict = {} for primary_dataset_name, secondary_dataset_name in product(self._primary_datasets, self._secondary_datasets): if primary_dataset_name != secondary_dataset_name: add_categorical_count_data(feature_dict, feature, primary_dataset_name, secondary_dataset_name) categorical_count_dict.update({feature: feature_dict}) self._update_report({'categorical_count_plots': categorical_count_dict}) def _get_primary_secondary_datasets(self, primary_dataset: str, secondary_dataset: str) -> Tuple[ pd.DataFrame, pd.DataFrame]: """ Finds the correct primary and secondary datasets and return them. :param primary_dataset: the name of the primary dataset :param secondary_dataset: the name of the secondary dataset :return: primary_df, secondary_df """ if primary_dataset == self._training_data_name: primary_df = self.train_df.copy() primary_df.loc[:, self._error_class_col_name] = self._training_data_name else: primary_df = self.test_df.loc[self.test_df[self._error_class_col_name] == primary_dataset, :].copy() if secondary_dataset == self._testing_data_name: secondary_df = self.test_df.copy() secondary_df.loc[:, self._error_class_col_name] = self._testing_data_name else: secondary_df = self.test_df.loc[self.test_df[self._error_class_col_name] == secondary_dataset, :].copy() return primary_df, secondary_df def _add_parallel_coordinates_plot(self, cosine_similarity_threshold, parallel_coordinates_q1_threshold, parallel_coordinates_q2_threshold, parallel_coordinates_features) -> None: """ Check for suitable features (numerical based on quantiles(default: 0.25, 0.75) and categorical based on cosine similarity). Afterwards it adds the needed data for the plotly parallel coordinates plot. :param cosine_similarity_threshold: the cosine similarity threshold for the categorical features :param parallel_coordinates_q1_threshold: the first quantile threshold to be used if parallel_coordinates_features == 'auto'. default: 0.25 :param parallel_coordinates_q2_threshold: the second quantile threshold to be used if parallel_coordinates_features == 'auto'. default: 0.75 :param parallel_coordinates_features: The list of features to display on the parallel coordinates plot. default: 'auto' - If parallel_coordinates_features is set to 'auto', OlliePy will select the features with a distribution shift based on 3 thresholds: - cosine_similarity_threshold to be used to select categorical features if the cosine_similarity is lower than the threshold. - parallel_coordinates_q1_threshold and parallel_coordinates_q2_threshold which are two quantile values. if primary_quantile_1 >= secondary_quantile_2 or secondary_quantile_1 >= primary_quantile_2 then the numerical feature is selected and will be added to the plot. :return: """ def add_parallel_coordinates(parallel_coordinates_dictionary: Dict, primary_dataset: str, secondary_dataset: str) -> None: """ Decides which features will be added to the parallel coordinates plot based on predefined thresholds. Then prepares the data that is expected by the plotly parallel coordinates plot. :param parallel_coordinates_dictionary: the parallel coordinates data dictionary :param primary_dataset: the name of the primary dataset :param secondary_dataset: the name of the secondary dataset :return: None """ selected_features = [] if parallel_coordinates_features == 'auto' else parallel_coordinates_features first_quantile_threshold = parallel_coordinates_q1_threshold second_quantile_threshold = parallel_coordinates_q2_threshold primary_df, secondary_df = self._get_primary_secondary_datasets(primary_dataset, secondary_dataset) if self.categorical_features is not None and parallel_coordinates_features == 'auto': for categorical_feature in self.categorical_features: merged_cat_count = self._count_categories_and_merge_count_dataframes(categorical_feature, primary_dataset, secondary_dataset, normalize=True) primary_vector = merged_cat_count.loc[:, primary_dataset].tolist() secondary_vector = merged_cat_count.loc[:, secondary_dataset].tolist() cosine_similarity = _cosine_similarity(primary_vector, secondary_vector) if cosine_similarity < cosine_similarity_threshold: selected_features.append(categorical_feature) if self.numerical_features is not None and parallel_coordinates_features == 'auto': for numerical_feature in self.numerical_features: primary_q_1 = primary_df.loc[:, numerical_feature].quantile(first_quantile_threshold) primary_q_2 = primary_df.loc[:, numerical_feature].quantile(second_quantile_threshold) secondary_q_1 = secondary_df.loc[:, numerical_feature].quantile(first_quantile_threshold) secondary_q_2 = secondary_df.loc[:, numerical_feature].quantile(second_quantile_threshold) if primary_q_1 >= secondary_q_2 or secondary_q_1 >= primary_q_2: selected_features.append(numerical_feature) if len(selected_features) > 0: key = f'{primary_dataset}_{secondary_dataset}' combined_df = pd.concat([primary_df, secondary_df], axis=0).copy() colors = combined_df.loc[:, self._error_class_col_name].apply( lambda error_class: 0 if error_class == primary_dataset else 1).tolist() dimensions = [] for feature in selected_features: if self.numerical_features is not None and feature in self.numerical_features: feature_min = combined_df.loc[:, feature].min() feature_max = combined_df.loc[:, feature].max() dimensions.append({ 'range': [feature_min, feature_max], 'label': feature, 'values': combined_df.loc[:, feature].tolist() }) elif self.categorical_features is not None and feature in self.categorical_features: label_encoder = LabelEncoder() values = label_encoder.fit_transform(combined_df.loc[:, feature]) values_range = [int(values.min()), int(values.max())] tick_values = label_encoder.transform(label_encoder.classes_).tolist() tick_text = label_encoder.classes_.tolist() dimensions.append({ 'range': values_range, 'label': feature, 'values': values.tolist(), 'tickvals': tick_values, 'ticktext': tick_text }) if len(dimensions) > 1: parallel_coordinates_dictionary.update({key: { 'primaryDatasetName': primary_dataset, 'secondaryDatasetName': secondary_dataset, 'colors': colors, 'dimensions': dimensions }}) parallel_coordinates_dict = {} for primary_dataset_name, secondary_dataset_name in product(self._primary_datasets, self._secondary_datasets): if primary_dataset_name != secondary_dataset_name: add_parallel_coordinates(parallel_coordinates_dict, primary_dataset_name, secondary_dataset_name) if len(parallel_coordinates_dict) > 0: self._update_report({'parallel_coordinates': parallel_coordinates_dict}) def _add_statistical_tests(self, cosine_similarity_threshold) -> None: """ Calculates and adds statistical tests to the report data. :param cosine_similarity_threshold: the cosine similarity threshold for the categorical features :return: None """ def add_statistical_test(statistical_tests_dictionary: Dict, primary_dataset: str, secondary_dataset: str) -> None: """ Calculates statistical tests (ks_2samp) and metrics (wasserstein distance, cosine similarity) then adds the results to the dictionary. :param statistical_tests_dictionary: the statistical tests data dictionary :param primary_dataset: the name of the primary data set :param secondary_dataset: the name of the secondary data set :return: None """ primary_df, secondary_df = self._get_primary_secondary_datasets(primary_dataset, secondary_dataset) key = f'{primary_dataset}_{secondary_dataset}' tests_dictionary = {key: {}} p_value_threshold = 0.01 if self.numerical_features is not None: for numerical_feature in self.numerical_features: primary_values = primary_df.loc[:, numerical_feature].values secondary_values = secondary_df.loc[:, numerical_feature].values p_value = ks_2samp(primary_values, secondary_values)[1] wasser_distance = wasserstein_distance(secondary_values, primary_values) tests_dictionary[key].update({ numerical_feature: { 'ks_2samp': { 'p_value': p_value, 'p_value_threshold': p_value_threshold }, 'wasserstein_distance': wasser_distance } }) if self.categorical_features is not None: for categorical_feature in self.categorical_features: if primary_dataset != secondary_dataset: merged_cat_count = self._count_categories_and_merge_count_dataframes(categorical_feature, primary_dataset, secondary_dataset, normalize=True) primary_vector = merged_cat_count.loc[:, primary_dataset].tolist() secondary_vector = merged_cat_count.loc[:, secondary_dataset].tolist() cosine_similarity = _cosine_similarity(primary_vector, secondary_vector) else: cosine_similarity = 1.0 tests_dictionary[key].update({ categorical_feature: { 'cosine_similarity': { 'cosine_similarity': cosine_similarity, 'cosine_similarity_threshold': cosine_similarity_threshold } } }) statistical_tests_dictionary.update(tests_dictionary) statistical_tests_dict = {} for primary_dataset_name, secondary_dataset_name in product(self._primary_datasets, self._secondary_datasets): add_statistical_test(statistical_tests_dict, primary_dataset_name, secondary_dataset_name) self._update_report({'statistical_tests': statistical_tests_dict})
[docs] def serve_report_from_local_server(self, mode: str = 'server', port: int = None) -> None: """ Serve the report to the user using a web server. Available modes: - 'server': will open a new tab in the default browser using webbrowser package - 'js': will open a new tab in the default browser using IPython - 'jupyter': will open the report in a jupyter notebook :param mode: the selected web server mode. default: 'server' :param port: the server port. default: None. a random port will be generated between (1024-49151) :return: None """ if not port: import random port = random.randint(1024, 49151) super()._serve_report_using_flask(self._template_name, mode, port)
[docs] def save_report(self, zip_report: bool = False) -> None: """ Creates the report directory, copies the web application based on the template name, saves the report data. :param zip_report: enable it in order to zip the directory for downloading. default: False :return: None """ super()._save_the_report(self._template_name, zip_report)
def _find_and_add_all_secondary_datasets_patterns(self, patterns_report_group_by_categorical_features, patterns_report_group_by_numerical_features, patterns_report_number_of_bins) -> None: """ Find all groups in secondary datasets and check if they exist in the primary datasets. Outputs the groups, error and target distributions and the distance between the distributions. :param patterns_report_group_by_categorical_features: categorical features to use in the patterns report. default: 'all' :param patterns_report_group_by_numerical_features: numerical features to use in the patterns report. default: 'all' :param patterns_report_number_of_bins: number of bins to use for each provided numerical feature or one number of bins to use for all provided numerical features. default: 10 :return: None """ def query_datasets_for_count_error_target(primary_df, secondary_df, features_values): query_list = [] for feature, feature_value in features_values: query_list.append(f'{feature} == "{feature_value}"') query = ' and '.join(query_list) filtered_primary_dataset = primary_df.query(query) filtered_secondary_dataset = secondary_df.query(query) output = { 'primaryCount': filtered_primary_dataset.shape[0], 'secondaryCount': filtered_secondary_dataset.shape[0], 'secondaryErrorMean': filtered_secondary_dataset.loc[:, self.error_column_name].mean(), 'secondaryErrorStd': filtered_secondary_dataset.loc[:, self.error_column_name].std(), 'secondaryTargetMean': filtered_secondary_dataset.loc[:, self.target_feature_name].mean(), 'secondaryTargetStd': filtered_secondary_dataset.loc[:, self.target_feature_name].std(), 'primaryTargetValues': filtered_primary_dataset.loc[:, self.target_feature_name].tolist(), 'secondaryTargetValues': filtered_secondary_dataset.loc[:, self.target_feature_name].tolist(), 'primaryErrorValues': filtered_primary_dataset.loc[:, self.error_column_name].tolist(), 'secondaryErrorValues': filtered_secondary_dataset.loc[:, self.error_column_name].tolist(), 'primaryErrorMean': filtered_primary_dataset.loc[:, self.error_column_name].mean(), 'primaryErrorStd': filtered_primary_dataset.loc[:, self.error_column_name].std(), 'primaryTargetMean': filtered_primary_dataset.loc[:, self.target_feature_name].mean(), 'primaryTargetStd': filtered_primary_dataset.loc[:, self.target_feature_name].std() } for dataset in ['primary', 'secondary']: if dataset == 'primary': df = filtered_primary_dataset else: df = filtered_secondary_dataset if output[f'{dataset}Count'] == 1: output.update({ f'{dataset}ErrorMean': df.loc[:, self.error_column_name].values[0], f'{dataset}ErrorStd': None, f'{dataset}TargetMean': df.loc[:, self.target_feature_name].values[0], f'{dataset}TargetStd': None, }) elif output[f'{dataset}Count'] == 0: output.update({ f'{dataset}ErrorMean': None, f'{dataset}ErrorStd': None, f'{dataset}TargetMean': None, f'{dataset}TargetStd': None, }) if output['primaryCount'] > 0 and output['secondaryCount'] > 0: output['errorWassersteinDistance'] = wasserstein_distance( filtered_secondary_dataset.loc[:, self.error_column_name], filtered_primary_dataset.loc[:, self.error_column_name]) output['targetWassersteinDistance'] = wasserstein_distance( filtered_secondary_dataset.loc[:, self.target_feature_name], filtered_primary_dataset.loc[:, self.target_feature_name]) else: output['errorWassersteinDistance'] = None output['targetWassersteinDistance'] = None return output def add_patterns(grouped_patterns_dictionary: Dict, primary_dataset: str, secondary_dataset: str) -> None: """ Group by all features in secondary_dataset and try to find these patterns in primary dataset. :param grouped_patterns_dictionary: the patterns data dictionary :param primary_dataset: the name of the primary data set :param secondary_dataset: the name of the secondary data set :return: None """ primary_df, secondary_df = self._get_primary_secondary_datasets(primary_dataset, secondary_dataset) key = f'{primary_dataset}_{secondary_dataset}' patterns_dictionary = {} if patterns_report_group_by_categorical_features == 'all': group_by_features = self.categorical_features[:] else: group_by_features = patterns_report_group_by_categorical_features[:] if patterns_report_group_by_numerical_features == 'all': numerical_features = list(filter(lambda f_name: f_name != self.target_feature_name, self.numerical_features)) else: numerical_features = patterns_report_group_by_numerical_features[:] for numerical_feature_index, numerical_feature in enumerate(numerical_features): binning_features_name = f'{numerical_feature}_BIN' if type(patterns_report_number_of_bins) is int: number_of_bins = patterns_report_number_of_bins else: number_of_bins = patterns_report_number_of_bins[numerical_feature_index] secondary_df.loc[:, binning_features_name], bins = pd.cut(secondary_df.loc[:, numerical_feature], retbins=True, include_lowest=True, bins=number_of_bins) primary_df.loc[:, binning_features_name] = pd.cut(primary_df.loc[:, numerical_feature], bins=bins) primary_df = primary_df.dropna() primary_df.loc[:, binning_features_name] = primary_df.loc[:, binning_features_name].astype(str) secondary_df.loc[:, binning_features_name] = secondary_df.loc[:, binning_features_name].astype(str) group_by_features.append(binning_features_name) primary_df = primary_df.drop(numerical_features, axis=1) secondary_df = secondary_df.drop(numerical_features, axis=1) secondary_groupby_all_df = secondary_df.groupby(by=group_by_features).mean() secondary_all_groups = secondary_groupby_all_df.index.tolist() patterns_list = [] groupby_features_length = len(group_by_features) for index, group in enumerate(secondary_all_groups): group_dict = {'name': f'Group {index}', 'features': {}} features_values = [] for feature_index, feature in enumerate(group_by_features): if groupby_features_length > 1: group_dict['features'][feature] = group[feature_index] features_values.append((feature, group[feature_index])) else: group_dict['features'][feature] = group features_values.append((feature, group)) count_error_target_dict = query_datasets_for_count_error_target(primary_df, secondary_df, features_values) group_dict.update(count_error_target_dict) patterns_list.append(group_dict) patterns_dictionary[key] = patterns_list grouped_patterns_dictionary.update(patterns_dictionary) grouped_patterns_dict = {} for primary_dataset_name, secondary_dataset_name in product(self._primary_datasets, self._secondary_datasets): add_patterns(grouped_patterns_dict, primary_dataset_name, secondary_dataset_name) self._update_report({'grouped_patterns': grouped_patterns_dict})