# File-viewer metadata captured with this paste: 231 lines, 7.0 KiB, Python.
import ast
import os
import re
import time

import psycopg2
import redis
from dotenv import load_dotenv

# Load variables from a .env file (via python-dotenv) before reading them below.
load_dotenv()

# PostgreSQL connection settings, read from the environment.
# os.getenv returns None for any variable that is not set.
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')

# Connect to Redis (assuming it's running on localhost with default port)
r = redis.Redis(host='localhost', port=6379, db=0)


def GetRedisString(key_name):
    """Return the value stored under *key_name* in Redis as a UTF-8 string.

    Parameters:
    key_name (str): Redis key to read through the module-level client ``r``.

    Returns:
    str | None: The decoded value, or None when the key is missing, the
    value cannot be decoded, or the Redis call itself fails (best-effort
    read, as before).
    """
    try:
        raw = r.get(key_name)
    except Exception:
        # Best-effort read: a Redis failure is reported as "no value".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt/SystemExit through.
        return None

    if raw is None:
        return None

    try:
        return raw.decode('utf-8')
    except (AttributeError, UnicodeDecodeError):
        # Not bytes, or not valid UTF-8.
        return None


def GetRedisInt(key_name):
    """Return the value stored under *key_name* in Redis parsed as an int.

    Parameters:
    key_name (str): Redis key to read through the module-level client ``r``.

    Returns:
    int | None: The parsed integer, or None when the key is missing, the
    value is not a valid UTF-8 integer literal, or the Redis call itself
    fails (best-effort read, as before).
    """
    try:
        raw = r.get(key_name)
    except Exception:
        # Best-effort read: a Redis failure is reported as "no value".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt/SystemExit through.
        return None

    if raw is None:
        return None

    try:
        return int(raw.decode('utf-8'))
    except (AttributeError, ValueError):
        # Not bytes, not valid UTF-8 (UnicodeDecodeError is a ValueError),
        # or not an integer literal.
        return None


def GetRedisFloat(key_name):
    """Return the value stored under *key_name* in Redis parsed as a float.

    Parameters:
    key_name (str): Redis key to read through the module-level client ``r``.

    Returns:
    float | None: The parsed number, or None when the key is missing, the
    value is not a valid UTF-8 float literal, or the Redis call itself
    fails (best-effort read, as before).
    """
    try:
        raw = r.get(key_name)
    except Exception:
        # Best-effort read: a Redis failure is reported as "no value".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt/SystemExit through.
        return None

    if raw is None:
        return None

    try:
        return float(raw.decode('utf-8'))
    except (AttributeError, ValueError):
        # Not bytes, not valid UTF-8 (UnicodeDecodeError is a ValueError),
        # or not a float literal.
        return None


# Ad-hoc probe executed at import time: print the cached last-seen value for
# device 510 (None when the key is absent or unreadable).
value = GetRedisFloat('lastseen_510')
print(value)


def get_db_connection():
    """Open and return a new psycopg2 connection built from the DB_* settings.

    A new connection is created on every call. The caller owns it and is
    responsible for closing it; note that using a psycopg2 connection in a
    ``with`` statement ends the transaction but does not close the connection.
    """
    return psycopg2.connect(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT,
    )


def GetLastPointQuery(device_id, field_name, threshold_value):
    """
    Generate a SQL query to find the last point in radar_readings where the specified field
    meets the threshold condition.

    Because the result is a plain SQL string (no bound parameters), every
    interpolated piece is validated first so that nothing other than an
    integer, a bare column name and a number can reach the query text.

    Parameters:
    device_id (int | str): The device ID to filter by; must be convertible with int()
    field_name (str): The field to check, e.g., "s2_max", "s28_min", etc.
        The part before the first "_" selects the column ("s28" and "m08" are
        averages over s2..s8 and m0..m8); a second part equal to "max" selects
        ">" and anything else (including no second part) selects "<".
    threshold_value (float): The threshold value to compare against

    Returns:
    str: SQL query string

    Raises:
    ValueError: if device_id is not an integer, the column part of field_name
        is not a plain identifier, or threshold_value is not numeric.
    """
    # Parse the field name to get base field and operation
    parts = field_name.split('_')
    base_field = parts[0]
    operation = parts[1] if len(parts) > 1 else None

    # Define the field expression based on base_field
    if base_field == 's28':
        field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7"
    elif base_field == 'm08':
        field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9"
    else:
        # The column name is spliced into the SQL text, so accept only a bare identifier.
        if not re.fullmatch(r"[A-Za-z][A-Za-z0-9]*", base_field):
            raise ValueError(f"Invalid radar field name: {field_name!r}")
        field_expr = base_field

    # int() rejects anything that is not a plain integer (e.g. "1 OR 1=1").
    device_id = int(device_id)

    # Real numbers are interpolated unchanged; anything else must parse as a float.
    if isinstance(threshold_value, bool) or not isinstance(threshold_value, (int, float)):
        threshold_value = float(threshold_value)

    # Define comparison operator based on operation
    operator = ">" if operation == "max" else "<"

    # Generate the SQL query
    query = f"""
    SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id}
    AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1;
    """

    return query


def GetLastDetected(device_id, field_name, threshold_value):
    """Return the time of the most recent radar reading that meets the threshold.

    Builds the query with GetLastPointQuery, runs it on a fresh database
    connection and converts the first column of the returned row with
    ``.timestamp()`` (so the "time" column is presumably a datetime).

    Parameters:
    device_id: The device ID to filter by
    field_name (str): Threshold field, e.g. "s3_max"
    threshold_value: The threshold value to compare against

    Returns:
    float | None: The timestamp of the matching reading, or None when no
    reading matches.
    """
    query = GetLastPointQuery(device_id, field_name, threshold_value)
    print(query)

    conn = get_db_connection()
    try:
        # "with conn" only ends the transaction; the connection is closed in finally.
        with conn:
            with conn.cursor() as cur:
                cur.execute(query)
                row = cur.fetchone()
    finally:
        conn.close()

    # fetchone() returns None when no row matched the threshold condition.
    if row is None or row[0] is None:
        return None
    return row[0].timestamp()


def _parse_radar_threshold(threshold_details):
    """Parse a stored "[signal, value]" literal; return None when missing or malformed."""
    if threshold_details is None:
        return None
    try:
        parsed = ast.literal_eval(threshold_details)
        return parsed[0], parsed[1]
    except (ValueError, SyntaxError, TypeError, IndexError, KeyError):
        return None


def _fetch_radar_threshold(device_id):
    """Read devices.radar_threshold for *device_id*; return None when there is no such row."""
    conn = get_db_connection()
    try:
        # "with conn" only ends the transaction; the connection is closed in finally.
        with conn:
            with conn.cursor() as cur:
                cur.execute(
                    'SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = %s;',
                    (device_id,),
                )
                row = cur.fetchone()
    finally:
        conn.close()
    return row[0] if row else None


def _radar_value(component, radar):
    """Return the reading for *component* divided by its row's first element (0 if unavailable)."""
    if component == "s28":
        # Mean of s2..s8, matching "(s2+s3+s4+s5+s6+s7+s8)/7" in GetLastPointQuery.
        row = radar[1]
        reading = sum(row[3:10]) / 7
    elif component == "m08":
        # Mean of m0..m8, matching "(m0+...+m8)/9" in GetLastPointQuery.
        row = radar[0]
        reading = sum(row[5:14]) / 9
    elif component[:1] == "s":
        row = radar[1]
        reading = row[1 + int(component[1:])]
    elif component[:1] == "m":
        row = radar[0]
        reading = row[5 + int(component[1:])]
    else:
        return 0

    # row[0] is the divisor (presumably a sample count - TODO confirm);
    # a zero divisor is treated as "nothing measured".
    divisor = row[0]
    if not divisor:
        return 0
    return reading / divisor


def UpdateLastSeen(new_message_dict):
    """Update the Redis "lastseen_<device_id>" key from one incoming radar message.

    Steps:
    1. Resolve the device's radar threshold ([signal, value]) from the Redis key
       "radar_threshold<device_id>"; when it is missing or unreadable, read it
       from the devices table (defaulting to ["s3_max", 10]) and cache it.
    2. Compute the radar value for the threshold's signal from the message.
    3. If the value exceeds the threshold, store the message time as last seen.
       Otherwise, if no last-seen value is cached yet, look up the most recent
       detection in the database and cache that (when one exists).

    Parameters:
    new_message_dict (dict): Message with keys "device_id", "time" and "radar",
        where "radar" holds two lists (index 0 is read for "m" signals,
        index 1 for "s" signals).

    Returns:
    None
    """
    print(new_message_dict)

    device_id_s = str(new_message_dict["device_id"])
    threshold_key = "radar_threshold" + device_id_s
    lastseen_key = 'lastseen_' + device_id_s

    threshold = _parse_radar_threshold(GetRedisString(threshold_key))
    if threshold is None:
        # key not found so read from DB, and store to key
        threshold_details = _fetch_radar_threshold(new_message_dict["device_id"])
        print(threshold_details)

        # Defaults match code in well-api.
        threshold = _parse_radar_threshold(threshold_details) or ("s3_max", 10)
        r.set(threshold_key, str([threshold[0], threshold[1]]))

    radar_threshold_signal, radar_threshold_value = threshold

    # lets determine if presence is detected
    component = radar_threshold_signal.split("_")[0]
    radar_val = _radar_value(component, new_message_dict['radar'])

    # NOTE(review): the comparison is always ">" here, while GetLastPointQuery
    # uses "<" for signals that do not end in "_max" - confirm which is intended.
    if radar_val > radar_threshold_value:  # seen now
        r.set(lastseen_key, new_message_dict['time'])
    elif GetRedisFloat(lastseen_key) is None:
        # not seen now, but lets determine when he was and add the key
        last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value)
        if last_seen is not None:
            # Redis cannot store None, so only cache a real timestamp.
            r.set(lastseen_key, last_seen)


# Ad-hoc smoke run executed at import time: push one sample message through
# UpdateLastSeen and print how long the call took, in seconds.
new_message_dict = {'time': 1743554317.579559, 'device_id': 499, 'radar': [[100, 100, 0, 0, 0, 2226, 2654, 425, 523, 445, 340, 436, 339, 548], [100, 0, 0, 706, 657, 547, 570, 553, 509, 499]]}

st = time.time()
UpdateLastSeen(new_message_dict)
print(time.time()-st)


# Disabled reference snippets for the redis-py client; ``if False`` keeps them
# from ever running.
if False:
    # Simple key-value

    value = r.get('simple_key')
    print(f"Simple key value: {value.decode('utf-8')}")

    # WRITING DATA TO REDIS

    # Simple key-value
    r.set('simple_key', 'Hello, Redis!')

    # Setting expiration (TTL) in seconds
    r.setex('expiring_key', 60, 'This will expire in 60 seconds')

    # Working with numbers
    r.set('counter', 0)
    r.incr('counter')  # Increment by 1
    r.incrby('counter', 5)  # Increment by 5

    # Lists
    r.rpush('my_list', 'item1')
    r.rpush('my_list', 'item2', 'item3')  # Add multiple items

    # Sets
    r.sadd('my_set', 'member1', 'member2', 'member3')

    # Hashes (dictionaries)
    r.hset('user:1000', mapping={
        'username': 'john_doe',
        'email': 'john@example.com',
        'visits': 10
    })

    # READING DATA FROM REDIS

    # Simple key-value
    value = r.get("simple_key")
    print(f"Simple key value: {value.decode('utf-8')}")

    # Check current counter value
    counter = r.get("counter")
    print(f"Counter value: {counter.decode('utf-8')}")

    # Lists
    all_items = r.lrange("my_list", 0, -1)  # Get all items
    print("List items:", [item.decode("utf-8") for item in all_items])

    # Sets
    all_members = r.smembers("my_set")
    print("Set members:", [member.decode("utf-8") for member in all_members])

    # Check if a member exists in a set
    is_member = r.sismember("my_set", "member1")
    print(f"Is 'member1' in set? {is_member}")

    # Hashes
    username = r.hget("user:1000", "username")
    print(f"Username: {username.decode('utf-8')}")

    # Get all hash fields
    user_data = r.hgetall("user:1000")
    print(
        "User data:",
        {k.decode("utf-8"): v.decode("utf-8") for k, v in user_data.items()},
    )

    # Check remaining TTL for expiring key (in seconds)
    ttl = r.ttl("expiring_key")
    print(f"Expiring key TTL: {ttl} seconds")