well-api/wellDbQuery.py

#!/usr/bin/env python3
import os
import sys
import csv
import json
import zipfile
import argparse
import datetime
import logging
import ast
import io
from collections import defaultdict
from dotenv import load_dotenv
# Try to import psycopg2
try:
    import psycopg2
    from psycopg2.extras import RealDictCursor
except ImportError:
    print("Error: psycopg2 module not found. Please install it: pip install psycopg2-binary")
    sys.exit(1)
# ==========================================
# Configuration & Defaults
# ==========================================
load_dotenv()
DEFAULTS = {
    'DB_NAME': os.getenv('DB_NAME', 'wellnuo'),
    'DB_USER': os.getenv('DB_USER', 'well_app'),
    'DB_PASS': os.getenv('DB_PASSWORD', 'well_app_2024'),
    'DB_HOST': os.getenv('DB_HOST', '192.168.68.70'),
    'DB_PORT': os.getenv('DB_PORT', '5432'),
    'OUT_FILE': "out.zip",
    'RADAR_PART': "s28",
    'GROUP_BY': "by_minute"
}
# Custom Logging Levels
LOG_INFO = 20
LOG_STEPS = 15 # Level 1 (-d 1)
LOG_DATA = 12 # Level 2 (-d 2)
LOG_SQL = 5 # Level 3 (-d 3)
logging.addLevelName(LOG_STEPS, "STEPS")
logging.addLevelName(LOG_DATA, "DATA")
logging.addLevelName(LOG_SQL, "SQL")
logger = logging.getLogger("wellDbQuery")
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# ==========================================
# Database Abstraction
# ==========================================
class Database:
    def __init__(self, args):
        self.conn_params = {
            "host": args.db_host,
            "port": args.db_port,
            "dbname": args.db_name,
            "user": args.db_username,
            "password": args.db_password
        }
        self.conn = None

    def connect(self):
        if self.conn is None or self.conn.closed:
            try:
                logger.log(LOG_STEPS, f"Connecting to database {self.conn_params['host']}...")
                self.conn = psycopg2.connect(**self.conn_params)
            except Exception as e:
                logger.error(f"Database connection failed: {e}")
                sys.exit(1)

    def close(self):
        if self.conn:
            self.conn.close()
            logger.log(LOG_STEPS, "Database connection closed.")

    def execute(self, query, params=None):
        self.connect()
        try:
            with self.conn.cursor(cursor_factory=RealDictCursor) as cur:
                logger.log(LOG_SQL, f"EXECUTING SQL: {query}")
                if params:
                    logger.log(LOG_SQL, f"PARAMS: {params}")
                cur.execute(query, params)
                # If query returns rows
                if cur.description:
                    return cur.fetchall()
                return None
        except Exception as e:
            logger.error(f"Query execution failed: {e}")
            if logger.level <= LOG_SQL:
                import traceback
                traceback.print_exc()
            sys.exit(1)
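
# Typical usage (illustrative only; mirrors what WellExporter does below):
#   db = Database(args)
#   rows = db.execute("SELECT device_id FROM public.devices WHERE well_id = %s", (42,))
#   db.close()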
# ==========================================
# Data Processing Logic
# ==========================================
class DataProcessor:
    """
    Handles the complexity of unwrapping 'mtype' sensor readings
    and merging them with radar data.
    """
    @staticmethod
    def get_bucket_interval(group_by):
        if group_by == 'by_10_seconds':
            return '10 seconds'
        elif group_by == 'by_hour':
            return '1 hour'
        elif group_by == 'by_10_minute':
            return '10 minutes'
        return '1 minute'

    @staticmethod
    def build_fetch_query(device_ids, start_time, end_time, radar_part, all_radar, group_by):
        """
        Builds a query that fetches raw rows. We perform the pivoting in Python
        to dynamically handle the sparse s10-s79 columns.
        """
        device_list_str = ",".join(map(str, device_ids))
        bucket_interval = DataProcessor.get_bucket_interval(group_by)
        # Radar selection logic
        if all_radar:
            radar_select = """
                MAX(absent) AS radar_absent, MAX(moving) AS radar_moving, MAX(stationary) AS radar_stationary, MAX("both") AS radar_both,
                MAX(m0) AS m0, MAX(m1) AS m1, MAX(m2) AS m2, MAX(m3) AS m3, MAX(m4) AS m4, MAX(m5) AS m5, MAX(m6) AS m6, MAX(m7) AS m7, MAX(m8) AS m8,
                MAX(s2) AS radar_s2, MAX(s3) AS radar_s3, MAX(s4) AS radar_s4, MAX(s5) AS radar_s5, MAX(s6) AS radar_s6, MAX(s7) AS radar_s7, MAX(s8) AS radar_s8
            """
            radar_outer = """
                rr.radar_absent, rr.radar_moving, rr.radar_stationary, rr.radar_both,
                rr.m0, rr.m1, rr.m2, rr.m3, rr.m4, rr.m5, rr.m6, rr.m7, rr.m8,
                rr.radar_s2, rr.radar_s3, rr.radar_s4, rr.radar_s5, rr.radar_s6, rr.radar_s7, rr.radar_s8
            """
        else:
            radar_expr = radar_part
            if radar_expr == "s28":
                # "s28" is shorthand for the average of radar columns s2..s8
                radar_expr = "(s2+s3+s4+s5+s6+s7+s8)/7"
            radar_select = f"MAX({radar_expr}) AS radar"
            radar_outer = "rr.radar"
        # We fetch s0-s9 and mtype; Python will map these to s0-s79.
        # MAX/MIN aggregation flattens any duplicates within the time bucket,
        # though typically mtypes are distinct rows.
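        # The FULL OUTER JOIN below keeps buckets that have only sensor rows or only
        # radar rows; process_rows() later fills whichever side is missing with None.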
        sql = f"""
            SELECT
                COALESCE(sr.minute, rr.minute) as time,
                COALESCE(sr.device_id, rr.device_id) as device_id,
                sr.mtype,
                sr.temperature, sr.humidity, sr.pressure, sr.light,
                sr.s0, sr.s1, sr.s2, sr.s3, sr.s4, sr.s5, sr.s6, sr.s7, sr.s8, sr.s9,
                {radar_outer}
            FROM (
                SELECT
                    time_bucket('{bucket_interval}', time) AS minute,
                    device_id,
                    mtype,
                    AVG(temperature) AS temperature,
                    AVG(humidity) AS humidity,
                    AVG(pressure) AS pressure,
                    MAX(light) AS light,
                    MIN(s0) as s0, MIN(s1) as s1, MIN(s2) as s2, MIN(s3) as s3, MIN(s4) as s4,
                    MIN(s5) as s5, MIN(s6) as s6, MIN(s7) as s7, MIN(s8) as s8, MIN(s9) as s9
                FROM sensor_readings
                WHERE device_id IN ({device_list_str})
                  AND time >= '{start_time}'
                  AND time < '{end_time}'
                GROUP BY minute, device_id, mtype
            ) sr
            FULL OUTER JOIN (
                SELECT
                    time_bucket('{bucket_interval}', time) AS minute,
                    device_id,
                    {radar_select}
                FROM radar_readings
                WHERE device_id IN ({device_list_str})
                  AND time >= '{start_time}'
                  AND time < '{end_time}'
                GROUP BY minute, device_id
            ) rr
            ON sr.minute = rr.minute AND sr.device_id = rr.device_id
            ORDER BY time ASC, device_id ASC;
        """
        return sql

    @staticmethod
    def process_rows(rows):
        """
        Pivots the rows.
        Input: Multiple rows per timestamp (one for each mtype).
        Output: Single row per timestamp with columns s0...s79 populated dynamically.
        """
        if not rows:
            return [], []
        # Dictionary to hold merged records: key = (time, device_id)
        merged_data = defaultdict(dict)
        # Track which sensor columns actually have data to optimize CSV headers
        active_s_columns = set()
        # Base headers that are always present
        base_headers = ['time', 'device_id', 'temperature', 'humidity', 'pressure', 'light']
        # Radar headers depend on the query; extract them from the first row, excluding known sensor/base columns
        first_row_keys = list(rows[0].keys())
        radar_headers = [k for k in first_row_keys if k not in base_headers and k not in ['mtype'] and not (k.startswith('s') and k[1:].isdigit())]
        for row in rows:
            key = (row['time'], row['device_id'])
            # Initialize base data if not present
            if key not in merged_data:
                for h in base_headers:
                    merged_data[key][h] = row.get(h)
                for h in radar_headers:
                    merged_data[key][h] = row.get(h)
            # Merge base sensor data (temperature/humidity/etc.) if the current row carries it
            # (this handles FULL OUTER JOIN nulls)
            if row.get('temperature') is not None:
                for h in base_headers[2:]:  # Skip time/id
                    merged_data[key][h] = row.get(h)
            # Merge radar data
            if any(row.get(h) is not None for h in radar_headers):
                for h in radar_headers:
                    if row.get(h) is not None:
                        merged_data[key][h] = row.get(h)
            # Process extended sensors (s0-s79) based on mtype
            mtype = row.get('mtype')
            # Logic from well-api.py:
            #   mtype 0, 17, 100 -> s0-s9
            #   mtype 110 -> s10-s19 ... mtype 170 -> s70-s79
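            # e.g. a row with mtype 130 maps its s0..s9 values onto output columns s30..s39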
            if mtype is not None:
                base_offset = 0
                if mtype in [0, 17, 100]:
                    base_offset = 0
                elif 110 <= mtype <= 170:
                    base_offset = mtype - 100
                else:
                    # Unknown mtype: skip sensor mapping (could log at debug level)
                    continue
                for i in range(10):
                    val = row.get(f's{i}')
                    if val is not None:
                        target_idx = base_offset + i
                        col_name = f's{target_idx}'
                        merged_data[key][col_name] = val
                        active_s_columns.add(target_idx)
        # Sort active sensor columns numerically
        sorted_s_cols = [f's{i}' for i in sorted(active_s_columns)]
        # Final header list
        final_headers = base_headers + radar_headers + sorted_s_cols
        # Flatten the dictionary to a list of dicts
        final_rows = []
        for key in sorted(merged_data.keys()):  # Sort by time, then device_id
            row_data = merged_data[key]
            # Ensure all columns exist in the dict for the CSV writer
            for col in final_headers:
                if col not in row_data:
                    row_data[col] = None
            final_rows.append(row_data)
        return final_rows, final_headers
# ==========================================
# Main Application Logic
# ==========================================
class WellExporter:
    def __init__(self, args):
        self.args = args
        self.db = Database(args)
        self.target_device_ids = []
        self.file_identifier = "unknown"  # Used for the filename prefix

    def resolve_devices(self):
        """Resolves command line arguments into a list of internal device_ids."""
        ids = set()
        # 1. Explicit Device ID
        if self.args.device_id:
            ids.add(int(self.args.device_id))
            self.file_identifier = str(self.args.device_id)
            logger.log(LOG_STEPS, f"Resolved Device ID: {self.args.device_id}")
        # 2. Well ID
        if self.args.well_id:
            rows = self.db.execute("SELECT device_id FROM public.devices WHERE well_id = %s", (self.args.well_id,))
            if rows:
                ids.add(rows[0]['device_id'])
                self.file_identifier = str(self.args.well_id)
                logger.log(LOG_STEPS, f"Resolved Well ID {self.args.well_id} -> Device ID {rows[0]['device_id']}")
            else:
                logger.warning(f"Well ID {self.args.well_id} not found.")
        # 3. MAC Address
        if self.args.mac:
            rows = self.db.execute("SELECT device_id FROM public.devices WHERE device_mac = %s", (self.args.mac,))
            if rows:
                ids.add(rows[0]['device_id'])
                self.file_identifier = self.args.mac
                logger.log(LOG_STEPS, f"Resolved MAC {self.args.mac} -> Device ID {rows[0]['device_id']}")
            else:
                logger.warning(f"MAC {self.args.mac} not found.")
        # 4. Deployment ID
        if self.args.deployment_id:
            self.file_identifier = str(self.args.deployment_id)
            logger.log(LOG_STEPS, f"Resolving devices for Deployment ID: {self.args.deployment_id}")
            rows = self.db.execute("SELECT devices FROM public.deployment_details WHERE deployment_id = %s", (self.args.deployment_id,))
            if rows and rows[0]['devices']:
                raw_devices = rows[0]['devices']
                macs_to_find = []
                try:
                    # Handle the various formats stored in the DB (JSON list, string representation, nested lists).
                    # Formats seen in well-api: '["MAC1", "MAC2"]' or '[[id,id,loc,desc,MAC],...]'
                    try:
                        device_data = json.loads(raw_devices)
                    except json.JSONDecodeError:
                        try:
                            device_data = ast.literal_eval(raw_devices)
                        except (ValueError, SyntaxError):
                            # Fallback for a simple comma-separated string
                            device_data = [d.strip() for d in raw_devices.strip('[]"\'').split(',')]
                    for item in device_data:
                        # Format: [[id, id, "Loc", "Desc", "MAC"], ...]
                        if isinstance(item, list) and len(item) > 4:
                            macs_to_find.append(item[4])
                        # Format: ["MAC1", "MAC2"]
                        elif isinstance(item, str):
                            # Clean potential quotes
                            clean_mac = item.strip().replace('"', '').replace("'", "")
                            if len(clean_mac) in [12, 17]:
                                macs_to_find.append(clean_mac)
                    if macs_to_find:
                        placeholders = ','.join(['%s'] * len(macs_to_find))
                        sql = f"SELECT device_id FROM public.devices WHERE device_mac IN ({placeholders})"
                        d_rows = self.db.execute(sql, tuple(macs_to_find))
                        if d_rows:
                            for r in d_rows:
                                ids.add(r['device_id'])
                            logger.info(f"Found {len(d_rows)} devices in deployment {self.args.deployment_id}")
                        else:
                            logger.warning("No matching devices found in DB for the MACs in deployment.")
                except Exception as e:
                    logger.error(f"Failed to parse deployment devices string: {e}")
            else:
                logger.warning(f"Deployment {self.args.deployment_id} not found or empty.")
        self.target_device_ids = sorted(ids)
        if not self.target_device_ids:
            logger.error("No valid devices found based on input parameters.")
            sys.exit(1)
        logger.log(LOG_DATA, f"Target Device IDs: {self.target_device_ids}")

    def run(self):
        # 1. Setup
        self.resolve_devices()
        # 2. Parse dates
        try:
            start_date = datetime.datetime.strptime(self.args.date_from, "%Y-%m-%d")
            end_date = datetime.datetime.strptime(self.args.date_to, "%Y-%m-%d")
        except ValueError:
            logger.error("Invalid date format. Use YYYY-MM-DD")
            sys.exit(1)
        if start_date > end_date:
            start_date, end_date = end_date, start_date
        # 3. Open the zip file
        try:
            logger.info(f"Creating output file: {self.args.outFile}")
            zip_buffer = zipfile.ZipFile(self.args.outFile, 'w', zipfile.ZIP_DEFLATED)
        except Exception as e:
            logger.error(f"Failed to create zip file: {e}")
            sys.exit(1)
        # 4. Iterate over days
        current_date = start_date
        total_rows_exported = 0
        try:
            while current_date <= end_date:
                day_str = current_date.strftime("%Y-%m-%d")
                # Define 24h window (UTC assumed based on schema usage in well-api)
                t_start = f"{day_str} 00:00:00"
                t_end = (current_date + datetime.timedelta(days=1)).strftime("%Y-%m-%d 00:00:00")
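                # e.g. for 2025-03-09 this yields the half-open window
                # [2025-03-09 00:00:00, 2025-03-10 00:00:00), matching the "time < end" filter in the SQL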
logger.info(f"Processing date: {day_str}...")
# Build Query
sql = DataProcessor.build_fetch_query(
self.target_device_ids,
t_start,
t_end,
self.args.radar_part,
self.args.allRadar,
self.args.group_by
)
# Execute
raw_rows = self.db.execute(sql)
if raw_rows:
logger.log(LOG_DATA, f" -> Fetched {len(raw_rows)} raw rows (including mtype splits).")
# Process / Pivot Data
processed_rows, headers = DataProcessor.process_rows(raw_rows)
count = len(processed_rows)
total_rows_exported += count
logger.log(LOG_STEPS, f" -> Processed into {count} unique time records.")
# Generate CSV in memory
csv_buffer = io.StringIO()
writer = csv.DictWriter(csv_buffer, fieldnames=headers)
writer.writeheader()
writer.writerows(processed_rows)
# Add to Zip
# Format: {ID}_{DATE}_{GROUP}_rc_data.csv
csv_filename = f"{self.file_identifier}_{day_str}_{self.args.group_by}_rc_data.csv"
zip_buffer.writestr(csv_filename, csv_buffer.getvalue())
logger.log(LOG_STEPS, f" -> Added {csv_filename} to zip.")
else:
logger.log(LOG_STEPS, " -> No data found for this date.")
current_date += datetime.timedelta(days=1)
except KeyboardInterrupt:
logger.warning("Operation cancelled by user.")
except Exception as e:
logger.error(f"An error occurred during processing: {e}")
if self.args.debug > 0:
import traceback
traceback.print_exc()
finally:
zip_buffer.close()
self.db.close()
logger.info(f"Export complete. Total records: {total_rows_exported}. File: {self.args.outFile}")
# ==========================================
# Argument Parsing
# ==========================================
def parse_arguments():
    parser = argparse.ArgumentParser(
        description="Wellnuo Database Export Tool",
        formatter_class=argparse.RawTextHelpFormatter,
        epilog="""Examples:
# Query by Device ID
wellDbQuery.py --device_id 560 --date_from 2025-03-09 --date_to 2025-04-22 --outFile c.zip
# Query by Deployment ID (all devices in deployment)
wellDbQuery.py --deployment_id 21 --date_from 2025-06-01 --date_to 2025-06-01 --outFile deployment_21.zip
# Query by MAC with all radar columns and high debug
wellDbQuery.py --mac 64B70888FAB0 --date_from 2025-01-01 --date_to 2025-01-01 --allRadar -d 2
"""
    )
    # Target selection (at least one is required; enforced after parsing below)
    sel = parser.add_argument_group('Target Selection (One required)')
    sel.add_argument('--device_id', '-di', type=int, help='Target Device ID (internal DB id)')
    sel.add_argument('--well_id', '-wi', type=int, help='Target Well ID (external id)')
    sel.add_argument('--mac', '-m', type=str, help='Target Device MAC Address')
    sel.add_argument('--deployment_id', '-depid', type=int, help='Target Deployment ID (fetches all devices in deployment)')
    # Date range
    date = parser.add_argument_group('Date Range')
    date.add_argument('--date_from', '-df', type=str, required=True, help='Start Date (YYYY-MM-DD)')
    date.add_argument('--date_to', '-dt', type=str, required=True, help='End Date (YYYY-MM-DD)')
    # DB config
    db = parser.add_argument_group('Database Configuration')
    db.add_argument('--db_name', default=DEFAULTS['DB_NAME'], help=f"Default: {DEFAULTS['DB_NAME']}")
    db.add_argument('--db_username', default=DEFAULTS['DB_USER'], help=f"Default: {DEFAULTS['DB_USER']}")
    db.add_argument('--db_password', default=DEFAULTS['DB_PASS'], help="Default: from .env")
    db.add_argument('--db_host', default=DEFAULTS['DB_HOST'], help=f"Default: {DEFAULTS['DB_HOST']}")
    db.add_argument('--db_port', default=DEFAULTS['DB_PORT'], help=f"Default: {DEFAULTS['DB_PORT']}")
    # Export options
    opts = parser.add_argument_group('Export Options')
    opts.add_argument('--outFile', '-o', default=DEFAULTS['OUT_FILE'], help=f"Output ZIP filename (default: {DEFAULTS['OUT_FILE']})")
    opts.add_argument('--radar_part', '-radar', default=DEFAULTS['RADAR_PART'], help=f"Radar column expression (default: {DEFAULTS['RADAR_PART']})")
    opts.add_argument('--allRadar', '-allr', action='store_true', help="Retrieve all raw radar columns instead of the calculated part")
    opts.add_argument('--group_by', '-g', default=DEFAULTS['GROUP_BY'], choices=['by_minute', 'by_10_seconds', 'by_hour', 'by_10_minute'], help="Time aggregation bucket")
    opts.add_argument('-d', '--debug', type=int, default=0, choices=[0, 1, 2, 3], help="Debug level: 0=Info, 1=Steps, 2=Data, 3=SQL")
    args = parser.parse_args()
    if not any([args.device_id, args.well_id, args.mac, args.deployment_id]):
        parser.error("You must provide one of --device_id, --well_id, --mac, or --deployment_id")
    return args

def setup_logging_level(level):
    if level == 0:
        logger.setLevel(logging.INFO)
    elif level == 1:
        logger.setLevel(LOG_STEPS)
    elif level == 2:
        logger.setLevel(LOG_DATA)
    elif level == 3:
        logger.setLevel(LOG_SQL)
# ==========================================
# Entry Point
# ==========================================
if __name__ == "__main__":
args = parse_arguments()
setup_logging_level(args.debug)
exporter = WellExporter(args)
exporter.run()