#!/usr/bin/env python3
"""SIP Voice Notifier Add-on Service.

Exposes a small HTTP API.  ``POST /send_notification`` downloads (or
synthesizes via TTS) an audio clip, converts it to 8 kHz mono 16-bit WAV,
places a SIP call with pjsua2 and plays the clip into the call.
"""
import json
import logging
import os
import tempfile
import threading
import time
from urllib.parse import urlparse

import requests
from flask import Flask, request, jsonify
from pydub import AudioSegment
import pjsua2 as pj

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
_LOGGER = logging.getLogger(__name__)

app = Flask(__name__)

# Global config (filled in by main(); read by place_sip_call and the handler)
CONFIG = {}
DEFAULT_SAMPLE_RATE = 8000

# place_sip_call() creates and destroys a pjsua2 Endpoint on every call and
# Flask's built-in server may handle requests on several threads, so calls
# are serialized to keep a single Endpoint alive at a time.
_SIP_LOCK = threading.Lock()


class CallHandler(pj.Call):
    """Handle SIP call events and play one audio file once the call is up."""

    def __init__(self, account, call_id=pj.PJSUA_INVALID_ID):
        pj.Call.__init__(self, account, call_id)
        self.player = None          # AudioMediaPlayer, kept alive while playing
        self.audio_file = None      # path set through set_audio_file()
        self.connected = False      # True once the call reached CONFIRMED
        self.disconnected = False   # True once the call reached DISCONNECTED
        self._playing = False       # guards against starting playback twice

    def onCallState(self, prm):
        """Called when call state changes."""
        ci = self.getInfo()
        _LOGGER.info(f"Call state: {ci.stateText}")

        if ci.state == pj.PJSIP_INV_STATE_CONFIRMED:
            _LOGGER.info("Call connected! Playing audio...")
            self.connected = True
            self.play_audio()
        elif ci.state == pj.PJSIP_INV_STATE_DISCONNECTED:
            _LOGGER.info("Call disconnected")
            self.disconnected = True

    def onCallMediaState(self, prm):
        """Called when media state changes."""
        ci = self.getInfo()
        for mi in ci.media:
            if (mi.type == pj.PJMEDIA_TYPE_AUDIO
                    and mi.status == pj.PJSUA_CALL_MEDIA_ACTIVE):
                aud_med = self.getAudioMedia(mi.index)
                try:
                    dev_media = (
                        pj.Endpoint.instance().audDevManager().getPlaybackDevMedia()
                    )
                    dev_media.startTransmit(aud_med)
                    aud_med.startTransmit(dev_media)
                except Exception as e:
                    # Best effort: routing to the local device is optional.
                    _LOGGER.warning(f"Audio routing warning: {e}")

        # If the call was confirmed before any audio stream became active,
        # play_audio() could not start; retry now that media changed.
        if self.connected and not self._playing:
            self.play_audio()

    def play_audio(self):
        """Play the audio file into the first active audio stream of the call.

        Does nothing (apart from logging) when no file is set, playback has
        already started, or no audio stream is active yet.
        """
        if not self.audio_file:
            _LOGGER.error("No audio file specified")
            return
        if self._playing:
            return

        try:
            ci = self.getInfo()
            for mi in ci.media:
                if (mi.type == pj.PJMEDIA_TYPE_AUDIO
                        and mi.status == pj.PJSUA_CALL_MEDIA_ACTIVE):
                    player = pj.AudioMediaPlayer()
                    player.createPlayer(self.audio_file, pj.PJMEDIA_FILE_NO_LOOP)
                    player.startTransmit(self.getAudioMedia(mi.index))
                    # Keep a reference so the player outlives this method.
                    self.player = player
                    self._playing = True
                    _LOGGER.info(f"Playing: {self.audio_file}")
                    break
            else:
                _LOGGER.warning("No active audio media yet; playback deferred")
        except Exception as e:
            _LOGGER.error(f"Error playing audio: {e}", exc_info=True)

    def set_audio_file(self, audio_file: str):
        """Set the audio file to play."""
        self.audio_file = audio_file


class SIPAccount(pj.Account):
    """SIP Account handler."""

    def __init__(self):
        pj.Account.__init__(self)

    def onIncomingCall(self, prm):
        """Reject incoming calls by answering with status code 486."""
        call = CallHandler(self, prm.callId)
        call_param = pj.CallOpParam()
        call_param.statusCode = 486
        call.answer(call_param)


def _remove_file(path: str) -> None:
    """Delete *path*, ignoring a missing file and only logging other errors."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        _LOGGER.warning(f"Cleanup failed: {e}")


def _convert_to_wav(audio) -> str:
    """Export a pydub segment as mono 16-bit WAV at DEFAULT_SAMPLE_RATE.

    Returns the path of a new temporary file that the caller must delete.
    The file is removed again if the export itself fails.
    """
    audio = audio.set_frame_rate(DEFAULT_SAMPLE_RATE)
    audio = audio.set_channels(1)
    audio = audio.set_sample_width(2)

    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
        wav_path = tmp.name
    try:
        audio.export(wav_path, format="wav")
    except Exception:
        _remove_file(wav_path)
        raise
    return wav_path


def download_and_convert_audio(url: str) -> str:
    """Download *url* and convert it to WAV.

    Returns the path of a temporary WAV file owned by the caller.

    Raises:
        requests.RequestException: the download failed or returned an HTTP
            error status.
        Exception: the downloaded data could not be converted.
    """
    parsed_url = urlparse(url)
    extension = os.path.splitext(parsed_url.path)[1] or ".mp3"

    with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp:
        download_path = tmp.name

    try:
        # The context manager releases the streamed connection in all cases.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(download_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        _LOGGER.debug(f"Downloaded: {download_path}")

        try:
            wav_path = _convert_to_wav(AudioSegment.from_file(download_path))
        except Exception as e:
            raise Exception(f"Audio conversion failed: {e}") from e
        _LOGGER.debug(f"Converted: {wav_path}")
        return wav_path
    finally:
        # The raw download is never needed after this function returns,
        # whether it succeeded or failed.
        _remove_file(download_path)


def generate_tts_audio(message: str) -> str:
    """Generate TTS audio for *message* and return the path of a WAV file.

    Raises:
        Exception: gTTS is not installed, or synthesis/conversion failed.
    """
    try:
        from gtts import gTTS
    except ImportError as e:
        raise Exception("gTTS not available") from e

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp:
        mp3_path = tmp.name

    try:
        tts = gTTS(text=message, lang='en')
        tts.save(mp3_path)
        return _convert_to_wav(AudioSegment.from_mp3(mp3_path))
    finally:
        _remove_file(mp3_path)


def _auth_username(sip_user: str) -> str:
    """Return the user name to authenticate with.

    When *sip_user* is configured as a full ``sip:user@host`` URI only the
    user part is used; otherwise the value is returned unchanged.
    """
    if sip_user.startswith('sip:'):
        return sip_user[len('sip:'):].split('@', 1)[0]
    return sip_user


def place_sip_call(destination: str, audio_file: str, duration: int):
    """Place a SIP call, play *audio_file* into it and hang up.

    Args:
        destination: a full ``sip:`` URI, or a user/number that is completed
            with the configured ``sip_server``.
        audio_file: path of the WAV file to play once the call is confirmed.
        duration: maximum number of seconds to keep the call up after the
            initial (up to 10 s) wait for an answer.

    Raises:
        ValueError: ``sip_user`` or ``sip_server`` is missing from CONFIG.
        RuntimeError: the call ended without ever being answered.
        Exception: any error reported by pjsua2 (logged, then re-raised).
    """
    sip_user = CONFIG.get('sip_user') or ''
    sip_server = CONFIG.get('sip_server') or ''
    sip_password = CONFIG.get('sip_password') or ''
    if not sip_user or not sip_server:
        raise ValueError(
            "SIP configuration incomplete: sip_user and sip_server are required"
        )

    with _SIP_LOCK:
        ep = pj.Endpoint()
        ep.libCreate()
        acc = None
        call = None

        try:
            ep_cfg = pj.EpConfig()
            ep_cfg.logConfig.level = 3
            ep_cfg.logConfig.consoleLevel = 0
            ep.libInit(ep_cfg)

            transport_cfg = pj.TransportConfig()
            transport_cfg.port = 0
            ep.transportCreate(pj.PJSIP_TRANSPORT_UDP, transport_cfg)

            ep.libStart()
            _LOGGER.info("PJSIP started")

            acc = SIPAccount()
            acc_cfg = pj.AccountConfig()

            if not sip_user.startswith('sip:'):
                sip_uri = f"sip:{sip_user}@{sip_server}"
            else:
                sip_uri = sip_user

            acc_cfg.idUri = sip_uri
            acc_cfg.regConfig.registrarUri = f"sip:{sip_server}"

            cred = pj.AuthCredInfo(
                "digest", "*", _auth_username(sip_user), 0, sip_password
            )
            acc_cfg.sipConfig.authCreds.append(cred)

            acc.create(acc_cfg)
            _LOGGER.info(f"Account created: {sip_uri}")

            # Give the account a moment before dialing.
            time.sleep(2)

            if not destination.startswith('sip:'):
                dest_uri = f"sip:{destination}@{sip_server}"
            else:
                dest_uri = destination

            _LOGGER.info(f"Calling: {dest_uri}")

            call = CallHandler(acc)
            call.set_audio_file(audio_file)

            call_param = pj.CallOpParam()
            call_param.opt.audioCount = 1
            call_param.opt.videoCount = 0

            call.makeCall(dest_uri, call_param)

            # Wait up to 10 s for an answer; stop early if the call ends.
            wait_time = 0
            while (not call.connected and not call.disconnected
                   and wait_time < 10):
                time.sleep(0.5)
                wait_time += 0.5

            if not call.connected:
                _LOGGER.warning("Call did not connect")

            _LOGGER.info(f"Call active for {duration}s")
            # Poll instead of one long sleep so a call that ends early
            # does not keep the request (and the SIP lock) busy.
            deadline = time.monotonic() + duration
            while not call.disconnected:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(0.5, remaining))

            if call.disconnected:
                if not call.connected:
                    raise RuntimeError("Call ended before it was answered")
                _LOGGER.info("Call already ended; skipping hangup")
            else:
                _LOGGER.info("Hanging up...")
                try:
                    call.hangup(pj.CallOpParam())
                except Exception as e:
                    # The call may have ended between the check and the
                    # hangup; the endpoint is torn down right after anyway.
                    _LOGGER.warning(f"Hangup warning: {e}")
                time.sleep(1)

        except Exception as e:
            _LOGGER.error(f"Call error: {e}", exc_info=True)
            raise

        finally:
            # Drop the call/account wrappers before tearing the library down
            # so they are not finalized against a destroyed endpoint.
            call = None
            acc = None
            try:
                ep.libDestroy()
            except Exception as e:
                _LOGGER.warning(f"Cleanup warning: {e}")


def _parse_duration(value):
    """Return *value* as a finite, non-negative number of seconds.

    Raises:
        ValueError: *value* is not numeric, is negative, NaN or infinite.
    """
    if isinstance(value, bool):
        raise ValueError("duration must be a number")
    if isinstance(value, (int, float)):
        number = value
    else:
        try:
            number = float(value)
        except (TypeError, ValueError):
            raise ValueError("duration must be a number") from None
    # The chained comparison is False for NaN as well as for +/-infinity.
    if not 0 <= number < float('inf'):
        raise ValueError("duration must be a non-negative number")
    return number


@app.route('/health', methods=['GET'])
def health():
    """Health check endpoint."""
    return jsonify({'status': 'ok'}), 200


@app.route('/send_notification', methods=['POST'])
def handle_send_notification():
    """
    Handle send_notification service call.

    This endpoint is called by Home Assistant when the service is invoked.

    Expects a JSON object with ``destination`` and either ``audio_url`` or
    ``message`` (``audio_url`` wins when both are given); ``duration`` is
    optional.  Returns 400 for invalid input, 500 when preparing the audio
    or placing the call fails, 200 otherwise.
    """
    try:
        # silent=True: a missing or malformed body yields None instead of an
        # HTTP exception, so it can be reported as a plain 400 below.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'error': 'JSON object body required'}), 400

        destination = data.get('destination')
        audio_url = data.get('audio_url')
        message = data.get('message')
        duration = data.get('duration')
        if duration is None:
            duration = CONFIG.get('default_duration', 30)

        # Phone numbers may arrive as JSON numbers.
        if isinstance(destination, int) and not isinstance(destination, bool):
            destination = str(destination)
        if not destination or not isinstance(destination, str):
            return jsonify({'error': 'destination required'}), 400

        if not audio_url and not message:
            return jsonify({'error': 'audio_url or message required'}), 400

        try:
            duration = _parse_duration(duration)
        except ValueError as e:
            return jsonify({'error': str(e)}), 400

        # Prepare audio
        if audio_url:
            _LOGGER.info(f"Downloading: {audio_url}")
            audio_file = download_and_convert_audio(audio_url)
        else:
            _LOGGER.info(f"Generating TTS: {message}")
            audio_file = generate_tts_audio(message)

        try:
            # Place call
            _LOGGER.info(f"Calling: {destination}")
            place_sip_call(destination, audio_file, duration)
            return jsonify({'status': 'success'}), 200
        finally:
            # Cleanup
            _remove_file(audio_file)

    except Exception as e:
        _LOGGER.error(f"Request error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500


def _load_config(options_file: str = '/data/options.json') -> dict:
    """Load settings from the add-on options file, else from the environment."""
    if os.path.exists(options_file):
        with open(options_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        _LOGGER.info("Config loaded from options.json")
        return config

    _LOGGER.warning("No options.json found, using environment variables")
    return {
        'sip_server': os.getenv('SIP_SERVER', ''),
        'sip_user': os.getenv('SIP_USER', ''),
        'sip_password': os.getenv('SIP_PASSWORD', ''),
        'default_duration': int(os.getenv('DEFAULT_DURATION', 30))
    }


def main():
    """Load the configuration and start the HTTP service on port 8099."""
    # Update in place so every reference to CONFIG sees the loaded values.
    CONFIG.update(_load_config())

    _LOGGER.info("SIP Voice Notifier ready - service will be auto-registered by Supervisor")
    _LOGGER.info("Starting service on port 8099")
    app.run(host='0.0.0.0', port=8099, debug=False)


if __name__ == '__main__':
    main()