diff --git a/well-api.py b/well-api.py
index d69cce5..98626be 100644
--- a/well-api.py
+++ b/well-api.py
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-#Vesion 2.0.0
+#Version 2.0.1
 import os
 import sys
 import ast
@@ -96,6 +96,8 @@ Loc2Color = {"Bedroom":((16,255,16),0),"Bedroom Master":((0,255,0),0),"Bedroom G
 s_table = ["temperature", "humidity", "pressure", "light", "radar", "voc0", "voc1", "voc2", "voc3", "voc4", "voc5", "voc6", "voc7", "voc8", "voc9"] # derived
 smells_table = ["s0", "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9"] # derived
+s_table_temp = []
+
 Consolidataed_locations = {"?":"Room","Office":"Office","Hallway":"Hallway","Garage":"Garage","Outside":"Outside","Conference Room":"Office",
                            "Room":"Room","Kitchen":"Kitchen","Bedroom":"Bedroom","Living Room":"Living Room","Bathroom Guest":"Bathroom",
                            "Dining Room":"Dining Room","Bathroom":"Bathroom", "Bathroom Main":"Bathroom","Bedroom Master":"Bedroom",
@@ -4136,6 +4138,112 @@ def fast_fill_array_from_timescale(day_data, time_from_str, devices_list, arr_so
     return arr_source
 
+def fast_fill_array_from_timescale_single(day_data, time_from_str, devices_list, arr_source, sensor, timezone_str="Europe/Berlin"):
+    """
+    Single-sensor variant of the optimized array filling from TimeScaleDB data.
+    Uses vectorized operations for a significant speed improvement.
+    """
+    # Convert start time to timezone-aware datetime
+    start_time = datetime.datetime.strptime(time_from_str, '%Y-%m-%d %H:%M:%S%z')
+
+    # Create device index mapping
+    device_to_index = {device_id: idx for idx, device_id in enumerate(devices_list)}
+
+    # Pre-process data into a more efficient structure
+    # Group by device_id to reduce lookup operations
+    device_data = defaultdict(list)
+    for record in day_data:
+        if record[0] and record[1]:  # If time and device_id exist
+            device_data[record[1]].append(record)
+
+    if sensor is not None:
+        # Single-sensor queries return (minute, device_id, value) rows, so the
+        # requested sensor always sits in column 2 and fills one stripe per device
+        columns = {sensor: 2}
+    else:
+        columns = {
+            'avg_temperature': 2,
+            'avg_humidity': 3,
+            'pressure_amplitude': 4,
+            'max_light': 5,
+            'radar': 6,
+            'sensor_min_s0': 7,
+            'sensor_min_s1': 8,
+            'sensor_min_s2': 9,
+            'sensor_min_s3': 10,
+            'sensor_min_s4': 11,
+            'sensor_min_s5': 12,
+            'sensor_min_s6': 13,
+            'sensor_min_s7': 14,
+            'sensor_min_s8': 15,
+            'sensor_min_s9': 16
+        }
+
+    column_keys = list(columns.keys())
+
+    # Process each device's data in bulk
+    for device_id, records in device_data.items():
+        if device_id not in device_to_index:
+            continue
+
+        base_idx = device_to_index[device_id] * len(columns)
+
+        # Convert records to numpy array for faster processing
+        records_array = np.array(records, dtype=object)
+
+        # Calculate all minute deltas at once
+        times = records_array[:, 0]
+        minute_deltas = np.array([(t - start_time).total_seconds() / 60 for t in times], dtype=int)
+
+        # Filter valid minute deltas (the last 4 columns hold derived limits)
+        valid_mask = (minute_deltas >= 0) & (minute_deltas < arr_source.shape[1] - 4)
+        if not np.any(valid_mask):
+            continue
+
+        minute_deltas = minute_deltas[valid_mask]
+        records_array = records_array[valid_mask]
+
+        # Process each column type in bulk, using the pre-computed
+        # column_keys list for consistent row indexing
+        for col_name, col_offset in columns.items():
+            row_idx = base_idx + column_keys.index(col_name)
+            values = records_array[:, col_offset]
+
+            # Filter out None values
+            valid_values = ~np.equal(values, None)
+            if not np.any(valid_values):
+                continue
+
+            # Update array in bulk
+            arr_source[row_idx, minute_deltas[valid_values]] = values[valid_values]
+
+    return arr_source
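+
+# Illustrative sketch (comments only, not executed): for the single-sensor
+# result shape (minute, device_id, value) produced by
+# get_deployment_single_query(), the bulk fill above is equivalent to:
+#
+#     for minute, dev, value in day_data:
+#         m = int((minute - start_time).total_seconds() // 60)
+#         if value is not None and 0 <= m < 1440:
+#             arr_source[device_to_index[dev], m] = value
+#
+# with the vectorized numpy version preferred for full-day result sets.
+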
 
 def CalcExtremes(arr_source, length, height):
     """
     Calculate min and max values for each row within legal bounds.
     """
 
@@ -7553,7 +7661,7 @@ def GenerateFullLocationMapLabelsOut(map_file, deployment_id, ddate, recreate_or
     print(ddate, time.time() - st)
 
 def CreateMapFast(map_file, devices_list, selected_date, bw, time_zone_s, radar_part, group_by):
-    global Id2MACDict
+    global Id2MACDict, s_table_temp
 
     st = time.time()
     if radar_part == "s28":
@@ -7580,7 +7688,7 @@ def CreateMapFast(map_file, devices_list, selected_date, bw, time_zone_s, radar_
             return False, []
 
         if larger_than200 > 0:
-            sensors_c = len(s_table)
+            sensors_c = len(s_table_temp)
         else: #old sensors not supported
             return False, []
@@ -7590,49 +7698,31 @@ def CreateMapFast(map_file, devices_list, selected_date, bw, time_zone_s, radar_
         image_file = map_file
         time_from_str, time_to_str = GetLocalTimeForDate(selected_date, time_zone_s)
         temp_offset = -16
-        sql = get_deployment_query(devices_list_str, time_from_str, time_to_str, ids_list, radar_part, temp_offset)
+        if sensors_c > 1:
+            sql = get_deployment_query(devices_list_str, time_from_str, time_to_str, ids_list, radar_part, temp_offset)
+        else:
+            sql = get_deployment_single_query(devices_list_str, time_from_str, time_to_str, ids_list, radar_part, temp_offset, s_table_temp[0])
         print(sql)
         #print(sql)
         #st = time.time()
-        print(f"@1 ----{time.time() - st}")
         with get_db_connection() as conn:
-            print(f"@1a ----{time.time() - st}")
             with conn.cursor() as cur:
-                print(f"@1b ----{time.time() - st}")
                 cur.execute(sql)
                 day_data = cur.fetchall()#cur.fetchone()
                 #print(result)
                 if day_data == None:
-                    print(f"@1c ----{time.time() - st}")
                     return False, []
-        print(f"@2 ----{time.time() - st}")
         stretch_by = 10
         minutes = 1440
         stripes = devices_c * sensors_c #2 for upper maxes, lower mins
         arr_source_template = np.full((stripes, minutes+4), -0.001, dtype=float)
-        print(f"@3 ----{time.time() - st}")
-        st = time.time()
         arr_stretched_template = np.zeros((int(stripes*stretch_by), minutes, 3), dtype=np.uint8) # 3 for RGB channels
-        print(f"@4a ----{time.time() - st}")
-        #st = time.time()
-        #arr_source = fill_array_from_timescale(day_data, time_from_str, devices_list[1], arr_source_template, time_zone_s)
-        #print(f"@4b ----{time.time() - st}")
-        #st = time.time()
-        #arr_source = fast_fill_array_from_timescale_bad(day_data, time_from_str, devices_list[1], arr_source_template, time_zone_s)
-        #print(f"@4n ----{time.time() - st}")
-        st = time.time()
-        arr_source = fast_fill_array_from_timescale(day_data, time_from_str, devices_list[1], arr_source_template, time_zone_s)
-        #arr_source = fill_array_from_timescale(day_data, time_from_str, devices_list[1], arr_source_template, time_zone_s)
-        print(f"@5 ----{time.time() - st}")
+        # Mirror the query dispatch above: the single-sensor fill only understands
+        # (minute, device_id, value) rows
+        if sensors_c > 1:
+            arr_source = fast_fill_array_from_timescale(day_data, time_from_str, devices_list[1], arr_source_template, time_zone_s)
+        else:
+            arr_source = fast_fill_array_from_timescale_single(day_data, time_from_str, devices_list[1], arr_source_template, s_table_temp[0], time_zone_s)
         arr_source = AddLimits_optimized(arr_source, devices_c, sensors_c, percentile=100)
-        print(f"@6 ----{time.time() - st}")
         scaled_day = CalcExtremes(arr_source, minutes, stripes)
-        print(f"@7 ----{time.time() - st}")
         arr_stretched, vocs_scaled = FillImage_optimized(scaled_day, devices_c, sensors_c, arr_stretched_template, group_by, bw)
-        print(f"@8 ----{time.time() - st}")
         SaveImageInBlob(image_file, arr_stretched)
-        print(f"@9 ----{time.time() - st}")
         return True, vocs_scaled
 
     except Exception as e:
 
@@ -7732,6 +7822,283 @@ def get_deployment_query(devices_list_str, time_from_str, time_to_str, ids_list,
     """
     return sql
 
+def get_deployment_single_query(devices_list_str, time_from_str, time_to_str, ids_list, radar_part, temp_offset, sensor_in):
+    """
+    Generate a TimeScaleDB query for a single sensor reading based on device IDs.
+
+    Parameters:
+        devices_list_str (str): Comma-separated string of device IDs
+        time_from_str (str): Start time for the query
+        time_to_str (str): End time for the query
+        ids_list (list): List of device IDs in priority order for sorting
+        radar_part (str): Radar column or expression to aggregate
+        temp_offset (float): Temperature offset to apply
+        sensor_in (str): Single sensor name; a "prefix_name" form is accepted
+                         and reduced to the part after the first underscore
+
+    Returns:
+        str: Generated SQL query
+    """
+    # Generate the CASE statement for ordering based on the provided ids_list
+    case_statements = []
+    for index, device_id in enumerate(ids_list, start=1):
+        case_statements.append(f"WHEN {device_id} THEN {index}")
+    case_order = "\n            ".join(case_statements)
+
+    if "_" in sensor_in:
+        sensor = sensor_in.split("_")[1]
+    else:
+        sensor = sensor_in
+
+    # Map each sensor to its source table and aggregate expression; all sensor
+    # types share one query template below
+    if sensor == "radar":
+        table, select_expr = "radar_readings", f"MAX({radar_part})"
+    elif sensor == "temperature":
+        table, select_expr = "sensor_readings", f"AVG(temperature) + {temp_offset}"
+    elif sensor == "humidity":
+        table, select_expr = "sensor_readings", "AVG(humidity)"
+    elif sensor == "pressure":
+        table, select_expr = "sensor_readings", "AVG(pressure)"
+    elif sensor == "light":
+        table, select_expr = "sensor_readings", "MAX(light)"
+    elif sensor.startswith("voc"):
+        # VOC sensors voc0-voc9 map to columns s0-s9; zero readings are ignored
+        voc_num = sensor[3:]
+        table = "sensor_readings"
+        select_expr = f"MIN(CASE WHEN s{voc_num} > 0 THEN s{voc_num} END)"
+    else:
+        raise ValueError(f"Unknown sensor type: {sensor}. Must be one of: temperature, humidity, pressure, light, radar, voc0-voc9")
+
+    sql = f"""
+    SELECT
+        time_bucket('1 minute', time) AS minute,
+        device_id,
+        {select_expr} AS {sensor_in}
+    FROM
+        {table}
+    WHERE
+        device_id IN ({devices_list_str})
+        AND time >= '{time_from_str}'
+        AND time < '{time_to_str}'
+    GROUP BY
+        minute,
+        device_id
+    ORDER BY
+        CASE device_id
+            {case_order}
+        END,
+        minute;
+    """
+
+    return sql
+
+def get_deployment_single_query_rz(devices_list_str, time_from_str, time_to_str, ids_list, radar_part, temp_offset, sensor):
+    """
+    Generate a TimeScaleDB query for sensor and radar readings based on device IDs.
+    Variant that returns all sensor and radar columns; `sensor` is currently unused
+    and the function is not called anywhere in this patch.
+
+    Parameters:
+        devices_list_str (str): Comma-separated string of device IDs
+        time_from_str (str): Start time for the query
+        time_to_str (str): End time for the query
+        ids_list (list): List of device IDs in priority order for sorting
+        radar_part (str): Radar column name, defaults to 'radar'
+        temp_offset (float): Temperature offset to apply
+
+    Returns:
+        str: Generated SQL query
+    """
+    # Generate the CASE statement for ordering based on the provided ids_list
+    case_statements = []
+    for index, device_id in enumerate(ids_list, start=1):
+        case_statements.append(f"WHEN {device_id} THEN {index}")
+
+    case_order = "\n            ".join(case_statements)
+
+    sql = f"""
+    SELECT
+        COALESCE(sr.minute, rr.minute) as minute,
+        COALESCE(sr.device_id, rr.device_id) as device_id,
+        sr.avg_temperature + {temp_offset} as avg_temperature,
+        sr.avg_humidity,
+        sr.pressure_amplitude,
+        sr.max_light,
+        rr.radar,
+        sr.min_s0 as sensor_min_s0,
+        sr.min_s1 as sensor_min_s1,
+        sr.min_s2 as sensor_min_s2,
+        sr.min_s3 as sensor_min_s3,
+        sr.min_s4 as sensor_min_s4,
+        sr.min_s5 as sensor_min_s5,
+        sr.min_s6 as sensor_min_s6,
+        sr.min_s7 as sensor_min_s7,
+        sr.min_s8 as sensor_min_s8,
+        sr.min_s9 as sensor_min_s9
+    FROM (
+        SELECT
+            time_bucket('1 minute', time) AS minute,
+            device_id,
+            AVG(temperature) AS avg_temperature,
+            AVG(humidity) AS avg_humidity,
+            AVG(pressure) AS pressure_amplitude,
+            MAX(light) AS max_light,
+            MIN(CASE WHEN s0 > 0 THEN s0 END) AS min_s0,
+            MIN(CASE WHEN s1 > 0 THEN s1 END) AS min_s1,
+            MIN(CASE WHEN s2 > 0 THEN s2 END) AS min_s2,
+            MIN(CASE WHEN s3 > 0 THEN s3 END) AS min_s3,
+            MIN(CASE WHEN s4 > 0 THEN s4 END) AS min_s4,
+            MIN(CASE WHEN s5 > 0 THEN s5 END) AS min_s5,
+            MIN(CASE WHEN s6 > 0 THEN s6 END) AS min_s6,
+            MIN(CASE WHEN s7 > 0 THEN s7 END) AS min_s7,
+            MIN(CASE WHEN s8 > 0 THEN s8 END) AS min_s8,
+            MIN(CASE WHEN s9 > 0 THEN s9 END) AS min_s9
+        FROM
+            sensor_readings
+        WHERE
+            device_id IN ({devices_list_str})
+            AND time >= '{time_from_str}'
+            AND time < '{time_to_str}'
+        GROUP BY
+            minute,
+            device_id
+    ) sr
+    FULL OUTER JOIN (
+        SELECT
+            time_bucket('1 minute', time) AS minute,
+            device_id,
+            MAX({radar_part}) AS radar
+        FROM
+            radar_readings
+        WHERE
+            device_id IN ({devices_list_str})
+            AND time >= '{time_from_str}'
+            AND time < '{time_to_str}'
+        GROUP BY
+            minute,
+            device_id
+    ) rr
+    ON sr.minute = rr.minute AND sr.device_id = rr.device_id
+    ORDER BY
+        CASE COALESCE(sr.device_id, rr.device_id)
+            {case_order}
+        END,
+        COALESCE(sr.minute, rr.minute);
+    """
+    return sql
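+
+# Column-order note (illustrative): the joined query above returns one row per
+# (minute, device_id) shaped as
+#     (minute, device_id, avg_temperature, avg_humidity, pressure_amplitude,
+#      max_light, radar, sensor_min_s0 .. sensor_min_s9)
+# which matches the column offsets 2..16 hard-coded in the fill helpers above.
+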
 
 
 def get_deployment_rd_query(devices_list_str, time_from_str, time_to_str, ids_list, temp_offset):
     #radar detailed
     """
 
@@ -13799,6 +14166,454 @@ def handle_telnyx_webhook3(webhook_data, remote_addr, request_id):
         logger.exception(f"Error in handle_telnyx_webhook3: {e}")
         return False
 
+
+def FilterDevicesByDeviceId(devices_list, device_id_str):
+    """
+    Filter devices list to include only the specified device_id.
+
+    Parameters:
+        devices_list: tuple of (device_details_list, device_ids_list)
+        device_id_str: string representation of the device_id to filter by
+
+    Returns:
+        tuple: filtered (device_details_list, device_ids_list)
+    """
+    try:
+        target_device_id = int(device_id_str)
+    except ValueError:
+        return ([], [])
+
+    device_details_list, device_ids_list = devices_list
+
+    filtered_details = []
+    filtered_ids = []
+
+    for i, device_details in enumerate(device_details_list):
+        device_id = device_details[1]  # device_id is the second element (index 1)
+        if device_id == target_device_id:
+            filtered_details.append(device_details)
+            filtered_ids.append(device_ids_list[i])
+
+    return (filtered_details, filtered_ids)
+
+
+def FilterSensorsBySensorType(sensor_type):
+    """
+    Filter s_table to include only the specified sensor type.
+
+    Parameters:
+        sensor_type: string name of the sensor type (e.g., 'temperature', 'radar', 'voc0')
+
+    Returns:
+        list: single-element list with the mapped sensor name, or [] for unknown types
+    """
+    # NOTE: the get_sensors_map handler currently passes the raw sensor name
+    # straight to the query builders instead of calling this helper, because
+    # get_deployment_query_filtered() expects "temperature"/"voc0"-style names.
+
+    # Map user-friendly sensor names to their query-column equivalents
+    sensor_mapping = {
+        'temperature': 'avg_temperature',
+        'humidity': 'avg_humidity',
+        'pressure': 'pressure_amplitude',
+        'light': 'max_light',
+        'radar': 'radar',
+        'voc0': 'sensor_min_s0',
+        'voc1': 'sensor_min_s1',
+        'voc2': 'sensor_min_s2',
+        'voc3': 'sensor_min_s3',
+        'voc4': 'sensor_min_s4',
+        'voc5': 'sensor_min_s5',
+        'voc6': 'sensor_min_s6',
+        'voc7': 'sensor_min_s7',
+        'voc8': 'sensor_min_s8',
+        'voc9': 'sensor_min_s9'
+    }
+
+    # Return the mapped sensor name for known types, otherwise an empty list
+    mapped_sensor = sensor_mapping.get(sensor_type)
+    if mapped_sensor is not None:
+        return [mapped_sensor]
+
+    return []
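+
+# Usage sketch (hypothetical device tuples, not executed):
+#     devices = ([(201, 4711), (202, 4712)], [4711, 4712])
+#     FilterDevicesByDeviceId(devices, "4712")   # -> ([(202, 4712)], [4712])
+#     FilterSensorsBySensorType("temperature")   # -> ['avg_temperature']
+#     FilterSensorsBySensorType("unknown")       # -> []
+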
+
+
+def CreateSensorsMapFast(map_file, devices_list, selected_date, bw, time_zone_s, radar_part, group_by, filtered_s_table):
+    """
+    Create a sensor map with filtered devices and sensors.
+    Based on CreateMapFast but with filtering support.
+
+    Parameters:
+        map_file: output file path
+        devices_list: filtered devices list
+        selected_date: date string
+        bw: black-and-white flag
+        time_zone_s: timezone string
+        radar_part: radar part specification
+        group_by: grouping strategy
+        filtered_s_table: filtered sensor table
+
+    Returns:
+        tuple: (success_boolean, vocs_scaled_array)
+    """
+    global Id2MACDict
+
+    st = time.time()
+    if radar_part == "s28":
+        radar_part = "(s2+s3+s4+s5+s6+s7+s8)/7"
+
+    try:
+        lower_than200 = 0
+        larger_than200 = 0
+        ids_list = []
+
+        for details in devices_list[0]:
+            well_id = details[0]
+            ids_list.append(details[1])
+            if well_id < 200:
+                lower_than200 += 1
+            else:
+                larger_than200 += 1
+
+        if lower_than200 > 0 and larger_than200 > 0:
+            return False, []
+
+        if larger_than200 > 0:
+            sensors_c = len(filtered_s_table)
+        else:  # old sensors not supported
+            return False, []
+
+        devices_c = len(devices_list[0])
+        devices_list_str = ",".join(map(str, devices_list[1]))
+        image_file = map_file
+        time_from_str, time_to_str = GetLocalTimeForDate(selected_date, time_zone_s)
+        temp_offset = -16
+
+        # Use the filtered sensor table for queries
+        if sensors_c > 1:
+            sql = get_deployment_query_filtered(
+                devices_list_str,
+                time_from_str,
+                time_to_str,
+                ids_list,
+                radar_part,
+                temp_offset,
+                filtered_s_table
+            )
+        else:
+            sql = get_deployment_single_query(
+                devices_list_str,
+                time_from_str,
+                time_to_str,
+                ids_list,
+                radar_part,
+                temp_offset,
+                filtered_s_table[0]
+            )
+
+        print(sql)
+
+        with get_db_connection() as conn:
+            with conn.cursor() as cur:
+                cur.execute(sql)
+                day_data = cur.fetchall()
+                if day_data is None:
+                    return False, []
+
+        stretch_by = 10
+        minutes = 1440
+        stripes = devices_c * sensors_c
+        arr_source_template = np.full((stripes, minutes + 4), -0.001, dtype=float)
+        arr_stretched_template = np.zeros((int(stripes * stretch_by), minutes, 3), dtype=np.uint8)
+
+        # Use the filtered sensor table
+        arr_source = fast_fill_array_from_timescale_filtered(
+            day_data,
+            time_from_str,
+            devices_list[1],
+            arr_source_template,
+            filtered_s_table,
+            time_zone_s
+        )
+
+        arr_source = AddLimits_optimized_filtered(arr_source, devices_c, sensors_c, filtered_s_table, percentile=100)
+        scaled_day = CalcExtremes(arr_source, minutes, stripes)
+        arr_stretched, vocs_scaled = FillImage_optimized(scaled_day, devices_c, sensors_c, arr_stretched_template, group_by, bw)
+        SaveImageInBlob(image_file, arr_stretched)
+        return True, vocs_scaled
+
+    except Exception as e:
+        AddToLog(traceback.format_exc())
+        return False, []
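+
+# Example request served by the get_sensors_map branch below (hypothetical
+# host, deployment and device IDs):
+#     GET /get_sensors_map?deployment_id=42&date=2025-01-31&radar_part=radar
+#         &bw=false&device_id=4711&sensor=voc3&unique_identifier=abc123
+# which renders a one-device, one-sensor map via CreateSensorsMapFast().
+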
+
+def get_deployment_query_filtered(devices_list_str, time_from_str, time_to_str, ids_list, radar_part, temp_offset, filtered_s_table):
+    """
+    Generate a filtered TimeScaleDB query for specific sensors only.
+
+    Parameters:
+        devices_list_str (str): Comma-separated string of device IDs
+        time_from_str (str): Start time for the query
+        time_to_str (str): End time for the query
+        ids_list (list): List of device IDs in priority order for sorting
+        radar_part (str): Radar column name
+        temp_offset (float): Temperature offset
+        filtered_s_table (list): List of sensor names to include
+
+    Returns:
+        str: Generated SQL query
+    """
+    # Generate the CASE statement for ordering
+    case_statements = []
+    for index, device_id in enumerate(ids_list, start=1):
+        case_statements.append(f"WHEN {device_id} THEN {index}")
+    case_order = "\n            ".join(case_statements)
+
+    # Build sensor-specific SELECT clauses
+    sensor_selects = []
+    sensor_aggregates = []
+    radar_needed = False
+
+    for sensor in filtered_s_table:
+        if sensor == "temperature":
+            sensor_selects.append(f"sr.avg_temperature + {temp_offset} as avg_temperature")
+            sensor_aggregates.append("AVG(temperature) AS avg_temperature")
+        elif sensor == "humidity":
+            sensor_selects.append("sr.avg_humidity")
+            sensor_aggregates.append("AVG(humidity) AS avg_humidity")
+        elif sensor == "pressure":
+            sensor_selects.append("sr.pressure_amplitude")
+            sensor_aggregates.append("AVG(pressure) AS pressure_amplitude")
+        elif sensor == "light":
+            sensor_selects.append("sr.max_light")
+            sensor_aggregates.append("MAX(light) AS max_light")
+        elif sensor == "radar":
+            sensor_selects.append("rr.radar")
+            radar_needed = True
+        elif sensor.startswith("voc"):
+            # Extract the column number (e.g., "voc0" -> s0); ignore zero
+            # readings, matching the unfiltered deployment query
+            sensor_num = sensor.replace("voc", "")
+            sensor_selects.append(f"sr.{sensor}")
+            sensor_aggregates.append(f"MIN(CASE WHEN s{sensor_num} > 0 THEN s{sensor_num} END) AS {sensor}")
+
+    # Build the query
+    if radar_needed and sensor_aggregates:
+        # Need both sensor readings and radar readings
+        sql = f"""
+        SELECT
+            COALESCE(sr.minute, rr.minute) as minute,
+            COALESCE(sr.device_id, rr.device_id) as device_id,
+            {', '.join(sensor_selects)}
+        FROM (
+            SELECT
+                time_bucket('1 minute', time) AS minute,
+                device_id,
+                {', '.join(sensor_aggregates)}
+            FROM
+                sensor_readings
+            WHERE
+                device_id IN ({devices_list_str})
+                AND time >= '{time_from_str}'
+                AND time < '{time_to_str}'
+            GROUP BY
+                minute,
+                device_id
+        ) sr
+        FULL OUTER JOIN (
+            SELECT
+                time_bucket('1 minute', time) AS minute,
+                device_id,
+                MAX({radar_part}) AS radar
+            FROM
+                radar_readings
+            WHERE
+                device_id IN ({devices_list_str})
+                AND time >= '{time_from_str}'
+                AND time < '{time_to_str}'
+            GROUP BY
+                minute,
+                device_id
+        ) rr
+        ON sr.minute = rr.minute AND sr.device_id = rr.device_id
+        ORDER BY
+            CASE COALESCE(sr.device_id, rr.device_id)
+                {case_order}
+            END,
+            COALESCE(sr.minute, rr.minute);
+        """
+    elif radar_needed:
+        # Only radar needed
+        sql = f"""
+        SELECT
+            time_bucket('1 minute', time) AS minute,
+            device_id,
+            MAX({radar_part}) AS radar
+        FROM
+            radar_readings
+        WHERE
+            device_id IN ({devices_list_str})
+            AND time >= '{time_from_str}'
+            AND time < '{time_to_str}'
+        GROUP BY
+            minute,
+            device_id
+        ORDER BY
+            CASE device_id
+                {case_order}
+            END,
+            minute;
+        """
+    else:
+        # Only sensor readings needed
+        sql = f"""
+        SELECT
+            time_bucket('1 minute', time) AS minute,
+            device_id,
+            {', '.join(sensor_aggregates)}
+        FROM
+            sensor_readings
+        WHERE
+            device_id IN ({devices_list_str})
+            AND time >= '{time_from_str}'
+            AND time < '{time_to_str}'
+        GROUP BY
+            minute,
+            device_id
+        ORDER BY
+            CASE device_id
+                {case_order}
+            END,
+            minute;
+        """
+
+    return sql
+
+
+def fast_fill_array_from_timescale_filtered(day_data, time_from_str, devices_list,
arr_source, filtered_s_table, timezone_str="Europe/Berlin"): + """ + Optimized array filling for filtered sensors. + + Parameters: + day_data: query results + time_from_str: start time string + devices_list: list of device IDs + arr_source: array to fill + filtered_s_table: list of sensor names to process + timezone_str: timezone string + + Returns: + numpy array: filled array + """ + # Convert start time to timezone-aware datetime + start_time = datetime.datetime.strptime(time_from_str, '%Y-%m-%d %H:%M:%S%z') + + # Create device index mapping + device_to_index = {device_id: idx for idx, device_id in enumerate(devices_list)} + + # Pre-process data into a more efficient structure + device_data = defaultdict(list) + for record in day_data: + if record[0] and record[1]: # If time and device_id exist + device_data[record[1]].append(record) + + # Build column mapping based on filtered sensors + columns = {} + col_idx = 2 # Start after time and device_id + + for sensor in filtered_s_table: + columns[sensor] = col_idx + col_idx += 1 + + # Process each device's data + for device_id, records in device_data.items(): + if device_id not in device_to_index: + continue + + base_idx = device_to_index[device_id] * len(filtered_s_table) + + # Convert records to numpy array for faster processing + records_array = np.array(records, dtype=object) + + # Calculate all minute deltas at once + times = records_array[:, 0] + minute_deltas = np.array([(t - start_time).total_seconds() / 60 for t in times], dtype=int) + + # Filter valid minute deltas + valid_mask = (minute_deltas >= 0) & (minute_deltas < arr_source.shape[1] - 4) + if not np.any(valid_mask): + continue + + minute_deltas = minute_deltas[valid_mask] + records_array = records_array[valid_mask] + + # Process each filtered sensor + for sensor_idx, sensor_name in enumerate(filtered_s_table): + if sensor_name in columns: + row_idx = base_idx + sensor_idx + values = records_array[:, columns[sensor_name]] + + # Filter out None values + valid_values = ~np.equal(values, None) + if not np.any(valid_values): + continue + + # Update array in bulk + arr_source[row_idx, minute_deltas[valid_values]] = values[valid_values] + + return arr_source + + +def AddLimits_optimized_filtered(arr_source, devices_c, sensors_c, filtered_s_table, percentile): + """ + Vectorized version of AddLimits for filtered sensors. 
+
+    Parameters:
+        arr_source: array of shape (devices_c * sensors_c, 1444)
+        devices_c: number of devices
+        sensors_c: number of sensors per device
+        filtered_s_table: list of sensor names
+        percentile: parameter for clean_data_vectorized
+    """
+    # Create arrays of sensor legal values for the filtered sensors
+    min_vals = []
+    max_vals = []
+    windows = []
+
+    for sensor_name in filtered_s_table:
+        if sensor_name in sensor_legal_values:
+            min_vals.append(sensor_legal_values[sensor_name][0])
+            max_vals.append(sensor_legal_values[sensor_name][1])
+            windows.append(sensor_legal_values[sensor_name][2])
+        else:
+            # Default values if sensor not found
+            min_vals.append(0)
+            max_vals.append(1000)
+            windows.append(1)
+
+    # Repeat for each device
+    min_vals = np.tile(min_vals, devices_c)
+    max_vals = np.tile(max_vals, devices_c)
+    windows = np.tile(windows, devices_c)
+
+    # Process rows that need cleaning (window > 2)
+    clean_mask = windows > 2
+    if np.any(clean_mask):
+        for window in np.unique(windows[clean_mask]):
+            rows_to_clean = np.where(clean_mask & (windows == window))[0]
+
+            for row_idx in rows_to_clean:
+                arr_source[row_idx, :1440] = clean_data_vectorized(
+                    arr_source[row_idx, :1440],
+                    window,
+                    percentile
+                )
+
+    # Set min/max values for all rows (columns 1440/1441 hold the legal bounds)
+    arr_source[:, 1440] = min_vals
+    arr_source[:, 1441] = max_vals
+
+    return arr_source
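+
+# Behaviour note (illustrative, assumes a "temperature" entry exists in
+# sensor_legal_values): after the call
+#     arr = AddLimits_optimized_filtered(np.full((1, 1444), -0.001), 1, 1, ["temperature"], 100)
+# columns 1440/1441 of each row carry the legal (min, max) bounds that
+# CalcExtremes() later uses for scaling.
+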
 
 #==================================== ADD FUNCTIONS BEFORE ============================================
 # Main API class
@@ -13811,6 +14626,8 @@ class WellApi:
 
     def on_get(self, req, resp, path=""):
         """Handle GET requests"""
+        global s_table_temp
+
         logger.debug(f"GET request to path: {path}")
         logger.debug(f"Sent variables: {req.params}")
         logger.debug(f"All headers: {dict(req.headers)}")
@@ -14026,6 +14843,7 @@ class WellApi:
             st = time.time()
             vocs_scaled = {}
             devices_list = GetProximityList(deployment_id, timee)
+            s_table_temp = s_table
             stored, vocs_scaled = CreateMapFast(filename, devices_list, ddate, bw, time_zone_s, radar_part, group_by) #"[bit] 1=same sensors together, 2=same device together, 4=1 der, 8=2 der
             if stored != True:
                 AddToLog("Map not created")
@@ -14056,6 +14874,114 @@ class WellApi:
                 resp.status = falcon.HTTP_200
                 return
 
+        elif get_function_name == "get_sensors_map":
+            # Get filtering parameters
+            device_id_str = req.params.get('device_id')
+            sensor = req.params.get('sensor')
+
+            # Get standard image parameters
+            deployment_id = req.params.get('deployment_id')
+            time_zone_s = GetTimeZoneOfDeployment(deployment_id)
+            ddate = req.params.get("date")
+            ddate = ddate.replace("_", "-")
+            group_by = ""  # req.params.get("group_by")
+            force_recreate = True  # always regenerate for now; the cache check below is effectively disabled
+            radar_part = req.params.get("radar_part")
+            bw = req.params.get("bw") == "true"
+            unique_identifier = req.params.get("unique_identifier")
+
+            # Create filename with filtering parameters
+            filter_suffix = ""
+            if device_id_str:
+                filter_suffix += f"_dev{device_id_str}"
+            if sensor:
+                filter_suffix += f"_sens{sensor}"
+
+            filename = f"/{deployment_id}/{deployment_id}_{ddate}_{radar_part}_{bw}{filter_suffix}_sensors_map.png"
+
+            # Check if file exists and needs recreation
+            if not force_recreate:
+                file_exists, time_modified_utc = check_file_exists(filename)
+                if file_exists:
+                    time_modified_local = time_modified_utc.astimezone(pytz.timezone(time_zone_s))
+                    time_modified_date = time_modified_local.date()
+                    file_date = MapFileToDate(filename)
+                    if time_modified_date <= file_date:
+                        force_recreate = True
+                else:
+                    force_recreate = True
+
+            # Convert date to UTC epoch for device queries
+            timee = LocalDateToUTCEpoch(ddate, time_zone_s) + 5  # add so date boundary is avoided
+
+            if force_recreate:
+                st = time.time()
+                vocs_scaled = {}
+
+                # Get initial device list
+                devices_list = GetProximityList(deployment_id, timee)
+
+                # Apply device filtering if specified
+                if device_id_str:
+                    filtered_devices = FilterDevicesByDeviceId(devices_list, device_id_str)
+                else:
+                    filtered_devices = devices_list
+
+                # Apply sensor filtering if specified; the raw name is passed
+                # through (rather than FilterSensorsBySensorType(sensor)) because
+                # get_deployment_query_filtered() expects "temperature"/"voc0"-style names
+                if sensor:
+                    filtered_s_table = [sensor]
+                else:
+                    filtered_s_table = s_table
+
+                # Validate we have devices and sensors to process
+                if not filtered_devices[0] or not filtered_s_table:
+                    AddToLog("No devices or sensors match the specified filters")
+                    resp.media = package_response("No devices or sensors match the specified filters", HTTP_400)
+                    return
+
+                # Create the filtered map
+                stored, vocs_scaled = CreateSensorsMapFast(
+                    filename,
+                    filtered_devices,
+                    ddate,
+                    bw,
+                    time_zone_s,
+                    radar_part,
+                    group_by,
+                    filtered_s_table
+                )
+
+                if not stored:
+                    AddToLog("Sensors map not created")
+                    resp.media = package_response("Sensors map not created", HTTP_401)
+                    return
+                else:
+                    AddToLog("Sensors map created")
+                    # Send vocs_scaled over MQTT
+                    json_data = numpy_to_json(vocs_scaled, filtered_devices)
+                    MQSendL("/" + unique_identifier, json_data)
+
+            # Read and send image from blob
+            image_bytes, content_type = GetBlob(filename)
+            if debug:
+                resp.media = package_response(f'Log: {debug_string}', HTTP_200)
+            else:
+                if image_bytes is None:
+                    raise falcon.HTTPNotFound(
+                        title='Image not found',
+                        description=f'Image {filename} could not be found or retrieved'
+                    )
+                sys.stdout.flush()
+                # Set response content type and body
+                resp.content_type = content_type
+                resp.data = image_bytes
+                resp.status = falcon.HTTP_200
+            return
+
         elif get_function_name == "get_full_location_map":
             raw = req.params.get("raw") == "true"
@@ -15032,10 +15958,6 @@ class WellApi:
 
         days = (epoch_to_utc - epoch_from_utc) / (60 * 1440)
-
-
-
-
         well_id = well_ids_list[0]
         all_slices = {}
         radar_part = ""