Source code for octopus_sensing.devices.tobiiglasses_streaming

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020-2026
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

import time
import os
import threading
import csv
import numpy as np
from typing import List, Optional, Dict, Any

from octopus_sensing.common.message_creators import MessageType
from octopus_sensing.devices.realtime_data_device import RealtimeDataDevice
from octopus_sensing.devices.common import SavingModeEnum

from libtobiiglassesctrl import TobiiGlassesController


class TobiiGlassesStreaming(RealtimeDataDevice):
    '''
    Manage Tobii Glasses streaming

    Attributes
    ----------
    device_ip
        Device IP address. (e.g. 192.168.71.50)

    sampling_rate
        The sampling rate for recording data

    name
        Device name. This name will be used in the output path to identify
        each device's data

    output_path
        The path for recording files.
        Recorded files will be recorded in folder {output_path}/{name}

    saving_mode
        The way of saving data. It saves data continuously in a file
        or saves data which are related to various stimulus in separate files.
        Default is SavingModeEnum.CONTINIOUS_SAVING_MODE
        SavingModeEnum is [CONTINIOUS_SAVING_MODE, SEPARATED_SAVING_MODE]

    Notes
    -----
    This class is used for reading data from Tobii Pro Glasses 2.
    It uses the `libtobiiglassesctrl` library to connect to the device and read data.
    The output file is in CSV format and its columns are as follows:
    [ac_ts, ac_x, ac_y, ac_z,
    gy_ts, gy_x, gy_y, gy_z,
    left_eye_pc_ts, left_eye_pc_x, left_eye_pc_y, left_eye_pc_z,
    left_eye_pd_ts, left_eye_pd,
    left_eye_gd_ts, left_eye_gd_x, left_eye_gd_y, left_eye_gd_z,
    right_eye_pc_ts, right_eye_pc_x, right_eye_pc_y, right_eye_pc_z,
    right_eye_pd_ts, right_eye_pd,
    right_eye_gd_ts, right_eye_gd_x, right_eye_gd_y, right_eye_gd_z,
    gp_ts, gp_l, gp_x, gp_y,
    gp3_ts, gp3_x, gp3_y, gp3_z,
    timestamp, trigger]

    ts: timestamp for each sensor
    ac: accelerometer data
    gy: gyroscope data
    pc: pupil center
    pd: pupil diameter
    gd: gaze direction
    gp: gaze position
    gp3: 3D gaze position
    timestamp: the time when the data is recorded
    The `trigger` column is used to identify the stimuli marker. It is only
    present on the rows recorded right after a trigger has been set.

    See Also
    -----------
    :class:`octopus_sensing.device_coordinator`
    :class:`octopus_sensing.devices.device`

    Examples
    ---------
    Here is an example of using Tobii pro glasses 2 for reading data

    >>> my_tobiiglasses = \
    ...     TobiiGlassesStreaming("192.168.71.50",
    ...                           50,
    ...                           name="tobii_glasses",
    ...                           output_path="./output",
    ...                           saving_mode=SavingModeEnum.CONTINIOUS_SAVING_MODE)

    '''

    # Column names of the output CSV, in the same order `__safe_get` flattens
    # a sample (the trailing `trigger` value is appended by `_stream_loop`).
    _CSV_HEADER = ["ac_ts", "ac_x", "ac_y", "ac_z",
                   "gy_ts", "gy_x", "gy_y", "gy_z",
                   "left_eye_pc_ts", "left_eye_pc_x", "left_eye_pc_y", "left_eye_pc_z",
                   "left_eye_pd_ts", "left_eye_pd",
                   "left_eye_gd_ts", "left_eye_gd_x", "left_eye_gd_y", "left_eye_gd_z",
                   "right_eye_pc_ts", "right_eye_pc_x", "right_eye_pc_y", "right_eye_pc_z",
                   "right_eye_pd_ts", "right_eye_pd",
                   "right_eye_gd_ts", "right_eye_gd_x", "right_eye_gd_y", "right_eye_gd_z",
                   "gp_ts", "gp_l", "gp_x", "gp_y",
                   "gp3_ts", "gp3_x", "gp3_y", "gp3_z",
                   "timestamp", "trigger"]

    def __init__(self,
                 device_ip: str = "192.168.71.50",
                 sampling_rate: int = 50,
                 saving_mode: int=SavingModeEnum.CONTINIOUS_SAVING_MODE,
                 name: Optional[str] = None,
                 output_path: str = "output"):
        super().__init__(name=name, output_path=output_path)

        self._saving_mode = saving_mode
        # One flattened record (numpy array) per polled sample.
        self._stream_data: List[Any] = []
        self.sampling_rate = sampling_rate
        self._board = None
        self._device_ip = device_ip
        # Set to True by the message loop to ask the streaming thread to stop.
        self._terminate = False
        # Pending marker; consumed by the streaming thread on its next sample.
        self._trigger = None
        self._experiment_id = None
        self.__loop_thread: Optional[threading.Thread] = None
        self._controller = None

        # Every device records into its own sub-folder of the output path.
        self.output_path = os.path.join(self.output_path, self.name)
        os.makedirs(self.output_path, exist_ok=True)
        # Last handled START/STOP state, used to ignore repeated messages.
        self._state = ""

    def _run(self):
        '''
        Connects to the glasses, starts the streaming thread and then handles
        the messages received on the message queue (START, STOP, SAVE and
        TERMINATE) until a TERMINATE message arrives or the streaming thread dies.
        '''
        self._controller = TobiiGlassesController(self._device_ip)
        print("TobiiGlasses streaming: Connecting to the device...")
        print(self._controller.get_battery_status())

        self.__loop_thread = threading.Thread(target=self._stream_loop)
        self.__loop_thread.start()

        while True:
            message = self.message_queue.get()
            if not self.__loop_thread.is_alive():
                print("TobiiGlasses streaming: The streaming thread is dead. Terminating.")
                break

            if message is None:
                continue

            if message.type == MessageType.START:
                if self._state == "START":
                    print("TobiiGlasses streaming has already recorded the START triger")
                else:
                    print("TobiiGlasses start")
                    self.__set_trigger(message)
                    self._experiment_id = message.experiment_id
                    self._state = "START"

            elif message.type == MessageType.STOP:
                if self._state == "STOP":
                    print("Tobii glasses streaming has already recorded the STOP triger")
                else:
                    print("Tobii glasses stop")
                    self._experiment_id = message.experiment_id
                    if self._saving_mode == SavingModeEnum.SEPARATED_SAVING_MODE:
                        # One file per stimulus: flush what has been collected
                        # so far and start a fresh buffer.
                        file_name = \
                            "{0}/{1}-{2}-{3}.csv".format(self.output_path,
                                                         self.name,
                                                         self._experiment_id,
                                                         message.stimulus_id)
                        self._save_to_file(file_name)
                        self._stream_data = []
                    else:
                        self.__set_trigger(message)
                    self._state = "STOP"

            elif message.type == MessageType.SAVE:
                if self._saving_mode == SavingModeEnum.CONTINIOUS_SAVING_MODE:
                    self._experiment_id = message.experiment_id
                    file_name = \
                        "{0}/{1}-{2}.csv".format(self.output_path,
                                                 self.name,
                                                 self._experiment_id)
                    self._save_to_file(file_name)
                    self._stream_data = []

            elif message.type == MessageType.TERMINATE:
                # Ask the streaming thread to stop before the final save.
                self._terminate = True
                if self._saving_mode == SavingModeEnum.CONTINIOUS_SAVING_MODE:
                    file_name = \
                        "{0}/{1}-{2}.csv".format(self.output_path,
                                                 self.name,
                                                 self._experiment_id)
                    self._save_to_file(file_name)
                break

        self._controller.stop_streaming()
        self._controller.close()
        self.__loop_thread.join()

    def _stream_loop(self):
        '''
        Body of the streaming thread. Polls the controller for the latest
        sample `sampling_rate` times per second, flattens it and appends it to
        the in-memory buffer until termination is requested.
        '''
        self._controller.start_streaming()
        while not self._terminate:
            data = self.__safe_get(self._controller.get_data())
            if self._trigger is not None:
                # Attach the pending marker to this record only.
                data.append(self._trigger)
                self._trigger = None
            self._stream_data.append(np.array(data))
            time.sleep(1/self.sampling_rate)

    def __set_trigger(self, message):
        '''
        Takes a message and set the trigger using its data

        Parameters
        ----------
        message: Message
            a message object
        '''
        self._trigger = \
            "{0}-{1}-{2}".format(message.type,
                                 message.experiment_id,
                                 str(message.stimulus_id).zfill(2))

    def _save_to_file(self, file_name):
        '''
        Appends the buffered records to a CSV file. The header row is written
        first when the file is empty.

        Parameters
        ----------
        file_name: str
            Path of the CSV file to append to
        '''
        print("Saving {0} to file {1}".format(self._name, file_name))
        # newline='' lets the csv module control line endings itself, as its
        # documentation requires (prevents blank rows on Windows).
        with open(file_name, 'a', newline='') as csv_file:
            writer = csv.writer(csv_file)
            if os.stat(file_name).st_size == 0:
                print("TobiiGlassesStreaming: file created")
                writer.writerow(self._CSV_HEADER)
                csv_file.flush()
            else:
                print("TobiiGlassesStreaming: file already exists, appending data")
            writer.writerows(self._stream_data)
            csv_file.flush()
        print("Saving {0} to file {1} is done".format(self._name, file_name))

    def _get_realtime_data(self, duration: int) -> Dict[str, Any]:
        '''
        Returns n seconds (duration) of latest collected data for monitoring/visualizing or
        realtime processing purposes.

        Parameters
        ----------
        duration: int
            A time duration in seconds for getting the latest recorded data in realtime

        Returns
        -------
        data: Dict[str, Any]
            The keys are `data` and `metadata`.
            `data` is a list of records, or empty list if there's nothing.
            `metadata` is a dictionary of device metadata including `sampling_rate` and `type`
        '''
        records_count = duration * self.sampling_rate
        # A slice of [-0:] would return the whole buffer, so a non-positive
        # duration is answered with an empty list explicitly.
        data = self._stream_data[-records_count:] if records_count > 0 else []
        metadata = {"sampling_rate": self.sampling_rate,
                    "type": self.__class__.__name__}
        realtime_data = {"data": data,
                         "metadata": metadata}
        return realtime_data

    @staticmethod
    def _as_dict(value):
        '''Returns `value` if it is a dictionary, otherwise an empty dictionary.'''
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _pad(values, size):
        '''Returns exactly `size` items taken from `values`, filling with None.'''
        items = list(values[:size]) if isinstance(values, (list, tuple)) else []
        return items + [None] * (size - len(items))

    def _vector_fields(self, sample, key, size):
        '''Returns [ts, v1, ..., v_size] for a sample holding a vector under `key`.'''
        entry = self._as_dict(sample)
        return [entry.get("ts", None), *self._pad(entry.get(key), size)]

    def __safe_get(self, data):
        '''
        Safely get nested dictionary values with fallback to None.

        Every sensor always contributes the same number of values, so the
        returned list lines up with the CSV header even when a part of the
        sample is missing or malformed.

        Parameters
        ----------
        data: dict
            The data dictionary to extract values from.

        Returns
        -------
        flat_data: list
            A flat list of 37 values extracted from the nested dictionary
            (missing values are None); the last one is the current time.
        '''
        sample = self._as_dict(data)
        mems = self._as_dict(sample.get("mems"))

        flat_data = []
        flat_data.extend(self._vector_fields(mems.get("ac"), "ac", 3))
        flat_data.extend(self._vector_fields(mems.get("gy"), "gy", 3))

        # Left eye first, then right eye: same order as the CSV header.
        for eye_key in ("left_eye", "right_eye"):
            eye = self._as_dict(sample.get(eye_key))
            pupil_diameter = self._as_dict(eye.get("pd"))
            flat_data.extend(self._vector_fields(eye.get("pc"), "pc", 3))
            flat_data.extend([pupil_diameter.get("ts", None),
                              pupil_diameter.get("pd", None)])
            flat_data.extend(self._vector_fields(eye.get("gd"), "gd", 3))

        gaze_position = self._as_dict(sample.get("gp"))
        flat_data.extend([gaze_position.get("ts", None),
                          gaze_position.get("l", None),
                          *self._pad(gaze_position.get("gp"), 2)])
        flat_data.extend(self._vector_fields(sample.get("gp3"), "gp3", 3))

        flat_data.append(time.time())
        return flat_data