Source code for octopus_sensing.devices.network_devices.http_device

# This file is part of Octopus Sensing <https://octopus-sensing.nastaran-saffar.me/>
# Copyright © Nastaran Saffaryazdi 2020
#
# Octopus Sensing is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software Foundation,
#  either version 3 of the License, or (at your option) any later version.
#
# Octopus Sensing is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
# without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Octopus Sensing.
# If not, see <https://www.gnu.org/licenses/>.

import sys
import re
import threading
import http.client
import pickle
import json
from typing import Optional, Callable, List, Union

import msgpack

from octopus_sensing.common.message_creators import MessageType
from octopus_sensing.common.message import Message
from octopus_sensing.devices.device import Device


class SerializationTypes:
    JSON = json.dumps
    MSGPACK = msgpack.packb
    PICKLE = pickle.dumps


[docs]class HttpNetworkDevice(Device): ''' This class can be used to send messages (triggers) to the other softwares through HTTP protocol. The external software need to listen for incoming HTTP connections (a.k.a HTTP server or HTTP endpoint). This device will POST message to the specified URLs. The messages will be in the form of a dictionary, serialized by the specified type. For example, if the `serialization_type` is JSON, a `START` message will be: {"type": "START", "experiment_id": "exp1", "stimulus_id": "s1"} Parameters ---------- external_endpoints: List[str] A list of URLs to send the message to. It should have the scheme (http or https) at the beginning. For example: ["http://localhost:8080"] Note that for IPv6, you need to put it between braces, like this: `http://[2345:425:2ca1:0000:0000:567:5673:23b5]/` serialization_type: Should be one of the types defined in SerializationTypes name: str, default: None Name of this device timeout: int, default: 3 Timeout for both connecting to the external endpoint and POSTing messages. In seconds. Example ------- In your Octopus Sensing software: >>> coordinator = DeviceCoordinator() >>> device = HttpNetworkDevice(["http://localhost:8080/", "http://192.168.1.1/trigger"]) >>> coordinator.add_device(device) Every message that is dispatched using `coordinator.dispatch` method will be send to `http://localhost:8080/`. ''' def __init__(self, external_endpoints: List[str], serialization_type: Callable[..., Union[bytes, str]] = SerializationTypes.JSON, name: Optional[str] = None, timeout: int = 3): super().__init__(name=name) self._endpoints = external_endpoints self._timeout = timeout self._serialization_func = serialization_type # Regex is borrowed from O'Reilly website: https://www.oreilly.com/library/view/regular-expressions-cookbook/9781449327453/ch08s10.html # It validates and extracts host and port parts of the URL self._url_regex = re.compile(( r"""([a-z][a-z0-9+\-.]*://)""" # scheme r"""([a-z0-9\-._~%]+""" # named host r"""|\[[a-f0-9:.]+\]""" # ipv6 host r"""|\[v[a-f0-9][a-z0-9\-._~%!$&'()*+,;=:]+\])""" # IPvFuture host r"""(:[0-9]+)?""" # port r"""(/.*)?""" # rest )) # We want to give these kind of errors early self._validate_endpoints() def _validate_endpoints(self): for endpoint in self._endpoints: match_object = self._url_regex.match(endpoint) if match_object is None: raise RuntimeError("HttpNetworkDevice({0}): invalid URL: {1}\nIt should be in the form http://host:port/".format( self.name, endpoint)) scheme, host, port, rest = match_object.groups() if scheme not in ('http://', 'https://'): raise RuntimeError("HttpNetworkDevice({0}): invalid URL: {1} It only supports 'http' & 'https' scheme, got: {2}".format( self.name, endpoint, scheme)) def _run(self): while True: message = self.message_queue.get() if message is None: continue self._send_message(message) if message.type == MessageType.TERMINATE: break def _send_message(self, message: Message): message_dict = {"type": message.type, "experiment_id": message.experiment_id, "stimulus_id": message.stimulus_id} serialized_message = self._serialization_func(message_dict) for endpoint in self._endpoints: threading.Thread(target=self._http_post, args=(endpoint, serialized_message,)).start() def _http_post(self, endpoint: str, body: Union[bytes, str]): match_object = self._url_regex.match(endpoint) assert match_object is not None, 'We already validated all endpoints' scheme, host, port_str, rest = match_object.groups() port: Optional[int] = None if port_str: port = int(port_str) connect_to = "{0}:{1}".format(scheme, host) post_to = rest if not rest: post_to = "/" http_client = http.client.HTTPConnection( connect_to, port=port, timeout=self._timeout) http_client.request("POST", post_to, body=body) response = http_client.getresponse() if response.status != 200: print("HttpNetworkDevice({0}): Sending message to [{1}] failed: {2} {3}".format( self.name, endpoint, response.status, response.reason), file=sys.stderr)