347 lines
17 KiB
Python
347 lines
17 KiB
Python
#!/usr/bin/env python3
|
|
import json
import logging
import os
import socket
import sys
import time

import telnyx
from dotenv import load_dotenv
from kombu import Connection, Exchange, Queue  # Remove Consumer import here
# Import Consumer where it's used or rely on connection.Consumer
from kombu.exceptions import KombuError  # Import specific exception for connection errors
from kombu.utils.url import maybe_sanitize_url
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail, Email, To, Content
|
|
|
|
# --- Configuration Loading ---
# Load environment variables from $HOME/.env and then from ./.env.
# The $HOME/.env pass is run with override=True, so its values replace any
# that are already present in the process environment; the ./.env pass is
# run without the override flag.
_home_dir = os.environ.get("HOME", ".")
dotenv_path = os.path.join(_home_dir, '.env')
load_dotenv(dotenv_path=dotenv_path, override=True)

_working_dir = os.getcwd()
dotenv_path = os.path.join(_working_dir, '.env')
load_dotenv(dotenv_path=dotenv_path)
|
|
|
|
# --- Configuration ---
# Telnyx (SMS and voice) settings.
TELNYX_API_KEY = os.getenv("TELNYX_API_KEY")
TELNYX_SENDER_ID = os.getenv("TELNYX_SENDER_ID", "+16505820706")
TELNYX_CONNECTION_ID = os.getenv("TELNYX_CONNECTION_ID")  # For Voice/TTS calls
messaging_profile_id = os.getenv("TELNYX_MESSAGING_PROFILE_ID")  # Optional

# RabbitMQ settings, expressed as a Kombu connection URL.
# The bare value "localhost" (also the default when the variable is unset) is
# expanded to the standard local guest account; anything else is taken to be
# a complete URL such as "amqp://user:password@host:port/vhost".
raw_rabbitmq_url = os.getenv("RABBITMQ_URL", "localhost")
RABBITMQ_URL = (
    "amqp://guest:guest@localhost:5672//"
    if raw_rabbitmq_url == "localhost"
    else raw_rabbitmq_url
)
RABBITMQ_ALERTS_QNAME = os.getenv("RABBITMQ_ALERTS_QNAME", "alerts")

# The unnamed ("") direct exchange.
exchange = Exchange("", type='direct')

# Durable queue whose routing key is the queue's own name.
alert_queue = Queue(
    RABBITMQ_ALERTS_QNAME,
    exchange=exchange,
    routing_key=RABBITMQ_ALERTS_QNAME,
    durable=True,
)

# SendGrid (email) settings.
SENDGRID_API_KEY = os.getenv("SENDGRID_API_KEY")
SENDGRID_FROM_EMAIL = os.getenv("SENDGRID_FROM_EMAIL")  # Verified sender email
|
|
|
|
# --- Setup Logging ---
# All log records go to stdout at INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)

# --- Log Initial Configuration ---
# Secrets are never written out: API keys are reported only as Set/Not Set,
# and the password portion of the broker URL is masked before logging.
logger.info("=============================================")
logger.info(" Initial Service Configuration ")
logger.info("---------------------------------------------")
logger.info(f"RABBITMQ_URL = {maybe_sanitize_url(RABBITMQ_URL)}")
logger.info(f"RABBITMQ_ALERTS_QNAME = {RABBITMQ_ALERTS_QNAME}")
logger.info(f"TELNYX_SENDER_ID = {TELNYX_SENDER_ID}")
logger.info(f"TELNYX_CONNECTION_ID (TTS) = {TELNYX_CONNECTION_ID if TELNYX_CONNECTION_ID else 'Not Set'}")
logger.info(f"SENDGRID_FROM_EMAIL (Email)= {SENDGRID_FROM_EMAIL if SENDGRID_FROM_EMAIL else 'Not Set'}")
logger.info(f"TELNYX_API_KEY = {'Set' if TELNYX_API_KEY else 'Not Set'}")
logger.info(f"SENDGRID_API_KEY = {'Set' if SENDGRID_API_KEY else 'Not Set'}")
logger.info("=============================================")
|
|
|
|
|
|
# --- Helper Functions ---
|
|
|
|
def check_env_vars():
    """Validate the service configuration and exit if a core setting is empty.

    The check is made against the resolved module-level settings rather than
    against ``os.environ`` directly, so a setting that falls back to its
    built-in default (sender ID, broker URL, queue name) counts as present.
    Checking the raw environment instead would make those defaults
    unreachable, because the service would exit before ever using them.

    Missing optional settings (voice/TTS and email) only produce warnings.

    Raises:
        SystemExit: with status 1 when any core setting is empty.
    """
    core_settings = {
        "TELNYX_API_KEY": TELNYX_API_KEY,
        "TELNYX_SENDER_ID": TELNYX_SENDER_ID,
        "RABBITMQ_URL": RABBITMQ_URL,
        "RABBITMQ_ALERTS_QNAME": RABBITMQ_ALERTS_QNAME,
    }
    missing_vars = [name for name, value in core_settings.items() if not value]

    # Any alert type may arrive on the queue, so report every optional
    # setting that would make one of the delivery methods unavailable.
    if not TELNYX_CONNECTION_ID:
        logger.warning("Optional environment variable TELNYX_CONNECTION_ID is missing (required for phone/TTS)")
    if not SENDGRID_API_KEY:
        logger.warning("Optional environment variable SENDGRID_API_KEY is missing (required for email)")
    if not SENDGRID_FROM_EMAIL:
        logger.warning("Optional environment variable SENDGRID_FROM_EMAIL is missing (required for email)")

    if missing_vars:
        logger.error(f"Missing critical environment variables: {', '.join(missing_vars)}")
        sys.exit(1)

    logger.info("Core environment variables appear to be set.")
|
|
|
|
|
|
def setup_telnyx():
    """Install the configured API key on the ``telnyx`` module.

    Returns:
        True when the key was applied, False when ``TELNYX_API_KEY`` is empty
        (an error is logged in that case and the module is left untouched).
    """
    if TELNYX_API_KEY:
        telnyx.api_key = TELNYX_API_KEY
        logger.info("Telnyx client configured.")
        return True

    logger.error("Cannot configure Telnyx client: TELNYX_API_KEY not set.")
    return False
|
|
|
|
# --- Sending Functions (Keep these identical) ---
|
|
|
|
def send_telnyx_sms(recipient: str, message_body: str) -> bool:
    """Submit a single SMS through ``telnyx.Message.create``.

    Args:
        recipient: Destination phone number.
        message_body: Text of the SMS.

    Returns:
        True when Telnyx accepted the request; False when the recipient or the
        sender ID is missing, or when the request raised. Failures are logged
        and never propagated to the caller.
    """
    # Guard clauses: both ends of the conversation must be known.
    if not recipient:
        logger.error("Cannot send SMS: Recipient phone number is missing.")
        return False
    if not TELNYX_SENDER_ID:
        logger.error("Cannot send SMS: TELNYX_SENDER_ID is not set.")
        return False

    logger.info(f"Attempting to send SMS from '{TELNYX_SENDER_ID}' to '{recipient}'")

    try:
        sms = telnyx.Message.create(
            from_=TELNYX_SENDER_ID,
            to=recipient,
            text=message_body,
            messaging_profile_id=messaging_profile_id,  # Optional; passed through as configured
            type="sms",  # Explicitly request SMS
        )
        logger.info(f"SMS submitted successfully to Telnyx. Message ID: {sms.id}")
        return True
    except telnyx.error.TelnyxError as api_err:
        logger.error(f"Telnyx API Error sending SMS to {recipient}: {api_err}")
        # Include the raw HTTP body when the error carries one.
        details = getattr(api_err, 'http_body', None)
        if details:
            logger.error(f"Telnyx Error Details: {details}")
        return False
    except Exception as err:
        logger.error(f"Unexpected error sending SMS to {recipient}: {err}")
        return False
|
|
|
|
def send_email(recipient_email: str, subject: str, body: str) -> bool:
    """Send a plain-text email through the SendGrid API client.

    Args:
        recipient_email: Address of the recipient.
        subject: Subject line; an empty value is replaced by
            'Alert Notification'.
        body: Plain-text content of the message.

    Returns:
        True when SendGrid answered with a 2xx status; False when input or
        configuration is missing, when the status is outside 2xx, or when the
        send call raised (the exception is logged with its traceback).
    """
    if not recipient_email:
        logger.error("Cannot send email: Recipient email address is missing.")
        return False
    if not (SENDGRID_API_KEY and SENDGRID_FROM_EMAIL):
        logger.error("Cannot send email: SENDGRID_API_KEY or SENDGRID_FROM_EMAIL not configured.")
        return False

    logger.info(f"Attempting to send email via SendGrid from '{SENDGRID_FROM_EMAIL}' to '{recipient_email}'")

    # Build the message from the SendGrid helper objects.
    sender = Email(SENDGRID_FROM_EMAIL)
    receiver = To(recipient_email)
    text_part = Content("text/plain", body)
    mail = Mail(
        from_email=sender,
        to_emails=receiver,
        subject=subject or 'Alert Notification',
        plain_text_content=text_part,
    )

    try:
        client = SendGridAPIClient(SENDGRID_API_KEY)
        response = client.send(mail)
        accepted = 200 <= response.status_code < 300
        if not accepted:
            logger.error(f"SendGrid API Error sending email to {recipient_email}. Status: {response.status_code}, Body: {response.body}")
            return False
        logger.info(f"Email submitted successfully to SendGrid for {recipient_email}. Status: {response.status_code}")
        return True
    except Exception as err:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Unexpected error sending email via SendGrid to {recipient_email}: {err}")
        return False
|
|
|
|
def make_telnyx_tts_call(recipient: str, tts_body: str) -> bool:
    """Start an outbound call through ``telnyx.Call.create``.

    The text to be spoken is attached to the call request as an
    ``X-TTS-Payload`` custom header. This function only initiates the call;
    as the logged reminder states, speaking the text is left to the Call
    Control application webhook behind the configured connection ID.

    Args:
        recipient: Phone number to dial.
        tts_body: Text intended to be spoken to the callee.

    Returns:
        True when the call request was accepted; False when a required value
        is missing or the request raised. Failures are logged, not raised.
    """
    if not recipient:
        logger.error("Cannot initiate TTS call: Recipient phone number is missing.")
        return False
    if not TELNYX_SENDER_ID:
        logger.error("Cannot initiate TTS call: TELNYX_SENDER_ID is not set.")
        return False
    if not TELNYX_CONNECTION_ID:
        logger.error("Cannot initiate TTS call: TELNYX_CONNECTION_ID (Call Control Application) is not set.")
        return False

    logger.info(f"Attempting to initiate TTS call from '{TELNYX_SENDER_ID}' to '{recipient}' via Connection ID '{TELNYX_CONNECTION_ID}'")

    try:
        tts_header = {"name": "X-TTS-Payload", "value": tts_body}
        call = telnyx.Call.create(
            to=recipient,
            from_=TELNYX_SENDER_ID,
            connection_id=TELNYX_CONNECTION_ID,
            custom_headers=[tts_header],
        )

        logger.info(f"Telnyx call initiation request successful for {recipient}. Call Control ID: {call.call_control_id}, Call Leg ID: {call.call_leg_id}")
        logger.warning(f"Reminder: A Call Control Application webhook MUST handle call events for Connection ID '{TELNYX_CONNECTION_ID}' and issue the 'speak_text' command using the provided body: '{tts_body[:50]}...'")
        return True
    except telnyx.error.TelnyxError as api_err:
        logger.error(f"Telnyx API Error initiating call to {recipient}: {api_err}")
        # Include the raw HTTP body when the error carries one.
        details = getattr(api_err, 'http_body', None)
        if details:
            logger.error(f"Telnyx Error Details: {details}")
        return False
    except Exception as err:
        logger.error(f"Unexpected error initiating call to {recipient}: {err}")
        return False
|
|
|
|
# --- Kombu Callback ---
|
|
|
|
def handle_message(body, message):
    """Kombu consumer callback: deliver one alert and settle the message.

    The payload must be (or decode to) a JSON object with the keys
    ``send_method`` ("sms", "email" or "phone"), ``destination`` and ``body``;
    ``subject`` is optional and only used for email.

    Settlement rules:
        * delivery submitted successfully -> ``message.ack()``
        * anything else (malformed payload, unsupported method, missing
          configuration, failed delivery, unexpected error)
          -> ``message.reject(requeue=False)``, i.e. the message is discarded.

    Args:
        body: Message payload as handed over by Kombu; may already be a dict,
            or raw ``str``/``bytes`` containing JSON.
        message: The Kombu message object; only ``delivery_tag``, ``ack`` and
            ``reject`` are used.
    """
    delivery_tag = message.delivery_tag
    logger.info(f"Received message via Kombu (Delivery Tag: {delivery_tag})")
    try:
        # Decode JSON payload
        if isinstance(body, bytes):
            message_data = json.loads(body.decode('utf-8'))
        elif isinstance(body, str):
            message_data = json.loads(body)
        elif isinstance(body, dict):
            message_data = body  # Already decoded
        else:
            raise TypeError(f"Unexpected message body type: {type(body)}")

        # Valid JSON is not necessarily an object (it may be a list, string,
        # number...). Reject those here as malformed instead of failing later
        # on ``.get`` with an unrelated-looking AttributeError.
        if not isinstance(message_data, dict):
            raise TypeError(f"Expected a JSON object, got {type(message_data).__name__}")

        logger.debug(f"Decoded message data: {message_data}")

        send_method = message_data.get('send_method')
        destination = message_data.get('destination')
        message_body = message_data.get('body')
        subject = message_data.get('subject', '')  # Subject primarily for email

        if not all([send_method, destination, message_body]):
            logger.error(f"Invalid message format: Missing 'send_method', 'destination', or 'body'. Message: {body}")
            message.reject(requeue=False)  # Reject and discard malformed message
            logger.warning(f"Message rejected (tag: {delivery_tag}) due to invalid format.")
            return

        # Only a prefix of the destination is logged.
        logger.info(f"Processing alert - Method: {send_method}, Destination: {destination[:15]}...")

        success = False
        if send_method == "sms":
            success = send_telnyx_sms(recipient=destination, message_body=message_body)
        elif send_method == "email":
            # Check if email vars are present before calling
            if not SENDGRID_API_KEY or not SENDGRID_FROM_EMAIL:
                logger.error(f"Cannot send email for tag {delivery_tag}: SendGrid config missing.")
                success = False  # Mark as failure for this specific task
            else:
                success = send_email(recipient_email=destination, subject=subject, body=message_body)
        elif send_method == "phone":
            # Check if TTS var is present before calling
            if not TELNYX_CONNECTION_ID:
                logger.error(f"Cannot make TTS call for tag {delivery_tag}: TELNYX_CONNECTION_ID missing.")
                success = False  # Mark as failure
            else:
                success = make_telnyx_tts_call(recipient=destination, tts_body=message_body)
        else:
            logger.error(f"Unsupported send_method: '{send_method}'. Discarding message (tag: {delivery_tag}).")
            message.reject(requeue=False)  # Reject and discard
            return

        # Acknowledge or reject based on the delivery outcome
        if success:
            logger.info(f"Successfully submitted '{send_method}' task for tag {delivery_tag}. Acknowledging.")
            message.ack()
        else:
            logger.error(f"Failed to submit '{send_method}' task for tag {delivery_tag}. Rejecting message (requeue=False).")
            message.reject(requeue=False)

    except (json.JSONDecodeError, UnicodeDecodeError, TypeError) as decode_err:
        # UnicodeDecodeError covers byte payloads that are not valid UTF-8;
        # like bad JSON, they are malformed input rather than an internal error.
        logger.error(f"Failed to decode or process message body (Tag: {delivery_tag}): {decode_err}. Body sample: {str(body)[:100]}. Rejecting (requeue=False).")
        message.reject(requeue=False)  # Reject malformed/unexpected message type
    except Exception as e:
        # Catch-all for unexpected errors during processing
        logger.exception(f"Unexpected error processing message (Tag: {delivery_tag}): {e}. Rejecting message (requeue=False).")
        try:
            message.reject(requeue=False)  # Try to reject even after an exception
        except Exception as reject_e:
            logger.error(f"Further error trying to reject message (Tag: {delivery_tag}): {reject_e}")
|
|
|
|
|
|
# --- Main Service Logic using Kombu (Corrected) ---
|
|
|
|
def main():
    """Run the alert service until interrupted with CTRL+C.

    Validates configuration, configures Telnyx, then loops forever:
    connect to RabbitMQ, consume from the alerts queue with
    ``handle_message`` as callback, and reconnect whenever the connection or
    channel fails. A failure while connecting is retried after 10 seconds.

    Raises:
        SystemExit: when a core setting is missing (via ``check_env_vars``)
            or the Telnyx client cannot be configured.
    """
    logger.info("Starting Alert Service using Kombu...")
    check_env_vars()
    if not setup_telnyx():
        sys.exit("Failed to configure Telnyx client. Exiting.")

    # Mask the password so broker credentials never end up in the logs.
    logger.info(f"Attempting to connect to RabbitMQ via Kombu at {maybe_sanitize_url(RABBITMQ_URL)}")

    def _cancel_quietly(consumer):
        """Cancel the consumer, returning False instead of raising on failure."""
        # On a broken connection cancel() itself can fail; that must not mask
        # the original error or prevent a CTRL+C shutdown.
        try:
            consumer.cancel()
        except Exception as cancel_err:
            logger.warning(f"Could not cancel Kombu consumer cleanly: {cancel_err}")
            return False
        return True

    # Connection loop
    while True:
        try:
            # The context manager releases the connection on every exit path.
            with Connection(RABBITMQ_URL, connect_timeout=10, heartbeat=60) as connection:
                logger.info("Kombu connection established.")

                # Create the consumer from the connection so that it is bound
                # to this connection's channel.
                consumer = connection.Consumer(
                    queues=[alert_queue],           # Queues to consume from
                    callbacks=[handle_message],     # Invoked for every message
                    accept=['json', 'text/plain'],  # Acceptable content types
                    prefetch_count=1,               # One unsettled message at a time
                )

                # Start consuming messages.
                consumer.consume()
                logger.info(f"Kombu consumer started. Waiting for messages on '{RABBITMQ_ALERTS_QNAME}'. To exit press CTRL+C")

                # Inner loop: pump events until the connection fails or the
                # user interrupts.
                while True:
                    try:
                        try:
                            connection.drain_events(timeout=1)
                        except socket.timeout:
                            # Idle second: nothing arrived. A heartbeat was
                            # negotiated on the connection, so it has to be
                            # serviced explicitly or the idle link is dropped.
                            connection.heartbeat_check()
                    except KeyboardInterrupt:
                        logger.info("KeyboardInterrupt received during drain_events. Shutting down...")
                        if _cancel_quietly(consumer):
                            logger.info("Kombu consumer cancelled.")
                        # Re-raise so the outer handler ends the service.
                        raise
                    except (KombuError, ConnectionResetError, ConnectionAbortedError) as e:
                        logger.error(f"Kombu connection/channel error during drain_events: {e}. Breaking inner loop to reconnect...")
                        _cancel_quietly(consumer)
                        break  # Leave the 'with' block and reconnect
                    except Exception as e:
                        logger.exception(f"Unexpected error in Kombu drain_events loop: {e}. Breaking inner loop to reconnect...")
                        _cancel_quietly(consumer)
                        break  # Leave the 'with' block and reconnect

        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received during connection attempt or after exit from inner loop. Exiting.")
            break  # Exit the main while loop
        except Exception as conn_err:
            # Covers connection refusals, Kombu errors and anything else that
            # escaped the inner loop; back off before the next attempt.
            logger.error(f"Failed to connect or catastrophic failure ({type(conn_err).__name__}): {conn_err}. Retrying in 10 seconds...")
            time.sleep(10)

    logger.info("Alert Service stopped.")
|
|
|
|
|
|
# Start the service only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()