From adf495b7921eee7dd0be1605d02fa88a5de99d11 Mon Sep 17 00:00:00 2001 From: nacim Date: Sun, 3 Aug 2025 20:39:14 +0000 Subject: [PATCH] Actualiser app.py --- app.py | 162 +++++++++++++++++++++++---------------------------------- 1 file changed, 65 insertions(+), 97 deletions(-) diff --git a/app.py b/app.py index 234445b..dd16485 100644 --- a/app.py +++ b/app.py @@ -1,14 +1,9 @@ import os import re import logging -import yaml - from flask import Flask, request, jsonify, make_response -# ### CORRECTION ### - Import des classes nécessaires. 'Replace' est nouveau et crucial. -from presidio_analyzer import AnalyzerEngine, RecognizerResult, AnalyzerEngineProvider -from presidio_anonymizer import AnonymizerEngine -from presidio_anonymizer.anonymizers import Replace # NOUVEL IMPORT +from presidio_analyzer import AnalyzerEngineProvider logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") @@ -16,139 +11,112 @@ logger = logging.getLogger(__name__) app = Flask(__name__) -# --- Initialisation --- +# Chargement du moteur Presidio via Provider analyzer = None -anonymizer = None -anonymizer_config = {} # ### CORRECTION ### - On stocke la config pour l'utiliser plus tard - try: - logger.info("--- Presidio Service Starting ---") + logger.info("--- Presidio Analyzer Service Starting ---") CONFIG_FILE_PATH = os.environ.get("PRESIDIO_ANALYZER_CONFIG_FILE", "conf/default.yaml") - - if not os.path.exists(CONFIG_FILE_PATH): - raise FileNotFoundError(f"Configuration file not found at: {CONFIG_FILE_PATH}") - - # 1. Initialiser l'AnalyzerEngine (cette partie est maintenant correcte). provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH) analyzer = provider.create_engine() - logger.info("AnalyzerEngine created successfully.") - - # 2. ### CORRECTION FONDAMENTALE ### - # L'AnonymizerEngine est initialisé SANS argument. - anonymizer = AnonymizerEngine() - logger.info("AnonymizerEngine created successfully.") - - # 3. On charge la config d'anonymisation pour l'utiliser DANS l'endpoint. - with open(CONFIG_FILE_PATH, 'r') as f: - config_from_file = yaml.safe_load(f) - anonymizer_config = config_from_file.get("anonymizer_config", {}) - logger.info("Anonymizer configuration loaded.") - - logger.info(f"Analyzer and Anonymizer are ready. Languages: {analyzer.supported_languages}") - + logger.info(f"Analyzer ready. Languages: {analyzer.supported_languages}") except Exception as e: - logger.exception("FATAL: Error during Presidio engines initialization.") + logger.exception("Error during AnalyzerEngine initialization.") analyzer = None - anonymizer = None -# --- Fin de la section d'initialisation --- - -# Le reste de votre logique de filtrage et de recadrage reste INCHANGÉ. -# ... (IBAN_REGEX, IPV4_REGEX, IGNORE_LABELS, normalize_label) ... +# Regex strict pour IBAN belge format attendu IBAN_REGEX = re.compile(r"\b[A-Z]{2}[0-9]{2}(?:\s[0-9]{4}){3}\b", re.IGNORECASE) + +# Regex IPv4 IPV4_REGEX = re.compile( r"\b(?:(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\.){3}" r"(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\b" ) + +# Liste des labels/phrases à exclure d’anonymisation (en minuscules) IGNORE_LABELS = { - "témoins", "témoins clés", "coordonnées", "coordonnées bancaires", - "contexte financier", "données sensibles", "contexte", "montrent", - "montrent des", "montrent des irrégularités", "bénéficiaire", + "témoins", + "témoins clés", + "coordonnées", + "coordonnées bancaires", + "contexte financier", + "données sensibles", + "contexte", + "montrent", + "montrent des", + "montrent des irrégularités", + "bénéficiaire", } + def normalize_label(text: str) -> str: return text.strip().lower() - -# ===================================================================== -# VOTRE ENDPOINT /analyze ORIGINAL - SANS AUCUN CHANGEMENT -# ===================================================================== @app.route("/analyze", methods=["POST"]) def analyze_text(): - # ... (Votre code pour /analyze reste identique) if not analyzer: return jsonify({"error": "Analyzer engine is not available. Check startup logs."}), 500 + try: data = request.get_json(force=True) text_to_analyze = data.get("text", "") language = data.get("language", "fr") + if not text_to_analyze: return jsonify({"error": "text field is missing or empty"}), 400 + results = analyzer.analyze(text=text_to_analyze, language=language) + filtered_results = [] for res in results: ent_text = text_to_analyze[res.start:res.end].strip() ent_text_norm = normalize_label(ent_text) + if ent_text_norm in IGNORE_LABELS: + logger.debug(f"Skipping anonymization of label: '{ent_text}'") continue + + # Recadrage IBAN strict if res.entity_type == "IBAN": - if not IBAN_REGEX.search(ent_text): continue + match = IBAN_REGEX.search(ent_text) + if match: + true_iban = match.group(0) + start_offset = ent_text.find(true_iban) + if start_offset != -1: + old_start, old_end = res.start, res.end + res.start += start_offset + res.end = res.start + len(true_iban) + logger.debug(f"Adjusted IBAN span: {old_start}-{old_end} => {res.start}-{res.end}") + else: + logger.warning(f"IBAN regex match but cannot find substring position: '{ent_text}'") + else: + logger.warning(f"Invalid IBAN detected, skipping: '{ent_text}'") + continue + + # Recadrage IP_ADDRESS strict IPv4 (wildcard possible pour IPv6 si besoin) if res.entity_type == "IP_ADDRESS": - if not IPV4_REGEX.search(ent_text): continue + match = IPV4_REGEX.search(ent_text) + if match: + true_ip = match.group(0) + start_offset = ent_text.find(true_ip) + if start_offset != -1: + old_start, old_end = res.start, res.end + res.start += start_offset + res.end = res.start + len(true_ip) + logger.debug(f"Adjusted IP span: {old_start}-{old_end} => {res.start}-{res.end}") + else: + logger.warning(f"IP regex match but cannot find substring position: '{ent_text}'") + else: + logger.warning(f"Invalid IP detected, skipping: '{ent_text}'") + continue + filtered_results.append(res) + + # Retourner le résultat nettoyé response_data = [res.to_dict() for res in filtered_results] return make_response(jsonify(response_data), 200) + except Exception as e: logger.exception("Error processing analysis") return jsonify({"error": str(e)}), 500 -# ===================================================================== -# ### CORRECTION FONDAMENTALE ### NOUVEL ENDPOINT /anonymize -# ===================================================================== -@app.route("/anonymize", methods=["POST"]) -def anonymize_text(): - if not analyzer or not anonymizer: - return jsonify({"error": "Presidio engines are not available. Check startup logs."}), 500 - - try: - data = request.get_json(force=True) - text_to_process = data.get("text", "") - language = data.get("language", "fr") - - if not text_to_process: - return jsonify({"error": "text field is missing or empty"}), 400 - - # Étape 1 : Analyser le texte (comme avant) - analyzer_results = analyzer.analyze(text=text_to_process, language=language) - - # Étape 2 : ### NOUVELLE LOGIQUE ### - # Construire le dictionnaire d'anonymiseurs à partir de notre configuration - anonymizers_from_config = {} - if "default_anonymizers" in anonymizer_config and "replacements" in anonymizer_config: - default_anonymizers = anonymizer_config["default_anonymizers"] - replacements = anonymizer_config["replacements"] - for entity, method in default_anonymizers.items(): - if method.lower() == "replace": - # On cherche la valeur de remplacement pour cette entité - new_value = replacements.get(entity) - if new_value is not None: - anonymizers_from_config[entity] = Replace(new_value=new_value) - - # Étape 3 : Appeler .anonymize() en lui passant le dictionnaire que nous venons de créer - anonymized_result = anonymizer.anonymize( - text=text_to_process, - analyzer_results=analyzer_results, - anonymizers=anonymizers_from_config # On passe notre configuration ici - ) - - # Étape 4 : Renvoyer le texte anonymisé - return jsonify({"text": anonymized_result.text}), 200 - - except Exception as e: - logger.exception("Error processing anonymization request") - return jsonify({"error": str(e)}), 500 - -# ===================================================================== -# DÉMARRAGE DE L'APPLICATION FLASK (INCHANGÉ) -# ===================================================================== if __name__ == "__main__": - app.run(host="0.0.0.0", port=5001) \ No newline at end of file + app.run(host="0.0.0.0", port=5001)