ai-tools-suite/backend/routers/privacy.py

"""Privacy Scanner Router - PII Detection and Redaction"""
from fastapi import APIRouter, UploadFile, File, HTTPException, Form
from pydantic import BaseModel
from typing import Optional
import re
import io
import tempfile
import os
import duckdb
import json
import base64
import unicodedata
import html
router = APIRouter()
# Valid TLDs for email detection - prevents false positives on "at 7pm", "at cvs", etc.
VALID_TLDS = {
'com', 'org', 'net', 'edu', 'gov', 'io', 'co', 'ai', 'dev',
'app', 'xyz', 'info', 'biz', 'me', 'uk', 'de', 'fr', 'ca',
'au', 'in', 'jp', 'ru', 'br', 'it', 'nl', 'es', 'ch', 'se',
'no', 'fi', 'dk', 'at', 'be', 'pl', 'pt', 'ie', 'nz', 'sg',
'hk', 'kr', 'mx', 'ar', 'cl', 'za', 'us', 'mil', 'int', 'eu',
'asia', 'mobi', 'name', 'pro', 'aero', 'coop', 'museum', 'jobs',
'travel', 'xxx', 'cat', 'tel', 'post', 'club', 'online', 'site',
'tech', 'store', 'blog', 'shop', 'live', 'cloud', 'news', 'email'
}
# Words that commonly follow "at" but are NOT domains
FALSE_TRIGGER_WORDS = {
'the', 'a', 'an', 'my', 'your', 'his', 'her', 'our', 'their', 'ur',
'home', 'work', 'office', 'school', 'noon', 'night', 'midnight',
'dawn', 'dusk', 'once', 'least', 'most', 'first', 'last', 'all',
'gate', 'terminal', 'platform', 'station', 'airport', 'store',
'mall', 'gym', 'park', 'beach', 'bar', 'restaurant', 'hotel',
'clinic', 'hospital', 'bank', 'church', 'cvs', 'target', 'walmart'
}
# PII Detection Patterns
PII_PATTERNS = {
"EMAIL": {
# Negative lookbehind (?<!:) excludes connection strings like password@host
"pattern": r'(?<!:)\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
"description": "Email addresses",
"category": "pii"
},
"EMAIL_OBFUSCATED": {
# Obfuscated emails: t-e-s-t [at] example [dot] com, test(at)example(dot)com
# IMPORTANT: Do NOT match plain " at " - only match bracketed [at], (at), or literal @
# Plain " at " causes massive false positives ("meet at 5pm", "close at 9am", etc.)
"pattern": r'[A-Za-z0-9](?:[-\s]*[A-Za-z0-9]){2,}\s*(?:\[at\]|\(at\))\s*[A-Za-z0-9](?:[-\s]*[A-Za-z0-9]){2,}\s*(?:\[dot\]|\(dot\)|\s+dot\s+)\s*[A-Za-z]{2,}',
"description": "Obfuscated email addresses",
"category": "pii"
},
"PHONE_US": {
# US phone numbers: (555) 123-4567, 555-123-4567, +1 555 123 4567
"pattern": r'\b(?:\+?1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b',
"description": "US Phone numbers",
"category": "pii"
},
"PHONE_INTL": {
# International phone numbers: +49 30 1234567, +44 20 7946 0958, +55 11 98765-4321
# EU: 49(DE), 44(UK), 33(FR), 39(IT), 34(ES), 31(NL), 32(BE), 43(AT), 41(CH), 48(PL), etc.
# LATAM: 55(BR), 52(MX), 54(AR), 56(CL), 57(CO), 51(PE)
# APAC: 81(JP), 82(KR), 86(CN), 91(IN), 61(AU), 64(NZ), 65(SG), 852(HK)
"pattern": r'\+(?:49|44|33|39|34|31|32|43|41|48|351|353|358|47|46|45|420|36|40|359|385|386|421|370|371|372|352|356|357|30|55|52|54|56|57|51|81|82|86|91|61|64|65|852)\s?[0-9]{1,4}[\s-]?[0-9]{3,4}[\s-]?[0-9]{3,6}\b',
"description": "International Phone numbers (Global)",
"category": "pii"
},
"SSN": {
# US SSN: Requires separators (dashes, dots, spaces, or underscores)
# Matches: 123-45-6789, 123.45.6789, 123 45 6789, 123_45_6789
"pattern": r'\b\d{3}[-.\s_]\d{2}[-.\s_]\d{4}\b',
"description": "Social Security Numbers (US)",
"category": "pii"
},
"MEDICARE_ID": {
# US Medicare Beneficiary Identifier (MBI): 11 characters, alphanumeric
# Format: 1A12-B12-CD12 or 1-A-1-2-B-1-2-C-D-1-2 (with extra dashes)
"pattern": r'\b[1-9][-]?[A-Z][-]?(?:[A-Z0-9][-]?){2}[A-Z][-]?(?:[A-Z0-9][-]?){2}[A-Z][-]?[A-Z][-]?[0-9][-]?[0-9]\b',
"description": "Medicare Beneficiary Identifier (MBI)",
"category": "pii"
},
"GERMAN_ID": {
# German Personalausweisnummer (ID card): 10 characters
# Format: T220001293 or similar
"pattern": r'\b[A-Z0-9]{10}\b(?=.*(?:ausweis|personalausweis|id\s*card|national\s*id))',
"description": "German National ID (Personalausweis)",
"category": "pii"
},
"CREDIT_CARD": {
# Matches Visa, Mastercard, Amex, Discover with optional spaces/dashes
"pattern": r'\b(?:4[0-9]{3}[-\s]?[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4}|4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{2}[-\s]?[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4}|5[1-5][0-9]{14}|3[47][0-9]{2}[-\s]?[0-9]{6}[-\s]?[0-9]{5}|3[47][0-9]{13}|6(?:011|5[0-9]{2})[-\s]?[0-9]{4}[-\s]?[0-9]{4}[-\s]?[0-9]{4}|6(?:011|5[0-9]{2})[0-9]{12})\b',
"description": "Credit card numbers",
"category": "financial"
},
"IP_ADDRESS": {
"pattern": r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
"description": "IP addresses",
"category": "pii"
},
"IP_DEFANGED": {
# Defanged IPs: 192[.]168[.]1[.]1, 192[dot]168[dot]1[dot]1, 192(.)168(.)1(.)1
"pattern": r'\b(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\[(?:\.|\s*dot\s*)\]|\((?:\.|\s*dot\s*)\)|\[\.\])){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b',
"description": "Defanged IP addresses",
"category": "pii"
},
"DATE_OF_BIRTH": {
# Matches multiple date formats:
# MM/DD/YYYY, MM-DD-YYYY, DD/MM/YYYY, YYYY-MM-DD, Month DD, YYYY
"pattern": r'\b(?:(?:0?[1-9]|1[0-2])[/-](?:0?[1-9]|[12][0-9]|3[01])[/-](?:19|20)\d{2}|(?:0?[1-9]|[12][0-9]|3[01])[/-](?:0?[1-9]|1[0-2])[/-](?:19|20)\d{2}|(?:19|20)\d{2}[-/](?:0?[1-9]|1[0-2])[-/](?:0?[1-9]|[12][0-9]|3[01])|(?:January|February|March|April|May|June|July|August|September|October|November|December|Jan|Feb|Mar|Apr|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[.,]?\s+(?:0?[1-9]|[12][0-9]|3[01])(?:st|nd|rd|th)?[.,]?\s+(?:19|20)\d{2})\b',
"description": "Dates of birth (multiple formats)",
"category": "pii"
},
"IBAN": {
# International Bank Account Numbers - very flexible pattern
# Format: 2 letters (country) + 2 digits (check) + up to 30 alphanumeric (BBAN)
# Supports with/without spaces, common country formats
"pattern": r'\b(?:DE|GB|FR|ES|IT|NL|BE|AT|CH|PL|PT|IE|FI|NO|SE|DK|CZ|HU|RO|BG|HR|SI|SK|LT|LV|EE|LU|MT|CY|GR)\d{2}[\s]?[A-Z0-9]{4}[\s]?[A-Z0-9]{4}[\s]?[A-Z0-9]{4}[\s]?[A-Z0-9]{0,18}\b',
"description": "International Bank Account Numbers",
"category": "financial"
},
"BANK_ACCOUNT": {
# Generic bank account patterns (routing + account, with labels)
"pattern": r'\b(?:account|acct|a/c|acc)[:\s#]*[0-9]{8,17}\b',
"description": "Bank Account Numbers",
"category": "financial"
},
"ROUTING_NUMBER": {
# US Bank Routing/ABA numbers (9 digits)
"pattern": r'\b(?:routing|aba|rtn)[:\s#]*[0-9]{9}\b',
"description": "Bank Routing/ABA Numbers",
"category": "financial"
},
"SWIFT_BIC": {
# SWIFT/BIC codes (8 or 11 chars) - requires colon separator to avoid false positives
# Format: 4 letters (bank) + 2 letters (country ISO) + 2 alphanum (location) + optional 3 alphanum (branch)
# Pattern requires colon or "code" keyword to distinguish from "SWIFT transfer" etc.
"pattern": r'(?:swift|bic|swift/bic)(?::|(?:\s+code\s*))[:\s]*([A-Z]{4}[A-Z]{2}[A-Z0-9]{2}(?:[A-Z0-9]{3})?)\b',
"description": "SWIFT/BIC Codes",
"category": "financial"
},
"ZIP_CODE": {
"pattern": r'\b\d{5}(?:[-\s]\d{4})?\b',
"description": "US ZIP codes",
"category": "pii"
},
"DRIVERS_LICENSE": {
# US Driver's License - requires context to avoid false positives
# Most states use letter + 7-12 digits, but pattern too generic without context
"pattern": r'\b[A-Z][0-9]{7,12}\b(?=.*(?:license|licence|driver|dmv|dl[:#\s]|driving))',
"description": "Driver's license numbers",
"category": "pii"
},
# Physical Address patterns
"US_ADDRESS": {
"pattern": r'\b\d{1,5}\s+(?:[A-Za-z]+\s+){1,4}(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Drive|Dr|Lane|Ln|Court|Ct|Way|Place|Pl|Circle|Cir|Highway|Hwy)\.?(?:\s+(?:Apt|Suite|Ste|Unit|#)\s*[A-Za-z0-9-]+)?(?:,\s*[A-Za-z\s]+,\s*[A-Z]{2}\s+\d{5}(?:-\d{4})?)?\b',
"description": "US physical addresses",
"category": "pii"
},
# Secrets and API Keys
"AWS_ACCESS_KEY": {
"pattern": r'\b(?:AKIA|ABIA|ACCA|ASIA)[A-Z0-9]{16}\b',
"description": "AWS Access Key IDs",
"category": "secret"
},
"AWS_SECRET_KEY": {
"pattern": r'\b[A-Za-z0-9/+=]{40}\b',
"description": "AWS Secret Access Keys (40 char base64)",
"category": "secret"
},
"GITHUB_TOKEN": {
"pattern": r'\b(?:ghp|gho|ghu|ghs|ghr)_[A-Za-z0-9]{36,}\b',
"description": "GitHub Personal Access Tokens",
"category": "secret"
},
"GITHUB_TOKEN_CLASSIC": {
"pattern": r'\bgh[pousr]_[A-Za-z0-9]{36}\b',
"description": "GitHub Classic Tokens",
"category": "secret"
},
"SLACK_TOKEN": {
"pattern": r'\bxox[baprs]-[0-9]{10,13}-[0-9]{10,13}[a-zA-Z0-9-]*\b',
"description": "Slack API Tokens",
"category": "secret"
},
"GENERIC_API_KEY": {
"pattern": r'\b(?:api[_-]?key|apikey|api[_-]?secret|secret[_-]?key)["\']?\s*[:=]\s*["\']?([A-Za-z0-9_\-]{20,64})["\']?\b',
"description": "Generic API keys in key=value format",
"category": "secret"
},
"PASSWORD_IN_URL": {
# Passwords with = or : or "to" keyword
# Matches: password=secret, pwd: MyPass123, changed pwd to P@ssw0rd123!
"pattern": r'(?:password|passwd|pwd|pass|secret|credential)["\']?\s*(?:[:=]|(?:\s+(?:is|to|as)\s+))\s*["\']?([^\s"\'&,]{6,})["\']?',
"description": "Passwords in plaintext",
"category": "secret"
},
"PRIVATE_KEY": {
"pattern": r'-----BEGIN\s+(?:RSA\s+)?PRIVATE\s+KEY-----',
"description": "Private key headers",
"category": "secret"
},
"JWT_TOKEN": {
"pattern": r'\beyJ[A-Za-z0-9_-]*\.eyJ[A-Za-z0-9_-]*\.[A-Za-z0-9_-]*\b',
"description": "JSON Web Tokens",
"category": "secret"
},
"STRIPE_KEY": {
# Stripe keys: sk_live_xxx, pk_test_xxx - min 8 chars after prefix to catch truncated
"pattern": r'\b(?:sk|pk)_(?:test|live)_[A-Za-z0-9]{8,}\b',
"description": "Stripe API Keys",
"category": "secret"
},
"API_KEY_IN_URL": {
# API keys embedded in URLs: /key/xxx, /api_key/xxx, /token/xxx
"pattern": r'(?:/(?:key|api[_-]?key|token|secret|auth)[/=])([A-Za-z0-9_-]{16,})',
"description": "API Keys in URLs",
"category": "secret"
},
"AUTH_CODE": {
# Authorization codes: auth_code: 9921, authorization: ABC123
"pattern": r'(?:auth[_-]?code|authorization|auth[_-]?token|otp|verification[_-]?code)[:\s]+[A-Za-z0-9]{4,12}\b',
"description": "Authorization/Verification Codes",
"category": "secret"
},
"UUID": {
# UUIDs/GUIDs: 550e8400-e29b-41d4-a716-446655440000
"pattern": r'\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b',
"description": "UUIDs/Trace IDs",
"category": "pii"
},
"GEO_COORDINATES": {
# Geo coordinates: 40.7128° N, 74.0060° W or 40.7128, -74.0060
"pattern": r'\b-?\d{1,3}\.\d{4,}°?\s*[NS]?\s*,?\s*-?\d{1,3}\.\d{4,}°?\s*[EW]?\b',
"description": "Geographic Coordinates",
"category": "pii"
},
"GOOGLE_API_KEY": {
"pattern": r'\bAIza[A-Za-z0-9_-]{35}\b',
"description": "Google API Keys",
"category": "secret"
},
# GCP and Azure Cloud Keys
"GCP_SERVICE_ACCOUNT": {
"pattern": r'\b[a-z0-9-]+@[a-z0-9-]+\.iam\.gserviceaccount\.com\b',
"description": "GCP Service Account Emails",
"category": "secret"
},
"GCP_PRIVATE_KEY_ID": {
"pattern": r'"private_key_id"\s*:\s*"([a-f0-9]{40})"',
"description": "GCP Private Key IDs in JSON",
"category": "secret"
},
"AZURE_CLIENT_SECRET": {
"pattern": r'\b[a-zA-Z0-9~._-]{34,40}\b(?=.*(?:azure|client[_-]?secret|tenant))',
"description": "Azure Client Secrets",
"category": "secret"
},
"AZURE_CONNECTION_STRING": {
# Azure connection strings - match AccountKey=xxx pattern
"pattern": r'AccountKey=[A-Za-z0-9+/=]{10,}',
"description": "Azure Storage Connection Strings",
"category": "secret"
},
"AZURE_SAS_TOKEN": {
"pattern": r'\?sv=\d{4}-\d{2}-\d{2}&[^"\s]+sig=[A-Za-z0-9%]+',
"description": "Azure SAS Tokens",
"category": "secret"
},
# International Address Patterns
"UK_POSTCODE": {
# Valid UK postcode formats: A9 9AA, A99 9AA, A9A 9AA, AA9 9AA, AA99 9AA, AA9A 9AA
# Must have space between outward and inward codes, and be word-bounded
"pattern": r'\b(?:[A-Z]{1,2}[0-9][0-9A-Z]?\s+[0-9][A-Z]{2})\b',
"description": "UK Postcodes",
"category": "pii"
},
"UK_ADDRESS": {
"pattern": r'\b\d{1,5}\s+[A-Za-z]+(?:\s+[A-Za-z]+)*\s*,\s*[A-Za-z]+(?:\s+[A-Za-z]+)*\s*,?\s*[A-Z]{1,2}[0-9][0-9A-Z]?\s*[0-9][A-Z]{2}\b',
"description": "UK Addresses with Postcode",
"category": "pii"
},
"EU_ADDRESS": {
# European addresses: German (strasse 42), French (25 rue xxx, 75016 Paris), etc
"pattern": r'\b(?:[A-Za-z]+(?:strasse|straße|street|straat|calle|via|strada|gasse|weg|platz|plein|place|plaza)\s+\d{1,5}|\d{1,5}\s+(?:rue|av\.?|avenue|boulevard|blvd\.?|chemin|allée|impasse|passage)\s+[A-Za-z][A-Za-z\s]{2,25})[,\s]+\d{4,5}\s+[A-Za-z]+',
"description": "European Addresses (DE/FR/NL/IT/ES)",
"category": "pii"
},
"INTERNATIONAL_ADDRESS": {
# Generic pattern for addresses with postal codes
"pattern": r'\b\d{1,5}\s+[A-Za-z][A-Za-z\s]{2,30},\s*[A-Za-z][A-Za-z\s]{2,20},?\s*(?:[A-Z]{2}\s*)?\d{4,6}(?:-\d{4})?\b',
"description": "International Addresses with Postal Codes",
"category": "pii"
},
# Medical IDs (distinct from passports)
"MEDICAL_RECORD_NUMBER": {
"pattern": r'\b(?:MRN|MR#|Medical\s*Record|Patient\s*ID)[:\s#]*[A-Z0-9]{6,12}\b',
"description": "Medical Record Numbers",
"category": "pii"
},
"NPI_NUMBER": {
"pattern": r'\b(?:NPI)[:\s#]*[0-9]{10}\b',
"description": "National Provider Identifier (NPI)",
"category": "pii"
},
"DEA_NUMBER": {
"pattern": r'\b[A-Z][A-Z9][0-9]{7}\b',
"description": "DEA Registration Numbers",
"category": "pii"
},
# EU VAT Numbers (detected directly via dedicated function for better accuracy)
"EU_VAT_NUMBER": {
"pattern": r'\b(?:ATU\d{8}|BE0?\d{9,10}|BG\d{9,10}|CY\d{8}[A-Z]|CZ\d{8,10}|DE\d{9}|DK\d{8}|EE\d{9}|EL\d{9}|ES[A-Z0-9]\d{7}[A-Z0-9]|FI\d{8}|FR[A-Z0-9]{2}\d{9}|HR\d{11}|HU\d{8}|IE\d[A-Z0-9+*]\d{5}[A-Z]{1,2}|IT\d{11}|LT\d{9,12}|LU\d{8}|LV\d{11}|MT\d{8}|NL\d{9}B\d{2}|PL\d{10}|PT\d{9}|RO\d{2,10}|SE\d{12}|SI\d{8}|SK\d{10}|GB\d{9,12})\b',
"description": "European Union VAT Numbers",
"category": "financial"
},
# Spelled-out SSN pattern (detected via NLP layer)
"SSN_SPELLED": {
"pattern": r'\b(?:S\.?S\.?N\.?|social\s*security)\s+(?:is\s+)?(?:(?:zero|one|two|three|four|five|six|seven|eight|nine|oh|o)\s+)+(?:dash|hyphen)\s+(?:(?:zero|one|two|three|four|five|six|seven|eight|nine|oh|o)\s+)+(?:dash|hyphen)\s+(?:(?:zero|one|two|three|four|five|six|seven|eight|nine|oh|o)\s*)+',
"description": "Spelled-out Social Security Numbers",
"category": "pii"
},
# Base64 encoded secrets (marker for decoded content)
"BASE64_SECRET": {
"pattern": r'PLACEHOLDER_FOR_DECODED_BASE64', # Detected via decode layer
"description": "Base64 Encoded Secrets",
"category": "secret"
},
# JSON embedded PII (marker for nested content)
"JSON_EMBEDDED_PII": {
"pattern": r'PLACEHOLDER_FOR_JSON_PII', # Detected via JSON extraction layer
"description": "PII Found in JSON Blobs",
"category": "pii"
},
# =========================================================================
# INTERNATIONAL ID PATTERNS (v1.1)
# =========================================================================
"UK_NATIONAL_INSURANCE": {
# UK National Insurance Number: AB123456C
"pattern": r'\b[A-CEGHJ-PR-TW-Z]{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?[A-D]\b',
"description": "UK National Insurance Number",
"category": "pii"
},
"CANADIAN_SIN": {
# Canadian Social Insurance Number: 123-456-789 or 123 456 789
"pattern": r'\b\d{3}[-\s]\d{3}[-\s]\d{3}\b',
"description": "Canadian Social Insurance Number",
"category": "pii"
},
"INDIA_AADHAAR": {
# India Aadhaar: 1234 5678 9012 (12 digits with spaces)
"pattern": r'\b\d{4}\s\d{4}\s\d{4}\b',
"description": "India Aadhaar Number",
"category": "pii"
},
"INDIA_PAN": {
# India PAN: ABCDE1234F (5 letters, 4 digits, 1 letter)
"pattern": r'\b[A-Z]{5}\d{4}[A-Z]\b',
"description": "India PAN Card Number",
"category": "pii"
},
"AUSTRALIA_TFN": {
# Australian Tax File Number: 123 456 789 (9 digits)
"pattern": r'\b\d{3}\s\d{3}\s\d{3}\b',
"description": "Australian Tax File Number",
"category": "pii"
},
"BRAZIL_CPF": {
# Brazil CPF: 123.456.789-00
"pattern": r'\b\d{3}\.\d{3}\.\d{3}-\d{2}\b',
"description": "Brazil CPF Number",
"category": "pii"
},
"MEXICO_CURP": {
# Mexico CURP: 18 alphanumeric
"pattern": r'\b[A-Z]{4}\d{6}[HM][A-Z]{5}[A-Z\d]{2}\b',
"description": "Mexico CURP",
"category": "pii"
},
"SOUTH_AFRICA_ID": {
# South Africa ID: 13 digits YYMMDDGGGSCAZ
"pattern": r'\b\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])\d{4}[01]\d{2}\b',
"description": "South Africa ID Number",
"category": "pii"
},
# =========================================================================
# ADDITIONAL CLOUD TOKENS (v1.1)
# =========================================================================
"DISCORD_TOKEN": {
# Discord bot/user tokens: base64.base64.base64 format
"pattern": r'\b[MN][A-Za-z\d]{23,}\.[\w-]{6}\.[\w-]{27}\b',
"description": "Discord Bot/User Tokens",
"category": "secret"
},
"DISCORD_WEBHOOK": {
"pattern": r'https://discord(?:app)?\.com/api/webhooks/\d+/[\w-]+',
"description": "Discord Webhook URLs",
"category": "secret"
},
"TWILIO_API_KEY": {
"pattern": r'\bSK[a-f0-9]{32}\b',
"description": "Twilio API Keys",
"category": "secret"
},
"TWILIO_AUTH_TOKEN": {
"pattern": r'\b[a-f0-9]{32}\b(?=.*(?:twilio|auth_token))',
"description": "Twilio Auth Tokens",
"category": "secret"
},
"SENDGRID_API_KEY": {
"pattern": r'\bSG\.[A-Za-z0-9_-]{22}\.[A-Za-z0-9_-]{43}\b',
"description": "SendGrid API Keys",
"category": "secret"
},
"OPENAI_API_KEY": {
"pattern": r'\bsk-[A-Za-z0-9]{48}\b',
"description": "OpenAI API Keys",
"category": "secret"
},
"ANTHROPIC_API_KEY": {
"pattern": r'\bsk-ant-[A-Za-z0-9_-]{40,}\b',
"description": "Anthropic API Keys",
"category": "secret"
},
"MAILCHIMP_API_KEY": {
"pattern": r'\b[a-f0-9]{32}-us\d{1,2}\b',
"description": "Mailchimp API Keys",
"category": "secret"
},
"MAILGUN_API_KEY": {
"pattern": r'\bkey-[a-f0-9]{32}\b',
"description": "Mailgun API Keys",
"category": "secret"
},
"HEROKU_API_KEY": {
"pattern": r'\b[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}\b',
"description": "Heroku API Keys",
"category": "secret"
},
"SHOPIFY_ACCESS_TOKEN": {
"pattern": r'\bshpat_[a-fA-F0-9]{32}\b',
"description": "Shopify Access Tokens",
"category": "secret"
},
"SHOPIFY_SHARED_SECRET": {
"pattern": r'\bshpss_[a-fA-F0-9]{32}\b',
"description": "Shopify Shared Secrets",
"category": "secret"
},
"NPM_TOKEN": {
"pattern": r'\bnpm_[A-Za-z0-9]{36}\b',
"description": "NPM Access Tokens",
"category": "secret"
},
"PYPI_TOKEN": {
"pattern": r'\bpypi-[A-Za-z0-9_-]{50,}\b',
"description": "PyPI API Tokens",
"category": "secret"
},
"DOCKER_AUTH": {
"pattern": r'"auth"\s*:\s*"[A-Za-z0-9+/=]{20,}"',
"description": "Docker Registry Auth",
"category": "secret"
},
# =========================================================================
# CRYPTO ADDRESSES (v1.1)
# =========================================================================
"BITCOIN_ADDRESS": {
# Bitcoin: 1xxx, 3xxx (P2SH), bc1xxx (Bech32)
"pattern": r'\b(?:[13][a-km-zA-HJ-NP-Z1-9]{25,34}|bc1[a-zA-HJ-NP-Z0-9]{39,59})\b',
"description": "Bitcoin Wallet Addresses",
"category": "financial"
},
"ETHEREUM_ADDRESS": {
# Ethereum: 0x followed by 40 hex chars
"pattern": r'\b0x[a-fA-F0-9]{40}\b',
"description": "Ethereum Wallet Addresses",
"category": "financial"
},
"MONERO_ADDRESS": {
# Monero: 4 or 8 followed by 93 or 103 chars
"pattern": r'\b[48][0-9AB][1-9A-HJ-NP-Za-km-z]{93}\b',
"description": "Monero Wallet Addresses",
"category": "financial"
},
# =========================================================================
# FINANCIAL IDENTIFIERS (v1.1)
# =========================================================================
"CUSIP": {
# CUSIP: 9 alphanumeric chars (US/Canada securities)
"pattern": r'\b[A-Z0-9]{9}\b(?=.*(?:cusip|security|stock|bond))',
"description": "CUSIP Security Identifiers",
"category": "financial"
},
"ISIN": {
# ISIN: Valid country code + 9 alphanumeric (NSIN) + 1 check digit
# Requires financial context to reduce false positives
"pattern": r'\b(?:US|GB|DE|FR|CH|JP|CA|AU|NL|BE|IT|ES|AT|SE|NO|DK|FI|IE|PT|LU|HK|SG|KR|TW|IN|BR|ZA|MX)[A-Z0-9]{9}\d\b(?=.*(?:isin|security|stock|bond|share|equity|fund))',
"description": "ISIN Security Identifiers",
"category": "financial"
},
"SEDOL": {
# SEDOL: 7 alphanumeric (UK securities)
"pattern": r'\b[B-DF-HJ-NP-TV-Z0-9]{7}\b(?=.*(?:sedol|london|lse))',
"description": "SEDOL Security Identifiers",
"category": "financial"
}
}
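# Illustrative sketch (not invoked by the router itself): how a single PII_PATTERNS
# entry can be applied with re.finditer. The sample values below are fabricated.
#
#     >>> sample = "Contact me at jane.doe@example.com or (555) 123-4567"
#     >>> [m.group() for m in re.finditer(PII_PATTERNS["EMAIL"]["pattern"], sample)]
#     ['jane.doe@example.com']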
class PIIEntity(BaseModel):
type: str
value: str
start: int
end: int
confidence: float
masked_value: str
class ScanResult(BaseModel):
total_entities: int
entities_by_type: dict
entities: list[PIIEntity]
risk_level: str
risk_score: int
redacted_preview: Optional[str] = None
class ScanConfig(BaseModel):
detect_emails: bool = True
detect_phones: bool = True
detect_ssn: bool = True
detect_credit_cards: bool = True
detect_ip_addresses: bool = True
detect_dates: bool = True
detect_zip_codes: bool = False
detect_addresses: bool = True
detect_iban: bool = True
detect_secrets: bool = True # AWS keys, GitHub tokens, passwords, etc.
# ============================================================================
# FALSE POSITIVE EXCLUSION PATTERNS (v1.1)
# ============================================================================
# Patterns to EXCLUDE from detection (known false positives)
FALSE_POSITIVE_PATTERNS = {
# Git commit SHAs (40 hex chars) - not AWS secrets
"GIT_SHA": r'\b[a-f0-9]{40}\b',
# Unix timestamps (13 digits) - not phone numbers
"UNIX_TIMESTAMP_MS": r'\b1[0-9]{12}\b',
# Unix timestamps (10 digits) - not phone numbers
"UNIX_TIMESTAMP": r'\b1[0-9]{9}\b',
# Slack webhook path identifiers (T/B followed by alphanumeric)
"SLACK_WEBHOOK_PATH": r'T[A-Z0-9]{8,}/B[A-Z0-9]{8,}',
# Common test/example patterns
"TEST_PATTERN": r'\b(?:test|example|sample|demo|fake|dummy|xxx+|000[-\s]?000[-\s]?0000)\b',
# Version numbers (could look like dates)
"VERSION_NUMBER": r'\bv?\d+\.\d+\.\d+(?:\.\d+)?\b',
# File hashes (MD5, SHA variants)
"FILE_HASH_MD5": r'\b[a-f0-9]{32}\b(?!.*(?:twilio|auth_token))',
"FILE_HASH_SHA256": r'\b[a-f0-9]{64}\b',
}
# ============================================================================
# LAYER 2.0: Pre-Processing - Unicode & HTML Normalization (v1.1)
# ============================================================================
def normalize_unicode(text: str) -> str:
"""
Layer 2.0a: Apply Unicode NFKC normalization to catch:
- Fullwidth characters: @. → john@example.com
- Unicode dots: johndoe@examplecom → john.doe@example.com
- Other homoglyphs and compatibility characters
"""
return unicodedata.normalize('NFKC', text)
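# Example (illustrative): NFKC folds fullwidth and compatibility characters back
# to ASCII, which is what lets the downstream regex layer match them.
#
#     >>> normalize_unicode('ｊｏｈｎ＠ｅｘａｍｐｌｅ．ｃｏｍ')
#     'john@example.com'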
def decode_html_entities(text: str) -> str:
"""
Layer 2.0b: Decode HTML entities to catch:
- john&at;example&dot;com → john@example.com
- john&#64;example&#46;com → john@example.com
- &amp; &lt; &gt; etc.
"""
# First pass: standard HTML entities
decoded = html.unescape(text)
# Second pass: custom obfuscation patterns
custom_entities = [
(r'&at;', '@'),
(r'&dot;', '.'),
(r'&#64;', '@'), # @ in decimal
(r'&#46;', '.'), # . in decimal
(r'&#x40;', '@'), # @ in hex
(r'&#x2e;', '.'), # . in hex
]
for pattern, replacement in custom_entities:
decoded = re.sub(pattern, replacement, decoded, flags=re.IGNORECASE)
return decoded
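# Example (illustrative): both standard numeric entities and the custom &at;/&dot;
# obfuscations collapse to a plain address.
#
#     >>> decode_html_entities('john&#64;example&#46;com')
#     'john@example.com'
#     >>> decode_html_entities('john&at;example&dot;com')
#     'john@example.com'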
def preprocess_text(text: str) -> tuple[str, list[dict]]:
"""
Layer 2.0: Full pre-processing pipeline combining Unicode and HTML normalization.
Returns preprocessed text and a log of transformations applied.
"""
transformations = []
processed = text
# Step 1: Unicode NFKC normalization
unicode_normalized = normalize_unicode(processed)
if unicode_normalized != processed:
transformations.append({
"layer": "2.0a",
"type": "unicode_nfkc",
"changes": len(processed) - len(unicode_normalized)
})
processed = unicode_normalized
# Step 2: HTML entity decoding
html_decoded = decode_html_entities(processed)
if html_decoded != processed:
transformations.append({
"layer": "2.0b",
"type": "html_entity_decode",
"changes": len(processed) - len(html_decoded)
})
processed = html_decoded
return processed, transformations
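# Example (illustrative): the combined pipeline returns the cleaned text plus a
# small audit log of which sub-layers actually changed anything.
#
#     >>> cleaned, log = preprocess_text('reach me: john&#64;example&#46;com')
#     >>> cleaned
#     'reach me: john@example.com'
#     >>> [t["type"] for t in log]
#     ['html_entity_decode']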
def is_false_positive(value: str, pii_type: str, context: str = "") -> bool:
"""
Check if a detected value is a known false positive pattern.
Returns True if the value should be excluded.
"""
value_lower = value.lower()
context_lower = context.lower() if context else ""
# Git SHA check (for AWS_SECRET_KEY false positives)
if pii_type == "AWS_SECRET_KEY":
# If it's all lowercase hex and 40 chars, it's probably a git SHA
if re.match(r'^[a-f0-9]{40}$', value):
return True
# Also check if "commit" or "sha" in context
if any(kw in context_lower for kw in ["commit", "sha", "hash", "git", "rev"]):
return True
# Timestamp check (for PHONE_US false positives)
if pii_type in ["PHONE_US", "PHONE_US_NORMALIZED"]:
# Check if it looks like a timestamp (13 digits starting with 1)
digits_only = re.sub(r'\D', '', value)
if len(digits_only) == 13 and digits_only.startswith('1'):
return True
if len(digits_only) == 10 and digits_only.startswith('1'):
return True
# Slack webhook path check (for DRIVERS_LICENSE false positives)
if pii_type == "DRIVERS_LICENSE":
if re.match(r'^T[A-Z0-9]{8,}$', value) or re.match(r'^B[A-Z0-9]{8,}$', value):
if "slack" in context_lower or "webhook" in context_lower:
return True
# B followed by 7 digits could be HK ID or other codes, need context
if re.match(r'^B\d{7}$', value):
# Only valid if context suggests driver's license
if not any(kw in context_lower for kw in ["license", "licence", "driver", "dmv", "dl#", "dl:"]):
return True
# Date check for log timestamps (for DATE_OF_BIRTH false positives)
if pii_type == "DATE_OF_BIRTH":
# Log timestamps often have time component nearby
if any(kw in context_lower for kw in ["log", "timestamp", "created", "updated", "time:", "at ", ":"]):
if re.search(r'\d{2}:\d{2}:\d{2}', context): # Has time component
return True
# Version number check
if pii_type != "IP_ADDRESS" and re.match(r'^v?\d+\.\d+\.\d+', value):
return True
# EMAIL and EMAIL_OBFUSCATED false positives - connection strings and URIs
if pii_type in ["EMAIL", "EMAIL_OBFUSCATED"]:
# Connection string pattern: ://user:password@host
# This catches mongodb+srv://user:pass@cluster, redis://default:pass@host, etc.
if re.search(r'://[^@]*:[^@]*@', context):
return True
# MongoDB/database connection strings: password@cluster.mongodb.net
if any(db in value_lower for db in [".mongodb.net", ".mongodb.com", "cluster0", "cluster1"]):
return True
# Redis connection strings
if any(db in value_lower for db in [".redis", "redis-", "redislabs"]):
return True
# Other database connection patterns
if any(db in context_lower for db in ["mongodb://", "mongodb+srv://", "redis://", "postgres://", "mysql://", "amqp://", "connection_string", "conn_str"]):
return True
# Part of URL (e.g., user:pass@host pattern)
if re.search(r'://[^@]+' + re.escape(value), context):
return True
# UK_POSTCODE false positives - partial passwords, codes
if pii_type == "UK_POSTCODE":
# Too short or looks like part of a password/hash
if len(value) < 6:
return True
# Check if it's surrounded by alphanumeric chars (part of longer string)
value_pos = context.find(value)
if value_pos > 0:
char_before = context[value_pos - 1] if value_pos > 0 else ' '
char_after = context[value_pos + len(value)] if value_pos + len(value) < len(context) else ' '
if char_before.isalnum() or char_after.isalnum():
return True
# ISIN false positives - needs context validation
if pii_type == "ISIN":
# ISIN should have country code followed by alphanumeric
# Exclude if it looks like a passport number or other ID
if any(kw in context_lower for kw in ["passport", "travel", "visa", "id card", "identity"]):
return True
# Exclude random-looking alphanumeric strings without financial context
if not any(kw in context_lower for kw in ["isin", "security", "stock", "bond", "cusip", "sedol", "share", "equity", "fund"]):
return True
# BANK_ACCOUNT false positives - exclude credit card numbers
if pii_type == "BANK_ACCOUNT":
digits_only = re.sub(r'\D', '', value)
# If it's 15-16 digits and passes Luhn, it's a credit card, not bank account
if len(digits_only) in [15, 16]:
# Luhn check inline
digits = [int(d) for d in digits_only]
odd_digits = digits[-1::-2]
even_digits = digits[-2::-2]
total = sum(odd_digits)
for d in even_digits:
total += sum(divmod(d * 2, 10))
if total % 10 == 0:
return True # It's a credit card, not a bank account
# INDIA_AADHAAR false positives - exclude IBAN fragments
if pii_type == "INDIA_AADHAAR":
# If context suggests IBAN or European bank context, it's likely an IBAN fragment
if any(kw in context_lower for kw in ["iban", "bic", "swift", "sepa", "bank", "de", "gb", "fr", "nl", "at", "ch"]):
return True
# Check if preceded by 2-letter country code + 2 digits (IBAN prefix pattern)
# Pattern: XX## followed by the detected value
iban_prefix_pattern = r'[A-Z]{2}\d{2}\s*' + re.escape(value.replace(' ', r'\s*'))
if re.search(iban_prefix_pattern, context, re.IGNORECASE):
return True
# CANADIAN_SIN false positives - exclude Australian ABN format
if pii_type == "CANADIAN_SIN":
# ABN format is similar (9 digits in 3-3-3) but in Australian context
if any(kw in context_lower for kw in ["abn", "australia", "australian", "business number", "gst"]):
return True
# CREDIT_CARD false positives - exclude order/confirmation/reference numbers
if pii_type in ["CREDIT_CARD", "POSSIBLE_CARD_PATTERN"]:
# Check for common prefixes that indicate non-card numbers
order_prefixes = ["order", "order #", "order#", "order:", "order number",
"conf", "confirmation", "conf #", "conf#", "confirmation #",
"ref", "reference", "ref #", "ref#", "reference #",
"invoice", "invoice #", "inv #", "inv#",
"tracking", "tracking #", "track #",
"ticket", "ticket #", "case #", "case#",
"transaction id", "trans id", "txn"]
if any(prefix in context_lower for prefix in order_prefixes):
return True
# Check for pattern: "# followed immediately by the number"
if re.search(r'#\s*' + re.escape(value), context):
return True
return False
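# Example (illustrative): a 40-char lowercase hex string that the AWS_SECRET_KEY
# regex would flag is suppressed because it looks like a git commit SHA, while the
# sample secret key from AWS's own documentation (not a live credential) is kept.
#
#     >>> is_false_positive('a' * 40, 'AWS_SECRET_KEY')
#     True
#     >>> is_false_positive('wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', 'AWS_SECRET_KEY')
#     False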
# ============================================================================
# LAYER 2: Text Normalization Functions
# ============================================================================
def normalize_text(text: str) -> tuple[str, dict]:
"""
Layer 2: Normalize text to reveal hidden PII.
Returns normalized text and a mapping of original positions.
Handles:
- [dot] / (dot) / [.] → .
- [at] / (at) → @
- Dashes/underscores in numbers → removed
- Spaced out characters → joined
"""
normalized = text
transformations = []
# Track transformations for position mapping
# 1. Convert defanged dots: [dot], (dot), [.], (.), [ dot ], etc.
# Also handle with surrounding spaces: " [dot] " → "."
dot_patterns = [
(r'\s*\[\s*dot\s*\]\s*', '.'), # [dot], [ dot ], etc with optional spaces
(r'\s*\(\s*dot\s*\)\s*', '.'), # (dot), ( dot ), etc with optional spaces
(r'\s*\[\.\]\s*', '.'), # [.]
(r'\s*\(\.\)\s*', '.'), # (.)
(r'\s+dot\s+', '.'), # " dot " (word boundary)
]
for pattern, replacement in dot_patterns:
normalized = re.sub(pattern, replacement, normalized, flags=re.IGNORECASE)
# 2. Convert defanged at: [at], (at), [ at ], etc.
# IMPORTANT: Bracketed forms [at] and (at) are always intentional obfuscation
# But plain " at " must be validated to avoid "meet at 5pm" → "meet@5pm"
# 2a. Bracketed forms - always convert (these are intentional obfuscation)
bracketed_at_patterns = [
(r'\s*\[\s*at\s*\]\s*', '@'), # [at], [ at ], etc with optional spaces
(r'\s*\(\s*at\s*\)\s*', '@'), # (at), ( at ), etc with optional spaces
]
for pattern, replacement in bracketed_at_patterns:
normalized = re.sub(pattern, replacement, normalized, flags=re.IGNORECASE)
# 2b. Plain " at " - ONLY convert if it looks like an email pattern:
# - Preceded by username-like chars (alphanumeric, dots, underscores)
# - Followed by domain-like pattern (word + dot/space-dot + TLD)
# Valid: "john at gmail dot com", "mike at company.org"
# Invalid: "meet at 5pm", "look at this", "ready at cvs", "at gate b7"
def should_normalize_at(text: str, at_position: int) -> bool:
"""
Robust validation for ' at ' to '@' conversion.
Returns True only if this looks like a genuine obfuscated email.
Validates:
- Username pattern before "at"
- Rejects false trigger words after "at" (the, my, home, cvs, etc.)
- Rejects time patterns ("at 7pm", "at 2:30")
- Domain pattern with valid TLD after "at"
"""
# Get context around the "at"
before_text = text[:at_position]
after_text = text[at_position + 4:] # Skip " at "
# 1. Check username pattern (must end with valid email username chars)
username_match = re.search(r'([a-zA-Z0-9][a-zA-Z0-9._%+-]{0,63})$', before_text)
if not username_match:
return False
# 2. Get the first word after "at" and check for false triggers
first_word_match = re.match(r'([a-zA-Z0-9]+)', after_text)
if not first_word_match:
return False
first_word = first_word_match.group(1).lower()
# Reject if first word is a common false trigger
if first_word in FALSE_TRIGGER_WORDS:
return False
# 3. Reject time patterns: "at 7pm", "at 2:30", "at noon"
time_pattern = r'^[0-9]{1,2}(?::[0-9]{2})?(?:\s*(?:am|pm|AM|PM))?\s*(?:$|[.,!?\s])'
if re.match(time_pattern, after_text):
return False
# 4. Check for valid domain pattern with TLD
# Pattern: domain + (dot or " dot ") + valid TLD
domain_pattern = r'^([a-zA-Z0-9][a-zA-Z0-9-]{0,61}[a-zA-Z0-9]?)(?:\.|(?:\s+dot\s+))([a-zA-Z]{2,10})(?:\s|$|[.,!?])'
domain_match = re.match(domain_pattern, after_text, re.IGNORECASE)
if domain_match:
potential_tld = domain_match.group(2).lower()
if potential_tld in VALID_TLDS:
return True
return False
def smart_at_replacement(match):
"""Wrapper function for re.sub that uses should_normalize_at()"""
full_match = match.group(0)
before = match.group(1) # Username-like part
after = match.group(2) # Domain-like part
# Find the position of " at " in the original normalized text
# The match.start() gives us where the full match begins
# We need to check from where "at" actually appears
at_pos_in_match = len(before) # " at " starts right after the username
at_pos_in_text = match.start() + at_pos_in_match
if should_normalize_at(normalized, at_pos_in_text):
return before + '@' + after
return full_match # Return unchanged
# Pattern: (username-like) + " at " + (potential domain)
normalized = re.sub(
r'([a-zA-Z0-9][a-zA-Z0-9._%+-]*)\s+at\s+([a-zA-Z0-9][a-zA-Z0-9.\s]*)',
smart_at_replacement,
normalized,
flags=re.IGNORECASE
)
# 3. Remove separators from potential numbers (SSN, CC, phone)
# Only in numeric contexts: sequences that look like numbers with separators
# Pattern: digit, separator(s), digit, separator(s), digit...
def clean_numeric_separators(match):
"""Remove dashes, underscores, spaces from numeric sequences"""
return re.sub(r'[-_\s]', '', match.group())
# Match sequences that look like formatted numbers (3+ digit groups)
normalized = re.sub(
r'\b(\d{2,4})[-_\s]+(\d{2,4})[-_\s]+(\d{2,4})(?:[-_\s]+(\d{2,4}))?\b',
clean_numeric_separators,
normalized
)
# 4. Join spaced-out characters (t-e-s-t → test, t e s t → test)
# Only for sequences that look intentionally obfuscated (alternating char-separator)
def join_spaced_chars(match):
"""Join characters separated by dashes or spaces"""
chars = re.findall(r'[A-Za-z0-9]', match.group())
return ''.join(chars)
# Match spaced-out patterns: a-b-c-d or a b c d (3+ chars)
normalized = re.sub(
r'\b([A-Za-z0-9])(?:[-\s]([A-Za-z0-9])){3,}\b',
join_spaced_chars,
normalized
)
return normalized, {"original_length": len(text), "normalized_length": len(normalized)}
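# Example (illustrative): bracketed obfuscation and spaced digit groups are folded
# back into canonical forms before the regex layer re-scans the text. Sample
# values are fabricated.
#
#     >>> normalize_text('mail me at john [at] example [dot] com')[0]
#     'mail me at john@example.com'
#     >>> normalize_text('ssn 123 45 6789 on file')[0]
#     'ssn 123456789 on file'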
# ============================================================================
# LAYER 2.5: JSON Blob Extraction & Deep Scanning
# ============================================================================
def extract_json_strings(text: str) -> list[tuple[str, int, int]]:
"""
Extract JSON objects embedded in text for deep scanning.
Returns list of (json_string, start_pos, end_pos)
"""
json_strings = []
# Pattern to find JSON-like structures: {...} or escaped JSON strings
# Look for JSON objects that might be embedded as string values
json_patterns = [
# Standard JSON objects
r'(\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\})',
# Escaped JSON in string values (e.g., "{\"key\": \"value\"}")
r'"(\{(?:\\"|[^"])*\})"',
]
for pattern in json_patterns:
for match in re.finditer(pattern, text):
json_str = match.group(1) if match.lastindex else match.group()
# Unescape if it was escaped JSON
if json_str.startswith('{') and '\\"' in json_str:
json_str = json_str.replace('\\"', '"')
json_strings.append((json_str, match.start(), match.end()))
return json_strings
def deep_scan_json(json_str: str) -> list[str]:
"""
Recursively extract all string values from a JSON structure.
Returns list of string values to scan.
"""
extracted_values = []
def extract_strings(obj):
if isinstance(obj, str):
extracted_values.append(obj)
elif isinstance(obj, dict):
for key, value in obj.items():
# Also check keys for PII (e.g., "ssn", "email")
extracted_values.append(f"{key}: {value}" if isinstance(value, str) else key)
extract_strings(value)
elif isinstance(obj, list):
for item in obj:
extract_strings(item)
try:
parsed = json.loads(json_str)
extract_strings(parsed)
except json.JSONDecodeError:
# If not valid JSON, try to extract key-value pairs with regex
# Pattern for "key": "value" pairs
kv_pattern = r'"([^"]+)"\s*:\s*"([^"]*)"'
for match in re.finditer(kv_pattern, json_str):
extracted_values.append(f"{match.group(1)}: {match.group(2)}")
return extracted_values
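# Example (illustrative): nested keys and values are flattened into plain strings
# so the regex layer can scan each of them individually.
#
#     >>> deep_scan_json('{"user": {"email": "jane@example.com"}}')
#     ['user', 'email: jane@example.com', 'jane@example.com']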
# ============================================================================
# LAYER 2.6: Base64 Auto-Detection & Decoding
# ============================================================================
def is_valid_base64(s: str) -> bool:
"""Check if a string is valid base64 encoded."""
# Must be at least 8 chars and multiple of 4 (with padding)
if len(s) < 8:
return False
# Check for valid base64 characters
base64_pattern = r'^[A-Za-z0-9+/]+=*$'
if not re.match(base64_pattern, s):
return False
# Length check (with padding, should be multiple of 4)
if len(s) % 4 != 0:
return False
return True
def decode_base64_strings(text: str, max_depth: int = 2) -> list[tuple[str, str, int, int, int]]:
"""
Find and decode base64 strings in text with recursive decoding support (v1.1).
Returns list of (original_b64, decoded_text, start_pos, end_pos, decode_depth)
Now supports:
- Recursive decoding (double-encoded base64)
- JSON extraction from decoded content
"""
decoded_strings = []
# Pattern to find potential base64 strings (min 16 chars to avoid false positives)
# Must contain at least one letter and one number to reduce false positives
b64_pattern = r'\b([A-Za-z0-9+/]{16,}={0,2})\b'
def recursive_decode(b64_str: str, start: int, end: int, depth: int = 1):
"""Recursively decode base64 strings up to max_depth"""
if depth > max_depth:
return
if not is_valid_base64(b64_str):
return
try:
decoded = base64.b64decode(b64_str).decode('utf-8', errors='ignore')
# Only keep if decoded text is printable and looks like real text
if decoded and len(decoded) >= 4:
printable_ratio = sum(1 for c in decoded if c.isprintable()) / len(decoded)
letter_ratio = sum(1 for c in decoded if c.isalpha()) / len(decoded)
if printable_ratio > 0.7 and letter_ratio > 0.2:
decoded_strings.append((b64_str, decoded, start, end, depth))
# Recursive: Check if decoded content contains more base64
if depth < max_depth:
for inner_match in re.finditer(b64_pattern, decoded):
recursive_decode(
inner_match.group(1),
start, # Keep original position
end,
depth + 1
)
# Also check if decoded content is JSON and scan it
if decoded.strip().startswith('{') or decoded.strip().startswith('['):
try:
json.loads(decoded)
# Valid JSON - will be picked up by JSON layer
except json.JSONDecodeError:
pass
except Exception:
pass
for match in re.finditer(b64_pattern, text):
b64_str = match.group(1)
recursive_decode(b64_str, match.start(), match.end())
return decoded_strings
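# Example (illustrative): a base64 blob whose payload looks like an SSN is
# surfaced together with its decode depth so the caller can down-weight
# confidence. The payload below is fabricated test data.
#
#     >>> blob = base64.b64encode(b'ssn is 123-45-6789').decode()  # 'c3NuIGlzIDEyMy00NS02Nzg5'
#     >>> [(dec, depth) for _, dec, _, _, depth in decode_base64_strings('token=' + blob)]
#     [('ssn is 123-45-6789', 1)]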
# ============================================================================
# LAYER 2.7: Spelled-Out Number Detection (NLP-lite)
# ============================================================================
# Word to digit mapping
WORD_TO_DIGIT = {
'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4',
'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9',
'oh': '0', 'o': '0', # "oh" is often used for zero
}
def convert_spelled_numbers(text: str) -> tuple[str, list[tuple[str, str, int, int]]]:
"""
Convert spelled-out numbers to digits.
Returns (converted_text, list of (original, converted, start, end))
Examples:
- "nine zero zero dash twelve dash eight eight two one""900-12-8821"
- "four one five five five one two one two""415-555-1212"
"""
conversions = []
result = text
# Build pattern for number words
number_words = '|'.join(WORD_TO_DIGIT.keys())
# Pattern: groups of number words separated by "dash" or "hyphen"
# At least one dash/hyphen separator is required, so prose with a lone number word is not flagged
spelled_pattern = rf'\b((?:(?:{number_words})\s*)+(?:(?:dash|hyphen)\s*(?:(?:{number_words})\s*)+)+)\b'
matches = list(re.finditer(spelled_pattern, text, re.IGNORECASE))
for match in reversed(matches): # Process in reverse to maintain positions
original = match.group(1)
# Convert words to digits
converted_parts = []
current_number = ''
words = re.split(r'\s+', original.lower())
for word in words:
word = word.strip()
if word in WORD_TO_DIGIT:
current_number += WORD_TO_DIGIT[word]
elif word in ['dash', 'hyphen']:
if current_number:
converted_parts.append(current_number)
current_number = ''
if current_number:
converted_parts.append(current_number)
if converted_parts:
converted = '-'.join(converted_parts)
conversions.append((original, converted, match.start(), match.end()))
result = result[:match.start()] + converted + result[match.end():]
return result, conversions
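# Example (illustrative): spelled-out digit groups joined by "dash"/"hyphen" are
# rebuilt as dashed numbers, which the SSN layer can then recognize. The SSN
# below is fabricated.
#
#     >>> convert_spelled_numbers('my ssn is one two three dash four five dash six seven eight nine')[0]
#     'my ssn is 123-45-6789'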
# ============================================================================
# LAYER 2.8: Non-Latin Character Support & International Patterns
# ============================================================================
# European VAT Number patterns by country
EU_VAT_PATTERNS = {
"AT": r'ATU\d{8}', # Austria
"BE": r'BE0?\d{9,10}', # Belgium
"BG": r'BG\d{9,10}', # Bulgaria
"CY": r'CY\d{8}[A-Z]', # Cyprus
"CZ": r'CZ\d{8,10}', # Czech Republic
"DE": r'DE\d{9}', # Germany
"DK": r'DK\d{8}', # Denmark
"EE": r'EE\d{9}', # Estonia
"EL": r'EL\d{9}', # Greece (uses EL not GR)
"ES": r'ES[A-Z0-9]\d{7}[A-Z0-9]', # Spain
"FI": r'FI\d{8}', # Finland
"FR": r'FR[A-Z0-9]{2}\d{9}', # France
"HR": r'HR\d{11}', # Croatia
"HU": r'HU\d{8}', # Hungary
"IE": r'IE\d[A-Z0-9+*]\d{5}[A-Z]{1,2}', # Ireland
"IT": r'IT\d{11}', # Italy
"LT": r'LT\d{9,12}', # Lithuania
"LU": r'LU\d{8}', # Luxembourg
"LV": r'LV\d{11}', # Latvia
"MT": r'MT\d{8}', # Malta
"NL": r'NL\d{9}B\d{2}', # Netherlands
"PL": r'PL\d{10}', # Poland
"PT": r'PT\d{9}', # Portugal
"RO": r'RO\d{2,10}', # Romania
"SE": r'SE\d{12}', # Sweden
"SI": r'SI\d{8}', # Slovenia
"SK": r'SK\d{10}', # Slovakia
"GB": r'GB\d{9,12}', # UK (pre-Brexit, still used)
}
# Combined EU VAT pattern
EU_VAT_COMBINED = '|'.join(f'({pattern})' for pattern in EU_VAT_PATTERNS.values())
# Greek character transliteration (for detecting Greek names/text)
GREEK_TO_LATIN = {
'α': 'a', 'β': 'b', 'γ': 'g', 'δ': 'd', 'ε': 'e', 'ζ': 'z', 'η': 'h',
'θ': 'th', 'ι': 'i', 'κ': 'k', 'λ': 'l', 'μ': 'm', 'ν': 'n', 'ξ': 'x',
'ο': 'o', 'π': 'p', 'ρ': 'r', 'σ': 's', 'ς': 's', 'τ': 't', 'υ': 'y',
'φ': 'f', 'χ': 'ch', 'ψ': 'ps', 'ω': 'o',
'Α': 'A', 'Β': 'B', 'Γ': 'G', 'Δ': 'D', 'Ε': 'E', 'Ζ': 'Z', 'Η': 'H',
'Θ': 'Th', 'Ι': 'I', 'Κ': 'K', 'Λ': 'L', 'Μ': 'M', 'Ν': 'N', 'Ξ': 'X',
'Ο': 'O', 'Π': 'P', 'Ρ': 'R', 'Σ': 'S', 'Τ': 'T', 'Υ': 'Y',
'Φ': 'F', 'Χ': 'Ch', 'Ψ': 'Ps', 'Ω': 'O',
# Accented variants
'ά': 'a', 'έ': 'e', 'ή': 'h', 'ί': 'i', 'ό': 'o', 'ύ': 'y', 'ώ': 'o',
'Ά': 'A', 'Έ': 'E', 'Ή': 'H', 'Ί': 'I', 'Ό': 'O', 'Ύ': 'Y', 'Ώ': 'O',
}
# Cyrillic character transliteration
CYRILLIC_TO_LATIN = {
'а': 'a', 'б': 'b', 'в': 'v', 'г': 'g', 'д': 'd', 'е': 'e', 'ё': 'yo',
'ж': 'zh', 'з': 'z', 'и': 'i', 'й': 'y', 'к': 'k', 'л': 'l', 'м': 'm',
'н': 'n', 'о': 'o', 'п': 'p', 'р': 'r', 'с': 's', 'т': 't', 'у': 'u',
'ф': 'f', 'х': 'kh', 'ц': 'ts', 'ч': 'ch', 'ш': 'sh', 'щ': 'shch',
'ъ': '', 'ы': 'y', 'ь': '', 'э': 'e', 'ю': 'yu', 'я': 'ya',
'А': 'A', 'Б': 'B', 'В': 'V', 'Г': 'G', 'Д': 'D', 'Е': 'E', 'Ё': 'Yo',
'Ж': 'Zh', 'З': 'Z', 'И': 'I', 'Й': 'Y', 'К': 'K', 'Л': 'L', 'М': 'M',
'Н': 'N', 'О': 'O', 'П': 'P', 'Р': 'R', 'С': 'S', 'Т': 'T', 'У': 'U',
'Ф': 'F', 'Х': 'Kh', 'Ц': 'Ts', 'Ч': 'Ch', 'Ш': 'Sh', 'Щ': 'Shch',
'Ъ': '', 'Ы': 'Y', 'Ь': '', 'Э': 'E', 'Ю': 'Yu', 'Я': 'Ya',
}
def transliterate_text(text: str) -> str:
"""Transliterate Greek and Cyrillic characters to Latin."""
result = text
# Apply Greek transliteration
for greek, latin in GREEK_TO_LATIN.items():
result = result.replace(greek, latin)
# Apply Cyrillic transliteration
for cyrillic, latin in CYRILLIC_TO_LATIN.items():
result = result.replace(cyrillic, latin)
return result
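# Example (illustrative): Greek and Cyrillic text is mapped to Latin so the
# Latin-only regexes get a second chance at matching.
#
#     >>> transliterate_text('Иван Петров')
#     'Ivan Petrov'
#     >>> transliterate_text('Μαρία')
#     'Maria'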
def detect_eu_vat_numbers(text: str) -> list[tuple[str, str, int, int]]:
"""
Detect EU VAT numbers in text.
Returns list of (vat_number, country_code, start, end)
"""
vat_matches = []
for country, pattern in EU_VAT_PATTERNS.items():
for match in re.finditer(pattern, text):
vat_matches.append((match.group(), country, match.start(), match.end()))
return vat_matches
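# Example (illustrative): the VAT ID below is syntactically valid but fabricated.
#
#     >>> [(vat, country) for vat, country, _, _ in detect_eu_vat_numbers('VAT ID on the invoice: DE123456789')]
#     [('DE123456789', 'DE')]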
def has_non_latin_chars(text: str) -> bool:
"""Check if text contains non-Latin characters (Greek, Cyrillic, etc.)"""
# Check for Greek (U+0370–U+03FF) or Cyrillic (U+0400–U+04FF)
return bool(re.search(r'[\u0370-\u03FF\u0400-\u04FF]', text))
# ============================================================================
# LAYER 3: Context-Based Confidence Scoring
# ============================================================================
# Context keywords that boost confidence when found near PII
CONTEXT_KEYWORDS = {
"SSN": ["ssn", "social security", "social sec", "ss#", "ss #", "ss number"],
"CREDIT_CARD": ["card", "credit", "debit", "visa", "mastercard", "amex", "payment", "cc#", "card number"],
"EMAIL": ["email", "e-mail", "mail", "contact", "reach", "@"],
"PHONE_US": ["phone", "tel", "telephone", "mobile", "cell", "call", "fax", "contact"],
"PHONE_INTL": ["phone", "tel", "telephone", "mobile", "cell", "call", "fax", "contact"],
"IP_ADDRESS": ["ip", "address", "server", "host", "network"],
"IP_DEFANGED": ["ip", "address", "server", "host", "network", "indicator", "ioc"],
"PASSWORD_IN_URL": ["password", "pwd", "pass", "secret", "credential", "login", "auth"],
"IBAN": ["iban", "bank", "account", "transfer", "payment", "wire"],
"BANK_ACCOUNT": ["account", "bank", "routing", "aba", "deposit"],
"AWS_ACCESS_KEY": ["aws", "amazon", "access", "key", "iam", "credential"],
"GITHUB_TOKEN": ["github", "token", "api", "pat", "access"],
"STRIPE_KEY": ["stripe", "payment", "api", "key", "publishable", "secret"],
"MEDICARE_ID": ["medicare", "mbi", "beneficiary", "cms", "health"],
"DATE_OF_BIRTH": ["dob", "birth", "born", "birthday", "age"],
"GEO_COORDINATES": ["location", "coordinates", "lat", "lng", "latitude", "longitude", "gps"],
"AUTH_CODE": ["auth", "code", "otp", "verification", "2fa", "mfa"],
}
# Anti-context: keywords that reduce confidence (false positive indicators)
ANTI_CONTEXT_KEYWORDS = {
"CREDIT_CARD": ["test", "example", "sample", "demo", "fake", "dummy", "xxxx", "part", "sku", "isbn", "serial", "order", "invoice", "ref", "batch", "processing"],
"SSN": ["test", "example", "sample", "demo", "fake", "dummy", "000-00-0000", "123-45-6789", "part", "sku", "serial", "order", "ref", "batch", "processing", "validation"],
"PHONE_US": ["test", "example", "555-", "000-000", "sample", "demo", "fake"],
"IP_ADDRESS": ["example", "test", "localhost", "127.0.0.1", "0.0.0.0"],
"EMAIL": ["example.com", "test.com", "example.org", "noreply", "sample", "demo"],
"DATE_OF_BIRTH": ["batch", "created", "updated", "modified", "expires", "valid", "effective", "due", "ship", "order", "processing"],
"IBAN": ["test", "example", "sample", "demo", "fake", "dummy", "validation"],
}
def is_future_date(date_str: str) -> bool:
"""Check if a date string represents a future date (not a valid DOB)."""
from datetime import datetime, date
current_year = datetime.now().year
current_date = date.today()
# Try to extract year from common date formats
full_year_match = re.search(r'((?:19|20)\d{2})', date_str)
if full_year_match:
year = int(full_year_match.group(1))
# Future year = definitely not a birth date
if year > current_year:
return True
# Current year - check if the full date is in the future
if year == current_year:
# Try to parse the full date
# Common formats: MM-DD-YYYY, MM/DD/YYYY, YYYY-MM-DD
date_patterns = [
(r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', 'MDY'), # MM-DD-YYYY
(r'(\d{4})[-/](\d{1,2})[-/](\d{1,2})', 'YMD'), # YYYY-MM-DD
]
for pattern, fmt in date_patterns:
match = re.search(pattern, date_str)
if match:
try:
if fmt == 'MDY':
month, day, _ = int(match.group(1)), int(match.group(2)), int(match.group(3))
else: # YMD
_, month, day = int(match.group(1)), int(match.group(2)), int(match.group(3))
parsed_date = date(year, month, day)
if parsed_date > current_date:
return True
except ValueError:
pass # Invalid date
# Very old date (before 1900) = probably not a birth date
if year < 1900:
return True
return False
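# Example (illustrative): a date in a future year can never be a date of birth,
# so the context layer uses this to knock confidence down.
#
#     >>> is_future_date('01/01/2099')
#     True
#     >>> is_future_date('07/04/1985')
#     False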
def apply_context_scoring(text: str, entities: list[PIIEntity], window: int = 100) -> list[PIIEntity]:
"""
Layer 3: Adjust confidence scores based on surrounding context.
Args:
text: Original text
entities: List of detected PII entities
window: Number of characters to look around each entity
Returns:
Entities with adjusted confidence scores
"""
text_lower = text.lower()
adjusted_entities = []
for entity in entities:
# Get context window around the entity
start = max(0, entity.start - window)
end = min(len(text), entity.end + window)
context = text_lower[start:end]
confidence_adjustment = 0.0
# Check for boosting keywords
if entity.type in CONTEXT_KEYWORDS:
for keyword in CONTEXT_KEYWORDS[entity.type]:
if keyword in context:
confidence_adjustment += 0.10 # Boost 10% per matching keyword (increased)
break # Only apply once per type
# Check for anti-context (false positive indicators)
if entity.type in ANTI_CONTEXT_KEYWORDS:
for keyword in ANTI_CONTEXT_KEYWORDS[entity.type]:
if keyword in context:
confidence_adjustment -= 0.30 # Reduce 30% for test/example data (increased penalty)
break
# LOGIC GATE: Date validation - future dates cannot be birth dates
if entity.type == "DATE_OF_BIRTH":
if is_future_date(entity.value):
confidence_adjustment -= 0.50 # Heavily penalize future dates
# Apply adjustment (cap between 0.3 and 0.99)
new_confidence = max(0.3, min(0.99, entity.confidence + confidence_adjustment))
adjusted_entities.append(PIIEntity(
type=entity.type,
value=entity.value,
start=entity.start,
end=entity.end,
confidence=new_confidence,
masked_value=entity.masked_value
))
return adjusted_entities
# ============================================================================
# LAYER 4: Checksum Validation Functions
# ============================================================================
def validate_iban(iban: str) -> bool:
"""
Validate IBAN using MOD-97 checksum algorithm (ISO 7064).
Returns True if valid, False otherwise.
"""
# Remove spaces and convert to uppercase
iban = re.sub(r'\s', '', iban).upper()
# Check minimum length
if len(iban) < 15:
return False
# Move first 4 chars to end
rearranged = iban[4:] + iban[:4]
# Convert letters to numbers (A=10, B=11, ..., Z=35)
numeric = ''
for char in rearranged:
if char.isdigit():
numeric += char
elif char.isalpha():
numeric += str(ord(char) - ord('A') + 10)
else:
return False # Invalid character
# Perform MOD-97 check
try:
return int(numeric) % 97 == 1
except ValueError:
return False
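# Example (illustrative): the IBAN below is the widely used UK documentation
# example, not a real account; flipping one digit breaks the MOD-97 check.
#
#     >>> validate_iban('GB82 WEST 1234 5698 7654 32')
#     True
#     >>> validate_iban('GB82 WEST 1234 5698 7654 33')
#     False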
def validate_luhn(number: str) -> bool:
"""Validate number using Luhn algorithm (credit cards, etc.)"""
digits = [int(d) for d in re.sub(r'\D', '', number)]
if len(digits) < 8:
return False
checksum = 0
for i, d in enumerate(reversed(digits)):
if i % 2 == 1:
d *= 2
if d > 9:
d -= 9
checksum += d
return checksum % 10 == 0
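# Example (illustrative): the classic Visa test number passes the Luhn check,
# while an off-by-one variant fails; both are synthetic, non-issued numbers.
#
#     >>> validate_luhn('4111 1111 1111 1111')
#     True
#     >>> validate_luhn('4111 1111 1111 1112')
#     False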
# ============================================================================
# Multi-Layer Detection Pipeline
# ============================================================================
def detect_pii_multilayer(text: str, config: Optional[ScanConfig] = None, coordinates_only: bool = False) -> list[PIIEntity]:
"""
Multi-layer PII detection pipeline (v1.1):
- Layer 2.0: Pre-processing (Unicode NFKC + HTML entity decode)
- Layer 1: Standard regex matching (high-speed DuckDB-compatible)
- Layer 2: Text normalization to catch obfuscated PII
- Layer 2.5: JSON blob extraction and deep scanning
- Layer 2.6: Base64 auto-decoding (with recursive decode + JSON scan)
- Layer 2.7: Spelled-out number detection (NLP-lite)
- Layer 2.8: Non-Latin character support (Greek, Cyrillic)
- Layer 3: Context-based confidence adjustment
- Layer 4: Checksum verification (Luhn, IBAN MOD-97)
- Layer 5: False positive filtering
"""
if config is None:
config = ScanConfig()
# Layer 2.0: Pre-processing - Unicode NFKC + HTML entity decode (v1.1)
preprocessed_text, preprocessing_log = preprocess_text(text)
# For coordinates_only mode: if preprocessing changed text length,
# we need to use original text for detection to get accurate coordinates
if coordinates_only and len(preprocessed_text) != len(text):
# Text length changed during preprocessing - use original text for accurate coords
detection_text = text
else:
detection_text = preprocessed_text
# Layer 1: Standard detection
entities = detect_pii_standard(detection_text, config)
# Layer 2: Normalize text and re-scan for hidden PII
normalized_text, transform_info = normalize_text(text)
if normalized_text != text:
# Only re-scan if normalization changed something
normalized_entities = detect_pii_standard(normalized_text, config)
# Mark normalized entities and add unique ones
existing_values = {e.value.lower() for e in entities}
for entity in normalized_entities:
if entity.value.lower() not in existing_values:
# This is a newly discovered entity from normalization
# Reduce confidence slightly as it required normalization
entity_dict = entity.model_dump()
entity_dict['confidence'] = entity.confidence * 0.95
entity_dict['type'] = f"{entity.type}_NORMALIZED"
entities.append(PIIEntity(**entity_dict))
existing_values.add(entity.value.lower())
# Layer 2.5: JSON Blob Extraction & Deep Scanning
existing_values = {e.value.lower() for e in entities}
json_blobs = extract_json_strings(text)
for json_str, start_pos, end_pos in json_blobs:
extracted_values = deep_scan_json(json_str)
for extracted in extracted_values:
# Scan extracted JSON values for PII
json_entities = detect_pii_standard(extracted, config)
for entity in json_entities:
if entity.value.lower() not in existing_values:
# Mark as found within JSON, adjust positions to original text
entities.append(PIIEntity(
type=f"{entity.type}_IN_JSON",
value=entity.value,
start=start_pos, # Use JSON blob position
end=end_pos,
confidence=entity.confidence * 0.90, # Slight reduction for nested detection
masked_value=entity.masked_value
))
existing_values.add(entity.value.lower())
# Layer 2.6: Base64 Auto-Decoding (v1.1 - recursive + JSON scan)
decoded_b64_strings = decode_base64_strings(preprocessed_text)
for original_b64, decoded_text, start_pos, end_pos, depth in decoded_b64_strings:
# Scan decoded base64 content for PII/secrets
b64_entities = detect_pii_standard(decoded_text, config)
# Also scan decoded JSON content if present
if decoded_text.strip().startswith('{') or decoded_text.strip().startswith('['):
try:
json_values = deep_scan_json(decoded_text)
for json_val in json_values:
json_entities = detect_pii_standard(json_val, config)
for je in json_entities:
if je.value.lower() not in existing_values:
entities.append(PIIEntity(
type=f"{je.type}_BASE64_JSON",
value=je.value,
start=start_pos,
end=end_pos,
confidence=je.confidence * 0.80, # Lower for double-nested
masked_value=je.masked_value
))
existing_values.add(je.value.lower())
except Exception:
pass
for entity in b64_entities:
if entity.value.lower() not in existing_values:
# Mark as found in base64 encoded content
depth_suffix = f"_DEPTH{depth}" if depth > 1 else ""
entities.append(PIIEntity(
type=f"{entity.type}_BASE64{depth_suffix}",
value=entity.value,
start=start_pos,
end=end_pos,
confidence=entity.confidence * (0.85 ** depth), # Reduce more for deeper encoding
masked_value=entity.masked_value
))
existing_values.add(entity.value.lower())
# Also flag the base64 string itself if it decoded to sensitive content
if b64_entities:
entities.append(PIIEntity(
type="BASE64_ENCODED_SECRET",
value=original_b64[:50] + "..." if len(original_b64) > 50 else original_b64,
start=start_pos,
end=end_pos,
confidence=0.85,
masked_value="[BASE64 ENCODED CONTENT REDACTED]"
))
# Layer 2.7: Spelled-Out Number Detection
converted_text, conversions = convert_spelled_numbers(text)
if conversions:
# Scan the converted text for SSN patterns
for original, converted, start_pos, end_pos in conversions:
# Check if converted looks like an SSN (XXX-XX-XXXX format)
ssn_match = re.match(r'^(\d{3})-(\d{2})-(\d{4})$', converted)
if ssn_match:
entities.append(PIIEntity(
type="SSN_SPELLED_OUT",
value=converted,
start=start_pos,
end=end_pos,
confidence=0.90, # High confidence for spelled-out SSN
masked_value=f"***-**-{ssn_match.group(3)}"
))
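# Illustrative sketch (comment only): convert_spelled_numbers is expected to map digit
# words back to digits, so a string like
#   "my social is one two three four five six seven eight nine"
# would yield a converted value such as "123-45-6789" (the dash grouping is an assumption
# based on the SSN regex check above), which is then reported as SSN_SPELLED_OUT with the
# last four digits preserved in the mask: "***-**-6789".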
# Layer 2.8: Non-Latin Character Support
if has_non_latin_chars(text):
# Transliterate and re-scan
transliterated = transliterate_text(text)
if transliterated != text:
trans_entities = detect_pii_standard(transliterated, config)
for entity in trans_entities:
if entity.value.lower() not in existing_values:
entities.append(PIIEntity(
type=f"{entity.type}_TRANSLITERATED",
value=entity.value,
start=entity.start,
end=entity.end,
confidence=entity.confidence * 0.90,
masked_value=entity.masked_value
))
existing_values.add(entity.value.lower())
# Layer 3: Apply context-based scoring
entities = apply_context_scoring(text, entities)
# Layer 4: Apply checksum verification for applicable types
verified_entities = []
for entity in entities:
if entity.type == "IBAN":
if validate_iban(entity.value):
# Valid IBAN - boost confidence
verified_entities.append(PIIEntity(
type=entity.type,
value=entity.value,
start=entity.start,
end=entity.end,
confidence=min(0.99, entity.confidence + 0.10),
masked_value=entity.masked_value
))
else:
# Invalid checksum - cap at 30% ceiling (consistent across all types)
verified_entities.append(PIIEntity(
type="POSSIBLE_IBAN_PATTERN",
value=entity.value,
start=entity.start,
end=entity.end,
confidence=0.30, # Consistent 30% ceiling for invalid checksums
masked_value=entity.masked_value
))
elif entity.type == "UUID":
# UUIDs are typically trace IDs, not PII - cap at 40% max confidence
verified_entities.append(PIIEntity(
type=entity.type,
value=entity.value,
start=entity.start,
end=entity.end,
confidence=min(0.40, entity.confidence), # Cap UUID confidence at 40%
masked_value=entity.masked_value
))
elif entity.type in ("CREDIT_CARD", "CREDIT_CARD_NORMALIZED"):
# LOGIC GATE: apply Luhn validation and reclassify numbers that fail the checksum
if validate_luhn(entity.value):
# Valid credit card - boost confidence
verified_entities.append(PIIEntity(
type=entity.type,
value=entity.value,
start=entity.start,
end=entity.end,
confidence=min(0.99, entity.confidence + 0.10),
masked_value=entity.masked_value
))
else:
# Invalid Luhn checksum - reclassify as POSSIBLE_CARD_PATTERN
# Check if context suggests it's test/example data - if so, discard
context_start = max(0, entity.start - 50)
context_end = min(len(text), entity.end + 50)
context = text[context_start:context_end].lower()
# Discard if context clearly indicates test/example data
if any(kw in context for kw in ["test", "example", "sample", "demo", "fake", "dummy"]):
# Skip this entity entirely - it's test data
continue
# Reclassify as POSSIBLE_CARD_PATTERN with 30% ceiling (consistent)
verified_entities.append(PIIEntity(
type="POSSIBLE_CARD_PATTERN",
value=entity.value,
start=entity.start,
end=entity.end,
confidence=0.30, # Consistent 30% ceiling for invalid checksums
masked_value=mask_value(entity.value, "CREDIT_CARD")
))
else:
verified_entities.append(entity)
# Deduplicate by position AND by overlapping normalized variants
# Step 1: Group by exact position
deduplicated = {}
for entity in verified_entities:
key = (entity.start, entity.end)
if key not in deduplicated or entity.confidence > deduplicated[key].confidence:
deduplicated[key] = entity
# Step 2: Merge overlapping entities of same base type (e.g., PHONE_US vs PHONE_US_NORMALIZED)
# This prevents showing both "PHONE_US" and "PHONE_US_NORMALIZED" for the same number
def get_base_type(pii_type: str) -> str:
"""Get base type without suffixes like _NORMALIZED, _IN_JSON, _BASE64, etc."""
suffixes = ['_NORMALIZED', '_IN_JSON', '_BASE64', '_TRANSLITERATED', '_SPELLED_OUT']
for suffix in suffixes:
if pii_type.endswith(suffix):
return pii_type[:-len(suffix)]
return pii_type
def normalize_value_for_comparison(value: str, pii_type: str) -> str:
"""Normalize a value for duplicate comparison (strip formatting)."""
base = get_base_type(pii_type)
if base in ['PHONE_US', 'PHONE_INTL', 'SSN', 'CREDIT_CARD']:
# For these types, compare just the digits
return re.sub(r'\D', '', value)
return value.lower().strip()
def overlaps(e1, e2, threshold: int = 5) -> bool:
"""Check if two entities overlap or are within threshold characters."""
return not (e1.end + threshold < e2.start or e2.end + threshold < e1.start)
def same_normalized_value(e1, e2) -> bool:
"""Check if two entities represent the same underlying value."""
return normalize_value_for_comparison(e1.value, e1.type) == normalize_value_for_comparison(e2.value, e2.type)
# Sort by start position for overlap detection
sorted_entities = sorted(deduplicated.values(), key=lambda x: x.start)
merged = []
for entity in sorted_entities:
base_type = get_base_type(entity.type)
merged_with_existing = False
# Check if this overlaps with any existing entity of same base type
# OR has the same normalized value (handles PHONE_US vs PHONE_US_NORMALIZED with different positions)
for i, existing in enumerate(merged):
existing_base = get_base_type(existing.type)
if base_type == existing_base:
# Merge if overlapping OR same value (for _NORMALIZED variants)
if overlaps(entity, existing) or same_normalized_value(entity, existing):
# Keep the one with higher confidence
if entity.confidence > existing.confidence:
merged[i] = entity
merged_with_existing = True
break
if not merged_with_existing:
merged.append(entity)
deduplicated = {(e.start, e.end): e for e in merged}
# Layer 5: False positive filtering (v1.1)
filtered_entities = []
for entity in deduplicated.values():
# Get context for false positive check
context_start = max(0, entity.start - 50)
context_end = min(len(detection_text), entity.end + 50)
context = detection_text[context_start:context_end]
# Skip if detected as false positive
if is_false_positive(entity.value, entity.type, context):
continue
filtered_entities.append(entity)
# Layer 6: Cross-type overlap resolution
# Sort by confidence (desc), then by length (desc) to prioritize better matches
sorted_by_priority = sorted(filtered_entities, key=lambda x: (-x.confidence, -(x.end - x.start)))
final_entities = []
for entity in sorted_by_priority:
# Check if this entity overlaps with any already accepted entity
overlaps_existing = False
for existing in final_entities:
# Check for any overlap
if not (entity.end <= existing.start or entity.start >= existing.end):
overlaps_existing = True
break
if not overlaps_existing:
final_entities.append(entity)
# Sort by position for output
result = sorted(final_entities, key=lambda x: x.start)
return result
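# Minimal usage sketch (comment only), assuming the ScanConfig defaults defined elsewhere
# in this module:
#   config = ScanConfig(detect_secrets=False)
#   entities = detect_pii_multilayer("Contact jane.doe@example.com or 555-123-4567", config)
#   for e in entities:
#       print(e.type, e.confidence, e.masked_value)
# A plain email like the one above typically comes back as EMAIL at 0.95 with a masked
# value of "ja***@example.com"; exact scores depend on context scoring and later layers.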
def detect_pii_standard(text: str, config: Optional[ScanConfig] = None) -> list[PIIEntity]:
"""Layer 1: Standard regex-based PII detection"""
if config is None:
config = ScanConfig()
entities = []
# Map config to pattern types
type_mapping = {
"EMAIL": config.detect_emails,
# Phone numbers (US and International)
"PHONE_US": config.detect_phones,
"PHONE_INTL": config.detect_phones,
# Identity documents
"SSN": config.detect_ssn,
"MEDICARE_ID": config.detect_ssn, # Group with SSN as it's US health ID
"GERMAN_ID": True, # Always detect national IDs
"CREDIT_CARD": config.detect_credit_cards,
"IP_ADDRESS": config.detect_ip_addresses,
"DATE_OF_BIRTH": config.detect_dates,
"ZIP_CODE": config.detect_zip_codes,
# Addresses (US and International)
"US_ADDRESS": config.detect_addresses,
"UK_POSTCODE": config.detect_addresses,
"UK_ADDRESS": config.detect_addresses,
"EU_ADDRESS": config.detect_addresses,
"INTERNATIONAL_ADDRESS": config.detect_addresses,
# Financial
"IBAN": config.detect_iban,
"BANK_ACCOUNT": config.detect_iban,
"ROUTING_NUMBER": config.detect_iban,
"SWIFT_BIC": config.detect_iban,
# Medical IDs (always on - high sensitivity)
"MEDICAL_RECORD_NUMBER": True,
"NPI_NUMBER": True,
"DEA_NUMBER": True,
# Secrets - AWS
"AWS_ACCESS_KEY": config.detect_secrets,
"AWS_SECRET_KEY": config.detect_secrets,
# Secrets - GitHub
"GITHUB_TOKEN": config.detect_secrets,
"GITHUB_TOKEN_CLASSIC": config.detect_secrets,
# Secrets - GCP/Azure
"GCP_SERVICE_ACCOUNT": config.detect_secrets,
"GCP_PRIVATE_KEY_ID": config.detect_secrets,
"AZURE_CLIENT_SECRET": config.detect_secrets,
"AZURE_CONNECTION_STRING": config.detect_secrets,
"AZURE_SAS_TOKEN": config.detect_secrets,
# Secrets - Other
"SLACK_TOKEN": config.detect_secrets,
"GENERIC_API_KEY": config.detect_secrets,
"PASSWORD_IN_URL": config.detect_secrets,
"PRIVATE_KEY": config.detect_secrets,
"JWT_TOKEN": config.detect_secrets,
"STRIPE_KEY": config.detect_secrets,
"GOOGLE_API_KEY": config.detect_secrets,
"API_KEY_IN_URL": config.detect_secrets,
"AUTH_CODE": config.detect_secrets,
# Evasion-resistant patterns
"EMAIL_OBFUSCATED": config.detect_emails,
"IP_DEFANGED": config.detect_ip_addresses,
# Identifiers
"UUID": True, # Always detect UUIDs/trace IDs
"GEO_COORDINATES": config.detect_addresses, # Group with addresses
# =========================================================================
# NEW v1.1 PATTERNS
# =========================================================================
# International IDs
"UK_NATIONAL_INSURANCE": True,
"CANADIAN_SIN": True,
"INDIA_AADHAAR": True,
"INDIA_PAN": True,
"AUSTRALIA_TFN": True,
"BRAZIL_CPF": True,
"MEXICO_CURP": True,
"SOUTH_AFRICA_ID": True,
# Additional cloud tokens
"DISCORD_TOKEN": config.detect_secrets,
"DISCORD_WEBHOOK": config.detect_secrets,
"TWILIO_API_KEY": config.detect_secrets,
"TWILIO_AUTH_TOKEN": config.detect_secrets,
"SENDGRID_API_KEY": config.detect_secrets,
"OPENAI_API_KEY": config.detect_secrets,
"ANTHROPIC_API_KEY": config.detect_secrets,
"MAILCHIMP_API_KEY": config.detect_secrets,
"MAILGUN_API_KEY": config.detect_secrets,
"HEROKU_API_KEY": config.detect_secrets,
"SHOPIFY_ACCESS_TOKEN": config.detect_secrets,
"SHOPIFY_SHARED_SECRET": config.detect_secrets,
"NPM_TOKEN": config.detect_secrets,
"PYPI_TOKEN": config.detect_secrets,
"DOCKER_AUTH": config.detect_secrets,
# Crypto addresses
"BITCOIN_ADDRESS": config.detect_iban, # Group with financial
"ETHEREUM_ADDRESS": config.detect_iban,
"MONERO_ADDRESS": config.detect_iban,
# Financial identifiers
"CUSIP": config.detect_iban,
"ISIN": config.detect_iban,
"SEDOL": config.detect_iban,
}
for pii_type, pattern_info in PII_PATTERNS.items():
# Check if this type should be detected
if pii_type in type_mapping and not type_mapping[pii_type]:
continue
pattern = pattern_info["pattern"]
for match in re.finditer(pattern, text, re.IGNORECASE):
value = match.group()
# Calculate confidence based on pattern specificity
confidence = 0.85
if pii_type == "EMAIL":
confidence = 0.95
elif pii_type == "SSN":
# Pattern requires explicit separators, so high confidence
confidence = 0.95
elif pii_type in ["PHONE_US", "PHONE_INTL"]:
confidence = 0.85
elif pii_type == "MEDICARE_ID":
confidence = 0.90
elif pii_type == "GERMAN_ID":
confidence = 0.85 # Requires context (ausweis keyword)
elif pii_type == "CREDIT_CARD":
# Validate with Luhn algorithm
if validate_credit_card(value):
confidence = 0.95
else:
confidence = 0.60
elif pii_type == "ZIP_CODE":
confidence = 0.70 # Could be any 5-digit number
elif pii_type in ["US_ADDRESS", "UK_ADDRESS", "EU_ADDRESS", "INTERNATIONAL_ADDRESS"]:
confidence = 0.80
elif pii_type == "UK_POSTCODE":
confidence = 0.85
elif pii_type == "IBAN":
# Validate IBAN length and format
clean_iban = re.sub(r'\s', '', value)
if len(clean_iban) >= 15 and len(clean_iban) <= 34:
confidence = 0.95
else:
confidence = 0.70
elif pii_type in ["BANK_ACCOUNT", "ROUTING_NUMBER"]:
confidence = 0.85
elif pii_type == "SWIFT_BIC":
confidence = 0.80 # Could be other 8/11 char codes
# Cloud provider keys
elif pii_type in ["AWS_ACCESS_KEY", "GITHUB_TOKEN", "GITHUB_TOKEN_CLASSIC", "STRIPE_KEY", "GOOGLE_API_KEY"]:
confidence = 0.95 # Very specific patterns
elif pii_type in ["GCP_SERVICE_ACCOUNT", "GCP_PRIVATE_KEY_ID"]:
confidence = 0.95
elif pii_type in ["AZURE_CONNECTION_STRING", "AZURE_SAS_TOKEN"]:
confidence = 0.95
elif pii_type == "AZURE_CLIENT_SECRET":
confidence = 0.75 # Less specific pattern
elif pii_type in ["AWS_SECRET_KEY", "JWT_TOKEN"]:
confidence = 0.85
elif pii_type in ["PASSWORD_IN_URL", "GENERIC_API_KEY"]:
confidence = 0.80
elif pii_type == "PRIVATE_KEY":
confidence = 0.99
# Medical IDs
elif pii_type in ["MEDICAL_RECORD_NUMBER", "NPI_NUMBER"]:
confidence = 0.90
elif pii_type == "DEA_NUMBER":
confidence = 0.85
# Evasion-resistant patterns
elif pii_type == "EMAIL_OBFUSCATED":
confidence = 0.90 # High - intentional obfuscation is suspicious
elif pii_type == "IP_DEFANGED":
confidence = 0.95 # Very high - defanging is deliberate
elif pii_type == "API_KEY_IN_URL":
confidence = 0.85
elif pii_type == "AUTH_CODE":
confidence = 0.80
elif pii_type == "UUID":
confidence = 0.70 # Could be any GUID
elif pii_type == "GEO_COORDINATES":
confidence = 0.85
# Create masked value
masked_value = mask_value(value, pii_type)
entities.append(PIIEntity(
type=pii_type,
value=value,
start=match.start(),
end=match.end(),
confidence=confidence,
masked_value=masked_value
))
# Sort by position
entities.sort(key=lambda x: x.start)
return entities
def validate_credit_card(number: str) -> bool:
"""Validate credit card using Luhn algorithm"""
digits = [int(d) for d in re.sub(r'\D', '', number)]
if len(digits) < 13 or len(digits) > 19:
return False
checksum = 0
for i, d in enumerate(reversed(digits)):
if i % 2 == 1:
d *= 2
if d > 9:
d -= 9
checksum += d
return checksum % 10 == 0
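# Worked Luhn example (comment only): for the well-known test number 4111111111111111,
# doubling every second digit from the right gives 8 + (7 * 2) = 22, the remaining eight
# digits contribute 8, so the checksum is 30 and 30 % 10 == 0, meaning
# validate_credit_card("4111-1111-1111-1111") returns True. Changing the last digit to 2
# raises the checksum to 31 and the function returns False.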
def mask_value(value: str, pii_type: str) -> str:
"""Mask a PII value based on its type"""
# Handle various suffixes by using base type for masking
base_type = pii_type
for suffix in ["_NORMALIZED", "_IN_JSON", "_BASE64", "_TRANSLITERATED", "_SPELLED_OUT"]:
base_type = base_type.replace(suffix, "")
if base_type == "EMAIL":
parts = value.split('@')
if len(parts) == 2:
return parts[0][:2] + '***@' + parts[1]
return '***@***'
elif base_type in ["PHONE_US", "PHONE_INTL"]:
clean = re.sub(r'\D', '', value)
if len(clean) >= 4:
return '***-***-' + clean[-4:]
return '***-***-****'
elif base_type == "MEDICARE_ID":
return '[MEDICARE ID REDACTED]'
elif base_type == "GERMAN_ID":
return '[GERMAN ID REDACTED]'
elif base_type == "SSN":
return '***-**-' + value[-4:] if len(value) >= 4 else '***-**-****'
elif base_type == "CREDIT_CARD":
clean = re.sub(r'\D', '', value)
return '****-****-****-' + clean[-4:] if len(clean) >= 4 else '****-****-****-****'
elif base_type == "IP_ADDRESS":
parts = value.split('.')
if len(parts) == 4:
return f'{parts[0]}.***.***.*'
return '***.***.***.***'
elif base_type == "DATE_OF_BIRTH":
return '**/**/' + value[-4:] if len(value) >= 4 else '**/**/****'
elif base_type == "IBAN":
# Show country code and last 4 chars
if len(value) >= 6:
return value[:2] + '**' + '*' * (len(value) - 6) + value[-4:]
return '*' * len(value)
elif base_type in ["US_ADDRESS", "UK_ADDRESS", "EU_ADDRESS", "INTERNATIONAL_ADDRESS"]:
# Mask the street number and name
return '[ADDRESS REDACTED]'
elif base_type == "UK_POSTCODE":
return value[:2] + '** ***' if len(value) >= 2 else '[POSTCODE REDACTED]'
elif base_type in ["BANK_ACCOUNT", "ROUTING_NUMBER"]:
# Show last 4 digits
clean = re.sub(r'\D', '', value)
return '******' + clean[-4:] if len(clean) >= 4 else '*' * len(value)
elif base_type == "SWIFT_BIC":
return value[:4] + '****' if len(value) >= 4 else '*' * len(value)
elif base_type in ["AWS_ACCESS_KEY", "AWS_SECRET_KEY"]:
return value[:4] + '*' * (len(value) - 8) + value[-4:] if len(value) > 8 else '*' * len(value)
elif base_type in ["GITHUB_TOKEN", "GITHUB_TOKEN_CLASSIC"]:
return value[:4] + '*' * (len(value) - 8) + value[-4:] if len(value) > 8 else '*' * len(value)
elif base_type in ["STRIPE_KEY", "GOOGLE_API_KEY"]:
return value[:7] + '*' * (len(value) - 11) + value[-4:] if len(value) > 11 else '*' * len(value)
elif base_type == "PASSWORD_IN_URL":
return '[PASSWORD REDACTED]'
elif base_type == "PRIVATE_KEY":
return '[PRIVATE KEY REDACTED]'
elif base_type == "JWT_TOKEN":
return value[:10] + '...[JWT REDACTED]...' + value[-10:] if len(value) > 20 else '[JWT REDACTED]'
elif base_type in ["SLACK_TOKEN", "GENERIC_API_KEY"]:
if len(value) > 8:
return value[:4] + '*' * (len(value) - 8) + value[-4:]
return '*' * len(value)
# GCP/Azure cloud credentials
elif base_type == "GCP_SERVICE_ACCOUNT":
parts = value.split('@')
if len(parts) == 2:
return parts[0][:3] + '***@' + parts[1]
return '[GCP SERVICE ACCOUNT REDACTED]'
elif base_type == "GCP_PRIVATE_KEY_ID":
return '[GCP PRIVATE KEY ID REDACTED]'
elif base_type in ["AZURE_CLIENT_SECRET", "AZURE_CONNECTION_STRING", "AZURE_SAS_TOKEN"]:
return '[AZURE CREDENTIAL REDACTED]'
# Medical IDs
elif base_type in ["MEDICAL_RECORD_NUMBER", "NPI_NUMBER", "DEA_NUMBER"]:
return '[MEDICAL ID REDACTED]'
# Evasion-resistant patterns
elif base_type == "EMAIL_OBFUSCATED":
return '[OBFUSCATED EMAIL REDACTED]'
elif base_type == "IP_DEFANGED":
return '[DEFANGED IP REDACTED]'
elif base_type == "API_KEY_IN_URL":
return '[API KEY IN URL REDACTED]'
elif base_type == "AUTH_CODE":
return '[AUTH CODE REDACTED]'
elif base_type == "UUID":
return value[:8] + '-****-****-****-' + value[-12:] if len(value) >= 36 else '[UUID REDACTED]'
elif base_type == "GEO_COORDINATES":
return '[COORDINATES REDACTED]'
# New advanced detection types
elif base_type == "EU_VAT_NUMBER":
return value[:2] + '***' + value[-4:] if len(value) > 6 else '[VAT REDACTED]'
elif base_type == "SSN_SPELLED_OUT":
return '***-**-' + value[-4:] if len(value) >= 4 else '***-**-****'
elif base_type == "BASE64_ENCODED_SECRET":
return '[BASE64 SECRET REDACTED]'
else:
# Generic masking - show first and last char
if len(value) > 2:
return value[0] + '*' * (len(value) - 2) + value[-1]
return '*' * len(value)
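# Illustrative masking outputs (comment only), assuming the inputs shown:
#   mask_value("jane.doe@example.com", "EMAIL")          -> "ja***@example.com"
#   mask_value("123-45-6789", "SSN")                     -> "***-**-6789"
#   mask_value("(555) 123-4567", "PHONE_US_NORMALIZED")  -> "***-***-4567"
# Unknown types fall through to the generic first-and-last-character mask.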
def calculate_risk_level(entities: list[PIIEntity]) -> tuple[str, int]:
"""Calculate risk level based on PII found"""
if not entities:
return "LOW", 0
# Weight by sensitivity
sensitivity_weights = {
# High sensitivity - identity theft risk
"SSN": 100,
"CREDIT_CARD": 95,
"DRIVERS_LICENSE": 85,
"MEDICARE_ID": 90, # Health insurance ID
"GERMAN_ID": 90, # National ID
# Financial
"IBAN": 85,
"BANK_ACCOUNT": 80,
"ROUTING_NUMBER": 75,
"SWIFT_BIC": 60,
# Secrets - security breach risk (AWS)
"AWS_ACCESS_KEY": 100,
"AWS_SECRET_KEY": 100,
"PRIVATE_KEY": 100,
# Secrets - GitHub
"GITHUB_TOKEN": 95,
"GITHUB_TOKEN_CLASSIC": 95,
# Secrets - GCP/Azure
"GCP_SERVICE_ACCOUNT": 95,
"GCP_PRIVATE_KEY_ID": 100,
"AZURE_CLIENT_SECRET": 95,
"AZURE_CONNECTION_STRING": 100,
"AZURE_SAS_TOKEN": 90,
# Secrets - Other
"STRIPE_KEY": 95,
"SLACK_TOKEN": 90,
"GOOGLE_API_KEY": 85,
"JWT_TOKEN": 85,
"PASSWORD_IN_URL": 90,
"GENERIC_API_KEY": 80,
# Medical IDs - HIPAA compliance
"MEDICAL_RECORD_NUMBER": 90,
"NPI_NUMBER": 85,
"DEA_NUMBER": 80,
# Phone numbers
"PHONE_US": 35,
"PHONE_INTL": 35,
# Medium sensitivity - Addresses
"DATE_OF_BIRTH": 50,
"US_ADDRESS": 55,
"UK_ADDRESS": 55,
"EU_ADDRESS": 55,
"INTERNATIONAL_ADDRESS": 50,
"UK_POSTCODE": 40,
# Lower sensitivity
"EMAIL": 40,
"IP_ADDRESS": 30,
"ZIP_CODE": 20,
# Possible patterns (failed validation but still flagged)
"POSSIBLE_CARD_PATTERN": 25, # Low - failed Luhn, likely not a real card
# Advanced detection types
"EU_VAT_NUMBER": 70,
"SSN_SPELLED_OUT": 95, # High - intentional evasion
"BASE64_ENCODED_SECRET": 90, # High - deliberately hidden
# JSON-embedded types get the base type weight plus a +10 evasion bonus (capped at 100)
# Base64-decoded types get the same +10 evasion bonus
# Transliterated types inherit the base type weight unchanged
}
total_score = 0
for entity in entities:
# Handle various suffixes by looking up base type
base_type = entity.type
for suffix in ["_NORMALIZED", "_IN_JSON", "_BASE64", "_TRANSLITERATED", "_SPELLED_OUT"]:
base_type = base_type.replace(suffix, "")
# Get weight - add bonus for encoded/hidden PII (evasion attempts)
weight = sensitivity_weights.get(base_type, sensitivity_weights.get(entity.type, 25))
# Bonus for evasion techniques (intentionally hidden PII is more suspicious)
if "_IN_JSON" in entity.type or "_BASE64" in entity.type:
weight = min(100, weight + 10) # Cap at 100
total_score += weight * entity.confidence
# Normalize score (0-100)
risk_score = min(100, int(total_score / max(1, len(entities)) + len(entities) * 5))
if risk_score >= 70:
return "CRITICAL", risk_score
elif risk_score >= 50:
return "HIGH", risk_score
elif risk_score >= 30:
return "MEDIUM", risk_score
else:
return "LOW", risk_score
def redact_text(text: str, entities: list[PIIEntity], mode: str = "mask") -> str:
"""Redact PII from text"""
if not entities:
return text
# Sort by position in reverse to not mess up indices
sorted_entities = sorted(entities, key=lambda x: x.start, reverse=True)
result = text
for entity in sorted_entities:
if mode == "mask":
replacement = entity.masked_value
elif mode == "remove":
replacement = "[REDACTED]"
elif mode == "type":
replacement = f"[{entity.type}]"
else:
replacement = entity.masked_value
result = result[:entity.start] + replacement + result[entity.end:]
return result
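# Note on ordering (comment only): entities are applied right-to-left so earlier
# replacements cannot shift the start/end offsets of entities not yet processed.
# Illustrative call, assuming entities holds the single EMAIL span in the text:
#   redact_text("Mail jane.doe@example.com", entities, "type") -> "Mail [EMAIL]"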
# Alias for backwards compatibility
detect_pii = detect_pii_multilayer
@router.post("/scan-text")
async def scan_text(
text: str = Form(...),
detect_emails: bool = Form(True),
detect_phones: bool = Form(True),
detect_ssn: bool = Form(True),
detect_credit_cards: bool = Form(True),
detect_ip_addresses: bool = Form(True),
detect_dates: bool = Form(True),
detect_addresses: bool = Form(True),
detect_iban: bool = Form(True),
detect_secrets: bool = Form(True),
coordinates_only: bool = Form(False)
):
"""
Scan text for PII and secrets using multi-layer detection.
Security Options:
- coordinates_only: If True, returns only PII positions (start, end, type, confidence)
without the actual values. The frontend can then perform client-side masking
using these coordinates, so the detected values are never echoed back in the
response. This is useful for ultra-sensitive data where raw PII values should
stay on the client once scanning is done.
"""
# Normalize CRLF to LF when returning coordinates for client-side redaction
# Browser FormData converts LF->CRLF per RFC 7578, but frontend uses LF text
if coordinates_only:
text = text.replace("\r\n", "\n")
config = ScanConfig(
detect_emails=detect_emails,
detect_phones=detect_phones,
detect_ssn=detect_ssn,
detect_credit_cards=detect_credit_cards,
detect_ip_addresses=detect_ip_addresses,
detect_dates=detect_dates,
detect_addresses=detect_addresses,
detect_iban=detect_iban,
detect_secrets=detect_secrets
)
entities = detect_pii_multilayer(text, config, coordinates_only=coordinates_only)
# Group by type
entities_by_type = {}
for entity in entities:
if entity.type not in entities_by_type:
entities_by_type[entity.type] = 0
entities_by_type[entity.type] += 1
risk_level, risk_score = calculate_risk_level(entities)
# Coordinates-only mode: Strip actual PII values for client-side redaction
if coordinates_only:
# Return only positions and types - no actual PII values
# Frontend will use these coordinates to mask text client-side
entities_coords = [
{
"type": e.type,
"start": e.start,
"end": e.end,
"confidence": e.confidence,
"length": e.end - e.start # So frontend knows how many chars to mask
}
for e in entities
]
return {
"total_entities": len(entities),
"entities_by_type": entities_by_type,
"entities": entities_coords, # Coordinates only - no values!
"risk_level": risk_level,
"risk_score": risk_score,
"redacted_preview": None, # Frontend handles redaction
"coordinates_only": True
}
# Standard mode: Return full entity details including values
redacted_preview = redact_text(text, entities, "mask")
return {
"total_entities": len(entities),
"entities_by_type": entities_by_type,
"entities": [e.model_dump() for e in entities],
"risk_level": risk_level,
"risk_score": risk_score,
"redacted_preview": redacted_preview,
"coordinates_only": False
}
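# Client-side masking sketch (comment only). The "/privacy" mount prefix, port, and the
# httpx dependency are assumptions for illustration, not part of this module:
#   import httpx
#   resp = httpx.post("http://localhost:8000/privacy/scan-text",
#                     data={"text": text, "coordinates_only": "true"})
#   masked = list(text)
#   for ent in resp.json()["entities"]:
#       masked[ent["start"]:ent["end"]] = "*" * ent["length"]
#   print("".join(masked))
# In this mode the response carries only coordinates; masking happens entirely client-side.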
@router.post("/scan-file")
async def scan_file(file: UploadFile = File(...)):
"""Scan a file for PII (CSV, TXT, JSON)"""
try:
content = await file.read()
filename = file.filename.lower()
if filename.endswith('.csv'):
# Use duckdb to read CSV and extract all text
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as tmp:
tmp.write(content)
tmp_path = tmp.name
try:
con = duckdb.connect(':memory:')
result = con.execute(f"SELECT * FROM read_csv_auto('{tmp_path}')").fetchall()
columns = con.execute(f"DESCRIBE SELECT * FROM read_csv_auto('{tmp_path}')").fetchall()
all_text = ' '.join(str(cell) for row in result for cell in row if cell is not None)
con.close()
finally:
os.unlink(tmp_path)
elif filename.endswith('.json'):
# Use duckdb to read JSON
with tempfile.NamedTemporaryFile(delete=False, suffix='.json') as tmp:
tmp.write(content)
tmp_path = tmp.name
try:
con = duckdb.connect(':memory:')
result = con.execute(f"SELECT * FROM read_json_auto('{tmp_path}')").fetchall()
all_text = ' '.join(str(cell) for row in result for cell in row if cell is not None)
con.close()
finally:
os.unlink(tmp_path)
else:
# Treat as text file
all_text = content.decode('utf-8', errors='ignore')
entities = detect_pii(all_text)
# Group by type
entities_by_type = {}
for entity in entities:
if entity.type not in entities_by_type:
entities_by_type[entity.type] = 0
entities_by_type[entity.type] += 1
risk_level, risk_score = calculate_risk_level(entities)
# Limit preview length
preview_text = all_text[:2000] if len(all_text) > 2000 else all_text
redacted_preview = redact_text(preview_text, [e for e in entities if e.end <= 2000], "mask")
return {
"filename": file.filename,
"total_entities": len(entities),
"entities_by_type": entities_by_type,
"entities": [e.model_dump() for e in entities[:100]], # Limit to first 100
"risk_level": risk_level,
"risk_score": risk_score,
"redacted_preview": redacted_preview
}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Could not process file: {str(e)}")
@router.post("/scan-dataframe")
async def scan_dataframe(file: UploadFile = File(...)):
"""Scan a CSV/Excel file and analyze each column for PII"""
try:
content = await file.read()
filename = file.filename.lower()
# Determine file extension and create temp file
if filename.endswith('.csv'):
suffix = '.csv'
elif filename.endswith(('.xls', '.xlsx')):
suffix = '.xlsx' if filename.endswith('.xlsx') else '.xls'
else:
raise HTTPException(status_code=400, detail="Unsupported file format. Use CSV or Excel.")
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(content)
tmp_path = tmp.name
try:
con = duckdb.connect(':memory:')
# Read file based on type
if suffix == '.csv':
# Get column names and data
schema = con.execute(f"DESCRIBE SELECT * FROM read_csv_auto('{tmp_path}')").fetchall()
columns = [col[0] for col in schema]
data = con.execute(f"SELECT * FROM read_csv_auto('{tmp_path}')").fetchall()
else:
# For Excel files, DuckDB needs an extension: try the spatial extension's st_read()
# first, then fall back to the read_xlsx() reader if that extension is unavailable
try:
schema = con.execute(f"DESCRIBE SELECT * FROM st_read('{tmp_path}')").fetchall()
columns = [col[0] for col in schema]
data = con.execute(f"SELECT * FROM st_read('{tmp_path}')").fetchall()
except Exception:
# If spatial extension not available, try xlsx reader
schema = con.execute(f"DESCRIBE SELECT * FROM read_xlsx('{tmp_path}')").fetchall()
columns = [col[0] for col in schema]
data = con.execute(f"SELECT * FROM read_xlsx('{tmp_path}')").fetchall()
total_rows = len(data)
total_columns = len(columns)
column_results = []
total_pii = 0
for col_idx, col in enumerate(columns):
# Extract all values for this column
col_values = [str(row[col_idx]) for row in data if row[col_idx] is not None]
col_text = ' '.join(col_values)
entities = detect_pii(col_text)
# Group by type
entities_by_type = {}
for entity in entities:
if entity.type not in entities_by_type:
entities_by_type[entity.type] = 0
entities_by_type[entity.type] += 1
pii_count = len(entities)
total_pii += pii_count
column_results.append({
"column": col,
"pii_count": pii_count,
"pii_types": entities_by_type,
"sample_pii": [e.model_dump() for e in entities[:3]] if entities else []
})
con.close()
finally:
os.unlink(tmp_path)
# Sort by PII count descending
column_results.sort(key=lambda x: x["pii_count"], reverse=True)
risk_level = "CRITICAL" if total_pii > 50 else "HIGH" if total_pii > 20 else "MEDIUM" if total_pii > 5 else "LOW"
return {
"filename": file.filename,
"total_rows": total_rows,
"total_columns": total_columns,
"total_pii_found": total_pii,
"risk_level": risk_level,
"columns_with_pii": len([c for c in column_results if c["pii_count"] > 0]),
"column_analysis": column_results
}
except Exception as e:
raise HTTPException(status_code=400, detail=f"Could not process file: {str(e)}")
@router.post("/redact")
async def redact_text_endpoint(
text: str = Form(...),
mode: str = Form("mask")
):
"""Redact PII from text"""
entities = detect_pii(text)
redacted = redact_text(text, entities, mode)
return {
"original_length": len(text),
"redacted_length": len(redacted),
"entities_redacted": len(entities),
"redacted_text": redacted
}
@router.get("/entity-types")
async def list_entity_types():
"""List supported PII entity types"""
return {
"entity_types": [
{"type": key, "description": value["description"]}
for key, value in PII_PATTERNS.items()
]
}