diff --git a/julia-agent/julia-ai/src/agent.py b/julia-agent/julia-ai/src/agent.py index 9f30667..29218ce 100644 --- a/julia-agent/julia-ai/src/agent.py +++ b/julia-agent/julia-ai/src/agent.py @@ -7,8 +7,8 @@ Uses WellNuo voice_ask API for LLM responses, Deepgram for STT/TTS import logging import os import random -import aiohttp +import aiohttp from livekit.agents import ( Agent, AgentSession, @@ -19,7 +19,8 @@ from livekit.agents import ( cli, llm, ) -from livekit.plugins import deepgram, silero, noise_cancellation +from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS, APIConnectOptions +from livekit.plugins import deepgram, noise_cancellation, silero logger = logging.getLogger("julia-ai") @@ -30,9 +31,6 @@ WELLNUO_PASSWORD = os.getenv("WELLNUO_PASSWORD", "anandk_8") # Hardcoded Ferdinand's deployment_id for testing DEPLOYMENT_ID = os.getenv("DEPLOYMENT_ID", "21") -# Julia's personality for voice synthesis -JULIA_GREETING = "Hello! I'm Julia, your AI care assistant. How can I help you today?" - class WellNuoLLM(llm.LLM): """Custom LLM that uses WellNuo voice_ask API.""" @@ -40,21 +38,28 @@ class WellNuoLLM(llm.LLM): def __init__(self): super().__init__() self._token = None - self._session = None + self._model_name = "wellnuo-voice-ask" - async def _ensure_token(self): + @property + def model(self) -> str: + return self._model_name + + @property + def provider(self) -> str: + return "wellnuo" + + async def _ensure_token(self) -> str: """Get authentication token from WellNuo API.""" if self._token: return self._token async with aiohttp.ClientSession() as session: - # Generate random nonce for request nonce = str(random.randint(0, 999999)) data = { "function": "credentials", "clientId": "001", "user_name": WELLNUO_USER, - "ps": WELLNUO_PASSWORD, # API expects 'ps' not 'password' + "ps": WELLNUO_PASSWORD, "nonce": nonce, } async with session.post(WELLNUO_API_URL, data=data) as resp: @@ -67,31 +72,13 @@ class WellNuoLLM(llm.LLM): logger.error(f"Failed to get WellNuo token: {result}") raise Exception("Failed to authenticate with WellNuo API") - async def chat( - self, - *, - chat_ctx: llm.ChatContext, - tools: list[llm.FunctionTool] | None = None, - tool_choice: llm.ToolChoice | None = None, - parallel_tool_calls: bool | None = None, - extra_body: dict | None = None, - ) -> llm.LLMStream: - """Send user question to WellNuo voice_ask API.""" - # Get the last user message - user_message = "" - for msg in reversed(chat_ctx.items): - if hasattr(msg, 'role') and msg.role == "user": - if hasattr(msg, 'content'): - user_message = msg.content - break - + async def get_response(self, user_message: str) -> str: + """Call WellNuo voice_ask API and return response.""" if not user_message: - # Return a default response if no user message - return WellNuoLLMStream("I'm here to help. What would you like to know?") + return "I'm here to help. What would you like to know?" logger.info(f"User question: {user_message}") - # Get response from WellNuo API try: token = await self._ensure_token() @@ -110,50 +97,83 @@ class WellNuoLLM(llm.LLM): if result.get("ok"): response_body = result.get("response", {}).get("body", "") logger.info(f"WellNuo response: {response_body}") - return WellNuoLLMStream(response_body) + return response_body else: logger.error(f"WellNuo API error: {result}") - return WellNuoLLMStream("I'm sorry, I couldn't get that information right now.") + return "I'm sorry, I couldn't get that information right now." except Exception as e: logger.error(f"Error calling WellNuo API: {e}") - return WellNuoLLMStream("I'm having trouble connecting. Please try again.") + return "I'm having trouble connecting. Please try again." + + def chat( + self, + *, + chat_ctx: llm.ChatContext, + tools: list[llm.Tool] | None = None, + conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, + parallel_tool_calls=None, + tool_choice=None, + extra_kwargs=None, + ) -> "WellNuoLLMStream": + """Return an LLMStream for this chat request.""" + return WellNuoLLMStream( + llm_instance=self, + chat_ctx=chat_ctx, + tools=tools or [], + conn_options=conn_options, + ) class WellNuoLLMStream(llm.LLMStream): """Stream wrapper for WellNuo API response.""" - def __init__(self, response_text: str): + def __init__( + self, + llm_instance: WellNuoLLM, + chat_ctx: llm.ChatContext, + tools: list[llm.Tool], + conn_options: APIConnectOptions, + ): super().__init__( - llm=None, - chat_ctx=llm.ChatContext(), - tools=[], - tool_choice=None, - parallel_tool_calls=None, - extra_body=None, + llm=llm_instance, + chat_ctx=chat_ctx, + tools=tools, + conn_options=conn_options, ) - self._response_text = response_text - self._sent = False + self._wellnuo_llm = llm_instance async def _run(self): - """Yield the response as a single chunk.""" - pass + """Emit the response as chunks - called by the base class.""" + # Extract last user message from chat context + user_message = "" + for item in reversed(self._chat_ctx.items): + # ChatMessage has type="message", role, and text_content property + is_user_message = ( + hasattr(item, "type") + and item.type == "message" + and hasattr(item, "role") + and item.role == "user" + ) + if is_user_message: + # Use text_content property which handles list[ChatContent] + user_message = item.text_content or "" + break - async def __anext__(self) -> llm.ChatChunk: - if self._sent: - raise StopAsyncIteration + # Get response from WellNuo API + response_text = await self._wellnuo_llm.get_response(user_message) - self._sent = True - return llm.ChatChunk( - id="wellnuo-response", - delta=llm.ChoiceDelta( - role="assistant", - content=self._response_text, - ), + # Emit the response as a single chunk + # The base class handles the async iteration + self._event_ch.send_nowait( + llm.ChatChunk( + id="wellnuo-response", + delta=llm.ChoiceDelta( + role="assistant", + content=response_text, + ), + ) ) - def __aiter__(self): - return self - def prewarm(proc: JobProcess): """Preload VAD model for faster startup.""" @@ -163,6 +183,9 @@ def prewarm(proc: JobProcess): async def entrypoint(ctx: JobContext): """Main Julia AI voice session handler.""" + # CRITICAL: Must connect to room first before accessing ctx.room + await ctx.connect() + logger.info(f"Starting Julia AI session in room {ctx.room.name}") logger.info(f"Using WellNuo voice_ask API with deployment_id: {DEPLOYMENT_ID}") @@ -198,5 +221,7 @@ if __name__ == "__main__": WorkerOptions( entrypoint_fnc=entrypoint, prewarm_fnc=prewarm, + # Agent name must match what token requests (AGENT_NAME in livekit.js) + agent_name="julia-ai", ) )