ai-agent-simulation/wellnuo_agent.py

1344 lines
53 KiB
Python

import pandas as pd
import sys
import os
import time
from openai import OpenAI
import urllib.request
import requests
import datetime
import json
import math
import re
import numpy as np
import msgpack
# ---------------------------------------------------------
# GLOBAL CONFIGURATION & CONSTANTS
# ---------------------------------------------------------
DEBUG = 0  # Debug printout levels. Set to 1 to see verbose output.
USE_LOCAL_CACHE = False  # Set to True to read from files, False to call API
PRINT_PRESENCE = True  # Flag to print Presence (P) values in the timeline
WELLNUO_USER = "dive"
DEPLOYMENT_ID = "21"
TIMELINE_STORAGE_FILE = "timeline_history.json"

# Mappings to translate hardware IDs to human-readable room names
DEVICE_TO_ROOM_MAPPING = {
    720: "Bedroom",
    719: "Bathroom Main",
    716: "Kitchen",
    663: "Office",
    714: "Living Room",
    718: "Bathroom Guest",
    715: "Dining Room"
}
WELL_TO_ROOM_MAPPING = {
    "470": "Living Room",
    "473": "Bathroom Guest",
    "475": "Bedroom",
    "471": "Dining Room",
    "474": "Bathroom Main",
    "472": "Kitchen",
    "431": "Office"
}

# Settings for converting radar sensor readings into a narrative timeline
BASELINE_PRESENCE = 20  # Presence radar levels below this level indicate absence
LATCH_TIME_MINUTES = 15  # Time to hold a room state active during silence before switching
MIN_DURATION_BATHROOM_THRESHOLD = 2  # Minimum duration (mins) to keep a bathroom block; shorter blocks are merged (glitch removal)
MIN_DURATION_OTHER_THRESHOLD = 10  # Minimum duration (mins) to keep any other block; shorter blocks are merged (glitch removal)

# Constants for Light Logic
LIGHT_THRESHOLD = 200  # Lux value above which a room is considered "Lit"
EVENING_START_MIN = 18 * 60  # 6:00 PM (Minute 1080)
MORNING_END_MIN = 6 * 60  # 6:00 AM (Minute 360)

# NOTE(review): this credential is committed to source control. It is now
# defined once and can be overridden with the GROQ_API_KEY environment
# variable; the literal is kept only as a backward-compatible fallback and
# should be rotated and removed.
_GROQ_API_KEY = os.environ.get("GROQ_API_KEY") or "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"

# Define available LLM models.
# Keys are the menu choices typed by the user; "type" is "ollama" for entries
# that are only offered when a local Ollama server responds, "cloud" otherwise.
MODEL_OPTIONS = {
    "1": {
        "name": "Local: Ollama (qwen3:8b)",
        "type": "ollama",
        "base_url": "http://localhost:11434/v1",
        "api_key": "ollama",
        "model_name": "qwen3:8b"
    },
    "2": {
        "name": "Cloud: Groq (GPT-OSS-20B)",
        "type": "cloud",
        "base_url": "https://api.groq.com/openai/v1",
        "api_key": _GROQ_API_KEY,
        "model_name": "openai/gpt-oss-20b"
    },
    "3": {
        "name": "Cloud: Groq (GPT-OSS-120B)",
        "type": "cloud",
        "base_url": "https://api.groq.com/openai/v1",
        "api_key": _GROQ_API_KEY,
        "model_name": "openai/gpt-oss-120b"
    }
    # "4": {
    # "name": "Cloud: Alibaba (Qwen-Flash)",
    # "type": "cloud",
    # "base_url": "https://dashscope-intl.aliyuncs.com/compatible-mode/v1",
    # "api_key": "sk-...",
    # "model_name": "qwen-flash"
    # }
}

# Global client variable (will be initialized after user selection)
client = None
# Helper function for time formatting
def minutes_to_time(minutes):
    """Render a minute-of-day count as a zero-padded 12-hour clock string.

    The input wraps modulo 1440, so 1440 is shown the same as 0
    ("12:00 AM").
    """
    hour_24, minute_part = divmod(minutes % 1440, 60)
    suffix = "PM" if hour_24 >= 12 else "AM"
    # Hours 0 and 12 are both displayed as 12 on a 12-hour clock.
    hour_12 = hour_24 % 12 or 12
    return f"{hour_12:02d}:{minute_part:02d} {suffix}"
# Helper function for 2 significant figures
def format_sig_figs(val):
    """Round *val* to two significant figures and return it as an integer string.

    The value is rounded to its second significant digit and then formatted
    with no decimals, so 1234.5 becomes "1200" and 45 stays "45". Magnitudes
    below 10 therefore end up as a single integer digit (4.56 -> "5").

    Returns "0" for None, NaN, zero, and for values that cannot be rounded
    (for example infinities or non-numeric input).
    """
    if val is None or pd.isna(val) or val == 0:
        return "0"
    try:
        # Position of the leading digit; +1 keeps one more digit after it.
        ndigits = -int(math.floor(math.log10(abs(val)))) + 1
        return f"{round(val, ndigits):.0f}"
    except (ArithmeticError, TypeError, ValueError):
        # OverflowError (infinity), TypeError (non-numeric) and ValueError are
        # the failures this arithmetic can produce; anything else should
        # propagate instead of being silently swallowed.
        return "0"
def get_timeline_humidity_profile(timelines):
    """
    Build the reference humidity section from already-formatted timelines.

    Every timeline line that mentions "Bathroom" and carries an "H:<digits>"
    tag contributes its integer humidity value. The distinct values are
    returned, sorted ascending, inside a short heading block; an empty string
    is returned when no bathroom humidity value is found.
    """
    # 'Bathroom' followed (greedily) by the last 'H:' tag on the same line.
    bathroom_humidity = re.compile(r"Bathroom.*H:\s*(\d+)")
    readings = set()
    for day_text in timelines.values():
        for entry in day_text.split('\n'):
            found = bathroom_humidity.search(entry)
            if found is None:
                continue
            try:
                readings.add(int(found.group(1)))
            except ValueError:
                continue
    if not readings:
        return ""
    ordered = sorted(readings)
    return f"### REFERENCE HUMIDITY DATA\nSorted list of average Bathroom Humidity levels (H) observed during visits: {ordered}"
# ---------------------------------------------------------
# AUTHENTICATION
# ---------------------------------------------------------
def get_wellnuo_credentials():
    """
    Authenticates with the Wellnuo API and retrieves the session token.

    The token is looked up in the JSON response under "access_token" first and
    under "data" -> "token" second.

    Returns:
        str: Access token if successful, None otherwise (the reason is printed).
    """
    url = "https://eluxnetworks.net/function/well-api/api"
    # Payload matching API requirements
    payload = {
        "function": "credentials",
        "user_name": WELLNUO_USER,
        # NOTE(review): the password is committed to source control. It can now
        # be overridden with the WELLNUO_PASSWORD environment variable; the
        # literal is kept only as a backward-compatible fallback.
        "ps": os.environ.get("WELLNUO_PASSWORD") or "D@v1d3",
        "clientId": "001",
        "nonce": "111",
    }
    print("Authenticating with Wellnuo API...")
    try:
        # requests.post with 'data=' sends application/x-www-form-urlencoded.
        # Without a timeout a stalled server would block the program forever.
        response = requests.post(url, data=payload, timeout=30)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        print(f"❌ Error connecting to Wellnuo API: {e}")
        return None

    # Attempt to locate token in standard response fields, tolerating
    # responses whose shape is not the expected nested dict.
    token = None
    if isinstance(data, dict):
        nested = data.get("data")
        token = data.get("access_token") or (
            nested.get("token") if isinstance(nested, dict) else None
        )
    if token:
        print("✅ Wellnuo authentication successful.")
        return token
    print(f"❌ Wellnuo authentication failed: Token not found in response: {data}")
    return None
def print_sensor_statistics(daily_sensor_dfs, sensor_type="Sensor", unit=""):
    """
    Print per-room percentile tables for a sensor, pooled over every day supplied.

    All daily frames are concatenated row-wise; for each column (room) the
    values greater than zero are kept and their quantiles are printed at
    0%..90% in steps of 10% followed by 91%..100% in steps of 1%.

    Args:
        daily_sensor_dfs (dict): Dictionary of daily DataFrames.
        sensor_type (str): Name for the header (e.g., "HUMIDITY", "LIGHT").
        unit (str): Unit suffix for the values (e.g., "%", " Lux").
    """
    if not daily_sensor_dfs:
        print(f"No {sensor_type} data available to analyze.")
        return

    print("\n========================================================")
    print(f" {sensor_type.upper()} STATISTICS (Aggregated over all days) ")
    print("========================================================")

    # Same quantile grid for every room: coarse deciles, then a fine top end.
    quantile_points = [step / 10.0 for step in range(10)] + [
        step / 100.0 for step in range(91, 101)
    ]

    pooled = pd.concat(daily_sensor_dfs.values(), axis=0, ignore_index=True)
    for room in pooled.columns:
        readings = pooled[room]
        # Zeros are dropped before computing statistics (the original comment
        # describes them as missing data or "off" states).
        positive = readings[readings > 0]
        if positive.empty:
            print(f"\nRoom: {room} - No valid data (>0) found.")
            continue

        print(f"\nRoom: {room}")
        print("-" * 20)
        for fraction, value in positive.quantile(quantile_points).items():
            # round() guards against float artefacts such as 0.07 * 100 = 7.000000000000001
            label = f"{int(round(fraction * 100))}%"
            print(f"{label:<5}: {value:.1f}{unit}")
def print_presence_statistics(filename):
    """
    Print percentile tables of the radar (presence) level per room for one CSV file.

    The CSV is expected to provide 'device_id' and 'radar' columns. Device ids
    found in DEVICE_TO_ROOM_MAPPING are shown by room name, others keep their
    id. Only readings above zero enter the statistics. Any failure is reported
    on stdout instead of being raised.
    """
    print("\n========================================================")
    print(f" PRESENCE STATISTICS (File: {os.path.basename(filename)}) ")
    print("========================================================")
    try:
        frame = pd.read_csv(filename)

        # Translate device ids to room names, leaving unknown ids untouched.
        id_to_room = globals().get('DEVICE_TO_ROOM_MAPPING', {})
        frame['device_id'] = frame['device_id'].map(id_to_room).fillna(frame['device_id'])

        # 0%..90% in steps of 10%, then 91%..100% in steps of 1%.
        quantile_points = [step / 10.0 for step in range(10)] + [
            step / 100.0 for step in range(91, 101)
        ]

        for room in frame['device_id'].unique():
            radar_levels = frame[frame['device_id'] == room]['radar']
            # Keep only readings above zero so the table describes the
            # strength of "active" signals rather than silence.
            active_levels = radar_levels[radar_levels > 0]
            if active_levels.empty:
                print(f"\nRoom: {room} - No valid presence data (>0) found.")
                continue

            print(f"\nRoom: {room}")
            print("-" * 20)
            for fraction, value in active_levels.quantile(quantile_points).items():
                label = f"{int(round(fraction * 100))}%"
                print(f"{label:<5}: {value:.1f}")
    except Exception as e:
        print(f"Error calculating presence statistics: {e}")
# ---------------------------------------------------------
# CORE LOGIC: TIMELINE GENERATION
# ---------------------------------------------------------
def _find_sandwich_minutes(pivot_df, light_df, start_minute_offset):
    """Return the set of (minute_of_day, bathroom) pairs covered by the "Sandwich Rule"."""
    forced_minutes = set()
    if light_df is None or light_df.empty:
        return forced_minutes
    df_len = len(pivot_df)
    for bath in [c for c in pivot_df.columns if "Bathroom" in c]:
        if bath not in light_df.columns:
            continue
        l_series = light_df[bath].values
        p_series = pivot_df[bath].values
        # Minutes with bright light AND presence above the baseline
        is_high = (l_series > 1000) & (p_series > BASELINE_PRESENCE)
        in_block = False
        block_start = -1
        for i in range(df_len):
            val = is_high[i]
            if val and not in_block:
                in_block = True
                block_start = i
            elif not val and in_block:
                # i is the first minute after the block that just ended
                block_end = i - 1
                in_block = False
                # The block only counts when the minute before it (if any in
                # this chunk) and the minute after it are not lit (>= 200 Lux).
                valid_pre = not (block_start > 0 and l_series[block_start - 1] >= 200)
                valid_post = not (l_series[i] >= 200)
                if valid_pre and valid_post:
                    for k in range(block_start, block_end + 1):
                        forced_minutes.add((k + start_minute_offset, bath))
        # A block still open at the end of the chunk is never closed here, so
        # it is not forced (its trailing dark minute cannot be checked).
    return forced_minutes


def _merge_adjacent_blocks(blocks):
    """Collapse consecutive timeline blocks that share a room into a single block."""
    merged = []
    for block in blocks:
        if merged and merged[-1]['room'] == block['room']:
            merged[-1]['end'] = block['end']
            merged[-1]['duration'] = merged[-1]['end'] - merged[-1]['start']
        else:
            merged.append(block)
    return merged


def _print_rejection_summary(rejection_log, date_label):
    """Print one line per sustained run of minutes rejected by the dark-hours light rule."""
    if not rejection_log:
        return
    date_str = date_label or "Unknown Date"
    rejection_log.sort(key=lambda x: x['minute'])

    def report(start_m, end_m, room):
        duration = end_m - start_m + 1
        threshold = 2 if "Bathroom" in room else 5
        if duration >= threshold:
            print(f"On {date_str}, between {minutes_to_time(start_m)} and {minutes_to_time(end_m + 1)}, {room} had the highest radar presence signal but was ignored (Dark Hours).")

    start_m = end_m = rejection_log[0]['minute']
    room = rejection_log[0]['room']
    for rej in rejection_log[1:]:
        if rej['minute'] == end_m + 1 and rej['room'] == room:
            # Same room, consecutive minute: extend the current run
            end_m = rej['minute']
        else:
            report(start_m, end_m, room)
            start_m = end_m = rej['minute']
            room = rej['room']
    # Final run
    report(start_m, end_m, room)


def create_timeline(
    light_df=None,
    humidity_df=None,
    radar_df=None,
    start_minute_offset=0,
    initial_state=None,
    output_cutoff_minute=0,
    date_label=None,
):
    """
    Transforms a minute-by-minute radar grid into a coherent, narrative timeline of
    user activity.

    The most likely occupied room is chosen per minute from radar signal strength,
    cross-referenced with light sensor data to filter out false positives during
    dark hours. Average Light, Humidity and (optionally) Presence values are
    attached to each resulting activity block.

    Algorithm Overview:
    -------------------
    1. Sandwich Rule pre-computation:
       - For each bathroom, finds runs of minutes with Light > 1000 Lux and
         presence > BASELINE_PRESENCE that are bookended by minutes with
         Light < 200 Lux. Those minutes are forced to the bathroom.
    2. State Machine & Signal Evaluation (the core loop), per minute:
       - Bathroom Override: a bathroom wins when either
         a) its light is < 200 Lux, it has the strongest radar signal of all rooms,
            and that signal is > BASELINE_PRESENCE, or
         b) the minute belongs to a Sandwich Rule block.
       - Dark Hours Rule: otherwise, before MORNING_END_MIN or after
         EVENING_START_MIN, only rooms with light > LIGHT_THRESHOLD are candidates.
         * EXCEPTION: "Bedroom" is always a candidate, to account for sleeping.
       - Signal Selection: the candidate with the strongest radar signal wins if
         it exceeds BASELINE_PRESENCE.
       - Rejection Tracking: logs minutes where the overall strongest room
         (> BASELINE_PRESENCE) was excluded by the Dark Hours Rule.
       - Heuristics: "Bedroom" is held during night silence, and the last room is
         latched for up to LATCH_TIME_MINUTES of silence to prevent flickering.
    3. Iterative Post-Processing (glitch removal, up to 10 passes):
       - Merges consecutive entries for the same room.
       - Blocks shorter than MIN_DURATION_BATHROOM_THRESHOLD (bathrooms) or
         MIN_DURATION_OTHER_THRESHOLD (other rooms) are relabelled with the room
         of the previous block (or of the next block when there is no previous one).
    4. Reporting:
       - Prints a summary of "Light Rule Exclusions" (sustained runs where the
         strongest signal was ignored due to darkness).
    5. Formatting & Environmental Analysis:
       - For each final block, averages the sensor values of that room over the
         block's interval and renders one text line.

    Args:
        light_df (pd.DataFrame, optional): Light readings, one row per minute,
            columns are room names. Rows are addressed by position.
        humidity_df (pd.DataFrame, optional): Humidity readings, same layout.
        radar_df (pd.DataFrame, optional): Radar presence readings, same layout.
            When None the function returns ("No Data", "Unknown").
        start_minute_offset (int): Minute of day that row 0 corresponds to.
        initial_state (str, optional): Room to start the state machine in. The
            display label "Bedroom (Sleeping)" is accepted and treated as "Bedroom".
        output_cutoff_minute (int): Blocks (or parts of blocks) before this minute
            are processed but not emitted.
        date_label (str, optional): Date shown in the printed rejection summary.

    Returns:
        tuple: (timeline_text_string, final_state_room_string)
    """
    if radar_df is None:
        return "No Data", "Unknown"
    # Work on a positionally re-indexed copy so the caller's DataFrame is not mutated.
    pivot_df = radar_df.reset_index(drop=True)

    if light_df is not None and getattr(light_df, "empty", False):
        light_df = None
    if humidity_df is not None and getattr(humidity_df, "empty", False):
        humidity_df = None

    # The formatter below renders night-time Bedroom blocks as "Bedroom (Sleeping)".
    # If that label is fed back in as the starting state it must map to the real
    # room name, otherwise none of the "Bedroom" comparisons below would match.
    if initial_state == "Bedroom (Sleeping)":
        initial_state = "Bedroom"

    # 1. Data Preparation (Sandwich Rule)
    forced_condition_b_minutes = _find_sandwich_minutes(pivot_df, light_df, start_minute_offset)

    # Initialize State Machine
    raw_log = []
    rejection_log = []
    current_state = initial_state if initial_state else "Out of House/Inactive"
    start_time = start_minute_offset  # minute at which current_state began
    silence_counter = 0
    last_known_room = initial_state if initial_state else "Unknown"

    # ---------------------------------------------------------
    # 2. STATE MACHINE LOOP (NumPy Optimized)
    # ---------------------------------------------------------
    # Pre-computation for speed
    radar_vals = pivot_df.values
    # NOTE(review): light_vals is indexed below with radar column positions, which
    # assumes light_df has the same column order (and at least as many rows) as
    # radar_df — confirm against the caller.
    light_vals = light_df.values if light_df is not None else None
    columns = pivot_df.columns.tolist()
    col_name_to_idx = {name: i for i, name in enumerate(columns)}
    # Pre-identify specific room indices to avoid string matching in the loop
    bathroom_indices = [i for i, name in enumerate(columns) if "Bathroom" in name]
    bedroom_idx = col_name_to_idx.get("Bedroom")

    for i in range(len(pivot_df)):
        row_array = radar_vals[i]
        light_array = light_vals[i] if light_vals is not None else None
        minute_of_day = i + start_minute_offset

        # 1. Raw Signal Stats
        raw_max_signal = row_array.max()
        raw_best_room = columns[row_array.argmax()] if raw_max_signal > 0 else "Unknown"

        # 2. Sandwich / Bathroom Override Logic
        forced_bathroom = None
        for bath_idx in bathroom_indices:
            bath_name = columns[bath_idx]
            p_val = row_array[bath_idx]
            l_val = light_array[bath_idx] if light_array is not None else 0
            # Condition A: Dark, Best Signal is here, Strong Signal
            cond_a = (l_val < 200) and (raw_best_room == bath_name) and (p_val > BASELINE_PRESENCE)
            # Condition B: Pre-calculated Sandwich
            cond_b = (minute_of_day, bath_name) in forced_condition_b_minutes
            if cond_a or cond_b:
                forced_bathroom = bath_name
                break

        if forced_bathroom:
            active_location = forced_bathroom
            silence_counter = 0
            last_known_room = active_location
        else:
            # 3. Candidate Selection (Light Logic)
            candidate_indices = list(range(len(columns)))
            is_dark_hours = (minute_of_day < MORNING_END_MIN) or (minute_of_day > EVENING_START_MIN)
            if light_vals is not None and is_dark_hours:
                # np.where returns a tuple; the first element holds the indices
                allowed_indices = set(np.where(light_array > LIGHT_THRESHOLD)[0])
                # Always allow Bedroom in dark hours
                if bedroom_idx is not None:
                    allowed_indices.add(bedroom_idx)
                # Sorted so that ties between equally strong rooms are broken by
                # column order, exactly as in the daylight path.
                candidate_indices = sorted(allowed_indices)

                # Rejection Tracking
                if raw_max_signal > BASELINE_PRESENCE:
                    raw_best_idx = row_array.argmax()
                    if raw_best_idx not in candidate_indices:
                        rejection_log.append({'minute': minute_of_day, 'room': raw_best_room})

            # 4. Find Strongest Room within Candidates
            max_signal = 0
            strongest_room = "Unknown"
            if len(candidate_indices) > 0:
                candidate_vals = row_array[candidate_indices]
                local_max = candidate_vals.max()
                if local_max > 0:
                    # Map the position inside the candidate list back to a column
                    best_global_idx = candidate_indices[candidate_vals.argmax()]
                    strongest_room = columns[best_global_idx]
                    max_signal = local_max

            if max_signal > BASELINE_PRESENCE:
                active_location = strongest_room
                silence_counter = 0
                last_known_room = active_location
            else:
                active_location = None
                silence_counter += 1

        # 5. Latching & Sleep Logic
        effective_location = current_state
        if active_location:
            effective_location = active_location
        else:
            is_night_time = (minute_of_day < 480) or (minute_of_day > 1320)
            if last_known_room == "Bedroom" and is_night_time:
                # Silence in the bedroom at night is treated as sleeping
                effective_location = "Bedroom"
                silence_counter = 0
            elif silence_counter < LATCH_TIME_MINUTES and last_known_room != "Unknown":
                effective_location = last_known_room
            elif is_night_time:
                effective_location = "Bedroom"
                last_known_room = "Bedroom"
            else:
                effective_location = "Out of House/Inactive"

        # 6. State Change & Logging
        if effective_location != current_state:
            # Only log non-empty blocks that end after the requested output start
            if (
                current_state != "Unknown"
                and minute_of_day > start_time
                and minute_of_day > output_cutoff_minute
            ):
                final_start = max(start_time, output_cutoff_minute)
                duration = minute_of_day - final_start
                if duration > 0:
                    raw_log.append({
                        "room": current_state,
                        "start": final_start,
                        "end": minute_of_day,
                        "duration": duration
                    })
            current_state = effective_location
            start_time = minute_of_day

    # Add final block (handling the tail end), truncated at the cutoff if needed
    final_minute = len(pivot_df) + start_minute_offset
    if final_minute > output_cutoff_minute:
        final_block_start = max(start_time, output_cutoff_minute)
        duration = final_minute - final_block_start
        if duration > 0:
            raw_log.append({
                "room": current_state,
                "start": final_block_start,
                "end": final_minute,
                "duration": duration
            })

    # 3. Iterative Cleaning
    max_iterations = 10
    for _ in range(max_iterations):
        raw_log = _merge_adjacent_blocks(raw_log)
        changes_made = False
        for idx, block in enumerate(raw_log):
            dynamic_threshold = MIN_DURATION_BATHROOM_THRESHOLD if "Bathroom" in block['room'] else MIN_DURATION_OTHER_THRESHOLD
            if block['duration'] < dynamic_threshold:
                prev_block = raw_log[idx - 1] if idx > 0 else None
                next_block = raw_log[idx + 1] if idx < len(raw_log) - 1 else None
                # The previous block wins whenever it exists; otherwise the next one
                neighbor = prev_block or next_block
                winner_room = neighbor['room'] if neighbor else None
                if winner_room and winner_room != block['room']:
                    block['room'] = winner_room
                    changes_made = True
        if not changes_made:
            break
    # If the pass limit was reached, the last relabelling has not been merged yet.
    # Merging is idempotent, so this is a no-op when the loop ended cleanly.
    raw_log = _merge_adjacent_blocks(raw_log)

    # ---------------------------------------------------------
    # 4. Print Rejection Log
    # ---------------------------------------------------------
    _print_rejection_summary(rejection_log, date_label)

    # 5. Format Output
    env_sources = [("L", light_df), ("H", humidity_df)]
    if globals().get('PRINT_PRESENCE', False):
        env_sources.append(("P", pivot_df))

    final_text = []
    for entry in raw_log:
        # If the block ends before we are supposed to start outputting, skip it entirely.
        if entry['end'] <= output_cutoff_minute:
            continue
        # If the block starts before the cutoff but ends after, truncate the start.
        if entry['start'] < output_cutoff_minute:
            entry['start'] = output_cutoff_minute
            entry['duration'] = entry['end'] - entry['start']

        start_s = minutes_to_time(entry['start'])
        end_s = minutes_to_time(entry['end'])
        room_name = entry['room']
        display_room_name = room_name
        if room_name == "Bedroom":
            start_min = entry['start']
            end_min = entry['end']
            # Same night window as the state machine: before 8:00 AM or after 10:00 PM
            is_night_block = (start_min < 480) or (start_min > 1320) or (end_min < 480) or (end_min > 1320)
            if is_night_block:
                display_room_name = "Bedroom (Sleeping)"

        # Convert minute-of-day back to row positions inside this chunk
        local_start = max(0, entry['start'] - start_minute_offset)
        local_end = max(0, entry['end'] - start_minute_offset)
        env_parts = []
        for tag, frame in env_sources:
            if frame is not None and room_name in frame.columns:
                segment = frame.iloc[local_start:local_end][room_name]
                if not segment.empty:
                    env_parts.append(f"{tag}:{format_sig_figs(segment.mean())}")
        env_tag = f" [{', '.join(env_parts)}]" if env_parts else ""
        final_text.append(f"- {start_s} to {end_s} ({entry['duration']} mins): {display_room_name}{env_tag}")

    # RETURN BOTH TEXT AND FINAL STATE
    return "\n".join(final_text), current_state
def parse_last_timeline_state(timeline_text):
    """
    Inspect the final line of a timeline to find out where the log stops.

    Returns:
        tuple: (end_minute_of_day, room_name). Falls back to
        (0, "Out of House/Inactive") when the text is empty or its last line
        does not look like a timeline entry. An end time of "12:00 AM" is
        reported as minute 0.
    """
    fallback = (0, "Out of House/Inactive")
    if not timeline_text:
        return fallback

    final_entry = timeline_text.strip().split('\n')[-1]
    # Example line: "- 10:00 AM to 02:30 PM (270 mins): Living Room [L:300]"
    # Capture the "to HH:MM XX" end time, then the room name that follows the
    # colon and stops before any bracketed tags.
    found = re.search(r"to\s+(\d{2}:\d{2}\s+[AP]M).*?:\s+(.*?)(?:\s\[|$)", final_entry)
    if found is None:
        return fallback

    clock_text = found.group(1)
    room_text = found.group(2).strip()
    parsed = datetime.datetime.strptime(clock_text, "%I:%M %p")
    return parsed.hour * 60 + parsed.minute, room_text
def merge_timeline_chunks(existing_text, new_text):
    """
    Concatenate two timeline chunks, fusing the entries at the seam when possible.

    When the last entry of existing_text and the first entry of new_text name
    the same room they become one entry: start time from the old entry, end
    time from the new one, durations summed. The bracketed tags of the new
    entry are kept (falling back to the old entry's tags when the new one has
    none). Otherwise the chunks are simply joined with a newline.
    """
    if not existing_text:
        return new_text
    if not new_text:
        return existing_text

    head_lines = existing_text.strip().split('\n')
    tail_lines = new_text.strip().split('\n')
    seam_before = head_lines[-1]
    seam_after = tail_lines[0]

    # Matches: "- 10:00 AM to 11:00 AM (60 mins): Living Room [L:100]"
    # Group 1: Start, Group 2: End, Group 3: Mins, Group 4: Room
    entry_re = re.compile(r"- (.*?) to (.*?) \((\d+) mins\): (.*?)(?: \[|$)")
    before = entry_re.search(seam_before)
    after = entry_re.search(seam_after)

    same_room = (
        before is not None
        and after is not None
        and before.group(4).strip() == after.group(4).strip()
    )
    if not same_room:
        # No merge possible, just append
        return existing_text + "\n" + new_text

    room = before.group(4).strip()
    total_minutes = int(before.group(3)) + int(after.group(3))

    # Tags are not averaged: the newer entry's tags win when it has any.
    if "[" in seam_after:
        env_suffix = " [" + seam_after.split("[", 1)[1]
    elif "[" in seam_before:
        env_suffix = " [" + seam_before.split("[", 1)[1]
    else:
        env_suffix = ""

    fused = f"- {before.group(1)} to {after.group(2)} ({total_minutes} mins): {room}{env_suffix}"
    return "\n".join(head_lines[:-1] + [fused] + tail_lines[1:])
# ---------------------------------------------------------
# HELPER: AI MODEL SELECTION
# ---------------------------------------------------------
def is_ollama_running():
    """Checks if Ollama is running on the default local port.

    Returns True only when http://localhost:11434 answers with HTTP 200 within
    half a second; connection failures, timeouts and malformed replies all
    count as "not running".
    """
    # Imported locally: the file only imports urllib.request at module level.
    from http.client import HTTPException

    try:
        with urllib.request.urlopen("http://localhost:11434", timeout=0.5) as response:
            return response.status == 200
    except (OSError, HTTPException, ValueError):
        # OSError covers URLError/HTTPError, refused connections and timeouts.
        # Anything else (e.g. KeyboardInterrupt) is no longer swallowed.
        return False
def select_model_configuration():
    """Interactively select the model provider via CLI.

    Lists every entry of MODEL_OPTIONS, hiding local ("ollama") entries when
    no Ollama server responds, then prompts until a listed key is entered.

    Returns:
        dict: The chosen MODEL_OPTIONS entry.

    Exits the process with status 1 when no model is available, and with the
    default status when the user enters 'q'.
    """
    print("\n--- SELECT AI MODEL ---")
    available_keys = []
    ollama_up = None  # probed lazily and at most once, not once per local entry
    for key, config in MODEL_OPTIONS.items():
        # Check availability for local models; cloud providers are always shown
        if config['type'] == 'ollama':
            if ollama_up is None:
                ollama_up = is_ollama_running()
            if not ollama_up:
                continue
        print(f"[{key}] {config['name']}")
        available_keys.append(key)

    if not available_keys:
        print("Error: No models available. (Ollama is offline and no cloud keys configured?)")
        # This is a failure, so report it through a non-zero exit status.
        sys.exit(1)

    while True:
        # Surrounding whitespace is ignored so that "1 " still selects option 1.
        choice = input("\nSelect Model # (or 'q'): ").strip()
        if choice.lower() == 'q':
            sys.exit()
        if choice in available_keys:
            return MODEL_OPTIONS[choice]
        print("Invalid selection.")
# ---------------------------------------------------------
# CLASS: WELLNUO API WRAPPER
# ---------------------------------------------------------
class WellnuoAPI:
    """Thin client for the Wellnuo HTTP API endpoint.

    Every method sends one request to the same URL and differs only in the
    payload. Failure reporting is not uniform and is kept as-is for existing
    callers: get_sensor_matrix() returns None on failure, while download(),
    get_sensor_data() and get_presence_data() return an "API Error: ..." string.
    """

    # (connect, read) timeouts in seconds, so a stalled server cannot hang the
    # program indefinitely. The read timeout is generous because the matrix
    # and ZIP downloads can be large.
    REQUEST_TIMEOUT = (10, 300)

    def __init__(self, api_token):
        """Store the session token; raises ValueError when it is missing or empty."""
        self.api_token = api_token
        if not self.api_token:
            raise ValueError("Error: No API Token available.")
        self.url = "https://eluxnetworks.net/function/well-api/api"

    def get_sensor_matrix(self, start_date_str, end_date_str, minute_start=0):
        """
        Retrieves the sensor matrix. Supports partial day fetching via minute_start.

        Returns:
            The msgpack-decoded response body, or None when the HTTP status is
            not 200 or any error occurs (errors are printed).
        """
        # 1. Construct the sensor list
        sensor_list = ["light", "humidity", "pressure", "radar"]
        # sensor_list.extend([f"s{i}" for i in range(80)])
        sensors_string = ",".join(sensor_list)
        # 2. Prepare Payload
        payload = {
            "function": "get_sensor_matrix",
            "user_name": WELLNUO_USER,
            "token": self.api_token,
            # Use the shared constant like the other methods instead of a
            # second hard-coded copy of the deployment id.
            "deployment_id": DEPLOYMENT_ID,
            "date": start_date_str,
            "to_date": end_date_str,
            "minute_start": minute_start,
            "minute_end": 1439,
            "sensors": sensors_string,
            "radar_part": "s28",
            "bucket_size": "1m"
        }
        try:
            print(f"\n--- Calling Wellnuo get_sensor_matrix() (Start: {minute_start}) ---")
            response = requests.post(self.url, data=payload, timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                return None  # Handle error gracefully in main
            return msgpack.unpackb(response.content, raw=False)
        except Exception as e:
            # Boundary handler: network and msgpack decoding errors alike are
            # reported and turned into None.
            print(f"API Error: {e}")
            return None

    def download(self, start_date_str, end_date_str):
        """Downloads radar data as ZIP file for a date range.

        Returns:
            The requests response object on success, or an "API Error: ..."
            string on failure.
        """
        payload = {
            "name": "download",
            "user_name": WELLNUO_USER,
            "token": self.api_token,
            "deployment_id": DEPLOYMENT_ID,
            "date_from": start_date_str,
            "date_to": end_date_str,
            "group_by": "by_minute_rd",
            "re_create": "false",
            "radar_part": "s28"
        }
        try:
            print(f"\n--- Calling Wellnuo download() API function to get radar sensor data between {start_date_str} and {end_date_str}...")
            response = requests.get(self.url, params=payload, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response
        except Exception as e:
            return f"API Error: {e}"

    def get_sensor_data(self, start_date_str, end_date_str, sensor_type):
        """
        Retrieves sensor data ("light" or "humidity") for a date range via the
        'get_sensor_data_by_deployment_id' API function.

        Returns:
            The decoded JSON response on success, or an "API Error: ..." string
            on failure.
        """
        payload = {
            "function": "get_sensor_data_by_deployment_id",
            "user_name": WELLNUO_USER,
            "token": self.api_token,
            "sensor": sensor_type,
            "deployment_id": DEPLOYMENT_ID,
            "data_type": "ML",
            "radar_part": "s28",
            "date": start_date_str,
            "to_date": end_date_str,
        }
        try:
            print(f"\n--- Calling Wellnuo get_sensor_data_by_deployment_id() API function to get {sensor_type} sensor data between {start_date_str} and {end_date_str}...")
            response = requests.post(self.url, data=payload, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return f"API Error: {e}"

    def get_presence_data(self, start_date_str, end_date_str):
        """Retrieves presence data (Z-graph).

        Returns:
            The decoded JSON response on success, or an "API Error: ..." string
            on failure.
        """
        payload = {
            "function": "get_presence_data",
            "user_name": WELLNUO_USER,
            "token": self.api_token,
            "deployment_id": DEPLOYMENT_ID,
            "date": start_date_str,
            "filter": "6",
            "data_type": "z-graph",
            "to_date": end_date_str,
            "refresh": "0"
        }
        try:
            print(f"\n--- Calling Wellnuo get_presence_data() API function for start_date = {start_date_str}, end_date = {end_date_str} ---")
            response = requests.post(self.url, data=payload, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            return f"API Error: {e}"
# ---------------------------------------------------------
# CLASS: AI INTERPRETER AGENT
# ---------------------------------------------------------
class ActivityAgent:
    """Answers questions about stored activity timelines by prompting an LLM.

    The timelines are flattened into one context string at construction time;
    ask() sends that context together with the system prompt and the user's
    question through the module-level `client`.
    """

    def __init__(self, timelines_dict, model_name, extra_context=""):
        """Build the context and system prompt, echoing both to stdout.

        Args:
            timelines_dict (dict): Timeline text keyed by date string.
            model_name (str): Model identifier passed to the chat completion call.
            extra_context (str): Optional text appended after the timelines
                (e.g. the reference humidity list).
        """
        self.context = self.summarize_timelines(timelines_dict)
        # Append the extra context (humidity list) to the end of the log
        if extra_context:
            self.context += f"\n\n{extra_context}"
        print("AI agent received the following context: ")
        print(self.context)
        self.model_name = model_name
        self.system_prompt = """
You are an expert Health Data Analyst.
You are receiving a timeline of activities of a person in a house.
Answer user's question strictly based on the provided timeline of activities.
If the user mentions 'bathroom' without specifying 'Main' or 'Guest', you must combine the data from both 'Bathroom Main'
and 'Bathroom Guest' and treat them as a single location.
The timeline entries include environmental data in brackets: [L: value, H: value].
- L represents the average Light level in Lux.
- H represents the average Humidity percentage.
These values are averages for the specific time interval and are rounded to two significant figures.
To identify showers:
1. Consult the 'Sorted list of average Bathroom Humidity levels' in the Reference Humidity Data.
2. Identify the main cluster of lower values (the baseline/median range).
3. Infer 'showering' ONLY for visits that meet **all** of the following criteria:
- The Humidity [H] falls into a higher cluster separated from the baseline by a distinct numeric gap (missing integers in the sorted list).
- The duration of the visit is **at least 7 minutes**.
- The Light [L] level is **at least 1000 Lux**.
4. **Maximum one shower per day**: If multiple visits on the same day meet the above criteria, identify only the single event with the highest Humidity [H] as the shower.
"""
        print("\n and is given the following system prompt:\n")
        print(f'"{self.system_prompt}"')

    def summarize_timelines(self, timelines, max_days=7):
        """
        Combines multiple daily timelines into a single string for the LLM context.

        Only the most recent 'max_days' dates (by sorted key) are included to
        save tokens; the latest one is labelled "(TODAY)". A non-positive
        max_days includes no days at all.

        Returns:
            str: A leading newline followed by the day blocks separated by
            blank lines.
        """
        all_dates = sorted(timelines)
        # all_dates[-0:] would be the WHOLE list, so a non-positive limit has
        # to be handled explicitly. Fewer than max_days entries is fine.
        relevant_dates = all_dates[-max_days:] if max_days > 0 else []

        summary_parts = []
        if relevant_dates:
            last_date = relevant_dates[-1]
            for date in relevant_dates:
                if date == last_date:
                    header = f"### ACTIVITY LOG FOR: {date} (TODAY)"
                else:
                    header = f"### ACTIVITY LOG FOR: {date}"
                summary_parts.append(f"{header}\n{timelines[date]}")

        if len(all_dates) > len(relevant_dates):
            print(f"Context Note: Truncated history. Sending last {len(relevant_dates)} days to AI (Total stored: {len(all_dates)} days).")
        return "\n" + "\n\n".join(summary_parts)

    def ask(self, question):
        """Sends the user question and timeline context to the LLM.

        Returns:
            str: The model's reply, or "Error: ..." when the call fails
            (including when the global client has not been initialized).
        """
        try:
            start_time = time.time()
            response = client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": f"Here is the log:\n{self.context}\n\nQuestion: {question}"}
                ],
                temperature=0.1,
            )
            duration = time.time() - start_time
            print(f"Thought for {duration:.2f} seconds")
            return response.choices[0].message.content
        except Exception as e:
            # Boundary handler: the caller receives the failure as text.
            return f"Error: {e}"
def load_timeline_history():
    """Load this deployment's timeline dictionary from the JSON history file.

    Reads TIMELINE_STORAGE_FILE and returns the entry stored under
    DEPLOYMENT_ID.

    Returns:
        dict: The stored timelines for this deployment. An empty dict is
        returned when the file does not exist, cannot be read or parsed
        (a warning is printed), or does not hold a dict for this deployment.
    """
    try:
        with open(TIMELINE_STORAGE_FILE, 'r', encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # No history yet: not an error, start from scratch silently.
        return {}
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"Warning: Could not read timeline history: {e}")
        return {}

    # Return the specific deployment's history or empty dict
    history = data.get(DEPLOYMENT_ID, {}) if isinstance(data, dict) else None
    if not isinstance(history, dict):
        # Callers index and assign into the result, so anything other than a
        # dict (e.g. a JSON null or list) must not be handed back.
        print("Warning: Could not read timeline history: unexpected data format")
        return {}
    return history
def save_timeline_history(timelines):
    """Save the updated timeline dictionary to the JSON history file.

    The existing file is loaded first so that entries belonging to other
    deployments are preserved; only the DEPLOYMENT_ID entry is replaced.

    Args:
        timelines: Mapping of date string -> timeline text for this deployment.

    Raises:
        TypeError, ValueError: If `timelines` cannot be serialized to JSON
            (raised before the file is touched).
        OSError: If the history file cannot be written.
    """
    # Load existing full file (to preserve other deployments if any)
    full_data = {}
    try:
        with open(TIMELINE_STORAGE_FILE, 'r', encoding="utf-8") as f:
            existing = json.load(f)
    except FileNotFoundError:
        pass  # First save: nothing to preserve.
    except (OSError, ValueError) as e:
        # Best effort: an unreadable/corrupt file is replaced, but say so
        # instead of discarding it silently.
        print(f"Warning: Existing timeline history could not be read and will be replaced: {e}")
    else:
        if isinstance(existing, dict):
            full_data = existing
        else:
            print("Warning: Existing timeline history has an unexpected format and will be replaced.")

    # Update the specific deployment
    full_data[DEPLOYMENT_ID] = timelines

    # Serialize BEFORE opening the file for writing: opening with 'w'
    # truncates it, so a serialization error must not wipe the stored history.
    payload = json.dumps(full_data, indent=4)
    with open(TIMELINE_STORAGE_FILE, 'w', encoding="utf-8") as f:
        f.write(payload)
    print(f"✅ Timeline history saved to {TIMELINE_STORAGE_FILE}")
# ---------------------------------------------------------
# MAIN EXECUTION
# ---------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: work out which time range is missing from the stored
    # timelines, fetch that range of sensor data, turn it into per-day
    # timeline text, persist it, and then start an interactive Q&A loop.
    # NOTE: names assigned here are module-level globals on purpose; the
    # nested helper below and ActivityAgent.ask (via `client`) read them.

    # ---------------------------------------------------------
    # 1. INITIALIZE & DETERMINE DATE RANGE
    # ---------------------------------------------------------
    timelines_cache = load_timeline_history()
    now = datetime.datetime.now()
    today_str = now.strftime("%Y-%m-%d")

    # Defaults: fetch today from minute 0 with no known starting room.
    start_date_str = today_str
    api_minute_start = 0
    previous_state_room = None

    if timelines_cache:
        sorted_dates = sorted(timelines_cache.keys())
        last_recorded_date = sorted_dates[-1]
        # presumably returns (last minute of day covered, last room state);
        # verify against parse_last_timeline_state
        last_mins, last_room = parse_last_timeline_state(timelines_cache[last_recorded_date])
        # Every branch below carries the last room forward so the next
        # timeline chunk starts with context.
        previous_state_room = last_room

        if last_recorded_date == today_str:
            # CASE A: We have data for Today, but it might be incomplete. Resume it.
            start_date_str = today_str
            api_minute_start = last_mins
            print(f"Resuming Today's timeline from {minutes_to_time(api_minute_start)} (State: {previous_state_room})...")
        elif last_mins >= 1435:
            # CASE B1: History ends on a previous day that was effectively
            # finished (ended after 11:55 PM). Start fetching from the NEXT
            # day at 00:00.
            last_date_obj = datetime.datetime.strptime(last_recorded_date, "%Y-%m-%d")
            next_day_obj = last_date_obj + datetime.timedelta(days=1)
            start_date_str = next_day_obj.strftime("%Y-%m-%d")
            api_minute_start = 0
            print(f"History on {last_recorded_date} is complete. Filling gap starting from {start_date_str}...")
        else:
            # CASE B2: History ends on a previous, partial day. Resume
            # fetching from that specific day and minute.
            start_date_str = last_recorded_date
            api_minute_start = last_mins
            print(f"History on {last_recorded_date} is partial. Resuming from {minutes_to_time(api_minute_start)}...")
    else:
        # No history, fetch last 7 days
        start_date = now - datetime.timedelta(days=7)
        start_date_str = start_date.strftime("%Y-%m-%d")
        print("No history found. Fetching last 7 days...")

    # ---------------------------------------------------------
    # 2. FETCH DATA
    # ---------------------------------------------------------
    wellnuo_token = get_wellnuo_credentials()
    wellnuo_api = WellnuoAPI(wellnuo_token)
    # NOTE(review): a plain class instance is always truthy, so this check
    # only fires if WellnuoAPI defines __bool__/__len__ — confirm intent.
    if not wellnuo_api:
        sys.exit("Failed to initialize API.")

    # We want to fetch 15 minutes BEFORE the needed time to provide context for logic
    BUFFER_MINUTES = 15
    fetch_minute_start = max(0, api_minute_start - BUFFER_MINUTES)

    # Call API with buffered start
    sensor_data = wellnuo_api.get_sensor_matrix(start_date_str, today_str, minute_start=fetch_minute_start)
    if not sensor_data or 'sensors' not in sensor_data:
        print("Error: No valid sensor data received.")
        # Exit with a non-zero status so callers/shells can see the failure
        # (a bare sys.exit() reports success).
        sys.exit(1)
    print("\n✅ Received data between ", sensor_data.get('date', 'Unknown'), " and ", today_str," for the following sensors: ")
    print(list(sensor_data['sensors'].keys()))

    # ---------------------------------------------------------
    # 3. PARSE DATA
    # ---------------------------------------------------------
    rows, cols = sensor_data['shape']
    room_ids = sensor_data['room_ids']
    # Unknown device ids fall back to their string form as the column name.
    room_names = [DEVICE_TO_ROOM_MAPPING.get(rid, str(rid)) for rid in room_ids]
    dtype_map = {
        'uint16': np.uint16,
        'uint8': np.uint8,
        'int16': np.int16,
        'int32': np.int32,
        'uint32': np.uint32,
        'float': np.float32,
        'float32': np.float32,
        'float64': np.float64,
    }
    default_dtype = dtype_map.get(sensor_data.get('dtype', 'uint16'), np.uint16)

    def unpack_sensor_to_df(sensor_key: str) -> pd.DataFrame:
        """Decode one sensor's binary blob into a [Minutes x Rooms] DataFrame.

        Args:
            sensor_key: Key into sensor_data['sensors'] (e.g. 'radar').

        Returns:
            A DataFrame with one row per minute and one column per room name,
            or an empty DataFrame when the sensor is missing or has no
            usable data.
        """
        sensor_entry = sensor_data['sensors'].get(sensor_key)
        if sensor_entry is None:
            return pd.DataFrame()
        blob = sensor_entry.get('data')
        if not blob:
            return pd.DataFrame()

        # dtype per sensor if present, else fall back to overall dtype, else uint16
        sensor_dtype_str = sensor_entry.get('dtype') or sensor_data.get('dtype')
        numpy_dtype = dtype_map.get(str(sensor_dtype_str).lower(), default_dtype)

        # np.frombuffer raises ValueError when the buffer length is not a
        # multiple of the element size, so a truncated blob is trimmed to
        # whole elements first.
        leftover_bytes = len(blob) % np.dtype(numpy_dtype).itemsize
        if leftover_bytes:
            blob = blob[:len(blob) - leftover_bytes]
            if not blob:
                return pd.DataFrame()
        arr = np.frombuffer(blob, dtype=numpy_dtype)

        # Get room count from the API shape: [Rooms, Minutes]
        num_rooms_api = sensor_data['shape'][0]
        # Safety: If API indicates 0 rooms, return empty
        if num_rooms_api <= 0:
            return pd.DataFrame()

        # 1. RESHAPE: Restore the matrix to (Rooms, Minutes).
        # The number of minutes is derived from the blob size rather than the
        # API shape, to be robust against truncated data.
        # NOTE(review): if the blob really is row-major [Rooms, Minutes] and
        # arrives truncated, dropping the tail shifts later rooms' samples —
        # confirm the API layout.
        actual_minutes = arr.size // num_rooms_api
        if actual_minutes == 0:
            return pd.DataFrame()
        arr = arr[:actual_minutes * num_rooms_api].reshape(num_rooms_api, actual_minutes)

        # 2. TRANSPOSE: Flip to [Minutes, Rooms] so columns line up with 'room_names'
        arr = arr.T

        # 3. DATAFRAME CREATION
        # If the API shape disagrees with the room_ids list length, keep only
        # the columns both sides have.
        usable_cols = min(arr.shape[1], len(room_names))
        return pd.DataFrame(arr[:, :usable_cols], columns=room_names[:usable_cols])

    master_light_df = unpack_sensor_to_df('light')
    master_humid_df = unpack_sensor_to_df('humidity')
    master_radar_df = unpack_sensor_to_df('radar')

    if master_radar_df.empty:
        print("No new data available from API.")
        # Proceed to Agent execution with existing cache
    else:
        # ---------------------------------------------------------
        # 4. GENERATE & MERGE TIMELINES
        # ---------------------------------------------------------
        # The API returns one linear run of minutes that may span several
        # days, so it is walked day by day.
        current_data_minutes = len(master_radar_df)
        start_date_obj = datetime.datetime.strptime(sensor_data['date'], '%Y-%m-%d')
        chunk_start = 0
        chunk_date = start_date_obj
        # Data index 0 corresponds to minute 'fetch_minute_start' (e.g. 885)
        # of the first day.
        current_daily_offset = fetch_minute_start

        while chunk_start < current_data_minutes:
            date_key = chunk_date.strftime('%Y-%m-%d')
            # Remaining minutes in this day (a day has 1440 minutes)
            mins_remaining_in_day = 1440 - current_daily_offset
            # Define the slice for this specific day
            chunk_end = min(chunk_start + mins_remaining_in_day, current_data_minutes)
            day_radar = master_radar_df.iloc[chunk_start:chunk_end].reset_index(drop=True)
            day_light = master_light_df.iloc[chunk_start:chunk_end].reset_index(drop=True)
            day_humid = master_humid_df.iloc[chunk_start:chunk_end].reset_index(drop=True)

            if not day_radar.empty:
                # Where output should start for this day: the FIRST day uses
                # the requested api_minute_start (the buffer minutes before it
                # are context only); subsequent days start at 00:00.
                cutoff = api_minute_start if chunk_start == 0 else 0

                # Generate Timeline. 'current_daily_offset' tells the timeline
                # which minute of the day index 0 corresponds to.
                new_text, final_state = create_timeline(
                    radar_df=day_radar,
                    light_df=day_light,
                    humidity_df=day_humid,
                    start_minute_offset=current_daily_offset,
                    initial_state=previous_state_room,
                    output_cutoff_minute=cutoff,
                    date_label=date_key,
                )
                # Carry the final state into the NEXT day's chunk
                previous_state_room = final_state

                # Append or Overwrite
                if date_key in timelines_cache and cutoff > 0:
                    # Partial update: merge into the stored text, but only
                    # when there is something new to merge.
                    if new_text.strip():
                        timelines_cache[date_key] = merge_timeline_chunks(
                            timelines_cache[date_key],
                            new_text,
                        )
                else:
                    # New day or full update
                    timelines_cache[date_key] = new_text

            # Prepare for next loop (Next day starts at 00:00)
            chunk_start = chunk_end
            chunk_date += datetime.timedelta(days=1)
            current_daily_offset = 0  # Subsequent days always start at min 0

        save_timeline_history(timelines_cache)

    # ---------------------------------------------------------
    # 5. AGENT EXECUTION
    # ---------------------------------------------------------
    model_config = select_model_configuration()
    print(f"\nInitializing {model_config['name']}...")
    # Kept as a module-level name: ActivityAgent.ask reads `client` globally.
    client = OpenAI(base_url=model_config['base_url'], api_key=model_config['api_key'])
    humidity_context = get_timeline_humidity_profile(timelines_cache)
    agent = ActivityAgent(timelines_cache, model_config['model_name'], extra_context=humidity_context)
    print(f"\n--- Smart Agent Ready ({model_config['name']}) ---")

    while True:
        try:
            q = input("\nYou: ")
        except (EOFError, KeyboardInterrupt):
            # Ctrl-D / Ctrl-C / closed stdin: leave quietly instead of
            # crashing with a traceback.
            print()
            break
        # Ignore surrounding whitespace so " q " or "exit " still quits.
        if q.strip().lower() in ('q', 'exit'):
            break
        print("Thinking...")
        print(f"Agent: {agent.ask(q)}")