Source code for octopus_sensing.devices.monitored_device

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

import sys
import pickle
import multiprocessing.queues
import threading
import traceback
from typing import List, Any

from octopus_sensing.devices.device import Device

QueueType = multiprocessing.queues.Queue


[docs]class MonitoredDevice(Device): ''' Provides functionalities for monitoring a device's data. For example, visualizing data in real time. ''' def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._monitor_in_q = None self._monitor_out_q = None
[docs] def set_monitoring_queues(self, monitor_in_q: QueueType, monitor_out_q: QueueType) -> None: '''Sets the queues for communicating with the parent process. It should be called before the start of the process. ''' self._monitor_in_q = monitor_in_q self._monitor_out_q = monitor_out_q
[docs] def run(self) -> None: # Ensuring queues are set. assert self._monitor_in_q is not None assert self._monitor_out_q is not None threading.Thread(target=self._monitor_loop, name=self.__class__.__name__ + " monitor thread", daemon=True) \ .start() self._run()
def _monitor_loop(self) -> None: while True: # We don't use the data from the queue. It's just a signal. self._monitor_in_q.get() try: # Only 10ms timeout, because we don't want to take cpu time from the # main thread (data collector) self._monitor_out_q.put( pickle.dumps( self._get_monitoring_data(), protocol=pickle.HIGHEST_PROTOCOL), timeout=0.01) except pickle.PickleError: print("Error pickling monitoring data", file=sys.stderr) traceback.print_exc() # We don't want to keep the parent process waiting self._monitor_out_q(pickle.dumps( [], protocol=pickle.HIGHEST_PROTOCOL)) def _get_monitoring_data(self) -> List[Any]: '''Subclasses must implmenet this method. It should return a list of latest collected records. This method will be called in a separate thread, and should be thread-safe. Also, implmentation must return as quick as possible, to prevent blocking of the main thread that doing the collecting. @param requested_records: Number of records that should be returned. @type requested_records: int @return: List of records, or empty list if there's nothing. @rtype: List[Any] ''' raise NotImplementedError()