Source code for octopus_sensing.devices.brainflow_streaming

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

from datetime import datetime
import time
import os
import threading
import csv
import numpy as np
from typing import List, Optional, Dict, Any
from brainflow.board_shim import BoardShim, BrainFlowInputParams

from octopus_sensing.common.message_creators import MessageType
from octopus_sensing.devices.realtime_data_device import RealtimeDataDevice
from octopus_sensing.devices.common import SavingModeEnum


[docs]class BrainFlowStreaming(RealtimeDataDevice): ''' Manage brainflow streaming Attributes ---------- device_id Device ID. Brainflow support a list of devices, to see supported device IDs go to: https://brainflow.readthedocs.io/en/stable/SupportedBoards.html sampling_rate the sampling rate for recording data brain_flow_input_params Each supported board in brainflow gets some parameters, to see the list of parameters for each board go to: https://brainflow.readthedocs.io/en/stable/SupportedBoards.html name device name This name will be used in the output path to identify each device's data output_path The path for recording files. Audio files will be recorded in folder {output_path}/{name} saving_mode The way of saving data. It saves data continiously in a file or saves data which are related to various stimulus in separate files. default is SavingModeEnum.CONTINIOUS_SAVING_MODE SavingModeEnum is [CONTINIOUS_SAVING_MODE, SEPARATED_SAVING_MODE] ** kwargs: Extra optional arguments according to the board type See Also ----------- :class:`octopus_sensing.device_coordinator` :class:`octopus_sensing.devices.device` Examples --------- Here is an example of using brainflow for reading cyton_daisy board data >>> params = BrainFlowInputParams() >>> params.serial_port = "/dev/ttyUSB0" >>> my_brainflow = ... BrainFlowStreaming(2, ... 125, ... brain_flow_input_params=params, ... name="cyton_daisy", ... output_path="./output", ... saving_mode=SavingModeEnum.CONTINIOUS_SAVING_MODE) ''' def __init__(self, device_id: int, sampling_rate: int, brain_flow_input_params: BrainFlowInputParams, saving_mode: int=SavingModeEnum.CONTINIOUS_SAVING_MODE, name: Optional[str] = None, output_path: str = "output"): super().__init__(name=name, output_path=output_path) self._saving_mode = saving_mode self._stream_data: List[float] = [] self.sampling_rate = sampling_rate self._board = None self._device_id = device_id self._brain_flow_input_params = brain_flow_input_params self._terminate = False self._trigger = None self._experiment_id = None self.__loop_thread: Optional[threading.Thread] = None self.output_path = os.path.join(self.output_path, self.name) os.makedirs(self.output_path, exist_ok=True) self._state = "" def _run(self): self._board = BoardShim(self._device_id, self._brain_flow_input_params) self._board.set_log_level(0) self._board.prepare_session() self.__loop_thread = threading.Thread(target=self._stream_loop) self.__loop_thread.start() while True: message = self.message_queue.get() if not self.__loop_thread.is_alive(): print("Brainflow streaming: The streaming thread is dead. Terminating.") break if message is None: continue if message.type == MessageType.START: if self._state == "START": print("Brainflow streaming has already recorded the START triger") else: print("Brainflow start") self.__set_trigger(message) self._experiment_id = message.experiment_id self._state = "START" elif message.type == MessageType.STOP: if self._state == "STOP": print("Brainflow streaming has already recorded the STOP triger") else: print("Brainflow stop") if self._saving_mode == SavingModeEnum.SEPARATED_SAVING_MODE: self._experiment_id = message.experiment_id file_name = \ "{0}/{1}-{2}-{3}.csv".format(self.output_path, self.name, self._experiment_id, message.stimulus_id) self._save_to_file(file_name) self._stream_data = [] else: self._experiment_id = message.experiment_id self.__set_trigger(message) self._state = "STOP" elif message.type == MessageType.TERMINATE: self._terminate = True if self._saving_mode == SavingModeEnum.CONTINIOUS_SAVING_MODE: file_name = \ "{0}/{1}-{2}.csv".format(self.output_path, self.name, self._experiment_id) self._save_to_file(file_name) break self._board.stop_stream() self._board.release_session() self.__loop_thread.join() def _stream_loop(self): self._board.start_stream() while True: if self._terminate is True: break data = self._board.get_board_data() if np.array(data).shape[1] != 0: self._stream_data.extend(list(np.transpose(data))) last_record = self._stream_data.pop() last_record = list(last_record) now = str(datetime.now().time()) last_record.append(now) last_record.append(time.time()) if self._trigger is not None: last_record.append(self._trigger) self._trigger = None self._stream_data.append(last_record) else: time.sleep(0.1) # print("brainflow: didn't read any data") def __set_trigger(self, message): ''' Takes a message and set the trigger using its data Parameters ---------- message: Message a message object ''' self._trigger = \ "{0}-{1}-{2}".format(message.type, message.experiment_id, str(message.stimulus_id).zfill(2)) def _save_to_file(self, file_name): with open(file_name, 'a') as csv_file: writer = csv.writer(csv_file) for row in self._stream_data: writer.writerow(row) csv_file.flush()
[docs] def get_channels(self): ''' Gets the list of channels Returns ------- channels_name: List[str] The list of channels' name ''' raise NotImplementedError()
def _get_realtime_data(self, duration: int) -> Dict[str, Any]: ''' Returns n seconds (duration) of latest collected data for monitoring/visualizing or realtime processing purposes. Parameters ---------- duration: int A time duration in seconds for getting the latest recorded data in realtime Returns ------- data: Dict[str, Any] The keys are `data` and `metadata`. `data` is a list of records, or empty list if there's nothing. `metadata` is a dictionary of device metadata including `sampling_rate` and `channels` and `type` ''' # Last seconds of data data = self._stream_data[-1 * duration * self.sampling_rate:] metadata = {"sampling_rate": self.sampling_rate, "channels": self.get_channels(), "type": self.__class__.__name__} realtime_data = {"data": data, "metadata": metadata} return realtime_data