import pandas as pd import sys import os import time from openai import OpenAI import urllib.request import requests import datetime import json import math import re import numpy as np import msgpack # --------------------------------------------------------- # GLOBAL CONFIGURATION & CONSTANTS # --------------------------------------------------------- DEBUG = 0 # Debug printout levels. Set to 1 to see verbose output. USE_LOCAL_CACHE = False # Set to True to read from files, False to call API PRINT_PRESENCE = True # Flag to print Presence (P) values in the timeline WELLNUO_USER = "dive" DEPLOYMENT_ID = "21" TIMELINE_STORAGE_FILE = "timeline_history.json" # Mappings to translate hardware IDs to human-readable room names DEVICE_TO_ROOM_MAPPING = { 720: "Bedroom", 719: "Bathroom Main", 716: "Kitchen", 663: "Office", 714: "Living Room", 718: "Bathroom Guest", 715: "Dining Room" } WELL_TO_ROOM_MAPPING = { "470": "Living Room", "473": "Bathroom Guest", "475": "Bedroom", "471": "Dining Room", "474": "Bathroom Main", "472": "Kitchen", "431": "Office" } # Settings for converting radar sensor readings into a narrative timeline BASELINE_PRESENCE = 20 # Presence radar levels below this level indicates absense LATCH_TIME_MINUTES = 15 # Time to hold a room state active during silence before switching MIN_DURATION_BATHROOM_THRESHOLD = 2 # Minimum duration (mins) to keep a block; shorter blocks are merged (glitch removal) MIN_DURATION_OTHER_THRESHOLD = 10 # Minimum duration (mins) to keep a block; shorter blocks are merged (glitch removal) # Constants for Light Logic LIGHT_THRESHOLD = 200 # Lux value above which a room is considered "Lit" EVENING_START_MIN = 18 * 60 # 6:00 PM (Minute 1080) MORNING_END_MIN = 6 * 60 # 6:00 AM (Minute 360) # Define available LLM models. MODEL_OPTIONS = { "1": { "name": "Local: Ollama (qwen3:8b)", "type": "ollama", "base_url": "http://localhost:11434/v1", "api_key": "ollama", "model_name": "qwen3:8b" }, "2": { "name": "Cloud: Groq (GPT-OSS-20B)", "type": "cloud", "base_url": "https://api.groq.com/openai/v1", "api_key": "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", "model_name": "openai/gpt-oss-20b" }, "3": { "name": "Cloud: Groq (GPT-OSS-120B)", "type": "cloud", "base_url": "https://api.groq.com/openai/v1", "api_key": "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", "model_name": "openai/gpt-oss-120b" } # "4": { # "name": "Cloud: Alibaba (Qwen-Flash)", # "type": "cloud", # "base_url": "https://dashscope-intl.aliyuncs.com/compatible-mode/v1", # "api_key": "sk-...", # "model_name": "qwen-flash" # } } # Global client variable (will be initialized after user selection) client = None # Helper function for time formatting def minutes_to_time(minutes): minutes = minutes % 1440 # Normalize 1440 to 0 hours = minutes // 60 mins = minutes % 60 period = "AM" if hours < 12 else "PM" if hours == 0: hours = 12 if hours > 12: hours -= 12 return f"{hours:02d}:{mins:02d} {period}" # Helper function for 2 significant figures def format_sig_figs(val): if val is None or pd.isna(val) or val == 0: return "0" # Calculate the power of 10 to round to the second significant digit try: return f"{round(val, -int(math.floor(math.log10(abs(val)))) + 1):.0f}" except: return "0" def get_timeline_humidity_profile(timelines): """ Parses the generated text timelines to extract a sorted, deduplicated list of average humidity values (H) recorded during Bathroom visits. """ all_h_values = [] # Regex to find H value in a line containing "Bathroom" # Logic: Look for 'Bathroom', then look for 'H:' followed by digits pattern = re.compile(r"Bathroom.*H:\s*(\d+)") for date, text in timelines.items(): for line in text.split('\n'): # Only process lines describing bathroom visits if "Bathroom" in line: match = pattern.search(line) if match: try: val = int(match.group(1)) all_h_values.append(val) except ValueError: pass if not all_h_values: return "" # Deduplicate and sort unique_sorted = sorted(list(set(all_h_values))) return f"### REFERENCE HUMIDITY DATA\nSorted list of average Bathroom Humidity levels (H) observed during visits: {unique_sorted}" # --------------------------------------------------------- # AUTHENTICATION # --------------------------------------------------------- def get_wellnuo_credentials(): """ Authenticates with the Wellnuo API and retrieves the session token. Returns: str: Access token if successful, None otherwise. """ url = "https://eluxnetworks.net/function/well-api/api" # Payload matching API requirements payload = { "function": "credentials", "user_name": WELLNUO_USER, "ps": "D@v1d3", "clientId": "001", "nonce": "111" } print("Authenticating with Wellnuo API...") try: # requests.post with 'data=' sends application/x-www-form-urlencoded response = requests.post(url, data=payload) response.raise_for_status() data = response.json() # Attempt to locate token in standard response fields token = data.get("access_token") or data.get("data", {}).get("token") if token: print("✅ Wellnuo authentication successful.") return token else: print(f"❌ Wellnuo authentication failed: Token not found in response: {data}") return None except Exception as e: print(f"❌ Error connecting to Wellnuo API: {e}") return None def print_sensor_statistics(daily_sensor_dfs, sensor_type="Sensor", unit=""): """ Aggregates sensor data from all available days and prints percentiles for each room. Percentiles shown: - 0% to 90% in increments of 10% - 90% to 100% in increments of 1% Args: daily_sensor_dfs (dict): Dictionary of daily DataFrames. sensor_type (str): Name for the header (e.g., "HUMIDITY", "LIGHT"). unit (str): Unit suffix for the values (e.g., "%", " Lux"). """ if not daily_sensor_dfs: print(f"No {sensor_type} data available to analyze.") return print("\n========================================================") print(f" {sensor_type.upper()} STATISTICS (Aggregated over all days) ") print("========================================================") # 1. Combine all daily DataFrames into one large DataFrame combined_df = pd.concat(daily_sensor_dfs.values(), axis=0, ignore_index=True) # 2. Iterate through each room (column) for room in combined_df.columns: # Extract the data series for this room series = combined_df[room] # CRITICAL: Filter out 0.0 values (missing data or "off" states) valid_series = series[series > 0] if valid_series.empty: print(f"\nRoom: {room} - No valid data (>0) found.") continue print(f"\nRoom: {room}") print("-" * 20) # 3. Calculate Percentiles # Generate list: 0.0, 0.1, ... 0.9 percentiles = [i / 10.0 for i in range(10)] # Add fine-grained list: 0.91, 0.92, ... 1.0 percentiles.extend([i / 100.0 for i in range(91, 101)]) stats = valid_series.quantile(percentiles) # 4. Print formatted results for p, val in stats.items(): # Use round() to handle floating point precision issues (e.g. 0.1 * 100 = 9.999...) label = f"{int(round(p * 100))}%" print(f"{label:<5}: {val:.1f}{unit}") def print_presence_statistics(filename): """ Calculates and prints the percentiles of the presence (radar) level in each room for a particular CSV file. """ print(f"\n========================================================") print(f" PRESENCE STATISTICS (File: {os.path.basename(filename)}) ") print(f"========================================================") try: df = pd.read_csv(filename) # Map Device IDs to Names mapping = globals().get('DEVICE_TO_ROOM_MAPPING', {}) df['device_id'] = df['device_id'].map(mapping).fillna(df['device_id']) # Get unique rooms rooms = df['device_id'].unique() for room in rooms: # Filter data for this room room_data = df[df['device_id'] == room]['radar'] # Filter out 0/noise for meaningful statistics if desired, # or keep all to see true distribution including silence. # Here we filter > 0 to see the strength of "active" signals. valid_series = room_data[room_data > 0] if valid_series.empty: print(f"\nRoom: {room} - No valid presence data (>0) found.") continue print(f"\nRoom: {room}") print("-" * 20) # Calculate Percentiles # 0% to 90% in increments of 10%, then 91-100% percentiles = [i / 10.0 for i in range(10)] percentiles.extend([i / 100.0 for i in range(91, 101)]) stats = valid_series.quantile(percentiles) for p, val in stats.items(): label = f"{int(round(p * 100))}%" print(f"{label:<5}: {val:.1f}") except Exception as e: print(f"Error calculating presence statistics: {e}") # --------------------------------------------------------- # CORE LOGIC: TIMELINE GENERATION # --------------------------------------------------------- def create_timeline( light_df=None, humidity_df=None, radar_df=None, start_minute_offset=0, initial_state=None, output_cutoff_minute=0, date_label=None, ): """ Transforms raw radar sensor CSV data into a coherent, narrative timeline of user activity for a single day. This function implements a multi-stage algorithm to determine the most likely room occupied by a person based on radar signal strength, while cross-referencing light sensor data to filter out false positives during dark hours. It also calculates average environmental conditions (Light and Humidity) for each identified activity block. Algorithm Overview: ------------------- 1. Data Preparation: - Maps hardware Device IDs to human-readable Room Names. - Pivots raw data into a minute-by-minute grid (0 to 1439 minutes). 2. State Machine & Signal Evaluation (The Core Loop): - Iterates through every minute of the day. - Bathroom Override Logic: A person is considered to be in the bathroom if either: a) The light is not on (< 200 Lux), has the strongest radar presence signal of all rooms, and that signal is > BASELINE_PRESENCE. b) It falls within a pre-calculated "Sandwich Rule" block (a period of High Light > 1000 Lux & possible presence > BASELINE_PRESENCE, strictly bookended by minutes with Light < 200 Lux). - Standard Light Sensor Logic (Dark Hours Rule): If no override applies, between 6:00 PM and 6:00 AM, the algorithm restricts the search space. Only rooms with active light (> 200 Lux) are considered valid candidates. * EXCEPTION: The "Bedroom" is always a valid candidate, even in the dark, to account for sleeping. - Signal Selection: Determines which valid candidate room has the strongest radar signal. - Rejection Tracking: Logs instances where a room had the strongest raw signal (> BASELINE_PRESENCE) but was ignored because it was dark (and therefore deemed a false positive or ghost signal). - Heuristics: Applies logic for "Sleeping" states and "Latching" (holding the previous state during short periods of silence to prevent flickering). 3. Iterative Post-Processing (Gap Filling): - A cleaning loop runs (up to 10 times) to refine the timeline. - Merges consecutive entries for the same room. - Identifies "glitches" (durations shorter than 2 mins for Bathrooms or 5 mins for other rooms). - "Absorbs" these glitches into the neighboring room with the strongest signal context. 4. Reporting: - Prints a summary of "Light Rule Exclusions" (times where strong signals were ignored due to darkness). 5. Formatting & Environmental Analysis: - Iterates through the final timeline blocks. - Calculates the average Light (Lux) and Humidity (%) for the specific room and time interval. - Returns the final formatted text timeline. Args: filename (str): Path to the CSV file containing radar data. light_df (pd.DataFrame, optional): Preprocessed DataFrame containing light sensor readings (Index=Minute, Columns=Room Names). humidity_df (pd.DataFrame, optional): Preprocessed DataFrame containing humidity sensor readings (Index=Minute, Columns=Room Names). Returns: tuple: (timeline_text_string, final_state_room_string) """ if radar_df is not None: pivot_df = radar_df pivot_df.index = range(0, len(pivot_df)) else: return "No Data", "Unknown" if light_df is not None and getattr(light_df, "empty", False): light_df = None if humidity_df is not None and getattr(humidity_df, "empty", False): humidity_df = None # 1. Data Preparation (Sandwich Rule) forced_condition_b_minutes = set() df_len = len(pivot_df) if light_df is not None and not light_df.empty: bathrooms_list = [c for c in pivot_df.columns if "Bathroom" in c] for bath in bathrooms_list: if bath not in light_df.columns: continue l_series = light_df[bath].values p_series = pivot_df[bath].values # Find high blocks is_high = (l_series > 1000) & (p_series > BASELINE_PRESENCE) in_block = False block_start = -1 for i in range(df_len): val = is_high[i] if val and not in_block: in_block = True block_start = i elif not val and in_block: block_end = i - 1 in_block = False valid_pre = True # Check neighbor in current chunk if block_start > 0 and l_series[block_start - 1] >= 200: valid_pre = False valid_post = True if i < df_len and l_series[i] >= 200: valid_post = False if valid_pre and valid_post: for k in range(block_start, block_end + 1): forced_condition_b_minutes.add((k + start_minute_offset, bath)) # Initialize State Machine raw_log = [] rejection_log = [] current_state = initial_state if initial_state else "Out of House/Inactive" # Track where the current logic block started. # If we are in a buffer zone, this might get reset once we cross the output_cutoff_minute. current_block_start_time = start_minute_offset start_time = start_minute_offset silence_counter = 0 last_known_room = initial_state if initial_state else "Unknown" # --------------------------------------------------------- # 2. STATE MACHINE LOOP (NumPy Optimized) # --------------------------------------------------------- # Pre-computation for speed radar_vals = pivot_df.values light_vals = light_df.values if light_df is not None else None columns = pivot_df.columns.tolist() col_name_to_idx = {name: i for i, name in enumerate(columns)} # Pre-identify specific room indices to avoid string matching in the loop bathroom_indices = [i for i, name in enumerate(columns) if "Bathroom" in name] bedroom_idx = col_name_to_idx.get("Bedroom") # Loop for i in range(len(pivot_df)): # Access row data via integer index (Fast) row_array = radar_vals[i] light_array = light_vals[i] if light_vals is not None else None minute_of_day = i + start_minute_offset # 1. Raw Signal Stats raw_max_signal = row_array.max() # raw_best_room gets the name of the room with max signal raw_best_room = columns[row_array.argmax()] if raw_max_signal > 0 else "Unknown" # 2. Sandwich / Bathroom Override Logic forced_bathroom = None for bath_idx in bathroom_indices: bath_name = columns[bath_idx] p_val = row_array[bath_idx] l_val = light_array[bath_idx] if light_array is not None else 0 # Condition A: Dark, Best Signal is here, Strong Signal cond_a = (l_val < 200) and (raw_best_room == bath_name) and (p_val > BASELINE_PRESENCE) # Condition B: Pre-calculated Sandwich cond_b = (minute_of_day, bath_name) in forced_condition_b_minutes if cond_a or cond_b: forced_bathroom = bath_name break if forced_bathroom: active_location = forced_bathroom silence_counter = 0 last_known_room = active_location else: # 3. Candidate Selection (Light Logic) # Start with all column indices as candidates candidate_indices = list(range(len(columns))) is_dark_hours = (minute_of_day < MORNING_END_MIN) or (minute_of_day > EVENING_START_MIN) if light_vals is not None and is_dark_hours: # Find indices where light is on # np.where returns a tuple, we get the first array lit_indices = np.where(light_array > LIGHT_THRESHOLD)[0] # Convert to set for easy operations allowed_indices = set(lit_indices) # Always allow Bedroom in dark hours if bedroom_idx is not None: allowed_indices.add(bedroom_idx) # Intersect with available data columns (implicit, as we are using range of cols) candidate_indices = list(allowed_indices) # Rejection Tracking if raw_max_signal > BASELINE_PRESENCE: # Check if the raw best index is in our allowed candidates # Note: raw_best_room is a string, we need its index or check name raw_best_idx = row_array.argmax() if raw_best_idx not in candidate_indices: rejection_log.append({'minute': minute_of_day, 'room': raw_best_room}) # 4. Find Strongest Room within Candidates if len(candidate_indices) > 0: # Extract values for candidates only candidate_vals = row_array[candidate_indices] local_max = candidate_vals.max() if local_max > 0: # Find which index in the candidate list had the max best_local_idx = candidate_vals.argmax() # Map back to global column index best_global_idx = candidate_indices[best_local_idx] strongest_room = columns[best_global_idx] max_signal = local_max else: max_signal = 0 strongest_room = "Unknown" else: max_signal = 0 strongest_room = "Unknown" if max_signal > BASELINE_PRESENCE: active_location = strongest_room silence_counter = 0 last_known_room = active_location else: active_location = None silence_counter += 1 # 5. Latching & Sleep Logic effective_location = current_state if active_location: effective_location = active_location else: is_night_time = (minute_of_day < 480) or (minute_of_day > 1320) if last_known_room == "Bedroom" and is_night_time: # Keep canonical state effective_location = "Bedroom" silence_counter = 0 elif silence_counter < LATCH_TIME_MINUTES and last_known_room != "Unknown": effective_location = last_known_room elif is_night_time: effective_location = "Bedroom" last_known_room = "Bedroom" else: effective_location = "Out of House/Inactive" # 6. State Change & Logging if effective_location != current_state: if current_state != "Unknown": # Ensure we don't log 0 duration blocks if minute_of_day > start_time: # Only log if the block ends AFTER the user requested start time if minute_of_day > output_cutoff_minute: # Calculate adjusted start time final_start = max(start_time, output_cutoff_minute) duration = minute_of_day - final_start if duration > 0: raw_log.append({ "room": current_state, "start": final_start, "end": minute_of_day, "duration": duration }) # ------------------------------------------- current_state = effective_location start_time = minute_of_day # Add final block (Handling the tail end) final_minute = len(pivot_df) + start_minute_offset # Apply the same buffer logic to the final block if final_minute > output_cutoff_minute: # If the current block started before the cutoff, truncate it # If it started after, keep it as is. # Note: current_block_start_time variable from original script was likely 'start_time' # (variable name mismatch in original snippet, using start_time here which is the loop tracker) final_block_start = max(start_time, output_cutoff_minute) duration = final_minute - final_block_start if duration > 0: raw_log.append({ "room": current_state, "start": final_block_start, "end": final_minute, "duration": duration }) # 3. Iterative Cleaning (Same as before) iteration = 0 max_iterations = 10 while iteration < max_iterations: merged_log = [] if raw_log: current_block = raw_log[0] for next_block in raw_log[1:]: if next_block['room'] == current_block['room']: current_block['end'] = next_block['end'] current_block['duration'] = current_block['end'] - current_block['start'] else: merged_log.append(current_block) current_block = next_block merged_log.append(current_block) raw_log = merged_log changes_made = False for i in range(len(raw_log)): block = raw_log[i] dynamic_threshold = MIN_DURATION_BATHROOM_THRESHOLD if "Bathroom" in block['room'] else MIN_DURATION_OTHER_THRESHOLD if block['duration'] < dynamic_threshold: prev_block = raw_log[i-1] if i > 0 else None next_block = raw_log[i+1] if i < len(raw_log) - 1 else None winner_room = None if prev_block and next_block: winner_room = prev_block['room'] elif prev_block: winner_room = prev_block['room'] elif next_block: winner_room = next_block['room'] if winner_room and winner_room != block['room']: block['room'] = winner_room changes_made = True if not changes_made: break iteration += 1 # --------------------------------------------------------- # 4. Print Rejection Log # --------------------------------------------------------- if rejection_log: date_str = date_label or "Unknown Date" rejection_log.sort(key=lambda x: x['minute']) curr_rej = rejection_log[0] start_m = curr_rej['minute'] end_m = start_m room = curr_rej['room'] for i in range(1, len(rejection_log)): next_rej = rejection_log[i] if (next_rej['minute'] == end_m + 1) and (next_rej['room'] == room): end_m = next_rej['minute'] else: duration = end_m - start_m + 1 threshold = 2 if "Bathroom" in room else 5 if duration >= threshold: print(f"On {date_str}, between {minutes_to_time(start_m)} and {minutes_to_time(end_m + 1)}, {room} had the highest radar presence signal but was ignored (Dark Hours).") curr_rej = next_rej start_m = curr_rej['minute'] end_m = start_m room = curr_rej['room'] # Final block duration = end_m - start_m + 1 threshold = 2 if "Bathroom" in room else 5 if duration >= threshold: print(f"On {date_str}, between {minutes_to_time(start_m)} and {minutes_to_time(end_m + 1)}, {room} had the highest radar presence signal but was ignored (Dark Hours).") # 5. Format Output final_text = [] for entry in raw_log: # If the block ends before we are supposed to start outputting, skip it entirely. if entry['end'] <= output_cutoff_minute: continue # If the block starts before the cutoff but ends after, truncate the start. if entry['start'] < output_cutoff_minute: entry['start'] = output_cutoff_minute entry['duration'] = entry['end'] - entry['start'] start_s = minutes_to_time(entry['start']) end_s = minutes_to_time(entry['end']) room_name = entry['room'] display_room_name = room_name if room_name == "Bedroom": start_min = entry['start'] end_min = entry['end'] # same night window you used before is_night_block = (start_min < 480) or (start_min > 1320) or (end_min < 480) or (end_min > 1320) if is_night_block: display_room_name = "Bedroom (Sleeping)" local_start = max(0, entry['start'] - start_minute_offset) local_end = max(0, entry['end'] - start_minute_offset) env_parts = [] if light_df is not None and room_name in light_df.columns: segment = light_df.iloc[local_start : local_end][room_name] if not segment.empty: env_parts.append(f"L:{format_sig_figs(segment.mean())}") if humidity_df is not None and room_name in humidity_df.columns: segment = humidity_df.iloc[local_start : local_end][room_name] if not segment.empty: env_parts.append(f"H:{format_sig_figs(segment.mean())}") if globals().get('PRINT_PRESENCE', False) and room_name in pivot_df.columns: segment = pivot_df.iloc[local_start : local_end][room_name] if not segment.empty: env_parts.append(f"P:{format_sig_figs(segment.mean())}") env_tag = f" [{', '.join(env_parts)}]" if env_parts else "" final_text.append(f"- {start_s} to {end_s} ({entry['duration']} mins): {display_room_name}{env_tag}") # RETURN BOTH TEXT AND FINAL STATE return "\n".join(final_text), current_state def parse_last_timeline_state(timeline_text): """ Reads the last line of a timeline string to determine where we left off. Returns: (end_minute_of_day, room_name) """ if not timeline_text: return 0, "Out of House/Inactive" lines = timeline_text.strip().split('\n') last_line = lines[-1] # Regex to extract end time and room name # Example line: "- 10:00 AM to 02:30 PM (270 mins): Living Room [L:300]" # We look for "to HH:MM XX" and then the Room Name before any brackets match = re.search(r"to\s+(\d{2}:\d{2}\s+[AP]M).*?:\s+(.*?)(?:\s\[|$)", last_line) if match: time_str = match.group(1) room_name = match.group(2).strip() # Convert Time String to Minutes dt = datetime.datetime.strptime(time_str, "%I:%M %p") minutes = dt.hour * 60 + dt.minute # Handle wrapping (if 12:00 AM is 0 or 1440, though usually timeline ends before midnight) return minutes, room_name return 0, "Out of House/Inactive" def merge_timeline_chunks(existing_text, new_text): """ Appends new_text to existing_text. If the last line of existing_text and the first line of new_text are the same room, it merges them into a single line. """ if not existing_text: return new_text if not new_text: return existing_text lines_old = existing_text.strip().split('\n') lines_new = new_text.strip().split('\n') last_line = lines_old[-1] first_line = lines_new[0] # Regex to extract: StartTime, EndTime, Duration, RoomName, EnvTags # Matches: "- 10:00 AM to 11:00 AM (60 mins): Living Room [L:100]" # Group 1: Start, Group 2: End, Group 3: Mins, Group 4: Room, Group 5: EnvTags (Optional) pattern = re.compile(r"- (.*?) to (.*?) \((\d+) mins\): (.*?)(?: \[|$)") match_old = pattern.search(last_line) match_new = pattern.search(first_line) if match_old and match_new: room_old = match_old.group(4).strip() room_new = match_new.group(4).strip() # CHECK: Are we talking about the same room? if room_old == room_new: # We have a seam! Merge them. # 1. Get original start time start_time_str = match_old.group(1) # 2. Get new end time end_time_str = match_new.group(2) # 3. Calculate total duration dur_old = int(match_old.group(3)) dur_new = int(match_new.group(3)) total_duration = dur_old + dur_new # 4. Handle Environmental Tags [L:..., H:...] # Strategy: Use the tags from the NEW line (most recent data is usually more relevant) # Alternatively, you could try to average them, but that's complex with regex. env_suffix = "" if "[" in first_line: env_suffix = " [" + first_line.split("[", 1)[1] elif "[" in last_line: env_suffix = " [" + last_line.split("[", 1)[1] # 5. Construct the merged line merged_line = f"- {start_time_str} to {end_time_str} ({total_duration} mins): {room_old}{env_suffix}" # 6. Rebuild the text # Remove last line of old, remove first line of new, insert merged line lines_old.pop() lines_new.pop(0) # Combine result_lines = lines_old + [merged_line] + lines_new return "\n".join(result_lines) # No merge possible, just append return existing_text + "\n" + new_text # --------------------------------------------------------- # HELPER: AI MODEL SELECTION # --------------------------------------------------------- def is_ollama_running(): """Checks if Ollama is running on the default local port.""" try: with urllib.request.urlopen("http://localhost:11434", timeout=0.5) as response: return response.status == 200 except: return False def select_model_configuration(): """Interactively select the model provider via CLI.""" print("\n--- SELECT AI MODEL ---") available_keys = [] for key, config in MODEL_OPTIONS.items(): # Check availability for local models if config['type'] == 'ollama': if is_ollama_running(): print(f"[{key}] {config['name']}") available_keys.append(key) else: continue else: # Always show cloud providers print(f"[{key}] {config['name']}") available_keys.append(key) if not available_keys: print("Error: No models available. (Ollama is offline and no cloud keys configured?)") sys.exit() while True: choice = input("\nSelect Model # (or 'q'): ") if choice.lower() == 'q': sys.exit() if choice in available_keys: return MODEL_OPTIONS[choice] print("Invalid selection.") # --------------------------------------------------------- # CLASS: WELLNUO API WRAPPER # --------------------------------------------------------- class WellnuoAPI: def __init__(self, api_token): self.api_token = api_token if not self.api_token: raise ValueError("Error: No API Token available.") self.url = "https://eluxnetworks.net/function/well-api/api" def get_sensor_matrix(self, start_date_str, end_date_str, minute_start=0): """ Retrieves the sensor matrix. Supports partial day fetching via minute_start. """ # 1. Construct the sensor list sensor_list = ["light", "humidity", "pressure", "radar"] # sensor_list.extend([f"s{i}" for i in range(80)]) sensors_string = ",".join(sensor_list) # 2. Prepare Payload payload = { "function": "get_sensor_matrix", "user_name": WELLNUO_USER, "token": self.api_token, "deployment_id": "21", "date": start_date_str, "to_date": end_date_str, "minute_start": minute_start, "minute_end": 1439, "sensors": sensors_string, "radar_part": "s28", "bucket_size": "1m" } try: print(f"\n--- Calling Wellnuo get_sensor_matrix() (Start: {minute_start}) ---") response = requests.post(self.url, data=payload) if response.status_code != 200: return None # Handle error gracefully in main data = msgpack.unpackb(response.content, raw=False) return data except Exception as e: print(f"API Error: {e}") return None def download(self, start_date_str, end_date_str): """Downloads radar data as ZIP file for a date range.""" payload = { "name": "download", "user_name": WELLNUO_USER, "token": self.api_token, "deployment_id": DEPLOYMENT_ID, "date_from": start_date_str, "date_to": end_date_str, "group_by": "by_minute_rd", "re_create": "false", "radar_part": "s28" } try: print(f"\n--- Calling Wellnuo download() API function to get radar sensor data between {start_date_str} and {end_date_str}...") response = requests.get(self.url, params=payload) response.raise_for_status() return response except Exception as e: return f"API Error: {e}" def get_sensor_data(self, start_date_str, end_date_str, sensor_type): """ Retrieves sensor data "light" or "humidity" for a specific date. Matches the 'get_sensor_data_by_deployment_id' API function shown in Postman. """ payload = { "function": "get_sensor_data_by_deployment_id", "user_name": WELLNUO_USER, "token": self.api_token, "sensor": sensor_type, "deployment_id": DEPLOYMENT_ID, "data_type": "ML", "radar_part": "s28", "date": start_date_str, "to_date": end_date_str, } try: print(f"\n--- Calling Wellnuo get_sensor_data_by_deployment_id() API function to get {sensor_type} sensor data between {start_date_str} and {end_date_str}...") response = requests.post(self.url, data=payload) response.raise_for_status() return response.json() except Exception as e: return f"API Error: {e}" def get_presence_data(self, start_date_str, end_date_str): """Retrieves presence data (Z-graph).""" payload = { "function": "get_presence_data", "user_name": WELLNUO_USER, "token": self.api_token, "deployment_id": DEPLOYMENT_ID, "date": start_date_str, "filter": "6", "data_type": "z-graph", "to_date": end_date_str, "refresh": "0" } try: print(f"\n--- Calling Wellnuo get_presence_data() API function for start_date = {start_date_str}, end_date = {end_date_str} ---") response = requests.post(self.url, data=payload) response.raise_for_status() return response.json() except Exception as e: return f"API Error: {e}" # --------------------------------------------------------- # CLASS: AI INTERPRETER AGENT # --------------------------------------------------------- class ActivityAgent: def __init__(self, timelines_dict, model_name, extra_context=""): self.context = self.summarize_timelines(timelines_dict) # Append the extra context (humidity list) to the end of the log if extra_context: self.context += f"\n\n{extra_context}" print("AI agent received the following context: ") print(self.context) self.model_name = model_name self.system_prompt = """ You are an expert Health Data Analyst. You are receiving a timeline of activities of a person in a house. Answer user's question strictly based on the provided timeline of activities. If the user mentions 'bathroom' without specifying 'Main' or 'Guest', you must combine the data from both 'Bathroom Main' and 'Bathroom Guest' and treat them as a single location. The timeline entries include environmental data in brackets: [L: value, H: value]. - L represents the average Light level in Lux. - H represents the average Humidity percentage. These values are averages for the specific time interval and are rounded to two significant figures. To identify showers: 1. Consult the 'Sorted list of average Bathroom Humidity levels' in the Reference Humidity Data. 2. Identify the main cluster of lower values (the baseline/median range). 3. Infer 'showering' ONLY for visits that meet **all** of the following criteria: - The Humidity [H] falls into a higher cluster separated from the baseline by a distinct numeric gap (missing integers in the sorted list). - The duration of the visit is **at least 7 minutes**. - The Light [L] level is **at least 1000 Lux**. 4. **Maximum one shower per day**: If multiple visits on the same day meet the above criteria, identify only the single event with the highest Humidity [H] as the shower. """ print(f"\n and is given the following system prompt:\n") print(f'"{self.system_prompt}"') def summarize_timelines(self, timelines, max_days=7): """ Combines multiple daily timelines into a single string for the LLM context. Limits the output to the most recent 'max_days' to save tokens. """ summary_parts = [] all_dates = sorted(timelines.keys()) # SLICE: Keep only the last N days # If there are fewer than max_days, this logic still works safely (takes all). relevant_dates = all_dates[-max_days:] if relevant_dates: last_date = relevant_dates[-1] for date in relevant_dates: daily_text = timelines[date] # Header formatting if date == last_date: header = f"### ACTIVITY LOG FOR: {date} (TODAY)" else: header = f"### ACTIVITY LOG FOR: {date}" day_block = f"{header}\n{daily_text}" summary_parts.append(day_block) if len(all_dates) > len(relevant_dates): print(f"Context Note: Truncated history. Sending last {len(relevant_dates)} days to AI (Total stored: {len(all_dates)} days).") return "\n" + "\n\n".join(summary_parts) def ask(self, question): """Sends the user question and timeline context to the LLM.""" try: start_time = time.time() response = client.chat.completions.create( model=self.model_name, messages=[ {"role": "system", "content": self.system_prompt}, {"role": "user", "content": f"Here is the log:\n{self.context}\n\nQuestion: {question}"} ], temperature=0.1, ) end_time = time.time() duration = end_time - start_time print(f"Thought for {duration:.2f} seconds") return response.choices[0].message.content except Exception as e: return f"Error: {e}" def load_timeline_history(): """Loads the timeline dictionary from a JSON file.""" if os.path.exists(TIMELINE_STORAGE_FILE): try: with open(TIMELINE_STORAGE_FILE, 'r') as f: data = json.load(f) # Return the specific deployment's history or empty dict return data.get(DEPLOYMENT_ID, {}) except Exception as e: print(f"Warning: Could not read timeline history: {e}") return {} def save_timeline_history(timelines): """Saves the updated timeline dictionary to a JSON file.""" # Load existing full file (to preserve other deployments if any) full_data = {} if os.path.exists(TIMELINE_STORAGE_FILE): try: with open(TIMELINE_STORAGE_FILE, 'r') as f: full_data = json.load(f) except: pass # Update the specific deployment full_data[DEPLOYMENT_ID] = timelines with open(TIMELINE_STORAGE_FILE, 'w') as f: json.dump(full_data, f, indent=4) print(f"✅ Timeline history saved to {TIMELINE_STORAGE_FILE}") # --------------------------------------------------------- # MAIN EXECUTION # --------------------------------------------------------- if __name__ == "__main__": # --------------------------------------------------------- # 1. INITIALIZE & DETERMINE DATE RANGE # --------------------------------------------------------- timelines_cache = load_timeline_history() now = datetime.datetime.now() today_str = now.strftime("%Y-%m-%d") # Defaults start_date_str = today_str api_minute_start = 0 previous_state_room = None if timelines_cache: sorted_dates = sorted(timelines_cache.keys()) last_recorded_date = sorted_dates[-1] if last_recorded_date == today_str: # CASE A: We have data for Today, but it might be incomplete. Resume it. last_mins, last_room = parse_last_timeline_state(timelines_cache[last_recorded_date]) start_date_str = today_str api_minute_start = last_mins previous_state_room = last_room print(f"Resuming Today's timeline from {minutes_to_time(api_minute_start)} (State: {previous_state_room})...") else: # CASE B: History ends on a previous day. We must fill the gap. last_mins, last_room = parse_last_timeline_state(timelines_cache[last_recorded_date]) # Check if the last recorded day was effectively finished (e.g. ended after 11:55 PM) if last_mins >= 1435: # Sub-case B1: Last day is complete. Start fetching from the NEXT day at 00:00. last_date_obj = datetime.datetime.strptime(last_recorded_date, "%Y-%m-%d") next_day_obj = last_date_obj + datetime.timedelta(days=1) start_date_str = next_day_obj.strftime("%Y-%m-%d") api_minute_start = 0 # Carry over the room state (e.g., Sleeping) so the new day starts with context previous_state_room = last_room print(f"History on {last_recorded_date} is complete. Filling gap starting from {start_date_str}...") else: # Sub-case B2: Last day is partial/incomplete. Resume fetching from that specific day and minute. start_date_str = last_recorded_date api_minute_start = last_mins previous_state_room = last_room print(f"History on {last_recorded_date} is partial. Resuming from {minutes_to_time(api_minute_start)}...") else: # No history, fetch last 7 days start_date = now - datetime.timedelta(days=7) start_date_str = start_date.strftime("%Y-%m-%d") print("No history found. Fetching last 7 days...") # --------------------------------------------------------- # 2. FETCH DATA # --------------------------------------------------------- wellnuo_token = get_wellnuo_credentials() wellnuo_api = WellnuoAPI(wellnuo_token) if not wellnuo_api: sys.exit("Failed to initialize API.") # We want to fetch 15 minutes BEFORE the needed time to provide context for logic BUFFER_MINUTES = 15 fetch_minute_start = max(0, api_minute_start - BUFFER_MINUTES) # Call API with buffered start sensor_data = wellnuo_api.get_sensor_matrix(start_date_str, today_str, minute_start=fetch_minute_start) if not sensor_data or 'sensors' not in sensor_data: print("Error: No valid sensor data received.") sys.exit() else: print("\n✅ Received data between ", sensor_data.get('date', 'Unknown'), " and ", today_str," for the following sensors: ") print(list(sensor_data['sensors'].keys())) # --------------------------------------------------------- # 3. PARSE DATA # --------------------------------------------------------- rows, cols = sensor_data['shape'] room_ids = sensor_data['room_ids'] room_names = [DEVICE_TO_ROOM_MAPPING.get(rid, str(rid)) for rid in room_ids] dtype_map = { 'uint16': np.uint16, 'uint8': np.uint8, 'int16': np.int16, 'int32': np.int32, 'uint32': np.uint32, 'float': np.float32, 'float32': np.float32, 'float64': np.float64, } default_dtype = dtype_map.get(sensor_data.get('dtype', 'uint16'), np.uint16) def unpack_sensor_to_df(sensor_key: str) -> pd.DataFrame: if sensor_key not in sensor_data['sensors']: return pd.DataFrame() sensor_entry = sensor_data['sensors'][sensor_key] blob = sensor_entry.get('data') if not blob: return pd.DataFrame() # dtype per sensor if present, else fall back to overall dtype, else uint16 sensor_dtype_str = sensor_entry.get('dtype') or sensor_data.get('dtype') numpy_dtype = dtype_map.get(str(sensor_dtype_str).lower(), default_dtype) arr = np.frombuffer(blob, dtype=numpy_dtype) # Get shape from API: [Rooms, Minutes] num_rooms_api, num_minutes_api = sensor_data['shape'] # Safety: If API indicates 0 rooms, return empty if num_rooms_api <= 0: return pd.DataFrame() # 1. RESHAPE: Restore the matrix to (Rooms, Minutes) # We calculate the actual number of minutes based on the blob size # to be robust against truncated data. if arr.size % num_rooms_api != 0: # If the blob is incomplete, trim to the nearest full column (room set) usable_size = (arr.size // num_rooms_api) * num_rooms_api arr = arr[:usable_size] actual_minutes = arr.size // num_rooms_api if actual_minutes == 0: return pd.DataFrame() # Reshape to [Rooms, Minutes] arr = arr.reshape(num_rooms_api, actual_minutes) # 2. TRANSPOSE: Flip to [Minutes, Rooms] # This aligns the data so that columns match 'room_names' arr = arr.T # 3. DATAFRAME CREATION # Now arr.shape[1] (columns) is equal to num_rooms_api. # We must ensure room_names matches this length. expected_cols = len(room_names) actual_cols = arr.shape[1] # This is now the Room count if actual_cols != expected_cols: # If API metadata shape differs from the room_ids list length min_cols = min(actual_cols, expected_cols) arr = arr[:, :min_cols] use_columns = room_names[:min_cols] return pd.DataFrame(arr, columns=use_columns) return pd.DataFrame(arr, columns=room_names) master_light_df = unpack_sensor_to_df('light') master_humid_df = unpack_sensor_to_df('humidity') master_radar_df = unpack_sensor_to_df('radar') if master_radar_df.empty: print("No new data available from API.") # Proceed to Agent execution with existing cache else: # --------------------------------------------------------- # 4. GENERATE & MERGE TIMELINES # --------------------------------------------------------- # We need to handle potential multi-day responses or just the partial today # Since API returns linear data, we just iterate. current_data_minutes = len(master_radar_df) start_date_obj = datetime.datetime.strptime(sensor_data['date'], '%Y-%m-%d') chunk_start = 0 chunk_date = start_date_obj # We need to map the 'fetch_minute_start' (which might be 885) to the data index 0 current_daily_offset = fetch_minute_start while chunk_start < current_data_minutes: date_key = chunk_date.strftime('%Y-%m-%d') # Calculate remaining minutes in this day (0 to 1440) mins_remaining_in_day = 1440 - current_daily_offset # Define the slice for this specific day chunk_end = min(chunk_start + mins_remaining_in_day, current_data_minutes) day_radar = master_radar_df.iloc[chunk_start:chunk_end].reset_index(drop=True) day_light = master_light_df.iloc[chunk_start:chunk_end].reset_index(drop=True) day_humid = master_humid_df.iloc[chunk_start:chunk_end].reset_index(drop=True) if not day_radar.empty: # Determine where we actually want output to start for this specific day # If this is the FIRST day being processed, we use the user's requested api_minute_start. # If this is a subsequent day (00:00), the cutoff is 0. if chunk_start == 0: cutoff = api_minute_start else: cutoff = 0 # Generate Timeline # Note: We pass 'current_daily_offset' so the timeline knows # that index 0 corresponds to e.g., minute 885. new_text, final_state = create_timeline( radar_df=day_radar, light_df=day_light, humidity_df=day_humid, start_minute_offset=current_daily_offset, initial_state=previous_state_room, output_cutoff_minute=cutoff, date_label=date_key, ) # Update State for the NEXT loop iteration (Fix 3) previous_state_room = final_state # Append or Overwrite if date_key in timelines_cache and cutoff > 0: # Partial update: Smart Merge if new_text.strip(): # Use the new helper function timelines_cache[date_key] = merge_timeline_chunks( timelines_cache[date_key], new_text ) else: # New day or full update timelines_cache[date_key] = new_text # Prepare for next loop (Next day starts at 00:00) chunk_start = chunk_end chunk_date += datetime.timedelta(days=1) current_daily_offset = 0 # Subsequent days always start at min 0 save_timeline_history(timelines_cache) # --------------------------------------------------------- # 5. AGENT EXECUTION # --------------------------------------------------------- model_config = select_model_configuration() print(f"\nInitializing {model_config['name']}...") client = OpenAI(base_url=model_config['base_url'], api_key=model_config['api_key']) humidity_context = get_timeline_humidity_profile(timelines_cache) agent = ActivityAgent(timelines_cache, model_config['model_name'], extra_context=humidity_context) print(f"\n--- Smart Agent Ready ({model_config['name']}) ---") while True: q = input("\nYou: ") if q.lower() in ['q', 'exit']: break print("Thinking...") print(f"Agent: {agent.ask(q)}")