# Source: calliope/calliope/app.py (641 lines, 26 KiB, Python)

"""Calliope — Voice-to-text macOS menu bar app."""
import logging
import os
import threading
import time
from typing import Any
import numpy as np
import subprocess
import rumps
from calliope import config as config_mod
from calliope.recorder import Recorder
from calliope.transcriber import Transcriber
from calliope.postprocessor import Postprocessor
from calliope.typer import type_text, type_text_clipboard
from calliope.hotkeys import HotkeyListener
from calliope.overlay import WaveformOverlay
log = logging.getLogger(__name__)
class CalliopeApp(rumps.App):
    """Menu bar application that records audio, transcribes it and types the text.

    Ties together the recorder, the Whisper transcriber, the optional LLM
    post-processor, the global hotkey listener and the waveform overlay, and
    exposes the settings through the status-bar menu.
    """
def __init__(self, cfg: dict[str, Any] | None = None):
    """Create the app: core components, status-bar menu, hotkeys, model load.

    Args:
        cfg: Settings mapping. When ``None`` it is obtained from
            ``config_mod.load()``.
    """
    super().__init__("Calliope", title="\U0001f3a4", quit_button=None)  # 🎤
    if cfg is None:
        cfg = config_mod.load()
    self.cfg = cfg

    # Resolve the Whisper model id once so the transcriber and the Model
    # submenu checkmark agree even when the config has no "model" entry
    # (the menu previously fell back to a different default id).
    whisper_model = cfg.get("model", "mlx-community/whisper-large-v3-turbo")

    self.overlay = WaveformOverlay()
    self.recorder = Recorder(device=cfg.get("device"))
    self.transcriber = Transcriber(
        model=whisper_model,
        silence_threshold=cfg.get("silence_threshold", 0.005),
    )
    self.transcriber.context = cfg.get("context", "")
    self.transcriber.language = cfg.get("language", "auto")

    # Post-processing
    pp_cfg = cfg.get("postprocessing", {})
    self.postprocessor: Postprocessor | None = None
    if pp_cfg.get("enabled") and pp_cfg.get("model"):
        self.postprocessor = Postprocessor(
            system_prompt=pp_cfg.get("system_prompt", ""),
        )

    # Recording state
    self._recording = False
    self._rec_lock = threading.Lock()
    self._rec_start_time: float | None = None
    self._rec_timer: rumps.Timer | None = None
    self._transcribe_done = threading.Event()
    self._transcribe_done.set()  # not transcribing initially

    # Silence-based auto-stop
    self._silence_since: float | None = None
    self._rec_has_speech: bool = False
    self._silence_stop_evt: threading.Event = threading.Event()
    self._silence_stop_evt.set()  # not monitoring initially

    self.status_item = rumps.MenuItem("Status: Loading model...")
    self.status_item.set_callback(None)
    self.toggle_item = rumps.MenuItem("Start Recording", callback=self._on_toggle_click)

    ctx = cfg.get("context", "")
    context_label = f"Set Whisper Context... ({ctx[:20]}...)" if ctx else "Set Whisper Context..."
    self.context_item = rumps.MenuItem(context_label, callback=self._on_set_context)

    # Language submenu
    self._lang_menu = rumps.MenuItem("Language")
    current_lang = cfg.get("language", "auto")
    for display_name, code in config_mod.LANGUAGES.items():
        prefix = "\u2713 " if code == current_lang else " "
        item = rumps.MenuItem(f"{prefix}{display_name}", callback=self._on_language_select)
        self._lang_menu.add(item)

    # Model submenu
    self._model_menu = rumps.MenuItem("Model")
    for model_id in config_mod.MODELS:
        short = model_id.split("/")[-1]
        prefix = "\u2713 " if model_id == whisper_model else " "
        item = rumps.MenuItem(f"{prefix}{short}", callback=self._on_model_select)
        self._model_menu.add(item)

    # Microphone submenu
    self._mic_menu = rumps.MenuItem("Microphone")
    self._build_mic_menu()

    # Post-Processing submenu
    self._pp_menu = rumps.MenuItem("Post-Processing")
    self._build_pp_menu()

    # Auto-stop on silence toggle
    auto_stop = cfg.get("auto_stop_silence", True)
    prefix = "\u2713 " if auto_stop else " "
    self._auto_stop_item = rumps.MenuItem(
        f"{prefix}Auto-stop on Silence", callback=self._on_auto_stop_toggle
    )

    # Typing mode submenu
    self._typing_menu = rumps.MenuItem("Typing Mode")
    current_mode = cfg.get("typing_mode", "char")
    for mode, label in [("char", "Character (CGEvents)"), ("clipboard", "Clipboard (Cmd+V)")]:
        prefix = "\u2713 " if mode == current_mode else " "
        item = rumps.MenuItem(f"{prefix}{label}", callback=self._on_typing_mode_select)
        # Remember the mode on the item so the callback need not parse the label.
        item._typing_mode = mode
        self._typing_menu.add(item)

    quit_item = rumps.MenuItem("Quit Calliope", callback=self._on_quit)
    self.menu = [
        self.status_item,
        None,
        self.toggle_item,
        self._auto_stop_item,
        self.context_item,
        self._lang_menu,
        self._model_menu,
        self._mic_menu,
        self._typing_menu,
        self._pp_menu,
        None,
        quit_item,
    ]

    hotkey_cfg = cfg.get("hotkeys", {})
    self.hotkeys = HotkeyListener(
        on_push_to_talk_start=self._start_recording,
        on_push_to_talk_stop=self._stop_and_transcribe,
        on_toggle=self._toggle_recording,
        ptt_combo=hotkey_cfg.get("ptt", "ctrl+shift"),
        toggle_combo=hotkey_cfg.get("toggle", "ctrl+space"),
    )

    # Load model in background
    threading.Thread(target=self._load_model, daemon=True).start()
def _notify(self, title: str, subtitle: str, message: str) -> None:
    """Post a macOS notification through ``osascript`` (best effort).

    Does nothing when the ``notifications`` setting is false. A failure to
    spawn ``osascript`` is logged at debug level and otherwise ignored.

    Args:
        title: Notification title.
        subtitle: Optional subtitle; an empty string omits it.
        message: Notification body text.
    """
    if not self.cfg.get("notifications", True):
        return

    def _as_literal(value: str) -> str:
        # Escape backslashes and double quotes so arbitrary text (model ids,
        # user input) cannot terminate the AppleScript string literal early.
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    script = f"display notification {_as_literal(message)} with title {_as_literal(title)}"
    if subtitle:
        # Pass the subtitle as its own field instead of gluing it onto the
        # message, which produced text such as "ErrorFailed to load ...".
        script += f" subtitle {_as_literal(subtitle)}"

    try:
        subprocess.Popen(
            ["osascript", "-e", script],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except Exception:
        # Notifications are optional; never let them break the caller.
        log.debug("Could not post notification", exc_info=True)
def _ready_status(self) -> str:
    """Return the "ready" status line, naming the last path segment of the model id."""
    model_name = self.transcriber.model.rsplit("/", 1)[-1]
    return f"Status: Ready ({model_name})"
def _load_model(self) -> None:
    """Load the transcriber, start the hotkeys, then load the LLM if configured.

    Any exception is logged, reflected in the status item and reported
    through a notification.
    """
    try:
        self.transcriber.load()
        self.status_item.title = self._ready_status()
        self.hotkeys.start()
        log.info("Model loaded, hotkeys active")

        # Load postprocessor if enabled
        pp_settings = self.cfg.get("postprocessing", {})
        wants_llm = pp_settings.get("enabled") and pp_settings.get("model")
        if wants_llm:
            self._ensure_postprocessor(pp_settings["model"])
    except Exception:
        log.exception("Failed to load model")
        self.status_item.title = "Status: Model load failed"
        self._notify("Calliope", "Error", "Failed to load Whisper model. Check logs.")
@staticmethod
def _activate_app():
    """Switch to the regular activation policy and bring the app to the front.

    Used before showing dialogs so their text fields receive focus.
    """
    from AppKit import NSApplication, NSApplicationActivationPolicyRegular

    shared = NSApplication.sharedApplication()
    shared.setActivationPolicy_(NSApplicationActivationPolicyRegular)
    shared.activateIgnoringOtherApps_(True)
@staticmethod
def _deactivate_app():
    """Switch back to the accessory activation policy (no Dock icon)."""
    from AppKit import NSApplication, NSApplicationActivationPolicyAccessory

    shared = NSApplication.sharedApplication()
    shared.setActivationPolicy_(NSApplicationActivationPolicyAccessory)
def _on_set_context(self, sender) -> None:
    """Ask for the Whisper context text and persist it when the user saves."""
    self._activate_app()
    prompt = (
        "Provide context to help Whisper with domain-specific terms, "
        "names, or jargon. For example:\n\n"
        "\"Meeting about Kubernetes, gRPC, and the Istio service mesh.\"\n\n"
        "Clear the field and press Save to remove context."
    )
    dialog = rumps.Window(
        message=prompt,
        title="Set Whisper Context",
        default_text=self.transcriber.context,
        ok="Save",
        cancel="Cancel",
        dimensions=(320, 120),
    )
    response = dialog.run()

    saved = response.clicked == 1  # anything else is Cancel / Escape
    if saved:
        self.transcriber.context = response.text.strip()
    self._deactivate_app()
    if not saved:
        return

    new_context = self.transcriber.context
    if new_context:
        self.context_item.title = f"Set Whisper Context... ({new_context[:20]}...)"
    else:
        self.context_item.title = "Set Whisper Context..."
    self.cfg["context"] = new_context
    config_mod.save(self.cfg)
def _on_language_select(self, sender) -> None:
    """Apply the language picked in the Language submenu and save it."""

    def _bare(title: str) -> str:
        # Drop the checkmark and the padding around the display name.
        return title.strip().lstrip("\u2713").strip()

    display_name = _bare(sender.title)
    code = config_mod.LANGUAGES.get(display_name, "auto")
    self.transcriber.language = code

    # Update checkmarks
    for entry in self._lang_menu.values():
        label = _bare(entry.title)
        if config_mod.LANGUAGES.get(label) == code:
            entry.title = f"\u2713 {label}"
        else:
            entry.title = f" {label}"

    self.cfg["language"] = code
    config_mod.save(self.cfg)
    log.info("Language set to %s (%s)", display_name, code)
def _on_model_select(self, sender) -> None:
    """Switch to the Whisper model picked in the Model submenu.

    Hotkeys are stopped immediately; a background thread waits for any
    in-flight transcription, replaces the transcriber and loads the model.
    """
    chosen = sender.title.strip().lstrip("\u2713").strip()

    # Find full model ID (first entry whose last path segment matches).
    model_id = next(
        (candidate for candidate in config_mod.MODELS if candidate.split("/")[-1] == chosen),
        None,
    )
    if model_id is None or model_id == self.transcriber.model:
        return

    # Update checkmarks
    for entry in self._model_menu.values():
        label = entry.title.strip().lstrip("\u2713").strip()
        mark = "\u2713 " if label == chosen else " "
        entry.title = f"{mark}{label}"

    self.cfg["model"] = model_id
    config_mod.save(self.cfg)
    self.status_item.title = "Status: Loading model..."
    self.hotkeys.stop()

    def _swap_model():
        self._transcribe_done.wait()  # wait for in-flight transcription
        self._release_transcriber()
        self.transcriber = Transcriber(
            model=model_id,
            silence_threshold=self.cfg.get("silence_threshold", 0.005),
        )
        self.transcriber.context = self.cfg.get("context", "")
        self.transcriber.language = self.cfg.get("language", "auto")
        self._load_model()

    worker = threading.Thread(target=_swap_model, daemon=True)
    worker.start()
    log.info("Switching model to %s", model_id)
def _build_mic_menu(self) -> None:
    """Fill the Microphone submenu with "System Default" plus every input device."""
    import sounddevice as sd

    selected = self.cfg.get("device")

    # System default
    mark = "\u2713 " if selected is None else " "
    self._mic_menu.add(rumps.MenuItem(f"{mark}System Default", callback=self._on_mic_select))

    # List input devices
    for device in sd.query_devices():
        if not device["max_input_channels"] > 0:
            continue
        index = device["index"]
        label = device["name"]
        mark = "\u2713 " if selected == index else " "
        entry = rumps.MenuItem(f"{mark}{label}", callback=self._on_mic_select)
        entry._device_index = index
        self._mic_menu.add(entry)
def _on_mic_select(self, sender) -> None:
    """Use the microphone picked in the Microphone submenu and save the choice.

    The device index is read from the ``_device_index`` attribute stored on
    the menu item; the "System Default" item has none and maps to ``None``.
    """
    name = sender.title.strip().lstrip("\u2713").strip()
    device_index = getattr(sender, "_device_index", None)
    self.recorder._device = device_index
    self.cfg["device"] = device_index
    config_mod.save(self.cfg)

    # Update checkmarks. Compare by device index rather than by display
    # name, so that two devices sharing a name are not both ticked.
    for item in self._mic_menu.values():
        item_name = item.title.strip().lstrip("\u2713").strip()
        is_selected = getattr(item, "_device_index", None) == device_index
        item.title = f"\u2713 {item_name}" if is_selected else f" {item_name}"

    log.info("Microphone set to %s (device=%s)", name, device_index)
def _on_typing_mode_select(self, sender) -> None:
    """Persist the typing mode stored on the clicked item and refresh checkmarks."""
    chosen_mode = sender._typing_mode
    self.cfg["typing_mode"] = chosen_mode
    config_mod.save(self.cfg)

    for entry in self._typing_menu.values():
        text = entry.title.strip().lstrip("\u2713").strip()
        if getattr(entry, "_typing_mode", None) == chosen_mode:
            entry.title = f"\u2713 {text}"
        else:
            entry.title = f" {text}"

    log.info("Typing mode set to %s", chosen_mode)
def _release_transcriber(self) -> None:
    """Mark the current transcriber as unloaded and run a garbage collection."""
    import gc

    current = self.transcriber
    if current is not None:
        current._loaded = False
    # NOTE(review): the source indentation was lost; the collection is assumed
    # to run unconditionally, after the ``if`` — confirm.
    gc.collect()
def _on_toggle_click(self, sender) -> None:
    """Menu callback for the Start/Stop Recording item; ``sender`` is unused."""
    self._toggle_recording()
def _toggle_recording(self) -> None:
    """Stop and transcribe when recording, otherwise start recording."""
    if not self._recording:
        self._start_recording()
        return
    self._stop_and_transcribe()
def _start_recording(self) -> None:
    """Begin capturing audio and put the UI into the recording state.

    Returns immediately when a recording is already running. When the
    recorder fails to start, the UI is reset and a notification is posted.
    """
    # NOTE(review): the source indentation was lost; the lock is assumed to
    # guard only the flag check-and-set and the start timestamp — confirm.
    with self._rec_lock:
        if self._recording:
            return
        self._recording = True
        self._rec_start_time = time.time()

    self.title = "\U0001f534 0:00"  # 🔴
    self.toggle_item.title = "Stop Recording"
    self.status_item.title = "Status: Recording..."

    # Fresh silence-tracking state for this recording.
    self._silence_since = None
    self._rec_has_speech = False
    self._silence_stop_evt = threading.Event()
    self.recorder.on_audio = self._on_audio_chunk

    try:
        self.recorder.start()
    except Exception:
        log.exception("Failed to start recording")
        with self._rec_lock:
            self._recording = False
        self.title = "\U0001f3a4"  # 🎤
        self.toggle_item.title = "Start Recording"
        self.status_item.title = "Status: Mic error (check device)"
        self._notify("Calliope", "", "Microphone unavailable — check audio device")
        return

    self.overlay.show()
    duration_timer = rumps.Timer(self._update_rec_duration, 1)
    self._rec_timer = duration_timer
    duration_timer.start()

    auto_stop = self.cfg.get("auto_stop_silence", True)
    if auto_stop:
        monitor = threading.Thread(target=self._silence_monitor, daemon=True)
        monitor.start()

    self._notify("Calliope", "", "Recording started")
    log.info("Recording started")
def _stop_and_transcribe(self) -> None:
    """Stop the current recording and transcribe it on a background thread.

    Returns immediately when no recording is running.
    """
    # NOTE(review): the source indentation was lost; the lock is assumed to
    # guard only the flag check-and-clear — confirm.
    with self._rec_lock:
        if not self._recording:
            return
        self._recording = False

    # Tell the silence monitor (if any) to exit.
    self._silence_stop_evt.set()

    if self._rec_timer:
        self._rec_timer.stop()
        self._rec_timer = None

    started_at = self._rec_start_time
    if started_at:
        duration = int(time.time() - started_at)
    else:
        duration = 0
    self._rec_start_time = None

    self.title = "\u23f3"  # ⏳
    self.toggle_item.title = "Start Recording"
    self.status_item.title = "Status: Transcribing..."
    self.overlay.show_transcribing()

    audio = self.recorder.stop()
    self._notify("Calliope", "", f"Recording stopped ({duration}s)")
    log.info("Recording stopped, %d samples", audio.size)

    # Cleared here, set again when the worker finishes.
    self._transcribe_done.clear()
    worker = threading.Thread(target=self._transcribe_and_type, args=(audio,), daemon=True)
    worker.start()
def _on_audio_chunk(self, chunk: np.ndarray) -> None:
    """Recorder callback: feed the overlay and track speech/silence by RMS level."""
    self.overlay.push_samples(chunk)

    level = float(np.sqrt(np.mean(chunk ** 2)))
    if level >= self.cfg.get("silence_threshold", 0.005):
        # Loud enough: remember speech was heard and cancel any silence timer.
        self._rec_has_speech = True
        self._silence_since = None
        return

    # Quiet chunk: start timing silence only once speech has been heard.
    if self._rec_has_speech and self._silence_since is None:
        self._silence_since = time.monotonic()
def _silence_monitor(self) -> None:
    """Poll until silence has lasted the configured timeout, then auto-stop.

    Exits early when the stop event captured at start is set.
    """
    timeout = self.cfg.get("silence_timeout_seconds", 1.5)
    stop_evt = self._silence_stop_evt  # keep this recording's own event

    while not stop_evt.is_set():
        silent_from = self._silence_since
        if silent_from is None or (time.monotonic() - silent_from) < timeout:
            stop_evt.wait(0.1)
            continue
        log.info("Auto-stop: %.1fs of silence detected", timeout)
        self._stop_and_transcribe()
        break
def _on_auto_stop_toggle(self, sender) -> None:
    """Flip the auto-stop-on-silence setting, save it and update the menu item."""
    now_enabled = not self.cfg.get("auto_stop_silence", True)
    self.cfg["auto_stop_silence"] = now_enabled
    config_mod.save(self.cfg)

    if now_enabled:
        mark = "\u2713 "
    else:
        mark = " "
    self._auto_stop_item.title = f"{mark}Auto-stop on Silence"

    state = "enabled" if now_enabled else "disabled"
    log.info("Auto-stop on silence %s", state)
def _update_rec_duration(self, timer) -> None:
    """Timer callback: show elapsed time and enforce the maximum duration."""
    started_at = self._rec_start_time
    if started_at is None:
        return

    elapsed = int(time.time() - started_at)
    limit = self.cfg.get("max_recording_seconds", 300)
    if limit and elapsed >= limit:
        log.info("Max recording duration reached (%ds)", limit)
        self._stop_and_transcribe()
        return

    mins = elapsed // 60
    secs = elapsed % 60
    self.title = f"\U0001f534 {mins}:{secs:02d}"
def _transcribe_and_type(self, audio) -> None:
    """Transcribe ``audio``, optionally post-process it, and type the result.

    Typing is scheduled through ``callAfter``. The title is always reset and
    ``_transcribe_done`` is always set when this method finishes.
    """
    try:
        text = self.transcriber.transcribe(audio)
        if not text:
            self.overlay.hide()
            self.title = "\U0001f3a4"  # 🎤
            self.status_item.title = self._ready_status()
            self._notify("Calliope", "", "No speech detected — audio too short or too quiet")
            return

        # LLM post-processing
        pp_settings = self.cfg.get("postprocessing", {})
        use_llm = (
            pp_settings.get("enabled")
            and self.postprocessor
            and self.postprocessor._model is not None
        )
        if use_llm:
            try:
                self.status_item.title = "Status: Post-processing..."
                text = self.postprocessor.process(text)
            except Exception:
                log.exception("Post-processing failed, using raw transcription")

        if text:
            from PyObjCTools.AppHelper import callAfter

            def _emit():
                try:
                    clipboard_mode = self.cfg.get("typing_mode", "char") == "clipboard"
                    if clipboard_mode:
                        type_text_clipboard(text)
                    else:
                        type_text(text, delay=self.cfg.get("typing_delay", 0.005))
                    print(f"\n[Calliope] {text}")
                    log.info("Typed %d chars", len(text))
                except Exception:
                    log.exception("Typing failed")

            callAfter(_emit)

        self.overlay.hide()
        self.status_item.title = self._ready_status()
    except Exception:
        log.exception("Transcription failed")
        self.overlay.hide()
        self.status_item.title = self._ready_status()
        self._notify("Calliope", "Error", "Transcription failed. Check logs.")
    finally:
        self.title = "\U0001f3a4"  # 🎤
        self._transcribe_done.set()
# ── Post-Processing ───────────────────────────────────────────
def _build_pp_menu(self) -> None:
    """Rebuild the Post-Processing submenu from the current configuration."""
    menu = self._pp_menu
    if menu._menu is not None:
        menu.clear()

    settings = self.cfg.get("postprocessing", {})
    is_enabled = settings.get("enabled", False)
    selected = settings.get("model")
    downloaded = settings.get("models", [])

    # Enable/disable toggle
    if is_enabled:
        toggle_label = "Disable Post-Processing"
    else:
        toggle_label = "Enable Post-Processing"
    menu.add(rumps.MenuItem(toggle_label, callback=self._on_pp_toggle))
    menu.add(None)  # separator

    # Downloaded models
    if downloaded:
        for repo_id in downloaded:
            short_name = repo_id.split("/")[-1]
            mark = "\u2713 " if repo_id == selected else " "
            entry = rumps.MenuItem(f"{mark}{short_name}", callback=self._on_pp_model_select)
            entry._pp_model_id = repo_id
            menu.add(entry)
        menu.add(None)

    menu.add(rumps.MenuItem("Download Model...", callback=self._on_pp_download))
    menu.add(rumps.MenuItem("Edit System Prompt...", callback=self._on_pp_edit_prompt))
    if downloaded:
        menu.add(rumps.MenuItem("Delete Model...", callback=self._on_pp_delete))
def _on_pp_toggle(self, sender) -> None:
    """Flip post-processing on/off, then load or release the LLM accordingly."""
    settings = self.cfg.setdefault("postprocessing", {})
    now_enabled = not settings.get("enabled", False)
    settings["enabled"] = now_enabled
    config_mod.save(self.cfg)

    if not now_enabled:
        self._release_postprocessor()
    elif settings.get("model"):
        self._ensure_postprocessor(settings["model"])

    self._build_pp_menu()
    state = "enabled" if now_enabled else "disabled"
    log.info("Post-processing %s", state)
def _on_pp_model_select(self, sender) -> None:
    """Make the clicked LLM the active one; load it if post-processing is on."""
    chosen = sender._pp_model_id
    settings = self.cfg.setdefault("postprocessing", {})
    already_active = settings.get("model") == chosen
    if already_active:
        return

    settings["model"] = chosen
    config_mod.save(self.cfg)
    if settings.get("enabled"):
        self._ensure_postprocessor(chosen)

    self._build_pp_menu()
    log.info("Post-processing model set to %s", chosen)
def _on_pp_download(self, sender) -> None:
    """Ask for a HuggingFace repo ID and download that model in the background.

    On success the repo is added to the configured model list (and becomes
    the active model when none is set), the configuration is saved and the
    submenu is rebuilt. Failures are logged and reported via notification.
    """
    self._activate_app()
    response = rumps.Window(
        message="Enter a HuggingFace MLX model repo ID.\n\n"
        "Example: mlx-community/Qwen2.5-0.5B-Instruct-4bit",
        title="Download MLX Model",
        default_text="mlx-community/Qwen2.5-0.5B-Instruct-4bit",
        ok="Download",
        cancel="Cancel",
        dimensions=(320, 24),
    ).run()
    self._deactivate_app()
    if response.clicked != 1:
        return
    repo = response.text.strip()
    if not repo:
        return
    self._notify("Calliope", "", f"Downloading {repo}...")

    def _do_download():
        # Pre-bind so the ``finally`` clause cannot hit an unbound name when
        # the import itself fails.
        hf_constants = None
        try:
            import huggingface_hub.constants as hf_constants

            # Lift offline mode for the duration of the download.
            os.environ["HF_HUB_OFFLINE"] = "0"
            hf_constants.HF_HUB_OFFLINE = False
            Postprocessor.download(repo)
            pp_cfg = self.cfg.setdefault("postprocessing", {})
            if repo not in pp_cfg.setdefault("models", []):
                pp_cfg["models"].append(repo)
            if not pp_cfg.get("model"):
                pp_cfg["model"] = repo
            config_mod.save(self.cfg)
            self._build_pp_menu()
            self._notify("Calliope", "", f"Model downloaded: {repo}")
        except Exception:
            log.error("Failed to download %s", repo, exc_info=True)
            self._notify("Calliope", "Error", f"Failed to download {repo}")
        finally:
            # Always return to offline mode, as the original code did.
            os.environ["HF_HUB_OFFLINE"] = "1"
            if hf_constants is not None:
                hf_constants.HF_HUB_OFFLINE = True

    threading.Thread(target=_do_download, daemon=True).start()
def _on_pp_edit_prompt(self, sender) -> None:
    """Let the user edit the LLM system prompt; save and apply it on Save."""
    settings = self.cfg.setdefault("postprocessing", {})
    existing_prompt = settings.get("system_prompt", "")

    self._activate_app()
    dialog = rumps.Window(
        message="System prompt sent to the LLM before your transcription:",
        title="Edit System Prompt",
        default_text=existing_prompt,
        ok="Save",
        cancel="Cancel",
        dimensions=(320, 120),
    )
    response = dialog.run()
    self._deactivate_app()
    if response.clicked != 1:
        return

    new_prompt = response.text.strip()
    settings["system_prompt"] = new_prompt
    config_mod.save(self.cfg)

    if self.postprocessor:
        from calliope.postprocessor import DEFAULT_SYSTEM_PROMPT

        # An empty prompt falls back to the module default.
        self.postprocessor.system_prompt = settings["system_prompt"] or DEFAULT_SYSTEM_PROMPT
    log.info("Post-processing system prompt updated")
def _on_pp_delete(self, sender) -> None:
    """Remove a model from Calliope's list of post-processing models.

    The user types the repo ID to remove. If the removed model was the
    active one, the first remaining model becomes active and is loaded when
    post-processing is enabled. If no models remain, post-processing is
    disabled and the LLM is released.
    """
    pp_cfg = self.cfg.setdefault("postprocessing", {})
    models = pp_cfg.get("models", [])
    if not models:
        return
    self._activate_app()
    response = rumps.Window(
        message="Enter the repo ID of the model to remove from Calliope:\n\n"
        + "\n".join(f"{m}" for m in models),
        title="Delete Model",
        default_text="",
        ok="Delete",
        cancel="Cancel",
        dimensions=(320, 24),
    ).run()
    self._deactivate_app()
    if response.clicked != 1:
        return
    repo = response.text.strip()
    if repo not in models:
        return

    was_active = pp_cfg.get("model") == repo
    models.remove(repo)
    if was_active:
        pp_cfg["model"] = models[0] if models else None

    if not models:
        pp_cfg["enabled"] = False
        self._release_postprocessor()
    elif was_active and pp_cfg.get("enabled"):
        # The configuration now points at a different model; load it so the
        # running post-processor matches what the menu shows as active.
        self._ensure_postprocessor(pp_cfg["model"])

    config_mod.save(self.cfg)
    self._build_pp_menu()
    log.info("Removed model %s", repo)
def _ensure_postprocessor(self, model_id: str) -> None:
    """Start a daemon thread that (re)loads the LLM identified by ``model_id``.

    The post-processor is created first when none exists. Load failures are
    logged and reported through a notification.
    """

    def _worker():
        try:
            if self.postprocessor is None:
                settings = self.cfg.get("postprocessing", {})
                prompt = settings.get("system_prompt", "")
                self.postprocessor = Postprocessor(system_prompt=prompt)
            # Drop whatever is loaded before loading the requested model.
            self.postprocessor.unload()
            self.postprocessor.load(model_id)
        except Exception:
            log.exception("Failed to load postprocessor %s", model_id)
            self._notify("Calliope", "Error", f"Failed to load LLM: {model_id}")

    loader = threading.Thread(target=_worker, daemon=True)
    loader.start()
def _release_postprocessor(self) -> None:
    """Unload the post-processor, if there is one, and forget it."""
    if self.postprocessor is None:
        return
    self.postprocessor.unload()
    self.postprocessor = None
def _on_quit(self, sender) -> None:
    """Shut down hotkeys, recorder, models and overlay, then quit the app."""
    # Stop every input source first.
    self.hotkeys.stop()
    self.recorder.stop()

    # Give an in-flight transcription up to 10 seconds to finish, so PyTorch
    # isn't killed mid-operation (which would cause a SIGTRAP from native
    # threads being torn down uncleanly).
    self._transcribe_done.wait(timeout=10)

    # Release models.
    self._release_transcriber()
    self._release_postprocessor()

    # Stop overlay timers synchronously to avoid retain cycles on quit.
    self.overlay.cleanup()

    rumps.quit_application()
def main():
    """Entry point: hand control to the Calliope command-line interface."""
    from calliope.cli import cli as run_cli

    run_cli()


# Allow running this module directly as a script.
if __name__ == "__main__":
    main()