LLM Security

DLP IA générative : empêcher les fuites de données via les prompts

Architecture DLP pour LLM : scan input/output, sensitivity labels Microsoft Purview, intégration Presidio, classification automatique. Stack 2026 et patterns code.

Naim Aouaichia
17 min de lecture
  • DLP
  • architecture
  • Presidio
  • Purview
  • classification

Empêcher les fuites de données via les prompts dans une app LLM corporate (votre chatbot SAV, assistant interne, Copilot custom) demande une architecture DLP inline distincte du DLP endpoint qui protège des outils shadow externes. Cet article documente l'architecture 5 couches (pre-prompt classification, pre-prompt redaction conditionnelle, RAG security layer, output scanning, egress prevention), l'intégration Microsoft Purview / AWS Macie / Google DLP / Presidio / Lakera Guard / classifier maison, propagation sensitivity labels depuis sources documents jusqu'à response output, classification automatique en 3 couches (règles statiques + DLP enterprise + ML classifier), pipeline output scanning 4 couches obligatoires (PII detection, internal markers, cross-tenant leak, egress markers), audit du dispositif 5 dimensions (coverage, précision, latence, conformité, adversarial tests). Cible : architectes data + AI engineers structurant une app LLM corporate, RSSI auditant DLP IA, équipes intégrant Microsoft Purview / AWS Macie dans un pipeline LLM.

Pour la couche endpoint (employé / outils shadow) : empêcher un employé de coller du code source dans ChatGPT. Pour le RAG sécurisé spécifiquement : empêcher l'exfiltration de données sensibles via chatbot RAG.

Différence DLP endpoint vs DLP architectural

Cas d'usage différents

[DLP endpoint / browser extension]
    Protège : utilisateur final
    Cible : shadow AI (chat.openai.com plain, claude.ai perso)
    Vecteur : copy-paste, file upload manuel
    Couche : navigateur / OS device
    Solution : Netskope, Zscaler, Purview Endpoint, browser extension custom
    
[DLP architectural, cet article]
    Protège : application LLM corporate déployée
    Cible : votre chatbot SAV, agent interne, Copilot custom
    Vecteur : prompt légitime via app, RAG retrieval, output response
    Couche : pipeline applicatif inline
    Solution : Presidio + Purview/Macie/GCP DLP + classifier maison + Lakera Guard

Pourquoi les deux sont nécessaires

Endpoint sans architectural : votre app corporate peut leak via output sans contrôle. RAG cross-tenant, system prompt extraction, hallucination de données réelles → tous non couverts par DLP browser extension.

Architectural sans endpoint : employés contournent en utilisant chat.openai.com plain au lieu de votre app corporate.

Stratégie complète = empilement endpoint + architectural.

Architecture DLP 5 couches inline

Vue d'ensemble

[User input]
     │
     ▼
[Couche 1, Pre-prompt classification]
     │ Sensibilité détectée : Public / Internal / Confidential / Secret
     │ Décision routing LLM
     ▼
[Couche 2, Pre-prompt redaction conditionnelle]
     │ Pseudonymisation Presidio si données sensibles
     │ Mapping local pour reverse-substitution
     ▼
[Couche 3, RAG security layer]
     │ Sensitivity labels depuis Microsoft Purview / etc.
     │ Filter sur user clearance
     │ Audit cross-classification
     ▼
[LLM call]
     │ Routing vers : OpenAI standard | Azure OpenAI EU | Mistral EU | on-prem
     │ selon classification
     ▼
[Couche 4, Output scanning]
     │ PII detection
     │ Internal markers
     │ Cross-tenant leak
     ▼
[Couche 5, Egress prevention]
     │ URLs externes, markdown image, encoded data
     ▼
[Response to user]

Latence ajoutée : ~150-400ms p95

Couche 1, Pre-prompt classification

Approche 3 méthodes empilées

# classifier.py
import re
from presidio_analyzer import AnalyzerEngine
from typing import Literal
 
ClassificationLevel = Literal["public", "internal", "confidential", "secret"]
 
class DataClassifier:
    def __init__(self):
        self.analyzer = AnalyzerEngine()
        self.purview_client = PurviewClient()  # custom wrapper
        self.ml_classifier = load_finetuned_classifier()
    
    def classify(self, content: str) -> dict:
        results = {
            "rules": self._classify_by_rules(content),
            "purview": self._classify_by_purview(content),
            "ml": self._classify_by_ml(content),
        }
        
        # Décision finale : max des 3
        levels = [r["level"] for r in results.values()]
        final_level = max(levels, key=lambda l: LEVEL_PRIORITY[l])
        
        return {
            "level": final_level,
            "details": results,
            "confidence": self._compute_confidence(results),
        }
 
 
    def _classify_by_rules(self, content: str) -> dict:
        # Patterns headers explicites
        if re.search(r"\[Sensitivity:\s*(Highly\s*Confidential|Secret)\]", content, re.I):
            return {"level": "secret", "matched": "header_secret"}
        if re.search(r"\[Sensitivity:\s*Confidential\]", content, re.I):
            return {"level": "confidential", "matched": "header_confidential"}
        if re.search(r"\[Sensitivity:\s*Internal\]|Internal\s+Use\s+Only", content, re.I):
            return {"level": "internal", "matched": "header_internal"}
        
        # Patterns code/secrets
        if re.search(r"(?i)(api[_-]?key|password|secret)\s*[=:]\s*['\"][\w\-]{20,}", content):
            return {"level": "secret", "matched": "secret_pattern"}
        
        # Patterns financiers
        if re.search(r"\s*\d{6,}|\$\s*\d{6,}", content):
            return {"level": "confidential", "matched": "high_amount"}
        
        # Patterns noms internes (à customiser)
        INTERNAL_PROJECTS = ["projet_alpha", "ma_2026", "internal_codename_x"]
        for proj in INTERNAL_PROJECTS:
            if proj.lower() in content.lower():
                return {"level": "confidential", "matched": f"internal_project_{proj}"}
        
        return {"level": "public", "matched": None}
    
    def _classify_by_purview(self, content: str) -> dict:
        # Microsoft Purview Sensitive Information Types
        results = self.purview_client.scan(content, sit_categories=[
            "credit_card", "passport_us", "fr_nir", "ssn",
            "iban", "medical_record", "drivers_license",
        ])
        
        if any(r["confidence"] > 0.85 for r in results):
            return {"level": "confidential", "purview_findings": results}
        if any(r["confidence"] > 0.5 for r in results):
            return {"level": "internal", "purview_findings": results}
        
        return {"level": "public", "purview_findings": []}
    
    def _classify_by_ml(self, content: str) -> dict:
        # Classifier DistilBERT fine-tuné sur corpus annoté
        prediction = self.ml_classifier.predict(content)
        # prediction = {"public": 0.7, "internal": 0.2, "confidential": 0.08, "secret": 0.02}
        
        max_label = max(prediction, key=prediction.get)
        confidence = prediction[max_label]
        
        return {
            "level": max_label,
            "confidence": confidence,
            "raw_scores": prediction,
        }
 
 
LEVEL_PRIORITY = {"public": 0, "internal": 1, "confidential": 2, "secret": 3}

Routing LLM par classification

ROUTING_BY_CLASSIFICATION = {
    "public": ["openai_api", "azure_openai_eu", "mistral_eu", "onprem"],
    "internal": ["azure_openai_eu", "mistral_eu", "onprem"],
    "confidential": ["azure_openai_eu", "mistral_eu", "onprem"],
    "secret": ["onprem"],  # uniquement sur infra propre
}
 
async def route_to_llm(prompt: str, classification: str):
    allowed_providers = ROUTING_BY_CLASSIFICATION[classification]
    
    # Sélection selon disponibilité, coût, qualité
    provider = select_optimal_provider(allowed_providers, prompt_complexity=...)
    
    if provider == "openai_api" and classification != "public":
        raise SecurityError(f"Cannot route {classification} data to openai_api")
    
    return await provider.complete(prompt)

Couche 2, Pre-prompt redaction conditionnelle

Pseudonymisation avec mapper réversible

# pii_mapper.py
import hmac
import hashlib
import os
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
 
PSEUDO_KEY = os.environ["PSEUDO_HMAC_KEY"].encode()
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
 
 
class PIIMapperReversible:
    """Pseudonymise PII avec mapping local pour reverse-substitution."""
    
    def __init__(self):
        self._token_to_value = {}
        self._counter = {}
    
    def get_token(self, entity_type: str, value: str) -> str:
        # Stable : même valeur → même token
        existing = next(
            (k for k, v in self._token_to_value.items() if v == value),
            None,
        )
        if existing:
            return existing
        
        self._counter[entity_type] = self._counter.get(entity_type, 0) + 1
        token = f"[{entity_type}_{self._counter[entity_type]:03d}]"
        self._token_to_value[token] = value
        return token
    
    def reverse(self, text: str) -> str:
        for token, value in self._token_to_value.items():
            text = text.replace(token, value)
        return text
 
 
def pseudonymize_for_external_llm(prompt: str, classification: str) -> tuple[str, PIIMapperReversible]:
    """Pseudonymise PII si on route vers LLM externe (non-EU ou non-corporate)."""
    
    # Pas de pseudonymisation si on reste sur infra interne
    if classification == "secret":
        return prompt, PIIMapperReversible()  # ne va pas sortir de toute façon
    
    mapper = PIIMapperReversible()
    
    pii_types = ["EMAIL_ADDRESS", "PHONE_NUMBER", "PERSON", "FR_NIR", "IBAN_CODE", "CREDIT_CARD"]
    results = analyzer.analyze(text=prompt, language="fr", entities=pii_types)
    
    pseudonymized = prompt
    for r in sorted(results, key=lambda x: x.start, reverse=True):
        original = prompt[r.start:r.end]
        token = mapper.get_token(r.entity_type, original)
        pseudonymized = pseudonymized[:r.start] + token + pseudonymized[r.end:]
    
    return pseudonymized, mapper
 
 
# Usage dans pipeline
async def safe_llm_call(prompt: str):
    # Couche 1 : classification
    classification = classifier.classify(prompt)["level"]
    
    # Couche 2 : pseudonymisation si externe
    provider = select_provider(classification)
    if provider == "openai_api":
        # Externe non-EU : pseudonymiser
        prompt_safe, mapper = pseudonymize_for_external_llm(prompt, classification)
    else:
        # EU ou interne : pas de pseudonymisation nécessaire
        prompt_safe = prompt
        mapper = None
    
    # LLM call
    response = await provider.complete(prompt_safe)
    
    # Reverse substitution si pseudonymisé
    if mapper:
        response = mapper.reverse(response)
    
    return response

Couche 3, RAG security layer

Indexation label-aware

# rag_indexing.py
from msgraph_core import GraphClient
from chromadb import PersistentClient
 
graph_client = GraphClient(...)
chroma_client = PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("zerodaysupport_rag")
 
 
async def index_sharepoint_docs():
    """Indexer documents SharePoint avec sensitivity label propagé."""
    
    docs = await graph_client.get(f"/sites/{site_id}/drive/root/children")
    
    for doc in docs.value:
        # Lire sensitivity label via Graph API
        item = await graph_client.get(
            f"/sites/{site_id}/drive/items/{doc.id}",
            params={"$expand": "extensions"},
        )
        
        sensitivity_label = extract_sensitivity_label(item)  # ex: "Confidential"
        tenant_id = extract_tenant_id(item)
        
        # Charger contenu
        content = await graph_client.get(f"/sites/{site_id}/drive/items/{doc.id}/content")
        text = extract_text(content)
        
        # Chunker
        chunks = split_into_chunks(text)
        
        # Indexer avec metadata complète
        for i, chunk in enumerate(chunks):
            collection.upsert(
                ids=[f"{doc.id}_chunk_{i}"],
                documents=[chunk],
                metadatas=[{
                    "doc_id": doc.id,
                    "doc_name": doc.name,
                    "sensitivity_label": sensitivity_label,
                    "tenant_id": tenant_id,
                    "source": "sharepoint",
                    "indexed_at": datetime.utcnow().isoformat(),
                }],
            )

Retrieval avec filtre clearance

SENSITIVITY_HIERARCHY = {
    "Public": 0,
    "Internal": 1,
    "Confidential": 2,
    "Highly Confidential": 3,
    "Secret": 4,
}
 
 
async def get_user_clearance(user_id: str) -> int:
    """Récupère niveau de clearance utilisateur depuis Entra ID groups."""
    user = await graph_client.get(f"/users/{user_id}/memberOf")
    
    groups = [g.displayName for g in user.value]
    
    if "Secret_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Secret"]
    if "Highly_Confidential_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Highly Confidential"]
    if "Confidential_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Confidential"]
    if "Internal_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Internal"]
    
    return SENSITIVITY_HIERARCHY["Public"]
 
 
async def query_rag_label_aware(
    query: str,
    user_id: str,
    tenant_id: str,
    k: int = 5,
) -> dict:
    """Retrieval avec filtrage tenant + clearance."""
    
    user_clearance = await get_user_clearance(user_id)
    user_clearance_label = next(
        label for label, level in SENSITIVITY_HIERARCHY.items()
        if level == user_clearance
    )
    
    # Récupérer chunks pertinents
    raw_results = collection.query(
        query_texts=[query],
        n_results=k * 2,  # over-fetch pour permettre filtering
        where={"tenant_id": tenant_id},  # immutable côté serveur
    )
    
    # Filtrer par clearance
    filtered = []
    blocked_count = 0
    for chunk_doc, metadata in zip(raw_results["documents"][0], raw_results["metadatas"][0]):
        chunk_label = metadata.get("sensitivity_label", "Public")
        chunk_level = SENSITIVITY_HIERARCHY.get(chunk_label, 0)
        
        if chunk_level <= user_clearance:
            filtered.append({"document": chunk_doc, "metadata": metadata})
        else:
            blocked_count += 1
    
    # Truncate au top-k
    filtered = filtered[:k]
    
    # Audit
    if blocked_count > 0:
        await log_clearance_filter({
            "user_id": user_id,
            "user_clearance": user_clearance_label,
            "query": query,
            "blocked_chunks": blocked_count,
            "delivered_chunks": len(filtered),
        })
    
    # Calcul max label dans contexte (pour routing LLM)
    max_label_in_context = max(
        (SENSITIVITY_HIERARCHY.get(c["metadata"]["sensitivity_label"], 0) for c in filtered),
        default=0,
    )
    
    return {
        "chunks": filtered,
        "max_label_level": max_label_in_context,
        "blocked_count": blocked_count,
    }

Routing LLM par max label

async def chat_with_rag(query: str, user_id: str, tenant_id: str):
    # 1. Retrieval label-aware
    rag_result = await query_rag_label_aware(query, user_id, tenant_id)
    
    # 2. Routing selon max label dans contexte
    max_level = rag_result["max_label_level"]
    
    if max_level >= SENSITIVITY_HIERARCHY["Highly Confidential"]:
        provider = "azure_openai_eu"  # ou on-prem
    elif max_level >= SENSITIVITY_HIERARCHY["Confidential"]:
        provider = "azure_openai_eu"
    else:
        provider = "openai_api"  # ou Azure standard
    
    # 3. Build context et appeler
    context_text = "\n\n".join(c["document"] for c in rag_result["chunks"])
    response = await call_llm(
        provider=provider,
        system="Tu es un assistant. Réponds uniquement avec les sources fournies.",
        user=f"Sources:\n{context_text}\n\nQuestion: {query}",
    )
    
    # 4. Output max_label = max des sources
    response_metadata = {
        "max_source_label": max_level,
        "sources_used": [c["metadata"]["doc_id"] for c in rag_result["chunks"]],
    }
    
    return {
        "answer": response,
        "metadata": response_metadata,
    }

Couche 4, Output scanning

Pipeline 4 sous-couches

# output_scanner.py
import re
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
 
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()
 
 
class OutputScanner:
    def __init__(self):
        self.internal_patterns = self._load_internal_patterns()
        self.allowed_domains = {"zerodaysupport.com", "support.zerodaysupport.com"}
    
    def scan_and_filter(
        self,
        response: str,
        user_role: str,
        user_request: str,
        tenant_id: str,
    ) -> dict:
        result = {
            "original": response,
            "filtered": response,
            "alerts": [],
            "blocked": False,
        }
        
        # Sub-couche 1 : PII detection
        result = self._scan_pii(result, user_role, user_request)
        
        # Sub-couche 2 : Internal markers
        result = self._scan_internal_markers(result, user_role)
        
        # Sub-couche 3 : Cross-tenant leak
        result = self._scan_cross_tenant(result, tenant_id)
        
        # Sub-couche 4 : Egress markers
        result = self._scan_egress(result)
        
        return result
    
    def _scan_pii(self, result: dict, user_role: str, user_request: str) -> dict:
        """Redact PII non fournis par user et selon role."""
        text = result["filtered"]
        
        # PII fournis par l'user dans son prompt → préserver dans réponse
        user_pii_results = analyzer.analyze(text=user_request, language="fr",
                                            entities=PII_ENTITIES)
        user_pii_values = {user_request[r.start:r.end] for r in user_pii_results}
        
        # Detect PII dans response
        response_pii = analyzer.analyze(text=text, language="fr", entities=PII_ENTITIES)
        
        for r in sorted(response_pii, key=lambda x: x.start, reverse=True):
            value = text[r.start:r.end]
            
            # Skip si fourni par user
            if value in user_pii_values:
                continue
            
            # Skip si user a clearance pour ce type de PII
            if user_role in ["admin", "data_officer"]:
                continue
            
            # Redact
            text = text[:r.start] + f"[{r.entity_type}_REDACTED]" + text[r.end:]
            result["alerts"].append(f"PII redacted: {r.entity_type}")
        
        result["filtered"] = text
        return result
    
    def _scan_internal_markers(self, result: dict, user_role: str) -> dict:
        """Detect mentions internes (codes promo, projets confidentiels, URLs admin)."""
        text = result["filtered"]
        
        for pattern_name, pattern in self.internal_patterns.items():
            if re.search(pattern, text):
                if user_role not in ["employee", "admin"]:
                    # User externe ne doit pas voir ces patterns
                    text = re.sub(pattern, "[INTERNAL_REDACTED]", text)
                    result["alerts"].append(f"Internal marker redacted: {pattern_name}")
        
        result["filtered"] = text
        return result
    
    def _scan_cross_tenant(self, result: dict, tenant_id: str) -> dict:
        """Detect mentions d'autres tenants dans la réponse."""
        text = result["filtered"]
        
        # Patterns tenant_id explicites
        tenant_patterns = re.findall(r"tenant[_-]?id\s*[=:]\s*['\"]?([a-zA-Z0-9_-]+)", text)
        
        for found_tenant in tenant_patterns:
            if found_tenant != tenant_id:
                result["blocked"] = True
                result["alerts"].append(f"CRITICAL: cross-tenant leak detected ({found_tenant})")
                # Block complet
                result["filtered"] = "Désolé, je ne peux pas répondre à cette demande. (incident loggé)"
                return result
        
        return result
    
    def _scan_egress(self, result: dict) -> dict:
        """Bloquer URLs externes, markdown images, encoded data."""
        text = result["filtered"]
        
        # URLs externes
        def url_replacer(match):
            url = match.group(0)
            domain_match = re.search(r"https?://([^/\s]+)", url)
            if not domain_match:
                return url
            domain = domain_match.group(1)
            if any(d in domain for d in self.allowed_domains):
                return url
            result["alerts"].append(f"External URL blocked: {domain}")
            return "[URL_EXTERNE_BLOQUÉE]"
        
        text = re.sub(r"https?://\S+", url_replacer, text)
        
        # Markdown images (vecteur exfil)
        if re.search(r"!\[.*?\]\(.*?\)", text):
            text = re.sub(r"!\[.*?\]\(.*?\)", "[IMAGE_BLOQUÉE]", text)
            result["alerts"].append("Markdown image blocked")
        
        # Long base64 (potentiel exfil)
        long_b64 = re.findall(r"[A-Za-z0-9+/]{100,}={0,3}", text)
        if long_b64:
            result["alerts"].append(f"Long base64 detected ({len(long_b64)} occurrences)")
            # Optionnel : block si pattern suspect
        
        result["filtered"] = text
        return result
    
    def _load_internal_patterns(self):
        return {
            "internal_promo_code": r"\bEMP\d{4}-[A-Z]+\b",
            "admin_url": r"\bhttps?://internal\.[a-z]+\.[a-z]{2,3}/admin",
            "internal_project_alpha": r"\bprojet[_-]?alpha\b",
            "ma_2026": r"\bMA[_-]?2026\b",
            # Customiser selon votre org
        }
 
 
PII_ENTITIES = ["EMAIL_ADDRESS", "PHONE_NUMBER", "PERSON", "FR_NIR", "IBAN_CODE", "CREDIT_CARD"]
 
 
# Usage
scanner = OutputScanner()
 
@app.post("/chat")
async def chat(req: ChatRequest, current_user):
    # Pipeline complet
    classified = classifier.classify(req.message)
    
    rag_result = await query_rag_label_aware(req.message, current_user.id, current_user.tenant_id)
    
    response_raw = await call_llm(...)
    
    # Output scanning
    scan_result = scanner.scan_and_filter(
        response=response_raw,
        user_role=current_user.role,
        user_request=req.message,
        tenant_id=current_user.tenant_id,
    )
    
    if scan_result["blocked"]:
        # Logger incident critical
        await log_critical_incident({
            "request_id": req.id,
            "user_id": current_user.id,
            "alerts": scan_result["alerts"],
            "raw_response": response_raw[:500],
        })
        return {"answer": scan_result["filtered"]}
    
    # Logger alerts non-blocking
    if scan_result["alerts"]:
        await log_alerts(scan_result["alerts"])
    
    return {"answer": scan_result["filtered"]}

Stack par environnement corporate

Microsoft 365 / Azure

[Pipeline DLP IA, stack Microsoft]

Couche 1 (classification) : Microsoft Purview SIT + Sensitivity labels
Couche 2 (redaction) : Presidio + Purview labels propagation
Couche 3 (RAG) : Azure AI Search avec security filters + Graph API labels
Couche 4 (output) : Presidio + Purview SIT scan
Couche 5 (egress) : Microsoft Defender for Cloud Apps

Avantages : intégration native, propagation labels automatique

AWS

[Pipeline DLP IA, stack AWS]

Couche 1 : Amazon Macie pour scan données sensibles
Couche 2 : Presidio + KMS-encrypted secrets store
Couche 3 : OpenSearch / Pinecone avec metadata filters
Couche 4 : Macie + custom regex
Couche 5 : AWS Network Firewall + custom egress

Avantages : ecosystem AWS, IAM intégration

Google Cloud

[Pipeline DLP IA, stack GCP]

Couche 1 : Google Cloud DLP API (info types)
Couche 2 : Presidio + Cloud DLP transformations
Couche 3 : Vertex AI Search avec security filters
Couche 4 : Cloud DLP scan response
Couche 5 : Cloud Armor egress + custom

Avantages : ecosystem GCP, Vertex AI native

Multi-cloud / agnostique

[Pipeline DLP IA, stack open-source]

Couche 1 : Presidio + classifier maison fine-tuné
Couche 2 : Presidio anonymizer + mapper réversible
Couche 3 : ChromaDB / Qdrant / Weaviate + metadata filters
Couche 4 : Presidio + regex custom + Lakera Guard (optionnel)
Couche 5 : Custom egress middleware

Avantages : portable, pas de vendor lock-in, gratuit

Audit DLP, 5 dimensions

Dimension 1, Coverage

# audit_coverage.py
async def audit_dlp_coverage():
    """Vérifier que tous les flux LLM passent par DLP."""
    
    findings = []
    
    # Inventaire endpoints LLM
    endpoints = await discover_llm_endpoints()
    
    for ep in endpoints:
        # Vérifier que les 5 couches sont actives
        coverage = {
            "pre_prompt_classification": check_layer_active(ep, "classifier"),
            "pre_prompt_redaction": check_layer_active(ep, "redaction"),
            "rag_security": check_layer_active(ep, "rag_filter"),
            "output_scanning": check_layer_active(ep, "output_scanner"),
            "egress_prevention": check_layer_active(ep, "egress"),
        }
        
        missing = [layer for layer, active in coverage.items() if not active]
        
        if missing:
            findings.append({
                "endpoint": ep["url"],
                "missing_layers": missing,
                "severity": "high" if "output_scanning" in missing else "medium",
            })
    
    return findings

Dimension 2, Précision

async def measure_dlp_precision():
    """TPR vs FPR sur sample représentatif."""
    
    # Test corpus annoté manuellement
    # 500 prompts publics + 500 confidentiels
    test_corpus = load_annotated_test_corpus()
    
    results = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    
    for sample in test_corpus:
        classification = classifier.classify(sample["text"])["level"]
        actual = sample["label"]
        
        is_sensitive_predicted = classification != "public"
        is_sensitive_actual = actual != "public"
        
        if is_sensitive_predicted and is_sensitive_actual:
            results["tp"] += 1
        elif is_sensitive_predicted and not is_sensitive_actual:
            results["fp"] += 1
        elif not is_sensitive_predicted and not is_sensitive_actual:
            results["tn"] += 1
        else:
            results["fn"] += 1
    
    tpr = results["tp"] / (results["tp"] + results["fn"])
    fpr = results["fp"] / (results["fp"] + results["tn"])
    
    return {
        "tpr": tpr,
        "fpr": fpr,
        "details": results,
        "verdict": "PASS" if tpr > 0.9 and fpr < 0.05 else "FAIL",
    }

Dimension 3, Latence

async def benchmark_dlp_latency(n: int = 1000):
    """Mesurer latence ajoutée par chaque couche."""
    
    samples = load_test_samples(n)
    
    timings = {layer: [] for layer in ["classification", "redaction", "rag", "output", "egress"]}
    
    for sample in samples:
        for layer in timings:
            start = time.time()
            await execute_layer(layer, sample)
            timings[layer].append((time.time() - start) * 1000)
    
    return {
        layer: {
            "p50": np.percentile(times, 50),
            "p95": np.percentile(times, 95),
            "p99": np.percentile(times, 99),
        }
        for layer, times in timings.items()
    }
 
 
# Cibles
LATENCY_TARGETS = {
    "classification": {"p95": 100},  # ms
    "redaction": {"p95": 200},
    "rag": {"p95": 200},
    "output": {"p95": 100},
    "egress": {"p95": 50},
    "total": {"p95": 400},
}

Dimension 4, Conformité

Mapping checklist par régulation :

## RGPD coverage
- [x] Détection PII (Presidio)
- [x] Pseudonymisation upstream
- [x] Audit logs pseudonymisés
- [x] Right to erasure procédure
 
## EU AI Act coverage (high-risk)
- [x] Logging Art. 12
- [x] Documentation technique Art. 11
- [x] Cybersecurity Art. 15
- [ ] Conformity assessment (en cours)
 
## HDS (santé)
- [x] Hébergement HDS Azure region EU
- [x] Routing data médicales vers HDS-certified seulement
 
## PCI-DSS (cartes paiement)
- [x] Detection credit_card pattern
- [x] Block transmission vers tier non PCI

Dimension 5, Tests adversariaux

# adversarial_tests.py
ADVERSARIAL_PAYLOADS = [
    # PII obfusqué
    "Contact me at j ohn @ ex ample . com",
    "Téléphone : 06 1 2 3 4 5 6 7 8",
    
    # Encoding tricks
    "My SSN is " + base64.b64encode(b"123-45-6789").decode(),
    "API key: " + " ".join("sk-abc123def456ghi789jkl012"),  # spaced
    
    # Paraphrase
    "Mon employeur a été ZerodaySupport, j'y travaille depuis 5 ans, mon manager est Jean Dupont",
    
    # Langues low-resource
    "মেরা ইমেইল হল jane.smith@example.com",  # Bengali
    
    # Obfuscation visuelle
    "API key: skABCD" + "_REDACTED_HEXSTRING_TOKEN" + "123",
]
 
 
async def test_adversarial_detection():
    results = []
    for payload in ADVERSARIAL_PAYLOADS:
        classification = classifier.classify(payload)["level"]
        results.append({
            "payload": payload[:50] + "...",
            "classified_as": classification,
            "should_be_sensitive": True,
            "passed": classification != "public",
        })
    
    pass_rate = sum(r["passed"] for r in results) / len(results)
    return {"results": results, "pass_rate": pass_rate}

Audit complet

DimensionCibleMesure
Coverage100% endpointsAudit script automatisé mensuel
Précision TPR> 90%Test corpus annoté trimestriel
Précision FPR< 5%Idem
Latence p95 totale< 400msBenchmark continu
ConformitéChecklist par régulationAudit semestriel
Tests adversariaux> 70% catch rateTrimestriel

Erreurs récurrentes

Erreur 1, Pas de classification = routing aveugle

Tous les prompts vers OpenAI standard sans distinction. Classification + routing obligatoire.

Erreur 2, Output scanning oublié

Focus uniquement sur input. Mais la fuite via output est plus grave (user voit le résultat). Output scanning aussi critique.

Erreur 3, Sensitivity labels non propagés

Microsoft Purview labels existent dans SharePoint, mais le pipeline LLM les ignore. Propager bout-en-bout.

Erreur 4, Filtre tenant uniquement dans le prompt

Demander au LLM de filtrer par tenant via system prompt = casseable. Filter immutable côté serveur (RLS / metadata DB).

Erreur 5, Pseudonymisation systématique

Pseudonymiser même pour LLM EU sans nécessité = perte d'information sans bénéfice. Conditionnel selon classification + provider.

Erreur 6, Pas d'audit régulier

Déployé, jamais re-évalué. Drift entre attendu et réel. Audit trimestriel + KPIs trend.

Erreur 7, Tests adversariaux absents

Patterns simples seulement. Vrais attaquants utilisent obfuscation, encoding. Test adversarial trimestriel.

Erreur 8, Pas d'intégration corporate DLP existante

Réinventer la roue. Microsoft Purview / AWS Macie déjà en place. Intégrer plutôt que paralleler.

Ce que vous devriez retenir

  1. DLP architectural ≠ DLP endpoint : les deux nécessaires, complémentaires
  2. 5 couches inline : classification → redaction → RAG security → output scan → egress
  3. Classification 3 méthodes empilées : règles statiques + DLP enterprise + ML classifier
  4. Routing LLM par classification : public→OpenAI, internal→Azure EU, secret→on-prem
  5. Sensitivity labels propagés depuis source documents jusqu'à response
  6. Output scanning 4 sub-couches : PII + internal markers + cross-tenant + egress
  7. Stack par ecosystem : Microsoft (Purview), AWS (Macie), GCP (Cloud DLP), open-source (Presidio + maison)
  8. Audit 5 dimensions : coverage + precision + latence + conformité + adversarial
  9. Latence cible : < 400ms p95 total ajouté
  10. Précision cible : TPR > 90%, FPR < 5%

Une DLP IA architecturale mature 2026 demande rigueur méthodique et intégration ecosystem corporate. Sans elle, votre app LLM est un canal d'exfiltration en attente. Avec elle, vous avez le filet de sécurité applicatif qui complète guardrails + observability + zero-trust.


Pour aller plus loin : pour le RAG sécurisé spécifiquement : empêcher l'exfiltration de données sensibles via chatbot RAG. Pour le shadow AI / endpoint : empêcher un employé de coller du code source dans ChatGPT.

Questions fréquentes

  • Quelle différence entre DLP endpoint (article précédent) et DLP architectural pour LLM apps ?
    **DLP endpoint / browser extension** (cf article *Empêcher un employé de coller du code source dans ChatGPT*) couvre l'**utilisateur final** qui utilise un outil IA externe non-corporate (chat.openai.com plain, claude.ai perso). Son usage : empêcher la fuite via shadow AI. **DLP architectural pour LLM apps** (cet article) couvre les **applications LLM corporate** que vous **développez et déployez** : votre chatbot SAV, votre assistant interne, votre Copilot custom. Le risque ici est différent : (a) les données traitées arrivent **légitimement** dans l'app via fonctionnalité produit, (b) elles peuvent fuiter via prompts (RAG cross-tenant, logs, response contenant data tiers), (c) elles transitent vers le LLM provider (OpenAI, Azure, Mistral) avec ses propres risques. **Stack technique différente** : pas de browser extension mais de la classification automatique inline, sensitivity labels propagation, scan input/output dans le pipeline applicatif, intégration avec sécurité data enterprise (Microsoft Purview, AWS Macie, Google DLP). Les 2 DLP sont **complémentaires** : endpoint pour le shadow, architectural pour l'app corporate.
  • Quelle architecture DLP pour une app LLM corporate ?
    Stack en 5 couches inline. **(1) Pre-prompt classification** : avant envoi LLM, classifier la sensibilité des données dans le prompt (Public / Internal / Confidential / Secret). Décide quel LLM utiliser (cloud non-EU vs Azure EU vs on-prem) selon classification. **(2) Pre-prompt redaction conditionnelle** : si données sensibles présentes, pseudonymisation Presidio upstream avant envoi LLM tier non-souverain. Mapping local pour reverse-substitution. **(3) RAG security layer** : sensitivity labels propagés depuis source documents (Microsoft Purview / sensitivity-aware retrieval), filtre RLS au niveau vector store, audit logs cross-classification. **(4) Output scanning** : scanner la réponse LLM pour PII/secrets/confidential markers AVANT retour utilisateur. Bloquer si user_role n'a pas le clearance. **(5) Egress prevention** : pour réponses à risque (contains URL externe, contains markdown image), block ou warn. **Stack 2026** : Presidio (PII), Microsoft Purview (corporate Microsoft), AWS Macie (corporate AWS), Google Cloud DLP API (corporate GCP), Lakera Guard (commercial LLM-specific), classifier maison fine-tuné (cas spécifiques). **Combiner** : Presidio + Purview pour ecosystem MS, Presidio + Macie pour AWS. Pas de stack unique tout-couvrant, composer selon contexte.
  • Comment classifier automatiquement la sensibilité des données dans un prompt ?
    Trois approches, à empiler. **(1) Règles statiques (regex + keyword)** : patterns rapides pour patterns connus. Headers de classification (`[CONFIDENTIAL]`, `Internal Use Only`), regex montants financiers, regex IBAN/NIR/credit card, mentions noms projets sensibles internes (M&A en cours, secrets industriels). Latence &lt; 5ms. Faux positifs élevés mais base. **(2) Microsoft Purview Sensitive Information Types (SIT)** : corporate Microsoft, ~100 SIT prêts à l'emploi (passeports, IDs nationaux, cartes bancaires, données médicales) + custom. SDK / Graph API pour scan inline. Latence ~50-200ms. **(3) ML classifier custom** : fine-tune DistilBERT sur corpus annoté (Public/Internal/Confidential/Secret) avec labels métier. Latence ~50-100ms (CPU). Précision dépend dataset training. À retraîner trimestriellement. **Combinaison recommandée 2026** : (a) règles statiques d'abord (high-precision haute confiance + rapide). (b) Purview/Macie/GCP DLP pour PII et patterns standard. (c) Classifier custom pour spécificités métier non couvertes par 1+2. **Décision** : score combiné max des 3 → classification finale → routing vers LLM approprié. **Anti-pattern** : se reposer uniquement sur ML classifier. Faux positifs/négatifs. Toujours empiler avec règles déterministes pour les patterns critiques (secrets, headers explicites).
  • Comment propager les sensitivity labels Microsoft Purview dans un pipeline LLM ?
    Pattern Purview-aware. **Étape 1, Inventaire labels** : votre org définit labels (Public, Internal, Confidential, Highly Confidential, Secret, etc.) avec policies Microsoft Purview. **Étape 2, Indexation RAG label-aware** : lors de l'ingestion d'un document SharePoint/OneDrive dans votre RAG, lire le label via Microsoft Graph API (`GET /drives/{drive-id}/items/{item-id}/extensions/microsoft.graph.officeGraphInsights`) et stocker comme metadata du chunk dans le vector store. **Étape 3, Retrieval label-aware** : à la query, récupérer non seulement docs pertinents mais aussi leurs labels. **Étape 4, Filtre sur user clearance** : récupérer le user clearance level (depuis Entra ID groups par exemple). Filtrer chunks où `chunk_label > user_clearance`. **Étape 5, Routing LLM par max label dans contexte** : si un chunk Confidential est retourné, routing vers Azure OpenAI EU plutôt que OpenAI standard. **Étape 6, Output label** : la réponse hérite du max label des sources. Stocker en metadata response. **Étape 7, Egress** : selon label, autoriser/bloquer transmission (ex: ne pas envoyer email externe avec contenu Highly Confidential). **Stack** : Microsoft Graph API + Azure AI Search avec metadata + custom middleware Python/Node. Disponible nativement dans Azure AI Search 2025+ via `securityFilter`. **Bénéfice** : zero-trust data flow respectant la classification existante de l'org sans inventer un nouveau système.
  • Comment scanner et filtrer les outputs LLM pour empêcher fuites ?
    Pipeline output 4 couches obligatoires. **(1) PII detection** : Presidio analyzer sur la réponse, détecte email/téléphone/IBAN/NIR/credit card/personne. Decision : si user_role n'a pas accès à ces PII, redact avant retour. **(2) Internal markers detection** : regex pour mentions internes (codes promo internes, noms projets confidentiels, URLs admin). Block et alerte SOC si présent dans réponse à utilisateur externe. **(3) Cross-tenant leak detection** : si la réponse contient des identifiants/data appartenant à un autre tenant que celui du demandeur, block immédiat + alerte critical. **(4) Egress markers detection** : URLs externes hors allowlist (anti-exfiltration), markdown images vers domaines tiers, encoded data (base64 long, hex), tentatives steganographic. Block si pattern détecté. **Implémentation** : pipeline middleware FastAPI/Express qui intercepte response avant retour, applique chaque couche, bloque ou redact selon policy. Latence ajoutée ~30-100ms. **Cas particulier prompts engageants** (politique, tarif, garantie, cas Air Canada) : ajouter LLM-as-judge sur sujets sensibles pour vérifier non-engagement contractuel, avec disclaimer auto. **Anti-pattern** : scanner uniquement input. La fuite via output est plus dangereuse car le user voit le résultat. Output scanning aussi critique que input filtering.
  • Comment auditer un dispositif DLP IA en place ?
    Audit 5 dimensions. **(1) Coverage** : Quelle proportion des flux LLM passe par DLP ? Pour chaque app LLM corporate, vérifier que toutes les couches (pre-prompt, RAG, output) sont actives. Identifier flux non-couverts (souvent : APIs internes, batch jobs). **(2) Précision** : taux true positive (vraies fuites détectées) vs false positive (légitimes bloqués). Mesurer sur sample représentatif logs 30j. Cibles : TPR > 90%, FPR &lt; 5%. **(3) Latence** : impact sur expérience utilisateur. Mesurer p50/p95 latence ajoutée par couche. Cibles : &lt; 100ms p50, &lt; 300ms p95. **(4) Conformité** : la DLP couvre-t-elle les obligations RGPD (PII detection), sectorielles (HDS pour santé, PCI pour cartes), EU AI Act ? Mapping checklist par régulation. **(5) Tests adversariaux** : envoyer payloads adversariaux (PII obfusqué, encoding tricks, langues low-resource, paraphrase). Si DLP échoue → améliorer. **Stack tests** : (a) Test corpus annoté (1000+ samples Public/Internal/Confidential/Secret). (b) Outils PyRIT pour génération adversariale. (c) Audit logs SIEM pour détection anomalies. **Cadence** : audit complet trimestriel + spot checks mensuels + revue règles après chaque incident. **Documents requis** : matrice DLP coverage, KPIs trend, plan amélioration. Auditeur externe annuel pour validation indépendante.

Écrit par

Naim Aouaichia

Cyber Security Engineer et fondateur de Zeroday Cyber Academy

Ingénieur cybersécurité avec un parcours hybride : développement, DevOps Capgemini, DevSecOps IN Groupe (sécurité des documents d'identité régaliens), audits CAC 40. Fondateur de Hash24Security et Zeroday Cyber Academy. Présence LinkedIn 43 000 abonnés, Substack Zeroday Notes 23 000 abonnés.