"""Privacy Scanner Router - PII Detection and Redaction""" from fastapi import APIRouter, UploadFile, File, HTTPException, Form from pydantic import BaseModel from typing import Optional import re import io import tempfile import os import duckdb import json import base64 import unicodedata import html router = APIRouter() # Valid TLDs for email detection - prevents false positives on "at 7pm", "at cvs", etc. VALID_TLDS = { 'com', 'org', 'net', 'edu', 'gov', 'io', 'co', 'ai', 'dev', 'app', 'xyz', 'info', 'biz', 'me', 'uk', 'de', 'fr', 'ca', 'au', 'in', 'jp', 'ru', 'br', 'it', 'nl', 'es', 'ch', 'se', 'no', 'fi', 'dk', 'at', 'be', 'pl', 'pt', 'ie', 'nz', 'sg', 'hk', 'kr', 'mx', 'ar', 'cl', 'za', 'us', 'mil', 'int', 'eu', 'asia', 'mobi', 'name', 'pro', 'aero', 'coop', 'museum', 'jobs', 'travel', 'xxx', 'cat', 'tel', 'post', 'club', 'online', 'site', 'tech', 'store', 'blog', 'shop', 'live', 'cloud', 'news', 'email' } # Words that commonly follow "at" but are NOT domains FALSE_TRIGGER_WORDS = { 'the', 'a', 'an', 'my', 'your', 'his', 'her', 'our', 'their', 'ur', 'home', 'work', 'office', 'school', 'noon', 'night', 'midnight', 'dawn', 'dusk', 'once', 'least', 'most', 'first', 'last', 'all', 'gate', 'terminal', 'platform', 'station', 'airport', 'store', 'mall', 'gym', 'park', 'beach', 'bar', 'restaurant', 'hotel', 'clinic', 'hospital', 'bank', 'church', 'cvs', 'target', 'walmart' } # PII Detection Patterns PII_PATTERNS = { "EMAIL": { # Negative lookbehind (? str: """ Layer 2.0a: Apply Unicode NFKC normalization to catch: - Fullwidth characters: john@example.com → john@example.com - Unicode dots: john․doe@example․com → john.doe@example.com - Other homoglyphs and compatibility characters """ return unicodedata.normalize('NFKC', text) def decode_html_entities(text: str) -> str: """ Layer 2.0b: Decode HTML entities to catch: - john&at;example˙com → john@example.com - john@example.com → john@example.com - & < > etc. """ # First pass: standard HTML entities decoded = html.unescape(text) # Second pass: custom obfuscation patterns custom_entities = [ (r'&at;', '@'), (r'˙', '.'), (r'@', '@'), # @ in decimal (r'.', '.'), # . in decimal (r'@', '@'), # @ in hex (r'.', '.'), # . in hex ] for pattern, replacement in custom_entities: decoded = re.sub(pattern, replacement, decoded, flags=re.IGNORECASE) return decoded def preprocess_text(text: str) -> tuple[str, list[dict]]: """ Layer 2.0: Full pre-processing pipeline combining Unicode and HTML normalization. Returns preprocessed text and a log of transformations applied. """ transformations = [] processed = text # Step 1: Unicode NFKC normalization unicode_normalized = normalize_unicode(processed) if unicode_normalized != processed: transformations.append({ "layer": "2.0a", "type": "unicode_nfkc", "changes": len(processed) - len(unicode_normalized) }) processed = unicode_normalized # Step 2: HTML entity decoding html_decoded = decode_html_entities(processed) if html_decoded != processed: transformations.append({ "layer": "2.0b", "type": "html_entity_decode", "changes": len(processed) - len(html_decoded) }) processed = html_decoded return processed, transformations def is_false_positive(value: str, pii_type: str, context: str = "") -> bool: """ Check if a detected value is a known false positive pattern. Returns True if the value should be excluded. 
""" value_lower = value.lower() context_lower = context.lower() if context else "" # Git SHA check (for AWS_SECRET_KEY false positives) if pii_type == "AWS_SECRET_KEY": # If it's all lowercase hex and 40 chars, it's probably a git SHA if re.match(r'^[a-f0-9]{40}$', value): return True # Also check if "commit" or "sha" in context if any(kw in context_lower for kw in ["commit", "sha", "hash", "git", "rev"]): return True # Timestamp check (for PHONE_US false positives) if pii_type in ["PHONE_US", "PHONE_US_NORMALIZED"]: # Check if it looks like a timestamp (13 digits starting with 1) digits_only = re.sub(r'\D', '', value) if len(digits_only) == 13 and digits_only.startswith('1'): return True if len(digits_only) == 10 and digits_only.startswith('1'): return True # Slack webhook path check (for DRIVERS_LICENSE false positives) if pii_type == "DRIVERS_LICENSE": if re.match(r'^T[A-Z0-9]{8,}$', value) or re.match(r'^B[A-Z0-9]{8,}$', value): if "slack" in context_lower or "webhook" in context_lower: return True # B followed by 7 digits could be HK ID or other codes, need context if re.match(r'^B\d{7}$', value): # Only valid if context suggests driver's license if not any(kw in context_lower for kw in ["license", "licence", "driver", "dmv", "dl#", "dl:"]): return True # Date check for log timestamps (for DATE_OF_BIRTH false positives) if pii_type == "DATE_OF_BIRTH": # Log timestamps often have time component nearby if any(kw in context_lower for kw in ["log", "timestamp", "created", "updated", "time:", "at ", ":"]): if re.search(r'\d{2}:\d{2}:\d{2}', context): # Has time component return True # Version number check if re.match(r'^v?\d+\.\d+\.\d+', value): return True # EMAIL and EMAIL_OBFUSCATED false positives - connection strings and URIs if pii_type in ["EMAIL", "EMAIL_OBFUSCATED"]: # Connection string pattern: ://user:password@host # This catches mongodb+srv://user:pass@cluster, redis://default:pass@host, etc. 
if re.search(r'://[^@]*:[^@]*@', context): return True # MongoDB/database connection strings: password@cluster.mongodb.net if any(db in value_lower for db in [".mongodb.net", ".mongodb.com", "cluster0", "cluster1"]): return True # Redis connection strings if any(db in value_lower for db in [".redis", "redis-", "redislabs"]): return True # Other database connection patterns if any(db in context_lower for db in ["mongodb://", "mongodb+srv://", "redis://", "postgres://", "mysql://", "amqp://", "connection_string", "conn_str"]): return True # Part of URL (e.g., user:pass@host pattern) if re.search(r'://[^@]+' + re.escape(value), context): return True # UK_POSTCODE false positives - partial passwords, codes if pii_type == "UK_POSTCODE": # Too short or looks like part of a password/hash if len(value) < 6: return True # Check if it's surrounded by alphanumeric chars (part of longer string) value_pos = context.find(value) if value_pos > 0: char_before = context[value_pos - 1] if value_pos > 0 else ' ' char_after = context[value_pos + len(value)] if value_pos + len(value) < len(context) else ' ' if char_before.isalnum() or char_after.isalnum(): return True # ISIN false positives - needs context validation if pii_type == "ISIN": # ISIN should have country code followed by alphanumeric # Exclude if it looks like a passport number or other ID if any(kw in context_lower for kw in ["passport", "travel", "visa", "id card", "identity"]): return True # Exclude random-looking alphanumeric strings without financial context if not any(kw in context_lower for kw in ["isin", "security", "stock", "bond", "cusip", "sedol", "share", "equity", "fund"]): return True # BANK_ACCOUNT false positives - exclude credit card numbers if pii_type == "BANK_ACCOUNT": digits_only = re.sub(r'\D', '', value) # If it's 15-16 digits and passes Luhn, it's a credit card, not bank account if len(digits_only) in [15, 16]: # Luhn check inline digits = [int(d) for d in digits_only] odd_digits = digits[-1::-2] even_digits = digits[-2::-2] total = sum(odd_digits) for d in even_digits: total += sum(divmod(d * 2, 10)) if total % 10 == 0: return True # It's a credit card, not a bank account # INDIA_AADHAAR false positives - exclude IBAN fragments if pii_type == "INDIA_AADHAAR": # If context suggests IBAN or European bank context, it's likely an IBAN fragment if any(kw in context_lower for kw in ["iban", "bic", "swift", "sepa", "bank", "de", "gb", "fr", "nl", "at", "ch"]): return True # Check if preceded by 2-letter country code + 2 digits (IBAN prefix pattern) # Pattern: XX## followed by the detected value iban_prefix_pattern = r'[A-Z]{2}\d{2}\s*' + re.escape(value.replace(' ', r'\s*')) if re.search(iban_prefix_pattern, context, re.IGNORECASE): return True # CANADIAN_SIN false positives - exclude Australian ABN format if pii_type == "CANADIAN_SIN": # ABN format is similar (9 digits in 3-3-3) but in Australian context if any(kw in context_lower for kw in ["abn", "australia", "australian", "business number", "gst"]): return True # CREDIT_CARD false positives - exclude order/confirmation/reference numbers if pii_type in ["CREDIT_CARD", "POSSIBLE_CARD_PATTERN"]: # Check for common prefixes that indicate non-card numbers order_prefixes = ["order", "order #", "order#", "order:", "order number", "conf", "confirmation", "conf #", "conf#", "confirmation #", "ref", "reference", "ref #", "ref#", "reference #", "invoice", "invoice #", "inv #", "inv#", "tracking", "tracking #", "track #", "ticket", "ticket #", "case #", "case#", "transaction id", "trans 
id", "txn"] if any(prefix in context_lower for prefix in order_prefixes): return True # Check for pattern: "# followed immediately by the number" if re.search(r'#\s*' + re.escape(value), context): return True return False # ============================================================================ # LAYER 2: Text Normalization Functions # ============================================================================ def normalize_text(text: str) -> tuple[str, dict]: """ Layer 2: Normalize text to reveal hidden PII. Returns normalized text and a mapping of original positions. Handles: - [dot] / (dot) / [.] → . - [at] / (at) → @ - Dashes/underscores in numbers → removed - Spaced out characters → joined """ normalized = text transformations = [] # Track transformations for position mapping # 1. Convert defanged dots: [dot], (dot), [.], (.), [ dot ], etc. # Also handle with surrounding spaces: " [dot] " → "." dot_patterns = [ (r'\s*\[\s*dot\s*\]\s*', '.'), # [dot], [ dot ], etc with optional spaces (r'\s*\(\s*dot\s*\)\s*', '.'), # (dot), ( dot ), etc with optional spaces (r'\s*\[\.\]\s*', '.'), # [.] (r'\s*\(\.\)\s*', '.'), # (.) (r'\s+dot\s+', '.'), # " dot " (word boundary) ] for pattern, replacement in dot_patterns: normalized = re.sub(pattern, replacement, normalized, flags=re.IGNORECASE) # 2. Convert defanged at: [at], (at), [ at ], etc. # IMPORTANT: Bracketed forms [at] and (at) are always intentional obfuscation # But plain " at " must be validated to avoid "meet at 5pm" → "meet@5pm" # 2a. Bracketed forms - always convert (these are intentional obfuscation) bracketed_at_patterns = [ (r'\s*\[\s*at\s*\]\s*', '@'), # [at], [ at ], etc with optional spaces (r'\s*\(\s*at\s*\)\s*', '@'), # (at), ( at ), etc with optional spaces ] for pattern, replacement in bracketed_at_patterns: normalized = re.sub(pattern, replacement, normalized, flags=re.IGNORECASE) # 2b. Plain " at " - ONLY convert if it looks like an email pattern: # - Preceded by username-like chars (alphanumeric, dots, underscores) # - Followed by domain-like pattern (word + dot/space-dot + TLD) # Valid: "john at gmail dot com", "mike at company.org" # Invalid: "meet at 5pm", "look at this", "ready at cvs", "at gate b7" def should_normalize_at(text: str, at_position: int) -> bool: """ Robust validation for ' at ' to '@' conversion. Returns True only if this looks like a genuine obfuscated email. Validates: - Username pattern before "at" - Rejects false trigger words after "at" (the, my, home, cvs, etc.) - Rejects time patterns ("at 7pm", "at 2:30") - Domain pattern with valid TLD after "at" """ # Get context around the "at" before_text = text[:at_position] after_text = text[at_position + 4:] # Skip " at " # 1. Check username pattern (must end with valid email username chars) username_match = re.search(r'([a-zA-Z0-9][a-zA-Z0-9._%+-]{0,63})$', before_text) if not username_match: return False # 2. Get the first word after "at" and check for false triggers first_word_match = re.match(r'([a-zA-Z0-9]+)', after_text) if not first_word_match: return False first_word = first_word_match.group(1).lower() # Reject if first word is a common false trigger if first_word in FALSE_TRIGGER_WORDS: return False # 3. Reject time patterns: "at 7pm", "at 2:30", "at noon" time_pattern = r'^[0-9]{1,2}(?::[0-9]{2})?(?:\s*(?:am|pm|AM|PM))?\s*(?:$|[.,!?\s])' if re.match(time_pattern, after_text): return False # 4. 
Check for valid domain pattern with TLD # Pattern: domain + (dot or " dot ") + valid TLD domain_pattern = r'^([a-zA-Z0-9][a-zA-Z0-9-]{0,61}[a-zA-Z0-9]?)(?:\.|(?:\s+dot\s+))([a-zA-Z]{2,10})(?:\s|$|[.,!?])' domain_match = re.match(domain_pattern, after_text, re.IGNORECASE) if domain_match: potential_tld = domain_match.group(2).lower() if potential_tld in VALID_TLDS: return True return False def smart_at_replacement(match): """Wrapper function for re.sub that uses should_normalize_at()""" full_match = match.group(0) before = match.group(1) # Username-like part after = match.group(2) # Domain-like part # Find the position of " at " in the original normalized text # The match.start() gives us where the full match begins # We need to check from where "at" actually appears at_pos_in_match = len(before) # " at " starts right after the username at_pos_in_text = match.start() + at_pos_in_match if should_normalize_at(normalized, at_pos_in_text): return before + '@' + after return full_match # Return unchanged # Pattern: (username-like) + " at " + (potential domain) normalized = re.sub( r'([a-zA-Z0-9][a-zA-Z0-9._%+-]*)\s+at\s+([a-zA-Z0-9][a-zA-Z0-9.\s]*)', smart_at_replacement, normalized, flags=re.IGNORECASE ) # 3. Remove separators from potential numbers (SSN, CC, phone) # Only in numeric contexts: sequences that look like numbers with separators # Pattern: digit, separator(s), digit, separator(s), digit... def clean_numeric_separators(match): """Remove dashes, underscores, spaces from numeric sequences""" return re.sub(r'[-_\s]', '', match.group()) # Match sequences that look like formatted numbers (3+ digit groups) normalized = re.sub( r'\b(\d{2,4})[-_\s]+(\d{2,4})[-_\s]+(\d{2,4})(?:[-_\s]+(\d{2,4}))?\b', clean_numeric_separators, normalized ) # 4. Join spaced-out characters (t-e-s-t → test, t e s t → test) # Only for sequences that look intentionally obfuscated (alternating char-separator) def join_spaced_chars(match): """Join characters separated by dashes or spaces""" chars = re.findall(r'[A-Za-z0-9]', match.group()) return ''.join(chars) # Match spaced-out patterns: a-b-c-d or a b c d (3+ chars) normalized = re.sub( r'\b([A-Za-z0-9])(?:[-\s]([A-Za-z0-9])){3,}\b', join_spaced_chars, normalized ) return normalized, {"original_length": len(text), "normalized_length": len(normalized)} # ============================================================================ # LAYER 2.5: JSON Blob Extraction & Deep Scanning # ============================================================================ def extract_json_strings(text: str) -> list[tuple[str, int, int]]: """ Extract JSON objects embedded in text for deep scanning. Returns list of (json_string, start_pos, end_pos) """ json_strings = [] # Pattern to find JSON-like structures: {...} or escaped JSON strings # Look for JSON objects that might be embedded as string values json_patterns = [ # Standard JSON objects r'(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})', # Escaped JSON in string values (e.g., "{\"key\": \"value\"}") r'"(\{(?:\\"|[^"])*\})"', ] for pattern in json_patterns: for match in re.finditer(pattern, text): json_str = match.group(1) if match.lastindex else match.group() # Unescape if it was escaped JSON if json_str.startswith('{') and '\\"' in json_str: json_str = json_str.replace('\\"', '"') json_strings.append((json_str, match.start(), match.end())) return json_strings def deep_scan_json(json_str: str) -> list[str]: """ Recursively extract all string values from a JSON structure. Returns list of string values to scan. 
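    Example (illustrative):
        deep_scan_json('{"user": {"email": "jane@corp.example"}}')
        -> ["user", "email: jane@corp.example", "jane@corp.example"]
    so nested keys and values are each surfaced for the regex layer to scan.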
""" extracted_values = [] def extract_strings(obj): if isinstance(obj, str): extracted_values.append(obj) elif isinstance(obj, dict): for key, value in obj.items(): # Also check keys for PII (e.g., "ssn", "email") extracted_values.append(f"{key}: {value}" if isinstance(value, str) else key) extract_strings(value) elif isinstance(obj, list): for item in obj: extract_strings(item) try: parsed = json.loads(json_str) extract_strings(parsed) except json.JSONDecodeError: # If not valid JSON, try to extract key-value pairs with regex # Pattern for "key": "value" pairs kv_pattern = r'"([^"]+)"\s*:\s*"([^"]*)"' for match in re.finditer(kv_pattern, json_str): extracted_values.append(f"{match.group(1)}: {match.group(2)}") return extracted_values # ============================================================================ # LAYER 2.6: Base64 Auto-Detection & Decoding # ============================================================================ def is_valid_base64(s: str) -> bool: """Check if a string is valid base64 encoded.""" # Must be at least 8 chars and multiple of 4 (with padding) if len(s) < 8: return False # Check for valid base64 characters base64_pattern = r'^[A-Za-z0-9+/]+=*$' if not re.match(base64_pattern, s): return False # Length check (with padding, should be multiple of 4) if len(s) % 4 != 0: return False return True def decode_base64_strings(text: str, max_depth: int = 2) -> list[tuple[str, str, int, int, int]]: """ Find and decode base64 strings in text with recursive decoding support (v1.1). Returns list of (original_b64, decoded_text, start_pos, end_pos, decode_depth) Now supports: - Recursive decoding (double-encoded base64) - JSON extraction from decoded content """ decoded_strings = [] # Pattern to find potential base64 strings (min 16 chars to avoid false positives) # Must contain at least one letter and one number to reduce false positives b64_pattern = r'\b([A-Za-z0-9+/]{16,}={0,2})\b' def recursive_decode(b64_str: str, start: int, end: int, depth: int = 1): """Recursively decode base64 strings up to max_depth""" if depth > max_depth: return if not is_valid_base64(b64_str): return try: decoded = base64.b64decode(b64_str).decode('utf-8', errors='ignore') # Only keep if decoded text is printable and looks like real text if decoded and len(decoded) >= 4: printable_ratio = sum(1 for c in decoded if c.isprintable()) / len(decoded) letter_ratio = sum(1 for c in decoded if c.isalpha()) / len(decoded) if printable_ratio > 0.7 and letter_ratio > 0.2: decoded_strings.append((b64_str, decoded, start, end, depth)) # Recursive: Check if decoded content contains more base64 if depth < max_depth: for inner_match in re.finditer(b64_pattern, decoded): recursive_decode( inner_match.group(1), start, # Keep original position end, depth + 1 ) # Also check if decoded content is JSON and scan it if decoded.strip().startswith('{') or decoded.strip().startswith('['): try: json.loads(decoded) # Valid JSON - will be picked up by JSON layer except json.JSONDecodeError: pass except Exception: pass for match in re.finditer(b64_pattern, text): b64_str = match.group(1) recursive_decode(b64_str, match.start(), match.end()) return decoded_strings # ============================================================================ # LAYER 2.7: Spelled-Out Number Detection (NLP-lite) # ============================================================================ # Word to digit mapping WORD_TO_DIGIT = { 'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5', 'six': '6', 'seven': '7', 'eight': 
'8', 'nine': '9', 'oh': '0', 'o': '0', # "oh" is often used for zero } def convert_spelled_numbers(text: str) -> tuple[str, list[tuple[str, str, int, int]]]: """ Convert spelled-out numbers to digits. Returns (converted_text, list of (original, converted, start, end)) Examples: - "nine zero zero dash twelve dash eight eight two one" → "900-12-8821" - "four one five five five one two one two" → "415-555-1212" """ conversions = [] result = text # Build pattern for number words number_words = '|'.join(WORD_TO_DIGIT.keys()) # Pattern: sequence of number words separated by spaces, possibly with "dash" or "hyphen" # Must have at least 3 number words to be considered a spelled-out number spelled_pattern = rf'\b((?:(?:{number_words})\s*)+(?:(?:dash|hyphen)\s*(?:(?:{number_words})\s*)+)+)\b' matches = list(re.finditer(spelled_pattern, text, re.IGNORECASE)) for match in reversed(matches): # Process in reverse to maintain positions original = match.group(1) # Convert words to digits converted_parts = [] current_number = '' words = re.split(r'\s+', original.lower()) for word in words: word = word.strip() if word in WORD_TO_DIGIT: current_number += WORD_TO_DIGIT[word] elif word in ['dash', 'hyphen']: if current_number: converted_parts.append(current_number) current_number = '' if current_number: converted_parts.append(current_number) if converted_parts: converted = '-'.join(converted_parts) conversions.append((original, converted, match.start(), match.end())) result = result[:match.start()] + converted + result[match.end():] return result, conversions # ============================================================================ # LAYER 2.8: Non-Latin Character Support & International Patterns # ============================================================================ # European VAT Number patterns by country EU_VAT_PATTERNS = { "AT": r'ATU\d{8}', # Austria "BE": r'BE0?\d{9,10}', # Belgium "BG": r'BG\d{9,10}', # Bulgaria "CY": r'CY\d{8}[A-Z]', # Cyprus "CZ": r'CZ\d{8,10}', # Czech Republic "DE": r'DE\d{9}', # Germany "DK": r'DK\d{8}', # Denmark "EE": r'EE\d{9}', # Estonia "EL": r'EL\d{9}', # Greece (uses EL not GR) "ES": r'ES[A-Z0-9]\d{7}[A-Z0-9]', # Spain "FI": r'FI\d{8}', # Finland "FR": r'FR[A-Z0-9]{2}\d{9}', # France "HR": r'HR\d{11}', # Croatia "HU": r'HU\d{8}', # Hungary "IE": r'IE\d[A-Z0-9+*]\d{5}[A-Z]{1,2}', # Ireland "IT": r'IT\d{11}', # Italy "LT": r'LT\d{9,12}', # Lithuania "LU": r'LU\d{8}', # Luxembourg "LV": r'LV\d{11}', # Latvia "MT": r'MT\d{8}', # Malta "NL": r'NL\d{9}B\d{2}', # Netherlands "PL": r'PL\d{10}', # Poland "PT": r'PT\d{9}', # Portugal "RO": r'RO\d{2,10}', # Romania "SE": r'SE\d{12}', # Sweden "SI": r'SI\d{8}', # Slovenia "SK": r'SK\d{10}', # Slovakia "GB": r'GB\d{9,12}', # UK (pre-Brexit, still used) } # Combined EU VAT pattern EU_VAT_COMBINED = '|'.join(f'({pattern})' for pattern in EU_VAT_PATTERNS.values()) # Greek character transliteration (for detecting Greek names/text) GREEK_TO_LATIN = { 'α': 'a', 'β': 'b', 'γ': 'g', 'δ': 'd', 'ε': 'e', 'ζ': 'z', 'η': 'h', 'θ': 'th', 'ι': 'i', 'κ': 'k', 'λ': 'l', 'μ': 'm', 'ν': 'n', 'ξ': 'x', 'ο': 'o', 'π': 'p', 'ρ': 'r', 'σ': 's', 'ς': 's', 'τ': 't', 'υ': 'y', 'φ': 'f', 'χ': 'ch', 'ψ': 'ps', 'ω': 'o', 'Α': 'A', 'Β': 'B', 'Γ': 'G', 'Δ': 'D', 'Ε': 'E', 'Ζ': 'Z', 'Η': 'H', 'Θ': 'Th', 'Ι': 'I', 'Κ': 'K', 'Λ': 'L', 'Μ': 'M', 'Ν': 'N', 'Ξ': 'X', 'Ο': 'O', 'Π': 'P', 'Ρ': 'R', 'Σ': 'S', 'Τ': 'T', 'Υ': 'Y', 'Φ': 'F', 'Χ': 'Ch', 'Ψ': 'Ps', 'Ω': 'O', # Accented variants 'ά': 'a', 'έ': 'e', 'ή': 'h', 'ί': 'i', 'ό': 'o', 'ύ': 'y', 'ώ': 'o', 'Ά': 
'A', 'Έ': 'E', 'Ή': 'H', 'Ί': 'I', 'Ό': 'O', 'Ύ': 'Y', 'Ώ': 'O', } # Cyrillic character transliteration CYRILLIC_TO_LATIN = { 'а': 'a', 'б': 'b', 'в': 'v', 'г': 'g', 'д': 'd', 'е': 'e', 'ё': 'yo', 'ж': 'zh', 'з': 'z', 'и': 'i', 'й': 'y', 'к': 'k', 'л': 'l', 'м': 'm', 'н': 'n', 'о': 'o', 'п': 'p', 'р': 'r', 'с': 's', 'т': 't', 'у': 'u', 'ф': 'f', 'х': 'kh', 'ц': 'ts', 'ч': 'ch', 'ш': 'sh', 'щ': 'shch', 'ъ': '', 'ы': 'y', 'ь': '', 'э': 'e', 'ю': 'yu', 'я': 'ya', 'А': 'A', 'Б': 'B', 'В': 'V', 'Г': 'G', 'Д': 'D', 'Е': 'E', 'Ё': 'Yo', 'Ж': 'Zh', 'З': 'Z', 'И': 'I', 'Й': 'Y', 'К': 'K', 'Л': 'L', 'М': 'M', 'Н': 'N', 'О': 'O', 'П': 'P', 'Р': 'R', 'С': 'S', 'Т': 'T', 'У': 'U', 'Ф': 'F', 'Х': 'Kh', 'Ц': 'Ts', 'Ч': 'Ch', 'Ш': 'Sh', 'Щ': 'Shch', 'Ъ': '', 'Ы': 'Y', 'Ь': '', 'Э': 'E', 'Ю': 'Yu', 'Я': 'Ya', } def transliterate_text(text: str) -> str: """Transliterate Greek and Cyrillic characters to Latin.""" result = text # Apply Greek transliteration for greek, latin in GREEK_TO_LATIN.items(): result = result.replace(greek, latin) # Apply Cyrillic transliteration for cyrillic, latin in CYRILLIC_TO_LATIN.items(): result = result.replace(cyrillic, latin) return result def detect_eu_vat_numbers(text: str) -> list[tuple[str, str, int, int]]: """ Detect EU VAT numbers in text. Returns list of (vat_number, country_code, start, end) """ vat_matches = [] for country, pattern in EU_VAT_PATTERNS.items(): for match in re.finditer(pattern, text): vat_matches.append((match.group(), country, match.start(), match.end())) return vat_matches def has_non_latin_chars(text: str) -> bool: """Check if text contains non-Latin characters (Greek, Cyrillic, etc.)""" # Check for Greek (U+0370–U+03FF) or Cyrillic (U+0400–U+04FF) return bool(re.search(r'[\u0370-\u03FF\u0400-\u04FF]', text)) # ============================================================================ # LAYER 3: Context-Based Confidence Scoring # ============================================================================ # Context keywords that boost confidence when found near PII CONTEXT_KEYWORDS = { "SSN": ["ssn", "social security", "social sec", "ss#", "ss #", "ss number"], "CREDIT_CARD": ["card", "credit", "debit", "visa", "mastercard", "amex", "payment", "cc#", "card number"], "EMAIL": ["email", "e-mail", "mail", "contact", "reach", "@"], "PHONE_US": ["phone", "tel", "telephone", "mobile", "cell", "call", "fax", "contact"], "PHONE_INTL": ["phone", "tel", "telephone", "mobile", "cell", "call", "fax", "contact"], "IP_ADDRESS": ["ip", "address", "server", "host", "network"], "IP_DEFANGED": ["ip", "address", "server", "host", "network", "indicator", "ioc"], "PASSWORD_IN_URL": ["password", "pwd", "pass", "secret", "credential", "login", "auth"], "IBAN": ["iban", "bank", "account", "transfer", "payment", "wire"], "BANK_ACCOUNT": ["account", "bank", "routing", "aba", "deposit"], "AWS_ACCESS_KEY": ["aws", "amazon", "access", "key", "iam", "credential"], "GITHUB_TOKEN": ["github", "token", "api", "pat", "access"], "STRIPE_KEY": ["stripe", "payment", "api", "key", "publishable", "secret"], "MEDICARE_ID": ["medicare", "mbi", "beneficiary", "cms", "health"], "DATE_OF_BIRTH": ["dob", "birth", "born", "birthday", "age"], "GEO_COORDINATES": ["location", "coordinates", "lat", "lng", "latitude", "longitude", "gps"], "AUTH_CODE": ["auth", "code", "otp", "verification", "2fa", "mfa"], } # Anti-context: keywords that reduce confidence (false positive indicators) ANTI_CONTEXT_KEYWORDS = { "CREDIT_CARD": ["test", "example", "sample", "demo", "fake", "dummy", "xxxx", "part", 
"sku", "isbn", "serial", "order", "invoice", "ref", "batch", "processing"], "SSN": ["test", "example", "sample", "demo", "fake", "dummy", "000-00-0000", "123-45-6789", "part", "sku", "serial", "order", "ref", "batch", "processing", "validation"], "PHONE_US": ["test", "example", "555-", "000-000", "sample", "demo", "fake"], "IP_ADDRESS": ["example", "test", "localhost", "127.0.0.1", "0.0.0.0"], "EMAIL": ["example.com", "test.com", "example.org", "noreply", "sample", "demo"], "DATE_OF_BIRTH": ["batch", "created", "updated", "modified", "expires", "valid", "effective", "due", "ship", "order", "processing"], "IBAN": ["test", "example", "sample", "demo", "fake", "dummy", "validation"], } def is_future_date(date_str: str) -> bool: """Check if a date string represents a future date (not a valid DOB).""" from datetime import datetime, date current_year = datetime.now().year current_date = date.today() # Try to extract year from common date formats full_year_match = re.search(r'((?:19|20)\d{2})', date_str) if full_year_match: year = int(full_year_match.group(1)) # Future year = definitely not a birth date if year > current_year: return True # Current year - check if the full date is in the future if year == current_year: # Try to parse the full date # Common formats: MM-DD-YYYY, MM/DD/YYYY, YYYY-MM-DD date_patterns = [ (r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', 'MDY'), # MM-DD-YYYY (r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})', 'YMD'), # YYYY-MM-DD ] for pattern, fmt in date_patterns: match = re.search(pattern, date_str) if match: try: if fmt == 'MDY': month, day, _ = int(match.group(1)), int(match.group(2)), int(match.group(3)) else: # YMD _, month, day = int(match.group(1)), int(match.group(2)), int(match.group(3)) parsed_date = date(year, month, day) if parsed_date > current_date: return True except ValueError: pass # Invalid date # Very old date (before 1900) = probably not a birth date if year < 1900: return True return False def apply_context_scoring(text: str, entities: list[PIIEntity], window: int = 100) -> list[PIIEntity]: """ Layer 3: Adjust confidence scores based on surrounding context. 
Args: text: Original text entities: List of detected PII entities window: Number of characters to look around each entity Returns: Entities with adjusted confidence scores """ text_lower = text.lower() adjusted_entities = [] for entity in entities: # Get context window around the entity start = max(0, entity.start - window) end = min(len(text), entity.end + window) context = text_lower[start:end] confidence_adjustment = 0.0 # Check for boosting keywords if entity.type in CONTEXT_KEYWORDS: for keyword in CONTEXT_KEYWORDS[entity.type]: if keyword in context: confidence_adjustment += 0.10 # Boost 10% per matching keyword (increased) break # Only apply once per type # Check for anti-context (false positive indicators) if entity.type in ANTI_CONTEXT_KEYWORDS: for keyword in ANTI_CONTEXT_KEYWORDS[entity.type]: if keyword in context: confidence_adjustment -= 0.30 # Reduce 30% for test/example data (increased penalty) break # LOGIC GATE: Date validation - future dates cannot be birth dates if entity.type == "DATE_OF_BIRTH": if is_future_date(entity.value): confidence_adjustment -= 0.50 # Heavily penalize future dates # Apply adjustment (cap between 0.3 and 0.99) new_confidence = max(0.3, min(0.99, entity.confidence + confidence_adjustment)) adjusted_entities.append(PIIEntity( type=entity.type, value=entity.value, start=entity.start, end=entity.end, confidence=new_confidence, masked_value=entity.masked_value )) return adjusted_entities # ============================================================================ # LAYER 4: Checksum Validation Functions # ============================================================================ def validate_iban(iban: str) -> bool: """ Validate IBAN using MOD-97 checksum algorithm (ISO 7064). Returns True if valid, False otherwise. 
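    Example (illustrative, using the well-known specimen IBAN):
        validate_iban("GB82 WEST 1234 5698 7654 32")  # True  (MOD-97 remainder is 1)
        validate_iban("GB82 WEST 1234 5698 7654 33")  # False (checksum broken)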
""" # Remove spaces and convert to uppercase iban = re.sub(r'\s', '', iban).upper() # Check minimum length if len(iban) < 15: return False # Move first 4 chars to end rearranged = iban[4:] + iban[:4] # Convert letters to numbers (A=10, B=11, ..., Z=35) numeric = '' for char in rearranged: if char.isdigit(): numeric += char elif char.isalpha(): numeric += str(ord(char) - ord('A') + 10) else: return False # Invalid character # Perform MOD-97 check try: return int(numeric) % 97 == 1 except ValueError: return False def validate_luhn(number: str) -> bool: """Validate number using Luhn algorithm (credit cards, etc.)""" digits = [int(d) for d in re.sub(r'\D', '', number)] if len(digits) < 8: return False checksum = 0 for i, d in enumerate(reversed(digits)): if i % 2 == 1: d *= 2 if d > 9: d -= 9 checksum += d return checksum % 10 == 0 # ============================================================================ # Multi-Layer Detection Pipeline # ============================================================================ def detect_pii_multilayer(text: str, config: Optional[ScanConfig] = None, coordinates_only: bool = False) -> list[PIIEntity]: """ Multi-layer PII detection pipeline (v1.1): - Layer 2.0: Pre-processing (Unicode NFKC + HTML entity decode) - Layer 1: Standard regex matching (high-speed DuckDB-compatible) - Layer 2: Text normalization to catch obfuscated PII - Layer 2.5: JSON blob extraction and deep scanning - Layer 2.6: Base64 auto-decoding (with recursive decode + JSON scan) - Layer 2.7: Spelled-out number detection (NLP-lite) - Layer 2.8: Non-Latin character support (Greek, Cyrillic) - Layer 3: Context-based confidence adjustment - Layer 4: Checksum verification (Luhn, IBAN MOD-97) - Layer 5: False positive filtering """ if config is None: config = ScanConfig() # Layer 2.0: Pre-processing - Unicode NFKC + HTML entity decode (v1.1) preprocessed_text, preprocessing_log = preprocess_text(text) # For coordinates_only mode: if preprocessing changed text length, # we need to use original text for detection to get accurate coordinates if coordinates_only and len(preprocessed_text) != len(text): # Text length changed during preprocessing - use original text for accurate coords detection_text = text else: detection_text = preprocessed_text # Layer 1: Standard detection entities = detect_pii_standard(detection_text, config) # Layer 2: Normalize text and re-scan for hidden PII normalized_text, transform_info = normalize_text(text) if normalized_text != text: # Only re-scan if normalization changed something normalized_entities = detect_pii_standard(normalized_text, config) # Mark normalized entities and add unique ones existing_values = {e.value.lower() for e in entities} for entity in normalized_entities: if entity.value.lower() not in existing_values: # This is a newly discovered entity from normalization # Reduce confidence slightly as it required normalization entity_dict = entity.model_dump() entity_dict['confidence'] = entity.confidence * 0.95 entity_dict['type'] = f"{entity.type}_NORMALIZED" entities.append(PIIEntity(**entity_dict)) existing_values.add(entity.value.lower()) # Layer 2.5: JSON Blob Extraction & Deep Scanning existing_values = {e.value.lower() for e in entities} json_blobs = extract_json_strings(text) for json_str, start_pos, end_pos in json_blobs: extracted_values = deep_scan_json(json_str) for extracted in extracted_values: # Scan extracted JSON values for PII json_entities = detect_pii_standard(extracted, config) for entity in json_entities: if entity.value.lower() 
not in existing_values: # Mark as found within JSON, adjust positions to original text entities.append(PIIEntity( type=f"{entity.type}_IN_JSON", value=entity.value, start=start_pos, # Use JSON blob position end=end_pos, confidence=entity.confidence * 0.90, # Slight reduction for nested detection masked_value=entity.masked_value )) existing_values.add(entity.value.lower()) # Layer 2.6: Base64 Auto-Decoding (v1.1 - recursive + JSON scan) decoded_b64_strings = decode_base64_strings(preprocessed_text) for original_b64, decoded_text, start_pos, end_pos, depth in decoded_b64_strings: # Scan decoded base64 content for PII/secrets b64_entities = detect_pii_standard(decoded_text, config) # Also scan decoded JSON content if present if decoded_text.strip().startswith('{') or decoded_text.strip().startswith('['): try: json_values = deep_scan_json(decoded_text) for json_val in json_values: json_entities = detect_pii_standard(json_val, config) for je in json_entities: if je.value.lower() not in existing_values: entities.append(PIIEntity( type=f"{je.type}_BASE64_JSON", value=je.value, start=start_pos, end=end_pos, confidence=je.confidence * 0.80, # Lower for double-nested masked_value=je.masked_value )) existing_values.add(je.value.lower()) except Exception: pass for entity in b64_entities: if entity.value.lower() not in existing_values: # Mark as found in base64 encoded content depth_suffix = f"_DEPTH{depth}" if depth > 1 else "" entities.append(PIIEntity( type=f"{entity.type}_BASE64{depth_suffix}", value=entity.value, start=start_pos, end=end_pos, confidence=entity.confidence * (0.85 ** depth), # Reduce more for deeper encoding masked_value=entity.masked_value )) existing_values.add(entity.value.lower()) # Also flag the base64 string itself if it decoded to sensitive content if b64_entities: entities.append(PIIEntity( type="BASE64_ENCODED_SECRET", value=original_b64[:50] + "..." 
if len(original_b64) > 50 else original_b64, start=start_pos, end=end_pos, confidence=0.85, masked_value="[BASE64 ENCODED CONTENT REDACTED]" )) # Layer 2.7: Spelled-Out Number Detection converted_text, conversions = convert_spelled_numbers(text) if conversions: # Scan the converted text for SSN patterns for original, converted, start_pos, end_pos in conversions: # Check if converted looks like an SSN (XXX-XX-XXXX format) ssn_match = re.match(r'^(\d{3})-(\d{2})-(\d{4})$', converted) if ssn_match: entities.append(PIIEntity( type="SSN_SPELLED_OUT", value=converted, start=start_pos, end=end_pos, confidence=0.90, # High confidence for spelled-out SSN masked_value=f"***-**-{ssn_match.group(3)}" )) # Layer 2.8: Non-Latin Character Support if has_non_latin_chars(text): # Transliterate and re-scan transliterated = transliterate_text(text) if transliterated != text: trans_entities = detect_pii_standard(transliterated, config) for entity in trans_entities: if entity.value.lower() not in existing_values: entities.append(PIIEntity( type=f"{entity.type}_TRANSLITERATED", value=entity.value, start=entity.start, end=entity.end, confidence=entity.confidence * 0.90, masked_value=entity.masked_value )) existing_values.add(entity.value.lower()) # Layer 3: Apply context-based scoring entities = apply_context_scoring(text, entities) # Layer 4: Apply checksum verification for applicable types verified_entities = [] for entity in entities: if entity.type == "IBAN": if validate_iban(entity.value): # Valid IBAN - boost confidence verified_entities.append(PIIEntity( type=entity.type, value=entity.value, start=entity.start, end=entity.end, confidence=min(0.99, entity.confidence + 0.10), masked_value=entity.masked_value )) else: # Invalid checksum - cap at 30% ceiling (consistent across all types) verified_entities.append(PIIEntity( type="POSSIBLE_IBAN_PATTERN", value=entity.value, start=entity.start, end=entity.end, confidence=0.30, # Consistent 30% ceiling for invalid checksums masked_value=entity.masked_value )) elif entity.type == "UUID": # UUIDs are typically trace IDs, not PII - cap at 40% max confidence verified_entities.append(PIIEntity( type=entity.type, value=entity.value, start=entity.start, end=entity.end, confidence=min(0.40, entity.confidence), # Cap UUID confidence at 40% masked_value=entity.masked_value )) elif entity.type == "CREDIT_CARD" or entity.type == "CREDIT_CARD_NORMALIZED": # LOGIC GATE: Apply Luhn validation - reclassify invalid credit cards if validate_luhn(entity.value): # Valid credit card - boost confidence verified_entities.append(PIIEntity( type=entity.type, value=entity.value, start=entity.start, end=entity.end, confidence=min(0.99, entity.confidence + 0.10), masked_value=entity.masked_value )) else: # Invalid Luhn checksum - reclassify as POSSIBLE_CARD_PATTERN # Check if context suggests it's test/example data - if so, discard context_start = max(0, entity.start - 50) context_end = min(len(text), entity.end + 50) context = text[context_start:context_end].lower() # Discard if context clearly indicates test/example data if any(kw in context for kw in ["test", "example", "sample", "demo", "fake", "dummy"]): # Skip this entity entirely - it's test data continue # Reclassify as POSSIBLE_CARD_PATTERN with 30% ceiling (consistent) verified_entities.append(PIIEntity( type="POSSIBLE_CARD_PATTERN", value=entity.value, start=entity.start, end=entity.end, confidence=0.30, # Consistent 30% ceiling for invalid checksums masked_value=mask_value(entity.value, "CREDIT_CARD") )) else: 
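            # Entity types with no checksum or cap rule above (emails, phones,
            # addresses, API keys, ...) pass through Layer 4 unchanged.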
verified_entities.append(entity) # Deduplicate by position AND by overlapping normalized variants # Step 1: Group by exact position deduplicated = {} for entity in verified_entities: key = (entity.start, entity.end) if key not in deduplicated or entity.confidence > deduplicated[key].confidence: deduplicated[key] = entity # Step 2: Merge overlapping entities of same base type (e.g., PHONE_US vs PHONE_US_NORMALIZED) # This prevents showing both "PHONE_US" and "PHONE_US_NORMALIZED" for the same number def get_base_type(pii_type: str) -> str: """Get base type without suffixes like _NORMALIZED, _IN_JSON, _BASE64, etc.""" suffixes = ['_NORMALIZED', '_IN_JSON', '_BASE64', '_TRANSLITERATED', '_SPELLED_OUT'] for suffix in suffixes: if pii_type.endswith(suffix): return pii_type[:-len(suffix)] return pii_type def normalize_value_for_comparison(value: str, pii_type: str) -> str: """Normalize a value for duplicate comparison (strip formatting).""" base = get_base_type(pii_type) if base in ['PHONE_US', 'PHONE_INTL', 'SSN', 'CREDIT_CARD']: # For these types, compare just the digits return re.sub(r'\D', '', value) return value.lower().strip() def overlaps(e1, e2, threshold: int = 5) -> bool: """Check if two entities overlap or are within threshold characters.""" return not (e1.end + threshold < e2.start or e2.end + threshold < e1.start) def same_normalized_value(e1, e2) -> bool: """Check if two entities represent the same underlying value.""" return normalize_value_for_comparison(e1.value, e1.type) == normalize_value_for_comparison(e2.value, e2.type) # Sort by start position for overlap detection sorted_entities = sorted(deduplicated.values(), key=lambda x: x.start) merged = [] for entity in sorted_entities: base_type = get_base_type(entity.type) merged_with_existing = False # Check if this overlaps with any existing entity of same base type # OR has the same normalized value (handles PHONE_US vs PHONE_US_NORMALIZED with different positions) for i, existing in enumerate(merged): existing_base = get_base_type(existing.type) if base_type == existing_base: # Merge if overlapping OR same value (for _NORMALIZED variants) if overlaps(entity, existing) or same_normalized_value(entity, existing): # Keep the one with higher confidence if entity.confidence > existing.confidence: merged[i] = entity merged_with_existing = True break if not merged_with_existing: merged.append(entity) deduplicated = {(e.start, e.end): e for e in merged} # Layer 5: False positive filtering (v1.1) filtered_entities = [] for entity in deduplicated.values(): # Get context for false positive check context_start = max(0, entity.start - 50) context_end = min(len(detection_text), entity.end + 50) context = detection_text[context_start:context_end] # Skip if detected as false positive if is_false_positive(entity.value, entity.type, context): continue filtered_entities.append(entity) # Sort by position result = sorted(filtered_entities, key=lambda x: x.start) return result def detect_pii_standard(text: str, config: Optional[ScanConfig] = None) -> list[PIIEntity]: """Layer 1: Standard regex-based PII detection""" if config is None: config = ScanConfig() entities = [] # Map config to pattern types type_mapping = { "EMAIL": config.detect_emails, # Phone numbers (US and International) "PHONE_US": config.detect_phones, "PHONE_INTL": config.detect_phones, # Identity documents "SSN": config.detect_ssn, "MEDICARE_ID": config.detect_ssn, # Group with SSN as it's US health ID "GERMAN_ID": True, # Always detect national IDs "CREDIT_CARD": 
config.detect_credit_cards, "IP_ADDRESS": config.detect_ip_addresses, "DATE_OF_BIRTH": config.detect_dates, "ZIP_CODE": config.detect_zip_codes, # Addresses (US and International) "US_ADDRESS": config.detect_addresses, "UK_POSTCODE": config.detect_addresses, "UK_ADDRESS": config.detect_addresses, "EU_ADDRESS": config.detect_addresses, "INTERNATIONAL_ADDRESS": config.detect_addresses, # Financial "IBAN": config.detect_iban, "BANK_ACCOUNT": config.detect_iban, "ROUTING_NUMBER": config.detect_iban, "SWIFT_BIC": config.detect_iban, # Medical IDs (always on - high sensitivity) "MEDICAL_RECORD_NUMBER": True, "NPI_NUMBER": True, "DEA_NUMBER": True, # Secrets - AWS "AWS_ACCESS_KEY": config.detect_secrets, "AWS_SECRET_KEY": config.detect_secrets, # Secrets - GitHub "GITHUB_TOKEN": config.detect_secrets, "GITHUB_TOKEN_CLASSIC": config.detect_secrets, # Secrets - GCP/Azure "GCP_SERVICE_ACCOUNT": config.detect_secrets, "GCP_PRIVATE_KEY_ID": config.detect_secrets, "AZURE_CLIENT_SECRET": config.detect_secrets, "AZURE_CONNECTION_STRING": config.detect_secrets, "AZURE_SAS_TOKEN": config.detect_secrets, # Secrets - Other "SLACK_TOKEN": config.detect_secrets, "GENERIC_API_KEY": config.detect_secrets, "PASSWORD_IN_URL": config.detect_secrets, "PRIVATE_KEY": config.detect_secrets, "JWT_TOKEN": config.detect_secrets, "STRIPE_KEY": config.detect_secrets, "GOOGLE_API_KEY": config.detect_secrets, "API_KEY_IN_URL": config.detect_secrets, "AUTH_CODE": config.detect_secrets, # Evasion-resistant patterns "EMAIL_OBFUSCATED": config.detect_emails, "IP_DEFANGED": config.detect_ip_addresses, # Identifiers "UUID": True, # Always detect UUIDs/trace IDs "GEO_COORDINATES": config.detect_addresses, # Group with addresses # ========================================================================= # NEW v1.1 PATTERNS # ========================================================================= # International IDs "UK_NATIONAL_INSURANCE": True, "CANADIAN_SIN": True, "INDIA_AADHAAR": True, "INDIA_PAN": True, "AUSTRALIA_TFN": True, "BRAZIL_CPF": True, "MEXICO_CURP": True, "SOUTH_AFRICA_ID": True, # Additional cloud tokens "DISCORD_TOKEN": config.detect_secrets, "DISCORD_WEBHOOK": config.detect_secrets, "TWILIO_API_KEY": config.detect_secrets, "TWILIO_AUTH_TOKEN": config.detect_secrets, "SENDGRID_API_KEY": config.detect_secrets, "OPENAI_API_KEY": config.detect_secrets, "ANTHROPIC_API_KEY": config.detect_secrets, "MAILCHIMP_API_KEY": config.detect_secrets, "MAILGUN_API_KEY": config.detect_secrets, "HEROKU_API_KEY": config.detect_secrets, "SHOPIFY_ACCESS_TOKEN": config.detect_secrets, "SHOPIFY_SHARED_SECRET": config.detect_secrets, "NPM_TOKEN": config.detect_secrets, "PYPI_TOKEN": config.detect_secrets, "DOCKER_AUTH": config.detect_secrets, # Crypto addresses "BITCOIN_ADDRESS": config.detect_iban, # Group with financial "ETHEREUM_ADDRESS": config.detect_iban, "MONERO_ADDRESS": config.detect_iban, # Financial identifiers "CUSIP": config.detect_iban, "ISIN": config.detect_iban, "SEDOL": config.detect_iban, } for pii_type, pattern_info in PII_PATTERNS.items(): # Check if this type should be detected if pii_type in type_mapping and not type_mapping[pii_type]: continue pattern = pattern_info["pattern"] for match in re.finditer(pattern, text, re.IGNORECASE): value = match.group() # Calculate confidence based on pattern specificity confidence = 0.85 if pii_type == "EMAIL": confidence = 0.95 elif pii_type == "SSN": # Pattern now requires dashes, so high confidence confidence = 0.95 elif pii_type in ["PHONE_US", "PHONE_INTL"]: confidence = 0.85 
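            # The remaining branches assign per-type base confidences; Layer 3
            # (context keywords) and Layer 4 (checksums) refine them afterwards.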
elif pii_type == "MEDICARE_ID": confidence = 0.90 elif pii_type == "GERMAN_ID": confidence = 0.85 # Requires context (ausweis keyword) elif pii_type == "CREDIT_CARD": # Validate with Luhn algorithm if validate_credit_card(value): confidence = 0.95 else: confidence = 0.60 elif pii_type == "ZIP_CODE": confidence = 0.70 # Could be any 5-digit number elif pii_type in ["US_ADDRESS", "UK_ADDRESS", "EU_ADDRESS", "INTERNATIONAL_ADDRESS"]: confidence = 0.80 elif pii_type == "UK_POSTCODE": confidence = 0.85 elif pii_type == "IBAN": # Validate IBAN length and format clean_iban = re.sub(r'\s', '', value) if len(clean_iban) >= 15 and len(clean_iban) <= 34: confidence = 0.95 else: confidence = 0.70 elif pii_type in ["BANK_ACCOUNT", "ROUTING_NUMBER"]: confidence = 0.85 elif pii_type == "SWIFT_BIC": confidence = 0.80 # Could be other 8/11 char codes # Cloud provider keys elif pii_type in ["AWS_ACCESS_KEY", "GITHUB_TOKEN", "GITHUB_TOKEN_CLASSIC", "STRIPE_KEY", "GOOGLE_API_KEY"]: confidence = 0.95 # Very specific patterns elif pii_type in ["GCP_SERVICE_ACCOUNT", "GCP_PRIVATE_KEY_ID"]: confidence = 0.95 elif pii_type in ["AZURE_CONNECTION_STRING", "AZURE_SAS_TOKEN"]: confidence = 0.95 elif pii_type == "AZURE_CLIENT_SECRET": confidence = 0.75 # Less specific pattern elif pii_type in ["AWS_SECRET_KEY", "JWT_TOKEN"]: confidence = 0.85 elif pii_type in ["PASSWORD_IN_URL", "GENERIC_API_KEY"]: confidence = 0.80 elif pii_type == "PRIVATE_KEY": confidence = 0.99 # Medical IDs elif pii_type in ["MEDICAL_RECORD_NUMBER", "NPI_NUMBER"]: confidence = 0.90 elif pii_type == "DEA_NUMBER": confidence = 0.85 # Evasion-resistant patterns elif pii_type == "EMAIL_OBFUSCATED": confidence = 0.90 # High - intentional obfuscation is suspicious elif pii_type == "IP_DEFANGED": confidence = 0.95 # Very high - defanging is deliberate elif pii_type == "API_KEY_IN_URL": confidence = 0.85 elif pii_type == "AUTH_CODE": confidence = 0.80 elif pii_type == "UUID": confidence = 0.70 # Could be any GUID elif pii_type == "GEO_COORDINATES": confidence = 0.85 # Create masked value masked_value = mask_value(value, pii_type) entities.append(PIIEntity( type=pii_type, value=value, start=match.start(), end=match.end(), confidence=confidence, masked_value=masked_value )) # Sort by position entities.sort(key=lambda x: x.start) return entities def validate_credit_card(number: str) -> bool: """Validate credit card using Luhn algorithm""" digits = [int(d) for d in re.sub(r'\D', '', number)] if len(digits) < 13 or len(digits) > 19: return False checksum = 0 for i, d in enumerate(reversed(digits)): if i % 2 == 1: d *= 2 if d > 9: d -= 9 checksum += d return checksum % 10 == 0 def mask_value(value: str, pii_type: str) -> str: """Mask a PII value based on its type""" # Handle various suffixes by using base type for masking base_type = pii_type for suffix in ["_NORMALIZED", "_IN_JSON", "_BASE64", "_TRANSLITERATED", "_SPELLED_OUT"]: base_type = base_type.replace(suffix, "") if base_type == "EMAIL": parts = value.split('@') if len(parts) == 2: return parts[0][:2] + '***@' + parts[1] return '***@***' elif base_type in ["PHONE_US", "PHONE_INTL"]: clean = re.sub(r'\D', '', value) if len(clean) >= 4: return '***-***-' + clean[-4:] return '***-***-****' elif base_type == "MEDICARE_ID": return '[MEDICARE ID REDACTED]' elif base_type == "GERMAN_ID": return '[GERMAN ID REDACTED]' elif base_type == "SSN": return '***-**-' + value[-4:] if len(value) >= 4 else '***-**-****' elif base_type == "CREDIT_CARD": clean = re.sub(r'\D', '', value) return '****-****-****-' + clean[-4:] 
if len(clean) >= 4 else '****-****-****-****' elif base_type == "IP_ADDRESS": parts = value.split('.') if len(parts) == 4: return f'{parts[0]}.***.***.*' return '***.***.***.***' elif base_type == "DATE_OF_BIRTH": return '**/**/' + value[-4:] if len(value) >= 4 else '**/**/****' elif base_type == "IBAN": # Show country code and last 4 chars if len(value) >= 6: return value[:2] + '**' + '*' * (len(value) - 6) + value[-4:] return '*' * len(value) elif base_type in ["US_ADDRESS", "UK_ADDRESS", "EU_ADDRESS", "INTERNATIONAL_ADDRESS"]: # Mask the street number and name return '[ADDRESS REDACTED]' elif base_type == "UK_POSTCODE": return value[:2] + '** ***' if len(value) >= 2 else '[POSTCODE REDACTED]' elif base_type in ["BANK_ACCOUNT", "ROUTING_NUMBER"]: # Show last 4 digits clean = re.sub(r'\D', '', value) return '******' + clean[-4:] if len(clean) >= 4 else '*' * len(value) elif base_type == "SWIFT_BIC": return value[:4] + '****' if len(value) >= 4 else '*' * len(value) elif base_type in ["AWS_ACCESS_KEY", "AWS_SECRET_KEY"]: return value[:4] + '*' * (len(value) - 8) + value[-4:] if len(value) > 8 else '*' * len(value) elif base_type in ["GITHUB_TOKEN", "GITHUB_TOKEN_CLASSIC"]: return value[:4] + '*' * (len(value) - 8) + value[-4:] if len(value) > 8 else '*' * len(value) elif base_type in ["STRIPE_KEY", "GOOGLE_API_KEY"]: return value[:7] + '*' * (len(value) - 11) + value[-4:] if len(value) > 11 else '*' * len(value) elif base_type == "PASSWORD_IN_URL": return '[PASSWORD REDACTED]' elif base_type == "PRIVATE_KEY": return '[PRIVATE KEY REDACTED]' elif base_type == "JWT_TOKEN": return value[:10] + '...[JWT REDACTED]...' + value[-10:] if len(value) > 20 else '[JWT REDACTED]' elif base_type in ["SLACK_TOKEN", "GENERIC_API_KEY"]: if len(value) > 8: return value[:4] + '*' * (len(value) - 8) + value[-4:] return '*' * len(value) # GCP/Azure cloud credentials elif base_type == "GCP_SERVICE_ACCOUNT": parts = value.split('@') if len(parts) == 2: return parts[0][:3] + '***@' + parts[1] return '[GCP SERVICE ACCOUNT REDACTED]' elif base_type == "GCP_PRIVATE_KEY_ID": return '[GCP PRIVATE KEY ID REDACTED]' elif base_type in ["AZURE_CLIENT_SECRET", "AZURE_CONNECTION_STRING", "AZURE_SAS_TOKEN"]: return '[AZURE CREDENTIAL REDACTED]' # Medical IDs elif base_type in ["MEDICAL_RECORD_NUMBER", "NPI_NUMBER", "DEA_NUMBER"]: return '[MEDICAL ID REDACTED]' # Evasion-resistant patterns elif base_type == "EMAIL_OBFUSCATED": return '[OBFUSCATED EMAIL REDACTED]' elif base_type == "IP_DEFANGED": return '[DEFANGED IP REDACTED]' elif base_type == "API_KEY_IN_URL": return '[API KEY IN URL REDACTED]' elif base_type == "AUTH_CODE": return '[AUTH CODE REDACTED]' elif base_type == "UUID": return value[:8] + '-****-****-****-' + value[-12:] if len(value) >= 36 else '[UUID REDACTED]' elif base_type == "GEO_COORDINATES": return '[COORDINATES REDACTED]' # New advanced detection types elif base_type == "EU_VAT_NUMBER": return value[:2] + '***' + value[-4:] if len(value) > 6 else '[VAT REDACTED]' elif base_type == "SSN_SPELLED_OUT": return '***-**-' + value[-4:] if len(value) >= 4 else '***-**-****' elif base_type == "BASE64_ENCODED_SECRET": return '[BASE64 SECRET REDACTED]' else: # Generic masking - show first and last char if len(value) > 2: return value[0] + '*' * (len(value) - 2) + value[-1] return '*' * len(value) def calculate_risk_level(entities: list[PIIEntity]) -> tuple[str, int]: """Calculate risk level based on PII found""" if not entities: return "LOW", 0 # Weight by sensitivity sensitivity_weights = { # High sensitivity - 
identity theft risk "SSN": 100, "CREDIT_CARD": 95, "DRIVERS_LICENSE": 85, "MEDICARE_ID": 90, # Health insurance ID "GERMAN_ID": 90, # National ID # Financial "IBAN": 85, "BANK_ACCOUNT": 80, "ROUTING_NUMBER": 75, "SWIFT_BIC": 60, # Secrets - security breach risk (AWS) "AWS_ACCESS_KEY": 100, "AWS_SECRET_KEY": 100, "PRIVATE_KEY": 100, # Secrets - GitHub "GITHUB_TOKEN": 95, "GITHUB_TOKEN_CLASSIC": 95, # Secrets - GCP/Azure "GCP_SERVICE_ACCOUNT": 95, "GCP_PRIVATE_KEY_ID": 100, "AZURE_CLIENT_SECRET": 95, "AZURE_CONNECTION_STRING": 100, "AZURE_SAS_TOKEN": 90, # Secrets - Other "STRIPE_KEY": 95, "SLACK_TOKEN": 90, "GOOGLE_API_KEY": 85, "JWT_TOKEN": 85, "PASSWORD_IN_URL": 90, "GENERIC_API_KEY": 80, # Medical IDs - HIPAA compliance "MEDICAL_RECORD_NUMBER": 90, "NPI_NUMBER": 85, "DEA_NUMBER": 80, # Phone numbers "PHONE_US": 35, "PHONE_INTL": 35, # Medium sensitivity - Addresses "DATE_OF_BIRTH": 50, "US_ADDRESS": 55, "UK_ADDRESS": 55, "EU_ADDRESS": 55, "INTERNATIONAL_ADDRESS": 50, "UK_POSTCODE": 40, # Lower sensitivity "EMAIL": 40, "IP_ADDRESS": 30, "ZIP_CODE": 20, # Possible patterns (failed validation but still flagged) "POSSIBLE_CARD_PATTERN": 25, # Low - failed Luhn, likely not a real card # Advanced detection types "EU_VAT_NUMBER": 70, "SSN_SPELLED_OUT": 95, # High - intentional evasion "BASE64_ENCODED_SECRET": 90, # High - deliberately hidden # JSON embedded types inherit from base type + 5 # Base64 decoded types inherit from base type + 5 # Transliterated types inherit from base type } total_score = 0 for entity in entities: # Handle various suffixes by looking up base type base_type = entity.type for suffix in ["_NORMALIZED", "_IN_JSON", "_BASE64", "_TRANSLITERATED", "_SPELLED_OUT"]: base_type = base_type.replace(suffix, "") # Get weight - add bonus for encoded/hidden PII (evasion attempts) weight = sensitivity_weights.get(base_type, sensitivity_weights.get(entity.type, 25)) # Bonus for evasion techniques (intentionally hidden PII is more suspicious) if "_IN_JSON" in entity.type or "_BASE64" in entity.type: weight = min(100, weight + 10) # Cap at 100 total_score += weight * entity.confidence # Normalize score (0-100) risk_score = min(100, int(total_score / max(1, len(entities)) + len(entities) * 5)) if risk_score >= 70: return "CRITICAL", risk_score elif risk_score >= 50: return "HIGH", risk_score elif risk_score >= 30: return "MEDIUM", risk_score else: return "LOW", risk_score def redact_text(text: str, entities: list[PIIEntity], mode: str = "mask") -> str: """Redact PII from text""" if not entities: return text # Sort by position in reverse to not mess up indices sorted_entities = sorted(entities, key=lambda x: x.start, reverse=True) result = text for entity in sorted_entities: if mode == "mask": replacement = entity.masked_value elif mode == "remove": replacement = "[REDACTED]" elif mode == "type": replacement = f"[{entity.type}]" else: replacement = entity.masked_value result = result[:entity.start] + replacement + result[entity.end:] return result # Alias for backwards compatibility detect_pii = detect_pii_multilayer @router.post("/scan-text") async def scan_text( text: str = Form(...), detect_emails: bool = Form(True), detect_phones: bool = Form(True), detect_ssn: bool = Form(True), detect_credit_cards: bool = Form(True), detect_ip_addresses: bool = Form(True), detect_dates: bool = Form(True), detect_addresses: bool = Form(True), detect_iban: bool = Form(True), detect_secrets: bool = Form(True), coordinates_only: bool = Form(False) ): """ Scan text for PII and secrets using multi-layer 
def redact_text(text: str, entities: list[PIIEntity], mode: str = "mask") -> str:
    """Redact PII from text"""
    if not entities:
        return text

    # Sort by position in reverse to not mess up indices
    sorted_entities = sorted(entities, key=lambda x: x.start, reverse=True)

    result = text
    for entity in sorted_entities:
        if mode == "mask":
            replacement = entity.masked_value
        elif mode == "remove":
            replacement = "[REDACTED]"
        elif mode == "type":
            replacement = f"[{entity.type}]"
        else:
            replacement = entity.masked_value
        result = result[:entity.start] + replacement + result[entity.end:]

    return result


# Alias for backwards compatibility
detect_pii = detect_pii_multilayer


@router.post("/scan-text")
async def scan_text(
    text: str = Form(...),
    detect_emails: bool = Form(True),
    detect_phones: bool = Form(True),
    detect_ssn: bool = Form(True),
    detect_credit_cards: bool = Form(True),
    detect_ip_addresses: bool = Form(True),
    detect_dates: bool = Form(True),
    detect_addresses: bool = Form(True),
    detect_iban: bool = Form(True),
    detect_secrets: bool = Form(True),
    coordinates_only: bool = Form(False)
):
    """
    Scan text for PII and secrets using multi-layer detection.

    Security Options:
    - coordinates_only: If True, returns only PII positions (start, end, type, confidence)
      without the actual values. The frontend can then perform client-side masking using
      these coordinates, ensuring the backend never "sees" the raw PII. This is useful for
      ultra-sensitive data where even the backend shouldn't have access to actual PII values.
    """
    # Normalize CRLF to LF when returning coordinates for client-side redaction
    # Browser FormData converts LF->CRLF per RFC 7578, but frontend uses LF text
    if coordinates_only:
        text = text.replace("\r\n", "\n")

    config = ScanConfig(
        detect_emails=detect_emails,
        detect_phones=detect_phones,
        detect_ssn=detect_ssn,
        detect_credit_cards=detect_credit_cards,
        detect_ip_addresses=detect_ip_addresses,
        detect_dates=detect_dates,
        detect_addresses=detect_addresses,
        detect_iban=detect_iban,
        detect_secrets=detect_secrets
    )

    entities = detect_pii_multilayer(text, config, coordinates_only=coordinates_only)

    # Group by type
    entities_by_type = {}
    for entity in entities:
        if entity.type not in entities_by_type:
            entities_by_type[entity.type] = 0
        entities_by_type[entity.type] += 1

    risk_level, risk_score = calculate_risk_level(entities)

    # Coordinates-only mode: Strip actual PII values for client-side redaction
    if coordinates_only:
        # Return only positions and types - no actual PII values
        # Frontend will use these coordinates to mask text client-side
        entities_coords = [
            {
                "type": e.type,
                "start": e.start,
                "end": e.end,
                "confidence": e.confidence,
                "length": e.end - e.start  # So frontend knows how many chars to mask
            }
            for e in entities
        ]
        return {
            "total_entities": len(entities),
            "entities_by_type": entities_by_type,
            "entities": entities_coords,  # Coordinates only - no values!
            "risk_level": risk_level,
            "risk_score": risk_score,
            "redacted_preview": None,  # Frontend handles redaction
            "coordinates_only": True
        }

    # Standard mode: Return full entity details including values
    redacted_preview = redact_text(text, entities, "mask")

    return {
        "total_entities": len(entities),
        "entities_by_type": entities_by_type,
        "entities": [e.model_dump() for e in entities],
        "risk_level": risk_level,
        "risk_score": risk_score,
        "redacted_preview": redacted_preview,
        "coordinates_only": False
    }
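# Example request against /scan-text (a sketch - the "/api/privacy" prefix is only an
# assumption; the actual prefix depends on how this router is mounted in the app):
#
#   curl -X POST http://localhost:8000/api/privacy/scan-text \
#        -F "text=Contact me at jane.doe@example.com" \
#        -F "coordinates_only=true"
#
# With coordinates_only=true the response contains only offsets, types, confidences and
# lengths, so the caller can mask the text client-side without raw values round-tripping.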
@router.post("/scan-file")
async def scan_file(file: UploadFile = File(...)):
    """Scan a file for PII (CSV, TXT, JSON)"""
    try:
        content = await file.read()
        filename = file.filename.lower()

        if filename.endswith('.csv'):
            # Use duckdb to read CSV and extract all text
            with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
                tmp.write(content)
                tmp_path = tmp.name
            try:
                con = duckdb.connect(':memory:')
                result = con.execute(f"SELECT * FROM read_csv_auto('{tmp_path}')").fetchall()
                columns = con.execute(f"DESCRIBE SELECT * FROM read_csv_auto('{tmp_path}')").fetchall()
                all_text = ' '.join(str(cell) for row in result for cell in row if cell is not None)
                con.close()
            finally:
                os.unlink(tmp_path)
        elif filename.endswith('.json'):
            # Use duckdb to read JSON
            with tempfile.NamedTemporaryFile(delete=False, suffix='.json') as tmp:
                tmp.write(content)
                tmp_path = tmp.name
            try:
                con = duckdb.connect(':memory:')
                result = con.execute(f"SELECT * FROM read_json_auto('{tmp_path}')").fetchall()
                all_text = ' '.join(str(cell) for row in result for cell in row if cell is not None)
                con.close()
            finally:
                os.unlink(tmp_path)
        else:
            # Treat as text file
            all_text = content.decode('utf-8', errors='ignore')

        entities = detect_pii(all_text)

        # Group by type
        entities_by_type = {}
        for entity in entities:
            if entity.type not in entities_by_type:
                entities_by_type[entity.type] = 0
            entities_by_type[entity.type] += 1

        risk_level, risk_score = calculate_risk_level(entities)

        # Limit preview length
        preview_text = all_text[:2000] if len(all_text) > 2000 else all_text
        redacted_preview = redact_text(preview_text, [e for e in entities if e.end <= 2000], "mask")

        return {
            "filename": file.filename,
            "total_entities": len(entities),
            "entities_by_type": entities_by_type,
            "entities": [e.model_dump() for e in entities[:100]],  # Limit to first 100
            "risk_level": risk_level,
            "risk_score": risk_score,
            "redacted_preview": redacted_preview
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Could not process file: {str(e)}")
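# Example interaction with /scan-file (a sketch; the prefix and all field values below are
# hypothetical):
#
#   curl -X POST http://localhost:8000/api/privacy/scan-file -F "file=@customers.csv"
#
# returns roughly:
#   {"filename": "customers.csv", "total_entities": 12,
#    "entities_by_type": {"EMAIL": 7, "PHONE_US": 5},
#    "risk_level": "MEDIUM", "risk_score": 43, "redacted_preview": "..."}
#
# Note that only the first 100 entities are returned and the redacted preview is built
# from the first 2000 characters of the extracted text.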
{str(e)}") @router.post("/redact") async def redact_text_endpoint( text: str = Form(...), mode: str = Form("mask") ): """Redact PII from text""" entities = detect_pii(text) redacted = redact_text(text, entities, mode) return { "original_length": len(text), "redacted_length": len(redacted), "entities_redacted": len(entities), "redacted_text": redacted } @router.get("/entity-types") async def list_entity_types(): """List supported PII entity types""" return { "entity_types": [ {"type": key, "description": value["description"]} for key, value in PII_PATTERNS.items() ] }