Initial AI python script

This commit is contained in:
David Vengerov 2025-12-20 17:05:07 -08:00
parent 3948ccf1c9
commit edd59b1655

993
new_smart_agent.py Normal file
View File

@ -0,0 +1,993 @@
import pandas as pd
import sys
import os
import time
from openai import OpenAI
import urllib.request
import requests
import datetime
import zipfile
import io
import math
import re
# ---------------------------------------------------------
# GLOBAL CONFIGURATION & CONSTANTS
# ---------------------------------------------------------
DEBUG = 0 # Debug printout levels. Set to 1 to see verbose output.
WELLNUO_USER = "dive"
# Mappings to translate hardware IDs to human-readable room names
DEVICE_TO_ROOM_MAPPING = {
720: "Bedroom",
719: "Bathroom Main",
716: "Kitchen",
663: "Office",
714: "Living Room",
718: "Bathroom Guest",
715: "Dining Room"
}
WELL_TO_ROOM_MAPPING = {
"470": "Living Room",
"473": "Bathroom Guest",
"475": "Bedroom",
"471": "Dining Room",
"474": "Bathroom Main",
"472": "Kitchen",
"431": "Office"
}
# Settings for converting radar sensor readings into a narrative timeline
LATCH_TIME_MINUTES = 15 # Time to hold a room state active during silence before switching
NOISE_THRESHOLD = 15 # Minimum signal strength to consider a room "active"
MIN_DURATION_BATHROOM_THRESHOLD = 2 # Minimum duration (mins) to keep a block; shorter blocks are merged (glitch removal)
MIN_DURATION_OTHER_THRESHOLD = 10 # Minimum duration (mins) to keep a block; shorter blocks are merged (glitch removal)
# Constants for Light Logic
LIGHT_THRESHOLD = 200 # Lux value above which a room is considered "Lit"
EVENING_START_MIN = 18 * 60 # 6:00 PM (Minute 1080)
MORNING_END_MIN = 6 * 60 # 6:00 AM (Minute 360)
# Define available LLM models.
MODEL_OPTIONS = {
"1": {
"name": "Local: Ollama (qwen3:8b)",
"type": "ollama",
"base_url": "http://localhost:11434/v1",
"api_key": "ollama",
"model_name": "qwen3:8b"
},
"2": {
"name": "Cloud: Groq (GPT-OSS-20B)",
"type": "cloud",
"base_url": "https://api.groq.com/openai/v1",
"api_key": "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",
"model_name": "openai/gpt-oss-20b"
},
"3": {
"name": "Cloud: Groq (GPT-OSS-120B)",
"type": "cloud",
"base_url": "https://api.groq.com/openai/v1",
"api_key": "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",
"model_name": "openai/gpt-oss-20b"
}
# "4": {
# "name": "Cloud: Alibaba (Qwen-Flash)",
# "type": "cloud",
# "base_url": "https://dashscope-intl.aliyuncs.com/compatible-mode/v1",
# "api_key": "sk-...",
# "model_name": "qwen-flash"
# }
}
# Global client variable (will be initialized after user selection)
client = None
# Helper function for time formatting
def minutes_to_time(minutes):
"""
Converts minutes from start of day (0-1440) to HH:MM AM/PM string.
"""
hours = minutes // 60
mins = minutes % 60
period = "AM" if hours < 12 else "PM"
if hours == 0: hours = 12
if hours > 12: hours -= 12
return f"{hours:02d}:{mins:02d} {period}"
# Helper function for 2 significant figures
def format_sig_figs(val):
if val is None or pd.isna(val) or val == 0:
return "0"
# Calculate the power of 10 to round to the second significant digit
try:
return f"{round(val, -int(math.floor(math.log10(abs(val)))) + 1):.0f}"
except:
return "0"
def get_timeline_humidity_profile(timelines):
"""
Parses the generated text timelines to extract a sorted, deduplicated list
of average humidity values (H) recorded during Bathroom visits.
"""
all_h_values = []
# Regex to find H value in a line containing "Bathroom"
# Logic: Look for 'Bathroom', then look for 'H:' followed by digits
pattern = re.compile(r"Bathroom.*H:\s*(\d+)")
for date, text in timelines.items():
for line in text.split('\n'):
# Only process lines describing bathroom visits
if "Bathroom" in line:
match = pattern.search(line)
if match:
try:
val = int(match.group(1))
all_h_values.append(val)
except ValueError:
pass
if not all_h_values:
return ""
# Deduplicate and sort
unique_sorted = sorted(list(set(all_h_values)))
return f"### REFERENCE HUMIDITY DATA\nSorted list of average Bathroom Humidity levels (H) observed during visits: {unique_sorted}"
# ---------------------------------------------------------
# AUTHENTICATION
# ---------------------------------------------------------
def get_wellnuo_credentials():
"""
Authenticates with the Wellnuo API and retrieves the session token.
Returns:
str: Access token if successful, None otherwise.
"""
url = "https://eluxnetworks.net/function/well-api/api"
# Payload matching API requirements
payload = {
"function": "credentials",
"user_name": WELLNUO_USER,
"ps": "D@v1d3",
"clientId": "001",
"nonce": "111"
}
print("Authenticating with Wellnuo API...")
try:
# requests.post with 'data=' sends application/x-www-form-urlencoded
response = requests.post(url, data=payload)
response.raise_for_status()
data = response.json()
# Attempt to locate token in standard response fields
token = data.get("access_token") or data.get("data", {}).get("token")
if token:
print("✅ Wellnuo authentication successful.")
return token
else:
print(f"❌ Wellnuo authentication failed: Token not found in response: {data}")
return None
except Exception as e:
print(f"❌ Error connecting to Wellnuo API: {e}")
return None
def preprocess_sensor_data(api_response, data_name="sensor"):
"""
Parses raw JSON sensor data (light, humidity, etc.) covering multiple days
and splits it into a dictionary of daily DataFrames.
Args:
api_response (dict): The JSON response from the API.
data_name (str): Label for debug prints (e.g., "light", "humidity").
Returns:
dict: Keys are date strings (e.g., "2025-12-15").
Values are DataFrames (Index=0..1439, Columns=Room Names, Values=Sensor Reading).
"""
if not api_response or 'chart_data' not in api_response:
print(f"Warning: Invalid or missing {data_name} data.")
return {}
# 1. Flatten the nested JSON into a list of records
all_records = []
for room_entry in api_response['chart_data']:
room_name = room_entry['name']
# room_entry['data'] is a list of [timestamp_str, value]
for ts, val in room_entry['data']:
all_records.append({
'timestamp': ts,
'value': val,
'room': room_name
})
if not all_records:
return {}
# 2. Create a Master DataFrame
df = pd.DataFrame(all_records)
# Use format='ISO8601' to handle timestamps that mix precision
try:
df['timestamp'] = pd.to_datetime(df['timestamp'], format='ISO8601')
except ValueError:
# Fallback if specific ISO format fails
df['timestamp'] = pd.to_datetime(df['timestamp'], format='mixed')
# Extract Date string (for grouping) and Minute of Day (for indexing)
df['date_str'] = df['timestamp'].dt.strftime('%Y-%m-%d')
df['minute'] = df['timestamp'].dt.hour * 60 + df['timestamp'].dt.minute
daily_dfs = {}
# 3. Group by Date and create individual DataFrames
for date_key, group in df.groupby('date_str'):
# Pivot: Index=Minute, Columns=Room, Values=Sensor Value
# We use pivot_table with aggfunc='max' to handle duplicate minutes if any
daily_df = group.pivot_table(index='minute', columns='room', values='value', aggfunc='max')
# Reindex to ensure full 0-1439 grid
daily_df = daily_df.reindex(range(1440), fill_value=0)
# Forward fill small gaps (optional) then fill NaNs
daily_df = daily_df.ffill(limit=5).fillna(0)
daily_dfs[date_key] = daily_df
return daily_dfs
def print_sensor_statistics(daily_sensor_dfs, sensor_type="Sensor", unit=""):
"""
Aggregates sensor data from all available days and prints percentiles for each room.
Percentiles shown:
- 0% to 90% in increments of 10%
- 90% to 100% in increments of 1%
Args:
daily_sensor_dfs (dict): Dictionary of daily DataFrames.
sensor_type (str): Name for the header (e.g., "HUMIDITY", "LIGHT").
unit (str): Unit suffix for the values (e.g., "%", " Lux").
"""
if not daily_sensor_dfs:
print(f"No {sensor_type} data available to analyze.")
return
print("\n========================================================")
print(f" {sensor_type.upper()} STATISTICS (Aggregated over all days) ")
print("========================================================")
# 1. Combine all daily DataFrames into one large DataFrame
combined_df = pd.concat(daily_sensor_dfs.values(), axis=0, ignore_index=True)
# 2. Iterate through each room (column)
for room in combined_df.columns:
# Extract the data series for this room
series = combined_df[room]
# CRITICAL: Filter out 0.0 values (missing data or "off" states)
valid_series = series[series > 0]
if valid_series.empty:
print(f"\nRoom: {room} - No valid data (>0) found.")
continue
print(f"\nRoom: {room}")
print("-" * 20)
# 3. Calculate Percentiles
# Generate list: 0.0, 0.1, ... 0.9
percentiles = [i / 10.0 for i in range(10)]
# Add fine-grained list: 0.91, 0.92, ... 1.0
percentiles.extend([i / 100.0 for i in range(91, 101)])
stats = valid_series.quantile(percentiles)
# 4. Print formatted results
for p, val in stats.items():
# Use round() to handle floating point precision issues (e.g. 0.1 * 100 = 9.999...)
label = f"{int(round(p * 100))}%"
print(f"{label:<5}: {val:.1f}{unit}")
# ---------------------------------------------------------
# CORE LOGIC: TIMELINE GENERATION
# ---------------------------------------------------------
def create_timeline(filename, light_df=None, humidity_df=None):
"""
Transforms raw radar sensor CSV data into a coherent, narrative timeline of user activity
for a single day.
This function implements a multi-stage algorithm to determine the most likely room
occupied by a person based on radar signal strength, while cross-referencing light
sensor data to filter out false positives during dark hours.
It also calculates average environmental conditions (Light and Humidity) for each
identified activity block.
Algorithm Overview:
-------------------
1. Data Preparation:
- Maps hardware Device IDs to human-readable Room Names.
- Pivots raw data into a minute-by-minute grid (0 to 1439 minutes).
2. State Machine & Signal Evaluation (The Core Loop):
- Iterates through every minute of the day.
- Light Sensor Logic (Dark Hours Rule): Between 6:00 PM and 6:00 AM, the algorithm
restricts the search space. Only rooms with active light (> 200 Lux) are considered
valid candidates.
* EXCEPTION: The "Bedroom" is always a valid candidate, even in the dark, to account for sleeping.
- Signal Selection: Determines which valid candidate room has the strongest radar signal.
- Rejection Tracking: Logs instances where a room had the strongest raw signal but was
ignored because it was dark (and therefore deemed a false positive or ghost signal).
- Heuristics: Applies logic for "Sleeping" states and "Latching" (holding the previous
state during short periods of silence to prevent flickering).
3. Iterative Post-Processing (Gap Filling):
- A cleaning loop runs (up to 10 times) to refine the timeline.
- Merges consecutive entries for the same room.
- Identifies "glitches" (durations shorter than 2 mins for Bathrooms or 5 mins for other rooms).
- "Absorbs" these glitches into the neighboring room with the strongest signal context.
4. Reporting:
- Prints a summary of "Light Rule Exclusions" (times where strong signals were ignored due to darkness).
5. Formatting & Environmental Analysis:
- Iterates through the final timeline blocks.
- Calculates the average Light (Lux) and Humidity (%) for the specific room and time interval.
- Returns the final formatted text timeline.
Args:
filename (str): Path to the CSV file containing radar data.
light_df (pd.DataFrame, optional): Preprocessed DataFrame containing light sensor readings
(Index=Minute, Columns=Room Names).
humidity_df (pd.DataFrame, optional): Preprocessed DataFrame containing humidity sensor readings
(Index=Minute, Columns=Room Names).
Returns:
str: A formatted string describing the timeline of events with environmental data.
"""
if 'DEBUG' in globals() and DEBUG > 0:
print(f"\nProcessing data from: {os.path.basename(filename)}...")
try:
df = pd.read_csv(filename)
except Exception as e:
return f"Error reading CSV: {e}"
# ---------------------------------------------------------
# 1. Data Preparation
# ---------------------------------------------------------
# Map Device IDs to Names
mapping = globals().get('DEVICE_TO_ROOM_MAPPING', globals().get('ROOM_MAPPING', {}))
df['device_id'] = df['device_id'].map(mapping).fillna(df['device_id'])
# Pivot Data (Minute-by-Minute Grid)
# Creates a matrix where Index is the minute of the day (0-1439) and Columns are Room Names.
df['minute'] = df.groupby('device_id').cumcount()
pivot_df = df.pivot(index='minute', columns='device_id', values='radar').fillna(0)
# Initialize State Machine variables
raw_log = []
rejection_log = [] # To store [minute, room_name] of signals ignored due to the Light Rule
current_state = "Out of House/Inactive"
start_time = 0
silence_counter = 0
last_known_room = "Unknown"
# ---------------------------------------------------------
# 2. State Machine Loop
# ---------------------------------------------------------
for minute, row in pivot_df.iterrows():
# --- LIGHT SENSOR LOGIC ---
# Default: Consider all rooms available in the radar data as candidates
candidate_rooms = row.index
# Check if we have light data and if we are in the "Dark Hours" (6pm - 6am)
is_dark_hours = (minute < MORNING_END_MIN) or (minute > EVENING_START_MIN)
if light_df is not None and not light_df.empty and is_dark_hours:
try:
if minute in light_df.index:
light_row = light_df.loc[minute]
# Find rooms where light is ON (> 200 Lux)
lit_rooms_index = light_row[light_row > LIGHT_THRESHOLD].index
# Intersect with radar rooms to ensure validity (room must exist in both datasets)
lit_rooms = lit_rooms_index.intersection(row.index)
# LOGIC:
# If at least one room is lit, we filter the search space.
# If NO rooms are lit (e.g., user is sleeping or sitting in the dark),
# we fall back to standard behavior (all rooms are candidates).
if not lit_rooms.empty:
# Start with the lit rooms
allowed = set(lit_rooms)
# EXCEPTION: "Bedroom" is always allowed, even if dark (for sleeping).
if "Bedroom" in row.index:
allowed.add("Bedroom")
# Convert back to index for pandas selection
candidate_rooms = list(allowed)
except Exception as e:
if DEBUG > 0: print(f"Light logic error at min {minute}: {e}")
pass
# --- REJECTION TRACKING ---
# Determine which room had the strongest signal BEFORE applying light filters.
raw_max_signal = row.max()
raw_best_room = str(row.idxmax())
# If the raw winner is strong enough to be real...
if raw_max_signal > NOISE_THRESHOLD:
# ...but it is NOT in our filtered candidate list (it was dark)...
if raw_best_room not in candidate_rooms:
# ...then it was rejected due to the light rule. Log it for reporting.
rejection_log.append({
'minute': minute,
'room': raw_best_room
})
# --- Signal Evaluation (Using Candidate Rooms) ---
# We only look for the max signal among the candidate (Lit Rooms + Bedroom)
if len(candidate_rooms) > 0:
max_signal = row[candidate_rooms].max()
if pd.notna(max_signal):
strongest_room = str(row[candidate_rooms].idxmax())
else:
max_signal = 0
strongest_room = "Unknown"
else:
max_signal = 0
strongest_room = "Unknown"
# Check against Noise Threshold
if max_signal > NOISE_THRESHOLD:
active_location = strongest_room
silence_counter = 0
last_known_room = active_location
else:
active_location = None
silence_counter += 1
# --- Heuristics / Logic Tree ---
effective_location = current_state
if active_location:
effective_location = active_location
else:
# Handle periods of silence/inactivity
is_night_time = (minute < 480) or (minute > 1320)
# Heuristic: If last known room was Bedroom and it's night, assume Sleeping.
if last_known_room == "Bedroom" and is_night_time:
effective_location = "Bedroom (Sleeping)"
silence_counter = 0
# Heuristic: Latching. If silence is short, assume user is still in the last room.
elif silence_counter < LATCH_TIME_MINUTES and last_known_room != "Unknown":
effective_location = last_known_room
else:
effective_location = "Out of House/Inactive"
# --- State Change Detection ---
if effective_location != current_state:
if current_state != "Unknown":
# Log the completed block
raw_log.append({
"room": current_state,
"start": start_time,
"end": minute,
"duration": minute - start_time
})
current_state = effective_location
start_time = minute
# Add final block after loop finishes
duration = len(pivot_df) - start_time
raw_log.append({"room": current_state, "start": start_time, "end": len(pivot_df), "duration": duration})
# ---------------------------------------------------------
# 3. Iterative Cleaning (Merge & Gap Filling)
# ---------------------------------------------------------
# This loop repeatedly merges consecutive identical blocks and removes "glitches"
# (blocks that are too short to be significant).
iteration = 0
max_iterations = 10
while iteration < max_iterations:
# A. Merge Step: Combine consecutive blocks of the same room
merged_log = []
if raw_log:
current_block = raw_log[0]
for next_block in raw_log[1:]:
if next_block['room'] == current_block['room']:
# Extend the current block
current_block['end'] = next_block['end']
current_block['duration'] = current_block['end'] - current_block['start']
else:
merged_log.append(current_block)
current_block = next_block
merged_log.append(current_block)
raw_log = merged_log
# B. Scan & Fix Step: Remove short duration glitches
changes_made = False
for i in range(len(raw_log)):
block = raw_log[i]
# Dynamic Threshold: Bathrooms allowed to be short (2 mins), others need 5 mins.
if "Bathroom" in block['room']:
dynamic_threshold = 2
else:
dynamic_threshold = 5
if block['duration'] < dynamic_threshold:
# Identify neighbors
prev_block = raw_log[i-1] if i > 0 else None
next_block = raw_log[i+1] if i < len(raw_log) - 1 else None
winner_room = None
# Compare average signal strength of neighbors during the glitch period
if prev_block and next_block:
room_a = prev_block['room']
room_b = next_block['room']
sig_a = -1
if room_a in pivot_df.columns:
sig_a = pivot_df.loc[block['start']:block['end']-1, room_a].mean()
sig_b = -1
if room_b in pivot_df.columns:
sig_b = pivot_df.loc[block['start']:block['end']-1, room_b].mean()
if sig_a >= sig_b: winner_room = room_a
else: winner_room = room_b
elif prev_block:
winner_room = prev_block['room']
elif next_block:
winner_room = next_block['room']
# Apply the fix
if winner_room and winner_room != block['room']:
block['room'] = winner_room
changes_made = True
# If no changes were made, the timeline is stable.
if not changes_made:
break
iteration += 1
# ---------------------------------------------------------
# 4. Print Rejection Log (Merged & Filtered)
# ---------------------------------------------------------
# This section processes the 'rejection_log' to print a human-readable summary
# of times where radar signals were ignored because the room was dark.
if rejection_log:
# Extract date from filename (e.g., "21_2025-12-15_by_minute_rc_data.csv" -> "2025-12-15")
try:
date_str = os.path.basename(filename).split('_')[1]
except:
date_str = "Unknown Date"
# Sort by minute just in case
rejection_log.sort(key=lambda x: x['minute'])
curr_rej = rejection_log[0]
start_m = curr_rej['minute']
end_m = start_m
room = curr_rej['room']
# Iterate to merge consecutive rejected minutes into blocks
for i in range(1, len(rejection_log)):
next_rej = rejection_log[i]
# Check if consecutive minute AND same room
if (next_rej['minute'] == end_m + 1) and (next_rej['room'] == room):
end_m = next_rej['minute']
else:
# Block ended. Calculate duration
duration = end_m - start_m + 1
# Determine threshold: 2 mins for Bathroom, 5 mins for others
threshold = 2 if "Bathroom" in room else 5
# Print only if duration meets the threshold (ignore short noise)
if duration >= threshold:
start_s = minutes_to_time(start_m)
end_s = minutes_to_time(end_m + 1)
print(f"On {date_str}, between {start_s} and {end_s}, {room} had the highest radar presence signal but had light sensor reading below threshold for dark hours of the day.")
# Reset for next block
curr_rej = next_rej
start_m = curr_rej['minute']
end_m = start_m
room = curr_rej['room']
# Process the final block after loop finishes
duration = end_m - start_m + 1
threshold = 2 if "Bathroom" in room else 5
if duration >= threshold:
start_s = minutes_to_time(start_m)
end_s = minutes_to_time(end_m + 1)
print(f"On {date_str}, between {start_s} and {end_s}, {room} had the highest radar presence signal but had light sensor reading below threshold for dark hours of the day.")
# ---------------------------------------------------------
# 5. Format Output & Environmental Analysis
# ---------------------------------------------------------
final_text = []
for entry in raw_log:
start_s = minutes_to_time(entry['start'])
end_s = minutes_to_time(entry['end'])
room_name = entry['room']
start_idx = entry['start']
end_idx = entry['end']
env_parts = []
# --- Process Light (L) ---
if light_df is not None and not light_df.empty and room_name in light_df.columns:
segment = light_df.loc[start_idx : end_idx - 1, room_name]
if not segment.empty:
avg_l = segment.mean()
env_parts.append(f"L:{format_sig_figs(avg_l)}")
# --- Process Humidity (H) ---
if humidity_df is not None and not humidity_df.empty and room_name in humidity_df.columns:
segment = humidity_df.loc[start_idx : end_idx - 1, room_name]
if not segment.empty:
avg_h = segment.mean()
env_parts.append(f"H:{format_sig_figs(avg_h)}")
# Construct compact tag: [L:1600, H:22]
env_tag = f" [{', '.join(env_parts)}]" if env_parts else ""
final_text.append(
f"- {start_s} to {end_s} ({entry['duration']} mins): {room_name}{env_tag}"
)
return "\n".join(final_text)
# ---------------------------------------------------------
# HELPER: AI MODEL SELECTION
# ---------------------------------------------------------
def is_ollama_running():
"""Checks if Ollama is running on the default local port."""
try:
with urllib.request.urlopen("http://localhost:11434", timeout=0.5) as response:
return response.status == 200
except:
return False
def select_model_configuration():
"""Interactively select the model provider via CLI."""
print("\n--- SELECT AI MODEL ---")
available_keys = []
for key, config in MODEL_OPTIONS.items():
# Check availability for local models
if config['type'] == 'ollama':
if is_ollama_running():
print(f"[{key}] {config['name']}")
available_keys.append(key)
else:
continue
else:
# Always show cloud providers
print(f"[{key}] {config['name']}")
available_keys.append(key)
if not available_keys:
print("Error: No models available. (Ollama is offline and no cloud keys configured?)")
sys.exit()
while True:
choice = input("\nSelect Model # (or 'q'): ")
if choice.lower() == 'q': sys.exit()
if choice in available_keys:
return MODEL_OPTIONS[choice]
print("Invalid selection.")
# ---------------------------------------------------------
# HELPER: FILE EXTRACTION
# ---------------------------------------------------------
def extract_csv(api_response):
"""
Handles the API response. If it's a ZIP file, extracts CSVs.
Returns a list of file paths and a list of extracted dates.
"""
# Check for ZIP Magic Bytes (PK...)
if api_response.content.startswith(b'PK'):
print(f"✅ Received ZIP file. Extracting and saving individual files from it...\n")
# 1. Load binary content
zip_buffer = io.BytesIO(api_response.content)
# 2. Define extraction path
extract_folder = os.path.join(os.getcwd(), "data_downloads")
os.makedirs(extract_folder, exist_ok=True)
filename_list = []
extracted_dates = []
# 3. Open and Extract
with zipfile.ZipFile(zip_buffer) as z:
file_list = z.namelist()
for filename in file_list:
if filename.endswith('.csv'):
z.extract(filename, extract_folder)
csv_file_path = os.path.join(extract_folder, filename)
if DEBUG > 0:
print(f"Saved: {csv_file_path}")
filename_list.append(csv_file_path)
# Extract date from filename (e.g., "21_2025-12-15_by_minute_rc_data.csv")
# Split by '_' and take index 1
date_part = filename.split('_')[1]
extracted_dates.append(date_part)
if len(filename_list) > 0:
return filename_list, extracted_dates
else:
return "Error: could not extract the list of csv file names."
else:
# Fallback for JSON error messages
try:
return api_response.json()
except:
return f"Error: Unexpected response format. Start of content: {api_response.content[:20]}"
# ---------------------------------------------------------
# CLASS: WELLNUO API WRAPPER
# ---------------------------------------------------------
class WellnuoAPI:
def __init__(self, api_token):
self.api_token = api_token
if not self.api_token:
return "Error: No API Token available."
self.url = "https://eluxnetworks.net/function/well-api/api"
def download(self, start_date_str, end_date_str):
"""Downloads radar data as ZIP file for a date range."""
payload = {
"name": "download",
"user_name": WELLNUO_USER,
"token": self.api_token,
"deployment_id": "21",
"date_from": start_date_str,
"date_to": end_date_str,
"group_by": "by_minute_rd",
"re_create": "false",
"radar_part": "s28"
}
try:
print(f"\nCalling Wellnuo download() API function to get radar sensor data between {start_date_str} and {end_date_str}...")
response = requests.get(self.url, params=payload)
response.raise_for_status()
return response
except Exception as e:
return f"API Error: {e}"
def get_sensor_data(self, start_date_str, end_date_str, sensor_type):
"""
Retrieves sensor data "light" or "humidity" for a specific date.
Matches the 'get_sensor_data_by_deployment_id' API function shown in Postman.
"""
payload = {
"function": "get_sensor_data_by_deployment_id",
"user_name": WELLNUO_USER,
"token": self.api_token,
"sensor": sensor_type,
"deployment_id": "21",
"data_type": "ML",
"radar_part": "s28",
"date": start_date_str,
"to_date": end_date_str,
}
try:
print(f"\n--- Calling Wellnuo get_sensor_data_by_deployment_id() API function to get {sensor_type} sensor data between {start_date_str} and {end_date_str}...")
response = requests.post(self.url, data=payload)
response.raise_for_status()
return response.json()
except Exception as e:
return f"API Error: {e}"
def get_presence_data(self, start_date_str, end_date_str):
"""Retrieves presence data (Z-graph)."""
payload = {
"function": "get_presence_data",
"user_name": WELLNUO_USER,
"token": self.api_token,
"deployment_id": "21",
"date": start_date_str,
"filter": "6",
"data_type": "z-graph",
"to_date": end_date_str,
"refresh": "0"
}
try:
print(f"\n--- Calling Wellnuo get_presence_data() API function for start_date = {start_date_str}, end_date = {end_date_str} ---")
response = requests.post(self.url, data=payload)
response.raise_for_status()
return response.json()
except Exception as e:
return f"API Error: {e}"
# ---------------------------------------------------------
# CLASS: AI INTERPRETER AGENT
# ---------------------------------------------------------
class ActivityAgent:
def __init__(self, timelines_dict, model_name, extra_context=""):
self.context = self.summarize_timelines(timelines_dict)
# Append the extra context (humidity list) to the end of the log
if extra_context:
self.context += f"\n\n{extra_context}"
print("AI agent received the following context: ")
print(self.context)
self.model_name = model_name
self.system_prompt = """
You are an expert Health Data Analyst.
You are receiving a timeline of activities of a person in a house.
Answer user's question strictly based on the provided timeline of activities.
If the user mentions 'bathroom' without specifying 'Main' or 'Guest', you must combine the data from both 'Bathroom Main'
and 'Bathroom Guest' and treat them as a single location.
The timeline entries include environmental data in brackets: [L: value, H: value].
- L represents the average Light level in Lux.
- H represents the average Humidity percentage.
These values are averages for the specific time interval and are rounded to two significant figures.
To identify showers:
1. Consult the 'Sorted list of average Bathroom Humidity levels' in the Reference Humidity Data.
2. Identify the main cluster of lower values (the baseline/median range).
3. Infer 'showering' ONLY for visits that meet **all** of the following criteria:
- The Humidity [H] falls into a higher cluster separated from the baseline by a distinct numeric gap (missing integers in the sorted list).
- The duration of the visit is **at least 7 minutes**.
- The Light [L] level is **at least 1000 Lux**.
4. **Maximum one shower per day**: If multiple visits on the same day meet the above criteria, identify only the single event with the highest Humidity [H] as the shower.
"""
print(f"\n and is given the following system prompt:\n")
print(f'"{self.system_prompt}"')
def summarize_timelines(self, timelines):
"""Combines multiple daily timelines into a single string for the LLM context."""
summary_parts = []
sorted_dates = sorted(timelines.keys())
if sorted_dates:
last_date = sorted_dates[-1]
for date in sorted_dates:
daily_text = timelines[date]
# Header formatting
if date == last_date:
header = f"### ACTIVITY LOG FOR: {date} (TODAY)"
else:
header = f"### ACTIVITY LOG FOR: {date}"
day_block = f"{header}\n{daily_text}"
summary_parts.append(day_block)
return "\n" + "\n\n".join(summary_parts)
def ask(self, question):
"""Sends the user question and timeline context to the LLM."""
try:
start_time = time.time()
response = client.chat.completions.create(
model=self.model_name,
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": f"Here is the log:\n{self.context}\n\nQuestion: {question}"}
],
temperature=0.1,
)
end_time = time.time()
duration = end_time - start_time
print(f"Thought for {duration:.2f} seconds")
return response.choices[0].message.content
except Exception as e:
return f"Error: {e}"
# ---------------------------------------------------------
# MAIN EXECUTION
# ---------------------------------------------------------
if __name__ == "__main__":
# Initialize Wellnuo API
wellnuo_token = get_wellnuo_credentials()
wellnuo_api = WellnuoAPI(wellnuo_token)
# Determine Date Range (Last 7 Days)
now = datetime.datetime.now()
today_str = now.strftime("%Y-%m-%d")
lookback = 7
start_date = now - datetime.timedelta(days=lookback)
start_date_str = start_date.strftime("%Y-%m-%d")
# Download Light Data
start_time = time.time()
light_data_dict = wellnuo_api.get_sensor_data(start_date_str, today_str, sensor_type="light")
end_time = time.time()
duration = end_time - start_time
print(f"Spent {duration:.2f} seconds obtaining light data from Wellnuo API\n")
# Process light data into a Dictionary: {'2025-12-09': df, '2025-12-10': df...}
daily_light_dfs = preprocess_sensor_data(light_data_dict)
# print_sensor_statistics(daily_light_dfs, sensor_type="Light", unit=" Lux")
# Download Humidity Data
start_time = time.time()
humidity_data_dict = wellnuo_api.get_sensor_data(start_date_str, today_str, sensor_type="humidity")
end_time = time.time()
duration = end_time - start_time
print(f"Spent {duration:.2f} seconds obtaining humidity data from Wellnuo API\n")
# Process humidty data into a Dictionary: {'2025-12-09': df, '2025-12-10': df...}
daily_humidity_dfs = preprocess_sensor_data(humidity_data_dict)
# print_sensor_statistics(daily_humidity_dfs, sensor_type="Humidity", unit="%")
# Download Radar Data
start_time = time.time()
zipped_radar_data = wellnuo_api.download(start_date_str, today_str)
end_time = time.time()
duration = end_time - start_time
print(f"Spent {duration:.2f} seconds obtaining radar data from Wellnuo API\n")
# Extract and Process CSV radar files
filename_list, date_list = extract_csv(zipped_radar_data)
# Generate Timelines
print(f"Creating timelines for dates between {date_list[0]} and {date_list[-1]}\n")
timelines = {}
for filename, date in zip(filename_list, date_list):
# Retrieve the specific light DataFrame for this date (if it exists)
daily_light_df = daily_light_dfs.get(date)
daily_humidity_df = daily_humidity_dfs.get(date)
# Pass the specific day's light data to the timeline creator
timelines[date] = create_timeline(filename, light_df=daily_light_df, humidity_df=daily_humidity_df)
# Select AI Model
model_config = select_model_configuration()
# Initialize OpenAI Client
print(f"\nInitializing {model_config['name']}...")
client = OpenAI(
base_url=model_config['base_url'],
api_key=model_config['api_key']
)
# Generate the humidity profile string
humidity_context = get_timeline_humidity_profile(timelines)
# Start Agent
agent = ActivityAgent(timelines, model_config['model_name'], extra_context=humidity_context)
print(f"\n--- Smart Agent Ready ({model_config['name']}) ---")
# Interactive Loop
while True:
q = input("\nYou: ")
if q.lower() in ['q', 'exit']: break
print("Thinking...")
print(f"Agent: {agent.ask(q)}")