Antonnia Docs

Construindo uma Integração de Canal

Guia completo para conectar um canal de mensagens à Antonnia

Este guia orienta você na construção de uma integração de canal completa — desde o recebimento de mensagens de usuários até a entrega de respostas da IA. Todas as integrações de produção da Antonnia (WhatsApp, HubSpot, ZAPI, Syngoo) seguem exatamente este padrão.

Arquitetura

Sua integração de canal fica entre sua plataforma de mensagens e a API de Conversas da Antonnia:

Usuário Final ←→ API do Seu Canal ←→ Sua Integração ←→ API da Antonnia

Sua integração lida com dois fluxos:

  1. Entrada — Usuário envia uma mensagem → você encaminha para a Antonnia → a IA processa
  2. Saída — IA gera uma resposta → Antonnia dispara um webhook → você entrega pelo seu canal

Fluxo de entrada

Receba a mensagem do seu canal

A API do seu canal entrega mensagens recebidas ao seu endpoint de webhook. Extraia a identidade do remetente, o conteúdo da mensagem e quaisquer IDs específicos do canal.

@app.post("/webhook/incoming")
async def handle_incoming(request: Request):
    event = await request.json()
    sender_id = event["from"]
    message_text = event["text"]
    channel_msg_id = event["message_id"]
    instance_id = event["instance_id"]

Adquira um lock distribuído

Múltiplas mensagens do mesmo usuário podem chegar simultaneamente. Use um lock distribuído para evitar a criação de sessões duplicadas.

async with redis_lock(f"lock:mychannel:{sender_id}:{instance_id}", timeout=30):
    session = await get_or_create_session(sender_id, instance_id)
    # ... processar mensagem

Padrão da chave do lock: lock:{channel}:{contact_id}:{channel_instance_id}

O lock deve incluir tanto o contato quanto a instância do canal para evitar colisões entre canais. Use um timeout de 30 segundos.

Encontre ou crie uma sessão

Busque por uma sessão aberta existente. Se não existir, crie uma nova.

async def get_or_create_session(sender_id: str, instance_id: str):
    # Search for existing open session
    sessions = httpx.post(f"{BASE_URL}/sessions/search", headers=HEADERS, json={
        "contact_id": sender_id,
        "status": "open",
        "metadata": {"mychannel_instance_id": instance_id},
    }).json()

    if sessions:
        return sessions[0]

    # Create new session
    return httpx.post(f"{BASE_URL}/sessions", headers=HEADERS, json={
        "contact_id": sender_id,
        "contact_name": sender_name,
        "agent_id": MY_AGENT_ID,
        "metadata": {
            "mychannel_instance_id": instance_id,
            "mychannel_user_id": sender_id,
        },
    }).json()

Sempre filtre por metadata (ex.: mychannel_instance_id) além do contact_id. Isso garante que um contato possa ter sessões separadas em diferentes instâncias de canal.

Crie a mensagem na Antonnia

message = httpx.post(
    f"{BASE_URL}/sessions/{session['id']}/messages",
    headers=HEADERS,
    json={
        "content": {"type": "text", "text": message_text},
        "role": "user",
        "provider_message_id": channel_msg_id,
    },
).json()

Dispare a resposta da IA

if session["agent"] and session["agent"]["type"] == "ai":
    httpx.post(
        f"{BASE_URL}/sessions/{session['id']}/reply",
        headers=HEADERS,
        json={"debounce_time": 5},
    )

O debounce_time (em segundos) agrupa mensagens sequenciais rápidas em um único turno da IA. Use 3 a 5 segundos para canais de chat.

Fluxo de saída

Receba o evento do webhook

Registre um endpoint de webhook para receber eventos message.created:

@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
    event = await request.json()

    if event["type"] == "message.created":
        await handle_outbound_message(event["data"]["object"])

Filtre e direcione a mensagem

async def handle_outbound_message(message: dict):
    # Only process assistant messages
    if message["role"] != "assistant":
        return

    # Skip internal AI content types
    if message["content"]["type"] in ("function_call", "function_result", "thought"):
        return

    # Get session for routing metadata
    session = httpx.get(
        f"{BASE_URL}/sessions/{message['session_id']}",
        headers=HEADERS,
    ).json()

    instance_id = session["metadata"]["mychannel_instance_id"]
    user_id = session["metadata"]["mychannel_user_id"]

Entregue pelo seu canal

    # Send via your channel's API
    content = message["content"]
    if content["type"] == "text":
        channel_msg_id = await my_channel_api.send_text(instance_id, user_id, content["text"])
    elif content["type"] == "image":
        channel_msg_id = await my_channel_api.send_image(instance_id, user_id, content["url"])
    # ... handle other content types

Atualize o status de entrega

    # Report back to Antonnia
    httpx.patch(
        f"{BASE_URL}/sessions/{message['session_id']}/messages/{message['id']}",
        headers=HEADERS,
        json={
            "provider_message_id": channel_msg_id,
            "delivery_status": "sent",
        },
    )

Exemplo completo

import httpx
import redis.asyncio as redis

BASE_URL = "https://services.antonnia.com/conversations/v2/api/v1"
HEADERS = {"Authorization": "Bearer sk_live_YOUR_TOKEN"}
AGENT_ID = "agent_abc"

redis_client = redis.Redis()

async def lock(key: str, timeout: int = 30):
    return redis_client.lock(key, timeout=timeout)

# Inbound: channel → Antonnia
@app.post("/webhook/incoming")
async def handle_incoming(request: Request):
    event = await request.json()
    sender_id = event["from"]
    instance_id = event["instance_id"]

    async with await lock(f"lock:mychannel:{sender_id}:{instance_id}"):
        session = await get_or_create_session(sender_id, instance_id)

        httpx.post(
            f"{BASE_URL}/sessions/{session['id']}/messages",
            headers=HEADERS,
            json={
                "content": {"type": "text", "text": event["text"]},
                "role": "user",
                "provider_message_id": event["message_id"],
            },
        )

        if session.get("agent", {}).get("type") == "ai":
            httpx.post(
                f"{BASE_URL}/sessions/{session['id']}/reply",
                headers=HEADERS,
                json={"debounce_time": 5},
            )

# Outbound: Antonnia → channel
@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
    event = await request.json()
    if event["type"] != "message.created":
        return
    message = event["data"]["object"]
    if message["role"] != "assistant":
        return
    if message["content"]["type"] in ("function_call", "function_result", "thought"):
        return

    session = httpx.get(
        f"{BASE_URL}/sessions/{message['session_id']}",
        headers=HEADERS,
    ).json()

    channel_msg_id = await my_channel_api.send(
        session["metadata"]["mychannel_instance_id"],
        session["metadata"]["mychannel_user_id"],
        message["content"],
    )

    httpx.patch(
        f"{BASE_URL}/sessions/{message['session_id']}/messages/{message['id']}",
        headers=HEADERS,
        json={"provider_message_id": channel_msg_id, "delivery_status": "sent"},
    )

Próximos passos

On this page