Source code for mlvern.data.risk_check

import json
from pathlib import Path
from typing import Any, Dict, List, Optional, cast

import numpy as np
import pandas as pd
from scipy import stats

try:
    from sklearn.metrics import mutual_info_score
except Exception:
    mutual_info_score = None


def class_imbalance(df: pd.DataFrame, target: str) -> Dict[str, Any]:
    """Summarise how unevenly the classes of ``target`` are represented.

    Args:
        df: Frame holding the target column.
        target: Name of the label column.

    Returns:
        ``{"counts": ..., "imbalance_ratio": ...}`` where ``counts`` maps each
        label to its row count (as produced by ``value_counts``) and
        ``imbalance_ratio`` is the largest count divided by the smallest one.
        When ``target`` is not a column, ``{"error": ...}`` is returned instead.
    """
    if target not in df.columns:
        return {"error": f"Target '{target}' not found"}

    counts = df[target].value_counts().to_dict()
    if counts:
        largest = max(counts.values())
        smallest = min(counts.values())
    else:
        largest = smallest = 0

    # The divisor is floored at 1 so an empty column or a zero count cannot
    # trigger a division by zero.
    ratio = float(largest / max(smallest, 1))
    return {"counts": counts, "imbalance_ratio": ratio}
def sensitive_attribute_imbalance(
    df: pd.DataFrame, sensitive_cols: List[str]
) -> Dict[str, Any]:
    """Report the relative frequency of each value of the sensitive columns.

    Args:
        df: Frame to inspect.
        sensitive_cols: Names of the columns to summarise.

    Returns:
        A dict keyed by column name. Each entry maps the column's values to
        their share of rows (``value_counts(normalize=True)``), or is
        ``{"error": "not_present"}`` when the column is not in ``df``.
    """

    def _shares(name: str) -> Dict[Any, Any]:
        # Missing columns are reported rather than raised so that one bad
        # name does not hide the results for the others.
        if name not in df.columns:
            return {"error": "not_present"}
        return df[name].value_counts(normalize=True).to_dict()

    return {name: _shares(name) for name in sensitive_cols}
def sampling_bias(
    baseline: pd.DataFrame,
    current: pd.DataFrame,
    cols: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Compare per-column distributions of two samples with a chi-squared test.

    Numeric columns are binned on the quartiles of the pooled (baseline +
    current) non-missing values; other columns are used as-is with missing
    values counted as their own ``"__NA__"`` category. For each column a
    contingency table of *category x sample* counts is built and passed to
    ``scipy.stats.chi2_contingency`` (a test of homogeneity), so the two
    frames may have different lengths and unrelated indexes.

    Args:
        baseline: Reference sample.
        current: Sample to compare against the reference.
        cols: Columns to test. Defaults to the columns both frames share.

    Returns:
        A dict keyed by column name holding ``{"chi2": ..., "pvalue": ...}``,
        or ``{"error": "chi2_failed"}`` when the test could not be computed
        for that column (missing column, no observations, incompatible
        dtypes, ...).
    """
    if cols is None:
        cols = baseline.columns.intersection(current.columns).tolist()

    quartiles = [0, 0.25, 0.5, 0.75, 1.0]
    report: Dict[str, Any] = {}
    for col in cols:
        # Best-effort per column: one untestable column must not abort the
        # whole report.
        try:
            base, curr = baseline[col], current[col]
            numeric = pd.api.types.is_numeric_dtype(base)
            if numeric:
                base = base.dropna().astype(float)
                curr = curr.dropna().astype(float)
            else:
                # Cast to object first so categorical columns accept the
                # placeholder label.
                base = base.astype(object).fillna("__NA__")
                curr = curr.astype(object).fillna("__NA__")
            if base.empty or curr.empty:
                raise ValueError("no observations to compare")

            if numeric:
                pooled = np.concatenate([base.to_numpy(), curr.to_numpy()])
                # Skewed or constant data yields repeated quantiles; pd.cut
                # rejects duplicate edges, so collapse them.
                edges = np.unique(np.quantile(pooled, quartiles))
                if edges.size > 1:
                    base = pd.cut(base, bins=edges, include_lowest=True, labels=False)
                    curr = pd.cut(curr, bins=edges, include_lowest=True, labels=False)
                # With a single distinct edge the column is constant and the
                # raw values already form the only category.

            # Rows are categories, columns are the two samples. Categories
            # absent from one sample get a zero count.
            tbl = pd.concat(
                [base.value_counts(), curr.value_counts()],
                axis=1,
                keys=["baseline", "current"],
            ).fillna(0)
            chi2, p, _, _ = stats.chi2_contingency(tbl)
            report[col] = {"chi2": float(chi2), "pvalue": float(p)}
        except Exception:
            report[col] = {"error": "chi2_failed"}
    return report
def target_leakage_detection(
    df: pd.DataFrame, target: str, threshold: float = 0.99
) -> Dict[str, Any]:
    """Flag numeric features that are almost perfectly correlated with the target.

    Args:
        df: Frame holding the features and the target.
        target: Name of the target column. Must be numeric or boolean.
        threshold: Minimum absolute Pearson correlation for a feature to be
            reported as a potential leak.

    Returns:
        A dict keyed by the name of each flagged feature, holding
        ``{"correlation": ..., "mutual_info": ...}``. ``mutual_info`` is the
        mutual information between the feature and the target, each cut into
        10 equal-width bins, or ``None`` when scikit-learn is unavailable or
        the computation fails. When the target is missing or not numeric,
        ``{"error": ...}`` is returned instead.
    """
    if target not in df.columns:
        return {"error": f"Target '{target}' not found"}

    target_values = df[target]
    if not pd.api.types.is_numeric_dtype(target_values):
        # Pearson correlation is undefined for labels such as strings; report
        # it instead of failing with a KeyError further down.
        return {"error": f"Target '{target}' is not numeric"}
    if pd.api.types.is_bool_dtype(target_values):
        # Boolean targets are excluded by select_dtypes("number"), so the
        # target is taken from df directly and cast for the correlation.
        target_values = target_values.astype(float)

    features = df.select_dtypes(include="number")
    out: Dict[str, Any] = {}
    for col in features.columns:
        if col == target:
            continue

        corr = features[col].corr(target_values)
        if pd.isna(corr):
            # Constant or empty columns have no defined correlation.
            corr = 0.0
        if abs(corr) < threshold:
            continue

        # Mutual information is only reported for flagged features, so it is
        # only computed for them.
        mi = None
        if mutual_info_score is not None:
            try:
                mi = float(
                    mutual_info_score(
                        pd.cut(features[col], bins=10).astype(str),
                        pd.cut(target_values, bins=10).astype(str),
                    )
                )
            except Exception:
                mi = None
        out[col] = {"correlation": float(corr), "mutual_info": mi}
    return out
def data_drift(
    baseline: pd.DataFrame,
    current: pd.DataFrame,
    cols: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Check drift between baseline and current, column by column.

    Numeric columns are compared with the two-sample Kolmogorov-Smirnov test
    on their non-missing values. Other columns are compared with a
    chi-squared test on a *category x sample* table of counts, with missing
    values counted as their own ``"__NA__"`` category. The two frames may
    have different lengths and unrelated indexes.

    Args:
        baseline: Reference sample.
        current: Sample to compare against the reference.
        cols: Columns to test. Defaults to the columns both frames share.

    Returns:
        A dict keyed by column name holding either
        ``{"ks_stat": ..., "pvalue": ...}`` (numeric),
        ``{"chi2": ..., "pvalue": ...}`` (categorical), or
        ``{"error": "test_failed"}`` when the test could not be computed.
    """
    if cols is None:
        cols = baseline.columns.intersection(current.columns).tolist()

    report: Dict[str, Any] = {}
    for col in cols:
        # Best-effort per column: one untestable column must not abort the
        # whole report.
        try:
            base, curr = baseline[col], current[col]
            if pd.api.types.is_numeric_dtype(base):
                stat, p = stats.ks_2samp(base.dropna(), curr.dropna())
                report[col] = {"ks_stat": float(stat), "pvalue": float(p)}
            else:
                # Count each category per sample. Cross-tabulating the two
                # columns directly would pair rows by index and test
                # association between rows, not equality of distributions.
                base_counts = base.astype(object).fillna("__NA__").value_counts()
                curr_counts = curr.astype(object).fillna("__NA__").value_counts()
                tbl = pd.concat(
                    [base_counts, curr_counts],
                    axis=1,
                    keys=["baseline", "current"],
                ).fillna(0)
                chi2, p, _, _ = stats.chi2_contingency(tbl)
                report[col] = {"chi2": float(chi2), "pvalue": float(p)}
        except Exception:
            report[col] = {"error": "test_failed"}
    return report
def train_test_mismatch(
    train: pd.DataFrame, test: pd.DataFrame, cols: Optional[List[str]] = None
) -> Dict[str, Any]:
    """Check for a distribution mismatch between a train and a test split.

    Delegates to ``data_drift`` with ``train`` as the baseline sample and
    ``test`` as the current one, and returns its report unchanged.
    """
    return data_drift(baseline=train, current=test, cols=cols)
def run_risk_checks(
    df: pd.DataFrame,
    target: Optional[str] = None,
    sensitive: Optional[List[str]] = None,
    baseline: Optional[pd.DataFrame] = None,
    train: Optional[pd.DataFrame] = None,
    test: Optional[pd.DataFrame] = None,
    mlvern_dir: Optional[str] = None,
) -> Dict[str, Any]:
    """Run every risk check whose inputs were supplied and collect the results.

    Args:
        df: Dataset under inspection.
        target: Target column; enables the class-imbalance and target-leakage
            checks.
        sensitive: Sensitive columns; enables the sensitive-attribute check
            when non-empty.
        baseline: Reference dataset; enables the sampling-bias and data-drift
            checks of ``baseline`` against ``df``.
        train: Training split; used together with ``test``.
        test: Test split; the train/test mismatch check runs only when both
            splits are given.
        mlvern_dir: When given, the report is also written as JSON to
            ``<mlvern_dir>/reports/risk_report.json`` (directories are
            created as needed).

    Returns:
        A dict with one entry per check that was run.
    """
    findings: Dict[str, Any] = {}

    if target is not None:
        findings["class_imbalance"] = class_imbalance(df, target)
        findings["target_leakage"] = target_leakage_detection(df, target)

    if sensitive:
        findings["sensitive_imbalance"] = sensitive_attribute_imbalance(df, sensitive)

    if baseline is not None:
        findings["sampling_bias"] = sampling_bias(baseline, df)
        findings["data_drift"] = data_drift(baseline, df)

    have_both_splits = train is not None and test is not None
    if have_both_splits:
        findings["train_test_mismatch"] = train_test_mismatch(train, test)

    # Nothing to persist unless an output directory was requested.
    if mlvern_dir is None:
        return findings

    out_dir = Path(mlvern_dir, "reports")
    out_dir.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(findings, indent=4)
    (out_dir / "risk_report.json").write_text(serialized, encoding="utf-8")
    return findings