import os import re import logging from flask import Flask, request, jsonify, make_response from presidio_analyzer import AnalyzerEngineProvider logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) app = Flask(__name__) # Initialisation Presidio Analyzer via Provider analyzer = None try: logger.info("--- Presidio Analyzer Service Starting ---") CONFIG_FILE_PATH = os.environ.get("PRESIDIO_ANALYZER_CONFIG_FILE", "conf/default.yaml") provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH) analyzer = provider.create_engine() logger.info(f"Analyzer ready. Supported languages: {analyzer.supported_languages}") except Exception as e: logger.exception("Error during AnalyzerEngine initialization.") analyzer = None # Regex pour recadrage strict IBAN_REGEX = re.compile(r"\b[A-Z]{2}[0-9]{2}(?:\s[A-Z0-9]{4}){4,7}\b", re.IGNORECASE) IPV4_REGEX = re.compile(r"\b(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)){3}\b") # Liste des labels/phrases à exclure de l'anonymisation (en minuscules) IGNORE_LABELS = { "témoins", "témoins clés", "coordonnées", "coordonnées bancaires", "contexte financier", "données sensibles", "contexte", "montrent", "montrent des", "montrent des irrégularités", "bénéficiaire", } def normalize_label(text: str) -> str: return text.strip().lower() @app.route("/analyze", methods=["POST"]) def analyze_text(): if not analyzer: return jsonify({"error": "Analyzer engine is not available. Check startup logs."}), 500 try: data = request.get_json(force=True) text_to_analyze = data.get("text", "") language = data.get("language", "fr") if not text_to_analyze: return jsonify({"error": "text field is missing or empty"}), 400 results = analyzer.analyze(text=text_to_analyze, language=language) filtered_results = [] for res in results: ent_text = text_to_analyze[res.start:res.end].strip() ent_text_norm = normalize_label(ent_text) # Skip anonymization for labels to keep if ent_text_norm in IGNORE_LABELS: logger.debug(f"Skipping anonymization of label: '{ent_text}'") continue # Recadrage stricte IBAN if res.entity_type == "IBAN": match = IBAN_REGEX.search(ent_text) if match: true_iban = match.group(0) start_offset = ent_text.find(true_iban) if start_offset != -1: old_start, old_end = res.start, res.end res.start += start_offset res.end = res.start + len(true_iban) ent_text = true_iban logger.debug(f"Adjusted IBAN span from ({old_start}-{old_end}) to ({res.start}-{res.end}): '{ent_text}'") else: logger.warning(f"Cannot find exact IBAN substring inside entity: '{ent_text}'") else: # Pas un IBAN valide, ignorer cette entité logger.warning(f"Entity IBAN does not match strict IBAN regex: '{ent_text}'") continue # Recadrage stricte IP_ADDRESS if res.entity_type == "IP_ADDRESS": match = IPV4_REGEX.search(ent_text) if match: true_ip = match.group(0) start_offset = ent_text.find(true_ip) if start_offset != -1: old_start, old_end = res.start, res.end res.start += start_offset res.end = res.start + len(true_ip) ent_text = true_ip logger.debug(f"Adjusted IP_ADDRESS span from ({old_start}-{old_end}) to ({res.start}-{res.end}): '{ent_text}'") else: logger.warning(f"Cannot find exact IP substring inside entity: '{ent_text}'") else: logger.warning(f"Entity IP_ADDRESS does not match IPv4 regex: '{ent_text}'") continue filtered_results.append(res) # Option: filtrer les chevauchements response_data = [res.to_dict() for res in filtered_results] return make_response(jsonify(response_data), 200) except Exception as e: logger.exception(f"Error during analysis for language '{language}'.") if "No matching recognizers" in str(e): return jsonify({"error": f"No recognizers available for language '{language}'."}), 400 return jsonify({"error": str(e)}), 500 if __name__ == "__main__": app.run(host="0.0.0.0", port=5001)