Source code for mlvern.data.inspect

import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import numpy as np
import pandas as pd


class DataInspector:
    """Comprehensive data profiling and validation framework.

    The inspector wraps a DataFrame and produces a nested, JSON-oriented
    report with two parts: profiling (shape, schema, missing values,
    duplicates, cardinality, numeric ranges, outliers, target distribution)
    and validation (schema, ranges, null thresholds, type consistency,
    leakage, temporal columns).

    Args:
        df: The dataset to inspect.
        target: Optional name of the target column.
        mlvern_dir: Base directory; reports are written to
            ``<mlvern_dir>/reports``.
    """

    # Name fragments that mark a column as probably temporal. "at" is handled
    # separately because as a plain substring it matches names such as
    # "category", "rate" or "data".
    _TEMPORAL_NAME_HINTS = ("date", "time", "timestamp", "created", "updated")

    def __init__(
        self,
        df: pd.DataFrame,
        target: Optional[str] = None,
        mlvern_dir: str = ".",
    ):
        self.df = df
        self.target = target
        self.mlvern_dir = mlvern_dir
        self.report: dict[str, Any] = {}

    def validate_input(self) -> bool:
        """Validate input data.

        Returns:
            True when the dataset is non-empty.

        Raises:
            ValueError: If the DataFrame is empty.
        """
        if self.df.empty:
            raise ValueError("Dataset is empty")
        return True

    def safe_numeric_profile(self, min_rows: int = 2) -> dict[str, Any]:
        """Check if dataset is large enough for numeric profiling.

        Args:
            min_rows: Minimum number of rows required.

        Returns:
            ``{"status": "available"}`` when there are enough rows, otherwise
            an explicit skip record with the reason and the row counts.
        """
        n_rows = len(self.df)
        if n_rows < min_rows:
            return {
                "status": "skipped",
                "reason": "insufficient_rows",
                "required_min_rows": min_rows,
                "actual_rows": n_rows,
            }
        return {"status": "available"}

    def profile_data(self) -> dict[str, Any]:
        """Part 1: Comprehensive data profiling.

        Returns:
            A dict with one entry per profiling section. The target
            distribution is an empty dict when no target was given.
        """
        return {
            "dataset_shape": self._profile_shape(),
            "schema": self._profile_schema(),
            "missing_values": self._profile_missing(),
            "duplicates": self._profile_duplicates(),
            "cardinality": self._profile_cardinality(),
            "numeric_ranges": self._profile_numeric_ranges(),
            "outliers": self._profile_outliers(),
            "target_distribution": (self._profile_target() if self.target else {}),
        }

    def validate_data(self) -> dict[str, Any]:
        """Part 2: Comprehensive data validation.

        Returns:
            A dict with one entry per validation section.
        """
        return {
            "schema_validation": self._validate_schema(),
            "range_constraints": self._validate_ranges(),
            "null_thresholds": self._validate_null_thresholds(),
            "type_consistency": self._validate_type_consistency(),
            "leakage_checks": self._validate_leakage(),
            "temporal_validity": self._validate_temporal(),
        }

    def _profile_shape(self) -> dict[str, Any]:
        """Profile dataset shape and size."""
        total_cells = int(self.df.size)
        if total_cells:
            missing_cells = self.df.size - self.df.count().sum()
            sparsity = float(round(missing_cells / self.df.size * 100, 2))
        else:
            # A frame without cells has nothing missing; avoid dividing by 0.
            sparsity = 0.0
        return {
            "rows": int(self.df.shape[0]),
            "columns": int(self.df.shape[1]),
            "memory_mb": float(
                round(self.df.memory_usage(deep=True).sum() / (1024**2), 4)
            ),
            "sparsity_percent": sparsity,
        }

    def _profile_schema(self) -> dict[str, Any]:
        """Profile data types and schema."""
        return {
            col: {
                "dtype": str(self.df[col].dtype),
                "non_null_count": int(self.df[col].count()),
            }
            for col in self.df.columns
        }

    def _profile_missing(self) -> dict[str, Any]:
        """Profile missing values with patterns."""
        missing = self.df.isnull().sum()
        missing_pct = (missing / len(self.df) * 100).round(2)
        details = {
            col: {
                "count": int(missing[col]),
                "percentage": float(missing_pct[col]),
            }
            for col in missing[missing > 0].index
        }
        return {
            "total_missing": int(missing.sum()),
            "columns_affected": int((missing > 0).sum()),
            "details": details,
        }

    def _profile_duplicates(self) -> dict[str, Any]:
        """Profile duplicate rows.

        ``by_features`` counts rows whose feature values repeat. When the
        target column is known it is the column excluded from the features;
        otherwise the last column is excluded, as a positional fallback.
        """
        total_dups = int(self.df.duplicated().sum())
        dup_info: dict[str, Any] = {"total": total_dups}
        if total_dups > 0:
            dup_info["percentage"] = round(total_dups / len(self.df) * 100, 2)
            columns = list(self.df.columns)
            if self.target is not None and self.target in self.df.columns:
                feature_cols = [col for col in columns if col != self.target]
            else:
                feature_cols = columns[:-1]
            if not feature_cols:
                # Single-column frame: an empty subset is meaningless, so
                # compare whole rows instead.
                feature_cols = columns
            dup_info["by_features"] = int(
                self.df.duplicated(subset=feature_cols, keep=False).sum()
            )
        return dup_info

    def _profile_cardinality(self) -> dict[str, Any]:
        """Profile cardinality of categorical (non-numeric) features."""
        n_rows = len(self.df)
        cardinality = {}
        for col in self.df.select_dtypes(exclude="number").columns:
            unique_count = int(self.df[col].nunique())
            cardinality[col] = {
                "unique_values": unique_count,
                # Guard the ratio so a zero-row frame does not divide by 0.
                "cardinality_ratio": (
                    round(unique_count / n_rows, 4) if n_rows else 0.0
                ),
                "top_values": self.df[col].value_counts().head(5).to_dict(),
            }
        return cardinality

    def _profile_numeric_ranges(self) -> dict[str, Any]:
        """Profile ranges and statistics of numeric features.

        Returns the skip record from ``safe_numeric_profile`` when the
        dataset has fewer than 2 rows.
        """
        check = self.safe_numeric_profile(min_rows=2)
        if check["status"] == "skipped":
            return check

        ranges = {}
        for col in self.df.select_dtypes(include="number").columns:
            series = self.df[col]
            # Use ddof=0 to avoid division-by-zero warnings
            with np.errstate(divide="ignore", invalid="ignore"):
                std_val = float(series.std(ddof=0))
            if pd.isna(std_val):
                std_val = 0.0
            ranges[col] = {
                "min": float(series.min()),
                "max": float(series.max()),
                "mean": float(series.mean()),
                "median": float(series.median()),
                "std": std_val,
                "q25": float(series.quantile(0.25)),
                "q75": float(series.quantile(0.75)),
            }
        return ranges

    def _profile_outliers(self) -> dict[str, Any]:
        """Detect outliers using IQR method.

        Skips if <5 rows (insufficient for reliable outlier detection).
        Only columns with at least one outlier appear in the result.
        """
        check = self.safe_numeric_profile(min_rows=5)
        if check["status"] == "skipped":
            return check

        outliers = {}
        for col in self.df.select_dtypes(include="number").columns:
            series = self.df[col]
            q1 = series.quantile(0.25)
            q3 = series.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            outlier_count = int(
                ((series < lower_bound) | (series > upper_bound)).sum()
            )
            if outlier_count > 0:
                outliers[col] = {
                    "count": outlier_count,
                    "percentage": round(outlier_count / len(self.df) * 100, 2),
                    "lower_bound": float(lower_bound),
                    "upper_bound": float(upper_bound),
                }
        return outliers

    def _profile_target(self) -> dict[str, Any]:
        """Profile target variable distribution.

        Returns an error record when the target column is absent and an empty
        dict when the target has no counted values.
        """
        if self.target not in self.df.columns:
            return {"error": f"Target '{self.target}' not found"}

        target_col = self.df[self.target]
        class_dist = target_col.value_counts().to_dict()
        if len(class_dist) == 0:
            return {}

        max_class = max(class_dist.values())
        min_class = min(class_dist.values())
        imbalance_ratio = round(max_class / max(min_class, 1), 2)

        # Object, string and category dtypes all hold labels; checking only
        # for "object" would report category/string targets as numeric.
        is_categorical = (
            pd.api.types.is_object_dtype(target_col)
            or pd.api.types.is_string_dtype(target_col)
            or isinstance(target_col.dtype, pd.CategoricalDtype)
        )
        return {
            "target": self.target,
            "type": "categorical" if is_categorical else "numeric",
            "class_distribution": class_dist,
            "imbalance_ratio": imbalance_ratio,
            "n_classes": len(class_dist),
        }

    def _validate_schema(self) -> dict[str, Any]:
        """Validate schema consistency."""
        issues = []
        # Check for unnamed columns
        if self.df.columns.isnull().any():
            issues.append("Dataset has unnamed columns")
        # Check for duplicate column names
        if self.df.columns.duplicated().any():
            issues.append("Dataset has duplicate column names")
        return {
            "is_valid": len(issues) == 0,
            "issues": issues,
        }

    def _validate_ranges(self) -> dict[str, Any]:
        """Validate value ranges (currently: infinite values)."""
        violations = {}
        for col in self.df.select_dtypes(include="number").columns:
            # Check for inf values
            inf_count = int(np.isinf(self.df[col]).sum())
            if inf_count > 0:
                violations[col] = {
                    "type": "infinite_values",
                    "count": inf_count,
                }
        return {
            "is_valid": len(violations) == 0,
            "violations": violations,
        }

    def _validate_null_thresholds(self) -> dict[str, Any]:
        """Validate null value thresholds (default: 50%)."""
        threshold = 0.5
        violations = {}
        missing_pct = self.df.isnull().sum() / len(self.df) * 100
        for col in missing_pct[missing_pct >= threshold * 100].index:
            violations[col] = {
                "missing_percentage": float(missing_pct[col]),
                "threshold": threshold * 100,
            }
        return {
            "is_valid": len(violations) == 0,
            "threshold_percent": threshold * 100,
            "violations": violations,
        }

    def _validate_type_consistency(self) -> dict[str, Any]:
        """Validate type consistency across columns.

        An object column is flagged when its non-null values have more than
        one Python type. Missing values (None / NaN) are ignored so they
        neither hide a real mix nor cause a false alarm.
        """
        issues = {}
        for col in self.df.columns:
            series = self.df[col]
            if str(series.dtype) != "object":
                continue
            # dict.fromkeys keeps first-seen order while removing repeats.
            type_names = list(
                dict.fromkeys(type(value).__name__ for value in series.dropna())
            )
            if len(type_names) > 1:
                issues[col] = {
                    "types_found": type_names,
                    "message": "Mixed types detected",
                }
        return {
            "is_valid": len(issues) == 0,
            "issues": issues,
        }

    def _validate_leakage(self) -> dict[str, Any]:
        """Check for potential data leakage patterns.

        Flags numeric features whose absolute correlation with a numeric
        target exceeds 0.99. Skipped when there are fewer than 2 rows.
        """
        # Correlation requires >= 2 rows
        if len(self.df) < 2:
            return {
                "has_leakage_risk": False,
                "indicators": [],
                "status": "skipped",
                "reason": "insufficient_rows",
            }

        leakage_indicators = []
        if self.target and self.target in self.df.columns:
            numeric_cols = self.df.select_dtypes(include="number").columns
            if self.target in numeric_cols:
                target_series = self.df[self.target]
                for col in numeric_cols:
                    if col == self.target:
                        continue
                    corr = abs(self.df[col].corr(target_series))
                    # A NaN correlation compares False and is not flagged.
                    if corr > 0.99:
                        leakage_indicators.append(
                            {
                                "feature": col,
                                "correlation": float(corr),
                                "message": "Perfect correlation with target",
                            }
                        )
        return {
            "has_leakage_risk": len(leakage_indicators) > 0,
            "indicators": leakage_indicators,
        }

    @classmethod
    def _has_temporal_name(cls, column: Any) -> bool:
        """Return True when a column label looks like a date/time field."""
        # Labels are not always strings (e.g. integer column names).
        name = str(column)
        lowered = name.lower()
        if any(hint in lowered for hint in cls._TEMPORAL_NAME_HINTS):
            return True
        # Accept "at" only as a word or suffix: "at", "seen_at", "deletedAt".
        return (
            lowered == "at"
            or lowered.endswith(("_at", "-at", " at"))
            or name.endswith("At")
        )

    def _validate_temporal(self) -> dict[str, Any]:
        """Validate temporal columns if present.

        Detects temporal columns by name heuristics or successful parsing.
        Numeric and boolean columns are only considered when their name
        suggests a timestamp, because ``pd.to_datetime`` reads plain numbers
        as epoch offsets and would otherwise mark every numeric column as
        temporal.
        """
        temporal_cols = {}
        for col in self.df.columns:
            series = self.df[col]
            name_hint = self._has_temporal_name(col)
            if pd.api.types.is_numeric_dtype(series) and not name_hint:
                continue
            try:
                parsed = pd.to_datetime(series, errors="coerce")
            except Exception:
                # Deliberate best effort: one unparseable column must not
                # abort the whole inspection, so it is recorded instead.
                temporal_cols[col] = {"error": "Failed to parse as datetime"}
                continue

            # count entries that parsed as datetime
            parsed_count = int(parsed.notnull().sum())
            original_nulls = int(series.isnull().sum())
            invalid_count = int(parsed.isnull().sum() - original_nulls)
            if name_hint or parsed_count > 0:
                temporal_cols[col] = {
                    "dtype": str(series.dtype),
                    "parsed_count": parsed_count,
                    "invalid_dates": invalid_count,
                    "date_range": {
                        "min": (str(parsed.min()) if parsed_count > 0 else None),
                        "max": (str(parsed.max()) if parsed_count > 0 else None),
                    },
                }
        return {
            "has_temporal_columns": len(temporal_cols) > 0,
            "columns": temporal_cols,
        }

    def inspect(self) -> dict[str, Any]:
        """Run complete inspection.

        Returns:
            The full report, which is also stored on ``self.report``.

        Raises:
            ValueError: If the dataset is empty.
        """
        self.validate_input()
        self.report = {
            "metadata": {
                "generated_at": datetime.now(timezone.utc).isoformat(),
                "library": "mlvern",
                "version": "0.1.0",
            },
            "part_1_profiling": self.profile_data(),
            "part_2_validation": self.validate_data(),
            "vulnerabilities": [],
            "recommendations": [],
        }
        self._assess_vulnerabilities()
        return self.report

    def _add_finding(
        self, severity: str, kind: str, message: str, recommendation: str
    ) -> None:
        """Append one vulnerability and its recommendation to the report."""
        self.report["vulnerabilities"].append(
            {"severity": severity, "type": kind, "message": message}
        )
        self.report["recommendations"].append(recommendation)

    def _assess_vulnerabilities(self):
        """Assess vulnerabilities and generate recommendations."""
        profile = self.report["part_1_profiling"]
        validation = self.report["part_2_validation"]

        # Check missing values
        missing_count = profile["missing_values"]["total_missing"]
        if missing_count > 0:
            self._add_finding(
                "WARNING",
                "MISSING_VALUES",
                f"{missing_count} missing values detected",
                "Consider imputing missing values using mean, median, "
                "or KNN imputation",
            )

        # Check duplicates
        duplicate_count = profile["duplicates"]["total"]
        if duplicate_count > 0:
            self._add_finding(
                "WARNING",
                "DUPLICATES",
                f"{duplicate_count} duplicate rows detected",
                "Consider removing or investigating duplicate rows",
            )

        # Check target
        if self.target and self.target in self.df.columns:
            target_dist = profile["target_distribution"]
            if "imbalance_ratio" in target_dist and target_dist["imbalance_ratio"] > 3:
                self._add_finding(
                    "WARNING",
                    "CLASS_IMBALANCE",
                    f"Imbalance ratio is {target_dist['imbalance_ratio']}",
                    "Use SMOTE, class weighting, or resampling "
                    "for class imbalance",
                )

        # Check null thresholds
        if not validation["null_thresholds"]["is_valid"]:
            self._add_finding(
                "CRITICAL",
                "HIGH_MISSING_THRESHOLD",
                "Columns exceed 50% missing values",
                "Consider dropping or heavily imputing columns "
                "with >50% missing values",
            )

        # Check leakage
        if validation["leakage_checks"]["has_leakage_risk"]:
            self._add_finding(
                "CRITICAL",
                "DATA_LEAKAGE",
                "Perfect correlation detected with target",
                "Investigate and remove features with "
                "perfect target correlation",
            )

    @staticmethod
    def _json_safe(obj: Any) -> Any:
        """Return a copy of ``obj`` that ``json.dumps`` can serialize.

        Report sections are built from pandas results, so dict keys can be
        timestamps or other objects and values can be numpy scalars. Keys
        that JSON cannot represent are converted to strings.
        """
        if isinstance(obj, dict):
            safe = {}
            for key, value in obj.items():
                if isinstance(key, np.generic):
                    key = key.item()
                if not (key is None or isinstance(key, (str, int, float, bool))):
                    key = str(key)
                safe[key] = DataInspector._json_safe(value)
            return safe
        if isinstance(obj, np.ndarray):
            return DataInspector._json_safe(obj.tolist())
        if isinstance(obj, (list, tuple, set)):
            return [DataInspector._json_safe(item) for item in obj]
        if isinstance(obj, np.generic):
            return DataInspector._json_safe(obj.item())
        if obj is None or isinstance(obj, (str, int, float, bool)):
            return obj
        return str(obj)

    def save_report(self, filename: str = "data_inspection_report.json") -> Path:
        """Save inspection report to JSON file and return Path.

        The file is written to ``<mlvern_dir>/reports/<filename>``; the
        directory is created when missing. ``self.report`` itself is left
        unchanged; only the serialized copy is normalized for JSON.
        """
        reports_dir = Path(self.mlvern_dir) / "reports"
        reports_dir.mkdir(parents=True, exist_ok=True)
        path = reports_dir / filename
        payload = self._json_safe(self.report)
        path.write_text(json.dumps(payload, indent=4), encoding="utf-8")
        return path
def inspect_data(
    df: pd.DataFrame,
    target: Optional[str] = None,
    mlvern_dir: str = ".",
) -> dict[str, Any]:
    """Inspect ``df`` in one call and persist the resulting report.

    Builds a ``DataInspector``, runs the full inspection, writes the report
    with the default file name via ``save_report``, and returns the report.

    Args:
        df: The dataset to inspect.
        target: Optional name of the target column.
        mlvern_dir: Base directory handed to the inspector.

    Returns:
        The report dict produced by ``DataInspector.inspect``.
    """
    runner = DataInspector(df, target, mlvern_dir)
    result = runner.inspect()
    # Persist only after inspection has succeeded.
    runner.save_report()
    return result