""" WellNuo Voice Agent - Julia AI LiveKit Agents Cloud deployment Uses WellNuo voice_ask API for LLM responses, Deepgram for STT/TTS """ import logging import os import random import aiohttp from livekit.agents import ( Agent, AgentSession, JobContext, JobProcess, RoomInputOptions, WorkerOptions, cli, llm, ) from livekit.plugins import deepgram, silero, noise_cancellation logger = logging.getLogger("julia-ai") # WellNuo API Configuration WELLNUO_API_URL = "https://eluxnetworks.net/function/well-api/api" WELLNUO_USER = os.getenv("WELLNUO_USER", "anandk") WELLNUO_PASSWORD = os.getenv("WELLNUO_PASSWORD", "anandk_8") # Hardcoded Ferdinand's deployment_id for testing DEPLOYMENT_ID = os.getenv("DEPLOYMENT_ID", "21") # Julia's personality for voice synthesis JULIA_GREETING = "Hello! I'm Julia, your AI care assistant. How can I help you today?" class WellNuoLLM(llm.LLM): """Custom LLM that uses WellNuo voice_ask API.""" def __init__(self): super().__init__() self._token = None self._session = None async def _ensure_token(self): """Get authentication token from WellNuo API.""" if self._token: return self._token async with aiohttp.ClientSession() as session: # Generate random nonce for request nonce = str(random.randint(0, 999999)) data = { "function": "credentials", "clientId": "001", "user_name": WELLNUO_USER, "ps": WELLNUO_PASSWORD, # API expects 'ps' not 'password' "nonce": nonce, } async with session.post(WELLNUO_API_URL, data=data) as resp: result = await resp.json() if result.get("status") == "200 OK": self._token = result.get("access_token") logger.info("WellNuo token obtained successfully") return self._token else: logger.error(f"Failed to get WellNuo token: {result}") raise Exception("Failed to authenticate with WellNuo API") async def chat( self, *, chat_ctx: llm.ChatContext, tools: list[llm.FunctionTool] | None = None, tool_choice: llm.ToolChoice | None = None, parallel_tool_calls: bool | None = None, extra_body: dict | None = None, ) -> llm.LLMStream: """Send user question to WellNuo voice_ask API.""" # Get the last user message user_message = "" for msg in reversed(chat_ctx.items): if hasattr(msg, 'role') and msg.role == "user": if hasattr(msg, 'content'): user_message = msg.content break if not user_message: # Return a default response if no user message return WellNuoLLMStream("I'm here to help. What would you like to know?") logger.info(f"User question: {user_message}") # Get response from WellNuo API try: token = await self._ensure_token() async with aiohttp.ClientSession() as session: data = { "function": "voice_ask", "clientId": "001", "user_name": WELLNUO_USER, "token": token, "question": user_message, "deployment_id": DEPLOYMENT_ID, } async with session.post(WELLNUO_API_URL, data=data) as resp: result = await resp.json() if result.get("ok"): response_body = result.get("response", {}).get("body", "") logger.info(f"WellNuo response: {response_body}") return WellNuoLLMStream(response_body) else: logger.error(f"WellNuo API error: {result}") return WellNuoLLMStream("I'm sorry, I couldn't get that information right now.") except Exception as e: logger.error(f"Error calling WellNuo API: {e}") return WellNuoLLMStream("I'm having trouble connecting. Please try again.") class WellNuoLLMStream(llm.LLMStream): """Stream wrapper for WellNuo API response.""" def __init__(self, response_text: str): super().__init__( llm=None, chat_ctx=llm.ChatContext(), tools=[], tool_choice=None, parallel_tool_calls=None, extra_body=None, ) self._response_text = response_text self._sent = False async def _run(self): """Yield the response as a single chunk.""" pass async def __anext__(self) -> llm.ChatChunk: if self._sent: raise StopAsyncIteration self._sent = True return llm.ChatChunk( id="wellnuo-response", delta=llm.ChoiceDelta( role="assistant", content=self._response_text, ), ) def __aiter__(self): return self def prewarm(proc: JobProcess): """Preload VAD model for faster startup.""" proc.userdata["vad"] = silero.VAD.load() async def entrypoint(ctx: JobContext): """Main Julia AI voice session handler.""" logger.info(f"Starting Julia AI session in room {ctx.room.name}") logger.info(f"Using WellNuo voice_ask API with deployment_id: {DEPLOYMENT_ID}") session = AgentSession( # Deepgram Nova-2 for accurate speech-to-text stt=deepgram.STT(model="nova-2"), # WellNuo voice_ask API for LLM llm=WellNuoLLM(), # Deepgram Aura Asteria for natural female voice tts=deepgram.TTS(model="aura-asteria-en"), # Silero VAD for voice activity detection vad=ctx.proc.userdata["vad"], ) # Start the session with Julia assistant await session.start( agent=Agent(instructions="You are Julia, a helpful AI care assistant."), room=ctx.room, room_input_options=RoomInputOptions( # Enable noise cancellation noise_cancellation=noise_cancellation.BVC(), ), ) # Generate initial greeting await session.generate_reply( instructions="Greet the user warmly as Julia. Briefly introduce yourself as their AI care assistant and ask how you can help them today." ) if __name__ == "__main__": cli.run_app( WorkerOptions( entrypoint_fnc=entrypoint, prewarm_fnc=prewarm, ) )