diff --git a/Dockerfile.analyzer b/Dockerfile.analyzer
index ebb1d47..7371432 100644
--- a/Dockerfile.analyzer
+++ b/Dockerfile.analyzer
@@ -31,7 +31,7 @@ COPY . /app/
 # Définir la variable d'environnement pour que Presidio trouve notre fichier de configuration
 # Dit à Presidio : "Ton fichier de config est ici"
-ENV PRESIDIO_ANALYZER_CONFIG_FILE=/app/conf/default.yaml
+ENV PRESIDIO_ANALYZER_CONFIG_FILE=/app/conf/main.yaml
 
 # Exposer le port que Gunicorn va utiliser
 EXPOSE 5001
diff --git a/app.py b/app.py
index bbcbac8..4ea6fd6 100644
--- a/app.py
+++ b/app.py
@@ -1,59 +1,83 @@
 import os
-import re
 import logging
+import re
+import tempfile
+import yaml
 from flask import Flask, request, jsonify, make_response
-
 from presidio_analyzer import AnalyzerEngineProvider
+from config_loader import ConfigLoader
+from presidio_anonymizer import AnonymizerEngine
+from presidio_anonymizer.entities import OperatorConfig
+from entity_refiners import EntityRefinerManager
+from pipeline_manager import AnalysisPipeline
 
+# Initialisation du logger
 logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
 logger = logging.getLogger(__name__)
 
 app = Flask(__name__)
 
-# Chargement du moteur
-
+refiner_manager = EntityRefinerManager()
 analyzer = None
+allow_list_terms = set()
+
 try:
-    logger.info("--- Presidio Analyzer Service Starting ---")
-    CONFIG_FILE_PATH = os.environ.get("PRESIDIO_ANALYZER_CONFIG_FILE", "conf/default.yaml")
-    provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH)
-    analyzer = provider.create_engine()
+    logger.info("--- Presidio Analyzer Service Starting (Architecture Modulaire) ---")
+    config_loader = ConfigLoader()
+    try:
+        config = config_loader.load_config("main.yaml")
+        logger.info("✅ Configuration modulaire chargée avec succès")
+
+        allow_list_terms = set(term.lower().strip() for term in config.get('allow_list', []))
+        logger.info(f"✅ Allow list chargée avec {len(allow_list_terms)} termes")
+
+        recognizers_count = len(config.get('recognizer_registry', {}).get('recognizers', []))
+        logger.info(f"📊 Nombre de recognizers chargés: {recognizers_count}")
+
+        # AnalyzerEngineProvider n'accepte qu'un chemin de fichier : on matérialise
+        # la configuration fusionnée dans un fichier YAML temporaire.
+        with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False, encoding='utf-8') as tmp_file:
+            yaml.dump(config, tmp_file, default_flow_style=False, allow_unicode=True)
+            temp_config_path = tmp_file.name
+
+        try:
+            with open(temp_config_path, 'r', encoding='utf-8') as f:
+                temp_content = f.read()
+            logger.debug(f"🔍 Contenu du fichier temporaire (1000 premiers caractères):\n{temp_content[:1000]}")
+
+            if 'nlp_configuration' in config:
+                logger.info("✅ nlp_configuration trouvée")
+            else:
+                logger.warning("❌ nlp_configuration MANQUANTE dans la config finale")
+
+            provider = AnalyzerEngineProvider(analyzer_engine_conf_file=temp_config_path)
+            analyzer = provider.create_engine()
+        finally:
+            # Supprimer le fichier temporaire même si la création du moteur échoue
+            os.unlink(temp_config_path)
+
+    except Exception as e:
+        logger.error(f"❌ Erreur avec la config modulaire: {e}")
+        logger.warning("🔄 Fallback vers default.yaml")
+        CONFIG_FILE_PATH = os.environ.get("PRESIDIO_ANALYZER_CONFIG_FILE", "conf/default.yaml")
+        provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH)
+        analyzer = provider.create_engine()
+
+    logger.info(f"Analyzer ready. Languages: {analyzer.supported_languages}")
+
 except Exception as e:
     logger.exception("Error during AnalyzerEngine initialization.")
     analyzer = None
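+# Equivalent loading flow outside Flask — a hypothetical sketch, only to
+# illustrate the "single file" contract of AnalyzerEngineProvider used above:
+#
+#     loader = ConfigLoader()
+#     cfg = loader.load_config("main.yaml")
+#     with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False, encoding="utf-8") as tmp:
+#         yaml.dump(cfg, tmp, allow_unicode=True)
+#     engine = AnalyzerEngineProvider(analyzer_engine_conf_file=tmp.name).create_engine()
+#     print(engine.supported_languages)  # expected: ['en', 'fr']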
 
-# Test Temporaire pour les Regex via du Python directement
-
-IBAN_REGEX = re.compile(r"\b[A-Z]{2}[0-9]{2}(?:\s[0-9]{4}){3}\b", re.IGNORECASE)
-
-
-IPV4_REGEX = re.compile(
-    r"\b(?:(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\.){3}"
-    r"(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\b"
-)
-
-# Liste Temporaire en surcouche des labels/phrases à exclure d’anonymisation
-
-IGNORE_LABELS = {
-    "témoins",
-    "témoins clés",
-    "coordonnées",
-    "coordonnées bancaires",
-    "contexte financier",
-    "données sensibles",
-    "contexte",
-    "montrent",
-    "montrent des",
-    "montrent des irrégularités",
-    "bénéficiaire",
-}
-
 def normalize_label(text: str) -> str:
-    return text.strip().lower()
+    cleaned = re.sub(r'[^\w\s]', '', text.strip().lower())
+    return cleaned
+
+
+# Pipeline d'analyse partagé par les endpoints
+pipeline = AnalysisPipeline()
+
 @app.route("/analyze", methods=["POST"])
 def analyze_text():
     if not analyzer:
@@ -67,62 +91,182 @@ def analyze_text():
     if not text_to_analyze:
         return jsonify({"error": "text field is missing or empty"}), 400
 
-    results = analyzer.analyze(text=text_to_analyze, language=language)
-
-    filtered_results = []
-    for res in results:
-        ent_text = text_to_analyze[res.start:res.end].strip()
-        ent_text_norm = normalize_label(ent_text)
-
-        if ent_text_norm in IGNORE_LABELS:
-            logger.debug(f"Skipping anonymization of label: '{ent_text}'")
-            continue
-
-        # Recadrage IBAN
-
-        if res.entity_type == "IBAN":
-            match = IBAN_REGEX.search(ent_text)
-            if match:
-                true_iban = match.group(0)
-                start_offset = ent_text.find(true_iban)
-                if start_offset != -1:
-                    old_start, old_end = res.start, res.end
-                    res.start += start_offset
-                    res.end = res.start + len(true_iban)
-                    logger.debug(f"Adjusted IBAN span: {old_start}-{old_end} => {res.start}-{res.end}")
-                else:
-                    logger.warning(f"IBAN regex match but cannot find substring position: '{ent_text}'")
-            else:
-                logger.warning(f"Invalid IBAN detected, skipping: '{ent_text}'")
-                continue
-
-        # Recadrage IP_ADDRESS
-
-        if res.entity_type == "IP_ADDRESS":
-            match = IPV4_REGEX.search(ent_text)
-            if match:
-                true_ip = match.group(0)
-                start_offset = ent_text.find(true_ip)
-                if start_offset != -1:
-                    old_start, old_end = res.start, res.end
-                    res.start += start_offset
-                    res.end = res.start + len(true_ip)
-                    logger.debug(f"Adjusted IP span: {old_start}-{old_end} => {res.start}-{res.end}")
-                else:
-                    logger.warning(f"IP regex match but cannot find substring position: '{ent_text}'")
-            else:
-                logger.warning(f"Invalid IP detected, skipping: '{ent_text}'")
-                continue
-
-        filtered_results.append(res)
-
-    # Résultat nettoyé
-    response_data = [res.to_dict() for res in filtered_results]
+    # Analyse brute
+    raw_results = analyzer.analyze(text=text_to_analyze, language=language)
+
+    # Pipeline modulaire complet
+    final_results = pipeline.process(text_to_analyze, raw_results, allow_list_terms)
+
+    response_data = [res.to_dict() for res in final_results]
     return make_response(jsonify(response_data), 200)
 
 except Exception as e:
     logger.exception("Error processing analysis")
     return jsonify({"error": str(e)}), 500
+
+
+@app.route("/health", methods=["GET"])
+def health_check():
+    if analyzer:
+        return jsonify({
+            "status": "healthy",
+            "languages": analyzer.supported_languages,
+            "version": "2.0.0"
+        }), 200
+    else:
+        return jsonify({"status": "unhealthy", "error": "Analyzer not initialized"}), 503
+
+
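+# Expected shape of conf/anonymization/replacements.yaml consumed below (excerpt):
+#
+#     anonymizer_config:
+#       replacements:
+#         PERSON: "[PERSONNE]"
+#         IBAN: "[IBAN]"
+#
+# Each pair becomes an OperatorConfig("replace", {"new_value": ...}).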
+def load_replacements():
+    """Charge les configurations d'anonymisation depuis YAML"""
+    try:
+        config_path = "conf/anonymization/replacements.yaml"
+        if not os.path.exists(config_path):
+            logger.warning(f"❌ Fichier de configuration non trouvé: {config_path}")
+            return {}
+
+        with open(config_path, "r", encoding="utf-8") as f:
+            config = yaml.safe_load(f)
+
+        if not config:
+            logger.warning("❌ Fichier de configuration vide")
+            return {}
+
+        anonymizer_config = config.get("anonymizer_config", {})
+        replacements = anonymizer_config.get("replacements", {})
+
+        if not replacements:
+            logger.warning("❌ Aucun remplacement trouvé dans la configuration")
+            return {}
+
+        operators = {}
+        for entity_type, replacement_value in replacements.items():
+            try:
+                operators[entity_type] = OperatorConfig("replace", {"new_value": replacement_value})
+            except Exception as e:
+                logger.error(f"❌ Erreur lors de la création de l'opérateur {entity_type}: {e}")
+                continue
+
+        logger.info(f"✅ Loaded {len(operators)} replacement operators from config")
+        return operators
+
+    except Exception as e:
+        logger.error(f"❌ Failed to load replacements config: {e}")
+        return {}
+
+
+# Initialisation de l'anonymizer et des opérateurs
+try:
+    anonymizer = AnonymizerEngine()
+    logger.info("✅ Anonymizer engine initialized successfully")
+    replacement_operators = load_replacements()
+    if replacement_operators:
+        logger.info(f"✅ Loaded {len(replacement_operators)} custom replacement operators")
+    else:
+        logger.warning("⚠️ Aucun opérateur de remplacement chargé, fallback par défaut")
+        replacement_operators = {}
+
+except Exception as e:
+    logger.error(f"❌ Anonymizer initialization failed: {e}")
+    anonymizer = None
+    replacement_operators = {}
+
+
+@app.route("/anonymize", methods=["POST"])
+def anonymize_text():
+    logger.debug("Endpoint /anonymize appelé")
+
+    global anonymizer, replacement_operators
+
+    if anonymizer is None:
+        return jsonify({"error": "Anonymizer not initialized"}), 503
+
+    if not replacement_operators:
+        logger.warning("⚠️ replacement_operators non défini, rechargement...")
+        replacement_operators = load_replacements()
+
+    logger.debug(f"🔍 Opérateurs disponibles: {list(replacement_operators.keys())}")
+
+    try:
+        data = request.get_json(force=True)
+        text_to_anonymize = data.get("text", "")
+        language = data.get("language", "fr")
+        mode = data.get("mode", "pii")
+
+        if not text_to_anonymize:
+            return jsonify({"error": "No text provided"}), 400
+
+        logger.debug(f"🔍 Texte à anonymiser: '{text_to_anonymize}'")
+
+        # Hook optionnel: get_entities_by_mode n'est défini nulle part pour l'instant,
+        # donc entities_to_detect vaut None (= toutes les entités supportées).
+        entities_to_detect = get_entities_by_mode(mode) if 'get_entities_by_mode' in globals() else None
+
+        analyzer_results = analyzer.analyze(
+            text=text_to_anonymize,
+            language=language,
+            entities=entities_to_detect
+        )
+
+        logger.debug(f"🔍 Entités détectées: {[(r.entity_type, text_to_anonymize[r.start:r.end], r.score) for r in analyzer_results]}")
+
+        filtered_results = []
+        for res in analyzer_results:
+            ent_text = text_to_anonymize[res.start:res.end].strip()
+
+            logger.debug(f"🔍 Traitement entité: {res.entity_type} = '{ent_text}' (score: {res.score})")
+
+            # Comparer entité et allow list avec la MÊME normalisation (ponctuation
+            # retirée des deux côtés), sinon des termes comme "expert-comptable"
+            # ne correspondent jamais au texte nettoyé.
+            ent_text_clean = re.sub(r'[^\w]', '', ent_text.lower())
+
+            # Correspondance exacte ou par préfixe avec un terme de la allow list
+            is_allowed = any(
+                ent_text_clean == term_clean or ent_text_clean.startswith(term_clean)
+                for term_clean in (re.sub(r'[^\w]', '', term) for term in allow_list_terms)
+                if term_clean
+            )
+
+            if is_allowed:
+                logger.info(f"✅ Entité '{ent_text}' ignorée (dans allow list)")
+                continue
+
+            refined_positions = refiner_manager.refine_entity(text_to_anonymize, res.entity_type, res.start, res.end)
+            if refined_positions is None:
+                logger.info(f"❌ Entité {res.entity_type} supprimée par le refiner")
+                continue
+
+            res.start, res.end = refined_positions
+            filtered_results.append(res)
+            logger.debug(f"✅ Entité {res.entity_type} conservée après refinement")
+
+        logger.debug(f"🔍 Entités finales pour anonymisation: {[(r.entity_type, text_to_anonymize[r.start:r.end]) for r in filtered_results]}")
+
+        operators_to_use = replacement_operators if replacement_operators else {}
+        logger.debug(f"🔍 Opérateurs utilisés: {list(operators_to_use.keys())}")
+
+        anonymized_result = anonymizer.anonymize(
+            text=text_to_anonymize,
+            analyzer_results=filtered_results,
+            operators=operators_to_use
+        )
+
+        logger.debug(f"🔍 Résultat anonymisation: '{anonymized_result.text}'")
+
+        return jsonify({
+            "original_text": text_to_anonymize,
+            "anonymized_text": anonymized_result.text,
+            "entities_found": [
+                {
+                    "entity_type": result.entity_type,
+                    "start": result.start,
+                    "end": result.end,
+                    "score": result.score
+                } for result in filtered_results
+            ],
+            "mode": mode
+        })
+
+    except Exception as e:
+        logger.exception("Error during anonymization")
+        return jsonify({"error": str(e)}), 500
+
+
 if __name__ == "__main__":
     app.run(host="0.0.0.0", port=5001)
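For reviewers, a minimal client-side sanity check of the new endpoints — a sketch assuming the service runs via docker-compose on port 5001 and that `requests` is installed; the exact replacements depend on conf/anonymization/replacements.yaml:

```python
# Quick manual smoke test for /health and /anonymize.
import requests

BASE = "http://localhost:5001"

print(requests.get(f"{BASE}/health").json())  # {"status": "healthy", ...}

resp = requests.post(f"{BASE}/anonymize", json={
    "text": "Jean Dupont, IBAN BE71 0961 2345 6769, joignable au 0470 12 34 56.",
    "language": "fr",
    "mode": "pii",
})
data = resp.json()
print(data["anonymized_text"])  # e.g. "[PERSONNE], IBAN [IBAN], joignable au [TELEPHONE_BE]."
print(data["entities_found"])   # list of {entity_type, start, end, score}
```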
diff --git a/conf/anonymization/allow_list.yaml b/conf/anonymization/allow_list.yaml
new file mode 100644
index 0000000..e6e4572
--- /dev/null
+++ b/conf/anonymization/allow_list.yaml
@@ -0,0 +1,19 @@
+# Liste blanche - termes à ne pas anonymiser
+allow_list:
+  # Références légales
+  - Loi
+  - Code
+  - Règlement
+  - Décret
+  - Arrêté
+  - BCE
+  - TVA
+  - IEC
+  - expert-comptable
+  # Termes financiers
+  - Euro
+  - EUR
+  - Euros
+  - Taux
+  - Valeur
+  - Prix
diff --git a/conf/anonymization/replacements.yaml b/conf/anonymization/replacements.yaml
new file mode 100644
index 0000000..1583740
--- /dev/null
+++ b/conf/anonymization/replacements.yaml
@@ -0,0 +1,82 @@
+# Configuration d'anonymisation complète
+anonymizer_config:
+  default_anonymizers:
+    # Entités génériques
+    PERSON: replace
+    LOCATION: replace
+    ORGANIZATION: replace
+    DATE_TIME: replace
+    MONEY: replace
+    EMAIL_ADDRESS: replace
+    IBAN: replace
+    IP_ADDRESS: replace
+
+    # PII Génériques - Données sensibles RGPD
+    HEALTH_DATA: replace
+    BIOMETRIC_DATA: replace
+    SEXUAL_ORIENTATION: replace
+    POLITICAL_OPINIONS: replace
+    RGPD_FINANCIAL_DATA: replace
+
+    # PII Belges
+    BE_ENTERPRISE_NUMBER: replace
+    BE_NATIONAL_REGISTER_NUMBER: replace
+    BE_PHONE_NUMBER: replace
+    BE_ADDRESS: replace
+    BE_ID_CARD: replace
+    BE_PASSPORT: replace
+
+    # PII Françaises
+    FR_SOCIAL_SECURITY_NUMBER: replace
+    FR_SIRET: replace
+    FR_ADDRESS: replace
+    FR_TAX_ID: replace
+    FR_BANK_ACCOUNT: replace
+    FR_ID_CARD: replace
+    FR_PASSPORT: replace
+    FR_DRIVER_LICENSE: replace
+
+    # Business
+    BE_PROFESSIONAL_ID: replace
+    MARKET_SHARE: replace
+
+  replacements:
+    # Entités génériques
+    PERSON: "[PERSONNE]"
+    LOCATION: "[LIEU]"
+    ORGANIZATION: "[ORGANISATION]"
+    DATE_TIME: "[DATE]"
+    MONEY: "[MONTANT]"
+    EMAIL_ADDRESS: "[EMAIL]"
+    IBAN: "[IBAN]"
+    IP_ADDRESS: "[ADRESSE_IP]"
+
+    # PII Génériques - Données sensibles RGPD
+    HEALTH_DATA: "[DONNEES_SANTE]"
+    BIOMETRIC_DATA: "[DONNEES_BIOMETRIQUES]"
+    SEXUAL_ORIENTATION: "[ORIENTATION_SEXUELLE]"
+    POLITICAL_OPINIONS: "[OPINIONS_POLITIQUES]"
+    RGPD_FINANCIAL_DATA: "[DONNEES_FINANCIERES]"
+
+    # PII Belges
BE_ENTERPRISE_NUMBER: "[ENTREPRISE_BELGE]" + BE_NATIONAL_REGISTER_NUMBER: "[NRN_BELGE]" + BE_PHONE_NUMBER: "[TELEPHONE_BE]" + BE_ADDRESS: "[ADRESSE_BELGE]" + BE_ID_CARD: "[CARTE_ID_BE]" + BE_PASSPORT: "[PASSEPORT_BE]" + + # PII Françaises + FR_SOCIAL_SECURITY_NUMBER: "[NUM_SECU_FR]" + FR_SIRET: "[SIRET_FR]" + FR_ADDRESS: "[ADRESSE_FR]" + FR_TAX_ID: "[NUM_FISCAL_FR]" + FR_BANK_ACCOUNT: "[COMPTE_BANCAIRE_FR]" + FR_ID_CARD: "[CARTE_ID_FR]" + FR_PASSPORT: "[PASSEPORT_FR]" + FR_DRIVER_LICENSE: "[PERMIS_FR]" + + # Business + + BE_PROFESSIONAL_ID: "[ID_PROFESSIONNEL_BE]" + MARKET_SHARE: "[PART_DE_MARCHE]" diff --git a/conf/default.yaml b/conf/default.yaml deleted file mode 100644 index e9ab84c..0000000 --- a/conf/default.yaml +++ /dev/null @@ -1,227 +0,0 @@ -# ======================= -# CONFIGURATION PRESIDIO -# ======================= -supported_languages: [en, fr] - -nlp_configuration: - nlp_engine_name: spacy - models: - - lang_code: en - model_name: en_core_web_lg - - lang_code: fr - model_name: fr_core_news_lg - ner_model_configuration: - labels_to_ignore: - - LOCATION - - MISC - - CARDINAL - - EVENT - - LANGUAGE - - LAW - - ORDINAL - - PERCENT - - PRODUCT - - QUANTITY - - WORK_OF_ART - confidence_thresholds: - DEFAULT_CONFIDENCE: 0.85 - PERSON: 0.85 - ORGANIZATION: 0.55 - -recognizer_registry: - load_predefined_recognizers: true - recognizers: - - name: FlexibleDateRecognizer - supported_language: fr - supported_entity: FLEXIBLE_DATE - patterns: - - name: Date format JJ mois AAAA - regex: "\\b(0?[1-9]|[12][0-9]|3[01])\\s+(janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\\s+(19|20)\\d{2}\\b" - score: 1.0 - - name: Date format JJ/MM/AAAA - regex: "\\b(0[1-9]|[12][0-9]|3[01])[-/.](0[1-9]|1[012])[-/.](19|20)\\d{2}\\b" - score: 1.0 - context: ["date", "né le", "signé le", "incident du"] - - - name: BelgianAddressRecognizer - supported_language: fr - supported_entity: BE_ADDRESS - patterns: - - name: Adresse Belge complète - regex: "\\b(?:\\d{1,4}[A-Za-z]?(?:\\s*,)?\\s+)?(?:Avenue|Rue|Boulevard|Chaussée|Place|Quai|Impasse|Drève)(?:\\s+(?:de|la|le|d'|des))?(?:\\s+[A-Z][a-zà-ÿ'-]+)+,?(?:\\s+\\d{1,4}[A-Za-z]?)?,\\s*\\d{4}\\s+[A-Za-zà-ÿ'-]+" - score: 1.0 - context: ["demeurant", "adresse", "siège social", "bureaux situés"] - - - name: BelgianPhoneRecognizer - supported_language: fr - supported_entity: BE_PHONE_NUMBER - patterns: - - name: Numéro téléphone Belge (fixe ou mobile) - regex: "\\b0[1-9](?:[./\\s]?\\d{2,3}){3}\\b" - score: 0.95 - context: ["Tel", "Tél", "téléphone", "gsm", "mobile"] - - - name: SmartOrganizationRecognizer - supported_language: fr - supported_entity: ORGANIZATION - patterns: - - name: Nom + Forme légale (DigitalConsult SPRL) - regex: "\\b([A-Z][a-zà-ÿ]+(?:\\s[A-Z][a-zà-ÿ]+)*)\\s+(SPRL|SRL|SA|SCS|SNC)\\b" - score: 0.9 - - name: Forme légale + Nom (SPRL DigitalConsult) - regex: "\\b(SPRL|SRL|SA|SCS|SNC)\\s+([A-Z][a-zà-ÿ]+(?:\\s[A-Z][a-zà-ÿ]+)*)\\b" - score: 0.9 - context: ["société", "entreprise", "gérant de la"] - - - name: ProfessionalIdRecognizer - supported_language: fr - supported_entity: BE_PRO_ID - patterns: - - name: Numéro IEC - regex: "(n°\\sIEC:?|IEC:?)\\s*\\d{6}" - score: 1.0 - context: ["expert-comptable"] - - - name: BelgianEnterpriseRecognizer - supported_language: fr - supported_entity: BE_ENTERPRISE_NUMBER - patterns: - - name: Numéro BCE/TVA Belge (avec ou sans BE) - regex: "\\b(BE)?\\s?0?\\d{3}[\\.\\s]?\\d{3}[\\.\\s]?\\d{3}\\b" - score: 1.0 - context: ["BCE", "TVA", "intracommunautaire"] - - - name: EmailRecognizer - 
supported_language: fr - supported_entity: EMAIL_ADDRESS - patterns: - - name: Email Pattern - regex: "\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b" - score: 1.0 - context: ["email", "courriel", "mail"] - - - name: IbanRecognizer - supported_language: fr - supported_entity: IBAN - patterns: - - name: IBAN Pattern - regex: "\\b[A-Z]{2}[0-9]{2}(?:\\s[0-9]{4}){3}\\b" - score: 1.0 - context: ["iban", "compte"] - - - name: BelgianNRNRecognizer - supported_language: fr - supported_entity: BE_NATIONAL_REGISTER_NUMBER - patterns: - - name: NRN Pattern - regex: "\\b[0-9]{2}\\.[0-9]{2}\\.[0-9]{2}-[0-9]{3}\\.[0-9]{2}\\b" - score: 1.0 - context: ["registre national"] - - - name: FrenchINSEERecognizer - supported_language: fr - supported_entity: FR_SOCIAL_SECURITY_NUMBER - patterns: - - name: INSEE Pattern with flexible spaces - regex: "\\b[12]\\s*[0-9]{2}\\s*(?:0[1-9]|1[0-2])\\s*(?:2[ABab]|[0-9]{2})\\s*[0-9]{3}\\s*[0-9]{3}[\\s]?[0-9]{2}\\b" - score: 0.95 - context: ["sécurité sociale", "insee", "nir"] - - - name: IpAddressRecognizer - supported_language: fr - supported_entity: IP_ADDRESS - patterns: - - name: IPv4 - regex: "\\b(?:(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\\.){3}(?:25[0-5]|2[0-4][0-9]|1[0-9]{2}|[1-9]?[0-9])\\b" - score: 1.0 - - name: IPv6 - regex: "\\b([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\\b" - score: 0.9 - - -allow_list: - - Adresse - - ADRESSE - - Contrat - - Document - - Société - - Investisseur - - Montant - - Prêt - - Intérêt - - Partie - - Parties - - Annexe - - Remboursement - - Conversion - - Financement - - Sortie - - "Juste Valeur Marchande" - - Échéance - - Clause - - Clauses - - Principe - - Coûts - - Notifications - - Article - - Paragraphe - - Directeur - - Gérant - - Président - - DocuSign - - SPRL - - SA - - Loi - - Code - - Règlement - - Décret - - Arrêté - - Euro - - EUR - - Euros - - Taux - - Valeur - - Prix - - Coordonnées - - Témoins - - "Coordonnées bancaires" - - "Témoins clés" - - "montrent" - - "montrent des" - - "montrent des irrégularités" - - "bénéficiaire" - -anonymizer_config: - default_anonymizers: - PERSON: replace - LOCATION: replace - ORGANIZATION: replace - DATE_TIME: replace - MONEY: replace - EMAIL_ADDRESS: replace - IBAN: replace - BE_ENTERPRISE_NUMBER: replace - BE_NATIONAL_REGISTER_NUMBER: replace - FR_SOCIAL_SECURITY_NUMBER: replace - BE_PHONE_NUMBER: replace - FLEXIBLE_DATE: replace - BE_ADDRESS: replace - BE_PRO_ID: replace - IP_ADDRESS: replace - - replacements: - PERSON: "" - LOCATION: "" - ORGANIZATION: "" - DATE_TIME: "" - MONEY: "" - EMAIL_ADDRESS: "" - IBAN: "" - BE_ENTERPRISE_NUMBER: "" - BE_NATIONAL_REGISTER_NUMBER: "" - FR_SOCIAL_SECURITY_NUMBER: "" - BE_PHONE_NUMBER: "" - FLEXIBLE_DATE: "" - BE_ADDRESS: "" - BE_PRO_ID: "" - IP_ADDRESS: "" diff --git a/conf/main.yaml b/conf/main.yaml new file mode 100644 index 0000000..586396c --- /dev/null +++ b/conf/main.yaml @@ -0,0 +1,30 @@ +# ======================= +# CONFIGURATION PRESIDIO MODULAIRE +# ======================= + +# Langues supportées +supported_languages: [en, fr] +default_language: fr + +# Inclusion des modules de configuration +includes: + # Configuration NLP (spaCy préservée) + - nlp/spacy_config.yaml + + # Recognizers PII par dossier (garder uniquement les dossiers récents) + - recognizers/PII/belgian/* + - recognizers/PII/french/* + - recognizers/PII/generic/* + + # Recognizers Business par dossier + - recognizers/Business/belgian/* + - recognizers/Business/french/* + + # Configuration d'anonymisation + - anonymization/* + +# Configuration globale simplifiée 
+global_settings:
+  version: "2.0.0"
+  cache_enabled: true
+  timeout_seconds: 30
diff --git a/conf/nlp/spacy_config.yaml b/conf/nlp/spacy_config.yaml
new file mode 100644
index 0000000..7def878
--- /dev/null
+++ b/conf/nlp/spacy_config.yaml
@@ -0,0 +1,33 @@
+nlp_configuration:
+  nlp_engine_name: spacy
+  models:
+    - lang_code: en
+      model_name: en_core_web_lg
+    - lang_code: fr
+      model_name: fr_core_news_lg
+
+  # Configuration NER globale (sans confidence_thresholds)
+  ner_model_configuration:
+    model_to_presidio_entity_mapping:
+      PER: PERSON
+      PERSON: PERSON
+      ORG: ORGANIZATION
+      ORGANIZATION: ORGANIZATION
+      LOC: LOCATION
+      LOCATION: LOCATION
+      DATE: DATE_TIME
+      TIME: DATE_TIME
+      MISC: DATE_TIME
+    labels_to_ignore:
+      - LOCATION
+      - MISC
+      - CARDINAL
+      - EVENT
+      - LANGUAGE
+      - LAW
+      - ORDINAL
+      - PERCENT
+      - PRODUCT
+      - QUANTITY
+      - WORK_OF_ART
+    low_score_entity_names: []
diff --git a/conf/recognizers/Business/belgian/enterprise_numbers.yaml b/conf/recognizers/Business/belgian/enterprise_numbers.yaml
new file mode 100644
index 0000000..b6da68c
--- /dev/null
+++ b/conf/recognizers/Business/belgian/enterprise_numbers.yaml
@@ -0,0 +1,24 @@
+# Recognizer pour numéros d'entreprise belges
+recognizer_registry:
+  recognizers:
+    - name: BelgianEnterpriseRecognizer
+      supported_language: fr
+      supported_entity: BE_ENTERPRISE_NUMBER
+      patterns:
+        - name: Numéro BCE avec deux points
+          regex: "(?<=\\bBCE\\s*:\\s*)((BE)?\\s?0\\d{3}[\\.\\s]?\\d{3}[\\.\\s]?\\d{3})\\b"
+          score: 1.0
+        - name: Numéro TVA avec deux points
+          regex: "(?<=\\bTVA\\s*:\\s*)(BE\\d{4}\\.\\d{3}\\.\\d{3})\\b"
+          score: 1.0
+        - name: Numéro d'entreprise général
+          regex: "(?<![\\d.])(BE)?\\s?0\\d{3}[\\.\\s]?\\d{3}[\\.\\s]?\\d{3}\\b"
+          score: 1.0
+      context: ["BCE", "TVA", "intracommunautaire"]
diff --git a/config_loader.py b/config_loader.py
new file mode 100644
--- /dev/null
+++ b/config_loader.py
@@ -0,0 +1,95 @@
+import os
+import glob
+import logging
+import yaml
+from typing import Any, Dict, List
+
+logger = logging.getLogger(__name__)
+
+class ConfigLoader:
+    """Charge la configuration modulaire (main.yaml + modules inclus)"""
+
+    def __init__(self, config_dir: str = "conf"):
+        self.config_dir = config_dir
+        self.config = {}
+
+    def load_config(self, main_config_file: str = "main.yaml") -> Dict[str, Any]:
+        main_config_path = os.path.join(self.config_dir, main_config_file)
+
+        if not os.path.exists(main_config_path):
+            logger.warning(f"Fichier de configuration principal non trouvé: {main_config_path}")
+            return self._load_legacy_config()
+
+        with open(main_config_path, 'r', encoding='utf-8') as f:
+            main_config = yaml.safe_load(f)
+
+        if 'includes' in main_config:
+            for include_pattern in main_config['includes']:
+                self._load_includes(include_pattern)
+
+        self._merge_config(main_config)
+
+        logger.info(f"Configuration chargée avec {len(self.config.get('recognizer_registry', {}).get('recognizers', []))} recognizers")
+        return self.config
+
+    def _load_includes(self, pattern: str):
+        pattern = os.path.expandvars(pattern)
+        full_pattern = os.path.join(self.config_dir, pattern)
+        matching_files = glob.glob(full_pattern, recursive=True)
+
+        for file_path in sorted(matching_files):
+            if os.path.isfile(file_path) and file_path.endswith('.yaml'):
+                try:
+                    with open(file_path, 'r', encoding='utf-8') as f:
+                        module_config = yaml.safe_load(f)
+                        if module_config:
+                            self._merge_config(module_config)
+                            logger.debug(f"Module chargé: {file_path}")
+                except Exception as e:
+                    logger.error(f"Erreur lors du chargement de {file_path}: {e}")
+
+    def _merge_config(self, new_config: Dict[str, Any]):
+        for key, value in new_config.items():
+            if key == 'recognizer_registry':
+                if 'recognizer_registry' not in self.config:
+                    self.config['recognizer_registry'] = {'recognizers': []}
+
+                if 'recognizers' in value:
+                    self.config['recognizer_registry']['recognizers'].extend(value['recognizers'])
+
+                for reg_key, reg_value in value.items():
+                    if reg_key != 'recognizers':
+                        self.config['recognizer_registry'][reg_key] = reg_value
+
+            elif key == 'allow_list':
+                if 'allow_list' not in self.config:
+                    self.config['allow_list'] = []
+                if isinstance(value, list):
self.config['allow_list'].extend(value) + + elif key == 'nlp_configuration': + logger.info(f"🔧 Fusion de nlp_configuration: {value}") + if 'nlp_configuration' not in self.config: + self.config['nlp_configuration'] = {} + self._merge_dict(self.config['nlp_configuration'], value) + + elif isinstance(value, dict) and key in self.config and isinstance(self.config[key], dict): + self._merge_dict(self.config[key], value) + else: + self.config[key] = value + + def _merge_dict(self, target: Dict[str, Any], source: Dict[str, Any]): + for key, value in source.items(): + if isinstance(value, dict) and key in target and isinstance(target[key], dict): + self._merge_dict(target[key], value) + else: + target[key] = value + + def _load_legacy_config(self) -> Dict[str, Any]: + legacy_path = os.path.join(self.config_dir, "default.yaml") + if os.path.exists(legacy_path): + logger.info("Utilisation de la configuration legacy: default.yaml") + with open(legacy_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) + else: + raise FileNotFoundError(f"Aucun fichier de configuration trouvé dans {self.config_dir}") + + def get_recognizers(self) -> List[Dict[str, Any]]: + return self.config.get('recognizer_registry', {}).get('recognizers', []) + + def get_supported_languages(self) -> List[str]: + return self.config.get('supported_languages', ['fr']) + + def load_single_file(self, file_path: str) -> Dict[str, Any]: + full_path = os.path.join(self.config_dir, file_path) if not os.path.isabs(file_path) else file_path + if not os.path.exists(full_path): + raise FileNotFoundError(f"Fichier de configuration non trouvé: {full_path}") + + with open(full_path, 'r', encoding='utf-8') as f: + return yaml.safe_load(f) diff --git a/docker-compose.yml b/docker-compose.yml index 43761b0..ca96320 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3.8' - services: presidio-analyzer: build: @@ -8,11 +6,4 @@ services: container_name: presidio-analyzer restart: unless-stopped ports: - - "5001" # Port corrigé selon la doc Microsoft - - presidio-anonymizer: - image: mcr.microsoft.com/presidio-anonymizer:latest - container_name: presidio-anonymizer - restart: unless-stopped - ports: - - "5002" # Port corrigé selon la doc Microsoft + - "5001:5001" diff --git a/entity_refiners.py b/entity_refiners.py new file mode 100644 index 0000000..721376a --- /dev/null +++ b/entity_refiners.py @@ -0,0 +1,56 @@ +from abc import ABC, abstractmethod +from typing import Optional, Tuple +import re +import logging + +# Imports des raffineurs modulaires +from refiners.iban_refiner import IBANRefiner +from refiners.ip_refiner import IPAddressRefiner +from refiners.date_refiner import DateRefiner +from refiners.location_address_refiner import LocationAddressRefiner +from refiners.word_boundary_refiner import WordBoundaryRefiner + +logger = logging.getLogger(__name__) + +class EntityRefiner(ABC): + """Classe de base pour le recadrage d'entités""" + + def __init__(self, entity_type: str): + self.entity_type = entity_type + + @abstractmethod + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + """Recadre une entité détectée""" + pass + + def should_process(self, entity_type: str) -> bool: + """Vérifie si ce raffineur doit traiter ce type d'entité""" + return entity_type == self.entity_type + +class EntityRefinerManager: + """Gestionnaire des raffineurs d'entités""" + + def __init__(self): + self.refiners = [ + WordBoundaryRefiner(), # En premier pour étendre aux mots complets + IBANRefiner(), 
+            IPAddressRefiner(),
+            DateRefiner(),
+            LocationAddressRefiner()
+        ]
+        logger.info(f"Initialized {len(self.refiners)} entity refiners")
+
+    def register_refiner(self, refiner):
+        """Enregistre un nouveau raffineur"""
+        self.refiners.append(refiner)
+
+    def refine_entity(self, text: str, entity_type: str, start: int, end: int) -> Optional[Tuple[int, int]]:
+        """Applique séquentiellement tous les raffineurs applicables à une entité.
+
+        Retourne None dès qu'un raffineur rejette l'entité; sinon, les
+        coordonnées raffinées sont propagées d'un raffineur au suivant
+        (le premier résultat ne court-circuite plus les raffineurs suivants)."""
+        current_start, current_end = start, end
+        for refiner in self.refiners:
+            if refiner.should_process(entity_type):
+                result = refiner.refine(text, current_start, current_end)
+                if result is None:
+                    logger.debug(f"Entity {entity_type} rejected by {refiner.__class__.__name__}")
+                    return None
+                if result != (current_start, current_end):
+                    logger.debug(f"Entity refined by {refiner.__class__.__name__}: {current_start}-{current_end} -> {result[0]}-{result[1]}")
+                current_start, current_end = result
+
+        return (current_start, current_end)
\ No newline at end of file
diff --git a/pipeline_manager.py b/pipeline_manager.py
new file mode 100644
index 0000000..75fdcda
--- /dev/null
+++ b/pipeline_manager.py
@@ -0,0 +1,68 @@
+from typing import List
+from presidio_analyzer import RecognizerResult
+from entity_refiners import EntityRefinerManager
+from post_processors import DeduplicationProcessor, OverlapResolver
+import logging
+
+logger = logging.getLogger(__name__)
+
+class AnalysisPipeline:
+    def __init__(self):
+        self.refiner_manager = EntityRefinerManager()
+        self.overlap_resolver = OverlapResolver()
+        self.deduplicator = DeduplicationProcessor()
+        logger.info("🚀 Pipeline d'analyse initialisé")
+
+    def process(self, text: str, results: List[RecognizerResult], allow_list_terms: List[str]) -> List[RecognizerResult]:
+        """Traite les résultats à travers le pipeline complet"""
+        # 1. Filtrage allow-list
+        filtered_results = self._filter_allow_list(results, allow_list_terms, text)
+
+        # 2. Raffinement individuel des entités
+        refined_results = []
+        for result in filtered_results:
+            refined_coords = self.refiner_manager.refine_entity(
+                text,
+                result.entity_type,
+                result.start,
+                result.end
+            )
+
+            if refined_coords is not None:
+                # Créer un nouveau RecognizerResult avec les coordonnées raffinées
+                refined_result = RecognizerResult(
+                    entity_type=result.entity_type,
+                    start=refined_coords[0],
+                    end=refined_coords[1],
+                    score=result.score
+                )
+                refined_results.append(refined_result)
+
+        # 3. Résolution des chevauchements
+        resolved_results = self.overlap_resolver.process(refined_results, text)
+
+        # 4. 
Déduplication + final_results = self.deduplicator.process(resolved_results, text) + + logger.info(f"🎯 Pipeline complet: {len(results)} -> {len(final_results)} entités") + return final_results + + def _filter_allow_list(self, results: List[RecognizerResult], allow_list_terms: List[str], text: str) -> List[RecognizerResult]: + """Filtre les résultats en supprimant les termes de la allow-list""" + if not allow_list_terms: + return results + + filtered_results = [] + allow_list_lower = [term.lower().strip() for term in allow_list_terms] + + for result in results: + entity_text = text[result.start:result.end].lower().strip() + + # Garder l'entité si elle n'est pas dans la allow-list + if entity_text not in allow_list_lower: + filtered_results.append(result) + else: + logger.debug(f"🚫 Entité filtrée (allow-list): '{entity_text}'") + + logger.info(f"🔍 Filtrage allow-list: {len(results)} -> {len(filtered_results)} entités") + return filtered_results \ No newline at end of file diff --git a/post_processors/__init__.py b/post_processors/__init__.py new file mode 100644 index 0000000..fe32dac --- /dev/null +++ b/post_processors/__init__.py @@ -0,0 +1,4 @@ +from .deduplication_processor import DeduplicationProcessor +from .overlap_resolver import OverlapResolver + +__all__ = ['DeduplicationProcessor', 'OverlapResolver'] \ No newline at end of file diff --git a/post_processors/deduplication_processor.py b/post_processors/deduplication_processor.py new file mode 100644 index 0000000..761c431 --- /dev/null +++ b/post_processors/deduplication_processor.py @@ -0,0 +1,66 @@ +from typing import List +from presidio_analyzer import RecognizerResult +import logging + +logger = logging.getLogger(__name__) + +class DeduplicationProcessor: + def __init__(self): + self.rules = [ + LocationAddressRule() + ] + logger.info("🔧 DeduplicationProcessor initialisé avec les règles de déduplication") + + def process(self, results: List[RecognizerResult], text: str) -> List[RecognizerResult]: + """Applique les règles de déduplication aux résultats""" + processed_results = results.copy() + + for rule in self.rules: + processed_results = rule.apply(processed_results, text) + + logger.info(f"🔧 DeduplicationProcessor: {len(results)} -> {len(processed_results)} entités") + return processed_results + +class LocationAddressRule: + """Règle pour éviter les doublons entre LOCATION et ADDRESS""" + + def __init__(self): + self.insignificant_terms = {'le', 'la', 'les', 'de', 'du', 'des', 'à', 'au', 'aux'} + + def apply(self, results: List[RecognizerResult], text: str) -> List[RecognizerResult]: + """Supprime les LOCATION qui sont des doublons d'ADDRESS""" + locations = [r for r in results if r.entity_type == 'LOCATION'] + addresses = [r for r in results if r.entity_type == 'ADDRESS'] + others = [r for r in results if r.entity_type not in ['LOCATION', 'ADDRESS']] + + filtered_locations = [] + for location in locations: + if self._should_keep_location(location, addresses, text): + filtered_locations.append(location) + else: + location_text = text[location.start:location.end] + logger.debug(f"🗑️ Suppression LOCATION dupliquée: '{location_text}'") + + return addresses + filtered_locations + others + + def _should_keep_location(self, location: RecognizerResult, addresses: List[RecognizerResult], text: str) -> bool: + location_text = text[location.start:location.end].strip().lower() + + # Ignorer termes non significatifs + if (len(location_text) <= 3 or + location_text in self.insignificant_terms): + return False + + # Vérifier chevauchement 
avec adresses
+        for address in addresses:
+            if self._is_overlapping_or_contained(location, address, text):
+                return False
+
+        return True
+
+    def _is_overlapping_or_contained(self, loc: RecognizerResult, addr: RecognizerResult, text: str) -> bool:
+        """Vérifie si une location est contenue dans une address"""
+        loc_text = text[loc.start:loc.end].strip().lower()
+        addr_text = text[addr.start:addr.end].strip().lower()
+
+        return loc_text in addr_text
\ No newline at end of file
diff --git a/post_processors/overlap_resolver.py b/post_processors/overlap_resolver.py
new file mode 100644
index 0000000..350faed
--- /dev/null
+++ b/post_processors/overlap_resolver.py
@@ -0,0 +1,241 @@
+from typing import List
+from presidio_analyzer import RecognizerResult
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+class OverlapResolver:
+    """
+    Résout les chevauchements entre entités de différents types
+    Priorités: IBAN > EMAIL > PHONE > IP_ADDRESS > ADDRESS > LOCATION > ORGANIZATION > PERSON
+    """
+
+    def __init__(self):
+        # Ordre de priorité (plus haut = plus prioritaire)
+        self.priority_order = {
+            'IBAN': 100,
+            'CREDIT_CARD': 95,
+            'EMAIL_ADDRESS': 90,
+            'BE_ENTERPRISE_NUMBER': 88,
+            'PHONE_NUMBER': 85,
+            'BE_PHONE_NUMBER': 85,
+            'IP_ADDRESS': 82,
+            'BE_ADDRESS': 75,
+            'FR_ADDRESS': 75,
+            'DATE_TIME': 70,
+            'ORGANIZATION': 65,
+            'LOCATION': 60,
+            'PERSON': 50,
+            'NRP': 40,
+            'URL': 35
+        }
+
+        # Exemple d'arbitrage (cf. get_priority_score plus bas): PERSON (50, score 0.85)
+        # contre ORGANIZATION (65, score 0.90) sur un span de 10 caractères:
+        # 50 + 8.5 + 1.0 = 59.5 contre 65 + 9.0 + 1.0 = 75.0 -> ORGANIZATION l'emporte.
+
+        # Patterns pour identifier les organisations.
+        # NB: en raw string, "\\b" désignerait un backslash littéral suivi de "b";
+        # il faut "\b" pour la frontière de mot, sinon ces patterns ne matchent jamais.
+        self.organization_patterns = [
+            r'\b\w+Consult\b',
+            r'\bSPRL\s+\w+\b',  # Pattern pour SPRL + nom
+            r'\bSRL\s+\w+\b',   # Pattern pour SRL + nom
+            r'\bSA\s+\w+\b',    # Pattern pour SA + nom
+            r'\bASBL\s+\w+\b',  # Pattern pour ASBL + nom
+            r'\bSCS\s+\w+\b',   # Pattern pour SCS + nom
+            r'\bSNC\s+\w+\b',   # Pattern pour SNC + nom
+            r'\bSPRL\b',
+            r'\bSRL\b',
+            r'\bSA\b',
+            r'\bASBL\b',
+            r'\bSCS\b',
+            r'\bSNC\b',
+            r'\bLtd\b',
+            r'\bInc\b',
+            r'\bCorp\b',
+            r'\bGmbH\b'
+        ]
+
+        logger.info(f"✅ OverlapResolver initialisé avec {len(self.priority_order)} types d'entités")
+
+    def process(self, results: List[RecognizerResult], text: str = "") -> List[RecognizerResult]:
+        """
+        Résout les chevauchements en gardant l'entité la plus prioritaire
+        """
+        if not results:
+            return results
+
+        original_count = len(results)
+
+        # Appliquer les corrections spécifiques avant résolution des chevauchements
+        corrected_results = self._apply_specific_corrections(results, text)
+
+        # Trier par position pour traitement séquentiel
+        sorted_results = sorted(corrected_results, key=lambda x: (x.start, x.end))
+
+        resolved_results = []
+        i = 0
+
+        while i < len(sorted_results):
+            current = sorted_results[i]
+            overlapping_group = [current]
+
+            # Trouver tous les chevauchements avec l'entité courante
+            j = i + 1
+            while j < len(sorted_results):
+                if self._is_overlapping(current, sorted_results[j]):
+                    overlapping_group.append(sorted_results[j])
+                    j += 1
+                elif sorted_results[j].start >= current.end:
+                    # Plus de chevauchement possible
+                    break
+                else:
+                    j += 1
+
+            # Résoudre le groupe de chevauchements
+            if len(overlapping_group) > 1:
+                winner = self._resolve_overlap_group(overlapping_group, text)
+                resolved_results.append(winner)
+                # Avancer l'index pour éviter de retraiter les entités du groupe
+                i = j
+            else:
+                resolved_results.append(current)
+                i += 1
+
+        logger.info(f"🔧 OverlapResolver: {original_count} -> {len(resolved_results)} entités")
+        return resolved_results
+
+    def _apply_specific_corrections(self, results: 
List[RecognizerResult], text: str) -> List[RecognizerResult]: + """ + Applique des corrections spécifiques avant la résolution des chevauchements + """ + corrected_results = [] + + for result in results: + entity_text = text[result.start:result.end] if text else "" + + # Correction 1: PERSON -> ORGANIZATION pour les noms d'entreprise + if result.entity_type == 'PERSON' and self._is_organization_name(entity_text): + corrected_result = RecognizerResult( + entity_type='ORGANIZATION', + start=result.start, + end=result.end, + score=result.score + 0.1 # Bonus de confiance + ) + logger.debug(f"🔄 Correction PERSON -> ORGANIZATION: '{entity_text}'") + corrected_results.append(corrected_result) + + # Correction 2: Séparer IP des adresses physiques + elif result.entity_type in ['BE_ADDRESS', 'FR_ADDRESS'] and self._contains_ip_address(entity_text): + # Extraire l'IP et créer une entité séparée + ip_matches = list(re.finditer(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b', entity_text)) + if ip_matches: + for ip_match in ip_matches: + ip_start = result.start + ip_match.start() + ip_end = result.start + ip_match.end() + + # Créer l'entité IP + ip_result = RecognizerResult( + entity_type='IP_ADDRESS', + start=ip_start, + end=ip_end, + score=0.95 + ) + corrected_results.append(ip_result) + logger.debug(f"🔄 IP extraite de l'adresse: '{ip_match.group()}'") + + # Créer une nouvelle entité adresse SANS la partie IP + # Chercher la partie adresse physique (après l'IP) + address_pattern = r'\b(?:Avenue|Rue|Boulevard|Chaussée|Place|Quai|Impasse|Drève|Clos|Allée)\b.*?\b[1-9]\d{3}\s+[A-Za-zà-ÿ\'-]+' + address_match = re.search(address_pattern, entity_text, re.IGNORECASE) + + if address_match: + address_start = result.start + address_match.start() + address_end = result.start + address_match.end() + + # Vérifier qu'il n'y a pas de chevauchement avec l'IP + ip_overlaps = any(not (address_end <= ip_start or address_start >= ip_end) + for ip_match in ip_matches + for ip_start, ip_end in [(result.start + ip_match.start(), result.start + ip_match.end())]) + + if not ip_overlaps: + address_result = RecognizerResult( + entity_type=result.entity_type, + start=address_start, + end=address_end, + score=result.score + ) + corrected_results.append(address_result) + logger.debug(f"🔄 Adresse physique séparée: '{address_match.group()}'") + else: + corrected_results.append(result) + else: + corrected_results.append(result) + + return corrected_results + + def _is_organization_name(self, text: str) -> bool: + """ + Détermine si un texte ressemble à un nom d'organisation + """ + for pattern in self.organization_patterns: + if re.search(pattern, text, re.IGNORECASE): + return True + return False + + def _contains_ip_address(self, text: str) -> bool: + """ + Vérifie si le texte contient une adresse IP + """ + ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b' + return bool(re.search(ip_pattern, text)) + + def _is_overlapping(self, entity1: RecognizerResult, entity2: RecognizerResult) -> bool: + """ + Vérifie si deux entités se chevauchent + """ + return not (entity1.end <= entity2.start or entity1.start >= entity2.end) + + def _resolve_overlap_group(self, overlapping_entities: List[RecognizerResult], text: str = "") -> RecognizerResult: + """ + Résout un groupe d'entités qui se chevauchent + Critères: 1) Priorité du type, 2) Score de confiance, 3) Longueur + """ + def get_priority_score(entity): + base_priority = self.priority_order.get(entity.entity_type, 0) + confidence_bonus = entity.score * 10 # Score 0.9 = +9 points + + # Calculer la 
longueur depuis les positions + entity_length = entity.end - entity.start + length_bonus = entity_length * 0.1 # Bonus longueur + + # Bonus spécial pour IBAN vs FR_DRIVER_LICENSE + if entity.entity_type == 'IBAN': + # Vérifier si c'est un vrai IBAN (commence par code pays) + if text: + entity_text = text[entity.start:entity.end].replace(' ', '') + if re.match(r'^[A-Z]{2}[0-9]{2}', entity_text): + base_priority += 20 # Bonus pour vrai IBAN + + return base_priority + confidence_bonus + length_bonus + + # Trier par score de priorité décroissant + sorted_entities = sorted(overlapping_entities, + key=get_priority_score, + reverse=True) + + winner = sorted_entities[0] + + # Log des entités écartées (si texte disponible) + if text: + for loser in sorted_entities[1:]: + loser_text = text[loser.start:loser.end] + logger.debug(f"❌ Écarté: {loser.entity_type} '{loser_text}' (score: {get_priority_score(loser):.1f})") + + winner_text = text[winner.start:winner.end] + logger.debug(f"✅ Gagnant: {winner.entity_type} '{winner_text}' (score: {get_priority_score(winner):.1f})") + + return winner + + def add_entity_priority(self, entity_type: str, priority: int): + """ + Ajoute ou modifie la priorité d'un type d'entité + """ + self.priority_order[entity_type] = priority + logger.info(f"📊 Priorité mise à jour: {entity_type} = {priority}") \ No newline at end of file diff --git a/refiners/__init__.py b/refiners/__init__.py new file mode 100644 index 0000000..2bcc8fb --- /dev/null +++ b/refiners/__init__.py @@ -0,0 +1 @@ +# Refiners package \ No newline at end of file diff --git a/refiners/date_refiner.py b/refiners/date_refiner.py new file mode 100644 index 0000000..bc0c87a --- /dev/null +++ b/refiners/date_refiner.py @@ -0,0 +1,89 @@ +from abc import ABC, abstractmethod +from typing import Optional, Tuple +import re +import logging + +logger = logging.getLogger(__name__) + +class EntityRefiner(ABC): + """Classe de base pour le recadrage d'entités""" + + def __init__(self, entity_type: str): + self.entity_type = entity_type + + @abstractmethod + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + """Recadre une entité détectée""" + pass + + def should_process(self, entity_type: str) -> bool: + """Vérifie si ce raffineur doit traiter ce type d'entité""" + return entity_type == self.entity_type + +class DateRefiner(EntityRefiner): + """Raffineur pour les dates - élimine les faux positifs""" + + def __init__(self): + super().__init__("DATE_TIME") + # Patterns pour valider les vraies dates + self.valid_date_patterns = [ + # Format DD/MM/YYYY + re.compile(r"\b(?:0[1-9]|[12][0-9]|3[01])/(?:0[1-9]|1[0-2])/(?:19|20)\d{2}\b"), + # Format DD-MM-YYYY + re.compile(r"\b(?:0[1-9]|[12][0-9]|3[01])-(?:0[1-9]|1[0-2])-(?:19|20)\d{2}\b"), + # Format ISO YYYY-MM-DD + re.compile(r"\b(?:19|20)\d{2}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12][0-9]|3[01])\b"), + # Dates avec mois en lettres + re.compile(r"\b(?:0?[1-9]|[12][0-9]|3[01])\s+(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)\s+(?:19|20)\d{2}\b", re.IGNORECASE), + # Heures + re.compile(r"\b(?:[01][0-9]|2[0-3]):[0-5][0-9](?::[0-5][0-9])?\b") + ] + + # Patterns à rejeter (faux positifs courants) + self.reject_patterns = [ + # Codes IBAN belges (BE + chiffres) + re.compile(r"\bBE\d{2,}\b", re.IGNORECASE), + # Numéros d'entreprise belges + re.compile(r"\bBE\d{3}\.\d{3}\.\d{3}\b"), + # Mots comme HTVA, TVA, etc. 
+ re.compile(r"\b(?:HTVA|TVA|BCE|ONSS|SIREN|SIRET)\b", re.IGNORECASE), + # Données sensibles (texte) + re.compile(r"\b(?:données?\s+sensibles?)\b", re.IGNORECASE), + # Codes postaux isolés + re.compile(r"^\d{4}$"), + # Codes courts (2-4 caractères alphanumériques) + re.compile(r"^[A-Z]{2}\d{1,2}$") + ] + + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + """Valide si l'entité détectée est vraiment une date""" + ent_text = text[start:end].strip() + + # Vérifier si c'est un pattern à rejeter + for reject_pattern in self.reject_patterns: + if reject_pattern.search(ent_text): + logger.info(f"Date rejetée (faux positif): '{ent_text}'") + return None + + # Vérifier si c'est un pattern de date valide + for valid_pattern in self.valid_date_patterns: + if valid_pattern.search(ent_text): + logger.info(f"Date validée: '{ent_text}'") + return (start, end) + + # Si aucun pattern valide trouvé, rejeter + logger.info(f"Date rejetée (format invalide): '{ent_text}'") + return None + + def validate_date_logic(self, day: int, month: int, year: int) -> bool: + """Valide la logique de la date (jours/mois corrects)""" + if month < 1 or month > 12: + return False + + days_in_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] + + # Année bissextile + if year % 4 == 0 and (year % 100 != 0 or year % 400 == 0): + days_in_month[1] = 29 + + return 1 <= day <= days_in_month[month - 1] \ No newline at end of file diff --git a/refiners/iban_refiner.py b/refiners/iban_refiner.py new file mode 100644 index 0000000..87b8290 --- /dev/null +++ b/refiners/iban_refiner.py @@ -0,0 +1,49 @@ +from abc import ABC, abstractmethod +from typing import Optional, Tuple +import re +import logging + +logger = logging.getLogger(__name__) + +class EntityRefiner(ABC): + """Classe de base pour le recadrage d'entités""" + + def __init__(self, entity_type: str): + self.entity_type = entity_type + + @abstractmethod + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + """Recadre une entité détectée""" + pass + + def should_process(self, entity_type: str) -> bool: + """Vérifie si ce raffineur doit traiter ce type d'entité""" + return entity_type == self.entity_type + +class IBANRefiner(EntityRefiner): + """Raffineur pour les IBAN""" + + def __init__(self): + super().__init__("IBAN") + self.iban_regex = re.compile(r"\b[A-Z]{2}[0-9]{2}(?:\s[0-9]{4}){3}\b", re.IGNORECASE) + + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + ent_text = text[start:end].strip() + match = self.iban_regex.search(ent_text) + + if not match: + logger.warning(f"Invalid IBAN detected, skipping: '{ent_text}'") + return None + + true_iban = match.group(0) + start_offset = ent_text.find(true_iban) + + if start_offset == -1: + logger.warning(f"IBAN regex match but cannot find substring position: '{ent_text}'") + return None + + new_start = start + start_offset + new_end = new_start + len(true_iban) + + logger.debug(f"Adjusted IBAN span: {start}-{end} => {new_start}-{new_end}") + return (new_start, new_end) \ No newline at end of file diff --git a/refiners/ip_refiner.py b/refiners/ip_refiner.py new file mode 100644 index 0000000..650c2d7 --- /dev/null +++ b/refiners/ip_refiner.py @@ -0,0 +1,52 @@ +from abc import ABC, abstractmethod +from typing import Optional, Tuple +import re +import logging + +logger = logging.getLogger(__name__) + +class EntityRefiner(ABC): + """Classe de base pour le recadrage d'entités""" + + def __init__(self, entity_type: str): + self.entity_type = 
entity_type + + @abstractmethod + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + """Recadre une entité détectée""" + pass + + def should_process(self, entity_type: str) -> bool: + """Vérifie si ce raffineur doit traiter ce type d'entité""" + return entity_type == self.entity_type + +class IPAddressRefiner(EntityRefiner): + """Raffineur pour les adresses IP""" + + def __init__(self): + super().__init__("IP_ADDRESS") + self.ipv4_regex = re.compile( + r"\b(?:(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\.){3}" + r"(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\b" + ) + + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + ent_text = text[start:end].strip() + match = self.ipv4_regex.search(ent_text) + + if not match: + logger.warning(f"Invalid IP detected, skipping: '{ent_text}'") + return None + + true_ip = match.group(0) + start_offset = ent_text.find(true_ip) + + if start_offset == -1: + logger.warning(f"IP regex match but cannot find substring position: '{ent_text}'") + return None + + new_start = start + start_offset + new_end = new_start + len(true_ip) + + logger.debug(f"Adjusted IP span: {start}-{end} => {new_start}-{new_end}") + return (new_start, new_end) \ No newline at end of file diff --git a/refiners/location_address_refiner.py b/refiners/location_address_refiner.py new file mode 100644 index 0000000..5be60d7 --- /dev/null +++ b/refiners/location_address_refiner.py @@ -0,0 +1,76 @@ +from typing import List, Optional, Tuple +from presidio_analyzer import RecognizerResult +from abc import ABC, abstractmethod +import logging + +logger = logging.getLogger(__name__) + +class EntityRefiner(ABC): + """Classe de base pour le recadrage d'entités""" + + def __init__(self, entity_type: str): + self.entity_type = entity_type + + @abstractmethod + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + """ + Recadre une entité détectée + + Args: + text: Le texte complet + start: Position de début de l'entité détectée + end: Position de fin de l'entité détectée + + Returns: + Tuple (nouveau_start, nouveau_end) ou None si l'entité doit être ignorée + """ + pass + + def should_process(self, entity_type: str) -> bool: + """Vérifie si ce raffineur doit traiter ce type d'entité""" + return entity_type == self.entity_type + +class LocationAddressRefiner(EntityRefiner): + """ + Refiner pour filtrer les doublons entre LOCATION et BE_ADDRESS/FR_ADDRESS. + Ce refiner ne modifie pas les positions mais peut supprimer des entités. + """ + + def __init__(self): + super().__init__("LOCATION") # Ne traite que les LOCATION + self.address_entities = {'BE_ADDRESS', 'FR_ADDRESS'} + self.location_entity = 'LOCATION' + # Cache pour stocker les adresses détectées + self._detected_addresses = [] + + def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]: + """ + Vérifie si cette LOCATION fait partie d'une adresse déjà détectée. 
+
+        Args:
+            text: Le texte complet
+            start: Position de début de la LOCATION
+            end: Position de fin de la LOCATION
+
+        Returns:
+            Tuple (start, end) si la location doit être conservée, None sinon
+        """
+        location_text = text[start:end].strip().lower()
+
+        # Ignorer les locations trop courtes ou non significatives
+        if len(location_text) <= 3 or location_text in ['tel', 'fax', 'gsm']:
+            logger.debug(f"Ignoring short/insignificant location: '{location_text}'")
+            return None
+
+        # Chercher des adresses dans le texte (simple heuristique)
+        # Cette approche est limitée car on n'a accès qu'à une entité à la fois
+        # Une meilleure approche serait de modifier l'architecture globale
+
+        # Pour l'instant, on garde toutes les locations valides
+        # et on laisse un post-processing global gérer les doublons
+        logger.debug(f"Keeping location: '{location_text}'")
+        return (start, end)
+
+    def should_process(self, entity_type: str) -> bool:
+        """Ne traite que les entités LOCATION"""
+        return entity_type == self.location_entity
\ No newline at end of file
diff --git a/refiners/word_boundary_refiner.py b/refiners/word_boundary_refiner.py
new file mode 100644
index 0000000..22b6f11
--- /dev/null
+++ b/refiners/word_boundary_refiner.py
@@ -0,0 +1,40 @@
+from typing import Optional, Tuple
+import logging
+
+logger = logging.getLogger(__name__)
+
+class WordBoundaryRefiner:
+    """Refiner pour étendre les entités aux limites de mots complets"""
+
+    def __init__(self):
+        self.entity_type = "ALL"  # S'applique à tous les types d'entités
+
+    def should_process(self, entity_type: str) -> bool:
+        """Ce refiner s'applique à tous les types d'entités"""
+        return True
+
+    def refine(self, text: str, start: int, end: int) -> Optional[Tuple[int, int]]:
+        """Étend l'entité pour inclure le mot complet.
+
+        Retourne toujours un tuple de coordonnées: dans le pipeline, None est
+        réservé au rejet d'une entité par un raffineur spécifique au type."""
+        try:
+            # Trouver le début du mot
+            new_start = start
+            while new_start > 0 and text[new_start - 1].isalnum():
+                new_start -= 1
+
+            # Trouver la fin du mot
+            new_end = end
+            while new_end < len(text) and text[new_end].isalnum():
+                new_end += 1
+
+            if new_start != start or new_end != end:
+                logger.debug(f"Extended entity boundaries from [{start}:{end}] to [{new_start}:{new_end}]")
+
+            return (new_start, new_end)
+
+        except Exception as e:
+            logger.error(f"Error in WordBoundaryRefiner: {e}")
+            # Ne jamais supprimer une entité à cause d'une erreur d'extension
+            return (start, end)
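
A small offline smoke test for the refiner and overlap semantics introduced above — a sketch assuming the module layout from this PR and the sequential refiner chaining fixed in entity_refiners.py; positions and spans are illustrative:

```python
# Exercise EntityRefinerManager and OverlapResolver in isolation
# (no Flask or spaCy needed for these classes).
from presidio_analyzer import RecognizerResult

from entity_refiners import EntityRefinerManager
from post_processors import OverlapResolver

text = "Contact: jean@example.be, IP 192.168.1.10, IBAN BE71 0961 2345 6769"

manager = EntityRefinerManager()
# An IP span detected with trailing garbage should be tightened to the bare IP:
p = text.index("192")
print(manager.refine_entity(text, "IP_ADDRESS", p, p + 15))  # -> (29, 41), i.e. "192.168.1.10"

resolver = OverlapResolver()
overlapping = [
    RecognizerResult(entity_type="PERSON", start=9, end=24, score=0.85),
    RecognizerResult(entity_type="EMAIL_ADDRESS", start=9, end=24, score=0.90),
]
# EMAIL_ADDRESS (priority 90) should win over PERSON (priority 50):
print([r.entity_type for r in resolver.process(overlapping, text)])  # ['EMAIL_ADDRESS']
```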