Voice Turn Processing Pipeline

This page describes the inner loop of every voice conversation turn. The TurnProcessor wraps the LiveKit Agents SDK's LlmAgent and intercepts every event both before and after it reaches the LLM.


Architecture

Caller speaks
|
v
Deepgram STT (livekit-plugins-deepgram) → text
|
v
Agent.on_user_turn_completed(turn_ctx, new_message)
| - moderation.py: check_crisis / check_threat / check_abuse
| - call_handler.py: should_filter / is_are_you_there
| - core/rag.py: fetch_turn_rag
v
LLM processes (Gemini 2.5 Flash or Claude Haiku 4.5)
|
v
Response text → Cartesia Sonic TTS (livekit-plugins-cartesia) → audio
|
v
Caller hears response

In the LiveKit Agents SDK, the turn processing pipeline is implemented via:

  • on_user_turn_completed() callback on each Agent class (in verticals/church/agents.py)
  • call_handler.py utility functions for noise filtering, farewell detection, and "are you there?" handling
  • moderation.py for crisis/threat/abuse checks

There is no separate TurnProcessor class in the LiveKit version.


Pipeline per UserTextSent Event

Step 0: Session Injection

_inject_session_into_env(env):
env.supabase = self.supabase
env.church_id = session["church_id"]
env.church_data = session["church_data"]
env.caller_phone = session["caller_phone"]
env.pco_app_id = church_data.get("pco_app_id")
env.cal_event_type_id = church_data.get("cal_event_type_id")
env.church_timezone = church_data.get("church_timezone") or "America/New_York"

Cancel any pending farewell timer:
session["farewell_pending"] = False
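
The injection step can be sketched as a plain function. This is a minimal illustration, assuming env is any attribute-bearing object and session is a dict; the standalone signature and the DEFAULT_TIMEZONE constant are hypothetical and not taken from the codebase.

```python
from types import SimpleNamespace

DEFAULT_TIMEZONE = "America/New_York"  # fallback named in the step above


def inject_session_into_env(env, session, supabase=None):
    """Copy per-call session data onto env (hypothetical standalone form)."""
    church_data = session["church_data"]
    env.supabase = supabase
    env.church_id = session["church_id"]
    env.church_data = church_data
    env.caller_phone = session["caller_phone"]
    env.pco_app_id = church_data.get("pco_app_id")
    env.cal_event_type_id = church_data.get("cal_event_type_id")
    # `or` also covers a key that is present but set to None or "".
    env.church_timezone = church_data.get("church_timezone") or DEFAULT_TIMEZONE
    # New caller speech cancels any pending farewell timer.
    session["farewell_pending"] = False
    return env


session = {
    "church_id": "church-1",
    "church_data": {"church_timezone": None},
    "caller_phone": "+15550100",
    "farewell_pending": True,
}
env = inject_session_into_env(SimpleNamespace(), session)
```

Using `or` rather than a `.get()` default matters here because a stored NULL timezone would otherwise pass through as None.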

Step 1: "Are you there?" Reassurance

IF session["is_processing"] AND is_are_you_there(text):
# Caller is checking in during a long tool call
# Regex matches: "hello", "are you there", "are you still there",
# "you there", "anybody there", "anyone there"
YIELD AgentSendText("Yes, I'm here! Just one more moment.")
RETURN # Do NOT cancel pending LLM work, do NOT forward to LLM
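
A minimal sketch of the reassurance check, assuming the whole utterance must match one of the listed phrases. The exact regex in call_handler.py is not shown here, so the pattern and the handle_check_in helper are illustrative only.

```python
import re
from typing import Optional

# Hypothetical pattern built from the phrases listed above.
_ARE_YOU_THERE_RE = re.compile(
    r"^\s*(?:hello|are you (?:still )?there|you there|any(?:body|one) there)"
    r"\s*[?.!]*\s*$",
    re.IGNORECASE,
)


def is_are_you_there(text: str) -> bool:
    """True when the whole utterance is a short check-in phrase."""
    return bool(_ARE_YOU_THERE_RE.match(text))


def handle_check_in(session: dict, text: str) -> Optional[str]:
    """Return the reassurance line, or None to continue the pipeline."""
    if session.get("is_processing") and is_are_you_there(text):
        return "Yes, I'm here! Just one more moment."
    return None
```

Anchoring the pattern to the full utterance keeps a real request that merely starts with "hello" from being swallowed while a tool call is running.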

Step 2: Moderation Checks

Three checks run in priority order. Moderation ALWAYS runs before noise filtering to prevent silently dropping safety-critical utterances.

CHECK A: THREAT DETECTION (check_threat)
Regex: "kill him/her/you/them", "gonna shoot/bomb",
"bring a gun", "shoot up", "blow up", etc.
GUARDS:
- Negation exclusion: "I'm NOT going to kill anyone" → no match
- Self-harm exclusion: "kill myself" → routes to crisis, not threat
IF MATCHED:
Log moderation_violations (type="threat", severity=0.9)
Send notifications (email + SMS + support alert) — parallel, fire-and-forget
YIELD AgentSendText("I need to stop you right there. This call is
being recorded and logged. I'm ending this call now. If you or
someone else is in danger, please call nine one one.")
await sleep(4) # Let TTS play fully
YIELD AgentEndCall()
RETURN
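
The two guards can be illustrated with a short sketch. It assumes a small subset of the threat phrases and a simple "negation word within the four preceding words" rule; the window size and word list are assumptions, and the real check_threat in moderation.py may work differently.

```python
import re

# Subset of the phrases listed above. Only other-directed objects appear
# after "kill", so "kill myself" never matches and is left for check_crisis.
_THREAT_RE = re.compile(
    r"\b(?:kill (?:him|her|you|them)|gonna (?:shoot|bomb)|bring a gun"
    r"|shoot up|blow up)\b"
)

# Hypothetical negation list and lookback window.
_NEGATIONS = {"not", "never", "don't", "won't", "wouldn't", "didn't"}
_NEGATION_WINDOW = 4


def check_threat(text: str) -> bool:
    """True when a threat phrase appears without a nearby negation."""
    lowered = text.lower()
    match = _THREAT_RE.search(lowered)
    if match is None:
        return False
    preceding = lowered[: match.start()].split()[-_NEGATION_WINDOW:]
    return not any(word.strip(".,!?") in _NEGATIONS for word in preceding)
```

The trailing word boundary also keeps "kill yourself" from matching "kill you", which leaves that phrase to the abuse check.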

CHECK B: CRISIS DETECTION (check_crisis)
Regex patterns covering 6 categories, plus keyword stems:
- Direct self-harm: "don't want to be alive", "take my life"
- Hopelessness: "what's the point", "can't do this anymore"
- C-SSRS Q1: "wish I were dead", "wish I could go to sleep and not wake up"
- Elderly coded: "tired of living", "lived long enough"
- Religious coded: "going home to the Lord", "ready to meet my maker"
- Farewell signals: "giving away my things", "said my goodbyes"
- Stems: "suicid*", "self-harm*"
CONTEXT-AWARE: "ready to go to church/service/work/home" → NOT crisis
IF MATCHED:
session["crisis_detected"] = True
Log moderation_violations (type="crisis", severity=0.95)
Send crisis alerts (email + SMS + support) — parallel
context = "CRITICAL: Caller may be in crisis. Provide 988 Suicide
and Crisis Lifeline immediately. Say: 'I hear you. Please call
or text nine eight eight right now.' Then stop talking and listen.
Do NOT ask clarifying questions. Do NOT end the call."
# Falls through to LLM with this injected context
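
The context-aware exclusion can be expressed as a negative lookahead. The sketch below uses a handful of the phrases above and assumes that a bare "ready to go" counts as a crisis phrase (implied by the exclusion rule but not listed explicitly); the actual patterns in moderation.py may be broader.

```python
import re

_CRISIS_RE = re.compile(
    r"suicid\w*|self[- ]harm\w*"            # stems
    r"|take my life|wish i were dead"       # direct self-harm, C-SSRS Q1
    r"|tired of living|lived long enough"   # elderly coded
    r"|ready to meet my maker"              # religious coded
    # Assumed pattern: "ready to go" is flagged unless an everyday
    # destination follows it.
    r"|ready to go(?! to (?:church|service|work|home)\b)",
    re.IGNORECASE,
)


def check_crisis(text: str) -> bool:
    """True when the utterance contains a crisis phrase."""
    return bool(_CRISIS_RE.search(text))
```

A match does not end the call. It only sets the flag and injects the crisis context, so the LLM still produces the reply.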

CHECK C: ABUSE DETECTION (check_abuse)
Regex: "fuck you", "go fuck", "piece of shit", "kill yourself",
"you stupid/dumb/useless/worthless"
ESCALATION:
abuse_count == 0: session["abuse_count"] = 1, return "warning"
→ context = "Caller used inappropriate language. Respond calmly and redirect."
abuse_count >= 1: return "end_call"
→ YIELD AgentSendText("I'm going to end this call now. Have a good day.")
→ await sleep(2)
→ YIELD AgentEndCall()
→ RETURN
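
The two-strike escalation reduces to a counter on the session. This sketch uses only a subset of the patterns above and assumes check_abuse receives the session dict directly, which may not match the real signature.

```python
import re
from typing import Optional

# Subset of the abuse phrases listed above.
_ABUSE_RE = re.compile(
    r"\b(?:kill yourself|you (?:stupid|dumb|useless|worthless))\b",
    re.IGNORECASE,
)


def check_abuse(session: dict, text: str) -> Optional[str]:
    """Return "warning" on the first offence, "end_call" on later ones."""
    if not _ABUSE_RE.search(text):
        return None
    if session.get("abuse_count", 0) == 0:
        session["abuse_count"] = 1
        return "warning"
    return "end_call"


session = {}
first = check_abuse(session, "you useless machine")
second = check_abuse(session, "you stupid thing")
```

Because the count lives on the session, the warning is given once per call rather than once per utterance.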

Step 3: Noise Filtering

Only runs if moderation did NOT flag the utterance. Prevents trivial sounds from triggering LLM calls.

FUNCTION should_filter(text, agent_asked_question) -> bool

WORD TAXONOMY:
NOISE_SOUNDS (always drop): um, uh, hmm, mm, ah, er
PURE_BACKCHANNELS (always drop): uh huh, mm hmm, mhm, i see
CONTEXT_DEPENDENT (conditional):
ok, yeah, yes, sure, good, great, nice, perfect, got it
→ DROP if agent did NOT ask a question
→ PASS THROUGH if agent DID ask a question (valid answers)
FLOOR_TAKES (always pass): wait, stop, no, actually, hold on, excuse me
NEVER_FILTER (always pass): thanks, thank you, bye, goodbye

IF normalized text matches NOISE_SOUNDS or PURE_BACKCHANNELS:
RETURN True # Drop silently, no LLM call
IF normalized text matches CONTEXT_DEPENDENT AND NOT agent_asked_question:
RETURN True # Drop silently
RETURN False # Forward to the LLM
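
A runnable sketch of the taxonomy, assuming the filter compares the whole normalized utterance against each set. The _normalize helper is hypothetical; the normalization used in call_handler.py is not described here.

```python
import re

NOISE_SOUNDS = {"um", "uh", "hmm", "mm", "ah", "er"}
PURE_BACKCHANNELS = {"uh huh", "mm hmm", "mhm", "i see"}
CONTEXT_DEPENDENT = {
    "ok", "yeah", "yes", "sure", "good", "great", "nice", "perfect", "got it",
}


def _normalize(text: str) -> str:
    """Lowercase, replace punctuation with spaces, collapse whitespace."""
    return " ".join(re.sub(r"[^\w\s']", " ", text.lower()).split())


def should_filter(text: str, agent_asked_question: bool) -> bool:
    """True when the utterance should be dropped without an LLM call."""
    normalized = _normalize(text)
    if normalized in NOISE_SOUNDS or normalized in PURE_BACKCHANNELS:
        return True
    if normalized in CONTEXT_DEPENDENT:
        # "yes" or "ok" is a valid answer only if a question was asked.
        return not agent_asked_question
    # FLOOR_TAKES, NEVER_FILTER and everything else pass through.
    return False
```

FLOOR_TAKES and NEVER_FILTER need no explicit branch in this form, because anything outside the three drop sets is forwarded.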

Step 4: Per-Turn RAG

Fetches church-specific knowledge for the caller's question. Only the church KB is searched (theological content is excluded) to keep added latency below the threshold a caller would perceive.

IF supabase connected AND no moderation context:
rag_context = await fetch_turn_rag(supabase, church_id, text)

FUNCTION fetch_turn_rag:
IF text is shorter than 10 characters: RETURN ""
TIMEOUT: 500ms hard limit via asyncio.wait_for()
→ If Supabase is slow, skip RAG for this turn (empty string)
→ Call continues uninterrupted

INNER LOGIC:
embedding = await generate_embedding(text) # text-embedding-3-small
IF no embedding: RETURN ""
results = await search_church_knowledge(
church_id, embedding,
match_count=5,
match_threshold=0.4 # Stricter than session init (0.35)
)
RETURN format_church_knowledge_context(results)
# Format: --- Church Knowledge Base ---
# [1] "Title" (FAQ/Document)
# Snippet up to 600 chars...
# --- End Church Knowledge Base ---
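
The timeout behaviour can be sketched with asyncio.wait_for. Here the embedding, search and formatting steps are collapsed into one injected coroutine named search, which is a hypothetical stand-in, and the timeout is exposed as a parameter for illustration.

```python
import asyncio
from typing import Awaitable, Callable

MIN_QUERY_CHARS = 10
RAG_TIMEOUT_SECONDS = 0.5


async def fetch_turn_rag(
    search: Callable[[str, str], Awaitable[str]],
    church_id: str,
    text: str,
    timeout: float = RAG_TIMEOUT_SECONDS,
) -> str:
    """Return formatted RAG context, or "" if skipped or too slow."""
    if len(text) < MIN_QUERY_CHARS:
        return ""
    try:
        # wait_for cancels the inner coroutine once the timeout expires.
        return await asyncio.wait_for(search(church_id, text), timeout=timeout)
    except asyncio.TimeoutError:
        return ""  # Skip RAG for this turn; the call continues.


async def _fast_search(church_id: str, text: str) -> str:
    return "--- Church Knowledge Base ---"


async def _slow_search(church_id: str, text: str) -> str:
    await asyncio.sleep(1)
    return "too late"


fast = asyncio.run(fetch_turn_rag(_fast_search, "c1", "What time is service?"))
slow = asyncio.run(
    fetch_turn_rag(_slow_search, "c1", "What time is service?", timeout=0.05)
)
short = asyncio.run(fetch_turn_rag(_fast_search, "c1", "hi"))
```

Returning an empty string on every failure path means the caller never needs to distinguish "no results" from "lookup skipped".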

Step 5: Combine Contexts

combined = join_non_empty([moderation_context, rag_context])
# Examples:
# Crisis + no RAG: "CRITICAL: Caller may be in crisis..."
# No moderation + RAG: "--- Church Knowledge Base ---\n[1] ..."
# No moderation + no RAG: None
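
One possible implementation of join_non_empty is shown below. The blank-line separator is an assumption; the source only shows that empty parts are dropped and that None is returned when nothing remains.

```python
from typing import Iterable, Optional


def join_non_empty(
    parts: Iterable[Optional[str]], sep: str = "\n\n"
) -> Optional[str]:
    """Join the truthy parts with sep; return None if none remain."""
    kept = [part for part in parts if part]
    return sep.join(kept) if kept else None


crisis_only = join_non_empty(["CRITICAL: Caller may be in crisis.", ""])
rag_only = join_non_empty([None, "--- Church Knowledge Base ---"])
nothing = join_non_empty([None, ""])
```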

Step 6: LLM Processing with Tool Filler

session["is_processing"] = True

async for output in agent.process(env, event, context=combined):

IF output is AgentToolCalled AND no filler sent yet:
tool_name = output.tool_name
IF tool_name not in SKIP_FILLER_TOOLS ("end_call", "demo_agent"):
YIELD AgentSendText(random.choice(TOOL_FILLERS))
# Fillers: "One moment.", "Let me check on that.",
# "Give me one sec.", "Sure, let me look that up.",
# "Hang on, let me find that for you.", "Un momento!"
filler_sent = True

Track output:
IF AgentSendText: save last_agent_response, check for "?" (question tracking)
YIELD output

session["is_processing"] = False
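
The filler logic can be sketched as an async generator that wraps the agent's output stream. The two dataclasses are local stand-ins for the SDK event types named above, not the real classes, and the try/finally is an addition so that is_processing is reset even if the stream raises.

```python
import asyncio
import random
from dataclasses import dataclass
from typing import AsyncIterator, Union


@dataclass
class AgentToolCalled:  # stand-in for the SDK event
    tool_name: str


@dataclass
class AgentSendText:  # stand-in for the SDK event
    text: str


Output = Union[AgentToolCalled, AgentSendText]

SKIP_FILLER_TOOLS = {"end_call", "demo_agent"}
TOOL_FILLERS = ["One moment.", "Let me check on that.", "Give me one sec."]


async def with_tool_filler(
    outputs: AsyncIterator[Output], session: dict
) -> AsyncIterator[Output]:
    """Pass outputs through, adding one filler line before the first tool call."""
    filler_sent = False
    session["is_processing"] = True
    try:
        async for output in outputs:
            if (
                isinstance(output, AgentToolCalled)
                and not filler_sent
                and output.tool_name not in SKIP_FILLER_TOOLS
            ):
                yield AgentSendText(random.choice(TOOL_FILLERS))
                filler_sent = True
            if isinstance(output, AgentSendText):
                session["last_agent_response"] = output.text
                session["agent_asked_question"] = "?" in output.text
            yield output
    finally:
        session["is_processing"] = False


async def _fake_agent() -> AsyncIterator[Output]:
    yield AgentToolCalled("lookup_events")
    yield AgentToolCalled("lookup_staff")
    yield AgentSendText("Service is at 10am. Anything else?")


async def _collect():
    session: dict = {}
    outs = [o async for o in with_tool_filler(_fake_agent(), session)]
    return session, outs


session, outs = asyncio.run(_collect())
```

In this sketch the flag is set only when a filler is actually sent, so a skipped tool such as end_call does not use up the single filler for the turn.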

Step 7: Farewell Detection and Auto-Hangup

IF NOT session["crisis_detected"]: # NEVER auto-hangup during crisis
IF is_mutual_farewell(last_agent_response, caller_text):
# Agent said farewell: "take care", "have a blessed", "goodbye", etc.
# AND caller said farewell: "bye", "that's all", "no I'm good",
# or short "thank you" (<=6 words)

session["farewell_pending"] = True
await sleep(4) # Grace period — let farewell TTS finish
IF session["farewell_pending"]: # Not cancelled by new speech
YIELD AgentEndCall()
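
The cancellable grace period works because Step 0 clears farewell_pending whenever new speech arrives. A minimal sketch, assuming the session dict is shared between the timer and the turn handler; the function name and return convention are hypothetical.

```python
import asyncio


async def farewell_grace_period(session: dict, delay: float = 4.0) -> bool:
    """Return True if the call should be ended after the grace period."""
    if session.get("crisis_detected"):
        return False  # Never auto-hangup during a crisis.
    session["farewell_pending"] = True
    await asyncio.sleep(delay)  # Let the farewell TTS finish.
    # A new caller turn sets farewell_pending back to False in Step 0.
    return bool(session.get("farewell_pending"))


async def _scenario(caller_interrupts: bool) -> bool:
    session: dict = {}
    timer = asyncio.create_task(farewell_grace_period(session, delay=0.05))
    await asyncio.sleep(0.01)
    if caller_interrupts:
        session["farewell_pending"] = False  # simulated new speech
    return await timer


hangs_up = asyncio.run(_scenario(caller_interrupts=False))
stays_on = asyncio.run(_scenario(caller_interrupts=True))
```

A shared flag is enough here because the timer only needs to know whether anything happened during the sleep, not what happened.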

CallEnded Event Handling

When the call ends (either party hangs up), the TurnProcessor finalizes the call log.

FUNCTION _handle_call_ended:
IF supabase AND session has call_id:
1. Serialize conversation history to transcript list
(model_dump() for Pydantic objects, str() fallback)
2. update_call_log_end(call_id, transcript, duration, tool_results)
3. Generate call classification (fire-and-forget):
POST to Gemini 2.5 Flash API with transcript text
Parse 7-field response:
SUMMARY, SENTIMENT, TOPICS, CATEGORY, URGENCY, FOLLOW_UP, ASSIGNEE
UPDATE voice_call_logs with classification fields
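
Parsing the 7-field response might look like the sketch below. It assumes the model returns one "FIELD: value" pair per line, which is a guess; the prompt and the exact response format are not documented here.

```python
from typing import Dict, Optional

CLASSIFICATION_FIELDS = (
    "SUMMARY", "SENTIMENT", "TOPICS", "CATEGORY",
    "URGENCY", "FOLLOW_UP", "ASSIGNEE",
)


def parse_classification(raw: str) -> Dict[str, Optional[str]]:
    """Extract the known fields; missing or empty fields stay None."""
    parsed: Dict[str, Optional[str]] = {
        name.lower(): None for name in CLASSIFICATION_FIELDS
    }
    for line in raw.splitlines():
        # Split on the first colon only, so values may contain colons.
        key, sep, value = line.partition(":")
        key = key.strip().upper()
        if sep and key in CLASSIFICATION_FIELDS:
            parsed[key.lower()] = value.strip() or None
    return parsed


raw_response = (
    "SUMMARY: Caller asked about service times at 10:30.\n"
    "SENTIMENT: positive\n"
    "URGENCY: low\n"
    "an unrelated line"
)
result = parse_classification(raw_response)
```

Defaulting every field to None keeps the database update uniform even when the model omits a field or adds extra lines.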