Source code for octopus_sensing.devices.realtime_data_device

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

import sys
import pickle
import multiprocessing.queues
import threading
import traceback
from typing import Dict, Any

from octopus_sensing.devices.device import Device

QueueType = multiprocessing.queues.Queue


[docs]class RealtimeDataDevice(Device): ''' Provides functionalities for realtime processing or monitoring a device's data. For example, visualizing data in real time. ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._realtime_data_in_q = None self._realtime_data_out_q = None
[docs] def set_realtime_data_queues(self, realtime_data_in_q: QueueType, realtime_data_out_q: QueueType) -> None: '''Sets the queues for communicating with the parent process. It should be called before the start of the process. ''' self._realtime_data_in_q = realtime_data_in_q self._realtime_data_out_q = realtime_data_out_q
[docs] def run(self) -> None: # Ensuring queues are set. assert self._realtime_data_in_q is not None assert self._realtime_data_out_q is not None threading.Thread(target=self._realtime_data_loop, name=self.__class__.__name__ + " realtime data thread", daemon=True) \ .start() self._run()
def _realtime_data_loop(self) -> None: while True: # There is a duration for getting data n queue data = self._realtime_data_in_q.get() duration = int(data) try: # Only 10ms timeout, because we don't want to take cpu time from the # main thread (data collector) self._realtime_data_out_q.put( pickle.dumps( self._get_realtime_data(duration), protocol=pickle.HIGHEST_PROTOCOL), timeout=0.01) except pickle.PickleError: print("Error pickling realtime data", file=sys.stderr) traceback.print_exc() # We don't want to keep the parent process waiting self._realtime_data_out_q(pickle.dumps( [], protocol=pickle.HIGHEST_PROTOCOL)) def _get_realtime_data(self, duration: int) -> Dict[str, Any]: ''' Subclasses must implmenet this method. It should return a list of latest collected records. This method will be called in a separate thread, and should be thread-safe. Also, implmentation must return as quick as possible, to prevent blocking of the main thread that doing the collecting. Parameters ---------- duration: int A time duration in seconds for getting the latest recorded data in realtime Returns ------- data: Dict[str, Any] it includes `data`: List of records, or empty list if there's nothing. `metadata`: Dict of device metadata ''' raise NotImplementedError()