Databricks + carbon-llm — pull les audit logs Mosaic AI Gateway
Tracker l'empreinte carbone de chaque LLM call routé via Databricks Mosaic AI Gateway en lisant la table système system.ai.usage_log. Job Databricks horaire qui forward les events vers /api/v1/track. Multi-workspace, multi-catalog Unity.
Databricks Mosaic AI Gateway (anciennement AI Gateway, GA depuis 2024) intercepte tous les LLM calls qui passent par les Serving Endpoints — qu'ils ciblent un foundation model API (OpenAI, Anthropic, Cohere via Marketplace) ou un modèle servi en interne (Llama, Mistral fine-tunés). Chaque appel est loggé dans la vue Unity Catalog `system.ai.usage_log`.
Avantage de l'approche pull (vs instrumenter chaque notebook) : aucun changement de code dans les notebooks utilisateurs, le tracking est transparent. Vos data scientists continuent d'utiliser `mlflow.deployments.get_deploy_client()` ou OpenAI SDK pointé sur l'endpoint Databricks — carbon-llm collecte derrière sans qu'ils s'en rendent compte.
Inconvénient : latence de tracking +1 heure (la table système est mise à jour avec un délai). Acceptable pour un reporting CSRD mensuel, pas pour des alertes temps réel. Si vous voulez du temps réel, instrumentez côté code (cf. /guides/langchain-llm-carbon-tracking).
Prérequis : workspace Databricks avec Unity Catalog activé + permissions Account Admin pour activer la table système.
Console Databricks — Account Admin
# 1. Account Console → Previews → "system.ai schema" : Enable
# 2. Workspace → Catalog → system → ai → usage_log : Verify Read access
# 3. Pour chaque Serving Endpoint qui sert des LLM :
# Serving → <endpoint_name> → Edit → AI Gateway tab :
# [x] Enable usage tracking
# [x] Enable payload logging (optionnel — utile pour debug, pas requis pour carbon)
# 4. Vérification SQL :
SELECT * FROM system.ai.usage_log
WHERE request_time > current_timestamp() - INTERVAL 1 HOUR
LIMIT 10;Pas de clé en clair dans les notebooks. Utilisez le secret store Databricks (Unity Catalog ou legacy KV scopes).
CLI databricks-cli
# Créer un scope si pas déjà fait
databricks secrets create-scope carbon-llm
# Stocker la clé (lecture interactive — pas dans l'historique shell)
databricks secrets put-secret carbon-llm carbon-llm-api-key
# (paste live_xxx en stdin, Ctrl+D)
# Optionnel — base URL si vous routez par un proxy
databricks secrets put-secret carbon-llm carbon-llm-base-url
# (paste https://carbon-llm.com)
# Vérifier
databricks secrets list-secrets carbon-llmLe coeur du système. Lit la dernière heure de `system.ai.usage_log`, dérive un event_id idempotent, POST chaque row vers carbon-llm.
Notebook : carbon_llm_sync_hourly.py
# Databricks notebook source
# COMMAND ----------
import requests
import json
from datetime import datetime, timedelta, timezone
# COMMAND ----------
API_KEY = dbutils.secrets.get(scope="carbon-llm", key="carbon-llm-api-key")
BASE_URL = "https://carbon-llm.com"
# Workspace + catalog → tenant_id naturel pour le multi-tenant
WORKSPACE_ID = spark.conf.get("spark.databricks.workspaceUrl", "unknown")
# COMMAND ----------
# Lire la dernière heure depuis system.ai.usage_log
# Le filtre is null sur error garantit qu'on ne compte que les calls réussis
df = spark.sql("""
SELECT
request_time,
endpoint_name,
served_entity_name AS model,
request_token_count AS prompt_tokens,
response_token_count AS completion_tokens,
request_id,
identity_metadata.run_as AS run_as_user,
request_metadata
FROM system.ai.usage_log
WHERE request_time > current_timestamp() - INTERVAL 1 HOUR + INTERVAL 5 MINUTES
AND request_time <= current_timestamp() - INTERVAL 5 MINUTES
AND served_entity_name IS NOT NULL
AND request_token_count IS NOT NULL
AND error_code IS NULL
""")
print(f"Rows à forwarder : {df.count()}")
# COMMAND ----------
def forward_event(row):
"""POST vers /api/v1/track avec event_id idempotent dérivé de request_id."""
payload = {
"event_id": f"databricks:{WORKSPACE_ID}:{row.request_id}",
"model": row.model,
"prompt_tokens": int(row.prompt_tokens or 0),
"completion_tokens": int(row.completion_tokens or 0),
"tenant_id": f"databricks:{row.endpoint_name}",
"ingestion_source": "databricks_ai_gateway",
"client_instance_id": row.run_as_user or "unknown",
}
try:
r = requests.post(
f"{BASE_URL}/api/v1/track",
headers={
"Authorization": f"Bearer {API_KEY}",
"Content-Type": "application/json",
},
json=payload,
timeout=10.0,
)
# 202 Accepted ou 409 Conflict (replay event_id) sont OK
return r.status_code in (200, 202, 409)
except Exception as e:
print(f"forward_event failed: {e}")
return False
# Foreach séquentiel — pour les volumes Databricks (1k-100k events/h),
# pas besoin de paralléliser. Le bottleneck est carbon-llm (rate-limit Pro
# = 600 req/min) pas Databricks.
results = [forward_event(row) for row in df.collect()]
print(f"Successfully forwarded: {sum(results)} / {len(results)}")Le notebook ci-dessus tourne en quelques secondes pour 10k events. Schedule horaire : Workflows → Create Job → notebook task → cron `0 * * * * ?` → use existing all-purpose cluster (suffit, pas besoin de jobs cluster dédié).
Notification email sur failure recommandée — un échec silencieux ferait perdre 1 heure de tracking.
Job JSON spec (workflows.create)
{
"name": "carbon-llm-sync-hourly",
"tasks": [{
"task_key": "sync_to_carbon_llm",
"notebook_task": {
"notebook_path": "/Users/data-eng/carbon_llm_sync_hourly",
"source": "WORKSPACE"
},
"existing_cluster_id": "0123-456789-abcdef"
}],
"schedule": {
"quartz_cron_expression": "0 0 * * * ?",
"timezone_id": "Europe/Paris",
"pause_status": "UNPAUSED"
},
"email_notifications": {
"on_failure": ["data-eng@company.com"]
},
"max_concurrent_runs": 1
}Si vous avez plusieurs workspaces Databricks (DEV / PROD, ou par BU), répliquez le notebook dans chaque + un tenant_id distinct par workspace dans le payload (`f"databricks:{WORKSPACE_ID}:{endpoint_name}"`). carbon-llm filtrera par workspace dans le dashboard.
Pour le multi-catalog Unity (ex: catalog_finance, catalog_marketing), ajoutez le catalog dans le tenant_id : la table `system.ai.usage_log` n'a pas la colonne catalog directement mais `endpoint_name` reflète souvent l'organisation (`finance_summarizer_endpoint`).
1. **system.ai schema not enabled** — la vue n'est pas activée par défaut. Account admin doit le faire dans Account Console > Previews. Sans ça, le SELECT échoue.
2. **Délai de remplissage** — la table système est mise à jour toutes les ~10 min, parfois plus. Le filtre `INTERVAL 1 HOUR + INTERVAL 5 MINUTES` (ne lit pas la dernière 5 min) évite de manquer des events.
3. **Modèles externes (Anthropic via Marketplace) sans tokens** — certains foundation models routés via Marketplace ne peuplent pas request_token_count. Vérifiez avec une query SELECT et complétez via le connecteur Pro Anthropic Admin si nécessaire.
4. **rate_limit_exceeded carbon-llm** — Pro = 600 req/min. Si vous avez des bursts > 10/sec, le notebook ralentira. Solution : ajouter un sleep entre les rows OU passer en mode batch (groupBy heure → 1 event aggregé par modèle/heure/tenant).
Prêt à tracer Databricks AI Gateway ?
Gratuit pendant l'early access. Toutes les features Pro débloquées (export ESRS E1-6, bundle audit signé) — sans CB, sans engagement.