Carbon-LLMGuides · Databricks
Retour à l'accueil

Databricks + carbon-llm — pull les audit logs Mosaic AI Gateway

Tracker l'empreinte carbone de chaque LLM call routé via Databricks Mosaic AI Gateway en lisant la table système system.ai.usage_log. Job Databricks horaire qui forward les events vers /api/v1/track. Multi-workspace, multi-catalog Unity.

Pourquoi cette approche pull-based pour Databricks ?

Databricks Mosaic AI Gateway (anciennement AI Gateway, GA depuis 2024) intercepte tous les LLM calls qui passent par les Serving Endpoints — qu'ils ciblent un foundation model API (OpenAI, Anthropic, Cohere via Marketplace) ou un modèle servi en interne (Llama, Mistral fine-tunés). Chaque appel est loggé dans la vue Unity Catalog `system.ai.usage_log`.

Avantage de l'approche pull (vs instrumenter chaque notebook) : aucun changement de code dans les notebooks utilisateurs, le tracking est transparent. Vos data scientists continuent d'utiliser `mlflow.deployments.get_deploy_client()` ou OpenAI SDK pointé sur l'endpoint Databricks — carbon-llm collecte derrière sans qu'ils s'en rendent compte.

Inconvénient : latence de tracking +1 heure (la table système est mise à jour avec un délai). Acceptable pour un reporting CSRD mensuel, pas pour des alertes temps réel. Si vous voulez du temps réel, instrumentez côté code (cf. /guides/langchain-llm-carbon-tracking).

Activer AI Gateway usage tracking

Prérequis : workspace Databricks avec Unity Catalog activé + permissions Account Admin pour activer la table système.

Console Databricks — Account Admin

# 1. Account Console → Previews → "system.ai schema" : Enable
# 2. Workspace → Catalog → system → ai → usage_log : Verify Read access

# 3. Pour chaque Serving Endpoint qui sert des LLM :
#    Serving → <endpoint_name> → Edit → AI Gateway tab :
#    [x] Enable usage tracking
#    [x] Enable payload logging (optionnel — utile pour debug, pas requis pour carbon)

# 4. Vérification SQL :
SELECT * FROM system.ai.usage_log
WHERE request_time > current_timestamp() - INTERVAL 1 HOUR
LIMIT 10;
Stocker la clé carbon-llm en secret Databricks

Pas de clé en clair dans les notebooks. Utilisez le secret store Databricks (Unity Catalog ou legacy KV scopes).

CLI databricks-cli

# Créer un scope si pas déjà fait
databricks secrets create-scope carbon-llm

# Stocker la clé (lecture interactive — pas dans l'historique shell)
databricks secrets put-secret carbon-llm carbon-llm-api-key
# (paste live_xxx en stdin, Ctrl+D)

# Optionnel — base URL si vous routez par un proxy
databricks secrets put-secret carbon-llm carbon-llm-base-url
# (paste https://carbon-llm.com)

# Vérifier
databricks secrets list-secrets carbon-llm
Notebook Python qui pull et forward

Le coeur du système. Lit la dernière heure de `system.ai.usage_log`, dérive un event_id idempotent, POST chaque row vers carbon-llm.

Notebook : carbon_llm_sync_hourly.py

# Databricks notebook source
# COMMAND ----------

import requests
import json
from datetime import datetime, timedelta, timezone

# COMMAND ----------

API_KEY = dbutils.secrets.get(scope="carbon-llm", key="carbon-llm-api-key")
BASE_URL = "https://carbon-llm.com"

# Workspace + catalog → tenant_id naturel pour le multi-tenant
WORKSPACE_ID = spark.conf.get("spark.databricks.workspaceUrl", "unknown")

# COMMAND ----------

# Lire la dernière heure depuis system.ai.usage_log
# Le filtre is null sur error garantit qu'on ne compte que les calls réussis
df = spark.sql("""
    SELECT
        request_time,
        endpoint_name,
        served_entity_name AS model,
        request_token_count AS prompt_tokens,
        response_token_count AS completion_tokens,
        request_id,
        identity_metadata.run_as AS run_as_user,
        request_metadata
    FROM system.ai.usage_log
    WHERE request_time > current_timestamp() - INTERVAL 1 HOUR + INTERVAL 5 MINUTES
      AND request_time <= current_timestamp() - INTERVAL 5 MINUTES
      AND served_entity_name IS NOT NULL
      AND request_token_count IS NOT NULL
      AND error_code IS NULL
""")

print(f"Rows à forwarder : {df.count()}")

# COMMAND ----------

def forward_event(row):
    """POST vers /api/v1/track avec event_id idempotent dérivé de request_id."""
    payload = {
        "event_id": f"databricks:{WORKSPACE_ID}:{row.request_id}",
        "model": row.model,
        "prompt_tokens": int(row.prompt_tokens or 0),
        "completion_tokens": int(row.completion_tokens or 0),
        "tenant_id": f"databricks:{row.endpoint_name}",
        "ingestion_source": "databricks_ai_gateway",
        "client_instance_id": row.run_as_user or "unknown",
    }

    try:
        r = requests.post(
            f"{BASE_URL}/api/v1/track",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
            },
            json=payload,
            timeout=10.0,
        )
        # 202 Accepted ou 409 Conflict (replay event_id) sont OK
        return r.status_code in (200, 202, 409)
    except Exception as e:
        print(f"forward_event failed: {e}")
        return False

# Foreach séquentiel — pour les volumes Databricks (1k-100k events/h),
# pas besoin de paralléliser. Le bottleneck est carbon-llm (rate-limit Pro
# = 600 req/min) pas Databricks.
results = [forward_event(row) for row in df.collect()]
print(f"Successfully forwarded: {sum(results)} / {len(results)}")
Planifier en Job horaire

Le notebook ci-dessus tourne en quelques secondes pour 10k events. Schedule horaire : Workflows → Create Job → notebook task → cron `0 * * * * ?` → use existing all-purpose cluster (suffit, pas besoin de jobs cluster dédié).

Notification email sur failure recommandée — un échec silencieux ferait perdre 1 heure de tracking.

Job JSON spec (workflows.create)

{
  "name": "carbon-llm-sync-hourly",
  "tasks": [{
    "task_key": "sync_to_carbon_llm",
    "notebook_task": {
      "notebook_path": "/Users/data-eng/carbon_llm_sync_hourly",
      "source": "WORKSPACE"
    },
    "existing_cluster_id": "0123-456789-abcdef"
  }],
  "schedule": {
    "quartz_cron_expression": "0 0 * * * ?",
    "timezone_id": "Europe/Paris",
    "pause_status": "UNPAUSED"
  },
  "email_notifications": {
    "on_failure": ["data-eng@company.com"]
  },
  "max_concurrent_runs": 1
}
Multi-workspace + multi-catalog Unity

Si vous avez plusieurs workspaces Databricks (DEV / PROD, ou par BU), répliquez le notebook dans chaque + un tenant_id distinct par workspace dans le payload (`f"databricks:{WORKSPACE_ID}:{endpoint_name}"`). carbon-llm filtrera par workspace dans le dashboard.

Pour le multi-catalog Unity (ex: catalog_finance, catalog_marketing), ajoutez le catalog dans le tenant_id : la table `system.ai.usage_log` n'a pas la colonne catalog directement mais `endpoint_name` reflète souvent l'organisation (`finance_summarizer_endpoint`).

Erreurs courantes

1. **system.ai schema not enabled** — la vue n'est pas activée par défaut. Account admin doit le faire dans Account Console > Previews. Sans ça, le SELECT échoue.

2. **Délai de remplissage** — la table système est mise à jour toutes les ~10 min, parfois plus. Le filtre `INTERVAL 1 HOUR + INTERVAL 5 MINUTES` (ne lit pas la dernière 5 min) évite de manquer des events.

3. **Modèles externes (Anthropic via Marketplace) sans tokens** — certains foundation models routés via Marketplace ne peuplent pas request_token_count. Vérifiez avec une query SELECT et complétez via le connecteur Pro Anthropic Admin si nécessaire.

4. **rate_limit_exceeded carbon-llm** — Pro = 600 req/min. Si vous avez des bursts > 10/sec, le notebook ralentira. Solution : ajouter un sleep entre les rows OU passer en mode batch (groupBy heure → 1 event aggregé par modèle/heure/tenant).

Prêt à tracer Databricks AI Gateway ?

Gratuit pendant l'early access. Toutes les features Pro débloquées (export ESRS E1-6, bundle audit signé) — sans CB, sans engagement.