well-api/sample-alert.py
2025-06-15 18:39:43 -07:00

231 lines
7.0 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import redis
import psycopg2
from dotenv import load_dotenv
import ast
import time
# Load environment variables (python-dotenv) before the DB_* settings are read below.
load_dotenv()
# PostgreSQL connection settings, consumed by get_db_connection().
# os.getenv returns None for any variable that is not set.
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
# Connect to Redis (assuming it's running on localhost with default port)
# This single module-level client `r` is shared by every helper in this file.
r = redis.Redis(host='localhost', port=6379, db=0)
def GetRedisString(key_name):
    """
    Read a Redis key and return its value decoded as UTF-8 text.

    This is a best-effort read: failures are reported as None rather than raised.

    Parameters:
    key_name (str): The Redis key to read
    Returns:
    str or None: The decoded value, or None when the key does not exist, Redis
    raised an error, or the stored value could not be decoded
    """
    try:
        raw = r.get(key_name)
        # A missing key comes back as None; handle it explicitly instead of
        # relying on None.decode() blowing up.
        if raw is None:
            return None
        return raw.decode('utf-8')
    except (redis.RedisError, AttributeError, UnicodeDecodeError):
        # Named exceptions only, so KeyboardInterrupt/SystemExit are no longer
        # swallowed. AttributeError covers a client that already returns str.
        return None
def GetRedisInt(key_name):
    """
    Read a Redis key and return its value parsed as an integer.

    This is a best-effort read: failures are reported as None rather than raised.

    Parameters:
    key_name (str): The Redis key to read
    Returns:
    int or None: The parsed value, or None when the key does not exist, Redis
    raised an error, or the stored value is not a valid integer
    """
    try:
        raw = r.get(key_name)
        # A missing key comes back as None; handle it explicitly instead of
        # relying on None.decode() blowing up.
        if raw is None:
            return None
        return int(raw.decode('utf-8'))
    except (redis.RedisError, AttributeError, UnicodeDecodeError, ValueError):
        # Named exceptions only, so KeyboardInterrupt/SystemExit are no longer
        # swallowed. ValueError covers text that is not an integer.
        return None
def GetRedisFloat(key_name):
    """
    Read a Redis key and return its value parsed as a float.

    This is a best-effort read: failures are reported as None rather than raised.

    Parameters:
    key_name (str): The Redis key to read
    Returns:
    float or None: The parsed value, or None when the key does not exist, Redis
    raised an error, or the stored value is not a valid number
    """
    try:
        raw = r.get(key_name)
        # A missing key comes back as None; handle it explicitly instead of
        # relying on None.decode() blowing up.
        if raw is None:
            return None
        return float(raw.decode('utf-8'))
    except (redis.RedisError, AttributeError, UnicodeDecodeError, ValueError):
        # Named exceptions only, so KeyboardInterrupt/SystemExit are no longer
        # swallowed. ValueError covers text that is not a number.
        return None
# Debug probe executed at import time: read the 'lastseen_510' key and print it.
# GetRedisFloat returns None when the key cannot be read, so this may print "None".
value = GetRedisFloat('lastseen_510')
print(value)
def get_db_connection():
    """
    Open a new PostgreSQL connection from the module-level DB_* settings.

    Returns:
    connection: The object returned by psycopg2.connect; a new connection is
    opened on every call
    """
    connection_settings = {
        'dbname': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'host': DB_HOST,
        'port': DB_PORT,
    }
    return psycopg2.connect(**connection_settings)
def GetLastPointQuery(device_id, field_name, threshold_value):
    """
    Generate a SQL query to find the last point in radar_readings where the specified field
    meets the threshold condition.

    The returned text is executed verbatim by the caller, so every value that is
    interpolated into it is validated here first.

    Parameters:
    device_id (int): The device ID to filter by (a numeric string is accepted too)
    field_name (str): The field to check, e.g., "s2_max", "s28_min", etc.
    threshold_value (float): The threshold value to compare against
    Returns:
    str: SQL query string
    Raises:
    ValueError: If device_id is not an integer, threshold_value is not numeric,
    or the field part of field_name is not a plain alphanumeric name
    TypeError: If device_id or threshold_value has a type that cannot be converted
    to a number (e.g. None)
    """
    import re  # local import: keeps this function self-contained

    # Parse the field name to get base field and operation
    parts = field_name.split('_')
    base_field = parts[0]
    operation = parts[1] if len(parts) > 1 else None

    # Only a bare column name may reach the SQL text; anything else could
    # smuggle arbitrary SQL into the query.
    if not re.fullmatch(r"[A-Za-z][A-Za-z0-9]*", base_field):
        raise ValueError(f"invalid radar field name: {field_name!r}")

    # Coerce the numeric inputs so that only numbers are interpolated.
    device_id = int(device_id)
    if isinstance(threshold_value, bool) or not isinstance(threshold_value, (int, float)):
        threshold_value = float(threshold_value)

    # Define the field expression based on base_field
    if base_field == 's28':
        # "s28" is a virtual field: the mean of columns s2..s8
        field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7"
    elif base_field == 'm08':
        # "m08" is a virtual field: the mean of columns m0..m8
        field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9"
    else:
        field_expr = base_field

    # Define comparison operator based on operation: "max" selects ">",
    # every other suffix (including a missing one) selects "<".
    operator = ">" if operation == "max" else "<"

    # Generate the SQL query
    return (
        f'\nSELECT "time" AS point_time, {field_expr} AS point_value '
        f'FROM radar_readings WHERE device_id = {device_id}\n'
        f'AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1;\n'
    )
def GetLastDetected(device_id, field_name, threshold_value):
    """
    Look up the time of the most recent radar_readings row that meets the threshold.

    Parameters:
    device_id (int): The device ID to filter by
    field_name (str): The field to check, e.g., "s2_max", "s28_min", etc.
    threshold_value (float): The threshold value to compare against
    Returns:
    float or None: The row's "time" value converted with .timestamp(), or None
    when no row satisfies the condition
    """
    query = GetLastPointQuery(device_id, field_name, threshold_value)
    print(query)
    conn = get_db_connection()
    try:
        # `with conn` only scopes a transaction (commit/rollback); psycopg2
        # does not close the connection on exit, hence the explicit close().
        with conn:
            with conn.cursor() as cur:
                cur.execute(query)
                row = cur.fetchone()
    finally:
        conn.close()
    # fetchone() returns None when the query matched nothing.
    if row is None:
        return None
    return row[0].timestamp()
# matches code in well-api
_DEFAULT_RADAR_THRESHOLD = ("s3_max", 10)


def _parse_radar_threshold(threshold_details):
    """Parse a stored "[signal, value]" literal; return (signal, value) or None if unusable."""
    if threshold_details is None:
        return None
    try:
        parsed = ast.literal_eval(threshold_details)
        return parsed[0], parsed[1]
    except (ValueError, SyntaxError, TypeError, IndexError, KeyError):
        return None


def _load_radar_threshold(device_id_s):
    """Return (signal, value) for a device from Redis, falling back to the devices table."""
    cache_key = 'radar_threshold' + device_id_s
    cached = _parse_radar_threshold(GetRedisString(cache_key))
    if cached is not None:
        return cached

    # key not found (or unreadable) so read from DB, and store to key
    sql = 'SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = %s;'
    conn = get_db_connection()
    try:
        # `with conn` only scopes a transaction; psycopg2 does not close the
        # connection on exit, hence the explicit close().
        with conn:
            with conn.cursor() as cur:
                # Let the driver quote the value instead of formatting it into the SQL.
                cur.execute(sql, (device_id_s,))
                row = cur.fetchone()
    finally:
        conn.close()

    # fetchone() returns None when the device has no row at all.
    threshold_details = row[0] if row is not None else None
    print(threshold_details)

    stored = _parse_radar_threshold(threshold_details)
    signal, value = stored if stored is not None else _DEFAULT_RADAR_THRESHOLD
    r.set(cache_key, str([signal, value]))
    return signal, value


def _radar_value(component, radar):
    """Compute the scaled reading for a threshold component ("s3", "m2", "s28", "m08")."""
    # Layout as indexed by this file: radar[1][0] is the divisor for the s-values
    # and radar[1][1 + k] holds s<k>; radar[0][0] is the divisor for the m-values
    # and radar[0][5 + k] holds m<k>.
    m_frame, s_frame = radar[0], radar[1]
    if component == "s28":
        # Mean of s2..s8 (indices 3..9 inclusive), mirroring the
        # (s2+...+s8)/7 expression used by GetLastPointQuery.
        return sum(s_frame[3:10]) / 7 / s_frame[0]
    if component == "m08":
        # Mean of m0..m8 (indices 5..13 inclusive), mirroring the
        # (m0+...+m8)/9 expression used by GetLastPointQuery.
        return sum(m_frame[5:14]) / 9 / m_frame[0]
    if component.startswith("s"):
        return s_frame[1 + int(component[1:])] / s_frame[0]
    if component.startswith("m"):
        return m_frame[5 + int(component[1:])] / m_frame[0]
    # Unknown component: treated as "nothing detected".
    return 0


def UpdateLastSeen(new_message_dict):
    """
    Update the 'lastseen_<device_id>' Redis key from one incoming radar message.

    The device's radar threshold is taken from Redis (or loaded from the devices
    table and cached). If the message's reading exceeds the threshold, the key is
    set to the message time. Otherwise, if the key does not exist yet, it is
    seeded from the most recent qualifying row in radar_readings.

    Parameters:
    new_message_dict (dict): Message with the keys "device_id", "time" and
    "radar" (a pair of lists, see _radar_value for the indices that are read)
    Returns:
    None
    """
    print(new_message_dict)
    device_id_s = str(new_message_dict["device_id"])
    radar_threshold_signal, radar_threshold_value = _load_radar_threshold(device_id_s)

    # lets determine if presence is detected
    component = radar_threshold_signal.split("_")[0]
    radar_val = _radar_value(component, new_message_dict['radar'])

    lastseen_key = 'lastseen_' + device_id_s
    # NOTE(review): the live check always uses ">" while GetLastPointQuery uses
    # "<" for signals that do not end in "_max" - confirm which is intended.
    if radar_val > radar_threshold_value:  # seen now
        r.set(lastseen_key, new_message_dict['time'])
        return

    # not seen now, but lets determine when he was and add the key
    if GetRedisFloat(lastseen_key) is not None:
        return  # already recorded, nothing to add
    last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value)
    # Redis cannot store None; leave the key absent when there is no history.
    if last_seen is not None:
        r.set(lastseen_key, last_seen)
# Sample message used to exercise UpdateLastSeen. UpdateLastSeen reads the keys
# 'time', 'device_id' and 'radar'; 'radar' holds two lists whose first element is
# used there as a divisor for the remaining values.
new_message_dict = {'time': 1743554317.579559, 'device_id': 499, 'radar': [[100, 100, 0, 0, 0, 2226, 2654, 425, 523, 445, 340, 436, 339, 548], [100, 0, 0, 706, 657, 547, 570, 553, 509, 499]]}
# Time a single UpdateLastSeen call and print the elapsed wall-clock seconds.
st = time.time()
UpdateLastSeen(new_message_dict)
print(time.time()-st)
# Reference snippets showing basic redis-py usage against the shared client `r`.
# NOTE(review): the indentation of this section was lost, so it is unclear how many
# of the statements below belong to the `if False:` body (i.e. are disabled) and how
# many would actually run - confirm against the original file.
if False:
# Simple key-value
# r.get() returns bytes, hence the decode before printing.
value = r.get('simple_key')
print(f"Simple key value: {value.decode('utf-8')}")
# WRITING DATA TO REDIS
# Simple key-value
r.set('simple_key', 'Hello, Redis!')
# Setting expiration (TTL) in seconds
r.setex('expiring_key', 60, 'This will expire in 60 seconds')
# Working with numbers
r.set('counter', 0)
r.incr('counter') # Increment by 1
r.incrby('counter', 5) # Increment by 5
# Lists
r.rpush('my_list', 'item1')
r.rpush('my_list', 'item2', 'item3') # Add multiple items
# Sets
r.sadd('my_set', 'member1', 'member2', 'member3')
# Hashes (dictionaries)
r.hset('user:1000', mapping={
'username': 'john_doe',
'email': 'john@example.com',
'visits': 10
})
# READING DATA FROM REDIS
# Simple key-value
value = r.get("simple_key")
print(f"Simple key value: {value.decode('utf-8')}")
# Check current counter value
counter = r.get("counter")
print(f"Counter value: {counter.decode('utf-8')}")
# Lists
all_items = r.lrange("my_list", 0, -1) # Get all items
print("List items:", [item.decode("utf-8") for item in all_items])
# Sets
all_members = r.smembers("my_set")
print("Set members:", [member.decode("utf-8") for member in all_members])
# Check if a member exists in a set
is_member = r.sismember("my_set", "member1")
print(f"Is 'member1' in set? {is_member}")
# Hashes
username = r.hget("user:1000", "username")
print(f"Username: {username.decode('utf-8')}")
# Get all hash fields
# Both keys and values come back as bytes, so each is decoded for display.
user_data = r.hgetall("user:1000")
print(
"User data:",
{k.decode("utf-8"): v.decode("utf-8") for k, v in user_data.items()},
)
# Check remaining TTL for expiring key (in seconds)
ttl = r.ttl("expiring_key")
print(f"Expiring key TTL: {ttl} seconds")