Sergei 5ecb5f9683 Fix Julia AI voice: use SINGLE_DEPLOYMENT_MODE for Lite
- livekitService.ts: send empty beneficiaryNamesDict in Lite mode
- agent.py: handle None beneficiary_names_dict correctly
- chat.tsx: align text chat with same SINGLE_DEPLOYMENT_MODE flag

This fixes Julia saying "I didn't get the name of beneficiary"
by letting the WellNuo API fall back to the default beneficiary for the deployment_id.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-22 16:49:55 -08:00

447 lines
15 KiB
Python

"""
WellNuo Voice Agent - Julia AI
LiveKit Agents Cloud deployment
Uses WellNuo ask_wellnuo_ai API for LLM responses, Deepgram for STT/TTS
"""
import asyncio
import json
import logging
import os
import random
import re

import aiohttp
from livekit.agents import (
    Agent,
    AgentSession,
    JobContext,
    JobProcess,
    RoomInputOptions,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS, APIConnectOptions
from livekit.plugins import deepgram, noise_cancellation, silero
logger = logging.getLogger("julia-ai")
# WellNuo API Configuration
WELLNUO_API_URL = "https://eluxnetworks.net/function/well-api/api"
WELLNUO_USER = os.getenv("WELLNUO_USER", "anandk")
WELLNUO_PASSWORD = os.getenv("WELLNUO_PASSWORD", "anandk_8")
# Hardcoded Ferdinand's deployment_id for testing
DEPLOYMENT_ID = os.getenv("DEPLOYMENT_ID", "21")
# Keywords that indicate user is asking about the care recipient's status
STATUS_KEYWORDS = [
r"\bhow\s+is\b",
r"\bhow\'?s\b",
r"\bhow\s+are\b",
r"\btell\s+me\s+about\b",
r"\bwhat\'?s\s+up\s+with\b",
r"\bupdate\s+on\b",
r"\bstatus\b",
r"\bdoing\b",
r"\bfeeling\b",
r"\bcheck\s+on\b",
r"\bis\s+\w+\s+okay\b",
r"\bis\s+\w+\s+alright\b",
r"\bis\s+\w+\s+fine\b",
r"\bokay\?\b",
r"\balright\?\b",
]
# Keywords that indicate the subject is the care recipient
SUBJECT_KEYWORDS = [
r"\bdad\b",
r"\bfather\b",
r"\bferdinand\b",
r"\bhim\b",
r"\bhe\b",
r"\bmy\s+dad\b",
r"\bmy\s+father\b",
r"\bthe\s+patient\b",
r"\bloved\s+one\b",
r"\bparent\b",
r"\bgrandpa\b",
r"\bgrandfather\b",
]
# Sleep-related keywords
SLEEP_KEYWORDS = [
r"\bsleep\b",
r"\bslept\b",
r"\brest\b",
r"\bnight\b",
r"\bwake\b",
r"\bwoke\b",
r"\bbedroom\b",
r"\bbathroom\b",
]
# Activity-related keywords
ACTIVITY_KEYWORDS = [
r"\bactivity\b",
r"\bactive\b",
r"\bmoving\b",
r"\bwhere\s+is\b",
r"\blocation\b",
r"\broom\b",
r"\bkitchen\b",
r"\bliving\b",
]
def normalize_question(user_message: str) -> str:
"""
Transform user questions into format WellNuo API understands.
WellNuo API only responds with real sensor data for very specific phrases.
This function maps common user questions to those phrases.
"""
msg_lower = user_message.lower().strip()
# Check if asking about status/wellbeing
is_status_query = any(re.search(p, msg_lower) for p in STATUS_KEYWORDS)
is_about_recipient = any(re.search(p, msg_lower) for p in SUBJECT_KEYWORDS)
is_sleep_query = any(re.search(p, msg_lower) for p in SLEEP_KEYWORDS)
is_activity_query = any(re.search(p, msg_lower) for p in ACTIVITY_KEYWORDS)
# If asking about the care recipient's general status
if is_status_query and is_about_recipient:
logger.info(f"Normalized '{user_message}' -> 'how is dad doing'")
return "how is dad doing"
# If asking about sleep specifically
if is_sleep_query and (is_about_recipient or is_status_query):
logger.info(f"Normalized '{user_message}' -> 'how did dad sleep'")
# Note: This might still not work with WellNuo API, but we try
return "how is dad doing" # Fall back to general status which includes sleep
# If asking about activity/location
if is_activity_query and (is_about_recipient or is_status_query):
logger.info(f"Normalized '{user_message}' -> 'how is dad doing'")
return "how is dad doing" # General status includes location
# Generic status questions without clear subject
if is_status_query and not is_about_recipient:
# User might say "how is he" or "how are things"
# Assume they mean the care recipient
logger.info(
f"Normalized '{user_message}' -> 'how is dad doing' (assumed recipient)"
)
return "how is dad doing"
# If no transformation needed, return original
logger.info(f"No normalization applied to: '{user_message}'")
return user_message
class WellNuoLLM(llm.LLM):
"""Custom LLM that uses WellNuo ask_wellnuo_ai API."""
def __init__(
self,
deployment_id: str | None = None,
beneficiary_names_dict: dict | None = None,
):
super().__init__()
self._token = None
self._model_name = "wellnuo-voice-ask"
# Dynamic values from participant metadata (or fallback to env/defaults)
self._deployment_id = deployment_id or DEPLOYMENT_ID
# SINGLE_DEPLOYMENT_MODE: if beneficiary_names_dict is empty or None,
# WellNuo API will automatically use the beneficiary name for this deployment_id
# This is the Lite mode - we don't need to pass the names dict
self._beneficiary_names_dict = beneficiary_names_dict if beneficiary_names_dict else None
@property
def model(self) -> str:
return self._model_name
@property
def provider(self) -> str:
return "wellnuo"
async def _ensure_token(self) -> str:
"""Get authentication token from WellNuo API."""
if self._token:
return self._token
async with aiohttp.ClientSession() as session:
nonce = str(random.randint(0, 999999))
data = {
"function": "credentials",
"clientId": "MA_001",
"user_name": WELLNUO_USER,
"ps": WELLNUO_PASSWORD,
"nonce": nonce,
}
async with session.post(WELLNUO_API_URL, data=data) as resp:
result = await resp.json()
if result.get("status") == "200 OK":
self._token = result.get("access_token")
logger.info("WellNuo token obtained successfully")
return self._token
else:
logger.error(f"Failed to get WellNuo token: {result}")
raise Exception("Failed to authenticate with WellNuo API")
async def get_response(self, user_message: str) -> str:
"""Call WellNuo ask_wellnuo_ai API and return response."""
if not user_message:
return "I'm here to help. What would you like to know?"
logger.info(f"User question (original): {user_message}")
# Normalize question to format WellNuo API understands
normalized_question = normalize_question(user_message)
try:
token = await self._ensure_token()
async with aiohttp.ClientSession() as session:
# Using ask_wellnuo_ai - latency ~1-2s after warmup
# Provides more comprehensive responses than voice_ask
data = {
"function": "ask_wellnuo_ai",
"clientId": "MA_001",
"user_name": WELLNUO_USER,
"token": token,
"question": normalized_question,
"deployment_id": self._deployment_id,
}
# Add beneficiary_names_dict ONLY if it's not empty
# In SINGLE_DEPLOYMENT_MODE (Lite app), we don't send names dict
# WellNuo API will use the beneficiary name for this deployment_id
if self._beneficiary_names_dict:
data["beneficiary_names_dict"] = json.dumps(
self._beneficiary_names_dict
)
logger.info(
f"Full mode: Using beneficiary_names_dict: {self._beneficiary_names_dict}"
)
else:
logger.info(
f"Single deployment mode: deployment_id={self._deployment_id}, no beneficiary_names_dict"
)
async with session.post(WELLNUO_API_URL, data=data) as resp:
result = await resp.json()
if result.get("ok"):
response_body = result.get("response", {}).get("body", "")
logger.info(f"WellNuo response: {response_body}")
return response_body
else:
logger.error(f"WellNuo API error: {result}")
return "I'm sorry, I couldn't get that information right now."
except Exception as e:
logger.error(f"Error calling WellNuo API: {e}")
return "I'm having trouble connecting. Please try again."
def chat(
self,
*,
chat_ctx: llm.ChatContext,
tools: list[llm.Tool] | None = None,
conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
parallel_tool_calls=None,
tool_choice=None,
extra_kwargs=None,
) -> "WellNuoLLMStream":
"""Return an LLMStream for this chat request."""
return WellNuoLLMStream(
llm_instance=self,
chat_ctx=chat_ctx,
tools=tools or [],
conn_options=conn_options,
)
class WellNuoLLMStream(llm.LLMStream):
"""Stream wrapper for WellNuo API response."""
def __init__(
self,
llm_instance: WellNuoLLM,
chat_ctx: llm.ChatContext,
tools: list[llm.Tool],
conn_options: APIConnectOptions,
):
super().__init__(
llm=llm_instance,
chat_ctx=chat_ctx,
tools=tools,
conn_options=conn_options,
)
self._wellnuo_llm = llm_instance
async def _run(self):
"""Emit the response as chunks - called by the base class."""
# Extract last user message from chat context
user_message = ""
for item in reversed(self._chat_ctx.items):
# ChatMessage has type="message", role, and text_content property
is_user_message = (
hasattr(item, "type")
and item.type == "message"
and hasattr(item, "role")
and item.role == "user"
)
if is_user_message:
# Use text_content property which handles list[ChatContent]
user_message = item.text_content or ""
break
# Get response from WellNuo API
response_text = await self._wellnuo_llm.get_response(user_message)
# Emit the response as a single chunk
# The base class handles the async iteration
self._event_ch.send_nowait(
llm.ChatChunk(
id="wellnuo-response",
delta=llm.ChoiceDelta(
role="assistant",
content=response_text,
),
)
)
def prewarm(proc: JobProcess):
"""Preload VAD model for faster startup."""
proc.userdata["vad"] = silero.VAD.load()
async def wait_for_participant_with_metadata(
ctx: JobContext, timeout: float = 10.0
) -> tuple[str | None, dict | None]:
"""
Wait for a remote participant with metadata to join, then extract beneficiary data.
The mobile app passes this data through the LiveKit token metadata:
{
"deploymentId": "21",
"beneficiaryNamesDict": {"21": "papa", "69": "David"}
}
IMPORTANT: This function waits up to `timeout` seconds for a participant
with metadata to appear. This fixes the race condition where the agent
connects before the user's metadata is available.
"""
import asyncio
deployment_id = None
beneficiary_names_dict = None
start_time = asyncio.get_event_loop().time()
attempt = 0
while asyncio.get_event_loop().time() - start_time < timeout:
attempt += 1
# Check all remote participants for metadata
for participant in ctx.room.remote_participants.values():
metadata = participant.metadata
if metadata:
try:
data = json.loads(metadata)
deployment_id = data.get("deploymentId")
beneficiary_names_dict = data.get("beneficiaryNamesDict")
if deployment_id:
logger.info(
f"[Attempt {attempt}] Extracted from participant "
f"{participant.identity}: deployment_id={deployment_id}, "
f"beneficiary_names_dict={beneficiary_names_dict}"
)
return deployment_id, beneficiary_names_dict
except json.JSONDecodeError:
logger.warning(
f"Failed to parse participant metadata: {metadata}"
)
# Log waiting status every 2 seconds
if attempt % 4 == 0:
logger.info(
f"Waiting for participant with metadata... "
f"({int(asyncio.get_event_loop().time() - start_time)}s elapsed, "
f"participants: {len(ctx.room.remote_participants)})"
)
await asyncio.sleep(0.5)
# Timeout reached - log and return None
logger.warning(
f"Timeout ({timeout}s) waiting for participant metadata. "
f"Participants: {len(ctx.room.remote_participants)}"
)
return None, None
async def entrypoint(ctx: JobContext):
"""Main Julia AI voice session handler."""
# CRITICAL: Must connect to room first before accessing ctx.room
await ctx.connect()
logger.info(f"Starting Julia AI session in room {ctx.room.name}")
# Wait for participant with metadata (fixes race condition)
# The mobile app sends deploymentId and beneficiaryNamesDict in token metadata
deployment_id, beneficiary_names_dict = await wait_for_participant_with_metadata(
ctx, timeout=10.0
)
# Log what we're using
effective_deployment_id = deployment_id or DEPLOYMENT_ID
logger.info(
f"Using WellNuo ask_wellnuo_ai API with deployment_id: {effective_deployment_id}"
)
if beneficiary_names_dict:
logger.info(f"Beneficiary names dict: {beneficiary_names_dict}")
else:
logger.info("No beneficiary_names_dict provided, using default behavior")
session = AgentSession(
# Deepgram Nova-2 for accurate speech-to-text
stt=deepgram.STT(model="nova-2"),
# WellNuo voice_ask API for LLM with dynamic beneficiary data
llm=WellNuoLLM(
deployment_id=deployment_id,
beneficiary_names_dict=beneficiary_names_dict,
),
# Deepgram Aura Asteria for natural female voice
tts=deepgram.TTS(model="aura-asteria-en"),
# Silero VAD for voice activity detection
vad=ctx.proc.userdata["vad"],
)
# Start the session with Julia assistant
await session.start(
agent=Agent(instructions="You are Julia, a helpful AI care assistant."),
room=ctx.room,
room_input_options=RoomInputOptions(
# Enable noise cancellation
noise_cancellation=noise_cancellation.BVC(),
),
)
# Generate initial greeting
await session.generate_reply(
instructions="Greet the user warmly as Julia. Briefly introduce yourself as their AI care assistant and ask how you can help them today."
)
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm,
# Agent name must match what token requests (AGENT_NAME in livekit.js)
agent_name="julia-ai",
)
)