Backend

The backend comprises:

  • A dynamic reader implemented with the linecache library to avoid loading the entire corpus in memory when getting a new line of text.

  • A cryptography module using cryptography.fernet to encrypt and decrypt the participant's identity, which is necessary to guarantee the right to be forgotten (right to oblivion).

  • An SSH client built with paramiko to send instructions to the spatialization sphere when playing the sound, changing tracks, and locating the reading head with jack_transport and ladish_control bash commands.

  • A timer with start, pause, resume and reset methods.

  • A non-blocking streaming recorder implemented with the sounddevice, soundfile and queue libraries.

# -*- coding: utf-8 -*-
import os
import sys
import time
import queue
import threading
import numpy # Make sure NumPy is loaded before it is used in the callback
import soundfile as sf
import sounddevice as sd
assert numpy # avoid "imported but unused" message (W0611)
class Recorder:
    """
    Non-blocking and multi-channel compatible audio recorder.

    The sounddevice input callback pushes each captured block onto a queue;
    a background thread pops the blocks and writes them to an RF64 file, so
    the caller is never blocked while recording.

    Inspired from:
    https://github.com/spatialaudio/python-sounddevice/blob/0.4.5/examples/rec_unlimited.py
    """

    # Seconds the writer thread waits for a block before re-checking the
    # stop flag; keeps stop_stream() from hanging if no audio arrives.
    _POLL_INTERVAL = 0.1

    def __init__(self, samplerate, channels):
        """Store the audio parameters; nothing is opened until start_stream().

        Args:
            samplerate: sampling rate passed to both the file and the stream.
            channels: number of channels passed to both the file and the stream.
        """
        # audio parameters
        self.samplerate = samplerate
        self.channels = channels
        # private attributes
        self._queue = queue.Queue()
        self._recording = False
        self._thread = None

    def start_stream(self, filename):
        """Start recording the default input device into '<filename>.rf64'.

        An existing file with the same name is removed first. Recording runs
        in a background (non-daemon) thread until stop_stream() is called.

        Raises:
            RuntimeError: if a previous recording is still running.
        """
        # Refuse a second concurrent recording *before* touching the file
        # system: two writer threads would share one queue and the running
        # recording's file could be deleted underneath it.
        if self._thread is not None and self._thread.is_alive():
            raise RuntimeError(
                'a recording is already in progress; call stop_stream() first'
            )

        filename = f'{filename}.rf64'
        if os.path.exists(filename):
            os.remove(filename)

        # Fresh queue so blocks left over from an aborted recording can never
        # end up at the beginning of the new file.
        self._queue = queue.Queue()

        def record_stream():
            try:
                # record from the default input audio
                with sf.SoundFile(filename, mode='x',
                                  samplerate=self.samplerate,
                                  channels=self.channels,
                                  subtype=None) as file:
                    with sd.InputStream(samplerate=self.samplerate,
                                        device=None,
                                        channels=self.channels,
                                        callback=self._fill_queue):
                        while self._recording:
                            try:
                                block = self._queue.get(
                                    timeout=self._POLL_INTERVAL)
                            except queue.Empty:
                                # no audio yet: loop to re-check the stop flag
                                continue
                            file.write(block)
                    # The input stream is closed here, so no new block can
                    # arrive; flush what was captured but not yet written.
                    self._drain_queue(file)
            finally:
                # Also reached when opening the file/device failed, so the
                # recorder never stays stuck in the "recording" state.
                self._recording = False

        self._recording = True
        self._thread = threading.Thread(target=record_stream, daemon=False)
        self._thread.start()

    def _drain_queue(self, file):
        """Write every block still waiting in the queue to 'file'."""
        while True:
            try:
                block = self._queue.get_nowait()
            except queue.Empty:
                return
            file.write(block)

    def _fill_queue(self, indata, frames, time, status):
        """This is called (from a separate thread) for each audio block.

        NOTE: the 'time' parameter shadows the 'time' module inside this
        method only; the name is kept to match the callback signature.
        """
        if status:
            print(status, file=sys.stderr)
        # Copy the block: per the sounddevice documentation the buffer passed
        # to the callback is only valid for the duration of the call.
        self._queue.put(indata.copy())

    def stop_stream(self):
        """Stop recording and wait until the writer thread has finished.

        Safe to call when no recording was ever started.
        """
        self._recording = False
        if self._thread is not None:
            self._thread.join()
if __name__ == '__main__':
    # Demo: record three seconds of mono audio at 48 kHz into 'demo.rf64'.
    recorder = Recorder(samplerate=48_000, channels=1)
    recorder.start_stream(filename='demo')
    try:
        time.sleep(3.0)
    finally:
        # Always stop the recorder, even on Ctrl-C: the writer thread is
        # non-daemon, so leaving it running would keep the process alive.
        recorder.stop_stream()
view raw recorder.py hosted with ❤ by GitHub