Antonnia Docs

Padrão de Integração Helpdesk

Padrão real para uma integração de canal helpdesk/ticketing

Este exemplo mostra o padrão de integração utilizado pelo handler de HubSpot em produção da Antonnia. Use-o como referência para construir integrações de helpdesk ou ticketing (criação de sessão em múltiplas etapas, ciclo de vida de tickets, atribuição de agente baseada em cobertura).

Arquitetura

Customer ←→ HubSpot Conversations ←→ Your Service ←→ Antonnia API

Integrações de helpdesk diferem de canais de mensagens diretas em vários aspectos:

  • Metadata mais rico — tickets, threads, pipelines, IDs de portal
  • Locking em dois níveis — locks separados para dados do canal e criação de sessão
  • Atribuição de agente baseada em cobertura — divisão probabilística entre agentes IA e humanos
  • Ciclo de vida do ticket — fechar tickets quando sessões são finalizadas

Metadata da sessão

Armazene as informações de roteamento do helpdesk no metadata da sessão:

{
  "hubspot_portal_id": "portal_123",
  "hubspot_channel_id": "channel_456",
  "hubspot_channel_account_id": "account_001",
  "hubspot_thread_id": "thread_789",
  "hubspot_ticket_id": "ticket_012",
  "hubspot_sender_id": "sender_345",
  "hubspot_sender_name": "Jane Doe"
}

Handler de entrada

Receber evento do helpdesk

Plataformas de helpdesk geralmente enviam eventos para novas mensagens em threads ou tickets:

@app.post("/webhook/helpdesk")
async def handle_helpdesk_event(request: Request):
    event = await request.json()

    if event["type"] == "new_message":
        await process_new_message(
            channel_id=event["channel_id"],
            thread_id=event["thread_id"],
            message_id=event["message_id"],
        )

Locking em dois níveis

Integrações de helpdesk frequentemente precisam buscar credenciais do canal antes de processar. Use um lock curto para a busca do canal e um lock mais longo para a criação da sessão:

async def process_new_message(channel_id, thread_id, message_id):
    # Level 1: Lock channel data fetch (short timeout)
    async with redis_lock(f"lock:helpdesk:{channel_id}", timeout=10):
        channel = await get_channel_config(channel_id)
        credentials = await get_credentials(channel)

    # Level 2: Lock session creation (longer timeout)
    sender_id = await get_sender_id(thread_id, credentials)
    async with redis_lock(f"lock:helpdesk:{channel['account_id']}:{sender_id}", timeout=60):
        session = await get_or_create_session(channel, sender_id, thread_id)
        await create_messages(session, thread_id, message_id, credentials)

Atribuição de agente baseada em cobertura

Em vez de sempre atribuir um agente IA, use uma porcentagem de cobertura para dividir entre atendimento IA e humano:

import random

def should_assign_ai(coverage: float) -> bool:
    """coverage is 0.0 to 1.0 — probability of AI assignment"""
    return random.uniform(0, 1) < coverage

async def get_or_create_session(channel, sender_id, thread_id):
    sessions = httpx.post(f"{BASE_URL}/sessions/search", headers=HEADERS, json={
        "contact_id": sender_id,
        "status": "open",
        "metadata": {"helpdesk_channel_account_id": channel["account_id"]},
    }).json()

    if sessions:
        return sessions[0]

    # Assign AI agent based on coverage percentage
    agent_id = channel["ai_agent_id"] if should_assign_ai(channel["coverage"]) else None

    return httpx.post(f"{BASE_URL}/sessions", headers=HEADERS, json={
        "contact_id": sender_id,
        "contact_name": sender_name,
        "agent_id": agent_id,
        "metadata": {
            "helpdesk_portal_id": channel["portal_id"],
            "helpdesk_channel_id": channel["id"],
            "helpdesk_channel_account_id": channel["account_id"],
            "helpdesk_thread_id": thread_id,
            "helpdesk_ticket_id": ticket_id,
            "helpdesk_sender_id": sender_id,
        },
    }).json()

A atribuição baseada em cobertura permite uma implantação gradual da IA. Comece com 10% de cobertura, monitore a qualidade e depois aumente. Tanto as integrações de produção do HubSpot quanto do Syngoo utilizam este padrão.

Tratar múltiplos anexos

Mensagens de helpdesk frequentemente incluem múltiplos anexos. Crie uma mensagem separada na Antonnia para cada um:

async def create_messages(session, thread_id, message_id, credentials):
    msg = await helpdesk_api.get_message(message_id, credentials)

    # Process attachments
    for attachment in msg.get("attachments", []):
        if attachment["type"] == "IMAGE":
            content = {"type": "image", "url": attachment["url"]}
        elif attachment["type"] == "VOICE_RECORDING":
            content = {"type": "audio", "url": attachment["url"]}
        else:
            continue  # Skip unsupported types

        httpx.post(
            f"{BASE_URL}/sessions/{session['id']}/messages",
            headers=HEADERS,
            json={"content": content, "role": "user", "provider_message_id": message_id},
        )

    # Process text content
    if msg.get("text"):
        httpx.post(
            f"{BASE_URL}/sessions/{session['id']}/messages",
            headers=HEADERS,
            json={
                "content": {"type": "text", "text": msg["text"]},
                "role": "user",
                "provider_message_id": message_id,
            },
        )

    # Trigger AI reply if applicable
    if session["agent"] and session["agent"]["type"] == "ai":
        httpx.post(
            f"{BASE_URL}/sessions/{session['id']}/reply",
            headers=HEADERS,
            json={"debounce_time": 0},  # No debounce for async channels
        )

Handler de saída

@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
    event = await request.json()
    if event["type"] != "message.created":
        return

    message = event["data"]["object"]
    if message["role"] != "assistant":
        return
    if message["content"]["type"] in ("function_call", "function_result", "thought"):
        return

    # Only text is supported for helpdesk channels
    if message["content"]["type"] != "text":
        return

    session = httpx.get(
        f"{BASE_URL}/sessions/{message['session_id']}",
        headers=HEADERS,
    ).json()

    # Send via helpdesk API
    helpdesk_msg_id = await helpdesk_api.send_message(
        thread_id=session["metadata"]["helpdesk_thread_id"],
        channel_account_id=session["metadata"]["helpdesk_channel_account_id"],
        text=message["content"]["text"],
    )

    # Update delivery status
    httpx.patch(
        f"{BASE_URL}/sessions/{message['session_id']}/messages/{message['id']}",
        headers=HEADERS,
        json={"provider_message_id": helpdesk_msg_id, "delivery_status": "sent"},
    )

Ciclo de vida sessão-para-ticket

Feche o ticket do helpdesk quando a sessão da Antonnia for finalizada:

@app.post("/webhook/antonnia")
async def handle_antonnia_event(request: Request):
    event = await request.json()

    if event["type"] == "session.finished":
        session = event["data"]["object"]
        ticket_id = session["metadata"].get("helpdesk_ticket_id")
        if ticket_id:
            await helpdesk_api.close_ticket(ticket_id)

Padrões-chave

PadrãoImplementação
LockingDois níveis: canal (10s) + sessão (60s)
Atribuição de agenteDivisão probabilística baseada em cobertura
Busca de sessãoBusca por contact_id + metadata helpdesk_channel_account_id
MídiaAnexos processados individualmente; saída somente texto
Debounce0 segundos (canais assíncronos respondem imediatamente)
Ciclo de vida do ticketFechar ticket do helpdesk no evento session.finished
Estratégia de retryBackoff exponencial para buscas de ticket (consistência eventual)

Diferenças em relação ao padrão WhatsApp

AspectoWhatsAppHelpdesk
RoteamentoNúmero de telefoneThread ID + Channel account
Níveis de lock1 (sessão)2 (canal + sessão)
Atribuição de agenteSempre IADivisão baseada em cobertura
Mídia de saídaImagens, áudio, arquivosSomente texto
Debounce5 segundos0 segundos
Efeito colateral no encerramento da sessãoNenhumFechar ticket

On this page