Fix Julia AI agent race condition - wait for participant metadata

The agent was reading participant metadata immediately after connecting,
but the mobile user might not have joined yet or metadata wasn't synced.

Changes:
- Replace extract_beneficiary_data() with wait_for_participant_with_metadata()
- Wait up to 10 seconds for participant with deploymentId
- Log waiting status every 2 seconds for debugging
- Fallback gracefully if timeout reached

This fixes "I couldn't get that information right now" error in voice calls.
This commit is contained in:
Sergei 2026-01-21 14:51:53 -08:00
parent 204cb87f05
commit ac6d458aae

View File

@ -306,20 +306,34 @@ def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
def extract_beneficiary_data(ctx: JobContext) -> tuple[str | None, dict | None]:
async def wait_for_participant_with_metadata(
ctx: JobContext, timeout: float = 10.0
) -> tuple[str | None, dict | None]:
"""
Extract deployment_id and beneficiary_names_dict from participant metadata.
Wait for a remote participant with metadata to join, then extract beneficiary data.
The mobile app passes this data through the LiveKit token metadata:
{
"deploymentId": "21",
"beneficiaryNamesDict": {"21": "papa", "69": "David"}
}
IMPORTANT: This function waits up to `timeout` seconds for a participant
with metadata to appear. This fixes the race condition where the agent
connects before the user's metadata is available.
"""
import asyncio
deployment_id = None
beneficiary_names_dict = None
# Get remote participants (the user who joined)
start_time = asyncio.get_event_loop().time()
attempt = 0
while asyncio.get_event_loop().time() - start_time < timeout:
attempt += 1
# Check all remote participants for metadata
for participant in ctx.room.remote_participants.values():
metadata = participant.metadata
if metadata:
@ -327,16 +341,35 @@ def extract_beneficiary_data(ctx: JobContext) -> tuple[str | None, dict | None]:
data = json.loads(metadata)
deployment_id = data.get("deploymentId")
beneficiary_names_dict = data.get("beneficiaryNamesDict")
if deployment_id:
logger.info(
f"Extracted from participant {participant.identity}: "
f"deployment_id={deployment_id}, "
f"[Attempt {attempt}] Extracted from participant "
f"{participant.identity}: deployment_id={deployment_id}, "
f"beneficiary_names_dict={beneficiary_names_dict}"
)
break # Use first participant with metadata
except json.JSONDecodeError:
logger.warning(f"Failed to parse participant metadata: {metadata}")
return deployment_id, beneficiary_names_dict
except json.JSONDecodeError:
logger.warning(
f"Failed to parse participant metadata: {metadata}"
)
# Log waiting status every 2 seconds
if attempt % 4 == 0:
logger.info(
f"Waiting for participant with metadata... "
f"({int(asyncio.get_event_loop().time() - start_time)}s elapsed, "
f"participants: {len(ctx.room.remote_participants)})"
)
await asyncio.sleep(0.5)
# Timeout reached - log and return None
logger.warning(
f"Timeout ({timeout}s) waiting for participant metadata. "
f"Participants: {len(ctx.room.remote_participants)}"
)
return None, None
async def entrypoint(ctx: JobContext):
@ -347,8 +380,11 @@ async def entrypoint(ctx: JobContext):
logger.info(f"Starting Julia AI session in room {ctx.room.name}")
# Extract beneficiary data from participant metadata
deployment_id, beneficiary_names_dict = extract_beneficiary_data(ctx)
# Wait for participant with metadata (fixes race condition)
# The mobile app sends deploymentId and beneficiaryNamesDict in token metadata
deployment_id, beneficiary_names_dict = await wait_for_participant_with_metadata(
ctx, timeout=10.0
)
# Log what we're using
effective_deployment_id = deployment_id or DEPLOYMENT_ID