Antonnia Docs
ConversationsGuides

Exportação de dados (ETL)

Como extrair sessões e mensagens para seu data warehouse ou BI

Os endpoints /data/ foram projetados para exportação em massa de conversas. Use-os para alimentar seu data warehouse, dashboard de BI ou qualquer pipeline de dados.

Estratégia de sync incremental

A abordagem mais eficiente é o sync incremental: buscar apenas os registros criados ou atualizados desde a última extração.

Passo a passo

  1. Na primeira execução, extraia tudo (sem filtros de data)
  2. Salve o timestamp da execução
  3. Nas execuções seguintes, use updated_after com o timestamp salvo
  4. Use sort_by=updated_at para garantir ordenação consistente
import httpx
from datetime import datetime, timezone

BASE_URL = "https://services.antonnia.com/conversations/v2/api/v1"
HEADERS = {"Authorization": "Bearer sk_live_YOUR_TOKEN"}


def sync_sessions(last_sync: str | None = None) -> list[dict]:
    """Fetch all sessions created/updated since last sync."""
    sessions = []
    after_id = None

    while True:
        params: dict = {"limit": 3000, "sort_by": "updated_at"}
        if last_sync:
            params["updated_after"] = last_sync
        if after_id:
            params["after_id"] = after_id

        response = httpx.get(
            f"{BASE_URL}/data/sessions", headers=HEADERS, params=params
        )
        response.raise_for_status()
        page = response.json()

        if not page:
            break

        sessions.extend(page)
        after_id = page[-1]["id"]

    return sessions


def sync_messages(last_sync: str | None = None) -> list[dict]:
    """Fetch all messages created/updated since last sync."""
    messages = []
    after_id = None

    while True:
        params: dict = {"limit": 3000, "sort_by": "updated_at"}
        if last_sync:
            params["updated_after"] = last_sync
        if after_id:
            params["after_id"] = after_id

        response = httpx.get(
            f"{BASE_URL}/data/messages", headers=HEADERS, params=params
        )
        response.raise_for_status()
        page = response.json()

        if not page:
            break

        messages.extend(page)
        after_id = page[-1]["id"]

    return messages


# Usage
last_sync = None  # First run: fetch everything
# last_sync = "2025-01-14T00:00:00Z"  # Subsequent runs

sync_start = datetime.now(timezone.utc).isoformat()
sessions = sync_sessions(last_sync)
messages = sync_messages(last_sync)

# Save sync_start for next run
print(f"Synced {len(sessions)} sessions, {len(messages)} messages")

Use updated_after (não created_after) para sync incremental. Isso garante que sessões e mensagens que foram atualizadas (ex: status mudou, mensagem entregue) também sejam capturadas.

Busca em lote por IDs

Se você precisa buscar sessões específicas (ex: a partir de eventos de webhook), use o endpoint batch:

def get_sessions_by_ids(session_ids: list[str]) -> list[dict]:
    """Fetch specific sessions by ID."""
    response = httpx.get(
        f"{BASE_URL}/data/sessions/batch",
        headers=HEADERS,
        params=[("session_ids", sid) for sid in session_ids],
    )
    response.raise_for_status()
    return response.json()


# Example: hydrate sessions from webhook events
session_ids = ["sess_abc123", "sess_def456", "sess_ghi789"]
sessions = get_sessions_by_ids(session_ids)

Filtragem por período

Para relatórios ou extrações pontuais, use os filtros de data:

# Sessions created in January 2025
curl -X GET "https://services.antonnia.com/conversations/v2/api/v1/data/sessions?\
created_after=2025-01-01T00:00:00Z&\
created_before=2025-02-01T00:00:00Z&\
limit=3000" \
  -H "Authorization: Bearer sk_live_YOUR_TOKEN"

Filtros disponíveis:

  • created_after / created_before — filtrar por data de criação
  • updated_after / updated_before — filtrar por data de atualização
  • Combine filtros de criação e atualização na mesma requisição

Boas práticas

PráticaMotivo
Use limit=3000 para extração em massaReduz o número de requisições
Use sort_by=updated_at para sync incrementalCaptura registros atualizados, não só novos
Salve o timestamp antes de iniciar o syncEvita perder registros criados durante a extração
Pagine até receber array vazioNão confie no count — use o cursor
Use o endpoint batch para IDs conhecidosMais eficiente que N requisições individuais

On this page