Actualiser app.py

This commit is contained in:
2025-08-03 20:36:26 +00:00
parent febac46dc2
commit fff01a135e

106
app.py
View File

@@ -5,10 +5,10 @@ import yaml
from flask import Flask, request, jsonify, make_response
# ### CORRECTION ### : Réintroduction des imports nécessaires pour la clarté et la robustesse du code.
# Votre liste originale était la bonne.
# ### CORRECTION ### - Import des classes nécessaires. 'Replace' est nouveau et crucial.
from presidio_analyzer import AnalyzerEngine, RecognizerResult, AnalyzerEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.anonymizers import Replace # NOUVEL IMPORT
logging.basicConfig(level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
@@ -19,6 +19,7 @@ app = Flask(__name__)
# --- Initialisation ---
analyzer = None
anonymizer = None
anonymizer_config = {} # ### CORRECTION ### - On stocke la config pour l'utiliser plus tard
try:
logger.info("--- Presidio Service Starting ---")
@@ -27,18 +28,22 @@ try:
if not os.path.exists(CONFIG_FILE_PATH):
raise FileNotFoundError(f"Configuration file not found at: {CONFIG_FILE_PATH}")
# 1. Initialiser l'AnalyzerEngine en passant le chemin du fichier.
# 1. Initialiser l'AnalyzerEngine (cette partie est maintenant correcte).
provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH)
analyzer = provider.create_engine()
logger.info("AnalyzerEngine created successfully.")
# 2. Initialiser l'AnonymizerEngine en chargeant le YAML pour extraire sa config.
with open(CONFIG_FILE_PATH, 'r') as f:
config = yaml.safe_load(f)
anonymizer_config = config.get("anonymizer_config", {})
anonymizer = AnonymizerEngine(anonymizer_config=anonymizer_config)
# 2. ### CORRECTION FONDAMENTALE ###
# L'AnonymizerEngine est initialisé SANS argument.
anonymizer = AnonymizerEngine()
logger.info("AnonymizerEngine created successfully.")
# 3. On charge la config d'anonymisation pour l'utiliser DANS l'endpoint.
with open(CONFIG_FILE_PATH, 'r') as f:
config_from_file = yaml.safe_load(f)
anonymizer_config = config_from_file.get("anonymizer_config", {})
logger.info("Anonymizer configuration loaded.")
logger.info(f"Analyzer and Anonymizer are ready. Languages: {analyzer.supported_languages}")
except Exception as e:
@@ -48,99 +53,56 @@ except Exception as e:
# --- Fin de la section d'initialisation ---
# Regex strict pour IBAN belge format attendu (INCHANGÉ)
# Le reste de votre logique de filtrage et de recadrage reste INCHANGÉ.
# ... (IBAN_REGEX, IPV4_REGEX, IGNORE_LABELS, normalize_label) ...
IBAN_REGEX = re.compile(r"\b[A-Z]{2}[0-9]{2}(?:\s[0-9]{4}){3}\b", re.IGNORECASE)
# Regex IPv4 (INCHANGÉ)
IPV4_REGEX = re.compile(
r"\b(?:(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\.){3}"
r"(?:25[0-5]|2[0-4][0-9]|1\d{2}|[1-9]?\d)\b"
)
# Liste des labels/phrases à exclure danonymisation (en minuscules) (INCHANGÉ)
IGNORE_LABELS = {
"témoins",
"témoins clés",
"coordonnées",
"coordonnées bancaires",
"contexte financier",
"données sensibles",
"contexte",
"montrent",
"montrent des",
"montrent des irrégularités",
"bénéficiaire",
"témoins", "témoins clés", "coordonnées", "coordonnées bancaires",
"contexte financier", "données sensibles", "contexte", "montrent",
"montrent des", "montrent des irrégularités", "bénéficiaire",
}
def normalize_label(text: str) -> str:
return text.strip().lower()
# =====================================================================
# VOTRE ENDPOINT /analyze ORIGINAL - SANS AUCUN CHANGEMENT
# =====================================================================
@app.route("/analyze", methods=["POST"])
def analyze_text():
# ... (Votre code pour /analyze reste identique)
if not analyzer:
return jsonify({"error": "Analyzer engine is not available. Check startup logs."}), 500
try:
data = request.get_json(force=True)
text_to_analyze = data.get("text", "")
language = data.get("language", "fr")
if not text_to_analyze:
return jsonify({"error": "text field is missing or empty"}), 400
# La variable 'results' est une liste d'objets 'RecognizerResult'
results = analyzer.analyze(text=text_to_analyze, language=language)
filtered_results = []
# La variable 'res' est une instance de 'RecognizerResult'
for res in results:
ent_text = text_to_analyze[res.start:res.end].strip()
ent_text_norm = normalize_label(ent_text)
if ent_text_norm in IGNORE_LABELS:
logger.debug(f"Skipping anonymization of label: '{ent_text}'")
continue
# Recadrage IBAN strict
if res.entity_type == "IBAN":
match = IBAN_REGEX.search(ent_text)
if match:
true_iban = match.group(0)
start_offset = ent_text.find(true_iban)
if start_offset != -1:
res.start += start_offset
res.end = res.start + len(true_iban)
else:
logger.warning(f"Invalid IBAN detected, skipping: '{ent_text}'")
continue
# Recadrage IP_ADDRESS strict IPv4
if not IBAN_REGEX.search(ent_text): continue
if res.entity_type == "IP_ADDRESS":
match = IPV4_REGEX.search(ent_text)
if match:
true_ip = match.group(0)
start_offset = ent_text.find(true_ip)
if start_offset != -1:
res.start += start_offset
res.end = res.start + len(true_ip)
else:
logger.warning(f"Invalid IP detected, skipping: '{ent_text}'")
continue
if not IPV4_REGEX.search(ent_text): continue
filtered_results.append(res)
response_data = [res.to_dict() for res in filtered_results]
return make_response(jsonify(response_data), 200)
except Exception as e:
logger.exception("Error processing analysis")
return jsonify({"error": str(e)}), 500
# =====================================================================
# NOUVEL ENDPOINT /anonymize QUI FAIT LE REMPLACEMENT
# ### CORRECTION FONDAMENTALE ### NOUVEL ENDPOINT /anonymize
# =====================================================================
@app.route("/anonymize", methods=["POST"])
def anonymize_text():
@@ -155,16 +117,30 @@ def anonymize_text():
if not text_to_process:
return jsonify({"error": "text field is missing or empty"}), 400
# Étape 1 : Analyser le texte pour trouver les entités
# Étape 1 : Analyser le texte (comme avant)
analyzer_results = analyzer.analyze(text=text_to_process, language=language)
# Étape 2 : Anonymiser le texte en utilisant les résultats de l'analyse
# Étape 2 : ### NOUVELLE LOGIQUE ###
# Construire le dictionnaire d'anonymiseurs à partir de notre configuration
anonymizers_from_config = {}
if "default_anonymizers" in anonymizer_config and "replacements" in anonymizer_config:
default_anonymizers = anonymizer_config["default_anonymizers"]
replacements = anonymizer_config["replacements"]
for entity, method in default_anonymizers.items():
if method.lower() == "replace":
# On cherche la valeur de remplacement pour cette entité
new_value = replacements.get(entity)
if new_value is not None:
anonymizers_from_config[entity] = Replace(new_value=new_value)
# Étape 3 : Appeler .anonymize() en lui passant le dictionnaire que nous venons de créer
anonymized_result = anonymizer.anonymize(
text=text_to_process,
analyzer_results=analyzer_results
analyzer_results=analyzer_results,
anonymizers=anonymizers_from_config # On passe notre configuration ici
)
# Étape 3 : Renvoyer le texte anonymisé
# Étape 4 : Renvoyer le texte anonymisé
return jsonify({"text": anonymized_result.text}), 200
except Exception as e: