Construindo uma Integração de Canal
Guia completo para conectar um canal de mensagens à Antonnia
Este guia orienta você na construção de uma integração de canal completa — desde o recebimento de mensagens de usuários até a entrega de respostas da IA. Todas as integrações de produção da Antonnia (WhatsApp, HubSpot, ZAPI, Syngoo) seguem exatamente este padrão.
Arquitetura
Sua integração de canal fica entre sua plataforma de mensagens e a API de Conversas da Antonnia:
Usuário Final ←→ API do Seu Canal ←→ Sua Integração ←→ API da AntonniaSua integração lida com dois fluxos:
- Entrada — Usuário envia uma mensagem → você encaminha para a Antonnia → a IA processa
- Saída — IA gera uma resposta → Antonnia dispara um webhook → você entrega pelo seu canal
Fluxo de entrada
Receba a mensagem do seu canal
A API do seu canal entrega mensagens recebidas ao seu endpoint de webhook. Extraia a identidade do remetente, o conteúdo da mensagem e quaisquer IDs específicos do canal.
@app.post("/webhook/incoming")
async def handle_incoming(request: Request):
event = await request.json()
sender_id = event["from"]
message_text = event["text"]
channel_msg_id = event["message_id"]
instance_id = event["instance_id"]Adquira um lock distribuído
Múltiplas mensagens do mesmo usuário podem chegar simultaneamente. Use um lock distribuído para evitar a criação de sessões duplicadas.
async with redis_lock(f"lock:mychannel:{sender_id}:{instance_id}", timeout=30):
session = await get_or_create_session(sender_id, instance_id)
# ... processar mensagemPadrão da chave do lock: lock:{channel}:{contact_id}:{channel_instance_id}
O lock deve incluir tanto o contato quanto a instância do canal para evitar colisões entre canais. Use um timeout de 30 segundos.
Encontre ou crie uma sessão
Busque por uma sessão aberta existente. Se não existir, crie uma nova.
async def get_or_create_session(sender_id: str, instance_id: str):
# Search for existing open session
sessions = httpx.post(f"{BASE_URL}/sessions/search", headers=HEADERS, json={
"contact_id": sender_id,
"status": "open",
"metadata": {"mychannel_instance_id": instance_id},
}).json()
if sessions:
return sessions[0]
# Create new session
return httpx.post(f"{BASE_URL}/sessions", headers=HEADERS, json={
"contact_id": sender_id,
"contact_name": sender_name,
"agent_id": MY_AGENT_ID,
"metadata": {
"mychannel_instance_id": instance_id,
"mychannel_user_id": sender_id,
},
}).json()Sempre filtre por metadata (ex.: mychannel_instance_id) além do contact_id. Isso garante que um contato possa ter sessões separadas em diferentes instâncias de canal.
Crie a mensagem na Antonnia
message = httpx.post(
f"{BASE_URL}/sessions/{session['id']}/messages",
headers=HEADERS,
json={
"content": {"type": "text", "text": message_text},
"role": "user",
"provider_message_id": channel_msg_id,
},
).json()Dispare a resposta da IA
if session["agent"] and session["agent"]["type"] == "ai":
httpx.post(
f"{BASE_URL}/sessions/{session['id']}/reply",
headers=HEADERS,
json={"debounce_time": 5},
)O debounce_time (em segundos) agrupa mensagens sequenciais rápidas em um único turno da IA. Use 3 a 5 segundos para canais de chat.
Fluxo de saída
Receba o evento do webhook
Registre um endpoint de webhook para receber eventos message.created:
@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
event = await request.json()
if event["type"] == "message.created":
await handle_outbound_message(event["data"]["object"])Filtre e direcione a mensagem
async def handle_outbound_message(message: dict):
# Only process assistant messages
if message["role"] != "assistant":
return
# Skip internal AI content types
if message["content"]["type"] in ("function_call", "function_result", "thought"):
return
# Get session for routing metadata
session = httpx.get(
f"{BASE_URL}/sessions/{message['session_id']}",
headers=HEADERS,
).json()
instance_id = session["metadata"]["mychannel_instance_id"]
user_id = session["metadata"]["mychannel_user_id"]Entregue pelo seu canal
# Send via your channel's API
content = message["content"]
if content["type"] == "text":
channel_msg_id = await my_channel_api.send_text(instance_id, user_id, content["text"])
elif content["type"] == "image":
channel_msg_id = await my_channel_api.send_image(instance_id, user_id, content["url"])
# ... handle other content typesAtualize o status de entrega
# Report back to Antonnia
httpx.patch(
f"{BASE_URL}/sessions/{message['session_id']}/messages/{message['id']}",
headers=HEADERS,
json={
"provider_message_id": channel_msg_id,
"delivery_status": "sent",
},
)Exemplo completo
import httpx
import redis.asyncio as redis
BASE_URL = "https://services.antonnia.com/conversations/v2/api/v1"
HEADERS = {"Authorization": "Bearer sk_live_YOUR_TOKEN"}
AGENT_ID = "agent_abc"
redis_client = redis.Redis()
async def lock(key: str, timeout: int = 30):
return redis_client.lock(key, timeout=timeout)
# Inbound: channel → Antonnia
@app.post("/webhook/incoming")
async def handle_incoming(request: Request):
event = await request.json()
sender_id = event["from"]
instance_id = event["instance_id"]
async with await lock(f"lock:mychannel:{sender_id}:{instance_id}"):
session = await get_or_create_session(sender_id, instance_id)
httpx.post(
f"{BASE_URL}/sessions/{session['id']}/messages",
headers=HEADERS,
json={
"content": {"type": "text", "text": event["text"]},
"role": "user",
"provider_message_id": event["message_id"],
},
)
if session.get("agent", {}).get("type") == "ai":
httpx.post(
f"{BASE_URL}/sessions/{session['id']}/reply",
headers=HEADERS,
json={"debounce_time": 5},
)
# Outbound: Antonnia → channel
@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
event = await request.json()
if event["type"] != "message.created":
return
message = event["data"]["object"]
if message["role"] != "assistant":
return
if message["content"]["type"] in ("function_call", "function_result", "thought"):
return
session = httpx.get(
f"{BASE_URL}/sessions/{message['session_id']}",
headers=HEADERS,
).json()
channel_msg_id = await my_channel_api.send(
session["metadata"]["mychannel_instance_id"],
session["metadata"]["mychannel_user_id"],
message["content"],
)
httpx.patch(
f"{BASE_URL}/sessions/{message['session_id']}/messages/{message['id']}",
headers=HEADERS,
json={"provider_message_id": channel_msg_id, "delivery_status": "sent"},
)Próximos passos
- Mensagens de Entrada — guia detalhado para receber mensagens
- Mensagens de Saída — guia detalhado para entregar mensagens
- Tratamento de Erros — lidando com falhas de forma elegante