La détection d'abus LLM en temps réel est, en 2026, la couche comportementale de la défense, celle qui complète les guardrails (per-request) et le rate limiting (quotas) en observant les patterns cumulés sur des fenêtres temporelles plus longues. Elle attrape ce que les autres couches ratent : slow extraction, multi-turn Crescendo jailbreaks, comptes compromis exploitant un session valide, scraping graduel pour model stealing, DoW à bas bruit. Cet article documente les 12 signaux faibles observables (volume, pattern, modèle, temporel, auth, output), l'architecture hybride rules + ML (Isolation Forest, autoencoders, scoring agrégé), la stack streaming (Kafka + Flink/Faust + Redis features), 5 stratégies pour réduire le bruit et éviter de saturer le SOC, et l'intégration SOC enterprise (formats CEF/OCSF, MITRE ATLAS mapping, playbooks SOAR). Cible : SOC analysts montant en compétence sur menaces IA, AI engineers structurant la surveillance runtime, RSSI alignant détection LLM avec dispositif cyber existant.
Pour la couche logs sur laquelle s'appuie la détection : logging et observabilité d'un système LLM en production. Pour la détection prompt injection per-request : détecter une tentative de prompt injection en temps réel.
Pourquoi la détection comportementale en plus des guardrails
Ce que les guardrails ratent
Les guardrails opèrent per-request : "ce prompt est-il une injection ? cette réponse contient-elle PII ?". Ils ne voient pas :
| Cas | Pourquoi guardrails ratent |
|---|---|
| Slow extraction (model stealing) | Chaque query est légitime isolément |
| Crescendo multi-turn | Tours 1-9 sont bénins, le tour 10 est ambigu |
| Compte compromis post-login | L'auth est OK, les requêtes passent les filtres |
| Scraping graduel pour dataset | Volume élevé mais sans pattern d'attaque per-request |
| DoW à bas bruit | Sous le seuil rate limit, mais sustained sur 7j |
| Bot mimicking human | Patterns subtilement répétitifs |
→ Couche comportementale nécessaire en complément.
MITRE ATLAS référentiel
ATLAS (Adversarial Threat Landscape for Artificial-Intelligence Systems), équivalent ATT&CK pour systèmes IA. Tactiques observables uniquement par comportement :
- Reconnaissance (AML.TA0002), discovery du modèle, des limits
- Resource Development (AML.TA0004), extraction pour build clone
- ML Model Access (AML.TA0007), usage abusif d'un compte légitime
- Exfiltration (AML.TA0010), sortie progressive de données via outputs
Toutes nécessitent détection comportementale (pattern sur N requêtes), pas per-request.
Les 12 signaux faibles, inventaire
Volume (signaux 1-2)
Signal 1, Requêtes/h × baseline user
# Baseline calculé sur 7-30 jours par user
baseline_req_per_hour = compute_baseline(user_id, "req_per_hour", days=14)
# Si actuel > 3-10× baseline
current = count_requests_last_hour(user_id)
if current > baseline_req_per_hour * 5:
flag(user_id, "volume_anomaly", severity="medium")Signal 2, Tokens/h × baseline
Pareil mais sur tokens (input + output). Plus précis que requêtes (un user peut faire moins de requêtes mais plus longues).
Pattern (signaux 3-5)
Signal 3, Entropy queries élevée (scraping diversity)
import math
from collections import Counter
def query_entropy(queries: list[str]) -> float:
"""Entropie de Shannon des n-grams des queries."""
ngrams = []
for q in queries:
words = q.split()
ngrams.extend(zip(words, words[1:])) # bigrams
counts = Counter(ngrams)
total = sum(counts.values())
return -sum((c/total) * math.log2(c/total) for c in counts.values())
# User normal : entropy modérée (sujets variés mais répétés)
# Scraper : entropy très élevée (toutes queries différentes)
queries_24h = get_user_queries(user_id, hours=24)
if len(queries_24h) > 50 and query_entropy(queries_24h) > 8.0:
flag(user_id, "high_diversity", severity="medium")Signal 4, Entropy queries faible (brute-force / paramétrique)
# Paramétrique : queries identiques avec petites variations
# Pattern : "What is the password for user_001 ?", "What is the password for user_002 ?"
def detect_parametric(queries: list[str]) -> bool:
if len(queries) < 20:
return False
# Calculer similarity moyenne 2-by-2
similarities = []
for i in range(min(len(queries), 50)):
for j in range(i + 1, min(len(queries), 50)):
similarities.append(jaccard_similarity(queries[i], queries[j]))
avg_sim = sum(similarities) / len(similarities)
return avg_sim > 0.7 # high similarity = parametric patternSignal 5, Length distribution drift
User normal : prompts de longueur variable, distribution stable. User attaquant : soudain tous les prompts sont 30k tokens (context abuse).
import numpy as np
from scipy import stats
def length_drift(user_id: str) -> float:
historical = get_user_prompt_lengths(user_id, days_ago=(7, 30))
recent = get_user_prompt_lengths(user_id, hours=1)
if len(recent) < 5:
return 0
# KS test pour drift
statistic, p_value = stats.ks_2samp(historical, recent)
return statistic if p_value < 0.05 else 0Modèle (signaux 6-7)
Signal 6, Bascule modèle premium
# User avait pattern : 80% gpt-4o-mini, 20% gpt-4o
# Soudain : 100% gpt-4o
def model_drift(user_id: str) -> float:
historical_dist = get_model_distribution(user_id, days_ago=(1, 30))
recent_dist = get_model_distribution(user_id, hours=1)
return jensen_shannon_divergence(historical_dist, recent_dist)Signal 7, max_tokens systématiquement maxé
# Pattern attaquant DoW : toujours max_tokens=4000 pour maximiser cost
def max_tokens_pattern(user_id: str) -> bool:
recent = get_user_requests(user_id, hours=1)
if len(recent) < 10:
return False
maxed = sum(1 for r in recent if r["max_tokens"] >= 3500)
return maxed / len(recent) > 0.9Temporel (signaux 8-9)
Signal 8, Activité 24/7 (bot signal)
def activity_24_7(user_id: str) -> bool:
"""Humain a des heures de creux, bot non."""
activity_by_hour = get_hourly_activity(user_id, days=7)
# Sur 24 buckets * 7 jours = 168 buckets
# Si > 90% buckets ont activité → suspect
active_buckets = sum(1 for v in activity_by_hour.values() if v > 0)
return active_buckets / 168 > 0.9Signal 9, Burst patterns réguliers (cron-like)
def cron_pattern(user_id: str) -> bool:
"""Bot avec interval régulier."""
timestamps = get_request_timestamps(user_id, hours=24)
if len(timestamps) < 50:
return False
intervals = [timestamps[i+1] - timestamps[i] for i in range(len(timestamps)-1)]
std_dev = np.std(intervals)
mean_interval = np.mean(intervals)
# Coefficient de variation faible = très régulier
cv = std_dev / mean_interval if mean_interval > 0 else 0
return cv < 0.1Auth (signaux 10-11)
Signal 10, Nouveau pays/IP/device sans MFA
def auth_anomaly(user_id: str, current_request) -> bool:
profile = get_user_profile(user_id)
is_new = (
current_request.ip_country not in profile.known_countries
or current_request.device_fingerprint not in profile.known_devices
)
return is_new and not current_request.mfa_just_validatedSignal 11, Volume immédiat post-login
def immediate_high_volume(user_id: str, login_ts: int) -> bool:
"""Humain explore d'abord (low volume), bot attaque immédiatement."""
seconds_since_login = time.time() - login_ts
requests_since_login = count_requests_since(user_id, login_ts)
if seconds_since_login < 60 and requests_since_login > 30:
return True
return FalseOutput (signal 12)
Signal 12, Ratio refusals soudain (jailbreak attempts)
def refusal_spike(user_id: str) -> float:
"""Si user reçoit beaucoup de refus = il pousse les limites."""
historical_refusal_rate = get_refusal_rate(user_id, days_ago=(1, 30))
recent_refusal_rate = get_refusal_rate(user_id, hours=1)
if recent_refusal_rate > 0.2 and recent_refusal_rate > historical_refusal_rate * 5:
return recent_refusal_rate
return 0Architecture rules + ML hybride
Couche 1, Rules pour le connu
# rules.py
RULES = [
{
"id": "R001_high_volume",
"description": "Volume requests > 5× baseline",
"check": lambda f: f["req_per_hour"] > f["baseline_req_per_hour"] * 5,
"severity_score": 30,
},
{
"id": "R002_dow_signature",
"description": "DoW signature: max_tokens always maxed + cost spike",
"check": lambda f: f["max_tokens_ratio"] > 0.9 and f["cost_per_hour"] > 50,
"severity_score": 80,
},
{
"id": "R003_extraction",
"description": "Model extraction: high entropy + high volume",
"check": lambda f: f["query_entropy"] > 8.0 and f["req_per_hour"] > 100,
"severity_score": 70,
},
{
"id": "R004_credential_stuffing_followed_burst",
"description": "Login from new country + immediate high volume",
"check": lambda f: f["new_country_login"] and f["volume_post_login_60s"] > 30,
"severity_score": 90,
},
# ...
]
def evaluate_rules(features: dict) -> dict:
triggered = []
total_score = 0
for rule in RULES:
if rule["check"](features):
triggered.append(rule["id"])
total_score += rule["severity_score"]
return {"rules_triggered": triggered, "rules_score": min(100, total_score)}Couche 2, ML anomaly detection
from sklearn.ensemble import IsolationForest
import joblib
# Training (offline, hebdo)
def train_anomaly_detector():
# Features par user, sample 30 derniers jours, exclu users marqués abus
df = build_user_feature_dataset(days=30, exclude_abusive=True)
X = df[FEATURE_COLUMNS] # ~50 features
model = IsolationForest(
contamination=0.01, # 1% attendu anomalies
n_estimators=200,
random_state=42,
)
model.fit(X)
joblib.dump(model, "/models/anomaly_v3.pkl")
# Inference (online)
class AnomalyScorer:
def __init__(self):
self.model = joblib.load("/models/anomaly_v3.pkl")
def score(self, features: dict) -> float:
X = vectorize_features(features)
# Isolation Forest : -1 anomalie, 1 normal
# Convertir en 0-100 score
raw = self.model.decision_function([X])[0]
# raw typiquement entre -0.5 (anomalie forte) et 0.5 (normal)
return max(0, min(100, (0.5 - raw) * 100))Score agrégé et actions graduées
def compute_risk_score(user_id: str) -> dict:
features = compute_features(user_id, windows=["1m", "1h", "24h", "7d"])
rules_eval = evaluate_rules(features)
ml_score = anomaly_scorer.score(features)
# Combinaison pondérée
risk_score = max(rules_eval["rules_score"], ml_score)
return {
"risk_score": risk_score,
"rules_triggered": rules_eval["rules_triggered"],
"ml_score": ml_score,
"features": features,
}
def take_action(user_id: str, risk: dict):
score = risk["risk_score"]
if score < 30:
# Normal, log seulement
pass
elif score < 60:
# Suspicious, log intensif + monitor
increase_logging(user_id)
notify_security_log("low_severity", user_id, risk)
elif score < 80:
# High, MFA challenge
require_mfa_next_request(user_id)
notify_security_log("medium_severity", user_id, risk)
else:
# Critical, block + escalate
temp_block_user(user_id, duration_minutes=30)
create_soc_incident(user_id, risk, severity="HIGH")Architecture streaming temps réel
Stack typique
[Application logs JSON]
│
▼
[Kafka topic: llm-events] ← throughput 10-100k events/sec
│
▼
[Stream processor] ← Flink, Faust, Kafka Streams
│ Calcule features rolling :
│ - req/h, tokens/h, entropy, drift, ...
│ par user, par 1m / 1h / 24h windows
▼
[Redis: user features] ← état temps réel par user_id
│
▼
[Detection engine]
├── Rules engine ← signatures, seuils
├── ML scorer (FastAPI) ← Isolation Forest
└── Risk score agrégé
│
▼
[Action dispatcher]
├── Block / MFA / log ← actions auto
└── Alert SIEM / SOC ← humain
│
▼
[SIEM: Splunk / Sentinel / Elastic] ← corrélation entreprise
Implémentation simplifiée (Python + Faust)
import faust
app = faust.App("llm-anomaly-detector", broker="kafka://kafka:9092")
events_topic = app.topic("llm-events", value_type=LlmEvent)
# State : features par user
class UserFeatures(faust.Record):
user_id: str
req_count_1h: int = 0
req_count_24h: int = 0
tokens_1h: int = 0
cost_1h: float = 0.0
last_models: list = []
last_query_entropy: float = 0.0
last_check_at: float = 0.0
user_features_table = app.Table(
"user_features",
default=lambda: UserFeatures(user_id="", req_count_1h=0),
)
@app.agent(events_topic)
async def process_events(events):
async for event in events:
# Update features rolling
f = user_features_table[event.user_id]
f.req_count_1h += 1
f.tokens_1h += event.tokens_total
f.cost_1h += event.cost_usd
f.last_models.append(event.model)
if len(f.last_models) > 50:
f.last_models = f.last_models[-50:]
# Check toutes les 30s par user
if time.time() - f.last_check_at > 30:
risk = compute_risk_score(event.user_id, f)
if risk["risk_score"] > 60:
await take_action(event.user_id, risk)
f.last_check_at = time.time()
user_features_table[event.user_id] = fRéduire le bruit pour ne pas saturer le SOC
Stratégie 1, Risk scoring agrégé
Jamais alerter sur 1 signal. Combiner 3+ ou ML score > seuil.
Stratégie 2, Tuning baseline par profil
USER_PROFILES = {
"consumer": {"req_per_hour_threshold": 50, "cost_threshold": 1.0},
"power_user": {"req_per_hour_threshold": 200, "cost_threshold": 10.0},
"developer": {"req_per_hour_threshold": 1000, "cost_threshold": 50.0},
"service_account": {"req_per_hour_threshold": 10000, "cost_threshold": 500.0},
}
def get_thresholds(user_id: str) -> dict:
profile = classify_user_profile(user_id)
return USER_PROFILES[profile]Stratégie 3, Suppression alertes en cluster
# Si 100 users déclenchent même rule en 5 min = pattern global
def cluster_alerts(alerts: list) -> list:
clusters = group_by(alerts, key=lambda a: a["rule_id"], time_window=300)
grouped_alerts = []
for rule_id, cluster in clusters.items():
if len(cluster) > 10:
# Agrégé en 1 incident
grouped_alerts.append({
"type": "global_pattern",
"rule_id": rule_id,
"user_count": len(cluster),
"users": [a["user_id"] for a in cluster][:20],
})
else:
grouped_alerts.extend(cluster)
return grouped_alertsStratégie 4, Feedback loop ML
def record_alert_feedback(alert_id: str, verdict: str):
"""SOC marque true_positive / false_positive."""
db.alerts.update(alert_id, verdict=verdict, reviewed_at=now())
def retrain_with_feedback():
"""Hebdo : retrain model avec feedback."""
feedback = db.alerts.find(reviewed=True)
# Adjust threshold or retrain features importance
new_threshold = optimize_threshold(feedback, target_fp_rate=0.1)
update_config("anomaly_threshold", new_threshold)Stratégie 5, Auto-mitigation graduée
Évité 80% des escalades vers SOC humain :
- Score 60-80 → MFA challenge auto (résout 80% des cas légitimes)
- Score 80-95 → temp block + email user + notification SOC asynchrone
- Score > 95 → block immédiat + page SOC on-call
Intégration SOC enterprise
Format alertes standard
OCSF (Open Cybersecurity Schema Framework, alternative moderne CEF) :
{
"metadata": {
"version": "1.0.0",
"product": {"vendor_name": "ZerodayApp", "name": "LLM Anomaly Detector"}
},
"category_uid": 2,
"class_uid": 2007,
"type_uid": 200701,
"severity": "Medium",
"severity_id": 3,
"time": 1746178845000,
"actor": {
"user": {"uid": "user_id_hash_abc123", "type": "User"}
},
"src_endpoint": {
"ip": "203.0.113.42",
"country": "AT"
},
"finding": {
"title": "LLM behavioral anomaly detected",
"uid": "alert_xyz789",
"details": "Risk score 85 - rules: R002_dow_signature, R003_extraction"
},
"raw_data": {...},
"atlas_tactics": ["AML.TA0007", "AML.TA0010"],
"atlas_techniques": ["AML.T0024"]
}MITRE ATLAS mapping
Tagger chaque alerte avec techniques ATLAS observées :
| Pattern | ATLAS |
|---|---|
| Slow extraction | AML.T0024 ML Model Extraction |
| Crescendo jailbreak | AML.T0051 LLM Prompt Injection |
| DoW | AML.T0034 Cost Harvesting |
| Output exfiltration | AML.T0048 LLM Output Manipulation |
| Account takeover | AML.T0012 Valid Accounts |
Permet reporting unifié avec menaces classiques (MITRE ATT&CK) et threat hunting ciblé.
Playbooks SOAR
Pour chaque type alerte LLM, runbook documenté :
# playbook-llm-dow.yml
name: "DoW Suspected"
trigger:
rule_id: R002_dow_signature
severity: HIGH
steps:
- name: "Auto-actions"
actions:
- temp_block_user: 30m
- revoke_active_sessions: true
- notify_user: "Suspicious activity detected on your account"
- name: "Investigation"
automated:
- fetch_last_24h_requests: from=victim_user_id
- fetch_login_history: from=victim_user_id, days=7
- check_password_breach: from=victim_user_email
human_required:
- review_request_patterns
- decide: confirm_compromise / false_positive
- name: "Resolution"
if: confirmed_compromise
actions:
- force_password_reset
- require_mfa_setup
- quarantine_account: 24h
- create_jira_ticket: SEC-CRITICALMétriques de succès du dispositif
KPIs à suivre
| KPI | Cible 2026 |
|---|---|
| MTTD (Mean Time To Detect) abus | < 5 min depuis début comportement |
| MTTR (Mean Time To Respond) | < 15 min pour HIGH, < 1h pour MEDIUM |
| True Positive Rate alertes | > 50% |
| False Positive Rate | < 30% (parmi alertes humaines) |
| Volume alertes/jour | < 20 actionables/SOC analyst |
| Coverage MITRE ATLAS techniques | > 60% des techniques observables |
Reviews
- Hebdo SOC : dashboard true/false positives, ajuster seuils
- Mensuel AI eng + SOC : nouvelles classes d'attaque, retraining ML
- Trimestriel : red team interne pour valider couverture
Ce que ça change pour votre dispositif
Une détection abus LLM mature 2026 :
- 12 signaux faibles observés sur features rolling
- Hybride rules + ML avec score agrégé
- Stack streaming Kafka + processor + Redis features + detection engine
- Actions graduées auto avant escalade humaine
- Intégration SOC native (OCSF, MITRE ATLAS, playbooks SOAR)
- KPIs mesurables (MTTD, MTTR, TPR, coverage)
C'est la couche cybersécurité comportementale qui transforme un dispositif AI security du niveau "guardrails + rate limit" (2024) au niveau "SOC AI-aware avec threat detection mature" (2026 avancé).
Maturité actuelle des entreprises : 60% ont déployé GenAI, < 20% ont SOC AI-aware. Différenciation forte pour qui investit maintenant.
ROI : 1-2 ETP × 2-3 mois pour mise en place initiale, ensuite 0.5 ETP en routine + intégration SOC. Bénéfice : détection précoce d'incidents qui passent les guardrails, conformité EU AI Act / NIST AI RMF, capacité à répondre à audits.
Pour aller plus loin : la couche détection alimente l'incident response IA, playbooks, forensics, communication crise, post-mortem, qui transforme une alerte en résolution mesurée. Sujet du prochain article du cluster.







