queue-muncher/redis_test.py
RZ_MINIX\rober 1dbb91f8ba Initial push
2025-06-15 20:31:11 -07:00

234 lines
6.7 KiB
Python

import os
import redis
import psycopg2
from dotenv import load_dotenv
import ast
import time
# Load environment variables (python-dotenv) before the os.getenv() reads below.
load_dotenv()
# PostgreSQL connection settings consumed by get_db_connection().
# os.getenv() returns None for any variable that is not set.
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
# Connect to Redis (assuming it's running on localhost with default port)
# Module-level client shared by every helper in this file; the GetRedis* helpers
# call .decode('utf-8') on whatever r.get() returns.
r = redis.Redis(host='localhost', port=6379, db=0)
def GetRedisString(key_name):
    """
    Read a Redis key through the module-level client ``r`` and decode it as UTF-8 text.

    This is a best-effort read: every expected failure is reported as None
    instead of being raised.

    Parameters:
    key_name (str): The Redis key to read
    Returns:
    str or None: The decoded value, or None when the key is missing, Redis
    reports an error, or the stored bytes are not valid UTF-8
    """
    try:
        raw = r.get(key_name)
        # A missing key comes back as None; report it as "no value".
        return None if raw is None else raw.decode('utf-8')
    except (redis.RedisError, AttributeError, UnicodeDecodeError):
        # Only the expected failures are swallowed, so KeyboardInterrupt,
        # SystemExit and programming errors still propagate.
        return None
def GetRedisInt(key_name):
    """
    Read a Redis key through the module-level client ``r`` and parse it as an integer.

    This is a best-effort read: every expected failure is reported as None
    instead of being raised.

    Parameters:
    key_name (str): The Redis key to read
    Returns:
    int or None: The parsed value, or None when the key is missing, Redis
    reports an error, or the stored value is not an integer literal
    """
    try:
        raw = r.get(key_name)
        # A missing key comes back as None; report it as "no value".
        return None if raw is None else int(raw.decode('utf-8'))
    except (redis.RedisError, AttributeError, UnicodeDecodeError, ValueError):
        # ValueError covers stored text that int() cannot parse. Anything
        # unexpected (KeyboardInterrupt, SystemExit, ...) still propagates.
        return None
def GetRedisFloat(key_name):
    """
    Read a Redis key through the module-level client ``r`` and parse it as a float.

    This is a best-effort read: every expected failure is reported as None
    instead of being raised.

    Parameters:
    key_name (str): The Redis key to read
    Returns:
    float or None: The parsed value, or None when the key is missing, Redis
    reports an error, or the stored value is not a float literal
    """
    try:
        raw = r.get(key_name)
        # A missing key comes back as None; report it as "no value".
        return None if raw is None else float(raw.decode('utf-8'))
    except (redis.RedisError, AttributeError, UnicodeDecodeError, ValueError):
        # ValueError covers stored text that float() cannot parse. Anything
        # unexpected (KeyboardInterrupt, SystemExit, ...) still propagates.
        return None
# Ad-hoc smoke test that runs whenever this module is executed or imported.
# Read a key this script never writes; GetRedisFloat yields None if it is absent.
value = GetRedisFloat('lastseen_501')
print(value)
# Round trip: store a float, then read it back through GetRedisFloat, which
# decodes the stored bytes and converts them with float().
r.set('lastseen_510', 1.444)
value = GetRedisFloat('lastseen_510')
print(value)
def get_db_connection():
    """
    Open a new PostgreSQL connection from the module-level DB_* settings.

    Returns:
    The connection object returned by psycopg2.connect
    """
    connection_settings = {
        'dbname': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD,
        'host': DB_HOST,
        'port': DB_PORT,
    }
    return psycopg2.connect(**connection_settings)
def GetLastPointQuery(device_id, field_name, threshold_value):
    """
    Generate a SQL query to find the last point in radar_readings where the specified field
    meets the threshold condition.

    The arguments are interpolated into the SQL text, so each one is validated
    or normalized first to keep arbitrary SQL out of the generated query.

    Parameters:
    device_id (int or numeric str): The device ID to filter by
    field_name (str): The field to check, e.g., "s2_max", "s28_min", etc.
    threshold_value (float): The threshold value to compare against
    Returns:
    str: SQL query string
    Raises:
    ValueError: If the base field is not a plain ASCII identifier, or if
    device_id / threshold_value cannot be converted to a number
    """
    # Parse the field name to get base field and operation
    parts = field_name.split('_')
    base_field = parts[0]
    operation = parts[1] if len(parts) > 1 else None

    # The base field may end up verbatim in the query as a column name, so
    # only a plain ASCII identifier is accepted.
    if not (base_field.isascii() and base_field.isidentifier()):
        raise ValueError(f"Invalid radar field name: {field_name!r}")

    # Force both numeric inputs to real numbers before they reach the SQL text.
    device_id = int(device_id)
    if not isinstance(threshold_value, (int, float)):
        threshold_value = float(threshold_value)

    # Define the field expression based on base_field
    if base_field == 's28':
        field_expr = "(s2+s3+s4+s5+s6+s7+s8)/7"
    elif base_field == 'm08':
        field_expr = "(m0+m1+m2+m3+m4+m5+m6+m7+m8)/9"
    else:
        field_expr = base_field

    # Define comparison operator based on operation: "max" means "above the
    # threshold"; any other (or missing) operation means "below".
    operator = ">" if operation == "max" else "<"

    # Generate the SQL query
    query = f"""
    SELECT "time" AS point_time, {field_expr} AS point_value FROM radar_readings WHERE device_id = {device_id}
    AND {field_expr} {operator} {threshold_value} ORDER BY "time" DESC LIMIT 1;
    """
    return query
def GetLastDetected(device_id, field_name, threshold_value):
    """
    Look up the most recent radar_readings row that satisfies the threshold condition.

    Builds the query with GetLastPointQuery, prints it, runs it on a fresh
    database connection and converts the row's time to a POSIX timestamp.

    Parameters:
    device_id (int or str): The device ID to filter by
    field_name (str): The field to check, e.g., "s2_max", "s28_min", etc.
    threshold_value (float): The threshold value to compare against
    Returns:
    float or None: Result of ``.timestamp()`` on the row's "time" value, or
    None when no row matches
    """
    query = GetLastPointQuery(device_id, field_name, threshold_value)
    print(query)
    conn = get_db_connection()
    try:
        # psycopg2's "with conn" only commits or rolls back the transaction;
        # the connection has to be closed explicitly.
        with conn:
            with conn.cursor() as cur:
                cur.execute(query)
                row = cur.fetchone()
    finally:
        conn.close()
    # fetchone() returns None when the threshold was never met for this device.
    if row is None:
        return None
    return row[0].timestamp()
def _load_radar_threshold(device_id_s):
    """Return (signal, value) for a device from the Redis cache, falling back to the devices table."""
    cached = GetRedisString('radar_threshold' + device_id_s)
    if cached is not None:
        try:
            cached_details = ast.literal_eval(cached)
            return cached_details[0], cached_details[1]
        except (ValueError, SyntaxError, TypeError, IndexError, KeyError):
            # Unusable cache entry: fall through and rebuild it from the DB.
            pass

    # key not found so read from DB, and store to key
    conn = get_db_connection()
    try:
        # psycopg2's "with conn" only ends the transaction; close explicitly.
        with conn:
            with conn.cursor() as cur:
                # Parameterized so the device id is never spliced into the SQL text.
                cur.execute(
                    'SELECT "radar_threshold" AS threshold FROM devices WHERE device_id = %s;',
                    (device_id_s,),
                )
                row = cur.fetchone()
    finally:
        conn.close()

    # A missing device row is treated like a NULL threshold column.
    stored = row[0] if row is not None else None
    print(stored)

    # matches code in well-api
    signal, value = "s3_max", 10
    if stored is not None:
        stored_details = ast.literal_eval(stored)
        signal, value = stored_details[0], stored_details[1]
    r.set('radar_threshold' + device_id_s, str([signal, value]))
    return signal, value


def _radar_value(message, signal):
    """Return the normalized radar reading that ``signal`` selects from message['radar']."""
    component = signal.split("_")[0]
    radar = message['radar']
    if component == "s28":
        # NOTE(review): this sums six entries (indexes 3..8) without averaging,
        # while GetLastPointQuery defines s28 as the mean of seven fields
        # (s2..s8). Left unchanged; confirm which one is intended.
        return sum(radar[1][3:9]) / radar[1][0]
    if component[0] == "s":
        # Single-digit component index, e.g. "s3" -> 3.
        return radar[1][1 + int(component[1])] / radar[1][0]
    if component[0] == "m":
        # NOTE(review): only the first digit is read, so "m08" (averaged in
        # GetLastPointQuery) is treated as m0 here. Confirm.
        return radar[0][5 + int(component[1])] / radar[0][0]
    return 0


def UpdateLastSeen(new_message_dict):
    """
    Update the 'lastseen_<device_id>' Redis key from one incoming radar message.

    When the selected radar reading exceeds the device's threshold, the
    message time is stored. Otherwise, if no last-seen value is stored yet,
    it is looked up with GetLastDetected and stored when one is found.

    Parameters:
    new_message_dict (dict): Message with the keys 'device_id', 'time' and
    'radar' (two lists of readings; element 0 of each list is used as divisor)
    Returns:
    None
    """
    print(new_message_dict)
    device_id_s = str(new_message_dict['device_id'])
    radar_threshold_signal, radar_threshold_value = _load_radar_threshold(device_id_s)

    # lets determine if presence is detected
    radar_val = _radar_value(new_message_dict, radar_threshold_signal)
    last_seen_key = 'lastseen_' + device_id_s
    if radar_val > radar_threshold_value:  # seen now
        r.set(last_seen_key, new_message_dict['time'])
        return

    # not seen now, but lets determine when he was and add the key
    if GetRedisFloat(last_seen_key) is None:
        last_seen = GetLastDetected(device_id_s, radar_threshold_signal, radar_threshold_value)
        # Redis cannot store None, so only write when a detection exists.
        if last_seen is not None:
            r.set(last_seen_key, last_seen)
# Sample message used to exercise UpdateLastSeen once at module level.
# 'radar' holds two lists; UpdateLastSeen divides by element 0 of the list it reads.
# 'time' is presumably a Unix epoch timestamp in seconds; confirm against the producer.
new_message_dict = {'time': 1743554317.579559, 'device_id': 499, 'radar': [[100, 100, 0, 0, 0, 2226, 2654, 425, 523, 445, 340, 436, 339, 548], [100, 0, 0, 706, 657, 547, 570, 553, 509, 499]]}
# Time a single UpdateLastSeen call and print the elapsed seconds.
st = time.time()
UpdateLastSeen(new_message_dict)
print(time.time()-st)
# Disabled reference snippets for common Redis operations; `if False:` keeps
# them from ever running.
# NOTE(review): indentation was lost in this copy of the file, so where the
# disabled body ends cannot be determined from here. Confirm against the original.
if False:
# Simple key-value
# NOTE(review): this read comes before 'simple_key' is written further down;
# .decode() would fail if r.get() returned None for an absent key.
value = r.get('simple_key')
print(f"Simple key value: {value.decode('utf-8')}")
# WRITING DATA TO REDIS
# Simple key-value
r.set('simple_key', 'Hello, Redis!')
# Setting expiration (TTL) in seconds
r.setex('expiring_key', 60, 'This will expire in 60 seconds')
# Working with numbers
r.set('counter', 0)
r.incr('counter') # Increment by 1
r.incrby('counter', 5) # Increment by 5
# Lists
r.rpush('my_list', 'item1')
r.rpush('my_list', 'item2', 'item3') # Add multiple items
# Sets
r.sadd('my_set', 'member1', 'member2', 'member3')
# Hashes (dictionaries)
r.hset('user:1000', mapping={
'username': 'john_doe',
'email': 'john@example.com',
'visits': 10
})
# READING DATA FROM REDIS
# Values are decoded with .decode('utf-8') before printing throughout this section.
# Simple key-value
value = r.get('simple_key')
print(f"Simple key value: {value.decode('utf-8')}")
# Check current counter value
counter = r.get('counter')
print(f"Counter value: {counter.decode('utf-8')}")
# Lists
all_items = r.lrange('my_list', 0, -1) # Get all items
print("List items:", [item.decode('utf-8') for item in all_items])
# Sets
all_members = r.smembers('my_set')
print("Set members:", [member.decode('utf-8') for member in all_members])
# Check if a member exists in a set
is_member = r.sismember('my_set', 'member1')
print(f"Is 'member1' in set? {is_member}")
# Hashes
username = r.hget('user:1000', 'username')
print(f"Username: {username.decode('utf-8')}")
# Get all hash fields
user_data = r.hgetall('user:1000')
print("User data:", {k.decode('utf-8'): v.decode('utf-8') for k, v in user_data.items()})
# Check remaining TTL for expiring key (in seconds)
ttl = r.ttl('expiring_key')
print(f"Expiring key TTL: {ttl} seconds")