Antonnia Docs

Padrão de Integração WhatsApp

Padrão real para uma integração de canal WhatsApp

Este exemplo mostra o padrão completo de integração utilizado pelo handler de WhatsApp em produção da Antonnia. Use-o como referência para construir integrações semelhantes ao WhatsApp (mensagens diretas, roteamento baseado em número de telefone).

Arquitetura

User Phone ←→ WhatsApp Cloud API ←→ Your Service ←→ Antonnia API
  • Entrada: O WhatsApp entrega mensagens ao seu webhook → você as cria na Antonnia
  • Saída: A Antonnia dispara message.created → você envia via WhatsApp Cloud API
  • Status: O WhatsApp envia confirmações de entrega → você atualiza o status da mensagem na Antonnia

Metadata da sessão

Armazene as informações de roteamento do WhatsApp no metadata da sessão:

{
  "whatsapp_phone_number": "5511999888777",
  "whatsapp_phone_number_id": "phone_id_123"
}

Handler de entrada

Receber webhook do WhatsApp

@app.post("/webhook/whatsapp")
async def handle_whatsapp(request: Request):
    body = await request.json()

    for entry in body.get("entry", []):
        for change in entry.get("changes", []):
            if change["field"] == "messages":
                for message in change["value"].get("messages", []):
                    await process_incoming(
                        phone_number_id=change["value"]["metadata"]["phone_number_id"],
                        sender_phone=message["from"],
                        sender_name=change["value"]["contacts"][0]["profile"]["name"],
                        message=message,
                    )

Obter ou criar sessão com lock

async def process_incoming(phone_number_id, sender_phone, sender_name, message):
    # Lock to prevent duplicate sessions
    async with redis_lock(f"lock:whatsapp:{sender_phone}:{phone_number_id}", timeout=30):
        session = await get_or_create_session(phone_number_id, sender_phone, sender_name)

        # Build content from WhatsApp message type
        content = build_content(message)

        # Create message in Antonnia
        httpx.post(
            f"{BASE_URL}/sessions/{session['id']}/messages",
            headers=HEADERS,
            json={
                "content": content,
                "role": "user",
                "provider_message_id": message["id"],
            },
        )

        # Trigger AI reply
        if session["agent"] and session["agent"]["type"] == "ai":
            httpx.post(
                f"{BASE_URL}/sessions/{session['id']}/reply",
                headers=HEADERS,
                json={"debounce_time": 5},
            )

Mapear tipos de mensagem do WhatsApp para conteúdo da Antonnia

def build_content(wa_message: dict) -> dict:
    msg_type = wa_message["type"]

    if msg_type == "text":
        return {"type": "text", "text": wa_message["text"]["body"]}

    elif msg_type == "image":
        url = download_whatsapp_media(wa_message["image"]["id"])
        return {"type": "image", "url": url}

    elif msg_type == "audio":
        url = download_whatsapp_media(wa_message["audio"]["id"])
        return {"type": "audio", "url": url}

    elif msg_type == "document":
        url = download_whatsapp_media(wa_message["document"]["id"])
        return {
            "type": "file",
            "url": url,
            "mime_type": wa_message["document"].get("mime_type", "application/octet-stream"),
            "name": wa_message["document"].get("filename", "document"),
        }

    elif msg_type == "location":
        loc = wa_message["location"]
        return {
            "type": "text",
            "text": f"Location: {loc['latitude']}, {loc['longitude']}"
                    + (f" - {loc['name']}" if loc.get("name") else ""),
        }

    elif msg_type == "interactive":
        reply = wa_message["interactive"]
        if reply["type"] == "button_reply":
            return {"type": "text", "text": reply["button_reply"]["title"]}
        elif reply["type"] == "list_reply":
            return {"type": "text", "text": reply["list_reply"]["title"]}

    # Fallback
    return {"type": "text", "text": f"[{msg_type} message]"}

As URLs de mídia do WhatsApp são temporárias. Baixe a mídia usando a Graph API e hospede-a em uma URL permanente antes de criar a mensagem.

Handler de saída

@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
    event = await request.json()
    if event["type"] != "message.created":
        return

    message = event["data"]["object"]
    if message["role"] != "assistant":
        return
    if message["content"]["type"] in ("function_call", "function_result", "thought"):
        return

    # Get session for routing metadata
    session = httpx.get(
        f"{BASE_URL}/sessions/{message['session_id']}",
        headers=HEADERS,
    ).json()

    phone_number = session["metadata"]["whatsapp_phone_number"]
    phone_number_id = session["metadata"]["whatsapp_phone_number_id"]

    # Send via WhatsApp Cloud API
    content = message["content"]
    if content["type"] == "text":
        wa_msg_id = send_whatsapp_text(phone_number_id, phone_number, content["text"])
    elif content["type"] == "image":
        wa_msg_id = send_whatsapp_image(phone_number_id, phone_number, content["url"])
    elif content["type"] == "audio":
        wa_msg_id = send_whatsapp_audio(phone_number_id, phone_number, content["url"])
    elif content["type"] == "file":
        wa_msg_id = send_whatsapp_document(phone_number_id, phone_number, content["url"], content.get("name"))
    else:
        return  # Unsupported type

    # Update delivery status
    httpx.patch(
        f"{BASE_URL}/sessions/{message['session_id']}/messages/{message['id']}",
        headers=HEADERS,
        json={"provider_message_id": wa_msg_id, "delivery_status": "sent"},
    )

Rastreamento de status de entrega

O WhatsApp envia confirmações de entrega de volta ao seu webhook. Mapeie-as para os status da Antonnia:

STATUS_MAP = {
    "sent": "sent",
    "delivered": "delivered",
    "read": "read",
    "failed": "failed",
}

async def handle_status_update(wa_status: dict):
    status = STATUS_MAP.get(wa_status["status"])
    if not status:
        return

    # Find message by provider_message_id
    messages = httpx.post(
        f"{BASE_URL}/sessions/{session_id}/messages/search",
        headers=HEADERS,
        json={"provider_message_id": wa_status["id"]},
    ).json()

    if not messages:
        return  # Message not yet created — expected race condition

    update = {"delivery_status": status}
    if status == "failed" and wa_status.get("errors"):
        error = wa_status["errors"][0]
        update["delivery_error_code"] = error.get("code")
        update["delivery_error_message"] = error.get("title")

    httpx.patch(
        f"{BASE_URL}/sessions/{session_id}/messages/{messages[0]['id']}",
        headers=HEADERS,
        json=update,
    )

Padrões-chave

PadrãoImplementação
LockingLock Redis em lock:whatsapp:{phone}:{phone_number_id}, timeout de 30s
Busca de sessãoBusca por contact_id + metadata whatsapp_phone_number_id
MídiaDownload via Graph API → re-hospedagem → URL pública
Tipos não suportadosConvertidos para placeholder de texto
Debounce5 segundos (agrupa mensagens rápidas de chat)
Rastreamento de statusFluxo completo: pending → sent → delivered → read / failed
Comando de resetMensagem de texto /reset encerra a sessão e envia confirmação

On this page