ConversationsGuides
Exportação de dados (ETL)
Como extrair sessões e mensagens para seu data warehouse ou BI
Os endpoints /data/ foram projetados para exportação em massa de conversas. Use-os para alimentar seu data warehouse, dashboard de BI ou qualquer pipeline de dados.
Estratégia de sync incremental
A abordagem mais eficiente é o sync incremental: buscar apenas os registros criados ou atualizados desde a última extração.
Passo a passo
- Na primeira execução, extraia tudo (sem filtros de data)
- Salve o timestamp da execução
- Nas execuções seguintes, use
updated_aftercom o timestamp salvo - Use
sort_by=updated_atpara garantir ordenação consistente
import httpx
from datetime import datetime, timezone
BASE_URL = "https://services.antonnia.com/conversations/v2/api/v1"
HEADERS = {"Authorization": "Bearer sk_live_YOUR_TOKEN"}
def sync_sessions(last_sync: str | None = None) -> list[dict]:
"""Fetch all sessions created/updated since last sync."""
sessions = []
after_id = None
while True:
params: dict = {"limit": 3000, "sort_by": "updated_at"}
if last_sync:
params["updated_after"] = last_sync
if after_id:
params["after_id"] = after_id
response = httpx.get(
f"{BASE_URL}/data/sessions", headers=HEADERS, params=params
)
response.raise_for_status()
page = response.json()
if not page:
break
sessions.extend(page)
after_id = page[-1]["id"]
return sessions
def sync_messages(last_sync: str | None = None) -> list[dict]:
"""Fetch all messages created/updated since last sync."""
messages = []
after_id = None
while True:
params: dict = {"limit": 3000, "sort_by": "updated_at"}
if last_sync:
params["updated_after"] = last_sync
if after_id:
params["after_id"] = after_id
response = httpx.get(
f"{BASE_URL}/data/messages", headers=HEADERS, params=params
)
response.raise_for_status()
page = response.json()
if not page:
break
messages.extend(page)
after_id = page[-1]["id"]
return messages
# Usage
last_sync = None # First run: fetch everything
# last_sync = "2025-01-14T00:00:00Z" # Subsequent runs
sync_start = datetime.now(timezone.utc).isoformat()
sessions = sync_sessions(last_sync)
messages = sync_messages(last_sync)
# Save sync_start for next run
print(f"Synced {len(sessions)} sessions, {len(messages)} messages")Use updated_after (não created_after) para sync incremental. Isso garante que sessões e mensagens que foram atualizadas (ex: status mudou, mensagem entregue) também sejam capturadas.
Busca em lote por IDs
Se você precisa buscar sessões específicas (ex: a partir de eventos de webhook), use o endpoint batch:
def get_sessions_by_ids(session_ids: list[str]) -> list[dict]:
"""Fetch specific sessions by ID."""
response = httpx.get(
f"{BASE_URL}/data/sessions/batch",
headers=HEADERS,
params=[("session_ids", sid) for sid in session_ids],
)
response.raise_for_status()
return response.json()
# Example: hydrate sessions from webhook events
session_ids = ["sess_abc123", "sess_def456", "sess_ghi789"]
sessions = get_sessions_by_ids(session_ids)Filtragem por período
Para relatórios ou extrações pontuais, use os filtros de data:
# Sessions created in January 2025
curl -X GET "https://services.antonnia.com/conversations/v2/api/v1/data/sessions?\
created_after=2025-01-01T00:00:00Z&\
created_before=2025-02-01T00:00:00Z&\
limit=3000" \
-H "Authorization: Bearer sk_live_YOUR_TOKEN"Filtros disponíveis:
created_after/created_before— filtrar por data de criaçãoupdated_after/updated_before— filtrar por data de atualização- Combine filtros de criação e atualização na mesma requisição
Boas práticas
| Prática | Motivo |
|---|---|
Use limit=3000 para extração em massa | Reduz o número de requisições |
Use sort_by=updated_at para sync incremental | Captura registros atualizados, não só novos |
| Salve o timestamp antes de iniciar o sync | Evita perder registros criados durante a extração |
| Pagine até receber array vazio | Não confie no count — use o cursor |
| Use o endpoint batch para IDs conhecidos | Mais eficiente que N requisições individuais |