127 lines
4.9 KiB
Python
127 lines
4.9 KiB
Python
import os
|
|
import re
|
|
import logging
|
|
from flask import Flask, request, jsonify, make_response
|
|
|
|
from presidio_analyzer import AnalyzerEngineProvider
|
|
|
|
logging.basicConfig(level=logging.INFO,
|
|
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
app = Flask(__name__)
|
|
|
|
# Initialisation Presidio Analyzer via Provider
|
|
analyzer = None
|
|
try:
|
|
logger.info("--- Presidio Analyzer Service Starting ---")
|
|
CONFIG_FILE_PATH = os.environ.get("PRESIDIO_ANALYZER_CONFIG_FILE", "conf/default.yaml")
|
|
provider = AnalyzerEngineProvider(analyzer_engine_conf_file=CONFIG_FILE_PATH)
|
|
analyzer = provider.create_engine()
|
|
logger.info(f"Analyzer ready. Supported languages: {analyzer.supported_languages}")
|
|
except Exception as e:
|
|
logger.exception("Error during AnalyzerEngine initialization.")
|
|
analyzer = None
|
|
|
|
|
|
# Regex pour recadrage strict
|
|
IBAN_REGEX = re.compile(r"\b[A-Z]{2}[0-9]{2}(?:\s[A-Z0-9]{4}){4,7}\b", re.IGNORECASE)
|
|
IPV4_REGEX = re.compile(r"\b(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)(?:\.(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)){3}\b")
|
|
|
|
# Liste des labels/phrases à exclure de l'anonymisation (en minuscules)
|
|
IGNORE_LABELS = {
|
|
"témoins",
|
|
"témoins clés",
|
|
"coordonnées",
|
|
"coordonnées bancaires",
|
|
"contexte financier",
|
|
"données sensibles",
|
|
"contexte",
|
|
"montrent",
|
|
"montrent des",
|
|
"montrent des irrégularités",
|
|
"bénéficiaire",
|
|
}
|
|
|
|
def normalize_label(text: str) -> str:
|
|
return text.strip().lower()
|
|
|
|
@app.route("/analyze", methods=["POST"])
|
|
def analyze_text():
|
|
if not analyzer:
|
|
return jsonify({"error": "Analyzer engine is not available. Check startup logs."}), 500
|
|
|
|
try:
|
|
data = request.get_json(force=True)
|
|
text_to_analyze = data.get("text", "")
|
|
language = data.get("language", "fr")
|
|
|
|
if not text_to_analyze:
|
|
return jsonify({"error": "text field is missing or empty"}), 400
|
|
|
|
results = analyzer.analyze(text=text_to_analyze, language=language)
|
|
|
|
filtered_results = []
|
|
for res in results:
|
|
ent_text = text_to_analyze[res.start:res.end].strip()
|
|
ent_text_norm = normalize_label(ent_text)
|
|
|
|
# Skip anonymization for labels to keep
|
|
if ent_text_norm in IGNORE_LABELS:
|
|
logger.debug(f"Skipping anonymization of label: '{ent_text}'")
|
|
continue
|
|
|
|
# Recadrage stricte IBAN
|
|
if res.entity_type == "IBAN":
|
|
match = IBAN_REGEX.search(ent_text)
|
|
if match:
|
|
true_iban = match.group(0)
|
|
start_offset = ent_text.find(true_iban)
|
|
if start_offset != -1:
|
|
old_start, old_end = res.start, res.end
|
|
res.start += start_offset
|
|
res.end = res.start + len(true_iban)
|
|
ent_text = true_iban
|
|
logger.debug(f"Adjusted IBAN span from ({old_start}-{old_end}) to ({res.start}-{res.end}): '{ent_text}'")
|
|
else:
|
|
logger.warning(f"Cannot find exact IBAN substring inside entity: '{ent_text}'")
|
|
else:
|
|
# Pas un IBAN valide, ignorer cette entité
|
|
logger.warning(f"Entity IBAN does not match strict IBAN regex: '{ent_text}'")
|
|
continue
|
|
|
|
# Recadrage stricte IP_ADDRESS (IPv4 uniquement ici)
|
|
if res.entity_type == "IP_ADDRESS":
|
|
match = IPV4_REGEX.search(ent_text)
|
|
if match:
|
|
true_ip = match.group(0)
|
|
start_offset = ent_text.find(true_ip)
|
|
if start_offset != -1:
|
|
old_start, old_end = res.start, res.end
|
|
res.start += start_offset
|
|
res.end = res.start + len(true_ip)
|
|
ent_text = true_ip
|
|
logger.debug(f"Adjusted IP_ADDRESS span from ({old_start}-{old_end}) to ({res.start}-{res.end}): '{ent_text}'")
|
|
else:
|
|
logger.warning(f"Cannot find exact IP substring inside entity: '{ent_text}'")
|
|
else:
|
|
logger.warning(f"Entity IP_ADDRESS does not match IPv4 regex: '{ent_text}'")
|
|
continue
|
|
|
|
filtered_results.append(res)
|
|
|
|
# Optionnel: ici tu peux aussi filtrer les chevauchements si tu veux
|
|
|
|
response_data = [res.to_dict() for res in filtered_results]
|
|
return make_response(jsonify(response_data), 200)
|
|
|
|
except Exception as e:
|
|
logger.exception(f"Error during analysis for language '{language}'.")
|
|
if "No matching recognizers" in str(e):
|
|
return jsonify({"error": f"No recognizers available for language '{language}'."}), 400
|
|
return jsonify({"error": str(e)}), 500
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app.run(host="0.0.0.0", port=5001)
|