Padrão de Integração Helpdesk
Padrão real para uma integração de canal helpdesk/ticketing
Este exemplo mostra o padrão de integração utilizado pelo handler de HubSpot em produção da Antonnia. Use-o como referência para construir integrações de helpdesk ou ticketing (criação de sessão em múltiplas etapas, ciclo de vida de tickets, atribuição de agente baseada em cobertura).
Arquitetura
Customer ←→ HubSpot Conversations ←→ Your Service ←→ Antonnia APIIntegrações de helpdesk diferem de canais de mensagens diretas em vários aspectos:
- Metadata mais rico — tickets, threads, pipelines, IDs de portal
- Locking em dois níveis — locks separados para dados do canal e criação de sessão
- Atribuição de agente baseada em cobertura — divisão probabilística entre agentes IA e humanos
- Ciclo de vida do ticket — fechar tickets quando sessões são finalizadas
Metadata da sessão
Armazene as informações de roteamento do helpdesk no metadata da sessão:
{
"hubspot_portal_id": "portal_123",
"hubspot_channel_id": "channel_456",
"hubspot_channel_account_id": "account_001",
"hubspot_thread_id": "thread_789",
"hubspot_ticket_id": "ticket_012",
"hubspot_sender_id": "sender_345",
"hubspot_sender_name": "Jane Doe"
}Handler de entrada
Receber evento do helpdesk
Plataformas de helpdesk geralmente enviam eventos para novas mensagens em threads ou tickets:
@app.post("/webhook/helpdesk")
async def handle_helpdesk_event(request: Request):
event = await request.json()
if event["type"] == "new_message":
await process_new_message(
channel_id=event["channel_id"],
thread_id=event["thread_id"],
message_id=event["message_id"],
)Locking em dois níveis
Integrações de helpdesk frequentemente precisam buscar credenciais do canal antes de processar. Use um lock curto para a busca do canal e um lock mais longo para a criação da sessão:
async def process_new_message(channel_id, thread_id, message_id):
# Level 1: Lock channel data fetch (short timeout)
async with redis_lock(f"lock:helpdesk:{channel_id}", timeout=10):
channel = await get_channel_config(channel_id)
credentials = await get_credentials(channel)
# Level 2: Lock session creation (longer timeout)
sender_id = await get_sender_id(thread_id, credentials)
async with redis_lock(f"lock:helpdesk:{channel['account_id']}:{sender_id}", timeout=60):
session = await get_or_create_session(channel, sender_id, thread_id)
await create_messages(session, thread_id, message_id, credentials)Atribuição de agente baseada em cobertura
Em vez de sempre atribuir um agente IA, use uma porcentagem de cobertura para dividir entre atendimento IA e humano:
import random
def should_assign_ai(coverage: float) -> bool:
"""coverage is 0.0 to 1.0 — probability of AI assignment"""
return random.uniform(0, 1) < coverage
async def get_or_create_session(channel, sender_id, thread_id):
sessions = httpx.post(f"{BASE_URL}/sessions/search", headers=HEADERS, json={
"contact_id": sender_id,
"status": "open",
"metadata": {"helpdesk_channel_account_id": channel["account_id"]},
}).json()
if sessions:
return sessions[0]
# Assign AI agent based on coverage percentage
agent_id = channel["ai_agent_id"] if should_assign_ai(channel["coverage"]) else None
return httpx.post(f"{BASE_URL}/sessions", headers=HEADERS, json={
"contact_id": sender_id,
"contact_name": sender_name,
"agent_id": agent_id,
"metadata": {
"helpdesk_portal_id": channel["portal_id"],
"helpdesk_channel_id": channel["id"],
"helpdesk_channel_account_id": channel["account_id"],
"helpdesk_thread_id": thread_id,
"helpdesk_ticket_id": ticket_id,
"helpdesk_sender_id": sender_id,
},
}).json()A atribuição baseada em cobertura permite uma implantação gradual da IA. Comece com 10% de cobertura, monitore a qualidade e depois aumente. Tanto as integrações de produção do HubSpot quanto do Syngoo utilizam este padrão.
Tratar múltiplos anexos
Mensagens de helpdesk frequentemente incluem múltiplos anexos. Crie uma mensagem separada na Antonnia para cada um:
async def create_messages(session, thread_id, message_id, credentials):
msg = await helpdesk_api.get_message(message_id, credentials)
# Process attachments
for attachment in msg.get("attachments", []):
if attachment["type"] == "IMAGE":
content = {"type": "image", "url": attachment["url"]}
elif attachment["type"] == "VOICE_RECORDING":
content = {"type": "audio", "url": attachment["url"]}
else:
continue # Skip unsupported types
httpx.post(
f"{BASE_URL}/sessions/{session['id']}/messages",
headers=HEADERS,
json={"content": content, "role": "user", "provider_message_id": message_id},
)
# Process text content
if msg.get("text"):
httpx.post(
f"{BASE_URL}/sessions/{session['id']}/messages",
headers=HEADERS,
json={
"content": {"type": "text", "text": msg["text"]},
"role": "user",
"provider_message_id": message_id,
},
)
# Trigger AI reply if applicable
if session["agent"] and session["agent"]["type"] == "ai":
httpx.post(
f"{BASE_URL}/sessions/{session['id']}/reply",
headers=HEADERS,
json={"debounce_time": 0}, # No debounce for async channels
)Handler de saída
@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
event = await request.json()
if event["type"] != "message.created":
return
message = event["data"]["object"]
if message["role"] != "assistant":
return
if message["content"]["type"] in ("function_call", "function_result", "thought"):
return
# Only text is supported for helpdesk channels
if message["content"]["type"] != "text":
return
session = httpx.get(
f"{BASE_URL}/sessions/{message['session_id']}",
headers=HEADERS,
).json()
# Send via helpdesk API
helpdesk_msg_id = await helpdesk_api.send_message(
thread_id=session["metadata"]["helpdesk_thread_id"],
channel_account_id=session["metadata"]["helpdesk_channel_account_id"],
text=message["content"]["text"],
)
# Update delivery status
httpx.patch(
f"{BASE_URL}/sessions/{message['session_id']}/messages/{message['id']}",
headers=HEADERS,
json={"provider_message_id": helpdesk_msg_id, "delivery_status": "sent"},
)Ciclo de vida sessão-para-ticket
Feche o ticket do helpdesk quando a sessão da Antonnia for finalizada:
@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
event = await request.json()
if event["type"] == "session.finished":
session = event["data"]["object"]
ticket_id = session["metadata"].get("helpdesk_ticket_id")
if ticket_id:
await helpdesk_api.close_ticket(ticket_id)Padrões-chave
| Padrão | Implementação |
|---|---|
| Locking | Dois níveis: canal (10s) + sessão (60s) |
| Atribuição de agente | Divisão probabilística baseada em cobertura |
| Busca de sessão | Busca por contact_id + metadata helpdesk_channel_account_id |
| Mídia | Anexos processados individualmente; saída somente texto |
| Debounce | 0 segundos (canais assíncronos respondem imediatamente) |
| Ciclo de vida do ticket | Fechar ticket do helpdesk no evento session.finished |
| Estratégia de retry | Backoff exponencial para buscas de ticket (consistência eventual) |
Diferenças em relação ao padrão WhatsApp
| Aspecto | Helpdesk | |
|---|---|---|
| Roteamento | Número de telefone | Thread ID + Channel account |
| Níveis de lock | 1 (sessão) | 2 (canal + sessão) |
| Atribuição de agente | Sempre IA | Divisão baseada em cobertura |
| Mídia de saída | Imagens, áudio, arquivos | Somente texto |
| Debounce | 5 segundos | 0 segundos |
| Efeito colateral no encerramento da sessão | Nenhum | Fechar ticket |