Source code for octopus_sensing.devices.audio_streaming

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

import os
import array
from typing import List, Any, Dict
import datetime
import csv
import miniaudio

from octopus_sensing.devices.realtime_data_device import RealtimeDataDevice
from octopus_sensing.common.message_creators import MessageType
from octopus_sensing.devices.common import SavingModeEnum

[docs]class AudioStreaming(RealtimeDataDevice): ''' Stream and Record audio Attributes ---------- Parameters ---------- device_id: int The audio recorder ID. If there is several audio recorder in the system name: str, optional device name This name will be used in the output path to identify each device's data output_path: str, optional The path for recording files. Audio files will be recorded in folder {output_path}/{name} Example ------- If you want to know what is your audio recorder's ID run the following example from `miniaudio <https://github.com/irmen/pyminiaudio>`_ >>> import miniaudio >>> devices = miniaudio.Devices() >>> captures = devices.get_captures() >>> for d in enumerate(captures): print("{num} = {name}".format(num=d[0], name=d[1]['name'])) Example ----------- Creating an instance of audio recorder and adding it to the device coordinator. Device coordinator is responsible for triggerng the audio recorder to start or stop recording >>> audio_recorder = AudioStreaming(1, ... name="Audio_monitoring", ... output_path="./output") >>> device_coordinator.add_device(audio_recorder) See Also ----------- :class:`octopus_sensing.device_coordinator` :class:`octopus_sensing.devices.Device` :class:`octopus_sensing.devices.RealtimeDataDevice` ''' def __init__(self, device_id:int, saving_mode: int=SavingModeEnum.SEPARATED_SAVING_MODE, **kwargs): super().__init__(**kwargs) self.output_path = os.path.join(self.output_path, self.name) os.makedirs(self.output_path, exist_ok=True) self._device_id = device_id self._saving_mode = saving_mode self._stream_data: List[bytes] = [] self._record = False self._terminate = False self._state = "" self._log: List[str] = [] self._continuous_capture = False self._sampling_rate = 44100 def __stream_loop(self): _ = yield while True: data = yield self._stream_data.append(data) def _run(self): devices = miniaudio.Devices() captures = devices.get_captures() selected_device = captures[self._device_id] capture = \ miniaudio.CaptureDevice(buffersize_msec=1000, sample_rate=self._sampling_rate, device_id=selected_device["id"]) recorder = self.__stream_loop() next(recorder) while True: message = self.message_queue.get() if message is None: continue if message.type == MessageType.START: if self._state == "START": print("Audio streaming has already started") else: if self._saving_mode == SavingModeEnum.SEPARATED_SAVING_MODE: self._stream_data = [] capture.start(recorder) self._record = True else: if self._continuous_capture is False: capture.start(recorder) self._continuous_capture = True self._record = True self._log.append([datetime.datetime.now(), str(message.stimulus_id).zfill(2), 'MESSAGE START']) self._state = "START" elif message.type == MessageType.STOP: if self._state == "STOP": print("Audio streaming has already stopped") else: if self._saving_mode == SavingModeEnum.SEPARATED_SAVING_MODE: capture.stop() self._record = False file_name = \ "{0}/{1}-{2}-{3}.wav".format(self.output_path, self.name, message.experiment_id, str(message.stimulus_id).zfill(2)) self._save_to_file(file_name, capture) else: self._log.append([datetime.datetime.now(), str(message.stimulus_id).zfill(2), 'MESSAGE STOP']) self._experiment_id = message.experiment_id self._state = "STOP" elif message.type == MessageType.TERMINATE: self._terminate = True if self._saving_mode == SavingModeEnum.CONTINIOUS_SAVING_MODE: self._log.append([datetime.datetime.now(), "-", 'MESSAGE TERMINATE']) capture.stop() self._record = False file_name = \ "{0}/{1}-{2}.wav".format(self.output_path, self.name, self._experiment_id) self._save_to_file(file_name, capture) self._save_log_file(f"{self.output_path}/{self.name}-{self._experiment_id}-log.csv") break def _save_to_file(self, file_name:str, capture:miniaudio.CaptureDevice): buffer = b"".join(self._stream_data) samples = array.array('h') samples.frombytes(buffer) sound = miniaudio.DecodedSoundFile('capture', capture.nchannels, capture.sample_rate, capture.format, samples) miniaudio.wav_write_file(file_name, sound) def _save_log_file(self, file_name:str): with open(file_name, 'a') as csv_file: writer = csv.writer(csv_file) for row in self._log: writer.writerow(row) csv_file.flush()
[docs] def get_saving_mode(self): ''' Gets saving mode Returns ----------- saving_mode: int The way of saving data: saving continiously in a file or save data related to each stimulus in a separate file. SavingModeEnum is CONTINIOUS_SAVING_MODE = 0 or SEPARATED_SAVING_MODE = 1 ''' return self._saving_mode
def _get_realtime_data(self, duration: int) -> Dict[str, Any]: ''' Returns n seconds (duration) of latest collected data for monitoring/visualizing or realtime processing purposes. Parameters ---------- duration: int A time duration in seconds for getting the latest recorded data in realtime Returns ------- data: Dict[str, Any] The keys are `data` and `metadata`. `data` is a list of records, or empty list if there's nothing. `metadata` is a dictionary of device metadata including `sampling_rate` and `type` ''' data = self._stream_data[-1 * duration * self._sampling_rate:] metadata = {"sampling_rate": self._sampling_rate, "type": self.__class__.__name__} realtime_data = {"data": data, "metadata": metadata} return realtime_data