Files
Presidio/app.py
2025-08-03 20:39:14 +00:00

123 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import logging
from flask import Flask, request, jsonify, make_response
from presidio_analyzer import AnalyzerEngineProvider
# Root logging configuration: INFO level, timestamped records that carry the
# logger name and the severity.
_LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger and the Flask application object the routes attach to.
app = Flask(__name__)
logger = logging.getLogger(__name__)
# Load the Presidio engine through the provider.
# The YAML configuration path comes from PRESIDIO_ANALYZER_CONFIG_FILE and
# falls back to "conf/default.yaml" when the variable is not set.
CONFIG_FILE_PATH = os.getenv("PRESIDIO_ANALYZER_CONFIG_FILE", "conf/default.yaml")
analyzer = None
try:
    logger.info("--- Presidio Analyzer Service Starting ---")
    provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH)
    analyzer = provider.create_engine()
    logger.info(f"Analyzer ready. Languages: {analyzer.supported_languages}")
except Exception:
    # Any failure is logged with its traceback and swallowed; `analyzer` is
    # reset to None even if the engine was already assigned before the error.
    logger.exception("Error during AnalyzerEngine initialization.")
    analyzer = None
# IPv4 address: four dot-separated octets, each limited to 0-255, delimited by
# word boundaries.
IPV4_REGEX = re.compile(
    r"\b(?:(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\.){3}"
    r"(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\b"
)

# Strict IBAN layout in the expected (Belgian) presentation: two letters, two
# digits, then exactly three groups of "whitespace + four digits".
# Letters are matched case-insensitively.
# NOTE(review): an IBAN written without separators does not match this
# pattern - confirm that this is intended.
IBAN_REGEX = re.compile(
    r"\b[A-Z]{2}[0-9]{2}(?:\s[0-9]{4}){3}\b",
    flags=re.IGNORECASE,
)
# Labels / phrases that must be excluded from anonymisation.
# Entries are stored in lower case.
IGNORE_LABELS = {
    # Generic context words
    "contexte",
    "contexte financier",
    "données sensibles",
    # Section-like labels
    "bénéficiaire",
    "coordonnées",
    "coordonnées bancaires",
    "témoins",
    "témoins clés",
    # Verb phrases
    "montrent",
    "montrent des",
    "montrent des irrégularités",
}
def normalize_label(text: str) -> str:
    """Return *text* with surrounding whitespace removed, in lower case."""
    trimmed = text.strip()
    return trimmed.lower()
def _refit_span(res, text, pattern, kind):
    """Narrow ``res`` to the first ``pattern`` match found inside its span.

    Args:
        res: Analyzer result exposing mutable integer ``start``/``end`` offsets.
        text: The full text those offsets index into.
        pattern: Compiled regular expression describing the accepted format.
        kind: Short label ("IBAN", "IP") used only in the log messages.

    Returns:
        True when the span contains a match (``res`` is updated in place),
        False when it does not and the caller should discard the result.
    """
    span_text = text[res.start:res.end]
    match = pattern.search(span_text)
    if match is None:
        logger.warning("Invalid %s detected, skipping: '%s'", kind, span_text.strip())
        return False

    old_start, old_end = res.start, res.end
    # The match offsets are relative to the *unstripped* slice, so adding them
    # to the original start stays correct even when the detected span begins
    # with whitespace.
    res.start = old_start + match.start()
    res.end = old_start + match.end()
    logger.debug(
        "Adjusted %s span: %s-%s => %s-%s", kind, old_start, old_end, res.start, res.end
    )
    return True


@app.route("/analyze", methods=["POST"])
def analyze_text():
    """Analyze the posted text and return the filtered entity list.

    Expects a JSON object with a ``text`` field and an optional ``language``
    field (default ``"fr"``).

    Post-processing applied to the analyzer results:
      * results whose text (trimmed, lower-cased) is in ``IGNORE_LABELS`` are
        dropped;
      * ``IBAN`` results are narrowed to the ``IBAN_REGEX`` match, or dropped
        when the span contains none;
      * ``IP_ADDRESS`` results are narrowed to the ``IPV4_REGEX`` match, or
        dropped when the span contains none.

    Returns:
        200 with a JSON list of ``to_dict()`` results on success;
        400 when the body is not a JSON object, ``text`` is missing, empty or
        not a string, or ``language`` is not supported by the engine;
        500 when the engine is unavailable or the analysis itself fails.
    """
    if analyzer is None:
        return jsonify({"error": "Analyzer engine is not available. Check startup logs."}), 500

    # silent=True yields None instead of raising on malformed JSON, so a bad
    # payload is reported as a client error rather than as a 500.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "request body must be a JSON object"}), 400

    text_to_analyze = data.get("text", "")
    language = data.get("language", "fr")
    if not text_to_analyze:
        return jsonify({"error": "text field is missing or empty"}), 400
    if not isinstance(text_to_analyze, str):
        return jsonify({"error": "text field must be a string"}), 400
    if language not in analyzer.supported_languages:
        return jsonify({"error": f"unsupported language: {language!r}"}), 400

    # Entity types whose span must be tightened to a strict pattern, with the
    # label used in the log messages.
    strict_patterns = {
        "IBAN": (IBAN_REGEX, "IBAN"),
        "IP_ADDRESS": (IPV4_REGEX, "IP"),
    }

    try:
        results = analyzer.analyze(text=text_to_analyze, language=language)
        filtered_results = []
        for res in results:
            ent_text = text_to_analyze[res.start:res.end].strip()
            if normalize_label(ent_text) in IGNORE_LABELS:
                logger.debug("Skipping anonymization of label: '%s'", ent_text)
                continue

            strict = strict_patterns.get(res.entity_type)
            if strict is not None:
                pattern, kind = strict
                if not _refit_span(res, text_to_analyze, pattern, kind):
                    continue

            filtered_results.append(res)

        # Return the cleaned-up result list.
        response_data = [res.to_dict() for res in filtered_results]
        return jsonify(response_data), 200
    except Exception as e:
        logger.exception("Error processing analysis")
        return jsonify({"error": str(e)}), 500
if __name__ == "__main__":
    # Direct-execution entry point: listen on every interface, port 5001.
    app.run(port=5001, host="0.0.0.0")