import logging
from typing import Any, Dict, Optional, Union
import matplotlib
import numpy as np
import pandas as pd
import ppscore as pps
import scipy
from matplotlib import dates, ticker
from statsmodels.stats.proportion import proportion_confint
from datavizml import utils
class SingleDistribution:
    """A graphical summary of a given feature and its relationship to a target

    :param feature: Feature to be analysed
    :type feature: pandas Series
    :param ax: Axes to plot on
    :type ax: matplotlib Axes
    :param feature_deskew: reduce feature skew, trialling: squaring, rooting, logging, exponents and Yeo-Johnson
    :type feature_deskew: bool, optional
    :param target: Target to be predicted
    :type target: pandas Series, optional
    :param target_score: Precomputed score to avoid recalculation
    :type target_score: float, optional
    :param target_rebalance: reduce class imbalance in target score
    :type target_rebalance: bool, optional
    :param binning_threshold: Maximum number of distinct values in the column before binning, defaults to 12
    :type binning_threshold: int, optional
    :param metric: Metric used for prevalence, "count" or "prop" (default)
    :type metric: string, optional
    :raises TypeError: if ``target_score`` is given but is not a real number
    :raises ValueError: if the feature and target have different lengths
    """

    BINNING_THRESHOLD_DEFAULT = 12  # distinct values for binning
    CI_SIGNIFICANCE_DEFAULT = 0.05  # confidence interval significance
    COLOUR_FEATURE_DEFAULT = "grey"  # colour used for feature
    COLOURMAP_TARGET_DEFAULT = "tab10"  # colour map used for target

    # x tick label templates keyed by the transform name stored by the feature
    # setter; raw strings keep the mathtext backslashes literal
    _TRANSFORM_TICK_FORMATS = {
        "square": r"$\sqrt{{{x:.0f}}}$",
        "square-root": r"${{{x:.0f}}}^2$",
        "log-2": r"$2^{{{x:.0f}}}$",
        "exp-2": r"$\log_2{{{x:.0f}}}$",
    }

    def __init__(
        self,
        feature: Any,
        ax: Any,
        feature_deskew: bool = False,
        target: Optional[Any] = None,
        target_score: Optional[float] = None,
        target_rebalance: bool = False,
        binning_threshold: Optional[int] = None,
        metric: str = "prop",
    ) -> None:
        """Constructor method"""
        # input variables
        self.ax_feature = ax
        # must be stored before the feature setter runs, as the setter reads it
        self.__feature_deskew = feature_deskew
        self.feature = feature
        self.__has_target = target is not None
        if self.__has_target:
            self.target = target
            self.__target_rebalance = target_rebalance
            if self.feature.name == self.target.name:
                # clear target if the same as feature
                del self.__target
                self.__has_target = False
        if isinstance(target_score, (int, float, np.integer, np.floating)):
            self.__target_score = target_score
            self.__target_score_type = "PPS"
        elif target_score is not None:
            raise TypeError(
                f"target_score is of {target_score.__class__.__name__} type which is not valid"
            )
        self.__binning_threshold = (
            binning_threshold
            if binning_threshold
            else SingleDistribution.BINNING_THRESHOLD_DEFAULT
        )
        self.__metric = metric

        # check input
        if self.__has_target:
            if self.feature.shape[0] != self.target.shape[0]:
                raise ValueError(
                    f"Dimension mismatch, feature has {self.feature.shape[0]} elements but the target has {self.target.shape[0]}"
                )

        # classify inputs
        (
            self.__feature_is_bool,
            self.__feature_is_numeric,
            self.__feature_is_datetime,
            self.__feature_dtype,
        ) = utils.classify_type(self.feature)
        if self.__has_target:
            (
                self.__target_is_bool,
                self.__target_is_numeric,
                _,
                self.__target_dtype,
            ) = utils.classify_type(self.target)
            # a numeric, non-boolean target is treated as regression
            if self.__target_is_numeric and not self.__target_is_bool:
                self.__target_type = "regression"
            else:
                self.__target_type = "classification"

        # supplementary/reusable variables
        self.__feature_nunique = self.feature.nunique(dropna=False)
        missing_proportion = self.feature.isna().value_counts(normalize=True)
        self.__missing_proportion = (
            missing_proportion[True] if True in missing_proportion.index else 0
        )
        if self.__has_target:
            # the target is drawn on a second y axis sharing the feature x axis
            self.ax_target = self.ax_feature.twinx()

    def __str__(self) -> str:
        """Returns a string representation of the instance

        :return: A string containing the feature and target name and their data types
        :rtype: str
        """
        # conditional strings
        target_val = (
            f"{self.target.name} ({self.__target_dtype} - {self.__target_type})"
            if self.__has_target
            else "no target provided"
        )
        # attribute related strings
        feature_str = f"feature: {self.feature.name} ({self.__feature_dtype})"
        target_str = f"target: {target_val}"
        return ", ".join([feature_str, target_str])

    def __call__(
        self,
        ci_significance: float = CI_SIGNIFICANCE_DEFAULT,
        colour_feature: str = COLOUR_FEATURE_DEFAULT,
        colourmap_target: str = COLOURMAP_TARGET_DEFAULT,
    ) -> None:
        """Generates and decorates the plot

        :param ci_significance: Significance level for the target confidence interval calculation, defaults to 0.05
        :type ci_significance: float, optional
        :param colour_feature: Colour used for the feature plot, defaults to "grey"
        :type colour_feature: str, optional
        :param colourmap_target: Colour map used for the target plot, defaults to "tab10"
        :type colourmap_target: str, optional
        """
        # load colourmap
        self.__cmap = matplotlib.colormaps[colourmap_target]

        # calculate scores and summary only if not already available
        self._ensure_scores()
        if not hasattr(self, "_SingleDistribution__feature_summary"):
            self.summarise_feature()

        # plot feature frequency
        markerline, stemlines, baseline = self.ax_feature.stem(
            self.__feature_summary.index, self.__feature_summary[self.__metric]
        )
        markerline.set_color(colour_feature)
        stemlines.set_color(colour_feature)
        baseline.set_color(colour_feature)

        # plot target values and uncertainty
        y_plot_all: Dict[Any, Any] = {}
        # fallback so the axis can still be decorated when nothing is plotted
        colour_target: Any = "k"
        if self.__has_target:
            ci_diff_all: Dict[Any, Any] = {}
            if self.__target_type == "regression":
                # regression specific calculations
                # NOTE(review): the band is z * per-group standard deviation,
                # not the standard error of the mean - confirm this is intended
                z_crit = scipy.stats.norm.ppf(1 - ci_significance / 2)
                ci_diff_all = {None: self.__feature_summary["std"] * z_crit}
                y_plot_all = {None: self.__feature_summary["mean"]}
            elif self.__target_type == "classification":
                # classification specific calculations, one series per class
                totals = self.__feature_summary["count"]
                class_counts = self.__feature_summary.drop(columns=["count", "prop"])
                for class_name, values in class_counts.items():
                    mean = values / totals
                    ci_lo, ci_hi = proportion_confint(values, totals, ci_significance)
                    # errorbar expects a (2, N) array of lower/upper offsets
                    ci_diff_all[class_name] = 100 * np.concatenate(
                        (
                            (mean - ci_lo).values.reshape(1, -1),
                            (ci_hi - mean).values.reshape(1, -1),
                        )
                    )
                    y_plot_all[class_name] = mean * 100
                # drop false class for boolean; it may be absent when the
                # target contains no False values
                if self.__target_is_bool:
                    ci_diff_all.pop(False, None)
                    y_plot_all.pop(False, None)

            # plot errorbars, taking successive colours from the colour map
            for position, (class_name, y_plot) in enumerate(y_plot_all.items()):
                colour_target = self.__cmap(position)
                self.ax_target.errorbar(
                    self.__feature_summary.index,
                    y_plot,
                    yerr=ci_diff_all[class_name],
                    color=colour_target,
                    elinewidth=2,
                    capsize=3,
                    capthick=2,
                    label=class_name,
                    ls="",
                    marker="D",
                    markersize=3,
                )

        # decorate x axis
        self.ax_feature.set_xlabel(self.feature.name)
        if self.__feature_is_numeric and not self.__feature_is_bool:
            self.ax_feature.xaxis.set_minor_locator(ticker.MaxNLocator(integer=True))
            self.ax_feature.xaxis.set_major_locator(ticker.MaxNLocator(5, integer=True))
            if not self.__feature_is_datetime:
                # decorate depending on transform
                if self.__feature_transform is None:
                    _, ax_max = self.ax_feature.get_xlim()
                    if ax_max > 1000:
                        # thousands separators for large values
                        self.ax_feature.xaxis.set_major_formatter(
                            ticker.StrMethodFormatter("{x:,.0f}")
                        )
                elif self.__feature_transform == "yeojohnson":
                    self.ax_feature.xaxis.set_ticklabels([])
                else:
                    tick_format = self._TRANSFORM_TICK_FORMATS.get(
                        self.__feature_transform
                    )
                    if tick_format is not None:
                        self.ax_feature.xaxis.set_major_formatter(
                            ticker.StrMethodFormatter(tick_format)
                        )
        elif self.__feature_is_datetime:
            self.ax_feature.xaxis.set_minor_locator(dates.AutoDateLocator())
            self.ax_feature.xaxis.set_major_locator(dates.AutoDateLocator(maxticks=5))
        else:
            if self.__feature_nunique > self.__binning_threshold:
                self.ax_feature.set_xticklabels([])
            else:
                self.ax_feature.tick_params(axis="x", labelrotation=90)

        # decorate first y axis
        if self.__metric == "count":
            self.ax_feature.set_ylabel("Frequency")
            self.ax_feature.yaxis.set_major_formatter(
                ticker.StrMethodFormatter("{x:,.0f}")
            )
        elif self.__metric == "prop":
            self.ax_feature.set_ylabel("Frequency density")
            self.ax_feature.yaxis.set_major_formatter(
                ticker.StrMethodFormatter("{x:,.2f}")
            )

        # decorate second y axis
        if self.__has_target:
            # black when several classes share the axis, else the series colour
            twin_y_colour = (
                "k"
                if len(y_plot_all) > 1 and not self.__target_is_bool
                else colour_target
            )
            self.ax_target.set_ylabel(self.target.name, color=twin_y_colour)
            self.ax_target.tick_params(axis="y", labelcolor=twin_y_colour)
            if self.__target_type == "classification":
                self.ax_target.yaxis.set_major_formatter(ticker.PercentFormatter())
                if not self.__target_is_bool:
                    self.ax_target.legend()

        # add title
        if self.__has_target:
            score_type, score = self.__target_score_type, self.__target_score
        else:
            score_type, score = self.__feature_score_type, self.__feature_score
        self.ax_feature.set_title(
            f"{score_type} = {score:.2f}\n({100*self.__missing_proportion:.1f}% missing)"
        )

    def _ensure_scores(self) -> None:
        """Calculate the target and feature scores if they are not yet stored"""
        if not hasattr(self, "_SingleDistribution__target_score"):
            self.calculate_target_score()
        if not hasattr(self, "_SingleDistribution__feature_score"):
            self.calculate_feature_score()

    def calculate_feature_score(self) -> None:
        """Calculate the score for the feature based on its skewness"""
        self.__feature_score: float
        self.__feature_score_type: Union[None, str]
        if (
            self.__feature_is_numeric or self.__feature_is_datetime
        ) and not self.__feature_is_bool:
            # calculate skew of median towards deciles
            # datetimes are first expressed as seconds since the earliest value
            feature = (
                self.feature
                if not self.__feature_is_datetime
                else (self.feature - self.feature.min()).dt.total_seconds()
            )
            self.__feature_score, self.__feature_score_type = utils.inter_decile_skew(
                feature
            )
        else:
            # calculate skew towards the mode
            self.__feature_score = self.feature.value_counts(normalize=True).max()
            self.__feature_score_type = "Categorical skew"

    def calculate_target_score(self) -> None:
        """Calculate the score for the feature based on its predictive power"""
        if self.__has_target:
            # rebalance classes
            if self.__target_type == "classification" and self.__target_rebalance:
                x_balanced, y_balanced = utils.class_rebalance(
                    self.feature, self.target
                )
                df = pd.concat([x_balanced, y_balanced], axis=1)
            else:
                df = pd.concat([self.feature, self.target], axis=1)
            # calculate score
            self.__target_score = pps.score(
                df=df,
                x=self.feature.name,
                y=self.target.name,
                sample=None,
                invalid_score=np.nan,
            )["ppscore"]
            self.__target_score_type = "PPS"
        else:
            self.__target_score = np.nan
            self.__target_score_type = "N/A"

    def summarise_feature(self) -> None:
        """Summarise the feature by calculating summary statistics for each distinct value and binning if there are too many distinct values"""
        # join feature and target into single dataframe
        if self.__has_target:
            all_data = pd.concat([self.feature, self.target], axis=1)
        else:
            all_data = self.feature.to_frame()

        # bin feature if there are too many distinct values
        if self.__feature_nunique > self.__binning_threshold and (
            self.__feature_is_numeric or self.__feature_is_datetime
        ):
            # datetimes are binned as seconds since the earliest value
            feature = (
                self.feature
                if not self.__feature_is_datetime
                else (self.feature - self.feature.min()).dt.total_seconds()
            )
            bin_boundaries = np.linspace(
                feature.min(), feature.max(), self.__binning_threshold + 1
            )
            # include_lowest keeps the minimum value, which sits exactly on the
            # first (otherwise open) bin edge, from being dropped as NaN
            all_data[self.feature.name] = pd.cut(
                feature, bin_boundaries, include_lowest=True
            ).apply(lambda x: x.mid)
            if self.__feature_is_datetime:
                # convert bin midpoints back from seconds to datetimes
                all_data[self.feature.name] = (
                    pd.to_timedelta(all_data[self.feature.name], unit="s")
                    + self.feature.min()
                )

        # calculate summary statistics for each distinct feature value
        if self.__has_target:
            if self.__target_type == "regression":
                self.__feature_summary: pd.DataFrame = all_data.groupby(
                    self.feature.name
                ).agg(["count", "mean", "std"])
                # drop the target-name level, leaving the statistic names
                self.__feature_summary.columns = (
                    self.__feature_summary.columns.droplevel()
                )
            elif self.__target_type == "classification":
                # one column of counts per target class
                self.__feature_summary = pd.pivot_table(
                    all_data.value_counts().to_frame("count"),
                    values="count",
                    index=self.feature.name,
                    columns=self.target.name,
                    fill_value=0,
                )
                self.__feature_summary["count"] = self.__feature_summary.sum(axis=1)
        else:
            self.__feature_summary = all_data.value_counts().to_frame("count")
            # DataFrame.value_counts yields one-element tuples as index entries
            self.__feature_summary.index = self.__feature_summary.index.map(
                lambda x: x[0]
            )
        self.__feature_summary["prop"] = (
            self.__feature_summary["count"] / self.__feature_summary["count"].sum()
        )

        # convert index to string from boolean for printing purposes
        if self.__feature_is_bool:
            self.__feature_summary.index = self.__feature_summary.index.map(
                {True: "True", False: "False"}
            )

    def to_dict(self) -> dict:
        """Summarise as a dictionary

        Scores that have not been calculated yet are calculated first.

        :return: Names, data types, scores and missing proportion of the feature and target
        :rtype: dict
        """
        self._ensure_scores()
        summary = {
            "feature_name": self.feature.name,
            "feature_dtype": self.__feature_dtype,
            "feature_score": self.__feature_score,
            "feature_score_type": self.__feature_score_type,
            "feature_transform": self.__feature_transform,
            "feature_nunique": self.__feature_nunique,
            "feature_missing_proportion": self.__missing_proportion,
            "target_name": self.target.name if self.__has_target else None,
            "target_dtype": self.__target_dtype if self.__has_target else None,
            "target_score": self.__target_score,
            "target_score_type": self.__target_score_type,
        }
        return summary

    # feature getter
    @property
    def feature(self) -> pd.Series:
        """The feature data"""
        return self.__feature

    # feature setter
    @feature.setter
    def feature(self, feature: Any) -> None:
        # hasattr is False until the getter can find the stored series
        if hasattr(self, "feature"):
            # do not allow changing of data
            raise AttributeError("This attribute has already been set")
        else:
            # convert to series and set
            data = utils.to_series(feature)
            is_bool, is_numeric, _, _ = utils.classify_type(data)
            # reduce feature skew
            if self.__feature_deskew and (is_numeric and not is_bool):
                self.__feature_transform, self.__feature = utils.reduce_skew(data)
            else:
                self.__feature_transform, self.__feature = None, data

    # target getter
    @property
    def target(self) -> pd.Series:
        """The target data"""
        self.__target: pd.Series
        return self.__target

    # target setter
    @target.setter
    def target(self, target: Any) -> None:
        if hasattr(self, "target") or not self.__has_target:
            # do not allow changing of data
            raise AttributeError("This attribute has already been set")
        else:
            # convert to series and set
            self.__target = utils.to_series(target)