openWakeWord-rhasspy/detect.py
Dale 4222295766 Detect wake words from multiple audio streams
Allows running Open Wake Word centrally (on the base), with all Rhasspy satellites streaming their audio to the base.
2023-04-27 17:25:20 +02:00

167 lines
5.3 KiB
Python

"""
Listen on UDP for audio from Rhasspy, detect wake words using Open Wake Word,
and then publish on MQTT when wake word is detected to trigger Rhasspy speech-to-text.
"""
import argparse
import io
import queue
import socket
import threading
import time
import wave
from json import dumps
import numpy as np
import paho.mqtt.client
import yaml
from openwakeword.model import Model
# Each UDP datagram is parsed as a WAV file: at most RHASSPY_BYTES bytes are
# received per datagram and up to RHASSPY_FRAMES audio frames are read from it.
# NOTE(review): 2092 is consistent with a 44-byte WAV header followed by
# 1024 16-bit mono frames -- confirm against the Rhasspy UDP audio settings.
RHASSPY_FRAMES = 1024
RHASSPY_BYTES = 2092

# Detection window: 80 ms of audio at 16 kHz, i.e. 1280 frames.
CHUNK = 16000 * 80 // 1000

# Several windows are handed to the detector at once: this increases the
# efficiency of detection at the cost of higher latency.
OWW_FRAMES = 3 * CHUNK
# Command-line interface. The only option is the path of the YAML
# configuration file; the parsed result is kept in the module-level `args`.
parser = argparse.ArgumentParser(
    description="Open Wake Word detection for Rhasspy",
)
parser.add_argument(
    "-c",
    "--config",
    dest="config_file",
    default="config.yaml",
    help="Configuration yaml file, defaults to `config.yaml`",
)
args = parser.parse_args()
def load_config(config_file):
    """Build the runtime configuration from defaults plus an optional YAML file.

    The ``mqtt`` and ``oww`` sections are merged key by key, so a file that
    overrides only some keys of a section keeps the defaults for the rest.
    ``udp_ports`` replaces the default mapping as a whole, because it lists
    the rooms to listen for rather than tunable settings.

    Args:
        config_file: Path of the YAML configuration file. A missing or empty
            file yields the default configuration.

    Returns:
        dict with at least the ``mqtt``, ``oww`` and ``udp_ports`` sections.

    Raises:
        TypeError: If the top level of the YAML document is not a mapping.
    """
    default_config = {
        "mqtt": {
            "broker": "127.0.0.1",
            "port": 1883,
            "username": None,
            "password": None,
        },
        "oww": {
            "activation_threshold": 0.5,
            "vad_threshold": 0,
            "enable_speex_noise_suppression": False,
            "activation_ratelimit": 5,
        },
        "udp_ports": {"base": 12202},
    }

    try:
        with open(config_file, "r", encoding="utf-8") as f:
            config_override = yaml.safe_load(f)
    except FileNotFoundError:
        config_override = None

    # safe_load() returns None for an empty document; treat that exactly like
    # a missing file instead of failing on the merge below.
    if config_override is None:
        return default_config
    if not isinstance(config_override, dict):
        raise TypeError(
            f"Top level of {config_file} must be a mapping, "
            f"got {type(config_override).__name__}"
        )

    config = dict(default_config)
    for section, value in config_override.items():
        if value is None:
            # A section header with nothing under it: keep the defaults.
            continue
        section_default = default_config.get(section)
        mergeable = (
            section != "udp_ports"
            and isinstance(section_default, dict)
            and isinstance(value, dict)
        )
        # A plain top-level merge would drop every default key of a section
        # that is only partially overridden (e.g. only `mqtt.broker` given),
        # so settings sections are merged one level deeper.
        config[section] = {**section_default, **value} if mergeable else value
    return config
class RhasspyUdpAudio(threading.Thread):
    """Receive audio for one room over UDP and queue it for wake word detection.

    Every datagram is parsed as a WAV file. The decoded 16-bit samples are
    accumulated and handed to the shared queue in chunks of exactly
    ``OWW_FRAMES`` samples, as dicts with the keys ``roomname``,
    ``timestamp`` (time the chunk was queued) and ``audio`` (int16 array).
    """

    def __init__(self, roomname, port, queue):
        """Bind a UDP socket for one room.

        Args:
            roomname: Name attached to every queued chunk.
            port: UDP port to listen on (bound on all interfaces).
            queue: Queue that receives the audio chunks.
        """
        threading.Thread.__init__(self)
        self._roomname = roomname
        self._port = port
        self._queue = queue
        # Samples received but not yet queued, kept as one int16 array.
        self._buffer = np.empty(0, dtype=np.int16)
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._sock.bind(("", port))

    def run(self):
        """Thread to receive UDP audio and add to processing queue."""
        print(
            f"Listening for {self._roomname} audio on UDP port {self._port}", flush=True
        )
        while True:
            data, _addr = self._sock.recvfrom(RHASSPY_BYTES)
            try:
                with wave.open(io.BytesIO(data)) as audio:
                    frames = audio.readframes(RHASSPY_FRAMES)
                samples = np.frombuffer(frames, dtype=np.int16)
            except (wave.Error, EOFError, ValueError) as err:
                # Anything can arrive on an open UDP port; one bad datagram
                # must not take the listener thread down for good.
                print(
                    f"Ignoring invalid audio packet for {self._roomname}: {err}",
                    flush=True,
                )
                continue
            self._buffer = np.concatenate((self._buffer, samples))
            # `>=` so that a buffer holding exactly one chunk is sent right
            # away instead of waiting for the next datagram.
            while len(self._buffer) >= OWW_FRAMES:
                self._queue.put(
                    {
                        "roomname": self._roomname,
                        "timestamp": time.time(),
                        # Copy so the queued chunk does not share memory with
                        # the remainder kept in the buffer.
                        "audio": self._buffer[:OWW_FRAMES].copy(),
                    }
                )
                self._buffer = self._buffer[OWW_FRAMES:]
class Prediction(threading.Thread):
    """Run wake word detection on queued audio and publish detections on MQTT.

    Queue items are dicts with the keys ``roomname``, ``timestamp`` and
    ``audio``. A detection is published when a model's score reaches
    ``config["oww"]["activation_threshold"]`` and more than
    ``config["oww"]["activation_ratelimit"]`` seconds have passed since the
    previous publish (one shared limit for all rooms and models).
    """

    def __init__(self, config, queue):
        """Connect to the MQTT broker and load the Open Wake Word model.

        Args:
            config: Configuration dict with ``mqtt`` and ``oww`` sections.
            queue: Queue of audio chunks to run detection on.
        """
        threading.Thread.__init__(self)
        self.config = config
        self.queue = queue
        # time.time() of the last published detection; 0 means "never".
        self.published = 0
        self.mqtt = paho.mqtt.client.Client()
        self.mqtt.username_pw_set(
            config["mqtt"]["username"], config["mqtt"]["password"]
        )
        self.mqtt.connect(config["mqtt"]["broker"], config["mqtt"]["port"], 60)
        # Run paho's network loop in the background: without it no keepalive
        # pings are sent and the broker drops the connection while idle.
        self.mqtt.loop_start()
        print("Connected to MQTT broker", flush=True)
        self.oww = Model(
            vad_threshold=config["oww"]["vad_threshold"],
            enable_speex_noise_suppression=config["oww"][
                "enable_speex_noise_suppression"
            ],
        )

    def run(self):
        """Wake word detection thread."""
        while True:
            # Items are dicts; tuple-unpacking one would yield its key names
            # instead of the values, so the fields are read by key.
            item = self.queue.get()
            roomname = item["roomname"]
            prediction = self.oww.predict(item["audio"])
            for model_name, prediction_level in prediction.items():
                if prediction_level < self.config["oww"]["activation_threshold"]:
                    continue
                delta = time.time() - self.published
                print(
                    f"{roomname} {model_name} {prediction_level:.3f} {delta:.3f}",
                    flush=True,
                )
                if delta > self.config["oww"]["activation_ratelimit"]:
                    self.__publish(model_name, roomname)
                    self.published = time.time()

    def __publish(self, model_name, roomname):
        """Publish wake word message to Rhasspy Hermes/MQTT.

        Args:
            model_name: Name of the model that fired; used in the topic and
                as ``modelId``.
            roomname: Room the audio came from; sent as ``siteId``.
        """
        payload = {
            "modelId": model_name,
            "modelVersion": "",
            "modelType": "universal",
            "currentSensitivity": self.config["oww"]["activation_threshold"],
            "siteId": roomname,
            "sessionId": None,
            "sendAudioCaptured": None,
            "lang": None,
            "customEntities": None,
        }
        self.mqtt.publish(f"hermes/hotword/{model_name}/detected", dumps(payload))
        print("Sent wakeword to Rhasspy", flush=True)
if __name__ == "__main__":
    # Start one UDP listener thread per configured room plus a single
    # detection thread, all sharing one queue.
    config = load_config(args.config_file)
    q = queue.Queue()
    threads = []
    for roomname, port in config["udp_ports"].items():
        t = RhasspyUdpAudio(roomname, port, q)
        t.daemon = True
        t.start()
        threads.append(t)
    predictor = Prediction(config, q)
    # Was misspelled `deamon`, which only created an unused attribute and
    # left this thread non-daemon.
    predictor.daemon = True
    predictor.start()
    threads.append(predictor)
    print(f"Threads: {threads}")
    # Every worker is a daemon thread, so the main thread has to stay alive
    # explicitly. Waiting on the detection thread keeps the previous process
    # lifetime (the process ends when detection ends) and lets Ctrl+C stop
    # the program cleanly.
    try:
        predictor.join()
    except KeyboardInterrupt:
        print("Shutting down", flush=True)