From 435c87803baee49c6f482e46825a37d22b513136 Mon Sep 17 00:00:00 2001 From: syntaxbullet Date: Tue, 17 Feb 2026 15:57:14 +0100 Subject: [PATCH] refactor: fix several memory leaks and improve UX by exposing microphone selection etc. --- calliope/app.py | 168 +++++++++++++++++++++++++++++++--------- calliope/config.py | 4 + calliope/hotkeys.py | 58 +++++++++++--- calliope/overlay.py | 24 +++++- calliope/transcriber.py | 34 +++++--- calliope/typer.py | 48 ++++++++---- 6 files changed, 261 insertions(+), 75 deletions(-) diff --git a/calliope/app.py b/calliope/app.py index 9c1706e..087441a 100644 --- a/calliope/app.py +++ b/calliope/app.py @@ -11,6 +11,8 @@ os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") # Run offline — models are downloaded during setup, no need to hit HuggingFace on every launch. os.environ.setdefault("HF_HUB_OFFLINE", "1") +import subprocess + import rumps from calliope import config as config_mod @@ -36,6 +38,7 @@ class CalliopeApp(rumps.App): self.recorder = Recorder(device=cfg.get("device")) self.transcriber = Transcriber( model=cfg.get("model", "distil-whisper/distil-large-v3"), + silence_threshold=cfg.get("silence_threshold", 0.005), ) self.transcriber.context = cfg.get("context", "") self.transcriber.language = cfg.get("language", "auto") @@ -44,11 +47,15 @@ class CalliopeApp(rumps.App): self._rec_lock = threading.Lock() self._rec_start_time: float | None = None self._rec_timer: rumps.Timer | None = None + self._transcribe_done = threading.Event() + self._transcribe_done.set() # not transcribing initially self.status_item = rumps.MenuItem("Status: Loading model...") self.status_item.set_callback(None) self.toggle_item = rumps.MenuItem("Start Recording", callback=self._on_toggle_click) - self.context_item = rumps.MenuItem("Set Whisper Context...", callback=self._on_set_context) + ctx = cfg.get("context", "") + context_label = f"Set Whisper Context... ({ctx[:20]}...)" if ctx else "Set Whisper Context..." + self.context_item = rumps.MenuItem(context_label, callback=self._on_set_context) # Language submenu self._lang_menu = rumps.MenuItem("Language") @@ -67,6 +74,19 @@ class CalliopeApp(rumps.App): item = rumps.MenuItem(f"{prefix}{short}", callback=self._on_model_select) self._model_menu.add(item) + # Microphone submenu + self._mic_menu = rumps.MenuItem("Microphone") + self._build_mic_menu() + + # Typing mode submenu + self._typing_menu = rumps.MenuItem("Typing Mode") + current_mode = cfg.get("typing_mode", "char") + for mode, label in [("char", "Character (CGEvents)"), ("clipboard", "Clipboard (Cmd+V)")]: + prefix = "\u2713 " if mode == current_mode else " " + item = rumps.MenuItem(f"{prefix}{label}", callback=self._on_typing_mode_select) + item._typing_mode = mode + self._typing_menu.add(item) + quit_item = rumps.MenuItem("Quit Calliope", callback=self._on_quit) self.menu = [ @@ -76,6 +96,8 @@ class CalliopeApp(rumps.App): self.context_item, self._lang_menu, self._model_menu, + self._mic_menu, + self._typing_menu, None, quit_item, ] @@ -92,19 +114,34 @@ class CalliopeApp(rumps.App): # Load model in background threading.Thread(target=self._load_model, daemon=True).start() + def _notify(self, title: str, subtitle: str, message: str) -> None: + if not self.cfg.get("notifications", True): + return + try: + text = f"{subtitle} — {message}" if subtitle else message + script = f'display notification "{text}" with title "{title}"' + subprocess.Popen( + ["osascript", "-e", script], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except Exception: + pass + + def _ready_status(self) -> str: + short = self.transcriber.model.split("/")[-1] + return f"Status: Ready ({short})" + def _load_model(self) -> None: try: self.transcriber.load() - self.status_item.title = "Status: Ready" + self.status_item.title = self._ready_status() self.hotkeys.start() log.info("Model loaded, hotkeys active") except Exception: log.error("Failed to load model", exc_info=True) self.status_item.title = "Status: Model load failed" - try: - rumps.notification("Calliope", "Error", "Failed to load Whisper model. Check logs.") - except RuntimeError: - pass + self._notify("Calliope", "Error", "Failed to load Whisper model. Check logs.") @staticmethod def _activate_app(): @@ -125,20 +162,23 @@ class CalliopeApp(rumps.App): response = rumps.Window( message="Provide context to help Whisper with domain-specific terms, " "names, or jargon. For example:\n\n" - "\"Meeting about Kubernetes, gRPC, and the Istio service mesh.\"", + "\"Meeting about Kubernetes, gRPC, and the Istio service mesh.\"\n\n" + "Clear the field and press Save to remove context.", title="Set Whisper Context", default_text=self.transcriber.context, ok="Save", - cancel="Clear", + cancel="Cancel", dimensions=(320, 120), ).run() - if response.clicked == 1: # Save - self.transcriber.context = response.text.strip() - else: # Clear - self.transcriber.context = "" + if response.clicked != 1: # Cancel / Escape + self._deactivate_app() + return + self.transcriber.context = response.text.strip() self._deactivate_app() ctx = self.transcriber.context self.context_item.title = f"Set Whisper Context... ({ctx[:20]}...)" if ctx else "Set Whisper Context..." + self.cfg["context"] = ctx + config_mod.save(self.cfg) def _on_language_select(self, sender) -> None: display_name = sender.title.strip().lstrip("\u2713").strip() @@ -170,19 +210,68 @@ class CalliopeApp(rumps.App): config_mod.save(self.cfg) self.status_item.title = "Status: Loading model..." self.hotkeys.stop() - self._release_transcriber() - self.transcriber = Transcriber(model=model_id) - self.transcriber.context = self.cfg.get("context", "") - self.transcriber.language = self.cfg.get("language", "auto") - threading.Thread(target=self._load_model, daemon=True).start() + + def _switch(): + self._transcribe_done.wait() # wait for in-flight transcription + self._release_transcriber() + self.transcriber = Transcriber( + model=model_id, + silence_threshold=self.cfg.get("silence_threshold", 0.005), + ) + self.transcriber.context = self.cfg.get("context", "") + self.transcriber.language = self.cfg.get("language", "auto") + self._load_model() + + threading.Thread(target=_switch, daemon=True).start() log.info("Switching model to %s", model_id) + def _build_mic_menu(self) -> None: + """Populate the microphone submenu with available input devices.""" + import sounddevice as sd + current_device = self.cfg.get("device") + # System default + prefix = "\u2713 " if current_device is None else " " + item = rumps.MenuItem(f"{prefix}System Default", callback=self._on_mic_select) + self._mic_menu.add(item) + # List input devices + for dev in sd.query_devices(): + if dev["max_input_channels"] > 0: + idx = dev["index"] + name = dev["name"] + prefix = "\u2713 " if current_device == idx else " " + item = rumps.MenuItem(f"{prefix}{name}", callback=self._on_mic_select) + item._device_index = idx + self._mic_menu.add(item) + + def _on_mic_select(self, sender) -> None: + name = sender.title.strip().lstrip("\u2713").strip() + device_index = getattr(sender, "_device_index", None) + self.recorder._device = device_index + self.cfg["device"] = device_index + config_mod.save(self.cfg) + # Update checkmarks + for item in self._mic_menu.values(): + item_name = item.title.strip().lstrip("\u2713").strip() + item.title = f"\u2713 {item_name}" if item_name == name else f" {item_name}" + log.info("Microphone set to %s (device=%s)", name, device_index) + + def _on_typing_mode_select(self, sender) -> None: + mode = sender._typing_mode + self.cfg["typing_mode"] = mode + config_mod.save(self.cfg) + for item in self._typing_menu.values(): + label = item.title.strip().lstrip("\u2713").strip() + item.title = f"\u2713 {label}" if getattr(item, "_typing_mode", None) == mode else f" {label}" + log.info("Typing mode set to %s", mode) + def _release_transcriber(self) -> None: """Free the current Whisper model to reclaim GPU memory.""" + import gc + import torch if self.transcriber is not None: self.transcriber._pipe = None self.transcriber._tokenizer = None - import torch + gc.collect() if torch.backends.mps.is_available(): torch.mps.empty_cache() @@ -214,18 +303,12 @@ class CalliopeApp(rumps.App): self.title = "\U0001f3a4" # 🎤 self.toggle_item.title = "Start Recording" self.status_item.title = "Status: Mic error (check device)" - try: - rumps.notification("Calliope", "", "Microphone unavailable — check audio device") - except RuntimeError: - pass + self._notify("Calliope", "", "Microphone unavailable — check audio device") return self.overlay.show() self._rec_timer = rumps.Timer(self._update_rec_duration, 1) self._rec_timer.start() - try: - rumps.notification("Calliope", "", "Recording started") - except RuntimeError: - pass # Info.plist missing CFBundleIdentifier + self._notify("Calliope", "", "Recording started") log.info("Recording started") def _stop_and_transcribe(self) -> None: @@ -238,36 +321,45 @@ class CalliopeApp(rumps.App): self._rec_timer = None duration = int(time.time() - self._rec_start_time) if self._rec_start_time else 0 self._rec_start_time = None - self.title = "\U0001f3a4" # 🎤 + self.title = "\u23f3" # ⏳ self.toggle_item.title = "Start Recording" self.status_item.title = "Status: Transcribing..." self.overlay.show_transcribing() audio = self.recorder.stop() - try: - rumps.notification("Calliope", "", f"Recording stopped ({duration}s)") - except RuntimeError: - pass + self._notify("Calliope", "", f"Recording stopped ({duration}s)") log.info("Recording stopped, %d samples", audio.size) + self._transcribe_done.clear() threading.Thread(target=self._transcribe_and_type, args=(audio,), daemon=True).start() def _update_rec_duration(self, timer) -> None: if self._rec_start_time is None: return elapsed = int(time.time() - self._rec_start_time) + max_dur = self.cfg.get("max_recording_seconds", 300) + if max_dur and elapsed >= max_dur: + log.info("Max recording duration reached (%ds)", max_dur) + self._stop_and_transcribe() + return minutes, seconds = divmod(elapsed, 60) self.title = f"\U0001f534 {minutes}:{seconds:02d}" def _transcribe_and_type(self, audio) -> None: try: text = self.transcriber.transcribe(audio) + if not text: + self.overlay.hide() + self.title = "\U0001f3a4" # 🎤 + self.status_item.title = self._ready_status() + self._notify("Calliope", "", "No speech detected — audio too short or too quiet") + return if text: def _do_type(): try: if self.cfg.get("typing_mode", "char") == "clipboard": type_text_clipboard(text) else: - type_text(text) + type_text(text, delay=self.cfg.get("typing_delay", 0.005)) print(f"\n[Calliope] {text}") log.info("Typed %d chars", len(text)) except Exception: @@ -275,15 +367,15 @@ class CalliopeApp(rumps.App): from PyObjCTools.AppHelper import callAfter callAfter(_do_type) self.overlay.hide() - self.status_item.title = "Status: Ready" + self.status_item.title = self._ready_status() except Exception: log.error("Transcription failed", exc_info=True) self.overlay.hide() - self.status_item.title = "Status: Ready" - try: - rumps.notification("Calliope", "Error", "Transcription failed. Check logs.") - except RuntimeError: - pass + self.status_item.title = self._ready_status() + self._notify("Calliope", "Error", "Transcription failed. Check logs.") + finally: + self.title = "\U0001f3a4" # 🎤 + self._transcribe_done.set() def _on_quit(self, sender) -> None: self.hotkeys.stop() diff --git a/calliope/config.py b/calliope/config.py index 60e1b97..4d578ed 100644 --- a/calliope/config.py +++ b/calliope/config.py @@ -22,6 +22,10 @@ DEFAULTS: dict[str, Any] = { "context": "", "debug": False, "typing_mode": "char", # "char" or "clipboard" + "max_recording_seconds": 300, # 5 minutes + "silence_threshold": 0.005, # RMS energy below which audio is considered silence + "notifications": True, # show macOS notifications + "typing_delay": 0.005, # seconds between keystrokes in char mode } LANGUAGES: dict[str, str] = { diff --git a/calliope/hotkeys.py b/calliope/hotkeys.py index b61eed7..69cdf72 100644 --- a/calliope/hotkeys.py +++ b/calliope/hotkeys.py @@ -14,21 +14,53 @@ _KEY_MAP: dict[str, keyboard.Key] = { "alt": keyboard.Key.alt, "cmd": keyboard.Key.cmd, "space": keyboard.Key.space, + "tab": keyboard.Key.tab, + "esc": keyboard.Key.esc, + "enter": keyboard.Key.enter, + "backspace": keyboard.Key.backspace, + "delete": keyboard.Key.delete, +} + +# Add function keys F1-F12 +for _i in range(1, 13): + _KEY_MAP[f"f{_i}"] = getattr(keyboard.Key, f"f{_i}") + +# Virtual keycodes for left/right modifier normalization (macOS) +_VK_NORMALIZE = { + 0x3B: keyboard.Key.ctrl, # left ctrl + 0x3E: keyboard.Key.ctrl, # right ctrl + 0x38: keyboard.Key.shift, # left shift + 0x3C: keyboard.Key.shift, # right shift + 0x3A: keyboard.Key.alt, # left alt/option + 0x3D: keyboard.Key.alt, # right alt/option + 0x37: keyboard.Key.cmd, # left cmd + 0x36: keyboard.Key.cmd, # right cmd } -def _parse_combo(combo: str) -> set[keyboard.Key]: - """Parse 'ctrl+shift' into a set of pynput keys.""" - keys: set[keyboard.Key] = set() +def _parse_combo(combo: str) -> set: + """Parse 'ctrl+shift' or 'ctrl+r' into a set of pynput keys.""" + keys: set = set() for part in combo.lower().split("+"): part = part.strip() if part in _KEY_MAP: keys.add(_KEY_MAP[part]) + elif len(part) == 1: + keys.add(keyboard.KeyCode.from_char(part)) else: log.warning("Unknown key in combo: %s", part) return keys +def _check_accessibility() -> bool: + """Check if Accessibility permission is currently granted.""" + try: + from ApplicationServices import AXIsProcessTrusted + return AXIsProcessTrusted() + except Exception: + return True # assume granted if we can't check + + class HotkeyListener: def __init__( self, @@ -51,6 +83,8 @@ class HotkeyListener: log.debug("PTT keys: %s, Toggle keys: %s", self._ptt_keys, self._toggle_keys) def start(self) -> None: + if not _check_accessibility(): + log.error("Accessibility permission not granted — hotkeys will not work") self._pressed.clear() self._ptt_active = False self._toggle_active = False @@ -75,23 +109,29 @@ class HotkeyListener: def _normalize(self, key) -> keyboard.Key | keyboard.KeyCode: if hasattr(key, "value") and hasattr(key.value, "vk"): vk = key.value.vk - if vk in (0x3B, 0x3E): - return keyboard.Key.ctrl - if vk in (0x38, 0x3C): - return keyboard.Key.shift + normalized = _VK_NORMALIZE.get(vk) + if normalized is not None: + return normalized + # Normalize character keys to lowercase + if isinstance(key, keyboard.KeyCode) and key.char is not None: + return keyboard.KeyCode.from_char(key.char.lower()) return key def _on_press(self, key) -> None: key = self._normalize(key) self._pressed.add(key) + # Check PTT first; if PTT fires, skip toggle to prevent double-trigger if self._ptt_keys.issubset(self._pressed) and not self._ptt_active: self._ptt_active = True self._on_ptt_start() + return if self._toggle_keys.issubset(self._pressed) and not self._toggle_active: - self._toggle_active = True - self._on_toggle() + # Don't fire toggle if PTT is active + if not self._ptt_active: + self._toggle_active = True + self._on_toggle() def _on_release(self, key) -> None: key = self._normalize(key) diff --git a/calliope/overlay.py b/calliope/overlay.py index b54fb13..92af0c4 100644 --- a/calliope/overlay.py +++ b/calliope/overlay.py @@ -109,18 +109,28 @@ class WaveformView(NSView): if not amps: return - step = draw_w / max(len(amps) - 1, 1) + # Draw centered: newest sample at center, older samples outward, mirrored + half_bars = len(amps) + mid_x = w / 2 + step = (draw_w / 2) / max(half_bars - 1, 1) for sign in (1, -1): line = NSBezierPath.bezierPath() line.setLineWidth_(1.5) + # Left half: oldest at left edge, newest at center for i, a in enumerate(amps): - x = padding + i * step + x = mid_x - (half_bars - 1 - i) * step y_off = a * draw_h * sign if i == 0: line.moveToPoint_((x, mid_y + y_off)) else: line.lineToPoint_((x, mid_y + y_off)) + # Right half: mirror (newest at center, oldest at right edge) + for i in range(1, half_bars): + a = amps[half_bars - 1 - i] + x = mid_x + i * step + y_off = a * draw_h * sign + line.lineToPoint_((x, mid_y + y_off)) line.stroke() self._draw_label("calliope recording...") @@ -252,8 +262,17 @@ class WaveformOverlay: """Switch overlay to transcribing state (pulsing dots).""" callAfter(self._show_transcribing_on_main) + def _reposition_panel(self): + """Move the panel to the top-center of the current main screen.""" + screen = NSScreen.mainScreen() + screen_frame = screen.frame() + x = (screen_frame.size.width - WIDTH) / 2 + y = screen_frame.size.height - HEIGHT - 40 + self._panel.setFrameOrigin_(NSMakePoint(x, y)) + def _show_on_main(self): self._ensure_panel() + self._reposition_panel() self._view.stopFade() self._view.mode = OverlayMode.RECORDING self._view.amplitudes = deque([0.0] * NUM_BARS, maxlen=NUM_BARS) @@ -265,6 +284,7 @@ class WaveformOverlay: def _show_transcribing_on_main(self): self._ensure_panel() + self._reposition_panel() self._view.stopFade() self._view.mode = OverlayMode.TRANSCRIBING self._view._pulse_start = time.monotonic() diff --git a/calliope/transcriber.py b/calliope/transcriber.py index 0cda782..309cb36 100644 --- a/calliope/transcriber.py +++ b/calliope/transcriber.py @@ -10,12 +10,23 @@ log = logging.getLogger(__name__) class Transcriber: - def __init__(self, model: str = "distil-whisper/distil-large-v3"): + def __init__(self, model: str = "distil-whisper/distil-large-v3", silence_threshold: float = 0.005): self.model = model self._pipe = None self._tokenizer = None - self.context: str = "" + self._context: str = "" + self._cached_prompt_ids = None self.language: str = "auto" + self.silence_threshold = silence_threshold + + @property + def context(self) -> str: + return self._context + + @context.setter + def context(self, value: str) -> None: + self._context = value + self._cached_prompt_ids = None # invalidate cache def load(self) -> None: from transformers import AutoTokenizer @@ -32,7 +43,12 @@ class Transcriber: device=device, ) self._tokenizer = AutoTokenizer.from_pretrained(self.model) - log.info("Model loaded successfully") + log.info("Model loaded, running warmup...") + self._pipe( + {"raw": np.zeros(16_000, dtype=np.float32), "sampling_rate": 16_000}, + batch_size=1, + ) + log.info("Model ready") except Exception: log.error("Failed to load model %s", self.model, exc_info=True) raise @@ -48,18 +64,18 @@ class Transcriber: duration = audio.size / 16_000 energy = float(np.sqrt(np.mean(audio ** 2))) log.debug("Audio: %.1fs, RMS energy: %.6f", duration, energy) - if duration < 1.0 or energy < 0.005: + if duration < 1.0 or energy < self.silence_threshold: log.debug("Audio too short or too quiet, skipping transcription") return "" generate_kwargs = {} - if self.context: - prompt_ids = self._tokenizer.get_prompt_ids(self.context) - generate_kwargs["prompt_ids"] = prompt_ids + if self._context: + if self._cached_prompt_ids is None: + self._cached_prompt_ids = self._tokenizer.get_prompt_ids(self._context) + generate_kwargs["prompt_ids"] = self._cached_prompt_ids pipe_kwargs = { - "batch_size": 4, - "return_timestamps": True, + "batch_size": 1, "generate_kwargs": generate_kwargs, } if self.language != "auto": diff --git a/calliope/typer.py b/calliope/typer.py index d7e05df..29becdd 100644 --- a/calliope/typer.py +++ b/calliope/typer.py @@ -1,7 +1,6 @@ """Type text into the focused field using Quartz CGEvents.""" import logging -import subprocess import time import Quartz @@ -9,36 +8,51 @@ import Quartz log = logging.getLogger(__name__) -def type_text(text: str) -> None: +def type_text(text: str, delay: float = 0.005) -> None: """Simulate typing text into the currently focused text field.""" for char in text: _type_char(char) - time.sleep(0.005) + time.sleep(delay) def type_text_clipboard(text: str) -> None: """Type text by copying to clipboard and pasting with Cmd+V. - Saves and restores the previous clipboard contents. + Saves and restores the previous clipboard contents, including non-text + data like images and files. """ - # Save current clipboard - try: - prev = subprocess.run( - ["pbpaste"], capture_output=True, text=True, timeout=2, - ).stdout - except Exception: - prev = None + from AppKit import NSPasteboard, NSStringPboardType - # Copy text to clipboard - subprocess.run(["pbcopy"], input=text, text=True, timeout=2) + pb = NSPasteboard.generalPasteboard() - # Paste with Cmd+V + # Save all current pasteboard items + saved_items = [] + for item in pb.pasteboardItems() or []: + item_data = {} + for ptype in item.types(): + data = item.dataForType_(ptype) + if data is not None: + item_data[ptype] = data + if item_data: + saved_items.append(item_data) + + # Set our text and paste + pb.clearContents() + pb.setString_forType_(text, NSStringPboardType) _cmd_v() time.sleep(0.05) - # Restore previous clipboard - if prev is not None: - subprocess.run(["pbcopy"], input=prev, text=True, timeout=2) + # Restore previous clipboard contents + if saved_items: + from AppKit import NSPasteboardItem + pb.clearContents() + new_items = [] + for item_data in saved_items: + item = NSPasteboardItem.alloc().init() + for ptype, data in item_data.items(): + item.setData_forType_(data, ptype) + new_items.append(item) + pb.writeObjects_(new_items) def _cmd_v() -> None: