# Source code for octopus_sensing.device_coordinator

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

import sys
import time
import traceback
import multiprocessing
import queue
import pickle
from typing import List, Any, Tuple, Dict, Optional

from octopus_sensing.devices.device import Device
from octopus_sensing.devices.realtime_data_device import RealtimeDataDevice
from octopus_sensing.common.message import Message
from octopus_sensing.common.message_creators import terminate_message

QueueType = multiprocessing.queues.Queue


class RealtimeDataCache:
    '''
    Keeps the most recently collected realtime data for a short period,
    so rapid repeated requests for realtime data do not hit the device
    processes again.

    Parameters
    ----------
    expiry_seconds: float, default 0.1
        How long (in seconds) cached data stays valid. The default of
        0.1 s matches the original hard-coded 100 ms window.
    '''

    def __init__(self, expiry_seconds: float = 0.1):
        # Expiry window kept in nanoseconds to compare directly with
        # time.time_ns().
        self._expiry_ns = int(expiry_seconds * 1_000_000_000)
        self._time = time.time_ns()
        # NOTE: starts as an empty list even though callers later cache a
        # dict; an empty container is falsy, so callers treat it as
        # "no cache available".
        self._cached_data: List[Any] = []

    def get_cache(self):
        '''
        Returns the cached data, or None if the cache has expired.

        Returns
        -----------
        List[Any] | None
            the last cached data, or None when it is older than the
            expiry window
        '''
        if time.time_ns() - self._time > self._expiry_ns:
            return None
        return self._cached_data

    def cache(self, data: List[Any]):
        '''
        Stores `data` and restarts the expiry timer.

        Parameters
        ----------
        data: List[Any]
            the data to keep until the next expiry
        '''
        self._cached_data = data
        self._time = time.time_ns()
class DeviceCoordinator:
    '''
    Device coordinator is responsible for coordination, like start
    recording data from all devices at once, stop recording, triggering
    (marking data at point), and terminating devices.
    When a device is added to the device coordinator, it will be
    initialized and prepared for data recording.
    '''

    def __init__(self):
        # Maps device name -> Device instance.
        self.__devices: Dict[str, Device] = {}
        # One message queue per added device; dispatch() fans out to all.
        self.__queues: List[QueueType] = []
        # Counter used to generate unique IDs for unnamed devices.
        self.__device_counter: int = 0
        # (request_queue, response_queue, device) triples, one per device
        # that supports realtime data (RealtimeDataDevice subclasses).
        self.__realtime_data_queues: List[Tuple[QueueType, QueueType, Device]] = []
        self.__realtime_data_cache = RealtimeDataCache()

    def __get_device_id(self) -> str:
        '''
        Generate an ID for devices that do not have a name

        Returns
        -----------
        str
            an ID for device
        '''
        self.__device_counter += 1
        return "device_{0}".format(self.__device_counter)

    def get_devices(self) -> List[Device]:
        '''
        Return a list of added devices

        Returns
        -----------
        List[Device]
            List of device objects
        '''
        return list(self.__devices.values())

    def add_device(self, device: Device) -> None:
        '''
        Adds new device to the coordinator and starts it

        Parameters
        ----------
        device: Device
            a device object

        Raises
        ------
        TypeError
            if `device` is not a Device instance
        RuntimeError
            if a device with the same name has already been added

        Example
        --------
        >>> my_shimmer = Shimmer3Streaming(name="Shimmer3_sensor", output_path="./output")
        >>> device_coordinator.add_device(my_shimmer)

        See Also
        --------
        add_devices: Adds a list of new devices to the coordinator and starts them
        '''
        # An explicit check instead of `assert`: asserts are stripped
        # under `python -O`, so validation must not rely on them.
        if not isinstance(device, Device):
            raise TypeError(
                "Expected a Device instance, got {0}".format(type(device).__name__))
        if device.name is None:
            device.name = self.__get_device_id()
        if device.name in self.__devices:
            raise RuntimeError("This device already has been added")
        self.__devices[device.name] = device
        msg_queue: QueueType = multiprocessing.Queue()
        device.set_queue(msg_queue)
        self.__queues.append(msg_queue)
        self.__set_realtime_data_queues(device)
        # The device runs in its own process; start it once it is wired up.
        device.start()

    def add_devices(self, devices: List[Device]) -> None:
        '''
        Adds a list of new devices to the coordinator and starts them

        Parameters
        ----------
        devices: List[Device]
            a list of device objects

        Example
        --------
        >>> my_shimmer = Shimmer3Streaming(name="Shimmer3_sensor", output_path="./output")
        >>> device_coordinator.add_devices([my_shimmer])

        See Also
        --------
        add_device: Adds new device to the coordinator and starts it
        '''
        for device in devices:
            self.add_device(device)

    def dispatch(self, message: Message) -> None:
        '''
        Dispatches a new message to all devices

        Parameters
        ----------
        message: Message
            a message object

        Example
        --------
        >>> device_coordinator.dispatch(start_message(experiment_id, stimuli_id))
        '''
        for message_queue in self.__queues:
            message_queue.put(message)

    def health_check(self):
        '''
        Checks if all the devices are okay

        Returns
        -----------
        boolean
            True if all are healthy, False otherwise
        '''
        for device_name, device in self.__devices.items():
            if not device.is_alive():
                print("device [{0}] is not alive anymore.".format(device_name))
                return False
        return True

    def terminate(self):
        '''
        Sends terminate message to all devices and terminates all processes

        Example
        --------
        >>> device_coordinator.terminate()
        '''
        self.dispatch(terminate_message())
        # Wait for every device process to exit before returning.
        for item in self.__devices.values():
            item.join()

    def get_realtime_data(self, duration: int,
                          device_list: Optional[List[str]] = None) -> Dict[str, List[Any]]:
        '''
        Returns latest collected data from all devices.
        Device's data can be anything, depending on the device itself.

        Parameters
        ----------
        duration: int
            a time duration for getting realtime data in seconds
        device_list: Optional[List[str]], default None
            a list of device names. Only devices in this list will be
            monitored or processed in realtime. None means all devices.

        Returns
        --------
        data : Dict[str, List[Any]]
            The keys are device names and values are collected data from
            the device

        Note
        ----
        This method is being used for getting data in real-time for
        monitoring or realtime processing
        '''
        # An empty dict is falsy, so an empty cached result is not reused;
        # that simply means an extra (cheap) round-trip to the devices.
        cached = self.__realtime_data_cache.get_cache()
        if cached:
            return cached

        out_queues: List[Tuple[QueueType, str]] = []
        # Putting request for all devices first, then collecting them all,
        # so the devices gather data concurrently (performance).
        for in_q, out_q, device in self.__realtime_data_queues:
            if device_list is None or device.name in device_list:
                try:
                    # The sub-process won't use the data we put in the
                    # queue. It's just a signal.
                    in_q.put(str(duration), timeout=0.1)
                    out_queues.append((out_q, device.name))
                except queue.Full:
                    print("Could not put realtime data request for {0} device.".format(
                        device.name), file=sys.stderr)
                    traceback.print_exc()

        result = {}
        for out_q, device_name in out_queues:
            try:
                # NOTE: pickle.loads is safe here only because the payload
                # comes from our own device sub-process, never from an
                # untrusted source.
                records = pickle.loads(out_q.get(timeout=0.1))
                # We ensured device has a name in add_device, ignoring it here.
                result[device_name] = records  # type: ignore
            except (queue.Empty, pickle.PickleError):
                print("Could not read realtime data from {0} device".format(
                    device_name), file=sys.stderr)
                traceback.print_exc()

        self.__realtime_data_cache.cache(result)
        return result

    def __set_realtime_data_queues(self, device: Device) -> None:
        # Only RealtimeDataDevice subclasses can serve realtime data;
        # wire up a request/response queue pair for each of them.
        if isinstance(device, RealtimeDataDevice):
            in_q: QueueType = multiprocessing.Queue()
            out_q: QueueType = multiprocessing.Queue()
            device.set_realtime_data_queues(in_q, out_q)
            self.__realtime_data_queues.append((in_q, out_q, device))