presidio modulaire

This commit is contained in:
nBiqoz
2025-09-07 12:29:08 +02:00
parent 85d95d05e5
commit c62e5b92d5
42 changed files with 1802 additions and 324 deletions

View File

@@ -0,0 +1,241 @@
from typing import List
from presidio_analyzer import RecognizerResult
import logging
import re
# Module-level logger named after this module (__name__), shared by everything below.
logger = logging.getLogger(__name__)
class OverlapResolver:
    """Resolve overlaps between detected entities of different types.

    When several entities cover overlapping character spans, only the one
    with the highest priority score is kept. The score combines a per-type
    base priority, the recognizer confidence and the span length (see
    ``_priority_score``).

    Default type priorities, highest first:
    IBAN > CREDIT_CARD > EMAIL_ADDRESS > BE_ENTERPRISE_NUMBER >
    PHONE_NUMBER = BE_PHONE_NUMBER > IP_ADDRESS > BE_ADDRESS = FR_ADDRESS >
    DATE_TIME > ORGANIZATION > LOCATION > PERSON > NRP > URL.
    Entity types missing from the table get a base priority of 0.
    """

    # Base priority per entity type (higher wins). Copied per instance in
    # __init__ so add_entity_priority() never mutates shared state.
    _DEFAULT_PRIORITY_ORDER = {
        'IBAN': 100,
        'CREDIT_CARD': 95,
        'EMAIL_ADDRESS': 90,
        'BE_ENTERPRISE_NUMBER': 88,
        'PHONE_NUMBER': 85,
        'BE_PHONE_NUMBER': 85,
        'IP_ADDRESS': 82,
        'BE_ADDRESS': 75,
        'FR_ADDRESS': 75,
        'DATE_TIME': 70,
        'ORGANIZATION': 65,
        'LOCATION': 60,
        'PERSON': 50,
        'NRP': 40,
        'URL': 35,
    }

    # Regexes that flag a text as a company name. They MUST use single
    # backslashes inside raw strings: r'\\b' matches a literal backslash
    # followed by "b", not a word boundary, and would never match real text.
    _DEFAULT_ORGANIZATION_PATTERNS = (
        r'\b\w+Consult\b',
        r'\bSPRL\s+\w+\b',  # SPRL + name
        r'\bSRL\s+\w+\b',   # SRL + name
        r'\bSA\s+\w+\b',    # SA + name
        r'\bASBL\s+\w+\b',  # ASBL + name
        r'\bSCS\s+\w+\b',   # SCS + name
        r'\bSNC\s+\w+\b',   # SNC + name
        r'\bSPRL\b',
        r'\bSRL\b',
        r'\bSA\b',
        r'\bASBL\b',
        r'\bSCS\b',
        r'\bSNC\b',
        r'\bLtd\b',
        r'\bInc\b',
        r'\bCorp\b',
        r'\bGmbH\b',
    )

    # Dotted quad of 1-3 digit groups. Octet ranges (0-255) are not validated.
    _IP_PATTERN = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')

    # Street keyword ... 4-digit postal code (not starting with 0) + locality.
    _STREET_ADDRESS_PATTERN = re.compile(
        r'\b(?:Avenue|Rue|Boulevard|Chaussée|Place|Quai|Impasse|Drève|Clos|Allée)\b.*?\b[1-9]\d{3}\s+[A-Za-zà-ÿ\'-]+',
        re.IGNORECASE,
    )

    # Two-letter country code followed by two check digits.
    _IBAN_PREFIX_PATTERN = re.compile(r'^[A-Z]{2}[0-9]{2}')

    def __init__(self):
        """Create a resolver with its own copy of the default tables."""
        self.priority_order = dict(self._DEFAULT_PRIORITY_ORDER)
        self.organization_patterns = list(self._DEFAULT_ORGANIZATION_PATTERNS)
        logger.info(f"✅ OverlapResolver initialisé avec {len(self.priority_order)} types d'entités")

    def process(self, results: "List[RecognizerResult]", text: str = "") -> "List[RecognizerResult]":
        """Return ``results`` with all overlaps resolved.

        Args:
            results: Entities to filter. Returned unchanged when empty.
            text: Analysed text the entity offsets refer to. When empty, the
                text-based corrections and the IBAN bonus are skipped.

        Returns:
            Non-overlapping entities sorted by ``(start, end)``. Among
            conflicting entities the one with the highest priority score
            wins; entities that do not overlap any winner are kept.
        """
        if not results:
            return results

        original_count = len(results)
        # Type-specific corrections run first so that re-typed / split
        # entities compete with their final type and span.
        corrected_results = self._apply_specific_corrections(results, text)
        resolved_results = self._select_non_overlapping(corrected_results, text)

        # NOTE(review): the debug output below contains raw entity text
        # (potential PII); make sure DEBUG logging is off where that matters.
        if text and logger.isEnabledFor(logging.DEBUG):
            kept_ids = {id(entity) for entity in resolved_results}
            for entity in corrected_results:
                if id(entity) not in kept_ids:
                    loser_text = text[entity.start:entity.end]
                    logger.debug(f"❌ Écarté: {entity.entity_type} '{loser_text}' (score: {self._priority_score(entity, text):.1f})")

        logger.info(f"🔧 OverlapResolver: {original_count} -> {len(resolved_results)} entités")
        return resolved_results

    def _select_non_overlapping(self, results: "List[RecognizerResult]", text: str = "") -> "List[RecognizerResult]":
        """Greedily keep the best-scoring entities that do not overlap.

        Entities are visited from highest to lowest priority score and kept
        only if they do not overlap an entity already kept. Unlike grouping
        around the left-most entity, this cannot leave two overlapping
        entities in the output (e.g. spans [0,10], [5,20], [15,25] where the
        middle one wins), and it never drops an entity that conflicts with
        no winner.
        """
        by_position = sorted(results, key=lambda r: (r.start, r.end))
        scored = [(self._priority_score(entity, text), entity) for entity in by_position]
        # Stable sort: on equal scores the left-most entity is visited first.
        scored.sort(key=lambda pair: pair[0], reverse=True)

        kept = []
        for _, candidate in scored:
            if any(self._is_overlapping(candidate, chosen) for chosen in kept):
                continue
            kept.append(candidate)

        kept.sort(key=lambda r: (r.start, r.end))
        return kept

    def _apply_specific_corrections(self, results: "List[RecognizerResult]", text: str) -> "List[RecognizerResult]":
        """Apply type-specific fixes before overlaps are resolved.

        * A PERSON whose text looks like a company name is re-typed as
          ORGANIZATION with a +0.1 confidence bonus (capped at 1.0).
        * A BE_ADDRESS / FR_ADDRESS containing IP-like text is replaced by
          one IP_ADDRESS entity per IP plus, when a street address can be
          isolated, a narrower address entity.

        All other entities are passed through unchanged. Without ``text`` no
        correction can match and every entity is passed through.
        """
        corrected_results = []
        for result in results:
            entity_text = text[result.start:result.end] if text else ""

            # Correction 1: PERSON -> ORGANIZATION for company names.
            if result.entity_type == 'PERSON' and self._is_organization_name(entity_text):
                corrected_results.append(RecognizerResult(
                    entity_type='ORGANIZATION',
                    start=result.start,
                    end=result.end,
                    # Confidence bonus, clamped so the score stays within [0, 1].
                    score=min(1.0, result.score + 0.1),
                ))
                logger.debug(f"🔄 Correction PERSON -> ORGANIZATION: '{entity_text}'")
                continue

            # Correction 2: separate IP addresses from postal addresses.
            if result.entity_type in ('BE_ADDRESS', 'FR_ADDRESS'):
                ip_matches = list(self._IP_PATTERN.finditer(entity_text))
                if ip_matches:
                    corrected_results.extend(
                        self._split_ip_from_address(result, entity_text, ip_matches)
                    )
                    continue

            corrected_results.append(result)
        return corrected_results

    def _split_ip_from_address(self, result, entity_text: str, ip_matches) -> "List[RecognizerResult]":
        """Split an address entity that contains IP addresses.

        Returns one IP_ADDRESS entity (score 0.95) per match, followed by an
        address entity restricted to the street-address part when such a part
        is found and does not overlap any extracted IP. If no clean address
        part is found, the original address entity is dropped and only the
        IP entities remain.
        """
        pieces = []
        ip_spans = []
        for ip_match in ip_matches:
            # Match offsets are relative to the entity; shift to text offsets.
            ip_start = result.start + ip_match.start()
            ip_end = result.start + ip_match.end()
            ip_spans.append((ip_start, ip_end))
            pieces.append(RecognizerResult(
                entity_type='IP_ADDRESS',
                start=ip_start,
                end=ip_end,
                score=0.95,
            ))
            logger.debug(f"🔄 IP extraite de l'adresse: '{ip_match.group()}'")

        address_match = self._STREET_ADDRESS_PATTERN.search(entity_text)
        if address_match:
            address_start = result.start + address_match.start()
            address_end = result.start + address_match.end()
            # Keep the address only if it is disjoint from every IP span.
            if all(address_end <= ip_start or address_start >= ip_end
                   for ip_start, ip_end in ip_spans):
                pieces.append(RecognizerResult(
                    entity_type=result.entity_type,
                    start=address_start,
                    end=address_end,
                    score=result.score,
                ))
                logger.debug(f"🔄 Adresse physique séparée: '{address_match.group()}'")
        return pieces

    def _is_organization_name(self, text: str) -> bool:
        """Return True if ``text`` matches any organization pattern.

        NOTE(review): matching is case-insensitive, so short legal forms
        such as "SA" also match the lowercase word "sa"; confirm this is
        acceptable for the PERSON texts this is applied to.
        """
        return any(
            re.search(pattern, text, re.IGNORECASE)
            for pattern in self.organization_patterns
        )

    def _contains_ip_address(self, text: str) -> bool:
        """Return True if ``text`` contains an IPv4-looking dotted quad."""
        return bool(self._IP_PATTERN.search(text))

    def _is_overlapping(self, entity1: "RecognizerResult", entity2: "RecognizerResult") -> bool:
        """Return True if the two spans share at least one character.

        Spans are half-open: entities that merely touch (one ends where the
        other starts) do not overlap.
        """
        return not (entity1.end <= entity2.start or entity1.start >= entity2.end)

    def _priority_score(self, entity: "RecognizerResult", text: str = "") -> float:
        """Compute the ranking score used to arbitrate between entities.

        score = base type priority (0 if unknown)
                + 10 * confidence      (e.g. score 0.9 -> +9)
                + 0.1 * span length
        An IBAN whose text (spaces removed) starts with a country code and
        two check digits gets +20 on its base priority; this needs ``text``.
        """
        base_priority = self.priority_order.get(entity.entity_type, 0)
        if entity.entity_type == 'IBAN' and text:
            compact = text[entity.start:entity.end].replace(' ', '')
            if self._IBAN_PREFIX_PATTERN.match(compact):
                base_priority += 20  # looks like a real IBAN
        confidence_bonus = entity.score * 10
        length_bonus = (entity.end - entity.start) * 0.1
        return base_priority + confidence_bonus + length_bonus

    def _resolve_overlap_group(self, overlapping_entities: "List[RecognizerResult]", text: str = "") -> "RecognizerResult":
        """Return the winner among a group of overlapping entities.

        Entities are ranked by ``_priority_score`` (type priority, then
        confidence, then length); on a tie the earliest entity in
        ``overlapping_entities`` wins.

        Raises:
            IndexError: if ``overlapping_entities`` is empty.
        """
        sorted_entities = sorted(
            overlapping_entities,
            key=lambda entity: self._priority_score(entity, text),
            reverse=True,
        )
        winner = sorted_entities[0]

        # Entity text is only available (and only logged) when text is given.
        if text and logger.isEnabledFor(logging.DEBUG):
            for loser in sorted_entities[1:]:
                loser_text = text[loser.start:loser.end]
                logger.debug(f"❌ Écarté: {loser.entity_type} '{loser_text}' (score: {self._priority_score(loser, text):.1f})")
            winner_text = text[winner.start:winner.end]
            logger.debug(f"✅ Gagnant: {winner.entity_type} '{winner_text}' (score: {self._priority_score(winner, text):.1f})")
        return winner

    def add_entity_priority(self, entity_type: str, priority: int) -> None:
        """Add or override the base priority of ``entity_type`` on this instance."""
        self.priority_order[entity_type] = priority
        logger.info(f"📊 Priorité mise à jour: {entity_type} = {priority}")