Preventing data leaks through prompts in a corporate LLM application (your customer-support chatbot, internal assistant, or custom Copilot) calls for an inline DLP architecture, distinct from the endpoint DLP that guards against external shadow tools. This article covers: the five-layer architecture (pre-prompt classification, conditional pre-prompt redaction, RAG security layer, output scanning, egress prevention); integration with Microsoft Purview, AWS Macie, Google Cloud DLP, Presidio, Lakera Guard, or an in-house classifier; propagation of sensitivity labels from source documents through to the response; automatic classification with three stacked methods (static rules, enterprise DLP, ML classifier); an output-scanning pipeline with four mandatory sub-layers (PII detection, internal markers, cross-tenant leaks, egress markers); and a five-dimension audit of the whole setup (coverage, accuracy, latency, compliance, adversarial tests). Audience: data architects and AI engineers structuring a corporate LLM app, CISOs auditing AI DLP, and teams integrating Microsoft Purview or AWS Macie into an LLM pipeline.
For the endpoint layer (employees and shadow tools), see: preventing an employee from pasting source code into ChatGPT. For secure RAG specifically, see: preventing sensitive-data exfiltration through a RAG chatbot.
Endpoint DLP vs architectural DLP
Different use cases
[Endpoint DLP / browser extension]
Protects: the end user
Target: shadow AI (plain chat.openai.com, personal claude.ai)
Vector: copy-paste, manual file upload
Layer: browser / device OS
Solutions: Netskope, Zscaler, Purview Endpoint, custom browser extension
[Architectural DLP, this article]
Protects: the deployed corporate LLM application
Target: your support chatbot, internal agent, custom Copilot
Vector: legitimate prompts through the app, RAG retrieval, response output
Layer: inline application pipeline
Solutions: Presidio + Purview/Macie/GCP DLP + in-house classifier + Lakera Guard
Why you need both
Endpoint without architectural: your corporate app can leak through its output with no control in place. Cross-tenant RAG retrieval, system prompt extraction, and generated answers that surface real data are all outside the reach of a browser-extension DLP.
Architectural without endpoint: employees bypass the controls by using plain chat.openai.com instead of your corporate app.
→ A complete strategy stacks endpoint and architectural DLP.
Five-layer inline DLP architecture
Overview
[User input]
│
▼
[Layer 1, Pre-prompt classification]
│ Detected sensitivity: Public / Internal / Confidential / Secret
│ LLM routing decision
▼
[Layer 2, Conditional pre-prompt redaction]
│ Presidio pseudonymization when sensitive data is present
│ Local mapping for reverse substitution
▼
[Layer 3, RAG security layer]
│ Sensitivity labels from Microsoft Purview or equivalent
│ Filter on user clearance
│ Cross-classification audit
▼
[LLM call]
│ Routed to: OpenAI standard | Azure OpenAI EU | Mistral EU | on-prem
│ depending on classification
▼
[Layer 4, Output scanning]
│ PII detection
│ Internal markers
│ Cross-tenant leak
▼
[Layer 5, Egress prevention]
│ External URLs, markdown images, encoded data
▼
[Response to user]
Added latency: ~150-400 ms at p95
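To make the flow above concrete, here is a minimal sketch of how the five layers could be chained as plain callables that share one state object and stop as soon as a layer blocks. The names `PipelineState`, `run_layers`, `toy_classifier`, and `toy_egress` are illustrative assumptions, not part of any library, and the two toy layers only stand in for the real ones described later.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class PipelineState:
    """State passed from layer to layer (hypothetical structure)."""
    text: str
    level: str = "public"
    alerts: list[str] = field(default_factory=list)
    blocked: bool = False


Layer = Callable[[PipelineState], PipelineState]


def run_layers(state: PipelineState, layers: list[Layer]) -> PipelineState:
    """Apply each layer in order; stop at the first layer that blocks."""
    for layer in layers:
        state = layer(state)
        if state.blocked:
            break
    return state


def toy_classifier(state: PipelineState) -> PipelineState:
    # Stand-in for layer 1: only recognizes an explicit header marker.
    if "[sensitivity: secret]" in state.text.lower():
        state.level = "secret"
    return state


def toy_egress(state: PipelineState) -> PipelineState:
    # Stand-in for layer 5: blocks any markdown image syntax.
    if "![" in state.text:
        state.blocked = True
        state.alerts.append("markdown image blocked")
    return state
```

Keeping every layer behind the same signature makes it straightforward to time each layer separately and to check that none is missing on a given endpoint.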
Layer 1, Pre-prompt classification
Three stacked methods
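The stacking rule is "keep the strictest verdict": if any one method flags the content, the higher level wins. A minimal standalone sketch of that rule follows. Treating unknown labels and an empty verdict list as "secret" is a fail-closed assumption made here for illustration, not something the classifier in this section does.

```python
LEVEL_PRIORITY = {"public": 0, "internal": 1, "confidential": 2, "secret": 3}


def strictest_level(levels: list[str]) -> str:
    """Return the most restrictive level among the given verdicts.

    Assumption: unknown labels and an empty list fail closed to "secret".
    """
    normalized = [lvl if lvl in LEVEL_PRIORITY else "secret" for lvl in levels]
    return max(normalized, key=LEVEL_PRIORITY.__getitem__, default="secret")
```

For example, `strictest_level(["public", "confidential", "internal"])` yields `"confidential"`: a single method raising the level is enough.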
# classifier.py
import re
from typing import Literal

from presidio_analyzer import AnalyzerEngine

ClassificationLevel = Literal["public", "internal", "confidential", "secret"]
LEVEL_PRIORITY = {"public": 0, "internal": 1, "confidential": 2, "secret": 3}

# Internal project names (customize for your organization)
INTERNAL_PROJECTS = ["projet_alpha", "ma_2026", "internal_codename_x"]


class DataClassifier:
    def __init__(self):
        self.analyzer = AnalyzerEngine()
        # PurviewClient and load_finetuned_classifier are custom wrappers to implement
        self.purview_client = PurviewClient()
        self.ml_classifier = load_finetuned_classifier()

    def classify(self, content: str) -> dict:
        results = {
            "rules": self._classify_by_rules(content),
            "purview": self._classify_by_purview(content),
            "ml": self._classify_by_ml(content),
        }
        # Final decision: the strictest of the three verdicts
        levels = [r["level"] for r in results.values()]
        final_level = max(levels, key=lambda lvl: LEVEL_PRIORITY[lvl])
        return {
            "level": final_level,
            "details": results,
            # _compute_confidence is not shown in this article
            "confidence": self._compute_confidence(results),
        }

    def _classify_by_rules(self, content: str) -> dict:
        # Explicit header patterns
        if re.search(r"\[Sensitivity:\s*(Highly\s*Confidential|Secret)\]", content, re.I):
            return {"level": "secret", "matched": "header_secret"}
        if re.search(r"\[Sensitivity:\s*Confidential\]", content, re.I):
            return {"level": "confidential", "matched": "header_confidential"}
        if re.search(r"\[Sensitivity:\s*Internal\]|Internal\s+Use\s+Only", content, re.I):
            return {"level": "internal", "matched": "header_internal"}
        # Code and secret patterns
        if re.search(r"(?i)(api[_-]?key|password|secret)\s*[=:]\s*['\"][\w\-]{20,}", content):
            return {"level": "secret", "matched": "secret_pattern"}
        # Financial patterns (amounts of six digits or more)
        if re.search(r"€\s*\d{6,}|\$\s*\d{6,}", content):
            return {"level": "confidential", "matched": "high_amount"}
        # Internal project names
        lowered = content.lower()
        for proj in INTERNAL_PROJECTS:
            if proj.lower() in lowered:
                return {"level": "confidential", "matched": f"internal_project_{proj}"}
        return {"level": "public", "matched": None}

    def _classify_by_purview(self, content: str) -> dict:
        # Microsoft Purview Sensitive Information Types, through the custom wrapper
        results = self.purview_client.scan(content, sit_categories=[
            "credit_card", "passport_us", "fr_nir", "ssn",
            "iban", "medical_record", "drivers_license",
        ])
        if any(r["confidence"] > 0.85 for r in results):
            return {"level": "confidential", "purview_findings": results}
        if any(r["confidence"] > 0.5 for r in results):
            return {"level": "internal", "purview_findings": results}
        return {"level": "public", "purview_findings": []}

    def _classify_by_ml(self, content: str) -> dict:
        # DistilBERT classifier fine-tuned on an annotated corpus
        prediction = self.ml_classifier.predict(content)
        # prediction = {"public": 0.7, "internal": 0.2, "confidential": 0.08, "secret": 0.02}
        max_label = max(prediction, key=prediction.get)
        confidence = prediction[max_label]
        return {
            "level": max_label,
            "confidence": confidence,
            "raw_scores": prediction,
        }
LLM routing by classification
ROUTING_BY_CLASSIFICATION = {
    "public": ["openai_api", "azure_openai_eu", "mistral_eu", "onprem"],
    "internal": ["azure_openai_eu", "mistral_eu", "onprem"],
    "confidential": ["azure_openai_eu", "mistral_eu", "onprem"],
    "secret": ["onprem"],  # own infrastructure only
}


async def route_to_llm(prompt: str, classification: str):
    allowed_providers = ROUTING_BY_CLASSIFICATION[classification]
    # Choose among the allowed providers by availability, cost, and quality.
    # select_optimal_provider is a helper to implement; assumed to return a provider name.
    provider_name = select_optimal_provider(allowed_providers, prompt_complexity=...)
    # Defense in depth: re-check the selection against the allowlist.
    # SecurityError is an application-defined exception.
    if provider_name not in allowed_providers:
        raise SecurityError(f"Cannot route {classification} data to {provider_name}")
    # PROVIDER_CLIENTS is an assumed registry mapping provider names to client objects.
    return await PROVIDER_CLIENTS[provider_name].complete(prompt)
Layer 2, Conditional pre-prompt redaction
Pseudonymization with a reversible mapper
# pii_mapper.py
import hmac
import hashlib
import os

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

# Not used in the code below
PSEUDO_KEY = os.environ["PSEUDO_HMAC_KEY"].encode()

# Assumes the engine is configured with a French NLP model and a custom FR_NIR
# recognizer; the default AnalyzerEngine setup is English-only.
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()


class PIIMapperReversible:
    """Pseudonymizes PII with a local mapping for reverse substitution."""

    def __init__(self):
        self._token_to_value = {}
        self._value_to_token = {}
        self._counter = {}

    def get_token(self, entity_type: str, value: str) -> str:
        # Stable: the same value always maps to the same token
        existing = self._value_to_token.get(value)
        if existing:
            return existing
        self._counter[entity_type] = self._counter.get(entity_type, 0) + 1
        token = f"[{entity_type}_{self._counter[entity_type]:03d}]"
        self._token_to_value[token] = value
        self._value_to_token[value] = token
        return token

    def reverse(self, text: str) -> str:
        for token, value in self._token_to_value.items():
            text = text.replace(token, value)
        return text


def pseudonymize_for_external_llm(prompt: str, classification: str) -> tuple[str, PIIMapperReversible]:
    """Pseudonymizes PII when routing to an external LLM (non-EU or non-corporate)."""
    # No pseudonymization when the prompt stays on internal infrastructure
    if classification == "secret":
        return prompt, PIIMapperReversible()  # never leaves the perimeter anyway
    mapper = PIIMapperReversible()
    pii_types = ["EMAIL_ADDRESS", "PHONE_NUMBER", "PERSON", "FR_NIR", "IBAN_CODE", "CREDIT_CARD"]
    results = analyzer.analyze(text=prompt, language="fr", entities=pii_types)
    pseudonymized = prompt
    # Replace from the end so earlier offsets stay valid
    for r in sorted(results, key=lambda x: x.start, reverse=True):
        original = prompt[r.start:r.end]
        token = mapper.get_token(r.entity_type, original)
        pseudonymized = pseudonymized[:r.start] + token + pseudonymized[r.end:]
    return pseudonymized, mapper


# Usage in the pipeline
async def safe_llm_call(prompt: str):
    # Layer 1: classification
    classification = classifier.classify(prompt)["level"]
    # Layer 2: pseudonymize only when the provider is external
    # select_provider is a helper to implement; assumed to return a provider name
    provider_name = select_provider(classification)
    if provider_name == "openai_api":
        # External, non-EU: pseudonymize
        prompt_safe, mapper = pseudonymize_for_external_llm(prompt, classification)
    else:
        # EU or internal: no pseudonymization needed
        prompt_safe = prompt
        mapper = None
    # LLM call (PROVIDER_CLIENTS: assumed registry of provider name -> client object)
    response = await PROVIDER_CLIENTS[provider_name].complete(prompt_safe)
    # Reverse substitution if the prompt was pseudonymized
    if mapper:
        response = mapper.reverse(response)
    return response
Layer 3, RAG security layer
Label-aware indexing
# rag_indexing.py
from datetime import datetime, timezone

from msgraph_core import GraphClient
from chromadb import PersistentClient

graph_client = GraphClient(...)
chroma_client = PersistentClient(path="./chroma_db")
collection = chroma_client.get_or_create_collection("zerodaysupport_rag")


async def index_sharepoint_docs():
    """Index SharePoint documents with their sensitivity label propagated."""
    # site_id is assumed to be defined elsewhere in your configuration
    docs = await graph_client.get(f"/sites/{site_id}/drive/root/children")
    for doc in docs.value:
        # Read the sensitivity label through the Graph API
        item = await graph_client.get(
            f"/sites/{site_id}/drive/items/{doc.id}",
            params={"$expand": "extensions"},
        )
        # extract_sensitivity_label / extract_tenant_id are helpers to implement
        sensitivity_label = extract_sensitivity_label(item)  # e.g. "Confidential"
        tenant_id = extract_tenant_id(item)
        # Load the content
        content = await graph_client.get(f"/sites/{site_id}/drive/items/{doc.id}/content")
        text = extract_text(content)
        # Chunk
        chunks = split_into_chunks(text)
        # Index with full metadata
        for i, chunk in enumerate(chunks):
            collection.upsert(
                ids=[f"{doc.id}_chunk_{i}"],
                documents=[chunk],
                metadatas=[{
                    "doc_id": doc.id,
                    "doc_name": doc.name,
                    "sensitivity_label": sensitivity_label,
                    "tenant_id": tenant_id,
                    "source": "sharepoint",
                    "indexed_at": datetime.now(timezone.utc).isoformat(),
                }],
            )
Retrieval with clearance filter
SENSITIVITY_HIERARCHY = {
    "Public": 0,
    "Internal": 1,
    "Confidential": 2,
    "Highly Confidential": 3,
    "Secret": 4,
}


async def get_user_clearance(user_id: str) -> int:
    """Fetch the user's clearance level from Entra ID groups."""
    user = await graph_client.get(f"/users/{user_id}/memberOf")
    groups = [g.displayName for g in user.value]
    if "Secret_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Secret"]
    if "Highly_Confidential_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Highly Confidential"]
    if "Confidential_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Confidential"]
    if "Internal_Cleared" in groups:
        return SENSITIVITY_HIERARCHY["Internal"]
    return SENSITIVITY_HIERARCHY["Public"]


async def query_rag_label_aware(
    query: str,
    user_id: str,
    tenant_id: str,
    k: int = 5,
) -> dict:
    """Retrieval with tenant and clearance filtering."""
    user_clearance = await get_user_clearance(user_id)
    user_clearance_label = next(
        label for label, level in SENSITIVITY_HIERARCHY.items()
        if level == user_clearance
    )
    # Fetch the relevant chunks
    raw_results = collection.query(
        query_texts=[query],
        n_results=k * 2,  # over-fetch to leave room for filtering
        where={"tenant_id": tenant_id},  # enforced server-side, never taken from the prompt
    )
    # Filter by clearance
    filtered = []
    blocked_count = 0
    for chunk_doc, metadata in zip(raw_results["documents"][0], raw_results["metadatas"][0]):
        chunk_label = metadata.get("sensitivity_label", "Public")
        chunk_level = SENSITIVITY_HIERARCHY.get(chunk_label, 0)
        if chunk_level <= user_clearance:
            filtered.append({"document": chunk_doc, "metadata": metadata})
        else:
            blocked_count += 1
    # Truncate to the top k
    filtered = filtered[:k]
    # Audit
    if blocked_count > 0:
        await log_clearance_filter({
            "user_id": user_id,
            "user_clearance": user_clearance_label,
            "query": query,
            "blocked_chunks": blocked_count,
            "delivered_chunks": len(filtered),
        })
    # Highest label present in the context (drives LLM routing)
    max_label_in_context = max(
        (SENSITIVITY_HIERARCHY.get(c["metadata"]["sensitivity_label"], 0) for c in filtered),
        default=0,
    )
    return {
        "chunks": filtered,
        "max_label_level": max_label_in_context,
        "blocked_count": blocked_count,
    }
LLM routing by max label
async def chat_with_rag(query: str, user_id: str, tenant_id: str):
    # 1. Label-aware retrieval
    rag_result = await query_rag_label_aware(query, user_id, tenant_id)
    # 2. Route on the highest label in the context, consistent with
    #    ROUTING_BY_CLASSIFICATION: Secret stays on-prem, only Public may go to openai_api
    max_level = rag_result["max_label_level"]
    if max_level >= SENSITIVITY_HIERARCHY["Secret"]:
        provider = "onprem"
    elif max_level >= SENSITIVITY_HIERARCHY["Internal"]:
        provider = "azure_openai_eu"  # or on-prem
    else:
        provider = "openai_api"  # or Azure standard
    # 3. Build the context and call the model
    context_text = "\n\n".join(c["document"] for c in rag_result["chunks"])
    response = await call_llm(
        provider=provider,
        system="You are an assistant. Answer using only the sources provided.",
        user=f"Sources:\n{context_text}\n\nQuestion: {query}",
    )
    # 4. The output label is the highest label among the sources
    response_metadata = {
        "max_source_label": max_level,
        "sources_used": [c["metadata"]["doc_id"] for c in rag_result["chunks"]],
    }
    return {
        "answer": response,
        "metadata": response_metadata,
    }
Layer 4, Output scanning
Four-sub-layer pipeline
# output_scanner.py
import re

from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine

analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()


class OutputScanner:
    def __init__(self):
        self.internal_patterns = self._load_internal_patterns()
        self.allowed_domains = {"zerodaysupport.com", "support.zerodaysupport.com"}

    def scan_and_filter(
        self,
        response: str,
        user_role: str,
        user_request: str,
        tenant_id: str,
    ) -> dict:
        result = {
            "original": response,
            "filtered": response,
            "alerts": [],
            "blocked": False,
        }
        # Sub-layer 1: PII detection
        result = self._scan_pii(result, user_role, user_request)
        # Sub-layer 2: internal markers
        result = self._scan_internal_markers(result, user_role)
        # Sub-layer 3: cross-tenant leak
        result = self._scan_cross_tenant(result, tenant_id)
        # Sub-layer 4: egress markers
        result = self._scan_egress(result)
        return result

    def _scan_pii(self, result: dict, user_role: str, user_request: str) -> dict:
        """Redact PII that the user did not supply, depending on role."""
        text = result["filtered"]
        # PII supplied by the user in the prompt is preserved in the response
        user_pii_results = analyzer.analyze(text=user_request, language="fr",
                                            entities=PII_ENTITIES)
        user_pii_values = {user_request[r.start:r.end] for r in user_pii_results}
        # Detect PII in the response
        response_pii = analyzer.analyze(text=text, language="fr", entities=PII_ENTITIES)
        # Replace from the end so earlier offsets stay valid
        for r in sorted(response_pii, key=lambda x: x.start, reverse=True):
            value = text[r.start:r.end]
            # Skip values the user provided
            if value in user_pii_values:
                continue
            # Skip when the user's role is cleared for PII
            if user_role in ["admin", "data_officer"]:
                continue
            # Redact
            text = text[:r.start] + f"[{r.entity_type}_REDACTED]" + text[r.end:]
            result["alerts"].append(f"PII redacted: {r.entity_type}")
        result["filtered"] = text
        return result

    def _scan_internal_markers(self, result: dict, user_role: str) -> dict:
        """Detect internal mentions (promo codes, confidential projects, admin URLs)."""
        text = result["filtered"]
        for pattern_name, pattern in self.internal_patterns.items():
            if re.search(pattern, text):
                if user_role not in ["employee", "admin"]:
                    # External users must not see these patterns
                    text = re.sub(pattern, "[INTERNAL_REDACTED]", text)
                    result["alerts"].append(f"Internal marker redacted: {pattern_name}")
        result["filtered"] = text
        return result

    def _scan_cross_tenant(self, result: dict, tenant_id: str) -> dict:
        """Detect mentions of other tenants in the response."""
        text = result["filtered"]
        # Explicit tenant_id patterns
        tenant_patterns = re.findall(r"tenant[_-]?id\s*[=:]\s*['\"]?([a-zA-Z0-9_-]+)", text)
        for found_tenant in tenant_patterns:
            if found_tenant != tenant_id:
                result["blocked"] = True
                result["alerts"].append(f"CRITICAL: cross-tenant leak detected ({found_tenant})")
                # Block the whole response
                result["filtered"] = "Sorry, I cannot answer this request. (incident logged)"
                return result
        return result

    def _scan_egress(self, result: dict) -> dict:
        """Block external URLs, markdown images, and encoded data."""
        from urllib.parse import urlsplit  # local import keeps this method self-contained

        text = result["filtered"]
        # Markdown images first (exfiltration vector), while the syntax is still intact
        if re.search(r"!\[.*?\]\(.*?\)", text):
            text = re.sub(r"!\[.*?\]\(.*?\)", "[IMAGE_BLOCKED]", text)
            result["alerts"].append("Markdown image blocked")

        # External URLs
        def url_replacer(match):
            url = match.group(0)
            try:
                host = (urlsplit(url).hostname or "").lower()
            except ValueError:
                host = ""
            # Exact host match: a substring test would also accept
            # hosts such as zerodaysupport.com.evil.example
            if host in self.allowed_domains:
                return url
            result["alerts"].append(f"External URL blocked: {host or 'unparseable'}")
            return "[EXTERNAL_URL_BLOCKED]"

        text = re.sub(r"https?://\S+", url_replacer, text)
        # Long base64 runs (potential exfiltration)
        long_b64 = re.findall(r"[A-Za-z0-9+/]{100,}={0,2}", text)
        if long_b64:
            result["alerts"].append(f"Long base64 detected ({len(long_b64)} occurrences)")
            # Optional: block when the pattern looks suspicious
        result["filtered"] = text
        return result

    def _load_internal_patterns(self):
        return {
            "internal_promo_code": r"\bEMP\d{4}-[A-Z]+\b",
            "admin_url": r"\bhttps?://internal\.[a-z]+\.[a-z]{2,3}/admin",
            "internal_project_alpha": r"\bprojet[_-]?alpha\b",
            "ma_2026": r"\bMA[_-]?2026\b",
            # Customize for your organization
        }


PII_ENTITIES = ["EMAIL_ADDRESS", "PHONE_NUMBER", "PERSON", "FR_NIR", "IBAN_CODE", "CREDIT_CARD"]

# Usage
scanner = OutputScanner()


@app.post("/chat")
async def chat(req: ChatRequest, current_user):
    # Full pipeline
    classified = classifier.classify(req.message)
    rag_result = await query_rag_label_aware(req.message, current_user.id, current_user.tenant_id)
    response_raw = await call_llm(...)
    # Output scanning
    scan_result = scanner.scan_and_filter(
        response=response_raw,
        user_role=current_user.role,
        user_request=req.message,
        tenant_id=current_user.tenant_id,
    )
    if scan_result["blocked"]:
        # Log a critical incident
        await log_critical_incident({
            "request_id": req.id,
            "user_id": current_user.id,
            "alerts": scan_result["alerts"],
            "raw_response": response_raw[:500],
        })
        return {"answer": scan_result["filtered"]}
    # Log non-blocking alerts
    if scan_result["alerts"]:
        await log_alerts(scan_result["alerts"])
    return {"answer": scan_result["filtered"]}
Stack by corporate environment
Microsoft 365 / Azure
[AI DLP pipeline, Microsoft stack]
Layer 1 (classification): Microsoft Purview SITs + sensitivity labels
Layer 2 (redaction): Presidio + Purview label propagation
Layer 3 (RAG): Azure AI Search with security filters + Graph API labels
Layer 4 (output): Presidio + Purview SIT scan
Layer 5 (egress): Microsoft Defender for Cloud Apps
Advantages: native integration, automatic label propagation
AWS
[AI DLP pipeline, AWS stack]
Layer 1: Amazon Macie for sensitive-data scanning (Macie scans data at rest in S3; inline prompt text needs a text-level detector such as Amazon Comprehend PII detection or Presidio)
Layer 2: Presidio + KMS-encrypted secrets store
Layer 3: OpenSearch / Pinecone with metadata filters
Layer 4: Macie + custom regex
Layer 5: AWS Network Firewall + custom egress middleware
Advantages: AWS ecosystem, IAM integration
Google Cloud
[AI DLP pipeline, GCP stack]
Layer 1: Google Cloud DLP API (infoTypes)
Layer 2: Presidio + Cloud DLP transformations
Layer 3: Vertex AI Search with security filters
Layer 4: Cloud DLP scan of the response
Layer 5: VPC egress firewall rules / VPC Service Controls + custom middleware (Cloud Armor, named in the original stack, filters inbound traffic)
Advantages: GCP ecosystem, native Vertex AI integration
Multi-cloud / agnostic
[AI DLP pipeline, open-source stack]
Layer 1: Presidio + fine-tuned in-house classifier
Layer 2: Presidio anonymizer + reversible mapper
Layer 3: ChromaDB / Qdrant / Weaviate + metadata filters
Layer 4: Presidio + custom regex + Lakera Guard (optional)
Layer 5: Custom egress middleware
Advantages: portable, no vendor lock-in, no license cost
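Whichever stack you pick, the pipeline code stays simpler if every detector sits behind the same small interface, so that a Purview, Cloud DLP, or Presidio backend can be swapped without touching the layers. The sketch below illustrates that adapter idea. `Finding`, `SensitiveDataDetector`, `RegexDetector`, and `scan_all` are hypothetical names, and the regex detector is only a local stand-in for a real vendor-backed adapter.

```python
import re
from dataclasses import dataclass
from typing import Protocol


@dataclass(frozen=True)
class Finding:
    entity_type: str
    start: int
    end: int
    confidence: float


class SensitiveDataDetector(Protocol):
    """Common interface each vendor adapter would implement (assumption)."""

    def scan(self, text: str) -> list[Finding]: ...


class RegexDetector:
    """Local fallback detector built from named regular expressions."""

    def __init__(self, patterns: dict[str, str]):
        self._patterns = {name: re.compile(p) for name, p in patterns.items()}

    def scan(self, text: str) -> list[Finding]:
        return [
            Finding(name, m.start(), m.end(), 1.0)
            for name, rx in self._patterns.items()
            for m in rx.finditer(text)
        ]


def scan_all(text: str, detectors: list[SensitiveDataDetector]) -> list[Finding]:
    """Run every detector and return the findings ordered by position."""
    findings: list[Finding] = []
    for detector in detectors:
        findings.extend(detector.scan(text))
    return sorted(findings, key=lambda f: f.start)
```

A vendor adapter would wrap the vendor SDK call inside `scan` and translate its response into `Finding` objects; the rest of the pipeline only ever sees `scan_all`.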
DLP audit, five dimensions
Dimension 1, Coverage
# audit_coverage.py
async def audit_dlp_coverage():
    """Check that every LLM flow goes through the DLP layers."""
    findings = []
    # Inventory of LLM endpoints
    endpoints = await discover_llm_endpoints()
    for ep in endpoints:
        # Check that all five layers are active
        coverage = {
            "pre_prompt_classification": check_layer_active(ep, "classifier"),
            "pre_prompt_redaction": check_layer_active(ep, "redaction"),
            "rag_security": check_layer_active(ep, "rag_filter"),
            "output_scanning": check_layer_active(ep, "output_scanner"),
            "egress_prevention": check_layer_active(ep, "egress"),
        }
        missing = [layer for layer, active in coverage.items() if not active]
        if missing:
            findings.append({
                "endpoint": ep["url"],
                "missing_layers": missing,
                "severity": "high" if "output_scanning" in missing else "medium",
            })
    return findings
Dimension 2, Accuracy (TPR / FPR)
async def measure_dlp_precision():
    """TPR vs FPR on a representative sample."""
    # Manually annotated test corpus:
    # 500 public prompts + 500 confidential prompts
    test_corpus = load_annotated_test_corpus()
    results = {"tp": 0, "fp": 0, "tn": 0, "fn": 0}
    for sample in test_corpus:
        classification = classifier.classify(sample["text"])["level"]
        actual = sample["label"]
        is_sensitive_predicted = classification != "public"
        is_sensitive_actual = actual != "public"
        if is_sensitive_predicted and is_sensitive_actual:
            results["tp"] += 1
        elif is_sensitive_predicted and not is_sensitive_actual:
            results["fp"] += 1
        elif not is_sensitive_predicted and not is_sensitive_actual:
            results["tn"] += 1
        else:
            results["fn"] += 1
    # Guard against an empty class in the corpus (avoids division by zero)
    positives = results["tp"] + results["fn"]
    negatives = results["fp"] + results["tn"]
    tpr = results["tp"] / positives if positives else 0.0
    fpr = results["fp"] / negatives if negatives else 0.0
    return {
        "tpr": tpr,
        "fpr": fpr,
        "details": results,
        "verdict": "PASS" if tpr > 0.9 and fpr < 0.05 else "FAIL",
    }
Dimension 3, Latency
import time

import numpy as np


async def benchmark_dlp_latency(n: int = 1000):
    """Measure the latency added by each layer."""
    samples = load_test_samples(n)
    timings = {layer: [] for layer in ["classification", "redaction", "rag", "output", "egress"]}
    for sample in samples:
        for layer in timings:
            start = time.perf_counter()
            await execute_layer(layer, sample)
            timings[layer].append((time.perf_counter() - start) * 1000)  # milliseconds
    return {
        layer: {
            "p50": np.percentile(times, 50),
            "p95": np.percentile(times, 95),
            "p99": np.percentile(times, 99),
        }
        for layer, times in timings.items()
    }


# Targets (milliseconds)
LATENCY_TARGETS = {
    "classification": {"p95": 100},
    "redaction": {"p95": 200},
    "rag": {"p95": 200},
    "output": {"p95": 100},
    "egress": {"p95": 50},
    "total": {"p95": 400},
}
Dimension 4, Compliance
Checklist mapped to each regulation:
## GDPR coverage
- [x] PII detection (Presidio)
- [x] Upstream pseudonymization
- [x] Pseudonymized audit logs
- [x] Right-to-erasure procedure
## EU AI Act coverage (high-risk)
- [x] Logging (Art. 12)
- [x] Technical documentation (Art. 11)
- [x] Cybersecurity (Art. 15)
- [ ] Conformity assessment (in progress)
## HDS (French health-data hosting)
- [x] HDS-certified hosting, Azure EU region
- [x] Medical data routed to HDS-certified providers only
## PCI DSS (payment cards)
- [x] credit_card pattern detection
- [x] Block transmission to non-PCI third parties
Dimension 5, Adversarial tests
# adversarial_tests.py
import base64

ADVERSARIAL_PAYLOADS = [
    # Obfuscated PII
    "Contact me at j ohn @ ex ample . com",
    "Téléphone : 06 1 2 3 4 5 6 7 8",
    # Encoding tricks
    "My SSN is " + base64.b64encode(b"123-45-6789").decode(),
    "API key: " + " ".join("sk-abc123def456ghi789jkl012"),  # spaced out
    # Paraphrase
    "Mon employeur a été ZerodaySupport, j'y travaille depuis 5 ans, mon manager est Jean Dupont",
    # Low-resource languages
    "মেরা ইমেইল হল jane.smith@example.com",  # Bengali
    # Visual obfuscation
    "API key: skABCD" + "_REDACTED_HEXSTRING_TOKEN" + "123",
]


async def test_adversarial_detection():
    results = []
    for payload in ADVERSARIAL_PAYLOADS:
        classification = classifier.classify(payload)["level"]
        results.append({
            "payload": payload[:50] + "...",
            "classified_as": classification,
            "should_be_sensitive": True,
            "passed": classification != "public",
        })
    pass_rate = sum(r["passed"] for r in results) / len(results)
    return {"results": results, "pass_rate": pass_rate}
Full audit
| Dimension | Target | Measurement |
|---|---|---|
| Coverage | 100% of endpoints | Automated audit script, monthly |
| Accuracy, TPR | > 90% | Annotated test corpus, quarterly |
| Accuracy, FPR | < 5% | Same |
| Total p95 latency | < 400 ms | Continuous benchmark |
| Compliance | Checklist per regulation | Audit every six months |
| Adversarial tests | > 70% catch rate | Quarterly |
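The numeric rows of this table can be turned into an automated gate. Here is a minimal sketch, assuming the measured values arrive as a flat dictionary; the metric names and `evaluate_audit` are illustrative, and the compliance row is left out because it is a checklist rather than a number.

```python
import operator

# Thresholds taken from the table above: (comparison, target)
AUDIT_TARGETS = {
    "coverage": (operator.ge, 1.0),                 # 100% of endpoints
    "tpr": (operator.gt, 0.90),
    "fpr": (operator.lt, 0.05),
    "latency_p95_ms": (operator.lt, 400),
    "adversarial_catch_rate": (operator.gt, 0.70),
}


def evaluate_audit(measured: dict[str, float]) -> dict[str, bool]:
    """Return pass/fail per metric; a missing metric counts as a failure."""
    return {
        name: name in measured and compare(measured[name], target)
        for name, (compare, target) in AUDIT_TARGETS.items()
    }
```

Counting a missing metric as a failure is a deliberate choice in this sketch: a dimension that was not measured should not pass silently.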
Recurring mistakes
Mistake 1, No classification means blind routing
Every prompt goes to standard OpenAI regardless of its content. Classification plus routing is mandatory.
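One way to make the routing rule hard to bypass is to fail closed: when none of the providers allowed for a level is available, refuse the call instead of falling back to a disallowed provider. A minimal sketch under that assumption follows. `pick_provider` and `NoAllowedProvider` are hypothetical names, and treating each list as a preference order is an assumption made for this example.

```python
ROUTING_BY_CLASSIFICATION = {
    "public": ["openai_api", "azure_openai_eu", "mistral_eu", "onprem"],
    "internal": ["azure_openai_eu", "mistral_eu", "onprem"],
    "confidential": ["azure_openai_eu", "mistral_eu", "onprem"],
    "secret": ["onprem"],
}


class NoAllowedProvider(RuntimeError):
    """Raised when no provider allowed for the classification is healthy."""


def pick_provider(classification: str, healthy: set[str]) -> str:
    """Return the first allowed provider that is healthy, or refuse.

    Assumption: an unknown classification is treated like "secret".
    """
    allowed = ROUTING_BY_CLASSIFICATION.get(classification, ["onprem"])
    for name in allowed:
        if name in healthy:
            return name
    raise NoAllowedProvider(f"no allowed provider available for {classification!r}")
```

With this guard, an outage of the on-prem model makes "secret" prompts fail rather than leak to an external API.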
Mistake 2, Output scanning forgotten
The focus is on input only, yet a leak through the output is more serious because the user sees the result directly. Output scanning is just as critical.
Mistake 3, Sensitivity labels not propagated
Microsoft Purview labels exist in SharePoint, but the LLM pipeline ignores them. Propagate them end to end.
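End-to-end propagation boils down to one rule: the answer inherits the highest label among the chunks that fed it. The sketch below shows that rule in isolation. `response_label` is a hypothetical helper, and treating unlabeled or unknown chunks as "Confidential" is a stricter default chosen here for illustration; adjust it to your own policy.

```python
SENSITIVITY_HIERARCHY = {
    "Public": 0,
    "Internal": 1,
    "Confidential": 2,
    "Highly Confidential": 3,
    "Secret": 4,
}


def response_label(chunk_metadatas: list[dict], default_label: str = "Confidential") -> str:
    """Label for the answer: the highest label among its source chunks.

    Assumption: chunks with a missing or unknown label take default_label.
    An answer built from no chunks at all is labeled "Public".
    """
    labels = []
    for metadata in chunk_metadatas:
        label = metadata.get("sensitivity_label")
        labels.append(label if label in SENSITIVITY_HIERARCHY else default_label)
    return max(labels, key=SENSITIVITY_HIERARCHY.__getitem__, default="Public")
```

The returned label can then drive both the provider choice and the metadata attached to the response.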
Mistake 4, Tenant filter only in the prompt
Asking the LLM to filter by tenant through the system prompt is breakable. Enforce an immutable server-side filter (row-level security or database metadata).
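In practice, "server-side" means the tenant value comes from the authenticated session and overrides anything the client sends. A minimal sketch of that idea follows, with `build_retrieval_filter` as a hypothetical helper. The returned dictionary is a generic metadata filter; translating it into a specific vector store's filter syntax is left to the caller.

```python
from typing import Optional


def build_retrieval_filter(session_tenant_id: str, client_filter: Optional[dict] = None) -> dict:
    """Merge optional client filters, always forcing the tenant from the session."""
    if not session_tenant_id:
        raise PermissionError("no tenant bound to the session")
    # Drop any tenant_id supplied by the client, then impose the trusted one.
    safe = {k: v for k, v in (client_filter or {}).items() if k != "tenant_id"}
    safe["tenant_id"] = session_tenant_id
    return safe
```

Because the model never sees or controls this filter, a prompt injection cannot widen the retrieval scope to another tenant.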
Mistake 5, Systematic pseudonymization
Pseudonymizing even for an EU-hosted LLM when it is not needed loses information for no benefit. Make it conditional on classification and provider.
Mistake 6, No regular audit
Deployed and never re-evaluated, so expected and actual behavior drift apart. Audit quarterly and track KPI trends.
Mistake 7, No adversarial tests
Only simple patterns get tested, while real attackers use obfuscation and encoding. Run adversarial tests quarterly.
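A common mitigation for spacing and encoding tricks is to run the detectors on several "views" of the same text: the original, a whitespace-free version, and any base64 runs that decode to printable text. The sketch below builds those views. `candidate_views` and the 12-character minimum are illustrative assumptions, and this is a pre-processing aid, not a complete defense.

```python
import base64
import binascii
import re

# Base64-looking runs; the 12-character minimum is an arbitrary choice.
B64_TOKEN = re.compile(r"[A-Za-z0-9+/]{12,}={0,2}")


def candidate_views(text: str) -> list[str]:
    """Return the original text plus de-obfuscated views for the detectors."""
    views = [text]
    # View 2: all whitespace removed, to rejoin spaced-out emails or keys.
    squeezed = re.sub(r"\s+", "", text)
    if squeezed != text:
        views.append(squeezed)
    # View 3+: base64 runs that decode to printable UTF-8 text.
    for token in B64_TOKEN.findall(text):
        try:
            decoded = base64.b64decode(token, validate=True).decode("utf-8")
        except (binascii.Error, UnicodeDecodeError):
            continue
        if decoded.isprintable():
            views.append(decoded)
    return views
```

A prompt would then be treated as sensitive when any view triggers a detector. The whitespace-free view loses word boundaries, so it is useful for deciding whether something sensitive is present, not for locating the exact span.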
Mistake 8, No integration with the existing corporate DLP
This reinvents the wheel when Microsoft Purview or AWS Macie is already in place. Integrate with it rather than building a parallel system.
Key takeaways
- Architectural DLP ≠ endpoint DLP: both are needed and they complement each other
- Five inline layers: classification → redaction → RAG security → output scan → egress
- Classification with three stacked methods: static rules + enterprise DLP + ML classifier
- LLM routing by classification: public → OpenAI, internal → Azure EU, secret → on-prem
- Sensitivity labels propagated from source documents through to the response
- Output scanning in four sub-layers: PII + internal markers + cross-tenant + egress
- Stack per ecosystem: Microsoft (Purview), AWS (Macie), GCP (Cloud DLP), open source (Presidio + in-house)
- Audit on five dimensions: coverage + accuracy + latency + compliance + adversarial
- Latency target: < 400 ms p95 added in total
- Accuracy target: TPR > 90%, FPR < 5%
A mature architectural AI DLP in 2026 requires methodical rigor and integration with the corporate ecosystem. Without it, your LLM app is an exfiltration channel waiting to happen. With it, you have the application-level safety net that complements guardrails, observability, and zero trust.
Further reading. For secure RAG specifically: preventing sensitive-data exfiltration through a RAG chatbot. For shadow AI and the endpoint layer: preventing an employee from pasting source code into ChatGPT.







