Fix WellNuoLLM custom implementation for livekit-agents 1.3.11

- chat() must be synchronous, returning LLMStream directly
- LLMStream.__init__() takes 4 params: llm, chat_ctx, tools, conn_options
- _run() emits chunks via self._event_ch.send_nowait()
- Added model and provider properties required by LLM base class

Tested: STT (Deepgram Nova-2), TTS (Deepgram Aura Asteria), VAD (Silero)
all working. WellNuo voice_ask API integration confirmed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Sergei 2026-01-18 21:20:43 -08:00
parent cb0c83d82a
commit dcdd06739d

View File

@ -7,8 +7,8 @@ Uses WellNuo voice_ask API for LLM responses, Deepgram for STT/TTS
import logging
import os
import random
import aiohttp
import aiohttp
from livekit.agents import (
Agent,
AgentSession,
@ -19,7 +19,8 @@ from livekit.agents import (
cli,
llm,
)
from livekit.plugins import deepgram, silero, noise_cancellation
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS, APIConnectOptions
from livekit.plugins import deepgram, noise_cancellation, silero
logger = logging.getLogger("julia-ai")
@ -30,9 +31,6 @@ WELLNUO_PASSWORD = os.getenv("WELLNUO_PASSWORD", "anandk_8")
# Hardcoded Ferdinand's deployment_id for testing
DEPLOYMENT_ID = os.getenv("DEPLOYMENT_ID", "21")
# Julia's personality for voice synthesis
JULIA_GREETING = "Hello! I'm Julia, your AI care assistant. How can I help you today?"
class WellNuoLLM(llm.LLM):
"""Custom LLM that uses WellNuo voice_ask API."""
@ -40,21 +38,28 @@ class WellNuoLLM(llm.LLM):
def __init__(self):
super().__init__()
self._token = None
self._session = None
self._model_name = "wellnuo-voice-ask"
async def _ensure_token(self):
@property
def model(self) -> str:
return self._model_name
@property
def provider(self) -> str:
return "wellnuo"
async def _ensure_token(self) -> str:
"""Get authentication token from WellNuo API."""
if self._token:
return self._token
async with aiohttp.ClientSession() as session:
# Generate random nonce for request
nonce = str(random.randint(0, 999999))
data = {
"function": "credentials",
"clientId": "001",
"user_name": WELLNUO_USER,
"ps": WELLNUO_PASSWORD, # API expects 'ps' not 'password'
"ps": WELLNUO_PASSWORD,
"nonce": nonce,
}
async with session.post(WELLNUO_API_URL, data=data) as resp:
@ -67,31 +72,13 @@ class WellNuoLLM(llm.LLM):
logger.error(f"Failed to get WellNuo token: {result}")
raise Exception("Failed to authenticate with WellNuo API")
async def chat(
self,
*,
chat_ctx: llm.ChatContext,
tools: list[llm.FunctionTool] | None = None,
tool_choice: llm.ToolChoice | None = None,
parallel_tool_calls: bool | None = None,
extra_body: dict | None = None,
) -> llm.LLMStream:
"""Send user question to WellNuo voice_ask API."""
# Get the last user message
user_message = ""
for msg in reversed(chat_ctx.items):
if hasattr(msg, 'role') and msg.role == "user":
if hasattr(msg, 'content'):
user_message = msg.content
break
async def get_response(self, user_message: str) -> str:
"""Call WellNuo voice_ask API and return response."""
if not user_message:
# Return a default response if no user message
return WellNuoLLMStream("I'm here to help. What would you like to know?")
return "I'm here to help. What would you like to know?"
logger.info(f"User question: {user_message}")
# Get response from WellNuo API
try:
token = await self._ensure_token()
@ -110,50 +97,83 @@ class WellNuoLLM(llm.LLM):
if result.get("ok"):
response_body = result.get("response", {}).get("body", "")
logger.info(f"WellNuo response: {response_body}")
return WellNuoLLMStream(response_body)
return response_body
else:
logger.error(f"WellNuo API error: {result}")
return WellNuoLLMStream("I'm sorry, I couldn't get that information right now.")
return "I'm sorry, I couldn't get that information right now."
except Exception as e:
logger.error(f"Error calling WellNuo API: {e}")
return WellNuoLLMStream("I'm having trouble connecting. Please try again.")
return "I'm having trouble connecting. Please try again."
def chat(
self,
*,
chat_ctx: llm.ChatContext,
tools: list[llm.Tool] | None = None,
conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
parallel_tool_calls=None,
tool_choice=None,
extra_kwargs=None,
) -> "WellNuoLLMStream":
"""Return an LLMStream for this chat request."""
return WellNuoLLMStream(
llm_instance=self,
chat_ctx=chat_ctx,
tools=tools or [],
conn_options=conn_options,
)
class WellNuoLLMStream(llm.LLMStream):
"""Stream wrapper for WellNuo API response."""
def __init__(self, response_text: str):
def __init__(
self,
llm_instance: WellNuoLLM,
chat_ctx: llm.ChatContext,
tools: list[llm.Tool],
conn_options: APIConnectOptions,
):
super().__init__(
llm=None,
chat_ctx=llm.ChatContext(),
tools=[],
tool_choice=None,
parallel_tool_calls=None,
extra_body=None,
llm=llm_instance,
chat_ctx=chat_ctx,
tools=tools,
conn_options=conn_options,
)
self._response_text = response_text
self._sent = False
self._wellnuo_llm = llm_instance
async def _run(self):
"""Yield the response as a single chunk."""
pass
"""Emit the response as chunks - called by the base class."""
# Extract last user message from chat context
user_message = ""
for item in reversed(self._chat_ctx.items):
# ChatMessage has type="message", role, and text_content property
is_user_message = (
hasattr(item, "type")
and item.type == "message"
and hasattr(item, "role")
and item.role == "user"
)
if is_user_message:
# Use text_content property which handles list[ChatContent]
user_message = item.text_content or ""
break
async def __anext__(self) -> llm.ChatChunk:
if self._sent:
raise StopAsyncIteration
# Get response from WellNuo API
response_text = await self._wellnuo_llm.get_response(user_message)
self._sent = True
return llm.ChatChunk(
id="wellnuo-response",
delta=llm.ChoiceDelta(
role="assistant",
content=self._response_text,
),
# Emit the response as a single chunk
# The base class handles the async iteration
self._event_ch.send_nowait(
llm.ChatChunk(
id="wellnuo-response",
delta=llm.ChoiceDelta(
role="assistant",
content=response_text,
),
)
)
def __aiter__(self):
return self
def prewarm(proc: JobProcess):
"""Preload VAD model for faster startup."""
@ -163,6 +183,9 @@ def prewarm(proc: JobProcess):
async def entrypoint(ctx: JobContext):
"""Main Julia AI voice session handler."""
# CRITICAL: Must connect to room first before accessing ctx.room
await ctx.connect()
logger.info(f"Starting Julia AI session in room {ctx.room.name}")
logger.info(f"Using WellNuo voice_ask API with deployment_id: {DEPLOYMENT_ID}")
@ -198,5 +221,7 @@ if __name__ == "__main__":
WorkerOptions(
entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm,
# Agent name must match what token requests (AGENT_NAME in livekit.js)
agent_name="julia-ai",
)
)