""" WellNuo Voice Agent - Julia AI LiveKit Agents Cloud deployment Uses WellNuo ask_wellnuo_ai API for LLM responses, Deepgram for STT/TTS """ import json import logging import os import random import re import aiohttp from livekit.agents import ( Agent, AgentSession, JobContext, JobProcess, RoomInputOptions, WorkerOptions, cli, llm, ) from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS, APIConnectOptions from livekit.plugins import deepgram, noise_cancellation, silero logger = logging.getLogger("julia-ai") # WellNuo API Configuration WELLNUO_API_URL = "https://eluxnetworks.net/function/well-api/api" WELLNUO_USER = os.getenv("WELLNUO_USER", "anandk") WELLNUO_PASSWORD = os.getenv("WELLNUO_PASSWORD", "anandk_8") # Hardcoded Ferdinand's deployment_id for testing DEPLOYMENT_ID = os.getenv("DEPLOYMENT_ID", "21") # Keywords that indicate user is asking about the care recipient's status STATUS_KEYWORDS = [ r"\bhow\s+is\b", r"\bhow\'?s\b", r"\bhow\s+are\b", r"\btell\s+me\s+about\b", r"\bwhat\'?s\s+up\s+with\b", r"\bupdate\s+on\b", r"\bstatus\b", r"\bdoing\b", r"\bfeeling\b", r"\bcheck\s+on\b", r"\bis\s+\w+\s+okay\b", r"\bis\s+\w+\s+alright\b", r"\bis\s+\w+\s+fine\b", r"\bokay\?\b", r"\balright\?\b", ] # Keywords that indicate the subject is the care recipient SUBJECT_KEYWORDS = [ r"\bdad\b", r"\bfather\b", r"\bferdinand\b", r"\bhim\b", r"\bhe\b", r"\bmy\s+dad\b", r"\bmy\s+father\b", r"\bthe\s+patient\b", r"\bloved\s+one\b", r"\bparent\b", r"\bgrandpa\b", r"\bgrandfather\b", ] # Sleep-related keywords SLEEP_KEYWORDS = [ r"\bsleep\b", r"\bslept\b", r"\brest\b", r"\bnight\b", r"\bwake\b", r"\bwoke\b", r"\bbedroom\b", r"\bbathroom\b", ] # Activity-related keywords ACTIVITY_KEYWORDS = [ r"\bactivity\b", r"\bactive\b", r"\bmoving\b", r"\bwhere\s+is\b", r"\blocation\b", r"\broom\b", r"\bkitchen\b", r"\bliving\b", ] def normalize_question(user_message: str) -> str: """ Transform user questions into format WellNuo API understands. WellNuo API only responds with real sensor data for very specific phrases. This function maps common user questions to those phrases. """ msg_lower = user_message.lower().strip() # Check if asking about status/wellbeing is_status_query = any(re.search(p, msg_lower) for p in STATUS_KEYWORDS) is_about_recipient = any(re.search(p, msg_lower) for p in SUBJECT_KEYWORDS) is_sleep_query = any(re.search(p, msg_lower) for p in SLEEP_KEYWORDS) is_activity_query = any(re.search(p, msg_lower) for p in ACTIVITY_KEYWORDS) # If asking about the care recipient's general status if is_status_query and is_about_recipient: logger.info(f"Normalized '{user_message}' -> 'how is dad doing'") return "how is dad doing" # If asking about sleep specifically if is_sleep_query and (is_about_recipient or is_status_query): logger.info(f"Normalized '{user_message}' -> 'how did dad sleep'") # Note: This might still not work with WellNuo API, but we try return "how is dad doing" # Fall back to general status which includes sleep # If asking about activity/location if is_activity_query and (is_about_recipient or is_status_query): logger.info(f"Normalized '{user_message}' -> 'how is dad doing'") return "how is dad doing" # General status includes location # Generic status questions without clear subject if is_status_query and not is_about_recipient: # User might say "how is he" or "how are things" # Assume they mean the care recipient logger.info( f"Normalized '{user_message}' -> 'how is dad doing' (assumed recipient)" ) return "how is dad doing" # If no transformation needed, return original logger.info(f"No normalization applied to: '{user_message}'") return user_message class WellNuoLLM(llm.LLM): """Custom LLM that uses WellNuo ask_wellnuo_ai API.""" def __init__( self, deployment_id: str | None = None, beneficiary_names_dict: dict | None = None, ): super().__init__() self._token = None self._model_name = "wellnuo-voice-ask" # Dynamic values from participant metadata (or fallback to env/defaults) self._deployment_id = deployment_id or DEPLOYMENT_ID # SINGLE_DEPLOYMENT_MODE: if beneficiary_names_dict is empty or None, # WellNuo API will automatically use the beneficiary name for this deployment_id # This is the Lite mode - we don't need to pass the names dict self._beneficiary_names_dict = beneficiary_names_dict if beneficiary_names_dict else None @property def model(self) -> str: return self._model_name @property def provider(self) -> str: return "wellnuo" async def _ensure_token(self) -> str: """Get authentication token from WellNuo API.""" if self._token: return self._token async with aiohttp.ClientSession() as session: nonce = str(random.randint(0, 999999)) data = { "function": "credentials", "clientId": "MA_001", "user_name": WELLNUO_USER, "ps": WELLNUO_PASSWORD, "nonce": nonce, } async with session.post(WELLNUO_API_URL, data=data) as resp: result = await resp.json() if result.get("status") == "200 OK": self._token = result.get("access_token") logger.info("WellNuo token obtained successfully") return self._token else: logger.error(f"Failed to get WellNuo token: {result}") raise Exception("Failed to authenticate with WellNuo API") async def get_response(self, user_message: str) -> str: """Call WellNuo ask_wellnuo_ai API and return response.""" if not user_message: return "I'm here to help. What would you like to know?" logger.info(f"User question (original): {user_message}") # Normalize question to format WellNuo API understands normalized_question = normalize_question(user_message) try: token = await self._ensure_token() async with aiohttp.ClientSession() as session: # Using ask_wellnuo_ai - latency ~1-2s after warmup # Provides more comprehensive responses than voice_ask data = { "function": "ask_wellnuo_ai", "clientId": "MA_001", "user_name": WELLNUO_USER, "token": token, "question": normalized_question, "deployment_id": self._deployment_id, } # Add beneficiary_names_dict ONLY if it's not empty # In SINGLE_DEPLOYMENT_MODE (Lite app), we don't send names dict # WellNuo API will use the beneficiary name for this deployment_id if self._beneficiary_names_dict: data["beneficiary_names_dict"] = json.dumps( self._beneficiary_names_dict ) logger.info( f"Full mode: Using beneficiary_names_dict: {self._beneficiary_names_dict}" ) else: logger.info( f"Single deployment mode: deployment_id={self._deployment_id}, no beneficiary_names_dict" ) async with session.post(WELLNUO_API_URL, data=data) as resp: result = await resp.json() if result.get("ok"): response_body = result.get("response", {}).get("body", "") logger.info(f"WellNuo response: {response_body}") return response_body else: logger.error(f"WellNuo API error: {result}") return "I'm sorry, I couldn't get that information right now." except Exception as e: logger.error(f"Error calling WellNuo API: {e}") return "I'm having trouble connecting. Please try again." def chat( self, *, chat_ctx: llm.ChatContext, tools: list[llm.Tool] | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, parallel_tool_calls=None, tool_choice=None, extra_kwargs=None, ) -> "WellNuoLLMStream": """Return an LLMStream for this chat request.""" return WellNuoLLMStream( llm_instance=self, chat_ctx=chat_ctx, tools=tools or [], conn_options=conn_options, ) class WellNuoLLMStream(llm.LLMStream): """Stream wrapper for WellNuo API response.""" def __init__( self, llm_instance: WellNuoLLM, chat_ctx: llm.ChatContext, tools: list[llm.Tool], conn_options: APIConnectOptions, ): super().__init__( llm=llm_instance, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options, ) self._wellnuo_llm = llm_instance async def _run(self): """Emit the response as chunks - called by the base class.""" # Extract last user message from chat context user_message = "" for item in reversed(self._chat_ctx.items): # ChatMessage has type="message", role, and text_content property is_user_message = ( hasattr(item, "type") and item.type == "message" and hasattr(item, "role") and item.role == "user" ) if is_user_message: # Use text_content property which handles list[ChatContent] user_message = item.text_content or "" break # Get response from WellNuo API response_text = await self._wellnuo_llm.get_response(user_message) # Emit the response as a single chunk # The base class handles the async iteration self._event_ch.send_nowait( llm.ChatChunk( id="wellnuo-response", delta=llm.ChoiceDelta( role="assistant", content=response_text, ), ) ) def prewarm(proc: JobProcess): """Preload VAD model for faster startup.""" proc.userdata["vad"] = silero.VAD.load() async def wait_for_participant_with_metadata( ctx: JobContext, timeout: float = 10.0 ) -> tuple[str | None, dict | None]: """ Wait for a remote participant with metadata to join, then extract beneficiary data. The mobile app passes this data through the LiveKit token metadata: { "deploymentId": "21", "beneficiaryNamesDict": {"21": "papa", "69": "David"} } IMPORTANT: This function waits up to `timeout` seconds for a participant with metadata to appear. This fixes the race condition where the agent connects before the user's metadata is available. """ import asyncio deployment_id = None beneficiary_names_dict = None start_time = asyncio.get_event_loop().time() attempt = 0 while asyncio.get_event_loop().time() - start_time < timeout: attempt += 1 # Check all remote participants for metadata for participant in ctx.room.remote_participants.values(): metadata = participant.metadata if metadata: try: data = json.loads(metadata) deployment_id = data.get("deploymentId") beneficiary_names_dict = data.get("beneficiaryNamesDict") if deployment_id: logger.info( f"[Attempt {attempt}] Extracted from participant " f"{participant.identity}: deployment_id={deployment_id}, " f"beneficiary_names_dict={beneficiary_names_dict}" ) return deployment_id, beneficiary_names_dict except json.JSONDecodeError: logger.warning( f"Failed to parse participant metadata: {metadata}" ) # Log waiting status every 2 seconds if attempt % 4 == 0: logger.info( f"Waiting for participant with metadata... " f"({int(asyncio.get_event_loop().time() - start_time)}s elapsed, " f"participants: {len(ctx.room.remote_participants)})" ) await asyncio.sleep(0.5) # Timeout reached - log and return None logger.warning( f"Timeout ({timeout}s) waiting for participant metadata. " f"Participants: {len(ctx.room.remote_participants)}" ) return None, None async def entrypoint(ctx: JobContext): """Main Julia AI voice session handler.""" # CRITICAL: Must connect to room first before accessing ctx.room await ctx.connect() logger.info(f"Starting Julia AI session in room {ctx.room.name}") # Wait for participant with metadata (fixes race condition) # The mobile app sends deploymentId and beneficiaryNamesDict in token metadata deployment_id, beneficiary_names_dict = await wait_for_participant_with_metadata( ctx, timeout=10.0 ) # Log what we're using effective_deployment_id = deployment_id or DEPLOYMENT_ID logger.info( f"Using WellNuo ask_wellnuo_ai API with deployment_id: {effective_deployment_id}" ) if beneficiary_names_dict: logger.info(f"Beneficiary names dict: {beneficiary_names_dict}") else: logger.info("No beneficiary_names_dict provided, using default behavior") session = AgentSession( # Deepgram Nova-2 for accurate speech-to-text stt=deepgram.STT(model="nova-2"), # WellNuo voice_ask API for LLM with dynamic beneficiary data llm=WellNuoLLM( deployment_id=deployment_id, beneficiary_names_dict=beneficiary_names_dict, ), # Deepgram Aura Asteria for natural female voice tts=deepgram.TTS(model="aura-asteria-en"), # Silero VAD for voice activity detection vad=ctx.proc.userdata["vad"], ) # Start the session with Julia assistant await session.start( agent=Agent(instructions="You are Julia, a helpful AI care assistant."), room=ctx.room, room_input_options=RoomInputOptions( # Enable noise cancellation noise_cancellation=noise_cancellation.BVC(), ), ) # Generate initial greeting await session.generate_reply( instructions="Greet the user warmly as Julia. Briefly introduce yourself as their AI care assistant and ask how you can help them today." ) if __name__ == "__main__": cli.run_app( WorkerOptions( entrypoint_fnc=entrypoint, prewarm_fnc=prewarm, # Agent name must match what token requests (AGENT_NAME in livekit.js) agent_name="julia-ai", ) )