refactor: fix several memory leaks and improve UX by exposing microphone selection etc.

This commit is contained in:
syntaxbullet
2026-02-17 15:57:14 +01:00
parent 7cbf2d04a9
commit 435c87803b
6 changed files with 261 additions and 75 deletions

View File

@@ -11,6 +11,8 @@ os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
# Run offline — models are downloaded during setup, no need to hit HuggingFace on every launch. # Run offline — models are downloaded during setup, no need to hit HuggingFace on every launch.
os.environ.setdefault("HF_HUB_OFFLINE", "1") os.environ.setdefault("HF_HUB_OFFLINE", "1")
import subprocess
import rumps import rumps
from calliope import config as config_mod from calliope import config as config_mod
@@ -36,6 +38,7 @@ class CalliopeApp(rumps.App):
self.recorder = Recorder(device=cfg.get("device")) self.recorder = Recorder(device=cfg.get("device"))
self.transcriber = Transcriber( self.transcriber = Transcriber(
model=cfg.get("model", "distil-whisper/distil-large-v3"), model=cfg.get("model", "distil-whisper/distil-large-v3"),
silence_threshold=cfg.get("silence_threshold", 0.005),
) )
self.transcriber.context = cfg.get("context", "") self.transcriber.context = cfg.get("context", "")
self.transcriber.language = cfg.get("language", "auto") self.transcriber.language = cfg.get("language", "auto")
@@ -44,11 +47,15 @@ class CalliopeApp(rumps.App):
self._rec_lock = threading.Lock() self._rec_lock = threading.Lock()
self._rec_start_time: float | None = None self._rec_start_time: float | None = None
self._rec_timer: rumps.Timer | None = None self._rec_timer: rumps.Timer | None = None
self._transcribe_done = threading.Event()
self._transcribe_done.set() # not transcribing initially
self.status_item = rumps.MenuItem("Status: Loading model...") self.status_item = rumps.MenuItem("Status: Loading model...")
self.status_item.set_callback(None) self.status_item.set_callback(None)
self.toggle_item = rumps.MenuItem("Start Recording", callback=self._on_toggle_click) self.toggle_item = rumps.MenuItem("Start Recording", callback=self._on_toggle_click)
self.context_item = rumps.MenuItem("Set Whisper Context...", callback=self._on_set_context) ctx = cfg.get("context", "")
context_label = f"Set Whisper Context... ({ctx[:20]}...)" if ctx else "Set Whisper Context..."
self.context_item = rumps.MenuItem(context_label, callback=self._on_set_context)
# Language submenu # Language submenu
self._lang_menu = rumps.MenuItem("Language") self._lang_menu = rumps.MenuItem("Language")
@@ -67,6 +74,19 @@ class CalliopeApp(rumps.App):
item = rumps.MenuItem(f"{prefix}{short}", callback=self._on_model_select) item = rumps.MenuItem(f"{prefix}{short}", callback=self._on_model_select)
self._model_menu.add(item) self._model_menu.add(item)
# Microphone submenu
self._mic_menu = rumps.MenuItem("Microphone")
self._build_mic_menu()
# Typing mode submenu
self._typing_menu = rumps.MenuItem("Typing Mode")
current_mode = cfg.get("typing_mode", "char")
for mode, label in [("char", "Character (CGEvents)"), ("clipboard", "Clipboard (Cmd+V)")]:
prefix = "\u2713 " if mode == current_mode else " "
item = rumps.MenuItem(f"{prefix}{label}", callback=self._on_typing_mode_select)
item._typing_mode = mode
self._typing_menu.add(item)
quit_item = rumps.MenuItem("Quit Calliope", callback=self._on_quit) quit_item = rumps.MenuItem("Quit Calliope", callback=self._on_quit)
self.menu = [ self.menu = [
@@ -76,6 +96,8 @@ class CalliopeApp(rumps.App):
self.context_item, self.context_item,
self._lang_menu, self._lang_menu,
self._model_menu, self._model_menu,
self._mic_menu,
self._typing_menu,
None, None,
quit_item, quit_item,
] ]
@@ -92,19 +114,34 @@ class CalliopeApp(rumps.App):
# Load model in background # Load model in background
threading.Thread(target=self._load_model, daemon=True).start() threading.Thread(target=self._load_model, daemon=True).start()
def _notify(self, title: str, subtitle: str, message: str) -> None:
    """Post a macOS notification through ``osascript`` (best effort).

    Does nothing when the ``notifications`` config key is falsy; the key
    defaults to enabled.

    Args:
        title: Notification title.
        subtitle: Shown as the notification's subtitle; omitted when empty.
        message: Notification body text.
    """
    if not self.cfg.get("notifications", True):
        return

    def _quote(value: str) -> str:
        # AppleScript string literals use backslash escapes. An unescaped
        # double quote would terminate the literal and break the script.
        escaped = value.replace("\\", "\\\\").replace('"', '\\"')
        return f'"{escaped}"'

    script = f"display notification {_quote(message)} with title {_quote(title)}"
    if subtitle:
        # Pass the subtitle as its own field rather than gluing it onto the
        # message, which rendered as e.g. "ErrorFailed to load ...".
        script += f" subtitle {_quote(subtitle)}"
    try:
        # Fire and forget: output is discarded and the process is not awaited.
        subprocess.Popen(
            ["osascript", "-e", script],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except (OSError, ValueError):
        # Notifications are optional; never let a failed launch propagate.
        log.debug("Could not launch osascript for notification", exc_info=True)
def _ready_status(self) -> str:
short = self.transcriber.model.split("/")[-1]
return f"Status: Ready ({short})"
def _load_model(self) -> None: def _load_model(self) -> None:
try: try:
self.transcriber.load() self.transcriber.load()
self.status_item.title = "Status: Ready" self.status_item.title = self._ready_status()
self.hotkeys.start() self.hotkeys.start()
log.info("Model loaded, hotkeys active") log.info("Model loaded, hotkeys active")
except Exception: except Exception:
log.error("Failed to load model", exc_info=True) log.error("Failed to load model", exc_info=True)
self.status_item.title = "Status: Model load failed" self.status_item.title = "Status: Model load failed"
try: self._notify("Calliope", "Error", "Failed to load Whisper model. Check logs.")
rumps.notification("Calliope", "Error", "Failed to load Whisper model. Check logs.")
except RuntimeError:
pass
@staticmethod @staticmethod
def _activate_app(): def _activate_app():
@@ -125,20 +162,23 @@ class CalliopeApp(rumps.App):
response = rumps.Window( response = rumps.Window(
message="Provide context to help Whisper with domain-specific terms, " message="Provide context to help Whisper with domain-specific terms, "
"names, or jargon. For example:\n\n" "names, or jargon. For example:\n\n"
"\"Meeting about Kubernetes, gRPC, and the Istio service mesh.\"", "\"Meeting about Kubernetes, gRPC, and the Istio service mesh.\"\n\n"
"Clear the field and press Save to remove context.",
title="Set Whisper Context", title="Set Whisper Context",
default_text=self.transcriber.context, default_text=self.transcriber.context,
ok="Save", ok="Save",
cancel="Clear", cancel="Cancel",
dimensions=(320, 120), dimensions=(320, 120),
).run() ).run()
if response.clicked == 1: # Save if response.clicked != 1: # Cancel / Escape
self.transcriber.context = response.text.strip() self._deactivate_app()
else: # Clear return
self.transcriber.context = "" self.transcriber.context = response.text.strip()
self._deactivate_app() self._deactivate_app()
ctx = self.transcriber.context ctx = self.transcriber.context
self.context_item.title = f"Set Whisper Context... ({ctx[:20]}...)" if ctx else "Set Whisper Context..." self.context_item.title = f"Set Whisper Context... ({ctx[:20]}...)" if ctx else "Set Whisper Context..."
self.cfg["context"] = ctx
config_mod.save(self.cfg)
def _on_language_select(self, sender) -> None: def _on_language_select(self, sender) -> None:
display_name = sender.title.strip().lstrip("\u2713").strip() display_name = sender.title.strip().lstrip("\u2713").strip()
@@ -170,19 +210,68 @@ class CalliopeApp(rumps.App):
config_mod.save(self.cfg) config_mod.save(self.cfg)
self.status_item.title = "Status: Loading model..." self.status_item.title = "Status: Loading model..."
self.hotkeys.stop() self.hotkeys.stop()
self._release_transcriber()
self.transcriber = Transcriber(model=model_id) def _switch():
self.transcriber.context = self.cfg.get("context", "") self._transcribe_done.wait() # wait for in-flight transcription
self.transcriber.language = self.cfg.get("language", "auto") self._release_transcriber()
threading.Thread(target=self._load_model, daemon=True).start() self.transcriber = Transcriber(
model=model_id,
silence_threshold=self.cfg.get("silence_threshold", 0.005),
)
self.transcriber.context = self.cfg.get("context", "")
self.transcriber.language = self.cfg.get("language", "auto")
self._load_model()
threading.Thread(target=_switch, daemon=True).start()
log.info("Switching model to %s", model_id) log.info("Switching model to %s", model_id)
def _build_mic_menu(self) -> None:
    """Fill the microphone submenu with the available input devices.

    A "System Default" entry is added first, followed by one entry for
    every device from ``sounddevice.query_devices()`` that reports at least
    one input channel. The entry matching the configured ``device`` is
    prefixed with a check mark. Device entries carry their index in
    ``_device_index``; the default entry has no such attribute.
    """
    import sounddevice as sd

    selected = self.cfg.get("device")

    def _mark(is_selected: bool) -> str:
        return "\u2713 " if is_selected else " "

    # System default
    default_entry = rumps.MenuItem(
        _mark(selected is None) + "System Default",
        callback=self._on_mic_select,
    )
    self._mic_menu.add(default_entry)

    # List input devices
    for info in sd.query_devices():
        if info["max_input_channels"] <= 0:
            continue  # output-only device
        device_id = info["index"]
        device_name = info["name"]
        entry = rumps.MenuItem(
            _mark(selected == device_id) + f"{device_name}",
            callback=self._on_mic_select,
        )
        entry._device_index = device_id
        self._mic_menu.add(entry)
def _on_mic_select(self, sender) -> None:
    """Switch the recorder to the microphone chosen in the menu.

    The device index is read from the clicked item's ``_device_index``
    attribute; an item without it (the "System Default" entry) yields
    ``None``. The choice is applied to the recorder, saved to the config,
    and the submenu's check marks are refreshed.

    Args:
        sender: The menu item that was clicked.
    """
    name = sender.title.strip().lstrip("\u2713").strip()
    device_index = getattr(sender, "_device_index", None)
    self.recorder._device = device_index
    self.cfg["device"] = device_index
    config_mod.save(self.cfg)
    # Update checkmarks. Match on the device index rather than the display
    # name, so two devices that share a name are not both shown as selected.
    for item in self._mic_menu.values():
        item_name = item.title.strip().lstrip("\u2713").strip()
        is_selected = getattr(item, "_device_index", None) == device_index
        item.title = f"\u2713 {item_name}" if is_selected else f" {item_name}"
    log.info("Microphone set to %s (device=%s)", name, device_index)
def _on_typing_mode_select(self, sender) -> None:
    """Save the typing mode of the clicked item and refresh the check marks.

    Args:
        sender: The clicked menu item; its ``_typing_mode`` attribute holds
            the mode to store under the ``typing_mode`` config key.
    """
    chosen = sender._typing_mode
    self.cfg["typing_mode"] = chosen
    config_mod.save(self.cfg)
    for entry in self._typing_menu.values():
        # Strip any existing check mark before re-applying the prefix.
        text = entry.title.strip().lstrip("\u2713").strip()
        if getattr(entry, "_typing_mode", None) == chosen:
            entry.title = "\u2713 " + text
        else:
            entry.title = " " + text
    log.info("Typing mode set to %s", chosen)
def _release_transcriber(self) -> None: def _release_transcriber(self) -> None:
"""Free the current Whisper model to reclaim GPU memory.""" """Free the current Whisper model to reclaim GPU memory."""
import gc
import torch
if self.transcriber is not None: if self.transcriber is not None:
self.transcriber._pipe = None self.transcriber._pipe = None
self.transcriber._tokenizer = None self.transcriber._tokenizer = None
import torch gc.collect()
if torch.backends.mps.is_available(): if torch.backends.mps.is_available():
torch.mps.empty_cache() torch.mps.empty_cache()
@@ -214,18 +303,12 @@ class CalliopeApp(rumps.App):
self.title = "\U0001f3a4" # 🎤 self.title = "\U0001f3a4" # 🎤
self.toggle_item.title = "Start Recording" self.toggle_item.title = "Start Recording"
self.status_item.title = "Status: Mic error (check device)" self.status_item.title = "Status: Mic error (check device)"
try: self._notify("Calliope", "", "Microphone unavailable — check audio device")
rumps.notification("Calliope", "", "Microphone unavailable — check audio device")
except RuntimeError:
pass
return return
self.overlay.show() self.overlay.show()
self._rec_timer = rumps.Timer(self._update_rec_duration, 1) self._rec_timer = rumps.Timer(self._update_rec_duration, 1)
self._rec_timer.start() self._rec_timer.start()
try: self._notify("Calliope", "", "Recording started")
rumps.notification("Calliope", "", "Recording started")
except RuntimeError:
pass # Info.plist missing CFBundleIdentifier
log.info("Recording started") log.info("Recording started")
def _stop_and_transcribe(self) -> None: def _stop_and_transcribe(self) -> None:
@@ -238,36 +321,45 @@ class CalliopeApp(rumps.App):
self._rec_timer = None self._rec_timer = None
duration = int(time.time() - self._rec_start_time) if self._rec_start_time else 0 duration = int(time.time() - self._rec_start_time) if self._rec_start_time else 0
self._rec_start_time = None self._rec_start_time = None
self.title = "\U0001f3a4" # 🎤 self.title = "\u23f3" #
self.toggle_item.title = "Start Recording" self.toggle_item.title = "Start Recording"
self.status_item.title = "Status: Transcribing..." self.status_item.title = "Status: Transcribing..."
self.overlay.show_transcribing() self.overlay.show_transcribing()
audio = self.recorder.stop() audio = self.recorder.stop()
try: self._notify("Calliope", "", f"Recording stopped ({duration}s)")
rumps.notification("Calliope", "", f"Recording stopped ({duration}s)")
except RuntimeError:
pass
log.info("Recording stopped, %d samples", audio.size) log.info("Recording stopped, %d samples", audio.size)
self._transcribe_done.clear()
threading.Thread(target=self._transcribe_and_type, args=(audio,), daemon=True).start() threading.Thread(target=self._transcribe_and_type, args=(audio,), daemon=True).start()
def _update_rec_duration(self, timer) -> None: def _update_rec_duration(self, timer) -> None:
if self._rec_start_time is None: if self._rec_start_time is None:
return return
elapsed = int(time.time() - self._rec_start_time) elapsed = int(time.time() - self._rec_start_time)
max_dur = self.cfg.get("max_recording_seconds", 300)
if max_dur and elapsed >= max_dur:
log.info("Max recording duration reached (%ds)", max_dur)
self._stop_and_transcribe()
return
minutes, seconds = divmod(elapsed, 60) minutes, seconds = divmod(elapsed, 60)
self.title = f"\U0001f534 {minutes}:{seconds:02d}" self.title = f"\U0001f534 {minutes}:{seconds:02d}"
def _transcribe_and_type(self, audio) -> None: def _transcribe_and_type(self, audio) -> None:
try: try:
text = self.transcriber.transcribe(audio) text = self.transcriber.transcribe(audio)
if not text:
self.overlay.hide()
self.title = "\U0001f3a4" # 🎤
self.status_item.title = self._ready_status()
self._notify("Calliope", "", "No speech detected — audio too short or too quiet")
return
if text: if text:
def _do_type(): def _do_type():
try: try:
if self.cfg.get("typing_mode", "char") == "clipboard": if self.cfg.get("typing_mode", "char") == "clipboard":
type_text_clipboard(text) type_text_clipboard(text)
else: else:
type_text(text) type_text(text, delay=self.cfg.get("typing_delay", 0.005))
print(f"\n[Calliope] {text}") print(f"\n[Calliope] {text}")
log.info("Typed %d chars", len(text)) log.info("Typed %d chars", len(text))
except Exception: except Exception:
@@ -275,15 +367,15 @@ class CalliopeApp(rumps.App):
from PyObjCTools.AppHelper import callAfter from PyObjCTools.AppHelper import callAfter
callAfter(_do_type) callAfter(_do_type)
self.overlay.hide() self.overlay.hide()
self.status_item.title = "Status: Ready" self.status_item.title = self._ready_status()
except Exception: except Exception:
log.error("Transcription failed", exc_info=True) log.error("Transcription failed", exc_info=True)
self.overlay.hide() self.overlay.hide()
self.status_item.title = "Status: Ready" self.status_item.title = self._ready_status()
try: self._notify("Calliope", "Error", "Transcription failed. Check logs.")
rumps.notification("Calliope", "Error", "Transcription failed. Check logs.") finally:
except RuntimeError: self.title = "\U0001f3a4" # 🎤
pass self._transcribe_done.set()
def _on_quit(self, sender) -> None: def _on_quit(self, sender) -> None:
self.hotkeys.stop() self.hotkeys.stop()

View File

@@ -22,6 +22,10 @@ DEFAULTS: dict[str, Any] = {
"context": "", "context": "",
"debug": False, "debug": False,
"typing_mode": "char", # "char" or "clipboard" "typing_mode": "char", # "char" or "clipboard"
"max_recording_seconds": 300, # 5 minutes
"silence_threshold": 0.005, # RMS energy below which audio is considered silence
"notifications": True, # show macOS notifications
"typing_delay": 0.005, # seconds between keystrokes in char mode
} }
LANGUAGES: dict[str, str] = { LANGUAGES: dict[str, str] = {

View File

@@ -14,21 +14,53 @@ _KEY_MAP: dict[str, keyboard.Key] = {
"alt": keyboard.Key.alt, "alt": keyboard.Key.alt,
"cmd": keyboard.Key.cmd, "cmd": keyboard.Key.cmd,
"space": keyboard.Key.space, "space": keyboard.Key.space,
"tab": keyboard.Key.tab,
"esc": keyboard.Key.esc,
"enter": keyboard.Key.enter,
"backspace": keyboard.Key.backspace,
"delete": keyboard.Key.delete,
}
# Add function keys F1-F12
for _i in range(1, 13):
_KEY_MAP[f"f{_i}"] = getattr(keyboard.Key, f"f{_i}")
# Virtual keycodes for left/right modifier normalization (macOS)
_VK_NORMALIZE = {
0x3B: keyboard.Key.ctrl, # left ctrl
0x3E: keyboard.Key.ctrl, # right ctrl
0x38: keyboard.Key.shift, # left shift
0x3C: keyboard.Key.shift, # right shift
0x3A: keyboard.Key.alt, # left alt/option
0x3D: keyboard.Key.alt, # right alt/option
0x37: keyboard.Key.cmd, # left cmd
0x36: keyboard.Key.cmd, # right cmd
} }
def _parse_combo(combo: str) -> set[keyboard.Key]: def _parse_combo(combo: str) -> set:
"""Parse 'ctrl+shift' into a set of pynput keys.""" """Parse 'ctrl+shift' or 'ctrl+r' into a set of pynput keys."""
keys: set[keyboard.Key] = set() keys: set = set()
for part in combo.lower().split("+"): for part in combo.lower().split("+"):
part = part.strip() part = part.strip()
if part in _KEY_MAP: if part in _KEY_MAP:
keys.add(_KEY_MAP[part]) keys.add(_KEY_MAP[part])
elif len(part) == 1:
keys.add(keyboard.KeyCode.from_char(part))
else: else:
log.warning("Unknown key in combo: %s", part) log.warning("Unknown key in combo: %s", part)
return keys return keys
def _check_accessibility() -> bool:
"""Check if Accessibility permission is currently granted."""
try:
from ApplicationServices import AXIsProcessTrusted
return AXIsProcessTrusted()
except Exception:
return True # assume granted if we can't check
class HotkeyListener: class HotkeyListener:
def __init__( def __init__(
self, self,
@@ -51,6 +83,8 @@ class HotkeyListener:
log.debug("PTT keys: %s, Toggle keys: %s", self._ptt_keys, self._toggle_keys) log.debug("PTT keys: %s, Toggle keys: %s", self._ptt_keys, self._toggle_keys)
def start(self) -> None: def start(self) -> None:
if not _check_accessibility():
log.error("Accessibility permission not granted — hotkeys will not work")
self._pressed.clear() self._pressed.clear()
self._ptt_active = False self._ptt_active = False
self._toggle_active = False self._toggle_active = False
@@ -75,23 +109,29 @@ class HotkeyListener:
def _normalize(self, key) -> keyboard.Key | keyboard.KeyCode: def _normalize(self, key) -> keyboard.Key | keyboard.KeyCode:
if hasattr(key, "value") and hasattr(key.value, "vk"): if hasattr(key, "value") and hasattr(key.value, "vk"):
vk = key.value.vk vk = key.value.vk
if vk in (0x3B, 0x3E): normalized = _VK_NORMALIZE.get(vk)
return keyboard.Key.ctrl if normalized is not None:
if vk in (0x38, 0x3C): return normalized
return keyboard.Key.shift # Normalize character keys to lowercase
if isinstance(key, keyboard.KeyCode) and key.char is not None:
return keyboard.KeyCode.from_char(key.char.lower())
return key return key
def _on_press(self, key) -> None: def _on_press(self, key) -> None:
key = self._normalize(key) key = self._normalize(key)
self._pressed.add(key) self._pressed.add(key)
# Check PTT first; if PTT fires, skip toggle to prevent double-trigger
if self._ptt_keys.issubset(self._pressed) and not self._ptt_active: if self._ptt_keys.issubset(self._pressed) and not self._ptt_active:
self._ptt_active = True self._ptt_active = True
self._on_ptt_start() self._on_ptt_start()
return
if self._toggle_keys.issubset(self._pressed) and not self._toggle_active: if self._toggle_keys.issubset(self._pressed) and not self._toggle_active:
self._toggle_active = True # Don't fire toggle if PTT is active
self._on_toggle() if not self._ptt_active:
self._toggle_active = True
self._on_toggle()
def _on_release(self, key) -> None: def _on_release(self, key) -> None:
key = self._normalize(key) key = self._normalize(key)

View File

@@ -109,18 +109,28 @@ class WaveformView(NSView):
if not amps: if not amps:
return return
step = draw_w / max(len(amps) - 1, 1) # Draw centered: newest sample at center, older samples outward, mirrored
half_bars = len(amps)
mid_x = w / 2
step = (draw_w / 2) / max(half_bars - 1, 1)
for sign in (1, -1): for sign in (1, -1):
line = NSBezierPath.bezierPath() line = NSBezierPath.bezierPath()
line.setLineWidth_(1.5) line.setLineWidth_(1.5)
# Left half: oldest at left edge, newest at center
for i, a in enumerate(amps): for i, a in enumerate(amps):
x = padding + i * step x = mid_x - (half_bars - 1 - i) * step
y_off = a * draw_h * sign y_off = a * draw_h * sign
if i == 0: if i == 0:
line.moveToPoint_((x, mid_y + y_off)) line.moveToPoint_((x, mid_y + y_off))
else: else:
line.lineToPoint_((x, mid_y + y_off)) line.lineToPoint_((x, mid_y + y_off))
# Right half: mirror (newest at center, oldest at right edge)
for i in range(1, half_bars):
a = amps[half_bars - 1 - i]
x = mid_x + i * step
y_off = a * draw_h * sign
line.lineToPoint_((x, mid_y + y_off))
line.stroke() line.stroke()
self._draw_label("calliope recording...") self._draw_label("calliope recording...")
@@ -252,8 +262,17 @@ class WaveformOverlay:
"""Switch overlay to transcribing state (pulsing dots).""" """Switch overlay to transcribing state (pulsing dots)."""
callAfter(self._show_transcribing_on_main) callAfter(self._show_transcribing_on_main)
def _reposition_panel(self):
    """Move the panel to the top-center of the current main screen.

    The panel is centered horizontally and placed 40 points below the top
    edge of the screen's frame. If no main screen is available the panel
    is left where it is.
    """
    screen = NSScreen.mainScreen()
    if screen is None:
        # mainScreen() can be nil; keep the current position.
        return
    screen_frame = screen.frame()
    # Screen frames share one global coordinate space, so a non-primary
    # display has a non-zero origin that must be added to the offsets.
    x = screen_frame.origin.x + (screen_frame.size.width - WIDTH) / 2
    y = screen_frame.origin.y + screen_frame.size.height - HEIGHT - 40
    self._panel.setFrameOrigin_(NSMakePoint(x, y))
def _show_on_main(self): def _show_on_main(self):
self._ensure_panel() self._ensure_panel()
self._reposition_panel()
self._view.stopFade() self._view.stopFade()
self._view.mode = OverlayMode.RECORDING self._view.mode = OverlayMode.RECORDING
self._view.amplitudes = deque([0.0] * NUM_BARS, maxlen=NUM_BARS) self._view.amplitudes = deque([0.0] * NUM_BARS, maxlen=NUM_BARS)
@@ -265,6 +284,7 @@ class WaveformOverlay:
def _show_transcribing_on_main(self): def _show_transcribing_on_main(self):
self._ensure_panel() self._ensure_panel()
self._reposition_panel()
self._view.stopFade() self._view.stopFade()
self._view.mode = OverlayMode.TRANSCRIBING self._view.mode = OverlayMode.TRANSCRIBING
self._view._pulse_start = time.monotonic() self._view._pulse_start = time.monotonic()

View File

@@ -10,12 +10,23 @@ log = logging.getLogger(__name__)
class Transcriber: class Transcriber:
def __init__(self, model: str = "distil-whisper/distil-large-v3"): def __init__(self, model: str = "distil-whisper/distil-large-v3", silence_threshold: float = 0.005):
self.model = model self.model = model
self._pipe = None self._pipe = None
self._tokenizer = None self._tokenizer = None
self.context: str = "" self._context: str = ""
self._cached_prompt_ids = None
self.language: str = "auto" self.language: str = "auto"
self.silence_threshold = silence_threshold
@property
def context(self) -> str:
    """Return the context string currently set on this transcriber."""
    return self._context

@context.setter
def context(self, value: str) -> None:
    """Store *value* as the context and drop the cached prompt ids.

    Clearing ``_cached_prompt_ids`` means they are not reused for a
    different context string.
    """
    self._context = value
    self._cached_prompt_ids = None # invalidate cache
def load(self) -> None: def load(self) -> None:
from transformers import AutoTokenizer from transformers import AutoTokenizer
@@ -32,7 +43,12 @@ class Transcriber:
device=device, device=device,
) )
self._tokenizer = AutoTokenizer.from_pretrained(self.model) self._tokenizer = AutoTokenizer.from_pretrained(self.model)
log.info("Model loaded successfully") log.info("Model loaded, running warmup...")
self._pipe(
{"raw": np.zeros(16_000, dtype=np.float32), "sampling_rate": 16_000},
batch_size=1,
)
log.info("Model ready")
except Exception: except Exception:
log.error("Failed to load model %s", self.model, exc_info=True) log.error("Failed to load model %s", self.model, exc_info=True)
raise raise
@@ -48,18 +64,18 @@ class Transcriber:
duration = audio.size / 16_000 duration = audio.size / 16_000
energy = float(np.sqrt(np.mean(audio ** 2))) energy = float(np.sqrt(np.mean(audio ** 2)))
log.debug("Audio: %.1fs, RMS energy: %.6f", duration, energy) log.debug("Audio: %.1fs, RMS energy: %.6f", duration, energy)
if duration < 1.0 or energy < 0.005: if duration < 1.0 or energy < self.silence_threshold:
log.debug("Audio too short or too quiet, skipping transcription") log.debug("Audio too short or too quiet, skipping transcription")
return "" return ""
generate_kwargs = {} generate_kwargs = {}
if self.context: if self._context:
prompt_ids = self._tokenizer.get_prompt_ids(self.context) if self._cached_prompt_ids is None:
generate_kwargs["prompt_ids"] = prompt_ids self._cached_prompt_ids = self._tokenizer.get_prompt_ids(self._context)
generate_kwargs["prompt_ids"] = self._cached_prompt_ids
pipe_kwargs = { pipe_kwargs = {
"batch_size": 4, "batch_size": 1,
"return_timestamps": True,
"generate_kwargs": generate_kwargs, "generate_kwargs": generate_kwargs,
} }
if self.language != "auto": if self.language != "auto":

View File

@@ -1,7 +1,6 @@
"""Type text into the focused field using Quartz CGEvents.""" """Type text into the focused field using Quartz CGEvents."""
import logging import logging
import subprocess
import time import time
import Quartz import Quartz
@@ -9,36 +8,51 @@ import Quartz
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def type_text(text: str) -> None: def type_text(text: str, delay: float = 0.005) -> None:
"""Simulate typing text into the currently focused text field.""" """Simulate typing text into the currently focused text field."""
for char in text: for char in text:
_type_char(char) _type_char(char)
time.sleep(0.005) time.sleep(delay)
def type_text_clipboard(text: str) -> None: def type_text_clipboard(text: str) -> None:
"""Type text by copying to clipboard and pasting with Cmd+V. """Type text by copying to clipboard and pasting with Cmd+V.
Saves and restores the previous clipboard contents. Saves and restores the previous clipboard contents, including non-text
data like images and files.
""" """
# Save current clipboard from AppKit import NSPasteboard, NSStringPboardType
try:
prev = subprocess.run(
["pbpaste"], capture_output=True, text=True, timeout=2,
).stdout
except Exception:
prev = None
# Copy text to clipboard pb = NSPasteboard.generalPasteboard()
subprocess.run(["pbcopy"], input=text, text=True, timeout=2)
# Paste with Cmd+V # Save all current pasteboard items
saved_items = []
for item in pb.pasteboardItems() or []:
item_data = {}
for ptype in item.types():
data = item.dataForType_(ptype)
if data is not None:
item_data[ptype] = data
if item_data:
saved_items.append(item_data)
# Set our text and paste
pb.clearContents()
pb.setString_forType_(text, NSStringPboardType)
_cmd_v() _cmd_v()
time.sleep(0.05) time.sleep(0.05)
# Restore previous clipboard # Restore previous clipboard contents
if prev is not None: if saved_items:
subprocess.run(["pbcopy"], input=prev, text=True, timeout=2) from AppKit import NSPasteboardItem
pb.clearContents()
new_items = []
for item_data in saved_items:
item = NSPasteboardItem.alloc().init()
for ptype, data in item_data.items():
item.setData_forType_(data, ptype)
new_items.append(item)
pb.writeObjects_(new_items)
def _cmd_v() -> None: def _cmd_v() -> None: