import os
import ast
from ast import literal_eval
import falcon
from falcon import HTTP_200, HTTP_400, HTTP_401, HTTP_500
import json
import logging
from dotenv import load_dotenv
from minio import Minio
from minio.error import S3Error
import io
import datetime
from datetime import timedelta, timezone
import jwt
import psycopg2
import html
import re
import requests
import sqlite3
import traceback
import time
import pytz
from scipy import interpolate
from scipy import stats
import pandas as pd
from collections import defaultdict
import numpy as np
import warnings
import cv2
print("OpenCV path:", cv2.__file__)
print("Available OpenCV attributes:", dir(cv2))
try:
# Create a small test array
import numpy as np
test_arr = np.zeros((10, 10), dtype=np.uint8) # Small black square
print("\nTesting cv2.imencode with simple array...")
success, encoded = cv2.imencode('.png', test_arr)
print("imencode test success:", success)
print("Encoded data type:", type(encoded))
print("Encoded data length:", len(encoded) if success else "N/A")
except Exception as e:
print("imencode test failed with error:", str(e))
print("Error type:", type(e).__name__)
EnablePlot = False
if EnablePlot:
import matplotlib
matplotlib.use('Agg') # Set the backend before importing pyplot
import matplotlib.pyplot as plt
from matplotlib.colors import LinearSegmentedColormap
warnings.filterwarnings('ignore')
#print(np.__file__)
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
logger.propagate = False
logger.setLevel(logging.WARNING) #.WARNING
handler = logging.StreamHandler()
formatter = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
location_names = {-1:"All",0:"?",5:"Office",6:"Hallway",7:"Garage",8:"Outside",9:"Conference Room",10:"Room",34:"Kitchen",
56:"Bedroom",78:"Living Room",102:"Bathroom",103:"Dining Room",104:"Bathroom Main",105:"Bathroom Guest",
106:"Bedroom Master", 107:"Bedroom Guest", 108:"Conference Room", 109:"Basement", 110:"Attic", 200:"Other"}
s_table = ["temperature", "humidity", "pressure", "light", "radar", "voc0", "voc1", "voc2", "voc3", "voc4", "voc5", "voc6", "voc7", "voc8", "voc9"]
location_indexes = {}
for i in location_names:
location_indexes[location_names[i]] = i
logger.warning("Well-api started")
load_dotenv()
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
MINIO_ACCESS_KEY = os.getenv('MINIO_ACCESS_KEY')
MINIO_SECRET_KEY = os.getenv('MINIO_SECRET_KEY')
MINIO_HOST = os.getenv('MINIO_HOST', "192.168.68.64")  # fall back to the local MinIO host when the env var is unset
MINIO_PORT = os.getenv('MINIO_PORT')
DAILY_MAPS_BUCKET_NAME = os.getenv('DAILY_MAPS_BUCKET_NAME')
JWT_SECRET = os.getenv('JWT_SECRET')
MASTER_ADMIN = os.getenv('MASTER_ADMIN')
MASTER_PS = os.getenv('MASTER_PS')
use_pdb = True
debug = True
debug_string = ""
logger.debug(f"Environment variables: {os.environ}")
filesDir = "/home/app/well_web_storage" #os.path.dirname(os.path.realpath(__file__))
min_io_address = MINIO_HOST + ":" + MINIO_PORT
# logging.basicConfig() was already called above; a second call here has no effect without force=True.
# Use the credentials we just created
#miniIO_blob_client = Minio(
#"192.168.68.64:9000",
#access_key="well_pipe", # The user we created
#secret_key="WellNuo_2024", # The password we set
#secure=False
#)
miniIO_blob_client = Minio(min_io_address, access_key=MINIO_ACCESS_KEY, secret_key=MINIO_SECRET_KEY, secure=False)
## Test operations
#try:
## First just try to list buckets
#print("Attempting to list buckets...")
#buckets = miniIO_blob_client.list_buckets()
#print("Available buckets:")
#for bucket in buckets:
#print(f"- {bucket.name}")
## Try operations with the 'daily-maps' bucket
#bucket_name = "daily-maps"
## Check if bucket exists
#print(f"\nChecking if bucket '{bucket_name}' exists...")
#if miniIO_blob_client.bucket_exists(bucket_name):
#print(f"Bucket '{bucket_name}' exists")
## Try to put a small test object
#print("\nTrying to upload test object...")
#test_data = b"Hello MinIO!"
#miniIO_blob_client.put_object(
#bucket_name,
#"test-file.txt",
#io.BytesIO(test_data),
#len(test_data)
#)
#print("Upload successful!")
#except S3Error as e:
#print(f"\nS3Error occurred:")
#print(f"Error code: {e.code}")
#print(f"Error message: {e.message}")
#print(f"Request ID: {getattr(e, 'request_id', 'N/A')}")
#print(f"Bucket name: {getattr(e, 'bucket_name', 'N/A')}")
#except Exception as e:
#print(f"\nUnexpected error: {type(e).__name__}")
#print(f"Error message: {str(e)}")
user_id_2_user = {}
smell_min = 1
smell_max = 102400000 - 1
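# Per-sensor valid range and smoothing window: (min_valid, max_valid, cleaning_window).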
sensor_legal_values = {"radar": (0,1000, 1), "co2": (1, 15999, 31), "humidity": (1,99, 31), "light": (0, 4095, 1),
"pressure": (0, 10000, 5), "temperature": (1, 60, 31), "voc": (smell_min, smell_max, 31), "voc0": (smell_min, smell_max, 31),
"voc1": (smell_min, smell_max, 31), "voc2": (smell_min, smell_max, 31), "voc3": (smell_min, smell_max, 31), "voc4": (smell_min, smell_max, 31),
"voc5": (smell_min, smell_max, 31), "voc6": (smell_min, smell_max, 31), "voc7": (smell_min, smell_max, 31), "voc8": (smell_min, smell_max, 31), "voc9": (smell_min, smell_max, 31),
"s0": (smell_min, smell_max, 31), "s1": (smell_min, smell_max, 31), "s2": (smell_min, smell_max, 31), "s3": (smell_min, smell_max, 31), "s4": (smell_min, smell_max, 31),
"s5": (smell_min, smell_max, 31), "s6": (smell_min, smell_max, 31), "s7": (smell_min, smell_max, 31), "s8": (smell_min, smell_max, 31), "s9": (smell_min, smell_max, 31)}
def get_db_connection():
return psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
def generate_token(username):
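    # Issue an HS256-signed JWT for the given username that expires 24 hours from now.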
expiration = datetime.datetime.now(timezone.utc) + timedelta(hours=24)
token = jwt.encode({"username": username, "exp": expiration}, JWT_SECRET, algorithm="HS256")
return token
def read_file(file_name, source = "LOCAL", type_ = "TEXT", bucket_name="daily-maps"):
blob_data = ""
if source == "MINIO":
blob_data = ReadObjectMinIO(bucket_name, file_name)
elif source == "LOCAL":
login_file = os.path.join(filesDir, file_name)
login_file = login_file.replace("\\","/")
logger.debug(f"Full file path: {login_file}")
logger.debug(f"File exists: {os.path.exists(login_file)}")
#print(login_file)
if type_ == "TEXT":
with open(login_file, encoding="utf8") as f:
blob_data = f.read()
else:
with open(login_file, 'rb') as f:
blob_data = f.read()
elif source == "AZURE":
try:
blob_data = ""#container_client.download_blob(file_name).readall()
except Exception as err:
logger.error("Not reading Azure blob "+str(err))
blob_data = ""
return blob_data
else:
pass
return blob_data
def verify_token(token):
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def ReadObjectMinIO(bucket_name, file_name):
try:
# Retrieve the object data
response = miniIO_blob_client.get_object(bucket_name, file_name)
# Read the data from response
data_bytes = response.read()
# Convert bytes to string and then load into a dictionary
data_string = data_bytes.decode('utf-8')
# Don't forget to close the response
response.close()
response.release_conn()
return data_string
except S3Error as e:
logger.error(f"An error occurred: {e}")
return None
def package_response(content, status=falcon.HTTP_200):
"""
Format the HTTP response.
:param content: The content to be returned in the response.
:param status: HTTP status code (default is 200 OK).
:return: A dictionary containing the formatted response.
"""
if isinstance(content, str):
# If content is a string, try to parse it as JSON
try:
response = json.loads(content)
except json.JSONDecodeError:
# If it's not valid JSON, use it as message
response = {"message": content}
elif isinstance(content, dict):
# If content is a dictionary, serialize it with datetime handling
try:
# First serialize to JSON string with datetime handling
json_str = json.dumps(content, default=datetime_handler)
# Then parse back to dict
response = json.loads(json_str)
except TypeError as e:
response = {"message": f"Serialization error: {str(e)}"}
else:
# For any other type, convert to string and use as message
response = {"message": str(content)}
# Add status code to the response
response["status"] = status
# Handle specific status codes
if status == falcon.HTTP_400:
response["error"] = "Bad Request"
elif status == falcon.HTTP_401:
response["error"] = "Unauthorized"
elif status == falcon.HTTP_500:
response["error"] = "Internal Server Error"
return response
def GetPriviledges(conn, user_name, password):
sql = "SELECT key, access_to_deployments, user_id FROM public.person_details WHERE user_name = '" + user_name + "'"
with conn.cursor() as cur:
cur.execute(sql)
result = cur.fetchall()#cur.fetchone()
if result != None:
if result[0][0] == password:
return result[0][1], result[0][2]
else:
return "0", "0"
else:
return "0", "0"
def GetPriviledgesOnly(user):
with get_db_connection() as conn:
        if isinstance(user, int) or str(user).isdigit():
            # Numeric user id; otherwise treat the value as a user name.
            sql = "SELECT access_to_deployments FROM public.person_details WHERE user_id = %s"
            params = (int(user),)
        else:
            sql = "SELECT access_to_deployments FROM public.person_details WHERE user_name = %s"
            params = (user,)
        with conn.cursor() as cur:
            cur.execute(sql, params)
            result = cur.fetchall()
            if result:
                return result[0][0]
            return "0"
def ListDeployments(priviledges, user_id):
    global user_id_2_user
    complete_result = []
    conn = get_db_connection()
if priviledges == "-1":
sql = "SELECT * FROM public.deployments ORDER BY deployment_id ASC;"
else:
sql = f"SELECT * FROM public.deployments WHERE deployment_id IN ({priviledges}) OR user_edit = {user_id} ORDER BY deployment_id ASC;"
try:
with conn.cursor() as cur:
cur.execute(sql)
result = cur.fetchall()#cur.fetchone()
if result == None:
complete_result = []
else:
deployment_ids = []
deployment_records_dict = {}
for record in result:
deployment_id = record[0]
deployment_ids.append(deployment_id)
deployment_records_dict[deployment_id] = record
sql = f"SELECT * FROM public.deployment_details WHERE deployment_id IN ({",".join(map(str, deployment_ids))}) ORDER BY deployment_id ASC;"
cur.execute(sql)
details_result = cur.fetchall()
beneficiary_ids = []
for record_details in details_result:
if record_details[1] != None and record_details[1] not in beneficiary_ids:
beneficiary_ids.append(record_details[1])
sql = f"SELECT * FROM public.person_details WHERE user_id IN ({",".join(map(str, beneficiary_ids))});"
cur.execute(sql)
user_id_2_user = {}
users = cur.fetchall()#cur.fetchone()
for usr_record in users:
user_id_2_user[usr_record[0]] = usr_record
complete_result = []
if details_result != None:
for record_details in details_result:
deployment_record = deployment_records_dict[record_details[0]]
complete_record = {'deployment_id': record_details[0], 'beneficiary_id': record_details[1], 'caretaker_id': record_details[2],
'owner_id': record_details[3], 'installer_id': record_details[4],
'address_street': record_details[6], 'address_city': record_details[7], 'address_zip': record_details[8],
'address_state': record_details[9], 'address_country': record_details[10],
'devices': record_details[5], 'wifis': record_details[11], 'persons': deployment_record[4], 'gender': deployment_record[5],
'race': deployment_record[6], 'born': deployment_record[7], 'pets': deployment_record[8], 'time_zone': deployment_record[3]
}
complete_result.append(complete_record)
    except Exception:
        logger.error(f"Error: {traceback.format_exc()}")
return complete_result
def ListCaretakers():
conn = get_db_connection()
sql = "SELECT * FROM public.person_details WHERE role_ids LIKE '%2%' ORDER BY last_name;" #2 is caretaker
with conn.cursor() as cur:
cur.execute(sql)
result = cur.fetchall()#cur.fetchone()
if result == None:
result = []
return result
def ListBeneficiaries(privilidges, user_info):
conn = get_db_connection()
with conn.cursor() as cur:
if (privilidges == "-1"):
sql = "SELECT * FROM public.person_details WHERE role_ids LIKE '%1%' ORDER BY last_name;" #1 is beneficiary
else:
#we need to find beneficiaries from list of deployments
sql = f"SELECT beneficiary_id FROM public.deployment_details WHERE deployment_id IN ({privilidges}) ORDER BY deployment_id ASC;"
cur.execute(sql)
result1 = cur.fetchall()#cur.fetchone()
            if not result1:
                result = []
                return result
            beneficiaries = ",".join(str(x[0]) for x in result1)
            sql = f"SELECT * FROM public.person_details WHERE (user_id IN ({beneficiaries}) OR user_edit = {user_info}) AND role_ids LIKE '%1%' ORDER BY last_name;" #1 is beneficiary
logger.debug(f"sql= {sql}")
cur.execute(sql)
result = cur.fetchall()#cur.fetchone()
if result == None:
result = []
return result
def UserDetails(user_id):
conn = get_db_connection()
sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'person_details';"
with conn.cursor() as cur:
cur.execute(sql)
columns_names = cur.fetchall()
sql = "SELECT * FROM public.person_details WHERE user_id = "+user_id
caretaker_record = {}
with conn.cursor() as cur:
cur.execute(sql)
result = cur.fetchone() #cur.fetchall()
if result != None:
cnt = 0
for field in columns_names:
caretaker_record[field[0]] = result[cnt]
cnt += 1
return caretaker_record
def DeploymentDetails(deployment_id):
deployment_record = {}
conn = get_db_connection()
with conn.cursor() as cur:
sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'deployment_details';"
cur.execute(sql)
columns_names = cur.fetchall()
sql = "SELECT * FROM public.deployment_details WHERE deployment_id = "+deployment_id
cur.execute(sql)
result = cur.fetchone() #cur.fetchall()
if result != None:
cnt = 0
for field in columns_names:
deployment_record[field[0]] = result[cnt]
cnt += 1
sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = 'public' AND table_name = 'deployments';"
cur.execute(sql)
columns_names1 = cur.fetchall()
sql = "SELECT * FROM public.deployments WHERE deployment_id = "+deployment_id
cur.execute(sql)
result = cur.fetchone() #cur.fetchall()
if result != None:
cnt = 0
for field in columns_names1:
deployment_record[field[0]] = result[cnt]
cnt += 1
return deployment_record
def ValidUser(user_name, password):
if use_pdb:
with get_db_connection() as db_conn:
priviledges, user_id= GetPriviledges(db_conn, user_name, password)
return priviledges, user_id
else:
pass
#container = GetReference("/MAC")
#try:
## We can do an efficient point read lookup on partition key and id
##response = container.read_item(item="64B708896BD8_temperature_2024-01-01_00", partition_key="64B708896BD8") #OK
##items = query_items(container, '64B708896BD8') #Too slow
##AddToLog("1!")
#privileges = GetCaretakers(container, email, password)
#return privileges
#except Exception as err:
#AddToLog("Error !1 "+str(err))
def AddToLog(str_s):
global debug_string
if debug:
try:
if isinstance(str_s, str):
debug_string = debug_string + str_s + "\n"
logging.info(str_s)
elif isinstance(str_s, (int, float)):
debug_string = debug_string + str(str_s) + "\n"
logging.info(str(str_s))
#debug_string = debug_string + str(type(str_s)) + "\n"
elif isinstance(str_s, (list, complex, dict)):
debug_string = debug_string + str(str_s) + "\n"
logging.info(str(str_s))
#debug_string = debug_string + str(type(str_s)) + "\n"
else:
debug_string = debug_string + str(type(str_s)) + "\n"#"Err:Unknown type!\n"
logging.info(str(str_s))
except Exception as err:
debug_string = debug_string + str(err) + "\n"
else:
print(str_s)
def SelectOption(html_code, select_id, selected_item):
"""
Modifies HTML code to set the selected attribute for a specific option in a select element.
Args:
html_code (str): Original HTML code
select_id (str): ID of the select element to modify
selected_item (str or int): Value of the option to be selected
Returns:
str: Modified HTML code with the selected attribute added
"""
# Convert selected_item to string for comparison
selected_item = str(selected_item)
# Find the select element with the given ID
    # Reconstructed pattern (the original regex, which contained literal HTML tags, was lost in the source):
    # match the whole <select id="..."> ... </select> block.
    select_pattern = rf'<select[^>]*\bid=["\']{select_id}["\'][^>]*>.*?</select>'
select_match = re.search(select_pattern, html_code, re.IGNORECASE | re.DOTALL)
if not select_match:
return html_code # Return unchanged if select element not found
select_content = select_match.group(0)
select_content_orig = select_content
# Remove any existing selected attributes
select_content = re.sub(r'\s+selected(?=[>\s])', '', select_content, flags=re.IGNORECASE)
# Add selected attribute to the matching option
def replace_option(match):
value = re.search(r'value=[\'"]?([^\'">\s]+)', match.group(0))
if value and value.group(1) == selected_item:
# Add selected attribute before the closing >
return match.group(0).rstrip('>') + ' selected>'
return match.group(0)
    # The remainder of this function was lost in the source; reconstructed: apply the option
    # substitution inside the matched <select> block and splice it back into the page.
    modified_select = re.sub(r'<option\b[^>]*>', replace_option, select_content, flags=re.IGNORECASE)
    return html_code.replace(select_content_orig, modified_select)
def DeploymentSelector(html_string, deployments, selected, include_all):
    # Hypothetical signature: the original def line was lost in the source. Builds the <option>
    # list for the deployments drop-down and injects it at the ###INSTALLS### placeholder.
    if include_all:
        selector_string = '<option value="-1">All</option>\n'
    else:
        selector_string = ''
for deployment in deployments:
first_name = ""
last_name = ""
if deployment[1] != None:
first_name = deployment[1]
if deployment[2] != None:
last_name = deployment[2]
        if deployment[0] == int(selected):
            # Option markup reconstructed; the original f-strings (literal HTML) were lost in the source.
            choice_string = f'<option value="{deployment[0]}" selected>{first_name} {last_name}</option>\n'
        else:
            choice_string = f'<option value="{deployment[0]}">{first_name} {last_name}</option>\n'
selector_string = selector_string + choice_string
#print(selector_string)
html_string = html_string.replace("###INSTALLS###",selector_string)
return html_string
def GetDeviceDetails(cur, deployment_ids, location_id):
#ID, Well id, MAC, Last_Message, Location, Description, Deployment
macs = [mac for _, mac in deployment_ids]
#macs = list(deployment_ids.keys())
macs_string_nq = ",".join(macs)
macs_string = "'" + "','".join(macs) + "'"
if location_id == -1:
sql = f"""
WITH ordered_macs AS (
SELECT unnest(string_to_array('{macs_string_nq}', ',')) as mac,
generate_series(1, array_length(string_to_array('{macs_string_nq}', ','), 1)) as position
)
SELECT d.*
FROM public.devices d
JOIN ordered_macs om ON d.device_mac = om.mac::text
WHERE device_mac IN ({macs_string})
ORDER BY om.position;
"""
else:
sql = f"""
WITH ordered_macs AS (
SELECT unnest(string_to_array('{macs_string_nq}', ',')) as mac,
generate_series(1, array_length(string_to_array('{macs_string_nq}', ','), 1)) as position
)
SELECT d.*
FROM public.devices d
JOIN ordered_macs om ON d.device_mac = om.mac::text
WHERE device_mac IN ({macs_string}) AND location = {location_id}
ORDER BY om.position;
"""
cur.execute(sql)
devices_ids_records = cur.fetchall()
all_details = []
devices_ids_list = [x[0] for x in devices_ids_records]
device_ids_string = ",".join(map(str, devices_ids_list))
#sql = f"SELECT device_id, MAX(time) as last_reading_time FROM sensor_readings WHERE device_id IN ({device_ids_string}) GROUP BY device_id" #to slow
sql = f"SELECT DISTINCT ON (device_id) device_id, time as last_reading_time FROM sensor_readings WHERE device_id IN ({device_ids_string}) ORDER BY device_id, time DESC"
cur.execute(sql)
devices_times = cur.fetchall()#cur.fetchone()
found_device_details = {}
for device_record in devices_times:
device_id, last_message_time = device_record
found_device_details[device_id] = last_message_time
cnt = 0
    for device_table_record in devices_ids_records:
        # Read this device's fields first so the latest-reading lookup below uses the correct device_id.
        device_id = device_table_record[0]
        mac = device_table_record[1]
        well_id = device_table_record[2]
        description = device_table_record[3]
        location_id = device_table_record[4]
        if len(devices_times) > 0:
            if device_id in found_device_details:
                last_message_time = found_device_details[device_id]
                last_message_epoch = int(last_message_time.timestamp())
            else:
                last_message_time = int(device_table_record[14])
                last_message_epoch = last_message_time
        else:
            last_message_time = 0
            last_message_epoch = 0
        row_data = [device_id, well_id, mac, last_message_epoch, location_names[location_id], description, deployment_ids[cnt][0]]
        cnt += 1
        all_details.append(row_data)
return all_details
def GetVisibleDevices(deployments):
devices_details = []
with get_db_connection() as conn:
#list all devices that user has access to
if deployments == "-1":
sql = "SELECT deployment_id, devices FROM public.deployment_details"
else:
sql = f"SELECT deployment_id, devices FROM public.deployment_details WHERE deployment_id IN ({deployments})"
with conn.cursor() as cur:
cur.execute(sql)
devices_groups = cur.fetchall()#cur.fetchone()
deployment_ids = []
for deployment_id, dev_group in devices_groups:
if len(dev_group) > 10:
macs_group = literal_eval(dev_group)
for mac in macs_group:
deployment_ids.append((deployment_id, mac))
devices_details = GetDeviceDetails(cur, deployment_ids, -1)
#devices_details.append(devices_detail)
return devices_details
def GetVisibleDevicesPerLocation(deployments, location):
devices_details = []
with get_db_connection() as conn:
#list all devices that user has access to
if deployments == "-1" or deployments == "0":
sql = "SELECT deployment_id, devices FROM public.deployment_details"
else:
sql = f"SELECT deployment_id, devices FROM public.deployment_details WHERE deployment_id IN ({deployments})"
with conn.cursor() as cur:
cur.execute(sql)
devices_groups = cur.fetchall()#cur.fetchone()
deployment_ids = []
for deployment_id, dev_group in devices_groups:
if len(dev_group) > 10:
macs_group = literal_eval(dev_group)
for mac in macs_group:
deployment_ids.append((deployment_id, mac))
devices_details = GetDeviceDetails(cur, deployment_ids, location_indexes[location])
#devices_details.append(devices_detail)
return devices_details
def GetUsersFromDeployments(deployments):
#list all devices that user has access to
deployments_dets = []
with get_db_connection() as conn:
try:
if deployments == "-1":
sql = f"""
SELECT dd.deployment_id, pd.first_name, pd.last_name
FROM deployment_details dd
JOIN person_details pd ON dd.beneficiary_id = pd.user_id
ORDER BY dd.deployment_id;
"""
else:
sql = f"""
SELECT dd.deployment_id, pd.first_name, pd.last_name
FROM deployment_details dd
JOIN person_details pd ON dd.beneficiary_id = pd.user_id
WHERE dd.deployment_id IN ({deployments})
ORDER BY dd.deployment_id;
"""
with conn.cursor() as cur:
cur.execute(sql)
deployments_dets = cur.fetchall()#cur.fetchone()
except Exception as err:
logger.error("GetUsersFromDeployments "+str(err) +" "+sql)
return deployments_dets
def ToList(input_data):
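    # Accepts either an actual list or a string such as '(a,b,c)', "['a', 'b']" or 'a,b,c'
    # and returns a list of stripped strings.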
# If input is already a list
if isinstance(input_data, list):
return [str(x).strip() for x in input_data]
# If input is string
if isinstance(input_data, str):
# Remove outer brackets if present
cleaned = input_data.strip('()')
cleaned = cleaned.strip('[]')
# Remove extra quotes
cleaned = cleaned.replace('"', '').replace("'", '')
# Split by comma and clean each element
return [x.strip() for x in cleaned.split(',')]
raise ValueError(f"Unsupported input type: {type(input_data)}")
def MACsToWellIds(cur, macs_list):
macs_string = ",".join(f"'{mac}'" for mac in macs_list)
sqlr = f"SELECT well_id, device_mac, device_id, location, description FROM public.devices WHERE device_mac IN ({macs_string})"
print (sqlr)
macs_map = {}
cur.execute(sqlr)
proximitys_list = cur.fetchall()
for well_id, mac, device_id, location, description in proximitys_list:
macs_map[mac] = (well_id, device_id, location_names[location], description, mac)
device_ids = []
device_list = []
for mac in macs_list:
device_ids.append(macs_map[mac][1])
device_list.append(macs_map[mac])
return device_ids, device_list
def ReadCleanStringDB(cur, sql):
cur.execute(sql)
temp_string = cur.fetchone()
if temp_string == None:
return ""
else:
return str(temp_string[0]).strip()
def GetProximityList(deployment_id, epoch_from_file_s):
#both are valid:
#64B70888FA84,64B70888F6F0,64B70888F860,64B70889062C,64B70888FAB0,64B708896BDC,64B708897428
#['64B70888FA84', '64B70888F6F0', '64B70888F860', '64B70889062C', '64B70888FAB0', '64B708896BDC', '64B708897428']
#result_list = []
#well_ids = []
with get_db_connection() as conn:
sqlr = f"""
SELECT * FROM (
SELECT proximity
FROM public.deployment_history
WHERE deployment_id = {deployment_id}
AND time <= {epoch_from_file_s}
ORDER BY time DESC
LIMIT 1
) AS latest_deployment
"""
#print (sqlr)
with conn.cursor() as cur:
devices_string = ReadCleanStringDB(cur, sqlr)
if devices_string == "":
sqlr = f"SELECT devices from public.deployment_details WHERE deployment_id ={deployment_id}"
#print (sqlr)
devices_string = ReadCleanStringDB(cur, sqlr)
if devices_string == "":
                    return [], []
macs_list = ToList(devices_string)
device_ids, device_list = MACsToWellIds(cur, macs_list)
return device_list, device_ids
def FilterList(to_filter: str, allowed: str) -> str:
# Convert comma-separated strings to sets
filter_set = set(to_filter.split(','))
allowed_set = set(allowed.split(','))
# Find intersection and sort the result
filtered = sorted(filter_set.intersection(allowed_set), key=int)
# Join back to comma-separated string
return ','.join(filtered)
def GetMatchingDevices(privileges, group, deployment, location):
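    # Clamp the requested deployment(s) to those the user's privileges allow, then return the
    # devices visible at the given location ("All" maps to every location).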
global LocationsMap
results=[]
if privileges != "-1":
if deployment == "" or deployment == "0":
deployment = privileges
privileges_list = privileges.split(',')
if deployment != "0":
if "," in deployment:
deployment = FilterList(deployment, privileges)
else:
if deployment not in privileges_list:
return results
else:
if deployment == "0":
deployment = "-1"
devices = GetVisibleDevicesPerLocation(deployment, location)
return devices
def getOldestDeploymentHistoryFromBeneficiary(deployment_id):
#this will return oldest entry as well as last proximity (devices)
results=[]
    device_alls_last = [] #list of (well_id, device_id, location, description, mac) tuples; returned even if the lookups below fail
    oldest_time = None
try:
with get_db_connection() as conn:
sqlr = f"""
SELECT * FROM (
SELECT time, proximity
FROM public.deployment_history
WHERE deployment_id = {deployment_id}
ORDER BY time ASC
) AS latest_deployment
"""
print (sqlr)
with conn.cursor() as cur:
cur.execute(sqlr)
results = cur.fetchall()
#lets find which of historical sets has data in DB
if results == None or results == []: #look in deployment_details
sqlr = f"SELECT devices from public.deployment_details WHERE deployment_id ={deployment_id}"
#print (sqlr)
devices_string = ReadCleanStringDB(cur, sqlr)
macs_list = ToList(devices_string)
device_ids_last, device_alls_last = MACsToWellIds(cur, macs_list)
sql_query = """
SELECT MIN(time) as oldest_record_time
FROM sensor_readings
WHERE device_id = ANY(%s);
"""
try:
cur.execute(sql_query, (device_ids_last,))
results1 = cur.fetchall()
oldest_time = results1[0][0]
except:
pass
else:
history_entry = results[-1]
macs_list = ToList(history_entry[1])
device_ids_last, device_alls_last = MACsToWellIds(cur, macs_list)
for history_entry in results:
macs_list = ToList(history_entry[1])
device_ids, device_alls = MACsToWellIds(cur, macs_list)
sql_query = """
SELECT MIN(time) as oldest_record_time
FROM sensor_readings
WHERE device_id = ANY(%s);
"""
try:
cur.execute(sql_query, (device_ids,))
results1 = cur.fetchall()
oldest_time = results1[0][0]
if oldest_time != None:
break
except:
pass
except Exception as e:
AddToLog(traceback.format_exc())
return oldest_time, device_alls_last
def getLastEditedBeneficiary(beneficiary):
    #lets generate the token here to eliminate issues with an outdated token...
token = generate_token(beneficiary)
url = 'https://well-api.azurewebsites.net/api/well_api'
params = {
"name": "beneficiary_detail",
"beneficiary": beneficiary,
"token": token
}
#{"id": "user_beneficiary_bernhard@wellnuo.com", "MAC": "BENEFICIARY", "email": "bernhard@wellnuo.com", "edit_date": "Fri Aug 16 06:45:01 2024", "c_password": "bern1", "first_name": "Bernhard", "last_name": "Knigge", "address": "776 Dubanski Dr.", "address_city": "San Jose", "address_state": "CA", "address_zip": "95123", "address_country": "United States", "phone_number": "4087055709", "persons": "2", "gender": "M", "race": "W", "born": "1972", "pets": "1", "creds": "", "devs": "[[203, 'Living Room', '', '64B708890B14'], [251, 'Bathroom', '', '64B7088909E8'], [252, 'Bedroom', '', '64B708890734'], [204, 'Bathroom', 'Guest', '64B708890288'], [201, 'Kitchen', 'toaster', '64B708890584'], [202, 'Kitchen', 'stove', '64B7088906D8'], [205, 'Office', '', '64B708897018']]", "tzone": "America/Los_Angeles", "ttl": -1, "_rid": "R60hANIG-K+qTQIAAAAAAg==", "_self": "dbs/R60hAA==/colls/R60hANIG-K8=/docs/R60hANIG-K+qTQIAAAAAAg==/", "_etag": "\"3500a0ae-0000-0800-0000-66bef56d0000\"", "_attachments": "attachments/", "_ts": 1723790701}
response = requests.get(url, params=params)
if response.status_code == 200:
text = response.text
#print(text)
if text == "Log-Out":
return text
if text[0] == "{":
data = json.loads(response.text)
date_string = data["edit_date"]
parsed_date = datetime.datetime.strptime(date_string, '%c')
# Convert the datetime object to a timestamp (epoch time)
epoch_str = str(time.mktime(parsed_date.timetuple()))
devices = data["devs"]
return(epoch_str, devices)
else:
return text,""
else:
logger.debug((f"Failed to retrieve the data, status code: {response.status_code}"))
return "",""
def GetDeploymentNameFromId(Id):
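    # Legacy SQLite lookup; relies on a module-level main_db path that is expected to be defined elsewhere.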
con = sqlite3.connect(main_db)
con.text_factory = str
cur = con.cursor()
results=[]
SQL = "SELECT name FROM deployments WHERE id =" + Id
df = cur.execute(SQL)
results = cur.fetchall()
if len(results) > 0:
return results[0][0]
else:
return ""
def GetTimeZoneOfDeployment(deployment_id):
time_zone_st = 'America/Los_Angeles'
with get_db_connection() as conn:
with conn.cursor() as cur:
sqlr = f"SELECT time_zone_s from public.deployments WHERE deployment_id ={deployment_id}"
time_zone_st = ReadCleanStringDB(cur, sqlr)
return time_zone_st
def StringToEpoch(date_string, time_zone_s):
"""
Convert a date string to epoch timestamp for start of day (midnight) in specified timezone
Args:
date_string (str): Date in 'YYYY-MM-DD' format
time_zone_s (str): Timezone string (e.g. 'America/Los_Angeles')
Returns:
float: Epoch timestamp in seconds
"""
# Parse the date string
date_format = '%Y-%m-%d'
naive_date = datetime.datetime.strptime(date_string, date_format)
# Get the timezone
timezone = pytz.timezone(time_zone_s)
# Localize the date to midnight in the specified timezone
local_date = timezone.localize(naive_date)
# Convert to epoch timestamp
epoch_time = local_date.timestamp()
return epoch_time
def LocalDateToUTCEpoch(local_date_str, time_zone_s):
"""
Convert a date string to epoch timestamp for start of day (midnight) in UTC
Args:
local_date_str (str): Date in 'YYYY-MM-DD' format
time_zone_s (str): Timezone string (e.g. 'America/Los_Angeles')
Returns:
float: Epoch UTC timestamp in seconds
"""
timezone = pytz.timezone(time_zone_s)
# Parse the date string
date_format = '%Y-%m-%d'
local_datetime = datetime.datetime.strptime(local_date_str, date_format)
local_datetime = timezone.localize(local_datetime)
utc_datetime = local_datetime.astimezone(pytz.UTC)
epoch_time = int(utc_datetime.timestamp())
return epoch_time
def GetDeploymentDatesBoth(deployment_in):
#when looking at the date, date is defined in TZ where device is!
#Lets take oldest data from first member of deployment
date_list = []
time_zone_st = GetTimeZoneOfDeployment(deployment_in)
oldest_date_dt_utc, devices_all = getOldestDeploymentHistoryFromBeneficiary(deployment_in)
if oldest_date_dt_utc != None:
#get date in local time zone from UTC datetime
#oldest_date_dt
# Get today's date
local_timezone = pytz.timezone(time_zone_st) # Replace with your local timezone
oldest_date_dt_local = oldest_date_dt_utc.astimezone(local_timezone)
today_date = datetime.datetime.now(local_timezone)
# Generate a list of date strings from oldest_date to today in inverted order
date_list = [(today_date - timedelta(days=x)).strftime('%Y-%m-%d') for x in range((today_date - oldest_date_dt_local).days + 1)]
return date_list, devices_all
def check_file_exists(file_name, bucket_name="daily-maps"):
try:
# Try to get the object's stats - this will raise an exception if the object doesn't exist
stat_result = miniIO_blob_client.stat_object(bucket_name, file_name)
last_modified_utc = stat_result.last_modified
return True, last_modified_utc
except S3Error as e:
if e.code == 'NoSuchKey':
return False, 0
# Re-raise if it's a different error
raise
def SaveImageInBlob(file_name, arr_stretched):
AddToLog(f"@@2")
try:
# Encode the image to a memory buffer using imencode
success, encoded_image = cv2.imencode('.png', arr_stretched)
AddToLog(f"success={success}")
if not success:
raise Exception("Could not encode image!")
AddToLog(f"DAILY_MAPS_BUCKET_NAME={DAILY_MAPS_BUCKET_NAME}")
image_bytes = encoded_image.tobytes()
AddToLog(f"len(image_bytes)={len(image_bytes)}")
miniIO_blob_client.put_object(
DAILY_MAPS_BUCKET_NAME,
file_name,
io.BytesIO(image_bytes),
len(image_bytes))
except Exception as e:
AddToLog(f"{traceback.format_exc()}")
logger.error(f"{traceback.format_exc()}")
def GetLocalTimeForDate(selected_date, time_zone_s):
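    # Return the UTC start/end timestamps (as '+0000' strings) covering the given local calendar day.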
# Parse the selected date
local_tz = pytz.timezone(time_zone_s)
# Convert selected_date string to datetime object (start of day in local time)
local_date = datetime.datetime.strptime(selected_date, "%Y-%m-%d")
local_start = local_tz.localize(local_date)
# Get the next day
local_next = local_start + timedelta(days=1)
# Convert to UTC
utc_start = local_start.astimezone(pytz.UTC)
utc_next = local_next.astimezone(pytz.UTC)
# Format as strings
time_from_str = utc_start.strftime("%Y-%m-%d %H:%M:%S")
time_to_str = utc_next.strftime("%Y-%m-%d %H:%M:%S")
return time_from_str + "+0000", time_to_str + "+0000"
def GetLocalTimeEpochsForDate(selected_date, time_zone_s):
"""
Get start and end of day epochs for a given date in a specific timezone.
Args:
selected_date (str): Date in "YYYY-MM-DD" format
time_zone_s (str): Timezone string (e.g., "America/New_York")
Returns:
tuple: (start_epoch, end_epoch) - Unix timestamps for start and end of day
"""
# Parse the selected date
local_tz = pytz.timezone(time_zone_s)
# Convert selected_date string to datetime object (start of day in local time)
local_date = datetime.datetime.strptime(selected_date, "%Y-%m-%d")
local_start = local_tz.localize(local_date)
# Get the next day
local_next = local_start + timedelta(days=1)
# Convert to UTC
utc_start = local_start.astimezone(pytz.UTC)
utc_next = local_next.astimezone(pytz.UTC)
# Convert to epochs (Unix timestamps)
start_epoch = int(utc_start.timestamp())
end_epoch = int(utc_next.timestamp())
return start_epoch, end_epoch
def UTC2Local(utc_time, time_zone_s):
    """
    Convert a UTC datetime to the given local timezone.
    Minimal sketch of the conversion implied by the function name; the returned
    'YYYY-MM-DD HH:MM:SS' local-time string format is an assumption.
    """
    local_tz = pytz.timezone(time_zone_s)
    if utc_time.tzinfo is None:
        utc_time = pytz.UTC.localize(utc_time)
    local_time = utc_time.astimezone(local_tz)
    return local_time.strftime("%Y-%m-%d %H:%M:%S")
def get_timezone_aware_datetime(time_str, timezone_str="America/Los_Angeles"):
"""
Convert a naive datetime string to a timezone-aware datetime object.
Parameters:
time_str: String in format 'YYYY-MM-DD HH:MM:SS'
timezone_str: String representing the timezone (default: "America/Los_Angeles")
Returns:
datetime: A timezone-aware datetime object
"""
# Parse the naive datetime
naive_dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')
# Get the timezone
tz = pytz.timezone(timezone_str)
# Localize the datetime (make it timezone-aware)
# localize() is the correct way to do this, as it handles DST transitions properly
aware_dt = tz.localize(naive_dt)
return aware_dt
def fast_fill_array_from_timescale(day_data, time_from_str, devices_list, arr_source, timezone_str="Europe/Berlin"):
"""
Optimized version of array filling from TimeScaleDB data.
Uses vectorized operations for significant speed improvement.
"""
# Convert start time to timezone-aware datetime
start_time = datetime.datetime.strptime(time_from_str, '%Y-%m-%d %H:%M:%S%z')
#start_time = start_time.replace(tzinfo=timezone.utc)
# Create device index mapping
device_to_index = {device_id: idx for idx, device_id in enumerate(devices_list)}
# Define column mappings (sensor type to position in record)
columns = {
'avg_temperature': 2,
'avg_humidity': 3,
'pressure_amplitude': 4,
'max_light': 5,
'radar': 6
}
# Add sensor columns dynamically
for i in range(10):
columns[f'sensor_min_s{i}'] = i + 7
# Pre-process data into a more efficient structure
# Group by device_id to reduce lookup operations
device_data = defaultdict(list)
for record in day_data:
if record[0] and record[1]: # If time and device_id exist
device_data[record[1]].append(record)
# Process each device's data in bulk
for device_id, records in device_data.items():
if device_id not in device_to_index:
continue
base_idx = device_to_index[device_id] * len(columns)
# Convert records to numpy array for faster processing
records_array = np.array(records, dtype=object)
# Calculate all minute deltas at once
times = records_array[:, 0]
#print(times[0], start_time, (times[0] - start_time).total_seconds())
minute_deltas = np.array([(t - start_time).total_seconds() / 60 for t in times], dtype=int)
# Filter valid minute deltas
valid_mask = (minute_deltas >= 0) & (minute_deltas < arr_source.shape[1])
if not np.any(valid_mask):
continue
minute_deltas = minute_deltas[valid_mask]
records_array = records_array[valid_mask]
# Process each column type in bulk
for col_name, col_offset in columns.items():
row_idx = base_idx + list(columns.keys()).index(col_name)
values = records_array[:, col_offset]
# Filter out None values
valid_values = ~np.equal(values, None)
if not np.any(valid_values):
continue
# Update array in bulk
arr_source[row_idx, minute_deltas[valid_values]] = values[valid_values]
return arr_source
def BestColor(in_val):
    # Map an input in [0, 1279] to a rainbow colour: red (0) -> yellow -> green -> cyan -> blue -> violet (1279).
r,g,b=0,0,0
in_val = int(in_val)
if(in_val > 1279):
in_val = 1279
if (in_val < 256):
r = 255
g = in_val
elif (in_val < 512):
r = 511 - in_val
g = 255
elif (in_val < 768):
g = 255
b = in_val-512
elif (in_val < 1024):
g = 1023 - in_val
b = 255
else:
r = in_val - 1024
b = 255
#if (r > 255):
# print(in_val)
# print(int(r),int(g),int(b))
return(int(r),int(g),int(b))
def fill_array_from_timescale(day_data, time_from_str, devices_list, arr_source, timezone_str):
"""
Fill numpy array with data from TimeScaleDB query results.
Parameters:
day_data: List of tuples from database query
time_from_str: Starting datetime string in format 'YYYY-MM-DD HH:MM:SS'
devices_list: List of device IDs
arr_source: Pre-initialized numpy array to fill
Returns:
numpy.ndarray: Filled array
"""
# Parse the start time
start_time = get_timezone_aware_datetime(time_from_str, timezone_str)
# Create mapping of device_ids to their index positions
device_to_index = {device_id: idx for idx, device_id in enumerate(devices_list)}
# Define columns and their positions in the result tuple
columns = {
'avg_temperature': 2,
'avg_humidity': 3,
'pressure_amplitude': 4,
'max_light': 5,
'radar': 6,
'sensor_min_s0': 7,
'sensor_min_s1': 8,
'sensor_min_s2': 9,
'sensor_min_s3': 10,
'sensor_min_s4': 11,
'sensor_min_s5': 12,
'sensor_min_s6': 13,
'sensor_min_s7': 14,
'sensor_min_s8': 15,
'sensor_min_s9': 16
}
# Process each record
for record in day_data:
# Get minute and device_id from record
record_time = record[0] # minute column
device_id = record[1] # device_id column
if record_time and device_id:
# Calculate minute delta
minute_delta = int((record_time - start_time).total_seconds() / 60)
if 0 <= minute_delta < arr_source.shape[1]:
# Calculate base index for this device
base_idx = device_to_index[device_id] * len(columns)
# Fill data for each sensor/measurement type
for col_name, col_offset in columns.items():
value = record[col_offset]
if value is not None: # Skip NULL values
row_idx = base_idx + list(columns.keys()).index(col_name)
arr_source[row_idx, minute_delta] = value
return arr_source
def CalcExtremes(arr_source, length, height):
"""
Calculate min and max values for each row within legal bounds.
Optimized version using numpy vectorized operations.
Parameters:
arr_source: numpy array of shape (height, length+4) containing data and bounds
length: number of data points to process (typically 1440 for minutes in a day)
height: number of rows in the array
Returns:
numpy array with min/max values stored in columns 1442 and 1443
"""
# Extract the data portion and bounds
data = arr_source[:, :length]
ignore_below = arr_source[:, 1440:1441] # Keep 2D shape for broadcasting
ignore_above = arr_source[:, 1441:1442] # Keep 2D shape for broadcasting
# Create masks for valid values
above_min_mask = data >= ignore_below
below_max_mask = data <= ignore_above
valid_mask = above_min_mask & below_max_mask
# Create a masked array to handle invalid values
masked_data = np.ma.array(data, mask=~valid_mask)
# Calculate min and max values for each row
row_mins = np.ma.min(masked_data, axis=1).filled(-0.001)
row_maxs = np.ma.max(masked_data, axis=1).filled(-0.001)
# Store results
arr_source[:, 1442] = row_mins
arr_source[:, 1443] = row_maxs
return arr_source
def plot(arr, filename="multiple_rows.png", title="1D Array Plot", figsize=(12, 6),
color='blue', style='line'):
"""
Plot a 1D numpy array as a line or scatter plot
Parameters:
arr : 1D numpy array
title : str, plot title
figsize : tuple, figure size in inches
color : str, line/point color
style : str, 'line' or 'scatter'
"""
title = filename
plt.figure(figsize=figsize)
x = np.arange(len(arr))
if style == 'line':
plt.plot(x, arr, color=color)
else: # scatter
plt.scatter(x, arr, color=color, alpha=0.6)
plt.title(title)
plt.xlabel('Index')
plt.ylabel('Value')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(filename)
plt.close()
print(f"Plot saved to: {filename}")
#plt.show()
def AddLimits(arr_source, devices_c, sensors_c, percentile):
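    # Clean each sensor row (outlier removal for sensors whose window > 2) and store its legal
    # min/max bounds in columns 1440/1441 for the later extreme calculation.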
for y in range(devices_c*sensors_c):
sensor_index = y % sensors_c
min_ok, max_ok, window = sensor_legal_values[s_table[sensor_index]]
if EnablePlot:
if (y == 33):
print("stop")
plot(arr_source[y, :1440], "before_clean_sensor.png")
if window > 2:
arr_source[y, :1440] = clean_data_vectorized(arr_source[y, :1440], window, percentile)
if EnablePlot:
if (y == 33):
print("stop")
plot(arr_source[y, :1440], "after_clean_sensor.png")
arr_source[y][1440] = min_ok
arr_source[y][1441] = max_ok
return arr_source
def clean_data_vectorized(data, window, percentile):
"""
Vectorized version of clean_data function using pure numpy
First removes zeros, then cleans outliers
Parameters:
data: numpy array of sensor readings
window: int, size of rolling window
percentile: float, percentile threshold for deviation filtering
"""
# Create a copy to avoid modifying original data
working_data = data.copy()
# Replace zeros with NaN
zero_mask = working_data == 0
working_data[zero_mask] = np.nan
# Create rolling window view of the data
def rolling_window(a, window):
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
# Pad array for edge handling
pad_width = window // 2
padded = np.pad(working_data, pad_width, mode='edge')
# Create rolling windows
windows = rolling_window(padded, window)
# Calculate rolling median (ignoring NaN values)
medians = np.nanmedian(windows, axis=1)
# Forward/backward fill any NaN in medians
# Forward fill
mask = np.isnan(medians)
idx = np.where(~mask, np.arange(mask.shape[0]), 0)
np.maximum.accumulate(idx, out=idx)
medians[mask] = medians[idx[mask]]
# Backward fill any remaining NaNs
mask = np.isnan(medians)
idx = np.where(~mask, np.arange(mask.shape[0]), mask.shape[0] - 1)
idx = np.minimum.accumulate(idx[::-1])[::-1]
medians[mask] = medians[idx[mask]]
# Calculate deviations (ignoring NaN values)
deviations = np.abs(working_data - medians)
# Calculate threshold (ignoring NaN values)
threshold = np.nanpercentile(deviations, percentile)
# Create mask and replace outliers with median values
# Points are good if they're not NaN and deviation is within threshold
good_points = (~np.isnan(working_data)) & (deviations <= threshold)
# Replace all bad points (including zeros and outliers) with median values
result = np.where(good_points, working_data, medians)
return result
def process_chunk(args):
"""
Process a chunk of rows
"""
chunk, sensors_c, sensor_legal_values, s_table, window, percentile = args
result = np.copy(chunk)
# Process all time series in the chunk at once
result[:, :1440] = np.array([
clean_data_vectorized(row[:1440], window, percentile)
for row in chunk
])
# Set limits for all rows in chunk using vectorized operations
sensor_indices = np.arange(len(chunk)) % sensors_c
min_values = np.array([sensor_legal_values[s_table[i]][0] for i in sensor_indices])
max_values = np.array([sensor_legal_values[s_table[i]][1] for i in sensor_indices])
result[:, 1440] = min_values
result[:, 1441] = max_values
return result
def FillImage(scaled_day, devices_c, sensors_c, arr_stretched, group_by):
"""
Fill the stretched array with colored sensor data.
Parameters:
scaled_day: 2D array of shape (stripes, minutes+4) containing sensor readings
devices_c: number of devices
sensors_c: number of sensors per device
arr_stretched: 3D array of shape (stripes*stretch_by, minutes, 3) to fill with RGB values
Returns:
arr_stretched: Filled array with RGB values
"""
stripes = devices_c * sensors_c
stretch_by = arr_stretched.shape[0] // stripes
minutes = arr_stretched.shape[1]
# Create a boolean mask for VOC sensors
if group_by != "sensortype":
voc_rows = np.array([i for i in range(stripes) if int(i/devices_c) >= 5])
else:
voc_rows = np.array([i for i in range(stripes) if int(i % sensors_c) >= 5])
# Vectorize the BestColor function
vectorized_best_color = np.vectorize(BestColor)
# Process each row
for row in range(stripes):
row_data = scaled_day[row, :minutes] # Get minute data
#if row == 33:
# print("stop")
# plot(row_data, "row_data.png")
big_min = scaled_day[row, 1442] # min value
big_max = scaled_day[row, 1443] # max value
# Create mask for valid values
valid_mask = row_data != -0.001
# Initialize RGB row with zeros
rgb_row = np.zeros((minutes, 3), dtype=np.uint8)
if big_max > big_min:
# Scale factor
k = 1280/(big_max-big_min)
# Calculate normalized values
normalized_vals = k * (row_data - big_min)
# Invert if it's a VOC row
if row in voc_rows:
normalized_vals = 1279 - normalized_vals
# Apply valid mask
normalized_vals = np.where(valid_mask, normalized_vals, 0)
#if row == 33:
# plot(normalized_vals, "normalized_vals.png")
# Convert to RGB colors (vectorized)
r, g, b = vectorized_best_color(normalized_vals)
# Combine into RGB array
rgb_row[valid_mask] = np.stack([r[valid_mask],
g[valid_mask],
b[valid_mask]], axis=1)
else:
# Set to gray where valid
rgb_row[valid_mask] = 128
if group_by == "sensortype":
# Fill the stretched rows
sensor_index = row % sensors_c
device_index = int(row/sensors_c)
dest_row = sensor_index * devices_c + device_index #0-0, 1-
start_idx = dest_row * stretch_by
end_idx = start_idx + stretch_by
arr_stretched[start_idx:end_idx] = rgb_row
else:
# Fill the stretched rows
start_idx = row * stretch_by
end_idx = start_idx + stretch_by
arr_stretched[start_idx:end_idx] = rgb_row
return arr_stretched
def CreateMapFast(map_file, devices_list, selected_date, map_type, vocs_scaled, time_zone_s, radar_part, group_by):
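    # Build a per-minute sensor/radar heat-map image for one local day and store it as a PNG
    # in the MinIO daily-maps bucket.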
global Id2MACDict
if radar_part == "s28":
radar_part = "(s2+s3+s4+s5+s6+s7+s8)/7"
try:
#stretch_to_min_max = True
#current_date_p = selected_date.replace("-", "_")
#current_date_s = selected_date
lower_than200 = 0
larger_than200 = 0
for details in devices_list[0]:
dev_id = details[0]
if dev_id < 200:
lower_than200 += 1
else:
larger_than200 += 1
if lower_than200 > 0 and larger_than200 > 0:
            return False
if larger_than200 > 0:
sensors_c = len(s_table)
else: #old sensors not supported
return False
devices_c = len(devices_list[0])
devices_list_str = ",".join(map(str, devices_list[1]))
image_file = map_file
time_from_str, time_to_str = GetLocalTimeForDate(selected_date, time_zone_s)
sql = f"""
SELECT
COALESCE(sr.minute, rr.minute) as minute,
COALESCE(sr.device_id, rr.device_id) as device_id,
sr.avg_temperature,
sr.avg_humidity,
sr.pressure_amplitude,
sr.max_light,
rr.radar,
sr.min_s0 as sensor_min_s0,
sr.min_s1 as sensor_min_s1,
sr.min_s2 as sensor_min_s2,
sr.min_s3 as sensor_min_s3,
sr.min_s4 as sensor_min_s4,
sr.min_s5 as sensor_min_s5,
sr.min_s6 as sensor_min_s6,
sr.min_s7 as sensor_min_s7,
sr.min_s8 as sensor_min_s8,
sr.min_s9 as sensor_min_s9
FROM (
SELECT
time_bucket('1 minute', time) AS minute,
device_id,
AVG(temperature) AS avg_temperature,
AVG(humidity) AS avg_humidity,
AVG(pressure) AS pressure_amplitude,
MAX(light) AS max_light,
MIN(s0) AS min_s0,
MIN(s1) AS min_s1,
MIN(s2) AS min_s2,
MIN(s3) AS min_s3,
MIN(s4) AS min_s4,
MIN(s5) AS min_s5,
MIN(s6) AS min_s6,
MIN(s7) AS min_s7,
MIN(s8) AS min_s8,
MIN(s9) AS min_s9
FROM
sensor_readings
WHERE
device_id IN ({devices_list_str})
AND time >= '{time_from_str}'
AND time < '{time_to_str}'
GROUP BY
minute,
device_id
) sr
FULL OUTER JOIN (
SELECT
time_bucket('1 minute', time) AS minute,
device_id,
MAX({radar_part}) AS radar
FROM
radar_readings
WHERE
device_id IN ({devices_list_str})
AND time >= '{time_from_str}'
AND time < '{time_to_str}'
GROUP BY
minute,
device_id
) rr
ON sr.minute = rr.minute AND sr.device_id = rr.device_id
ORDER BY
CASE COALESCE(sr.device_id, rr.device_id)
WHEN 571 THEN 1
WHEN 560 THEN 2
WHEN 572 THEN 3
WHEN 561 THEN 4
WHEN 559 THEN 5
WHEN 524 THEN 6
WHEN 562 THEN 7
END,
COALESCE(sr.minute, rr.minute);
"""
#print(sql)
#st = time.time()
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute(sql)
day_data = cur.fetchall()#cur.fetchone()
#print(result)
if day_data == None:
return False
#print(time.time() - st)
#st = time.time()
#creating image
#print(time.time()-st)
stretch_by = 10
minutes = 1440
stripes = devices_c * sensors_c #2 for upper maxes, lower mins
arr_source_template = np.full((stripes, minutes+4), -0.001, dtype=float)
#array to be written as image
arr_stretched_template = np.zeros((int(stripes*stretch_by), minutes, 3), dtype=np.uint8) # 3 for RGB channels
#st = time.time()
arr_source = fast_fill_array_from_timescale(day_data, time_from_str, devices_list[1], arr_source_template, time_zone_s)
#plot(arr_source[33], "arr_source.png")
#print("@1:", time.time()-st)
#st = time.time()
#arr_source = process_data(arr_source, devices_c, sensors_c, window=5, percentile=99, use_threading=True)
arr_source = AddLimits(arr_source, devices_c, sensors_c, percentile=99)
#plot(arr_source[33][:1440], "arr_source2.png")
#print("@2:", time.time()-st)
#st = time.time()
#plot(arr_source[33], "arr_source3.png")
scaled_day = CalcExtremes(arr_source, minutes, stripes)
#plot(arr_source[33][:1440], "scaled_day.png")
#print("@@1", time.time()-st)
AddToLog(f"map_type={map_type}")
AddToLog(f"type(map_type)={type(map_type)}")
if True:#(int(map_type) == 1):
#st = time.time()
AddToLog(f"@@1")
arr_stretched = FillImage(scaled_day, devices_c, sensors_c, arr_stretched_template, group_by)
#print("@@2", time.time()-st)
SaveImageInBlob(image_file, arr_stretched)
return True
except Exception as e:
AddToLog(traceback.format_exc())
return False
def GetBlob(file_name):
"""
Retrieve image from blob storage
Args:
file_name (str): Name of the file to retrieve from blob storage
Returns:
tuple: (image_bytes, content_type)
Returns None, None if image not found or error occurs
"""
AddToLog(f"GetBlob({file_name})")
try:
# Get the object from blob storage
data = miniIO_blob_client.get_object(
DAILY_MAPS_BUCKET_NAME,
file_name
)
        # Read the data into bytes, then release the connection back to the pool
        image_bytes = data.read()
        data.close()
        data.release_conn()
        AddToLog(f"len(image_bytes)={len(image_bytes)}")
        return image_bytes, 'image/png'
except Exception as e:
AddToLog(f"Error: {traceback.format_exc()}")
logger.error(f"{traceback.format_exc()}")
return None, None
def MapFileToDate(map_file):
#'/Volumes/XTRM-Q/wellnuo/daily_maps/1/1_2023-11-07_dayly_image.png'
parts = map_file.split("/")
parts = parts[-1].split("_")
if "-" in parts[0]:
date_string = parts[0]
elif "-" in parts[1]:
date_string = parts[1]
date_object = datetime.datetime.strptime(date_string, "%Y-%m-%d")
date_only = date_object.date()
return date_only
def GetMACsListSimple(list_of_lists):
result = []
if len(list_of_lists) > 0:
result = [sublist[3] for sublist in list_of_lists]
return(result)
def datetime_handler(obj):
"""Handle datetime serialization for JSON"""
if isinstance(obj, datetime.datetime):
if obj.tzinfo:
return obj.isoformat()
return obj.strftime('%Y-%m-%d %H:%M:%S.%f')
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
def ReadCandles(file, sensor, period, time_from, time_to):
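    # Build a candle query for the requested sensor/period table (e.g. vocs_0<period>) over the given time range.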
result = []
if sensor == "voc0":
sqlr = "SELECT * from vocs_0"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc1":
sqlr = "SELECT * from vocs_1"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc2":
sqlr = "SELECT * from vocs_2"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc3":
sqlr = "SELECT * from vocs_3"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc4":
sqlr = "SELECT * from vocs_4"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc5":
sqlr = "SELECT * from vocs_5"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc6":
sqlr = "SELECT * from vocs_6"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc7":
sqlr = "SELECT * from vocs_7"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc8":
sqlr = "SELECT * from vocs_8"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
elif sensor == "voc9":
sqlr = "SELECT * from vocs_9"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
else:
sqlr = "SELECT * from "+sensor+"s"+period+ " WHERE Date >= "+str(time_from) + " AND Date <= "+str(time_to)
logger.debug(f"sqlr = {sqlr}")
with get_db_connection() as conn:
with conn.cursor() as cur:
devices_string = ReadCleanStringDB(cur, sqlr)
result = QuerrySql(file, sqlr)
return result
def ReadSensor(device_id, sensor, time_from_epoch, time_to_epoch, data_type, radar_part):
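    # Read raw (time, value) pairs for one sensor on one device between two epochs, filtered to the
    # sensor's legal range; "radar" readings come from radar_readings, everything else from sensor_readings.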
time_utc = datetime.datetime.fromtimestamp(float(time_from_epoch), tz=timezone.utc)
# Format in ISO 8601 format with timezone
time_from_str = time_utc.strftime("%Y-%m-%d %H:%M:%S%z")
time_utc = datetime.datetime.fromtimestamp(float(time_to_epoch), tz=timezone.utc)
# Format in ISO 8601 format with timezone
time_to_str = time_utc.strftime("%Y-%m-%d %H:%M:%S%z")
legal_min, legal_max, window = sensor_legal_values[sensor]
result = []
if sensor == "radar":
if radar_part == "s28":
radar_part = "(s2+s3+s4+s5+s6+s7+s8)/7"
sqlr = f"SELECT time, {radar_part} AS radar FROM radar_readings WHERE device_id = {device_id} AND time >= '{time_from_str}' AND time <= '{time_to_str}' ORDER BY time ASC"
elif sensor[0] == "s":
sqlr = f"SELECT time, {sensor} AS smell FROM sensor_readings WHERE device_id = {device_id} AND {sensor} >= '{legal_min}' AND {sensor} <= '{legal_max}' AND time >= '{time_from_str}' AND time <= '{time_to_str}' ORDER BY time ASC"
else:
if sensor == "temperature":
sqlr = f"SELECT time, {sensor} - 16 from sensor_readings WHERE device_id = {device_id} AND {sensor} >= '{legal_min}' AND {sensor} <= '{legal_max}' AND time >= '{time_from_str}' AND time <= '{time_to_str}' ORDER BY time ASC"
else:
sqlr = f"SELECT time, {sensor} from sensor_readings WHERE device_id = {device_id} AND {sensor} >= '{legal_min}' AND {sensor} <= '{legal_max}' AND time >= '{time_from_str}' AND time <= '{time_to_str}' ORDER BY time ASC"
logger.debug(f"sqlr = {sqlr}")
with get_db_connection() as conn:
with conn.cursor() as cur:
cur.execute(sqlr)
result = cur.fetchall()
return result
def check_and_parse(data_str):
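    # Accepts a JSON-style list string ('[...]' or '[[...]]') or a plain comma-separated string;
    # returns (is_list_of_lists, parsed).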
# Remove whitespace to handle cases with spaces
cleaned = data_str.strip()
    # A second '[' right after the first means a list of lists
    is_list_of_lists = len(cleaned) > 1 and cleaned[1] == '['
if cleaned[0] == '[':
# Parse the string regardless of type
parsed = json.loads(cleaned)
else:
parsed = cleaned.split(",")
return is_list_of_lists, parsed
def clean_data_with_rolling_spline(line_part_t, window=5, threshold=2.0):
"""
Filter outliers using rolling median and replace with spline interpolation
Returns data in the same format as input: [(timestamp, value), ...]
"""
# Unzip the input tuples
x, y = zip(*line_part_t)
x = np.array(x)
y = np.array(y, dtype=float) # explicitly convert to float
# Calculate rolling median and MAD using a safer approach
rolling_median = []
rolling_mad = []
for i in range(len(y)):
start_idx = max(0, i - window//2)
end_idx = min(len(y), i + window//2 + 1)
window_values = y[start_idx:end_idx]
# Skip if window is empty or contains invalid values
if len(window_values) == 0 or np.any(np.isnan(window_values)):
rolling_median.append(y[i])
rolling_mad.append(0)
continue
med = np.median(window_values)
mad = np.median(np.abs(window_values - med))
rolling_median.append(med)
rolling_mad.append(mad)
rolling_median = np.array(rolling_median)
rolling_mad = np.array(rolling_mad)
# Identify outliers (protect against division by zero)
outlier_mask = np.abs(y - rolling_median) > threshold * (rolling_mad + 1e-10)
good_data_mask = ~outlier_mask
if np.sum(good_data_mask) < 4:
return line_part_t # return original data if we can't interpolate
try:
# Create and apply spline
spline = interpolate.InterpolatedUnivariateSpline(
x[good_data_mask],
y[good_data_mask],
k=3
)
y_cleaned = y.copy()
y_cleaned[outlier_mask] = spline(x[outlier_mask])
except Exception as e:
print(f"Spline interpolation failed: {e}")
return line_part_t
# Return in the same format as input
return list(zip(x, y_cleaned))
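#example (hypothetical data): clean_data_with_rolling_spline([(1700000000, 21.5), (1700000060, 21.6), (1700000120, 35.0), (1700000180, 21.4), (1700000240, 21.5), (1700000300, 21.6)]) replaces the 35.0 spike with a spline-interpolated value, provided at least four inlier points remain; otherwise the input is returned unchanged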
def clean_data_with_spline(x, y, threshold=2.0):
"""
Filter outliers and replace with spline interpolation
Parameters:
x : array-like, timestamps or x-coordinates
y : array-like, values to be filtered
threshold : float, number of median absolute deviations for outlier detection
Returns:
array-like : cleaned data with outliers replaced by spline interpolation
"""
# Convert inputs to numpy arrays
x = np.array(x)
y = np.array(y)
# Calculate median and median absolute deviation
median = np.median(y)
mad = stats.median_abs_deviation(y)
# Identify outliers
outlier_mask = np.abs(y - median) > threshold * mad
good_data_mask = ~outlier_mask
# If there are too few good points for a cubic spline, fall back to the original data
min_points_needed = 4 # minimum points needed for cubic spline
if np.sum(good_data_mask) < min_points_needed:
return y # return original data if we can't interpolate
# Create spline with non-outlier data
spline = interpolate.InterpolatedUnivariateSpline(
x[good_data_mask],
y[good_data_mask],
k=3 # cubic spline
)
# Replace outliers with interpolated values
y_cleaned = y.copy()
y_cleaned[outlier_mask] = spline(x[outlier_mask])
return y_cleaned
def clean_data(line_part_t, window=5, threshold=2.0):
"""
Remove obvious outliers based on window comparison
Returns cleaned data in the same format: [(timestamp, value), ...]
"""
if len(line_part_t) < window:
return line_part_t
x, y = zip(*line_part_t)
x = np.array(x)
y = np.array(y, dtype=float)
cleaned_data = []
for i in range(len(y)):
# Get window around current point
start_idx = max(0, i - window//2)
end_idx = min(len(y), i + window//2 + 1)
window_values = y[start_idx:end_idx]
# Calculate median and MAD for the window
window_median = np.median(window_values)
deviation = abs(y[i] - window_median)
# Keep point if it's not too far from window median
if deviation <= threshold * window_median:
cleaned_data.append((x[i], y[i]))
#else:
#print(window_values)
return cleaned_data
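#note: clean_data keeps a point only if its deviation from the window median is within threshold * window_median, i.e. the cut-off scales with the signal magnitude rather than with the MAD, so it implicitly assumes positive-valued signals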
def clean_data_fast(line_part_t, window=5, threshold=2.0):
"""
Remove obvious outliers based on window comparison - vectorized version
Returns cleaned data in the same format: [(timestamp, value), ...]
"""
if len(line_part_t) < window:
return line_part_t
x, y = zip(*line_part_t)
x = np.array(x)
y = np.array(y, dtype=float)
# Calculate rolling median using numpy
half_window = window // 2
medians = np.array([
np.median(y[max(0, i-half_window):min(len(y), i+half_window+1)])
for i in range(len(y))
])
# Calculate deviations for all points at once
deviations = np.abs(y - medians)
# Create mask for good points
good_points = deviations <= threshold * medians
# Return filtered data using boolean indexing
return list(zip(x[good_points], y[good_points]))
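#clean_data_fast is the vectorized counterpart of clean_data: same relative criterion (deviation <= threshold * rolling median), and outliers are simply dropped rather than interpolated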
def clean_data_pd(line_part_t, window=5, percentile=99):
"""
Remove obvious outliers based on window comparison - pandas version
Returns cleaned data in the same format: [(timestamp, value), ...]
"""
#line_part_t = line_part_t[2000:2100]
if len(line_part_t) < window:
return line_part_t
x, y = zip(*line_part_t)
# Create pandas Series and calculate rolling median
series = pd.Series(y)
medians = series.rolling(window=window, center=True, min_periods=1).median()
# Calculate deviations
deviations = np.abs(series - medians)
largest_deviations = deviations.nlargest(10)
#print(largest_deviations)
# Create mask for good points
deviation_threshold = np.percentile(deviations, percentile)
good_points = deviations <= deviation_threshold
# Convert back to numpy arrays for filtering
x = np.array(x)
y = np.array(y)
# Return filtered data
return list(zip(x[good_points], y[good_points]))
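#clean_data_pd ranks points by deviation from a centered rolling median and keeps those below the given percentile, so with percentile=99 roughly the worst 1% of points are trimmed regardless of scale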
def add_boundary_points(line_part_t, time_zone):
"""
Add boundary points (00:00:00 and 23:59:59) to a time series list.
Args:
line_part_t: List of tuples (timestamp, value)
time_zone: String representing the timezone (e.g., "America/Los_Angeles")
Returns:
List of tuples with added boundary points
"""
if not line_part_t:
return line_part_t
tz = pytz.timezone(time_zone)
# Get the date from the first point
first_dt = datetime.datetime.fromtimestamp(line_part_t[0][0], tz)
date = first_dt.date()
# Create datetime objects for start and end of the day
start_dt = tz.localize(datetime.datetime.combine(date, datetime.datetime.min.time()))
end_dt = tz.localize(datetime.datetime.combine(date, datetime.datetime.max.time()))
# Convert to timestamps
start_ts = start_dt.timestamp()
end_ts = end_dt.timestamp()
result = list(line_part_t)
# Handle start point (00:00:00): duplicate the first sample's value at local midnight
start_value = line_part_t[0][1]
result.insert(0, (start_ts, start_value))
# Handle end point (23:59:59): duplicate the last sample's value at end of day
end_value = line_part_t[-1][1]
result.append((end_ts, end_value))
return result
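#add_boundary_points pads the series with synthetic samples at local 00:00:00 and 23:59:59 of the first sample's date, duplicating the first and last observed values, so downstream plots span the whole day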
import random #imported here in case it is not imported earlier in the module; used by the placeholder sleep estimate in the dashboard_list handler below
class well_api:
def on_get_healthz(self, req, resp):
resp.status = HTTP_200
resp.content_type = falcon.MEDIA_TEXT
resp.text = "OK"
def on_get(self, req, resp, path=""):
logger.debug(f"Sent variables: {req.params}")
if path == "" or path == "/":
#resp.media = package_response("Log-Out", HTTP_401)
#return
blob_data = read_file("well_portal.html")
resp.content_type = "text/html"
resp.text = blob_data
return
elif path == "favicon.ico":
favicon_path = "favicon.ico"
if os.path.isfile(favicon_path):
resp.content_type = 'image/x-icon'
resp.data = read_file(favicon_path, type_ = "BIN")
resp.status = falcon.HTTP_200
else:
resp.status = falcon.HTTP_404
return #return for both favicon outcomes so the 'name' dispatch below is skipped
name = req.params.get('name')
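#GET requests with a "name" query parameter serve pre-filled HTML fragments; every branch re-validates the JWT before reading its template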
if name == "deployment_add":
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
user_id = req.params.get('user_id')
blob_data = read_file("edit_deployment.html")
caretaker = {'deployment_id': 0, 'beneficiary_id': user_id, 'caretaker_id': user_id, 'owner_id': user_id, 'installer_id': user_id, 'user_id': 0, 'role_ids': '2', 'access_to_deployments': '', 'email': '', 'user_name': '', 'first_name': '', 'last_name': '', 'address_street': '', 'address_city': '', 'address_zip': '', 'address_state': '', 'address_country': '', 'phone_number': '', 'picture': '/', 'key': ''}
blob_data = FillFields(blob_data, caretaker, 1)
resp.content_type = "text/html"
resp.text = blob_data
return
elif name == "devices_list":
user_id = req.params.get('user_id')
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
privileges = GetPriviledgesOnly(user_id)
first_s = req.params.get('first')
last_s = req.params.get('last')
try:
first = int(first_s)
except ValueError:
first = 0
try:
last = int(last_s)
except ValueError:
last = 1000000
blob_data = read_file("my_devices.html")
devices = GetVisibleDevices(privileges)
users = GetUsersFromDeployments(privileges)
blob_data = UpdateDevicesTable(blob_data, devices, users)
blob_data = UpdateDeploymentsSelector(blob_data, users)
resp.content_type = "text/html"
resp.text = blob_data
return
elif name == "deployment_edit":
deployment_id = req.params.get('deployment_id')
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
blob_data = read_file("edit_deployment.html")
deployment = DeploymentDetails(deployment_id)
#blob_data = blob_data.decode("utf-8")
blob_data = FillFields(blob_data, deployment, 1)
resp.content_type = "text/html"
resp.text = blob_data
return
elif name == "caretaker_add":
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
blob_data = read_file("edit_caretaker.html")
caretaker = {'user_id': 0, 'role_ids': '2', 'access_to_deployments': '', 'email': '', 'user_name': '', 'first_name': '', 'last_name': '', 'address_street': '', 'address_city': '', 'address_zip': '', 'address_state': '', 'address_country': '', 'phone_number': '', 'picture': '/', 'key': ''}
blob_data = FillFields(blob_data, caretaker, 1)
resp.content_type = "text/html"
resp.text = blob_data
return
elif name == "caretaker_edit":
user_id = req.params.get('user_id')
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
blob_data = read_file("edit_caretaker.html")
caretaker = UserDetails(user_id)
#blob_data = blob_data.decode("utf-8")
blob_data = FillFields(blob_data, caretaker, 1)
resp.content_type = "text/html"
resp.text = blob_data
return
elif name == "beneficiary_edit":
user_id = req.params.get('user_id')
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
blob_data = read_file("edit_beneficiary.html")
beneficiary = UserDetails(user_id)
#blob_data = blob_data.decode("utf-8")
blob_data = FillFields(blob_data, beneficiary, 1)
resp.content_type = "text/html"
resp.text = blob_data
return
elif name == "beneficiary_add":
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
user_id = req.params.get('user_id')
blob_data = read_file("edit_beneficiary.html")
beneficiary = {'user_id': 0, 'role_ids': '1', 'access_to_deployments': '', 'email': '', 'user_name': '', 'first_name': '', 'last_name': '', 'address_street': '', 'address_city': '', 'address_zip': '', 'address_state': '', 'address_country': '', 'phone_number': '', 'picture': '/', 'key': ''}
blob_data = FillFields(blob_data, beneficiary, 1)
resp.content_type = "text/html"
resp.text = blob_data
return
elif name == "get_image_file":
#image represents day in local time
token = req.params.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
deployment_id = req.params.get('deployment_id')
time_zone_s = GetTimeZoneOfDeployment(deployment_id)
ddate = req.params.get("date")
ddate = ddate.replace("_","-")
group_by = req.params.get("group_by")
timee = StringToEpoch(ddate, time_zone_s)
force_recreate = req.params.get("re_create") == "true"
radar_part = req.params.get("radar_part")
map_type = req.params.get("map_type")
filename = f"/{deployment_id}/{ddate}_{group_by}_{radar_part}_{map_type}_dayly_image.png"
#print(check_file_exists(filename))
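#the map is re-created when the stored image is missing, when re_create=true was passed, or when the existing image was last written on or before the local date it depicts (that day's data may still have been incomplete when it was rendered)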
if not force_recreate:
file_exists, time_modified_utc = check_file_exists(filename)
if file_exists:
time_modified_local = time_modified_utc.astimezone(pytz.timezone(time_zone_s))
time_modified_date = time_modified_local.date()
file_date = MapFileToDate(filename)
if time_modified_date <= file_date:
force_recreate = True
else:
force_recreate = True
#ddate is in local time
timee = LocalDateToUTCEpoch(ddate, time_zone_s)+5 #add 5 seconds so the exact date boundary is avoided
#the timestamps describing new devices in deployment_history are in UTC, therefore timee is in UTC too
devices_list = GetProximityList(deployment_id, timee)
#file_date is in local time, so it is compared against the current local date for the deployment
if force_recreate:
st = time.time()
vocs_scaled = {}
stored = CreateMapFast(filename, devices_list, ddate, map_type, vocs_scaled, time_zone_s, radar_part, group_by) #group_by bit flags: 1=same sensors together, 2=same device together, 4=1st derivative, 8=2nd derivative (presumed meaning of "1 der"/"2 der")
if stored != True:
AddToLog("Map not created")
#logger.warning("Map not created")
resp.media = package_response("Map not created", HTTP_401)
return
else:
AddToLog("Map created")
#print(time.time() - st)
#lets read and send image from blob
image_bytes, content_type = GetBlob(filename)
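#note: with the module-level debug flag enabled, this endpoint returns the accumulated debug log instead of the PNG; the image response below is only produced when debug is False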
if debug:
AddToLog("@1")
resp.media = package_response(f'Log: {debug_string}', HTTP_200)
else:
if image_bytes is None:
raise falcon.HTTPNotFound(
title='Image not found',
description=f'Image {filename} could not be found or retrieved'
)
# Set response content type and body
resp.content_type = content_type
resp.data = image_bytes
resp.status = falcon.HTTP_200
return
resp.media = package_response("Use POST method for this endpoint", HTTP_400)
def on_post(self, req, resp, path=""):
logger.debug(f"Request method: {req.method}")
logger.debug(f"Request path: {req.path}")
logger.debug(f"Request query string: {req.query_string}")
logger.debug(f"Request headers: {req.headers}")
logger.debug(f"Request content type: {req.content_type}")
# Use Falcon's built-in parsing
if req.content_type == falcon.MEDIA_URLENCODED:
form_data = req.media
elif req.content_type == falcon.MEDIA_JSON:
form_data = req.media
else:
form_data = {}
logger.debug(f"Parsed form data: {form_data}")
# Extract parameters
try:
function = form_data.get('function')
if function != "credentials":
token = form_data.get('token')
user_info = verify_token(token)
if user_info == None:
resp.media = package_response("Log-Out", HTTP_401)
return
username = user_info['username']
with get_db_connection() as db_conn:
privileges = GetPriviledgesOnly(username)
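#"credentials" is the only function that does not require a token: it validates the user (or the master admin configured via environment variables) and returns a JWT together with the privilege string and user id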
if function == "credentials":
username = form_data.get('user_name')
if not username:
resp.media = package_response("Missing 'user_name' parameter", HTTP_400)
return
ps = form_data.get('ps')
if username == MASTER_ADMIN and ps == MASTER_PS:
access_token = generate_token(username)
privileges, user_id = ValidUser(username, ps)
privileges = "-1"
else:
#lets check for real
privileges, user_id = ValidUser(username, ps)
if privileges == "0":
access_token = 0
privileges = 0
else:
access_token = generate_token(username)
token_payload = {'access_token': access_token, 'privileges': privileges, 'user_id': user_id}
resp.media = package_response(token_payload)
resp.status = falcon.HTTP_200
return
elif function == "request_single_slice":
deployment_id = form_data.get('deployment_id')
time_zone_s = GetTimeZoneOfDeployment(deployment_id)
selected_date = form_data.get('date')
devices_list = form_data.get('devices_list')
#devices_list = '[267,560,"?",null,"64B70888F6F0"]'
#devices_list = '[[267,560,"?",null,"64B70888F6F0"],[268,561,"?",null,"64B70888F6F1"]]'
sensor_list_loc = [form_data.get('sensor_list')]
is_nested, device_details = check_and_parse(devices_list)
if not is_nested:
device_ids_list = [device_details[1]]
well_ids_list = [device_details[0]]
else:
device_ids_list = list(map(lambda x: x[1], device_details))
well_ids_list = list(map(lambda x: x[0], device_details))
data_type = form_data.get('data_type')
epoch_from_utc, epoch_to_utc = GetLocalTimeEpochsForDate(selected_date, time_zone_s) #>= #<
#epoch_to = '1730592010' #smal sample to test
radar_part = form_data.get('radar_part')
well_id = well_ids_list[0]
all_slices = {}
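#for each requested device: read the raw rows, convert timestamps to epoch seconds, trim the worst ~1% of points with clean_data_pd, then pad the local-day boundaries before returning the slice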
for device_id in device_ids_list:
sensor_data = {}
for sensor in sensor_list_loc:
st = time.time()
line_part = ReadSensor(device_id, sensor, epoch_from_utc, epoch_to_utc, data_type, radar_part)
window = sensor_legal_values[sensor][2]
#print("@1", time.time() - st)
#first = 3300
#last = 3400
#line_part = line_part[first:last]
line_part_t = []
#st = time.time()
#line_part_t = [tuple(x[:2]) for x in line_part]
#print(time.time() - st)
#st = time.time()
#line_part_t = list({(dt.timestamp(), value) for dt, value in line_part})
#print(time.time() - st)
line_part_t = [(x[0].timestamp(), x[1]) for x in line_part]
st = time.time()
cleaned_values_t = clean_data_pd(line_part_t, window=window, percentile=99)
#add boundary points at minute 0 and minute 1439 of the local day
cleaned_values = add_boundary_points(cleaned_values_t, time_zone_s)
#print("@2", time.time() - st)
#st = time.time()
#cleaned_values = clean_data_fast(line_part_t, window=5, threshold=2.0)
#print("@3", time.time() - st)
sensor_data[sensor] = cleaned_values
all_slices[device_id] = sensor_data
dataa = {}
dataa['Function'] = "single_slicedata"
dataa['MACs_list'] = device_ids_list
dataa['all_slices'] = all_slices
dataa['well_id'] = well_id
resp.media = package_response(dataa)
resp.status = falcon.HTTP_200
#return
elif function == "get_deployment":
blob_data = read_file("deployment.html")
deployment_id = form_data.get('deployment_id')
#lets update "Deployments" select
users = GetUsersFromDeployments(privileges)
blob_data = UpdateDeploymentsSelector(blob_data, users, False, deployment_id)
resp.content_type = "text/html"
resp.text = blob_data
return
elif function == "request_deployment_map_new":
deployment_id = form_data.get('deployment_id')
maps_dates, positions_list = GetDeploymentDatesBoth(deployment_id)
datee = form_data.get('date')
if maps_dates != []:
if datee == "2022-4-2": #that one is default in HTML so disregard
datee = maps_dates[0]
locations_desc_map = {}
for details in positions_list:
well_id = details[0]
location = details[2]
if details[3] != None:
location = location +" "+ details[3]
MAC = details[4]
locations_desc_map[well_id] = location
dataa = {}
dataa['Function'] = "deployments_maps_report"
dataa['proximity'] = positions_list
maps_dates.sort(reverse = True)
dataa['maps_dates'] = maps_dates
dataa['device_count'] = len(positions_list)
#MACs_list = GetMACsListSimple(positions_list)
#MACs_map = {}
#for details in positions_list:
# id = details[0]
# MAC = details[3]
# MACs_map[id] = MAC
#for i in range(len(MACs_list)):
# MACs_map[devices_list[i]] = MACs_list[i][0]
id = positions_list[0][0]
#dataa['MACs_map'] = MACs_map
dataa['locations_desc_map'] = locations_desc_map
#proximity_list = proximity.split(",")
if id < 200:
checkmarks_string = 'T>\n'
checkmarks_string = checkmarks_string + 'H>\n'
checkmarks_string = checkmarks_string + 'P>\n'
checkmarks_string = checkmarks_string + 'C>\n'
checkmarks_string = checkmarks_string + 'V>\n'
checkmarks_string = checkmarks_string + 'L>\n'
checkmarks_string = checkmarks_string + 'R>\n'
else: #>200 = ["Temperature", "Humidity", "Pressure", "Light", "Radar", "VOC"]
checkmarks_string = 'T>\n'
checkmarks_string = checkmarks_string + 'H>\n'
checkmarks_string = checkmarks_string + 'P>\n'
checkmarks_string = checkmarks_string + 'L>\n'
checkmarks_string = checkmarks_string + 'R>\n'
checkmarks_string = checkmarks_string + 'S0>\n'
checkmarks_string = checkmarks_string + 'S1>\n'
checkmarks_string = checkmarks_string + 'S2>\n'
checkmarks_string = checkmarks_string + 'S3>\n'
checkmarks_string = checkmarks_string + 'S4>\n'
checkmarks_string = checkmarks_string + 'S5>\n'
checkmarks_string = checkmarks_string + 'S6>\n'
checkmarks_string = checkmarks_string + 'S7>\n'
checkmarks_string = checkmarks_string + 'S8>\n'
checkmarks_string = checkmarks_string + 'S9>\n'
checked_or_not = " checked"
for index in range(len(positions_list)):
details = positions_list[index]
device_id = details[0]
location = details[2]
if details[3] != None:
location = location + " " + details[3]
checkmarks_string = checkmarks_string + str(device_id) + '>\n'
checked_or_not = ''
dataa['checkmarks'] = checkmarks_string
resp.media = package_response(dataa)
resp.status = falcon.HTTP_200
elif function == "request_proximity":
deployment = form_data.get('deployment_id')
timee = form_data.get('time')
#timee = StringToEpoch(datee)
#print(deployment, timee)
well_ids, device_ids = GetProximityList(deployment, timee)
#print(proximity)
dataa = {}
dataa['Function'] = "proximity_report"
if len(well_ids) > 0:
dataa['proximity'] = well_ids
else:
dataa['proximity'] = []
resp.media = package_response(dataa)
resp.status = falcon.HTTP_200
elif function == "request_devices":
deployment_id = form_data.get('deployment_id')
group_id = form_data.get('group_id')
location = form_data.get('location')
is_fresh = form_data.get('is_fresh')
matching_devices = GetMatchingDevices(privileges, group_id, deployment_id, location)
dataa = {}
dataa['Function'] = "devices_report"
if len(matching_devices) > 0:
dataa['devices'] = matching_devices
else:
dataa['devices'] = []
resp.media = package_response(dataa)
resp.status = falcon.HTTP_200
elif function == "get_raw_data":
container = GetReference("/MAC")
#these legacy branches referenced an undefined req_dict; parameters are read from form_data here (assumed equivalent keys)
MAC = form_data.get('MAC')
sensor = form_data.get('sensor')
part = form_data.get('part') or ""
from_time = form_data.get('from_time')
to_time = form_data.get('to_time')
timezone_str = form_data.get('tzone')
AddToLog("get_raw_data:" + str(MAC) + "," + str(sensor) + "," + str(from_time) + "," + str(to_time) + "," + str(part) + "," + str(timezone_str))
#raw_data = GetRawSensorData(container, MAC, sensor, from_time, to_time, timezone_str)
raw_data = GetRawSensorDataFromBlobStorage(MAC, sensor, part, from_time, to_time, timezone_str)
data_payload = {'raw_data': raw_data}
resp.media = package_response(data_payload)
resp.status = falcon.HTTP_200
return
elif function == "get_candle_data":
container = GetReference("/MAC")
MAC = form_data.get('MAC')
sensor = form_data.get('sensor')
from_time = form_data.get('from_time')
to_time = form_data.get('to_time')
part = form_data.get('part')
tzone = form_data.get('tzone')
AddToLog(str(form_data))
candle_data = GetCandleSensorData(container, MAC, sensor, from_time, to_time, part, tzone)
data_payload = {'candle_data': candle_data}
resp.media = package_response(data_payload)
resp.status = falcon.HTTP_200
return
elif function == "deployment_form":
editing_deployment_id = form_data.get('editing_deployment_id')
ok = StoreDeployment2DB(form_data, editing_deployment_id)
if ok == 1:
payload = {'ok': ok}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
else:
payload = {'ok': ok, 'error': debug_string}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
elif function == "deployment_delete":
ok = DeleteRecordFromDB(form_data)
payload = {'ok': ok}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
elif function == "deployments_list":
result_list = []
first_s = form_data.get('first')
last_s = form_data.get('last')
try:
first = int(first_s)
except ValueError:
first = 0
try:
last = int(last_s)
except ValueError:
last = 1000000
user_id = form_data.get('user_id')
all_deployments = ListDeployments(privileges, user_id)
cnt = 0
for deployment in all_deployments:
cnt += 1
if cnt >= first:
caretaker_min_object = {"deployment_id": deployment['deployment_id'], "email": user_id_2_user[deployment['beneficiary_id']][3], "first_name": user_id_2_user[deployment['beneficiary_id']][5], "last_name": user_id_2_user[deployment['beneficiary_id']][6]}
result_list.append(caretaker_min_object)
if cnt > last:
break
payload = {'result_list': result_list}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
elif function == "caretaker_form":
editing_user_id = form_data.get('editing_user_id')
email = form_data.get('email')
if "@" in email:
ok = StoreCaretaker2DB(form_data, editing_user_id)
if ok == 1:
payload = {'ok': ok}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
else:
payload = {'ok': ok, 'error': debug_string}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
else:
resp.media = package_response("Missing or illegal 'email' parameter", HTTP_400)
return
elif function == "caretaker_delete":
if privileges == "-1":
ok = DeleteRecordFromDB(form_data)
else:
ok = 0
AddToLog(ok)
payload = {'ok': ok}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
elif function == "caretakers_list":
result_list = []
first_s = form_data.get('first')
last_s = form_data.get('last')
try:
first = int(first_s)
except ValueError:
first = 0
try:
last = int(last_s)
except ValueError:
last = 1000000
if privileges == "-1":
all_caretakers = ListCaretakers()
cnt = 0
for caretaker in all_caretakers:
cnt += 1
if cnt >= first:
caretaker_min_object = {"user_id": caretaker[0], "email": caretaker[3], "first_name": caretaker[5], "last_name": caretaker[6]}
result_list.append(caretaker_min_object)
if cnt > last:
break
payload = {'result_list': result_list}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
elif function == "beneficiary_form":
editing_user_id = form_data.get('editing_user_id')
email = form_data.get('email')
if "@" in email:
ok = StoreBeneficiary2DB(form_data, editing_user_id)
if ok == 1:
payload = {'ok': ok}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
else:
payload = {'ok': ok, 'error': debug_string}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
else:
resp.media = package_response("Missing or illegal 'email' parameter", HTTP_400)
return
elif function == "beneficiary_delete":
ok = DeleteRecordFromDB(form_data)
payload = {'ok': ok}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
elif function == "beneficiaries_list":
result_list = []
first_s = form_data.get('first')
last_s = form_data.get('last')
try:
first = int(first_s)
except ValueError:
first = 0
try:
last = int(last_s)
except ValueError:
last = 1000000
user_id = form_data.get('user_id')
all_beneficiaries = ListBeneficiaries(privileges, user_id)
cnt = 0
for beneficiary in all_beneficiaries:
cnt += 1
if cnt >= first:
beneficiary_min_object = {"user_id": beneficiary[0], "email": beneficiary[3], "first_name": beneficiary[5], "last_name": beneficiary[6]}
result_list.append(beneficiary_min_object)
if cnt > last:
break
payload = {'result_list': result_list}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
elif function == "dashboard_list":
AddToLog(form_data)
caretaker = form_data.get('user')
all_beneficiaries = ListBeneficiariesOfCaretaker(caretaker)
AddToLog(all_beneficiaries)
result_list = []
for beneficiary in all_beneficiaries:
details = GetDetails(beneficiary)
#"[[104, 'Living Room', '', '64B708890988'], [106, 'Bathroom', '', '64B708890298'], [101, 'Bedroom', '', '64B7088902B8'], [130, 'Dining Room', '', '64B7088905C8'], [107, 'Bathroom', 'Malo ', '4C75259783E0'], [102, 'Kitchen', '', '308398C72E58'], [141, 'Office', '', '64B70888FB50']]"
#lets determine where person is now
devices_string = details["devices"]
devices_list = ast.literal_eval(devices_string)
timezone_str = details["tzone"]
timezone_object = pytz.timezone(timezone_str)
latest_presence = ""
latest_MAC = ""
temperature_offset = 16
bathroom_presence = ""
kitchen_presence = ""
for device in devices_list:
if len(device) > 3:
MAC = device[3]
this_presence, temp_offset = GetLastPresent(MAC)
if latest_presence == "":
if this_presence != "":
latest_presence = this_presence
latest_MAC = MAC
if "bathroom" in device[1].lower():
bathroom_presence = this_presence
if "kitchen" in device[1].lower():
kitchen_presence = this_presence
if latest_presence != "":
AddToLog("@3 ")
AddToLog(this_presence)
AddToLog(latest_presence)
#this_time = datetime.datetime.strptime(this_presence, "%Y-%m-%d %H:%M:%S.%f", tzinfo=timezone_object)
this_time = strptime_tz(this_presence, timezone_str, "%Y-%m-%d %H:%M:%S.%f")
#latest_time = datetime.datetime.strptime(latest_presence, "%Y-%m-%d %H:%M:%S.%f", tzinfo=timezone_object)
latest_time = strptime_tz(latest_presence, timezone_str, "%Y-%m-%d %H:%M:%S.%f")
AddToLog(this_time)
AddToLog(latest_time)
if this_time > latest_time:
AddToLog("@4")
latest_presence = this_presence
latest_MAC = MAC
temperature_offset = temp_offset
AddToLog("latest_presence = ")
AddToLog(MAC+" "+latest_presence)
#lets get latest readings for latest_MAC
if latest_MAC != "":
device_dict = GetSensorsReads(latest_MAC)
#get latest read of temperature, VOC, CO2
details["temperature"] = device_dict["temperature"]["last_data"]
#apply temperature offset
details["temperature"]["value"] = details["temperature"]["value"] - temperature_offset
details["smell"] = {"timestamp": 1722198473.330856,"value": "coffee","intensity": 54} #device_dict["last_data"]
details["sleep"] = 7 + random.uniform(-2, 2) #todo, determine real value
details["last_bathroom"] = bathroom_presence
details["last_kitchen"] = kitchen_presence
details["HomeIcon"] = "air_green"
details["BathIcon"] = "bath_yellow"
details["BedIcon"] = "bed_red"
details["FoodIcon"] = "food_green"
user = details
result_list.append(user)
payload = {'result_list': result_list}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
AddToLog(payload)
return
else:
AddToLog("Error: function not recognized!")
payload = {'ok': ok, 'error': debug_string}
resp.media = package_response(payload)
resp.status = falcon.HTTP_200
return
except Exception as e:
AddToLog(traceback.format_exc())
resp.media = package_response(f"Error: {str(e)} {traceback.format_exc()}", HTTP_500)
class RootResource:
def on_get(self, req, resp):
resp.media = {
"message": "Hello from OpenFaaS Serverless Web Server!",
"method": "GET"
}
def on_post(self, req, resp):
data = json.load(req.bounded_stream)
resp.media = {
"message": "Received POST request",
"data": data
}
class CatchAllResource:
def on_get(self, req, resp, path):
resp.media = {
"message": f"Path: /{path}",
"method": "GET"
}
def on_post(self, req, resp, path):
resp.media = {
"message": f"Path: /{path}",
"method": "POST"
}
app = falcon.App()
# New routes for well_api
app.add_route('/function/well-api', well_api())
app.add_route('/function/well-api/{path}', well_api())
app.add_route('/api/well_api', well_api())
app.add_route('/api/well_api/{path}', well_api())
app.add_route('/healthz', well_api(), suffix='healthz')
# Keep the original routes for backward compatibility
app.add_route('/', well_api())
app.add_route('/{path}', well_api())
if __name__ == '__main__':
from wsgiref.simple_server import make_server
with make_server('', 8000, app) as httpd:
logger.debug('Serving on port 8000...')
httpd.serve_forever()
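#in production this module would typically be served by a real WSGI server instead of wsgiref, e.g. (hypothetical command, module/app names assumed): gunicorn -w 2 -b 0.0.0.0:8000 handler:app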