Une API LLM en production présente des vulnérabilités spécifiques que les API REST classiques n'ont pas : coût variable par requête (1× à 1000× selon tokens), latence imprévisible (100ms à 30s), facturation directe à l'usage (chaque appel coûte de l'argent réel), amplification interne (recursive tool calling), surface d'extraction (model stealing). Les approches AppSec API classiques (rate limit en req/min, OAuth, OWASP API Top 10) restent nécessaires mais insuffisantes. Cet article documente la stratégie de protection API LLM 2026 : 7 dimensions de rate limiting / quotas à combiner (req/s, tokens/min, coût $/jour, tool calls), 5 mitigations contre le Denial of Wallet (LLM10 OWASP v2), 5 contre le model extraction, 8 contre le credential stuffing spécifique LLM, architecture API Gateway 4 couches (WAF → Gateway → middleware → provider) avec cloud billing en filet ultime. Code Python/Redis/NGINX/Kong concret. Cible : SRE / DevOps / AppSec qui mettent en production une API LLM, AI engineers structurant la couche infra, RSSI fixant les guardrails.
Pour la classe d'attaque DoW : Denial of Wallet (DoW) : épuiser le budget API LLM. Pour la couche LLM elle-même : guardrails LLM efficaces sans dégrader l'UX.
Pourquoi sécuriser une API LLM est différent
Le coût asymétrique des requêtes
Une API REST classique : chaque requête coûte ~la même chose côté infra. GET /users/42 ≈ GET /products/100.
Une API LLM : le coût varie de 1× à 1000× :
| Type requête | Tokens input | Tokens output | Coût (gpt-4o) |
|---|---|---|---|
| "Hello" | 1 | 5 | ~0 € |
| Question simple | 50 | 200 | ~0 € |
| Long context (RAG) | 10000 | 500 | ~0.05 € |
| Max context window | 128000 | 4000 | ~0.65 € |
→ Limiter en req/min seul est insuffisant. Un attaquant qui envoie 30 req/min de 128k tokens chacune coûte 1170 €/heure.
Le coût direct facturé
Chaque appel API LLM est facturé en argent réel (sauf modèle local). Sans rate limit token-aware :
- Compte compromis = budget explosé en heures.
- Bug récursif (agent en boucle) = milliers d'euros / jour.
- Exploitation publique non détectée = budget annuel grillé en jours.
C'est la classe LLM10 Unbounded Consumption (Denial of Wallet, OWASP LLM Top 10 v2 2025).
L'amplification interne (agents)
Un agent IA reçoit 1 requête utilisateur, génère N appels LLM internes (planning, tool selection, ReAct cycles, reflection). Une boucle ReAct mal configurée peut faire 50+ appels pour 1 requête.
→ Le rate limit au niveau API externe n'attrape pas l'amplification interne.
Le model extraction
Un attaquant qui peut interroger librement votre API peut reconstruire un dataset ou cloner le comportement du modèle. Risque IP + concurrent.
Les 7 dimensions de rate limiting / quotas
Vue d'ensemble
| Dimension | Granularité | Cible exemple | Anti |
|---|---|---|---|
| Requêtes/seconde | par user, par IP | 5 req/s | Burst, DDoS |
| Requêtes/minute | par user | 30 req/min | Abuse soutenu |
| Requêtes/jour | par user | 1000 req/jour | Scraping massif |
| Tokens input/min | par user | 50000 t/min | Context window abuse |
| Tokens output/min | par user | 30000 t/min | DoW output |
| Coût $/jour | par user, par org | 9 € user, 900 € org | DoW global |
| Tool calls max | par session | 10 calls | Recursive amplification |
Implémentation Redis sliding window
import time
import redis.asyncio as redis
r = redis.Redis(host="localhost", port=6379)
async def check_rate_limit(
key: str,
max_count: int,
window_seconds: int,
) -> tuple[bool, int]:
"""
Sliding window rate limit avec Redis sorted set.
Retourne (allowed, count_in_window).
"""
now = time.time()
window_start = now - window_seconds
pipe = r.pipeline()
pipe.zremrangebyscore(key, 0, window_start)
pipe.zcard(key)
pipe.zadd(key, {f"{now}:{id(now)}": now})
pipe.expire(key, window_seconds + 1)
_, count, _, _ = await pipe.execute()
if count >= max_count:
return False, count
return True, count + 1
# Application
async def check_user_limits(user_id: str) -> dict:
checks = {
"req_per_second": await check_rate_limit(f"rl:s:{user_id}", 5, 1),
"req_per_minute": await check_rate_limit(f"rl:m:{user_id}", 30, 60),
"req_per_day": await check_rate_limit(f"rl:d:{user_id}", 1000, 86400),
}
blocked = [k for k, (allowed, _) in checks.items() if not allowed]
return {"allowed": len(blocked) == 0, "blocked_dimensions": blocked}Token-aware rate limit
async def check_token_budget(
user_id: str,
estimated_input_tokens: int,
max_output_tokens: int,
) -> bool:
"""Vérifie tokens input/output budget avant appel LLM."""
# Limite par minute
INPUT_TOKENS_PER_MIN = 50000
OUTPUT_TOKENS_PER_MIN = 30000
in_key = f"tokens:in:{user_id}:{int(time.time()) // 60}"
out_key = f"tokens:out:{user_id}:{int(time.time()) // 60}"
current_in = int(await r.get(in_key) or 0)
current_out = int(await r.get(out_key) or 0)
if current_in + estimated_input_tokens > INPUT_TOKENS_PER_MIN:
return False
if current_out + max_output_tokens > OUTPUT_TOKENS_PER_MIN:
return False
# Réserver
pipe = r.pipeline()
pipe.incrby(in_key, estimated_input_tokens)
pipe.incrby(out_key, max_output_tokens)
pipe.expire(in_key, 70)
pipe.expire(out_key, 70)
await pipe.execute()
return True
async def update_actual_token_usage(
user_id: str,
actual_input_tokens: int,
actual_output_tokens: int,
estimated_max_output_tokens: int,
):
"""Réajuste après le vrai usage (refund la différence sur output max)."""
refund = estimated_max_output_tokens - actual_output_tokens
if refund > 0:
out_key = f"tokens:out:{user_id}:{int(time.time()) // 60}"
await r.decrby(out_key, refund)Cost budget $/jour
PRICING = {
"gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
"claude-sonnet-4-6": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
"gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
}
async def check_cost_budget(
user_id: str,
org_id: str,
model: str,
input_tokens: int,
estimated_output_tokens: int,
) -> dict:
cost = (
input_tokens * PRICING[model]["input"]
+ estimated_output_tokens * PRICING[model]["output"]
)
user_key = f"cost:user:{user_id}:{datetime.utcnow().date()}"
org_key = f"cost:org:{org_id}:{datetime.utcnow().date()}"
user_spent = float(await r.get(user_key) or 0)
org_spent = float(await r.get(org_key) or 0)
USER_DAILY_BUDGET = 10.0
ORG_DAILY_BUDGET = 1000.0
if user_spent + cost > USER_DAILY_BUDGET:
return {"allowed": False, "reason": "user daily budget exceeded"}
if org_spent + cost > ORG_DAILY_BUDGET:
return {"allowed": False, "reason": "org daily budget exceeded"}
# Soft warn à 80%
if user_spent + cost > USER_DAILY_BUDGET * 0.8:
await alert_soc(f"User {user_id} at 80% daily budget")
return {"allowed": True, "estimated_cost": cost}Tool call budget per request
class RequestBudget:
def __init__(
self,
max_tool_calls: int = 10,
max_total_tokens: int = 50000,
max_duration_s: int = 60,
):
self.max_tool_calls = max_tool_calls
self.max_total_tokens = max_total_tokens
self.max_duration_s = max_duration_s
self.tool_calls = 0
self.total_tokens = 0
self.start = time.time()
def check(self):
if self.tool_calls >= self.max_tool_calls:
raise BudgetExceeded(f"max {self.max_tool_calls} tool calls")
if self.total_tokens >= self.max_total_tokens:
raise BudgetExceeded(f"max {self.max_total_tokens} tokens")
if time.time() - self.start >= self.max_duration_s:
raise BudgetExceeded(f"max {self.max_duration_s}s duration")
def add_tool_call(self):
self.tool_calls += 1
self.check()
def add_tokens(self, n: int):
self.total_tokens += n
self.check()
# Usage dans agent loop
budget = RequestBudget(max_tool_calls=10)
while not done:
budget.check()
response = await llm_call(...)
budget.add_tokens(response.usage.total_tokens)
if response.tool_calls:
for tool_call in response.tool_calls:
budget.add_tool_call()
result = await execute_tool(tool_call)
messages.append(result)Mitigations Denial of Wallet (DoW)
Couche 1, Hard cap budget user/jour avec circuit breaker
Implémentation déjà vue ci-dessus (check_cost_budget). À 80% : alerte SOC + email user. À 100% : block.
Couche 2, max_tokens output systématique côté serveur
async def chat(req: ChatReq):
# JAMAIS faire confiance à req.max_tokens
SERVER_MAX_TOKENS = 1500
response = await openai_client.chat.completions.create(
model=req.model,
messages=req.messages,
max_tokens=SERVER_MAX_TOKENS, # forçage côté serveur
)Couche 3, Détecter inputs longs
def estimate_input_tokens(messages: list[dict]) -> int:
text = " ".join(m["content"] for m in messages if isinstance(m["content"], str))
return len(text) // 4 # approximation rapide
@app.post("/chat")
async def chat(req: ChatReq):
estimated = estimate_input_tokens(req.messages)
if estimated > 50000:
# Cas exceptionnel, demander review
await log_anomaly("large_input", user_id, estimated)
if estimated > 100000:
raise HTTPException(413, "Input too large")Couche 4, Anti-recursive tool calling
Cf RequestBudget ci-dessus. Limite dure sur nombre de tool calls par requête.
Couche 5, Cloud billing alerts (filet ultime)
# AWS Budgets via CDK
aws budgets create-budget \
--account-id $ACCOUNT_ID \
--budget '{
"BudgetName": "OpenAI-Monthly",
"BudgetLimit": {"Amount": "5000", "Unit": "EUR"},
"TimeUnit": "MONTHLY",
"BudgetType": "COST"
}' \
--notifications-with-subscribers '[
{
"Notification": {
"NotificationType": "ACTUAL",
"ComparisonOperator": "GREATER_THAN",
"Threshold": 80
},
"Subscribers": [{"SubscriptionType": "EMAIL", "Address": "soc@example.com"}]
},
{
"Notification": {
"NotificationType": "FORECASTED",
"ComparisonOperator": "GREATER_THAN",
"Threshold": 100
},
"Subscribers": [{"SubscriptionType": "SNS", "Address": "arn:aws:sns:...:incident-response"}]
}
]'Alerte SNS → Lambda qui peut kill l'app si nécessaire.
Mitigations Model Extraction
Détection
async def detect_extraction_attempt(user_id: str, query: str):
# 1. Diversity / entropy de queries
user_queries_24h = await get_user_queries(user_id, hours=24)
if len(set(user_queries_24h)) / len(user_queries_24h) > 0.95 and len(user_queries_24h) > 100:
await flag_user(user_id, "high_query_diversity")
# 2. Patterns systématiques (variations paramétriques)
if detect_parametric_pattern(user_queries_24h):
await flag_user(user_id, "parametric_scraping")
# 3. Volume disproportionné
user_profile = await get_user_profile(user_id)
if len(user_queries_24h) > user_profile.baseline_volume * 10:
await flag_user(user_id, "volume_anomaly")
# 4. User agent / IP suspects
if user_profile.ip in CLOUD_PROVIDER_IPS:
await flag_user(user_id, "cloud_ip")Watermarking outputs
Implémentations 2024-2026 (paper Kirchenbauer et al.) : pendant la génération, biaiser légèrement la distribution sur une "green list" de tokens. Détectable statistiquement sur outputs longs (> 200 tokens) avec p-value < 0.01.
# Pseudo-code (implémentations open-source disponibles)
from watermark import WatermarkProcessor
processor = WatermarkProcessor(
vocab=tokenizer.vocab,
gamma=0.25, # fraction green list
delta=2.0, # logit boost
)
logits_processor = LogitsProcessorList([processor])
output = model.generate(input_ids, logits_processor=logits_processor)
# Détection a posteriori sur output suspect
detector = WatermarkDetector(vocab, gamma=0.25, delta=2.0)
score = detector.detect(suspicious_output)
if score.z_score > 4:
print("Watermark detected with high confidence")Captcha conditionnel
async def maybe_challenge(user_id: str, request):
flags = await get_user_flags(user_id)
if "high_query_diversity" in flags or "parametric_scraping" in flags:
if not request.cookies.get("captcha_solved_recently"):
return require_captcha()
return NoneMitigations Credential Stuffing
MFA obligatoire + HaveIBeenPwned
import httpx
import hashlib
async def check_password_pwned(password: str) -> bool:
sha1 = hashlib.sha1(password.encode()).hexdigest().upper()
prefix, suffix = sha1[:5], sha1[5:]
async with httpx.AsyncClient() as client:
r = await client.get(f"https://api.pwnedpasswords.com/range/{prefix}")
return suffix in r.text
@app.post("/auth/login")
async def login(req: LoginReq):
# 1. Rate limit login
rl = await check_rate_limit(f"login:{req.email}", 10, 3600)
if not rl[0]:
raise HTTPException(429, "Too many attempts")
# 2. Check pwned
if await check_password_pwned(req.password):
await alert_user_password_compromised(req.email)
raise HTTPException(401, "Password found in breach. Reset required.")
# 3. Verify password
user = await verify_credentials(req.email, req.password)
if not user:
await record_failed_attempt(req.email, request.client.host)
raise HTTPException(401)
# 4. Anomaly detection
if await is_new_device_or_location(user, request):
await send_mfa_challenge(user)
return {"mfa_required": True}
# 5. MFA
if user.mfa_enabled:
await send_mfa_challenge(user)
return {"mfa_required": True}
return await issue_token(user)Bot detection sur login
Sur volume anormal d'erreurs auth (> 100/min global), challenger via Captcha.
Architecture API Gateway 4 couches
Vue d'ensemble
[Internet]
│
▼
┌─────────────────────────────────────┐
│ Couche 1, WAF / Edge │
│ CloudFlare, AWS WAF, Akamai │
│ • DDoS protection │
│ • Geo-blocking │
│ • OWASP Web rules │
│ • Bot mitigation │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Couche 2, API Gateway │
│ Kong, AWS API GW, Apigee, Tyk │
│ • Auth (OAuth, JWT, API keys) │
│ • Rate limit niveau 1 (req/s, /min) │
│ • Routing │
│ • Observability │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Couche 3, Application middleware │
│ FastAPI / Express │
│ • Rate limit niveau 2 (token-aware) │
│ • RequestBudget per request │
│ • Cost budget │
│ • LLM Guardrails (cf article 1) │
│ • Audit log │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ Couche 4, LLM Provider │
│ OpenAI / Anthropic / Azure / local │
│ • Provider rate limits (tier) │
│ • Provider quotas │
└─────────────────────────────────────┘
Filet ultime :
┌─────────────────────────────────────┐
│ Cloud Billing Alerts │
│ AWS Budgets, GCP Billing │
│ • Notification 80% / 100% │
│ • Auto kill switch │
└─────────────────────────────────────┘
Configuration Kong (exemple)
services:
- name: llm-api
url: http://app-internal:8000
routes:
- name: chat
paths: ["/chat"]
plugins:
- name: rate-limiting
config:
minute: 30
hour: 200
day: 1000
policy: redis
redis_host: redis.internal
- name: jwt
config:
secret_is_base64: false
claims_to_verify: ["exp"]
- name: prometheus
config:
per_consumer: true
- name: request-size-limiting
config:
allowed_payload_size: 256 # KB maxConfiguration NGINX (alternative)
http {
limit_req_zone $http_x_user_id zone=user_per_min:10m rate=30r/m;
limit_req_zone $binary_remote_addr zone=ip_per_sec:10m rate=10r/s;
server {
listen 443 ssl;
server_name api.example.com;
location /chat {
limit_req zone=user_per_min burst=10 nodelay;
limit_req zone=ip_per_sec burst=5;
client_max_body_size 256K;
client_body_timeout 30s;
proxy_pass http://app-internal:8000;
proxy_set_header X-Request-Id $request_id;
proxy_read_timeout 60s;
}
}
}Monitoring et alerting
Logs structurés obligatoires
import structlog
logger = structlog.get_logger()
@app.post("/chat")
async def chat(req: ChatReq, request: Request):
request_id = str(uuid.uuid4())
# ... process ...
logger.info(
"llm_request_completed",
request_id=request_id,
user_id=user_id,
org_id=org_id,
model=req.model,
tokens_in=actual_tokens_in,
tokens_out=actual_tokens_out,
cost_usd=actual_cost,
duration_ms=duration_ms,
guardrail_blocks=guardrail_blocks,
tool_calls=tool_calls,
)Métriques Prometheus
from prometheus_client import Counter, Histogram
llm_requests = Counter("llm_requests_total", "LLM requests", ["model", "status"])
llm_tokens = Counter("llm_tokens_total", "Tokens consumed", ["model", "direction"])
llm_cost = Counter("llm_cost_usd_total", "EUR cost", ["model", "user", "org"])
llm_duration = Histogram("llm_duration_seconds", "Request duration", ["model"])Dashboards Grafana
Panels essentiels :
- Coût $/min en temps réel par org
- p95 latence par modèle
- Taux blocages guardrails
- Top users par coût (24h, 7j, 30j)
- Anomalies (coût > 3× baseline)
Alertes critiques
# alertmanager rules
groups:
- name: llm_cost
rules:
- alert: LLMOrgCostHigh
expr: increase(llm_cost_usd_total[1h]) > 100
for: 5m
annotations:
summary: "Org {{ $labels.org }} spent > 90 € in 1h"
- alert: LLMUserCostHigh
expr: increase(llm_cost_usd_total{user!=""}[1h]) > 20
for: 5m
annotations:
summary: "User {{ $labels.user }} spent > 18 € in 1h"
- alert: LLMRecursiveLoop
expr: rate(llm_tool_calls_total[1m]) > 100
annotations:
summary: "Possible recursive tool calling on {{ $labels.user }}"Erreurs récurrentes en production
Erreur 1, Rate limit en req/min seulement
Pas suffisant. Combiner avec tokens/min et coût $/jour.
Erreur 2, max_tokens contrôlé client
Faire confiance à req.max_tokens envoyé par client = DoW garanti. Forcer max_tokens côté serveur.
Erreur 3, Pas de budget per request
Agent en boucle qui fait 1000 appels = $$$. RequestBudget obligatoire sur tout agent IA.
Erreur 4, Pas de cloud billing alerts
Confiance unique aux app-level limits. Si bug ou bypass → budget grillé. Filet ultime cloud alert non négociable.
Erreur 5, Logs sans cost / tokens
Logger juste 200 OK ne permet pas l'investigation post-incident. Tokens + cost dans tous les logs.
Erreur 6, Pas de MFA sur comptes API
Compte compromis = budget vidé. MFA obligatoire pour tout compte avec accès API LLM.
Erreur 7, API key partagée entre users
Si une key fuit, impossible d'identifier le coupable. Une key par user OU JWT signé pour traçabilité.
Ce que ça change pour votre dispositif
Une API LLM sécurisée 2026 :
- 7 dimensions de rate limit / quotas combinées
- 5 couches anti-DoW (cap user, max_tokens server, anti-input-long, RequestBudget, cloud billing)
- Architecture 4 couches (WAF → Gateway → app → provider) + cloud billing en filet
- MFA + HaveIBeenPwned + bot detection sur login
- Watermarking + Captcha conditionnel anti-extraction
- Logs + métriques + dashboards + alertes sur coût, latence, anomalies
ROI : protection contre incidents documentés 2024-2026 où des apps ont brûlé 9k €-100k+ en heures suite à compromis ou bug. Coût implémentation : 1-2 ETP × 2-3 mois pour un setup robuste, ensuite maintenance < 0.1 ETP en routine.
C'est l'hygiène de base d'une API LLM enterprise, sans laquelle tout le reste (guardrails, audit, red team) reste fragile.
Pour aller plus loin : la suite naturelle est de durcir le modèle de menace lui-même via threat modeling adapté LLM (STRIDE-AI, MAESTRO), avant de coder les défenses, définir formellement ce contre quoi on défend. À découvrir dans le prochain article du cluster.







