Backend
The backend part is comprising:
-
A dynamic reader implemented with the linecache library to avoid loading the entire corpus in memory when getting a new line of text.
-
A cryptography module using cryptography.fernet to encrypt and decrypt the participant identity, necessary to assert the right to oblivion.
-
A ssh client built with paramiko to send instructions to the spatialization sphere when playing the sound, changing tracks, and locating the reading head with jack_transport and ladish_control bash commands.
-
A timer with start, pause, resume and reset methods.
-
A non-bocking streaming recorder implemented with sounddevice, soundfile and queue libraries.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import os | |
import sys | |
import time | |
import queue | |
import threading | |
import numpy # Make sure NumPy is loaded before it is used in the callback | |
import soundfile as sf | |
import sounddevice as sd | |
assert numpy # avoid "imported but unused" message (W0611) | |
class Recorder: | |
""" | |
Non-blocking and Multi-channel compatible audio recorder | |
inspired from : | |
https://github.com/spatialaudio/python-sounddevice/blob/0.4.5/examples/rec_unlimited.py | |
""" | |
def __init__(self, samplerate, channels): | |
# audio parameters | |
self.samplerate = samplerate | |
self.channels = channels | |
# private attributes | |
self._queue = queue.Queue() | |
self._recording = False | |
self._thread = None | |
def start_stream(self, filename): | |
"""Start recording audio stream in 'filename'.rf64""" | |
filename = f'{filename}.rf64' | |
if os.path.exists(filename): | |
os.remove(filename) | |
def record_stream(): | |
# record from the default input audio | |
with sf.SoundFile(filename, mode='x', samplerate=self.samplerate, channels=self.channels, subtype=None) as file: | |
with sd.InputStream(samplerate=self.samplerate, device=None, channels=self.channels, callback=self._fill_queue): | |
while self._recording: | |
file.write(self._queue.get()) | |
self._recording = True | |
self._thread = threading.Thread(target=record_stream, daemon=False) | |
self._thread.start() | |
def _fill_queue(self, indata, frames, time, status): | |
"""This is called (from a separate thread) for each audio block.""" | |
if status: | |
print(status, file=sys.stderr) | |
self._queue.put(indata.copy()) | |
def stop_stream(self): | |
"""Stop recording audio stream""" | |
self._recording = False | |
if self._thread is not None: | |
self._thread.join() | |
if __name__ == '__main__': | |
recorder = Recorder(samplerate=48_000, channels=1) | |
recorder.start_stream(filename='demo') | |
time.sleep(3.0) | |
recorder.stop_stream() |