Device Coordinator

class octopus_sensing.device_coordinator.RealtimeDataCache[source]

Bases: object

get_cache()[source]

Returns None if the cache is not available or has expired

cache(data: List[Any])[source]
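
The expiry policy of this cache is not described here. As a rough illustration of the contract above (store data, return None when nothing is stored or the data is too old), here is a minimal sketch. `ExpiringCache`, its `ttl_seconds` parameter, and the injectable `clock` are hypothetical names invented for this example and are not the library's implementation.

```python
import time
from typing import Any, Callable, List, Optional


class ExpiringCache:
    """Toy stand-in for a realtime data cache (hypothetical, not the library's code)."""

    def __init__(self, ttl_seconds: float = 0.1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds  # assumed time-to-live; the real value is not documented here
        self._clock = clock      # injectable clock so expiry can be checked without sleeping
        self._data: Optional[List[Any]] = None
        self._stored_at = 0.0

    def cache(self, data: List[Any]) -> None:
        """Store data and remember when it was stored."""
        self._data = data
        self._stored_at = self._clock()

    def get_cache(self) -> Optional[List[Any]]:
        """Return the stored data, or None if nothing is stored or it has expired."""
        if self._data is None:
            return None
        if self._clock() - self._stored_at > self._ttl:
            return None
        return self._data
```

Callers of such a cache would treat a None result as "fetch fresh data", then call `cache()` with the new data.
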

class octopus_sensing.device_coordinator.DeviceCoordinator[source]

Bases: object

The device coordinator manages all devices together: it starts recording on every device at once, stops recording, sends triggers (markers that tag the data at a given point), and terminates the devices. When a device is added to the device coordinator, it is initialized and prepared for data recording.
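
To make the coordination idea concrete, the following is a toy sketch of the fan-out pattern: one coordinator forwards each message to every registered device. `ToyDevice`, `ToyCoordinator`, and the string messages are all hypothetical stand-ins written for this illustration. They do not reproduce how the library initializes devices or runs them in separate processes.

```python
from typing import Any, List


class ToyDevice:
    """Hypothetical device that simply records every message it receives."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.received: List[Any] = []

    def handle(self, message: Any) -> None:
        self.received.append(message)


class ToyCoordinator:
    """Hypothetical coordinator that fans each message out to all devices."""

    def __init__(self) -> None:
        self._devices: List[ToyDevice] = []

    def add_device(self, device: ToyDevice) -> None:
        self._devices.append(device)

    def dispatch(self, message: Any) -> None:
        # Every registered device sees the same message, in registration order
        for device in self._devices:
            device.handle(message)

    def terminate(self) -> None:
        # Tell every device to shut down, then forget them
        self.dispatch("TERMINATE")
        self._devices.clear()


coordinator = ToyCoordinator()
camera, shimmer = ToyDevice("camera"), ToyDevice("shimmer")
coordinator.add_device(camera)
coordinator.add_device(shimmer)
coordinator.dispatch("START")  # all devices start together
coordinator.dispatch("STOP")   # all devices stop together
coordinator.terminate()
```

Because both devices receive the same messages in the same order, their recordings can later be aligned on the shared start and stop markers.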

get_devices() → List[octopus_sensing.devices.device.Device][source]

Returns a list of the devices that have been added

Returns

List of device objects

Return type

List[Device]

add_device(device: octopus_sensing.devices.device.Device) → None[source]

Adds a new device to the coordinator and starts it

Parameters

device (Device) – a device object

Example

>>> my_shimmer = Shimmer3Streaming(name="Shimmer3_sensor", output_path="./output")
>>> device_coordinator.add_device(my_shimmer)

See also

add_devices

Adds a list of new devices to the coordinator and starts them

add_devices(devices: List[octopus_sensing.devices.device.Device]) → None[source]

Adds a list of new devices to the coordinator and starts them

Parameters

devices (List[Device]) – a list of device objects

Example

>>> my_shimmer = Shimmer3Streaming(name="Shimmer3_sensor",
...                                output_path="./output")
>>> device_coordinator.add_devices([my_shimmer])

See also

add_device

Adds a new device to the coordinator and starts it

dispatch(message: octopus_sensing.common.message.Message) → None[source]

Dispatches a new message to all devices

Parameters

message (Message) – a message object

Example

>>> device_coordinator.dispatch(start_message(experiment_id,
...                                           stimuli_id))

health_check()[source]

Checks if all the devices are okay.

Returns

True if all are healthy, False otherwise

Return type

boolean
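
Since the check returns a plain boolean, one way to use it is to poll a few times before starting a recording. The helper below is a sketch of that idea. `wait_until_healthy` and its parameters are hypothetical names for this example, and it accepts any zero-argument callable that returns a bool, so `device_coordinator.health_check` could be passed in directly.

```python
import time
from typing import Callable


def wait_until_healthy(check: Callable[[], bool], attempts: int = 5,
                       delay_seconds: float = 0.0) -> bool:
    """Call check() up to `attempts` times; return True as soon as it succeeds."""
    for attempt in range(attempts):
        if check():
            return True
        # Pause between attempts, but not after the last one
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return False


# Fake check for demonstration: unhealthy twice, then healthy
fake_results = iter([False, False, True])
became_healthy = wait_until_healthy(lambda: next(fake_results), attempts=5)
never_healthy = wait_until_healthy(lambda: False, attempts=3)
```

How many attempts and how long a delay make sense depends on the devices in use; the defaults here are arbitrary.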

terminate()[source]

Sends a terminate message to all devices and terminates all processes

Example

>>> device_coordinator.terminate()

get_realtime_data(duration: int, device_list: Optional[List[str]]) → Dict[str, List[Any]][source]

Returns latest collected data from all devices. Device’s data can be anything, depending on the device itself.

Parameters
  • duration (int) – the time duration, in seconds, for which to get realtime data

  • device_list (Optional[List[str]]) – a list of device names. Only the devices in this list are monitored or processed in real time

Returns

data – The keys are device names and values are collected data from the device

Return type

Dict[str, List[Any]]

Note

This method is intended for getting data in real time, for monitoring or realtime processing
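
As a sketch of how the returned dictionary might be consumed for monitoring, the helper below counts the samples per device and flags devices that returned nothing. `count_samples` is a hypothetical helper, and the sample dictionary is made up for illustration; real per-device values depend on each device.

```python
from typing import Any, Dict, List


def count_samples(realtime_data: Dict[str, List[Any]]) -> Dict[str, int]:
    """Map each device name to the number of samples it returned."""
    return {name: len(samples) for name, samples in realtime_data.items()}


# Made-up data in the documented shape: device name -> list of collected samples
sample_data = {
    "Shimmer3_sensor": [[0.1, 0.2], [0.3, 0.4]],
    "camera": [],
}

counts = count_samples(sample_data)
# Devices with no samples may deserve a closer look in a monitoring view
silent_devices = [name for name, n in counts.items() if n == 0]
```

With real data, `sample_data` would be replaced by the dictionary returned from `get_realtime_data`.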