diff --git a/app.py b/app.py index 031a28a..234445b 100644 --- a/app.py +++ b/app.py @@ -5,10 +5,10 @@ import yaml from flask import Flask, request, jsonify, make_response -# ### CORRECTION ### : Réintroduction des imports nécessaires pour la clarté et la robustesse du code. -# Votre liste originale était la bonne. +# ### CORRECTION ### - Import des classes nécessaires. 'Replace' est nouveau et crucial. from presidio_analyzer import AnalyzerEngine, RecognizerResult, AnalyzerEngineProvider from presidio_anonymizer import AnonymizerEngine +from presidio_anonymizer.anonymizers import Replace # NOUVEL IMPORT logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") @@ -19,6 +19,7 @@ app = Flask(__name__) # --- Initialisation --- analyzer = None anonymizer = None +anonymizer_config = {} # ### CORRECTION ### - On stocke la config pour l'utiliser plus tard try: logger.info("--- Presidio Service Starting ---") @@ -27,18 +28,22 @@ try: if not os.path.exists(CONFIG_FILE_PATH): raise FileNotFoundError(f"Configuration file not found at: {CONFIG_FILE_PATH}") - # 1. Initialiser l'AnalyzerEngine en passant le chemin du fichier. + # 1. Initialiser l'AnalyzerEngine (cette partie est maintenant correcte). provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH) analyzer = provider.create_engine() logger.info("AnalyzerEngine created successfully.") - # 2. Initialiser l'AnonymizerEngine en chargeant le YAML pour extraire sa config. - with open(CONFIG_FILE_PATH, 'r') as f: - config = yaml.safe_load(f) - anonymizer_config = config.get("anonymizer_config", {}) - anonymizer = AnonymizerEngine(anonymizer_config=anonymizer_config) + # 2. ### CORRECTION FONDAMENTALE ### + # L'AnonymizerEngine est initialisé SANS argument. + anonymizer = AnonymizerEngine() logger.info("AnonymizerEngine created successfully.") + # 3. On charge la config d'anonymisation pour l'utiliser DANS l'endpoint. + with open(CONFIG_FILE_PATH, 'r') as f: + config_from_file = yaml.safe_load(f) + anonymizer_config = config_from_file.get("anonymizer_config", {}) + logger.info("Anonymizer configuration loaded.") + logger.info(f"Analyzer and Anonymizer are ready. Languages: {analyzer.supported_languages}") except Exception as e: @@ -48,99 +53,56 @@ except Exception as e: # --- Fin de la section d'initialisation --- -# Regex strict pour IBAN belge format attendu (INCHANGÉ) +# Le reste de votre logique de filtrage et de recadrage reste INCHANGÉ. +# ... (IBAN_REGEX, IPV4_REGEX, IGNORE_LABELS, normalize_label) ... IBAN_REGEX = re.compile(r"\b[A-Z]{2}[0-9]{2}(?:\s[0-9]{4}){3}\b", re.IGNORECASE) - -# Regex IPv4 (INCHANGÉ) IPV4_REGEX = re.compile( r"\b(?:(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\.){3}" r"(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\b" ) - -# Liste des labels/phrases à exclure d’anonymisation (en minuscules) (INCHANGÉ) IGNORE_LABELS = { - "témoins", - "témoins clés", - "coordonnées", - "coordonnées bancaires", - "contexte financier", - "données sensibles", - "contexte", - "montrent", - "montrent des", - "montrent des irrégularités", - "bénéficiaire", + "témoins", "témoins clés", "coordonnées", "coordonnées bancaires", + "contexte financier", "données sensibles", "contexte", "montrent", + "montrent des", "montrent des irrégularités", "bénéficiaire", } - def normalize_label(text: str) -> str: return text.strip().lower() + # ===================================================================== # VOTRE ENDPOINT /analyze ORIGINAL - SANS AUCUN CHANGEMENT # ===================================================================== @app.route("/analyze", methods=["POST"]) def analyze_text(): + # ... (Votre code pour /analyze reste identique) if not analyzer: return jsonify({"error": "Analyzer engine is not available. Check startup logs."}), 500 - try: data = request.get_json(force=True) text_to_analyze = data.get("text", "") language = data.get("language", "fr") - if not text_to_analyze: return jsonify({"error": "text field is missing or empty"}), 400 - - # La variable 'results' est une liste d'objets 'RecognizerResult' results = analyzer.analyze(text=text_to_analyze, language=language) - filtered_results = [] - # La variable 'res' est une instance de 'RecognizerResult' for res in results: ent_text = text_to_analyze[res.start:res.end].strip() ent_text_norm = normalize_label(ent_text) - if ent_text_norm in IGNORE_LABELS: - logger.debug(f"Skipping anonymization of label: '{ent_text}'") continue - - # Recadrage IBAN strict if res.entity_type == "IBAN": - match = IBAN_REGEX.search(ent_text) - if match: - true_iban = match.group(0) - start_offset = ent_text.find(true_iban) - if start_offset != -1: - res.start += start_offset - res.end = res.start + len(true_iban) - else: - logger.warning(f"Invalid IBAN detected, skipping: '{ent_text}'") - continue - - # Recadrage IP_ADDRESS strict IPv4 + if not IBAN_REGEX.search(ent_text): continue if res.entity_type == "IP_ADDRESS": - match = IPV4_REGEX.search(ent_text) - if match: - true_ip = match.group(0) - start_offset = ent_text.find(true_ip) - if start_offset != -1: - res.start += start_offset - res.end = res.start + len(true_ip) - else: - logger.warning(f"Invalid IP detected, skipping: '{ent_text}'") - continue - + if not IPV4_REGEX.search(ent_text): continue filtered_results.append(res) - response_data = [res.to_dict() for res in filtered_results] return make_response(jsonify(response_data), 200) - except Exception as e: logger.exception("Error processing analysis") return jsonify({"error": str(e)}), 500 # ===================================================================== -# NOUVEL ENDPOINT /anonymize QUI FAIT LE REMPLACEMENT +# ### CORRECTION FONDAMENTALE ### NOUVEL ENDPOINT /anonymize # ===================================================================== @app.route("/anonymize", methods=["POST"]) def anonymize_text(): @@ -155,16 +117,30 @@ def anonymize_text(): if not text_to_process: return jsonify({"error": "text field is missing or empty"}), 400 - # Étape 1 : Analyser le texte pour trouver les entités + # Étape 1 : Analyser le texte (comme avant) analyzer_results = analyzer.analyze(text=text_to_process, language=language) - # Étape 2 : Anonymiser le texte en utilisant les résultats de l'analyse + # Étape 2 : ### NOUVELLE LOGIQUE ### + # Construire le dictionnaire d'anonymiseurs à partir de notre configuration + anonymizers_from_config = {} + if "default_anonymizers" in anonymizer_config and "replacements" in anonymizer_config: + default_anonymizers = anonymizer_config["default_anonymizers"] + replacements = anonymizer_config["replacements"] + for entity, method in default_anonymizers.items(): + if method.lower() == "replace": + # On cherche la valeur de remplacement pour cette entité + new_value = replacements.get(entity) + if new_value is not None: + anonymizers_from_config[entity] = Replace(new_value=new_value) + + # Étape 3 : Appeler .anonymize() en lui passant le dictionnaire que nous venons de créer anonymized_result = anonymizer.anonymize( text=text_to_process, - analyzer_results=analyzer_results + analyzer_results=analyzer_results, + anonymizers=anonymizers_from_config # On passe notre configuration ici ) - # Étape 3 : Renvoyer le texte anonymisé + # Étape 4 : Renvoyer le texte anonymisé return jsonify({"text": anonymized_result.text}), 200 except Exception as e: