261 lines
9.9 KiB
Python
261 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
|
import datetime
import json
import os
import sys
import time
import unittest

import psycopg2
import requests
from dotenv import load_dotenv
|
|
|
|
# --- Terminal colours ---
# ANSI escape sequences used to colour the suite's console output.
GREEN = '\033[92m'
RED = '\033[91m'
YELLOW = '\033[93m'
RESET = '\033[0m'

# --- Configuration ---
# Populate the process environment from a .env file before any setting is read.
load_dotenv()

# Each setting comes from the environment and falls back to a default.
# PORT stays a string because it is only interpolated into BASE_URL.
PORT = os.environ.get('PORT', '8002')
BASE_URL = f"http://localhost:{PORT}"
API_USER = os.environ.get('API_USER', 'jpeters')
API_PASSWORD = os.environ.get('API_PASSWORD', 'WellJson')
|
|
|
|
class TestWellDrySenseAPI(unittest.TestCase):
    """
    Integration test suite for the WellDrySense job API.

    Every test posts form data to ``{BASE_URL}/api/well_api``. The tests are
    numbered because they share state: ``test_01_create_job`` creates a job
    and stores its ID on the class, the later tests read that ID back, and
    ``tearDownClass`` deletes the row from ``public.jobs`` so the run leaves
    no record behind.
    """

    # Session credentials, filled in by setUpClass.
    token = None
    user_id = None
    # ID of the job created by test_01_create_job; None until that test passes.
    job_id_to_test = None
    # Keyword arguments for psycopg2.connect, read from the environment.
    db_conn_params = {}
    # Seconds to wait for any single API call, so a hung server cannot stall
    # the whole run.
    request_timeout = 60

    @classmethod
    def setUpClass(cls):
        """Collect DB settings from the environment and log in to the API.

        On success the access token and user ID are stored on the class.
        Every login failure (unreachable server, timeout, non-200 status,
        non-JSON body, missing token) prints a FATAL message and exits the
        process with status 1, because no test can run without a token.
        """
        print(f"\n{GREEN}=== Setting up WellDrySense Test Suite on Port {PORT} ==={RESET}")

        # Setup DB Params (used only by tearDownClass for cleanup).
        cls.db_conn_params = {
            'dbname': os.getenv('DB_NAME'),
            'user': os.getenv('DB_USER'),
            'password': os.getenv('DB_PASSWORD'),
            'host': os.getenv('DB_HOST'),
            'port': os.getenv('DB_PORT'),
        }

        # Authenticate
        print(f"-> Logging in as: {API_USER}...")
        url = f"{BASE_URL}/api/well_api"
        payload = {
            "function": "credentials",
            "user_name": API_USER,
            "ps": API_PASSWORD,
            "clientId": "test-suite",
            "nonce": "test-nonce",
        }

        # Only the network call sits inside the try, so the handlers cannot
        # mask errors raised by the response handling below.
        try:
            response = requests.post(url, data=payload, timeout=cls.request_timeout)
        except requests.exceptions.ConnectionError:
            print(f"{RED}FATAL: Could not connect to {BASE_URL}. Ensure well-api.py is running.{RESET}")
            sys.exit(1)
        except requests.exceptions.Timeout:
            print(f"{RED}FATAL: Login request to {BASE_URL} timed out after {cls.request_timeout}s.{RESET}")
            sys.exit(1)

        if response.status_code != 200:
            print(f"{RED}FATAL: Login failed. Status: {response.status_code}{RESET}")
            print(f"Response: {response.text}")
            sys.exit(1)

        try:
            data = response.json()
        except ValueError:
            # Every JSON decode error requests can raise is a ValueError.
            print(f"{RED}FATAL: Login response is not valid JSON.{RESET}")
            print(f"Response: {response.text}")
            sys.exit(1)

        credentials = cls._extract_credentials(data)
        if credentials is None:
            print(f"{RED}FATAL: Token not found in response.{RESET}")
            sys.exit(1)

        cls.token, cls.user_id = credentials
        print(f"{GREEN}-> Login successful. User ID: {cls.user_id}{RESET}")

    @staticmethod
    def _extract_credentials(data):
        """Return ``(access_token, user_id)`` from a login response, or None.

        Handles both response structures: the token at the top level, or
        nested under a ``data`` key. ``user_id`` is None when absent.
        """
        if not isinstance(data, dict):
            return None
        if 'access_token' in data:
            return data['access_token'], data.get('user_id')
        nested = data.get('data')
        if isinstance(nested, dict) and 'access_token' in nested:
            return nested['access_token'], nested.get('user_id')
        return None

    @classmethod
    def tearDownClass(cls):
        """Delete the job created by the suite directly from the database.

        Cleanup is best-effort: a failure is reported with the job ID so the
        row can be removed manually, but it never raises.
        """
        print(f"\n{GREEN}=== Tearing Down Test Suite ==={RESET}")
        if not cls.job_id_to_test:
            print("-> No job created, skipping cleanup.")
            return

        print(f"-> Cleaning up Job ID: {cls.job_id_to_test}...")
        conn = None
        try:
            conn = psycopg2.connect(**cls.db_conn_params)
            with conn.cursor() as cur:
                cur.execute("DELETE FROM public.jobs WHERE job_id = %s;", (cls.job_id_to_test,))
            conn.commit()
            print(f"{GREEN}-> Cleanup successful. Database restored.{RESET}")
        except Exception as e:
            # Deliberately broad: teardown must report the problem, not crash.
            print(f"{RED}CRITICAL: DB Cleanup failed. Manually delete job {cls.job_id_to_test}. Error: {e}{RESET}")
        finally:
            if conn is not None:
                conn.close()

    def _post_api(self, form_data):
        """Send an authenticated POST to the API and return the decoded body.

        Args:
            form_data: Form fields for the request. The mapping is not
                modified; ``user_name`` and ``token`` are added to a copy.

        Returns:
            The decoded JSON body, or its ``data`` member when the body
            contains both a ``data`` and a ``status`` key.

        The current test fails (AssertionError) when the server is
        unreachable, the request times out, the status is not 200, or the
        body is not valid JSON.
        """
        # Copy so the caller's payload dict is left untouched.
        request_fields = dict(form_data, user_name=API_USER, token=self.token)

        try:
            response = requests.post(
                f"{BASE_URL}/api/well_api",
                data=request_fields,
                timeout=self.request_timeout,
            )
        except requests.exceptions.ConnectionError:
            self.fail("Connection refused. API server is down.")
        except requests.exceptions.Timeout:
            self.fail(f"API request timed out after {self.request_timeout}s.")

        self.assertEqual(response.status_code, 200, f"API HTTP Error {response.status_code}: {response.text}")

        try:
            json_resp = response.json()
        except ValueError:
            # ValueError covers the stdlib, simplejson and requests variants
            # of JSONDecodeError.
            self.fail(f"API returned invalid JSON: {response.text}")

        if 'data' in json_resp and 'status' in json_resp:
            return json_resp['data']
        return json_resp

    # --- TESTS ---

    def test_01_create_job(self):
        """Test creating a new job"""
        print("\n[Test] job_create")
        payload = {
            "function": "job_create",
            "customer_name": "TEST_SUITE_CUSTOMER",
            "address_street": "123 Python Way",
            "address_city": "Codeville",
            "address_state": "CA",
            "address_country": "USA",
            "lat": 34.05,
            "lng": -118.25,
            "key_person_name": "Test Runner",
            "key_person_email": "test@wellnuo.com",
            # Nested structures are sent as JSON strings inside the form data.
            "devices": json.dumps([{"mac": "TEST_MAC_VIRTUAL", "location": "Lab"}]),
            "alerts_config": json.dumps({"temp_high": 30}),
        }

        data = self._post_api(payload)
        self.assertEqual(data.get('ok'), 1, f"Job creation failed: {data.get('error')}")
        self.assertIn('job_id', data)

        # Stored on the class so later tests and tearDownClass can see it.
        type(self).job_id_to_test = data['job_id']
        print(f"-> Job created with ID: {self.job_id_to_test}")

    def test_02_job_list(self):
        """Test retrieving the job list"""
        print("\n[Test] job_list")
        if not self.job_id_to_test:
            self.skipTest("No job ID available")

        payload = {"function": "job_list"}
        data = self._post_api(payload)

        self.assertEqual(data.get('ok'), 1, f"List failed: {data.get('error')}")
        self.assertIn('jobs', data)

        found = any(j.get('job_id') == self.job_id_to_test for j in data['jobs'])
        self.assertTrue(found, f"Created Job ID {self.job_id_to_test} not found in job_list")

    def test_03_job_details(self):
        """Test retrieving single job details"""
        print("\n[Test] job_details")
        if not self.job_id_to_test:
            self.skipTest("No job ID available")

        payload = {"function": "job_details", "job_id": self.job_id_to_test}
        data = self._post_api(payload)

        self.assertEqual(data.get('ok'), 1, f"Details failed: {data.get('error')}")
        self.assertEqual(data['details']['customer_name'], "TEST_SUITE_CUSTOMER")

        # Verify JSON parsing of devices (accepted as a list or a JSON string).
        devices = data['details'].get('devices')
        if isinstance(devices, str):
            devices = json.loads(devices)
        self.assertTrue(devices, "Job details contain no devices")
        self.assertEqual(devices[0]['mac'], "TEST_MAC_VIRTUAL")

    def test_04_job_edit(self):
        """Test updating a job (Stop the job)"""
        print("\n[Test] job_edit")
        if not self.job_id_to_test:
            self.skipTest("No job ID available")

        payload = {
            "function": "job_edit",
            "job_id": self.job_id_to_test,
            "customer_name": "UPDATED_CUSTOMER_NAME",
            "job_status": "Stopped",
            "date_to": "2025-12-31T23:59:59",
        }
        data = self._post_api(payload)
        self.assertEqual(data.get('ok'), 1, f"Edit failed: {data.get('error')}")

        # Verify by reading the job back.
        v_payload = {"function": "job_details", "job_id": self.job_id_to_test}
        v_data = self._post_api(v_payload)
        self.assertEqual(v_data.get('ok'), 1, f"Details after edit failed: {v_data.get('error')}")
        self.assertEqual(v_data['details']['customer_name'], "UPDATED_CUSTOMER_NAME")
        self.assertEqual(v_data['details']['job_status'], "Stopped")

    def test_05_available_devices(self):
        """Test fetching available devices"""
        print("\n[Test] job_available_devices")
        payload = {"function": "job_available_devices"}
        data = self._post_api(payload)
        self.assertEqual(data.get('ok'), 1, f"Available devices failed: {data.get('error')}")
        self.assertIn('devices', data)
        self.assertIsInstance(data['devices'], list)

    def test_06_job_weather(self):
        """Test fetching weather"""
        print("\n[Test] job_weather")
        if not self.job_id_to_test:
            self.skipTest("No job ID available")

        payload = {"function": "job_weather", "job_id": self.job_id_to_test}
        data = self._post_api(payload)

        # An error response is tolerated here; it is only reported.
        if data.get('ok') == 0:
            print(f"-> Weather API returned error (Expected if key invalid): {data.get('error')}")
        else:
            self.assertIn('weather', data)
            print(f"-> Weather received: {data['weather']}")

    def test_07_job_sensor_data(self):
        """Test fetching bucketed sensor data for a job"""
        print("\n[Test] get_job_sensor_bucketed_data")
        if not self.job_id_to_test:
            self.skipTest("No job ID available")

        # Query today's date (local time) so the range covers "now".
        today = datetime.datetime.now().strftime("%Y-%m-%d")

        payload = {
            "function": "get_job_sensor_bucketed_data",
            "job_id": self.job_id_to_test,
            "sensor": "temperature",
            "date": today,
            "bucket_size": "1h",
        }

        data = self._post_api(payload)

        self.assertEqual(data.get('ok'), 1, f"Sensor data fetch failed: {data.get('error')}")
        self.assertIn('chart_data', data)
        self.assertIn('units', data)
        self.assertIn('time_zone', data)

        # The job was created with a virtual MAC in test_01. That MAC likely
        # won't exist in the 'devices' table unless pre-seeded, so the API
        # might skip it or return nothing. We check structure primarily.
        if data['chart_data']:
            print(f"-> Retrieved data for {len(data['chart_data'])} locations")
            first_loc = data['chart_data'][0]
            self.assertIn('name', first_loc)
            self.assertIn('data', first_loc)
        else:
            print("-> No device data found (Expected if test MAC is not in DB)")
|
|
|
|
# Script entry point: hand control to unittest's command-line runner.
if __name__ == "__main__":
    unittest.main()