From ac6d458aae73438bdc92848f6e36fccf0e3ff0ad Mon Sep 17 00:00:00 2001 From: Sergei Date: Wed, 21 Jan 2026 14:51:53 -0800 Subject: [PATCH] Fix Julia AI agent race condition - wait for participant metadata The agent was reading participant metadata immediately after connecting, but the mobile user might not have joined yet or metadata wasn't synced. Changes: - Replace extract_beneficiary_data() with wait_for_participant_with_metadata() - Wait up to 10 seconds for participant with deploymentId - Log waiting status every 2 seconds for debugging - Fallback gracefully if timeout reached This fixes "I couldn't get that information right now" error in voice calls. --- julia-agent/julia-ai/src/agent.py | 78 ++++++++++++++++++++++--------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/julia-agent/julia-ai/src/agent.py b/julia-agent/julia-ai/src/agent.py index 27d5f5e..171211d 100644 --- a/julia-agent/julia-ai/src/agent.py +++ b/julia-agent/julia-ai/src/agent.py @@ -306,37 +306,70 @@ def prewarm(proc: JobProcess): proc.userdata["vad"] = silero.VAD.load() -def extract_beneficiary_data(ctx: JobContext) -> tuple[str | None, dict | None]: +async def wait_for_participant_with_metadata( + ctx: JobContext, timeout: float = 10.0 +) -> tuple[str | None, dict | None]: """ - Extract deployment_id and beneficiary_names_dict from participant metadata. + Wait for a remote participant with metadata to join, then extract beneficiary data. The mobile app passes this data through the LiveKit token metadata: { "deploymentId": "21", "beneficiaryNamesDict": {"21": "papa", "69": "David"} } + + IMPORTANT: This function waits up to `timeout` seconds for a participant + with metadata to appear. This fixes the race condition where the agent + connects before the user's metadata is available. """ + import asyncio + deployment_id = None beneficiary_names_dict = None - # Get remote participants (the user who joined) - for participant in ctx.room.remote_participants.values(): - metadata = participant.metadata - if metadata: - try: - data = json.loads(metadata) - deployment_id = data.get("deploymentId") - beneficiary_names_dict = data.get("beneficiaryNamesDict") - logger.info( - f"Extracted from participant {participant.identity}: " - f"deployment_id={deployment_id}, " - f"beneficiary_names_dict={beneficiary_names_dict}" - ) - break # Use first participant with metadata - except json.JSONDecodeError: - logger.warning(f"Failed to parse participant metadata: {metadata}") + start_time = asyncio.get_event_loop().time() + attempt = 0 - return deployment_id, beneficiary_names_dict + while asyncio.get_event_loop().time() - start_time < timeout: + attempt += 1 + + # Check all remote participants for metadata + for participant in ctx.room.remote_participants.values(): + metadata = participant.metadata + if metadata: + try: + data = json.loads(metadata) + deployment_id = data.get("deploymentId") + beneficiary_names_dict = data.get("beneficiaryNamesDict") + + if deployment_id: + logger.info( + f"[Attempt {attempt}] Extracted from participant " + f"{participant.identity}: deployment_id={deployment_id}, " + f"beneficiary_names_dict={beneficiary_names_dict}" + ) + return deployment_id, beneficiary_names_dict + except json.JSONDecodeError: + logger.warning( + f"Failed to parse participant metadata: {metadata}" + ) + + # Log waiting status every 2 seconds + if attempt % 4 == 0: + logger.info( + f"Waiting for participant with metadata... " + f"({int(asyncio.get_event_loop().time() - start_time)}s elapsed, " + f"participants: {len(ctx.room.remote_participants)})" + ) + + await asyncio.sleep(0.5) + + # Timeout reached - log and return None + logger.warning( + f"Timeout ({timeout}s) waiting for participant metadata. " + f"Participants: {len(ctx.room.remote_participants)}" + ) + return None, None async def entrypoint(ctx: JobContext): @@ -347,8 +380,11 @@ async def entrypoint(ctx: JobContext): logger.info(f"Starting Julia AI session in room {ctx.room.name}") - # Extract beneficiary data from participant metadata - deployment_id, beneficiary_names_dict = extract_beneficiary_data(ctx) + # Wait for participant with metadata (fixes race condition) + # The mobile app sends deploymentId and beneficiaryNamesDict in token metadata + deployment_id, beneficiary_names_dict = await wait_for_participant_with_metadata( + ctx, timeout=10.0 + ) # Log what we're using effective_deployment_id = deployment_id or DEPLOYMENT_ID