#!/usr/bin/env python3
"""Telnyx webhook listener (v2).

A small Flask application that receives Telnyx webhooks on a configurable
path, logs SMS and voice events in detail, and reacts to voice events by
sending Call Control commands (playback_start, speak, hangup).

Configuration comes from, in increasing priority: built-in defaults,
environment variables (optionally loaded from a ``.env`` or ``env`` file in
the current working directory), and command-line arguments.
"""
import argparse
import base64
import json
import logging
import logging.handlers
import os
import re
import sys

import requests
from dotenv import load_dotenv
from flask import Flask, request, Response, jsonify


# --- Configuration Loading ---

# Matches shell-style lines such as: export KEY=value, export KEY="value",
# export KEY='value'. `.*?` (not `.+?`) so that empty quoted values parse as
# an empty string instead of the literal two quote characters.
_EXPORT_LINE_RE = re.compile(
    r'^export\s+([A-Za-z0-9_]+)=(?:"(.*?)"|\'(.*?)\'|(.*))$'
)


def _mask_secret(value, visible=3):
    """Return *value* with everything but the first/last *visible* chars starred.

    Values too short to hide anything meaningful (length <= 2 * visible) are
    replaced entirely by ``"***"`` so that short secrets never leak.
    """
    if len(value) <= 2 * visible:
        return "***"
    hidden = '*' * (len(value) - 2 * visible)
    return f"{value[:visible]}{hidden}{value[-visible:]}"


def load_env_file(filepath):
    """Load environment variables from *filepath*.

    Files whose name ends in ``.env`` are delegated to ``python-dotenv``.
    Any other file is parsed as a shell script fragment made of
    ``export KEY=VALUE`` lines; comments, blank lines and lines in any other
    format are ignored. Variables already present in ``os.environ`` are never
    overwritten by the shell-style parser.

    Args:
        filepath: Path of the file to load.

    Returns:
        True if the file existed and was processed, False if it does not
        exist or could not be read.
    """
    if not os.path.exists(filepath):
        return False

    print(f"INFO: Found environment file: {filepath}")

    # Check if it's a standard .env file first (let dotenv handle it)
    if filepath.endswith('.env'):
        print(f"INFO: Loading environment variables from .env file")
        load_dotenv(dotenv_path=filepath)
        return True

    # Handle shell script style env file with 'export KEY=VALUE' format
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                # Skip comments and empty lines
                if not line or line.startswith('#'):
                    continue

                export_match = _EXPORT_LINE_RE.match(line)
                if not export_match:
                    continue

                key = export_match.group(1)
                # Exactly one of the three value groups participates in a match.
                value = next(
                    (g for g in export_match.groups()[1:] if g is not None), ''
                )
                # Only set if not already set in environment
                if key not in os.environ:
                    os.environ[key] = value
                    print(f"INFO: Loaded {key}={_mask_secret(value)}")
        return True
    except (OSError, UnicodeDecodeError) as e:
        print(f"ERROR: Failed to load environment file {filepath}: {e}")
        return False


# Try to load from .env or env file in the current directory. The second file
# is only consulted when the first is missing or failed to load.
dotenv_path = os.path.join(os.getcwd(), '.env')
alt_env_path = os.path.join(os.getcwd(), 'env')
env_loaded = load_env_file(dotenv_path) or load_env_file(alt_env_path)

if not env_loaded:
    print("INFO: No .env or env file found, using defaults or provided arguments.")

# --- Global Configuration ---
LOG_LEVEL_DEFAULT = os.environ.get("LOG_LEVEL", "INFO").upper()
LOG_FILE_NAME_DEFAULT = os.environ.get("LOG_FILE_PATH", "webhook.log")
OPT_IN_KEYWORD_DEFAULT = os.environ.get("OPT_IN_KEYWORD", "WELLNUOJOIN").upper()
DEFAULT_TTS_VOICE_DEFAULT = os.environ.get("TELNYX_DEFAULT_TTS_VOICE", "female")
DEFAULT_TTS_LANGUAGE_DEFAULT = os.environ.get("TELNYX_DEFAULT_TTS_LANGUAGE", "en-US")
CLIENT_STATE_PREFIX_DEFAULT = os.environ.get("TELNYX_CLIENT_STATE_PREFIX", "app_state")
PLAY_AUDIO_ON_ANSWER_DEFAULT = os.environ.get("PLAY_AUDIO_ON_ANSWER", "false").lower() == "true"
TELNYX_API_KEY_DEFAULT = os.environ.get("TELNYX_API_KEY", "")
TELNYX_API_BASE_URL = os.environ.get("TELNYX_API_BASE_URL", "https://api.telnyx.com/v2")
# Upper bound (seconds) for a single outbound Telnyx API request, so a stalled
# connection cannot block a webhook worker forever.
TELNYX_API_TIMEOUT_SECONDS = 10

# Print key configuration for debugging at startup
print(f"INFO: TELNYX_API_KEY is {'set' if TELNYX_API_KEY_DEFAULT else 'NOT SET'}")
if TELNYX_API_KEY_DEFAULT:
    masked_key = _mask_secret(TELNYX_API_KEY_DEFAULT, visible=8)
    print(f"INFO: TELNYX_API_KEY value: {masked_key}")

# --- Application Setup ---
app = Flask(__name__)
# Initialize app_logger; it will be configured in setup_logging
app_logger = logging.getLogger("TelnyxWebhookApp")


def _build_arg_parser():
    """Build the command-line parser; defaults come from the environment."""
    parser = argparse.ArgumentParser(
        description="Telnyx Webhook Listener v2 with detailed logging.",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument('--host', type=str, default='0.0.0.0',
                        help='Host address (default: 0.0.0.0).')
    parser.add_argument('--port', type=int, default=1998,
                        help='Port (default: 1998).')
    parser.add_argument('--webhook-path', type=str, default='/telnyx-webhook',
                        help="URL path for webhooks (default: /telnyx-webhook). Must start with '/'.")
    parser.add_argument('--log-level', type=str, default=LOG_LEVEL_DEFAULT,
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
                        help=f'Logging level (default: {LOG_LEVEL_DEFAULT}).')
    parser.add_argument('--log-file', type=str, default=LOG_FILE_NAME_DEFAULT,
                        help=f'Path to log file (default: {LOG_FILE_NAME_DEFAULT}).')
    parser.add_argument('--debug', action='store_true',
                        help='Run Flask in debug mode (sets log to DEBUG, reloader). NOT FOR PRODUCTION.')
    parser.add_argument('--opt-in-keyword', type=str, default=OPT_IN_KEYWORD_DEFAULT,
                        help=f'SMS opt-in keyword (default: {OPT_IN_KEYWORD_DEFAULT}).')
    parser.add_argument('--default-tts-voice', type=str, default=DEFAULT_TTS_VOICE_DEFAULT,
                        help=f'Default TTS voice (default: {DEFAULT_TTS_VOICE_DEFAULT}).')
    parser.add_argument('--default-tts-language', type=str, default=DEFAULT_TTS_LANGUAGE_DEFAULT,
                        help=f'Default TTS language (default: {DEFAULT_TTS_LANGUAGE_DEFAULT}).')
    parser.add_argument('--client-state-prefix', type=str, default=CLIENT_STATE_PREFIX_DEFAULT,
                        help=f'Prefix for client_state (default: {CLIENT_STATE_PREFIX_DEFAULT}).')
    # NOTE: with action='store_true' the flag can only turn playback on; when
    # PLAY_AUDIO_ON_ANSWER=true in the environment it cannot be disabled here.
    parser.add_argument('--enable-audio-playback', action='store_true',
                        default=PLAY_AUDIO_ON_ANSWER_DEFAULT,
                        help=f'Prioritize X-Audio-Url (default: {PLAY_AUDIO_ON_ANSWER_DEFAULT}).')
    parser.add_argument('--api-key', type=str, default=TELNYX_API_KEY_DEFAULT,
                        help='Telnyx API Key (default: from environment)')
    return parser


# Runtime settings read by the route handlers. Populated with the
# environment-derived defaults so the module also works when imported (e.g. by
# a WSGI server); main() replaces it with the parsed command line.
app_args = _build_arg_parser().parse_args([])


# --- Helper Functions ---
def setup_logging(log_level_str, log_file_path, app_name="TelnyxWebhookApp"):
    """Configure the global ``app_logger`` with console and rotating-file output.

    Existing handlers on the logger are closed and removed first so repeated
    calls do not duplicate output. If the file handler cannot be created the
    failure is reported and console logging stays active.

    Args:
        log_level_str: Level name such as "DEBUG" or "INFO"; unknown names
            fall back to INFO.
        log_file_path: Path of the log file (rotated at 10 MiB, 5 backups).
        app_name: Name of the logger to configure.
    """
    global app_logger  # Ensure we are configuring the global app_logger
    app_logger = logging.getLogger(app_name)

    numeric_level = getattr(logging, log_level_str.upper(), logging.INFO)
    if not isinstance(numeric_level, int):
        # getattr can return a non-level attribute of the logging module
        # (e.g. a function) if the string happens to name one.
        print(f"ERROR: Invalid log level string: {log_level_str}. Defaulting to INFO.")
        numeric_level = logging.INFO

    app_logger.setLevel(numeric_level)
    # Important to prevent duplicate logs if root is also configured
    app_logger.propagate = False

    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - [%(funcName)s:%(lineno)d] - %(message)s'
    )

    # Drop (and close) handlers from a previous configuration so file
    # descriptors are released and messages are not emitted twice.
    for old_handler in list(app_logger.handlers):
        app_logger.removeHandler(old_handler)
        old_handler.close()

    # Console Handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    app_logger.addHandler(console_handler)

    # File Handler
    try:
        file_handler = logging.handlers.RotatingFileHandler(
            log_file_path, maxBytes=10 * 1024 * 1024, backupCount=5, encoding='utf-8'
        )
    except OSError as e:
        # Report on stdout and through the console handler attached above.
        print(f"ERROR: Failed to configure file logger at {log_file_path}: {e}")
        app_logger.error(f"Failed to configure file logger at {log_file_path}: {e}")
        return

    file_handler.setFormatter(formatter)
    app_logger.addHandler(file_handler)
    # Use app_logger for this initial message AFTER handlers are attached
    app_logger.info(
        f"Application logging configured. Level: {log_level_str}. File: {os.path.abspath(log_file_path)}"
    )


def find_custom_header(headers, name):
    """Return the value of the header called *name* (case-insensitive).

    Args:
        headers: Iterable of dicts with 'name' and 'value' keys, or a falsy
            value when there are no headers.
        name: Header name to look for.

    Returns:
        The 'value' of the first matching header, or None if there is none.
    """
    if not headers:
        return None
    wanted = name.lower()
    for header in headers:
        # `or ''` guards against an explicit null name in the payload.
        if (header.get('name') or '').lower() == wanted:
            return header.get('value')
    return None


def create_client_state(base_event, call_control_id, prefix):
    """Create a base64 encoded client state string for a Telnyx command.

    The plain state is ``{prefix}_{base_event}_{first 8 chars of call id}``,
    or ``{prefix}_{base_event}_unknownccid`` when no call id is available.

    Returns:
        The state as an ASCII base64 string.
    """
    ccid_part = call_control_id[:8] if call_control_id else "unknownccid"
    plain_state = f"{prefix}_{base_event}_{ccid_part}"

    # str -> UTF-8 bytes -> base64 bytes -> ASCII str
    base64_state = base64.b64encode(plain_state.encode('utf-8')).decode('ascii')

    app_logger.debug(f"Client state created: '{plain_state}' -> base64: '{base64_state}'")
    return base64_state


def send_telnyx_command(command_type, params, api_key, timeout=TELNYX_API_TIMEOUT_SECONDS):
    """Send a command to the Telnyx Call Control API.

    Args:
        command_type (str): Command path below the call, e.g. "actions/speak".
        params (dict): Command parameters; must contain 'call_control_id',
            which is moved into the URL and not sent in the body.
        api_key (str): Telnyx API key.
        timeout (float): Seconds to wait for the HTTP request.

    Returns:
        dict: API response as JSON, or None on any error (missing key or
        call id, transport failure, non-2xx status, non-JSON response).
    """
    if not api_key:
        app_logger.error(f"Cannot send {command_type} command: TELNYX_API_KEY not set")
        return None

    # call_control_id travels in the URL, everything else in the JSON body.
    api_params = dict(params)
    call_control_id = api_params.pop('call_control_id', None)
    if not call_control_id:
        app_logger.error(f"Cannot send {command_type} command: call_control_id missing")
        return None

    # An empty client_state is dropped rather than sent.
    if 'client_state' in api_params and not api_params['client_state']:
        del api_params['client_state']

    endpoint = f"{TELNYX_API_BASE_URL}/calls/{call_control_id}/{command_type}"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    app_logger.debug(f"Sending {command_type} command to Telnyx API: {json.dumps(api_params)}")

    try:
        response = requests.post(
            endpoint, json=api_params, headers=headers, timeout=timeout
        )
    except requests.RequestException:
        app_logger.exception(f"Error sending {command_type} command to Telnyx API")
        return None

    if not 200 <= response.status_code < 300:
        app_logger.error(f"Telnyx rejected {command_type} command. Status: {response.status_code}")
        app_logger.error(f"Response body: {response.text}")
        return None

    try:
        result = response.json()
    except ValueError:
        app_logger.exception(f"Error sending {command_type} command to Telnyx API")
        return None

    app_logger.info(f"Telnyx accepted {command_type} command. Response: {response.status_code}")
    app_logger.debug(f"Full API response: {json.dumps(result)}")
    return result


def _handle_message_event(event_type, payload):
    """Log the details of an SMS ('message' record type) webhook."""
    direction = payload.get('direction')
    messaging_profile_id = payload.get("messaging_profile_id")
    from_number = (payload.get('from') or {}).get('phone_number')
    to_list = payload.get('to') or []
    to_number = to_list[0].get('phone_number') if to_list else 'N/A'
    # `or` defaults also cover keys that are present but null.
    text = payload.get('text') or ''
    num_media = len(payload.get('media') or [])
    tags = payload.get('tags', [])
    cost = payload.get('cost')  # Might be None

    sms_details = (
        f" SMS Details: Direction='{direction}', From='{from_number}', To='{to_number}', "
        f"Profile='{messaging_profile_id}', MediaCount={num_media}, Tags={tags}"
    )
    if cost:
        sms_details += f", Cost={cost.get('amount')} {cost.get('currency')}"
    app_logger.info(sms_details)

    if text:
        app_logger.info(f" Text: '{text.strip()}'")

    if event_type == 'message.received' and direction == 'inbound':
        app_logger.info(f" -> Inbound SMS received.")
        if text.strip().upper() == app_args.opt_in_keyword:
            app_logger.info(f" -> Opt-in keyword '{app_args.opt_in_keyword}' detected from {from_number}.")
    elif event_type == 'message.sent':
        status = payload.get('status')
        errors = payload.get('errors') or []
        app_logger.info(f" -> SMS Sent (DLR): Status='{status}', ErrorsCount={len(errors)}")
        if errors:
            app_logger.warning(f" -> DLR Errors: {json.dumps(errors)}")
    else:
        app_logger.info(f" -> Other SMS Event: Type='{event_type}'")


def _handle_call_event(event_type, payload, call_control_id):
    """Log a voice ('event' record type) webhook and send follow-up commands.

    On call.answered: plays the X-Audio-Url header (if playback is enabled),
    else speaks the X-TTS-Payload header, else hangs up. On speak/playback
    ended: hangs up.
    """
    client_s_prefix = app_args.client_state_prefix
    from_number_call = payload.get('from')  # 'from' in call events is usually the caller
    to_number_call = payload.get('to')  # 'to' in call events is usually the callee

    if event_type == 'call.initiated':
        state = payload.get('state')
        direction_call = payload.get('direction')  # 'incoming' or 'outgoing'
        app_logger.info(f" CALL INITIATED: State='{state}', Direction='{direction_call}', From='{from_number_call}', To='{to_number_call}'")

    elif event_type == 'call.answered':
        custom_headers = payload.get('custom_headers', [])
        app_logger.info(f" CALL ANSWERED: From='{from_number_call}', To='{to_number_call}'")

        audio_url = find_custom_header(custom_headers, 'X-Audio-Url')
        tts_payload = find_custom_header(custom_headers, 'X-TTS-Payload')
        client_s = create_client_state("answered", call_control_id, client_s_prefix)

        if app_args.enable_audio_playback and audio_url:
            app_logger.info(f" -> Audio URL '{audio_url}' found. Sending playback_start command.")
            play_params = {
                "call_control_id": call_control_id,
                "client_state": client_s,
                "audio_url": audio_url
            }
            send_telnyx_command("actions/playback_start", play_params, app_args.api_key)
        elif tts_payload:
            app_logger.info(f" -> TTS payload '{tts_payload}' found. Sending speak command.")
            speak_params = {
                "payload": tts_payload,
                "voice": app_args.default_tts_voice,
                "language": app_args.default_tts_language,
                "call_control_id": call_control_id,
                "client_state": client_s
            }
            send_telnyx_command("actions/speak", speak_params, app_args.api_key)
        else:
            app_logger.warning(" -> Call answered, but no 'X-Audio-Url' or 'X-TTS-Payload' found/enabled. Issuing hangup.")
            hangup_params = {
                "call_control_id": call_control_id,
                "client_state": create_client_state("nohdr_hup", call_control_id, client_s_prefix)
            }
            send_telnyx_command("actions/hangup", hangup_params, app_args.api_key)

    elif event_type == 'call.speak.started':
        app_logger.info(f" CALL SPEAK STARTED")  # CCID already logged in prefix

    elif event_type == 'call.playback.started':
        media_url = payload.get('media_url', 'N/A')
        app_logger.info(f" CALL PLAYBACK STARTED: MediaURL='{media_url}'")

    elif event_type in ('call.speak.ended', 'call.playback.ended'):
        status = payload.get('status')
        # 'call.speak.ended' -> 'speak', 'call.playback.ended' -> 'playback'
        ended_event_type = event_type.split('.')[-2]
        app_logger.info(f" CALL {ended_event_type.upper()} ENDED: Status='{status}'. Issuing hangup.")
        hangup_params = {
            "call_control_id": call_control_id,
            "client_state": create_client_state(f"{ended_event_type}_hup", call_control_id, client_s_prefix)
        }
        send_telnyx_command("actions/hangup", hangup_params, app_args.api_key)

    elif event_type == 'call.dtmf.received':
        digit = payload.get('digit')
        dtmf_from = payload.get('from')
        dtmf_to = payload.get('to')
        app_logger.info(f" DTMF RECEIVED: Digit='{digit}', FromLeg='{dtmf_from}', ToLeg='{dtmf_to}'")

    elif event_type == 'call.hangup':
        cause = payload.get('cause')
        sip_code = payload.get('sip_hangup_cause')
        hangup_source = payload.get('hangup_source')  # 'local' or 'remote'
        app_logger.info(f" CALL HANGUP: Cause='{cause}', SIPCause='{sip_code}', Source='{hangup_source}'")

    else:
        app_logger.warning(f" UNHANDLED VOICE EVENT: Type='{event_type}'. Specific payload details:")
        app_logger.info(f" {json.dumps(payload, indent=4)}")


# --- Webhook Route Handler ---
# The URL variable is required: the view takes the requested path as an
# argument and compares it with the configured --webhook-path. The `path`
# converter also allows configured paths that contain '/'.
@app.route('/<path:webhook_path_received>', methods=['POST'])
def handle_telnyx_webhook(webhook_path_received):
    """Receive a Telnyx webhook POST, log it and react to voice events.

    Args:
        webhook_path_received: Requested URL path without the leading '/'.

    Returns:
        204 on success (including unknown record types), 404 when the path
        is not the configured webhook path, 415 for non-JSON content types,
        400 when the body is not a JSON object, 500 on unexpected errors.
    """
    if f"/{webhook_path_received}" != app_args.webhook_path:
        app_logger.warning(f"Request to unknown path '/{webhook_path_received}'. Configured for '{app_args.webhook_path}'")
        return "Path not found", 404

    remote_addr = request.remote_addr
    request_id = request.headers.get("X-Request-Id") or request.headers.get("Telnyx-Request-Id") or "N/A"

    app_logger.info(f"Webhook request received. Path: '/{webhook_path_received}', From: {remote_addr}, X-Telnyx-Request-ID: {request_id}")

    try:
        if not request.is_json:
            raw_data_preview = request.get_data(as_text=True)[:250]
            app_logger.warning(f"Non-JSON request. Content-Type: {request.content_type}. Data Preview: '{raw_data_preview}...'")
            return "Invalid Content-Type: Expected application/json", 415

        # silent=True: a malformed body yields None instead of raising, so it
        # can be answered with 400 rather than falling into the 500 handler.
        webhook_data = request.get_json(silent=True)
        if not isinstance(webhook_data, dict):
            app_logger.error("Failed to decode JSON from request body: expected a JSON object")
            return "Invalid JSON payload", 400

        if app_logger.isEnabledFor(logging.DEBUG):
            app_logger.debug(f"Full incoming webhook payload: {json.dumps(webhook_data, indent=2)}")

        data = webhook_data.get('data') or {}
        event_type = data.get('event_type')
        record_type = data.get('record_type')
        payload = data.get('payload') or {}

        call_control_id = payload.get('call_control_id')
        call_session_id = payload.get('call_session_id')
        message_id = payload.get('id') if record_type == 'message' else None
        client_state_on_event = payload.get("client_state")

        log_prefix = f"Event: '{event_type}' ({record_type})"
        if call_control_id:
            log_prefix += f", CCID: {call_control_id}"
        if call_session_id:
            log_prefix += f", CSID: {call_session_id}"
        if message_id:
            log_prefix += f", MsgID: {message_id}"
        if client_state_on_event:
            log_prefix += f", ClientStateRcvd: {client_state_on_event}"

        app_logger.info(log_prefix)  # Main event identifier log

        if record_type == 'message':
            _handle_message_event(event_type, payload)
        elif record_type == 'event':
            _handle_call_event(event_type, payload, call_control_id)
        else:
            app_logger.warning(f"UNKNOWN RECORD TYPE: '{record_type}'. Full webhook data:")
            app_logger.info(f" {json.dumps(webhook_data, indent=4)}")

        # Always return 204 No Content to acknowledge webhook receipt
        return Response(status=204)

    except Exception:
        # Top-level boundary: log with traceback and answer 500.
        app_logger.exception("Unhandled error processing webhook")
        return "Internal Server Error", 500


@app.route('/', methods=['GET', 'POST'])
def handle_root():
    """Answer requests to '/' with a short liveness message."""
    app_logger.info(f"Request to root path '/' from {request.remote_addr}. Method: {request.method}")
    return "Telnyx Webhook Listener (v2) is active.", 200


def main():
    """Parse the command line, configure logging and run the Flask server."""
    global app_args  # Route handlers read the module-level app_args
    app_args = _build_arg_parser().parse_args()

    if not app_args.webhook_path.startswith('/'):
        print(f"ERROR: --webhook-path ('{app_args.webhook_path}') must start with a '/'. Prepending.")
        app_args.webhook_path = '/' + app_args.webhook_path

    app_args.opt_in_keyword = app_args.opt_in_keyword.upper()

    log_level_to_use = "DEBUG" if app_args.debug else app_args.log_level.upper()
    setup_logging(log_level_to_use, app_args.log_file, app_name="TelnyxWebhookApp")

    # Outside debug mode, raise Werkzeug's access-log threshold to reduce
    # console noise; the app logs each received request itself.
    if not app_args.debug:
        werkzeug_logger = logging.getLogger('werkzeug')
        werkzeug_logger.setLevel(logging.WARNING)  # Or ERROR to only see issues

    app_logger.info("--- Telnyx Webhook Listener Starting (v2) ---")
    app_logger.info(f" Version: 1.2.0")
    app_logger.info(f" Configuration:")
    for arg, value in sorted(vars(app_args).items()):
        # Don't log API key value
        if arg == 'api_key':
            value = "***" if value else "Not set"
        app_logger.info(f" {arg}: {value}")
    app_logger.info(f" Log File (Absolute): {os.path.abspath(app_args.log_file)}")
    app_logger.info("-------------------------------------------")

    app.run(host=app_args.host, port=app_args.port, debug=app_args.debug, use_reloader=app_args.debug)


if __name__ == '__main__':
    main()