Source code for tcs_lib.tcs_event

#!/usr/bin/env python
# Library with high level interface to HET's TCS
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''This module provides functionalities to interface with TCS events.

:class:`TCSEvent` is a listener for `ZeroMQ <http://zeromq.org/>`_ events
generated by the HETDEX Telescope Control System (TCS).

The urls to connect to and the topics to subscribe to can be provided when
creating a new instance, or later through the :meth:`TCSEvent.connect` and
:meth:`TCSEvent.subscribe` methods.

Events can be received either using the :meth:`TCSEvent.next` method or in a
for loop.

:class:`SafeTCSEvent` is a fail safe version of :class:`TCSEvent`.

:class:`TCSDict` is a dictionary class that, on initialisation, makes sure that
the keys that TCS needs to parse events are present. It also provides the
:attr:`TCSDict.topic` property to get and set the topic associated with the
event.

Examples
--------
>>> from __future__ import unicode_literals
>>> tcs_dict = TCSDict()
>>> print(*sorted(tcs_dict.keys()))
__data __data_time __key __source __system __wire_time
>>> # get the topic
>>> tcs_dict.topic
Traceback (most recent call last):
...
AttributeError: 'TCSDict' object has no attribute 'topic'
>>> tcs_dict = TCSDict({'__system': 'tcs_lib', '__source': 'tcs_event',
...                     '__key': 'test'})
>>> print(tcs_dict.topic)
tcs_lib.tcs_event.test
>>> # set topic
>>> tcs_dict.topic = 'tcs_lib1.tcs_event1.test1'
>>> print(tcs_dict['__system'], tcs_dict['__source'], tcs_dict['__key'])
tcs_lib1 tcs_event1 test1
>>> tcs_dict.topic = 42
Traceback (most recent call last):
...
TypeError: The "topic" must be string, not "<... 'int'>"
>>> tcs_dict.topic = 'tcs_lib.tcs_event'
Traceback (most recent call last):
...
ValueError: The topic must be a string with format "system.source.key"
'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import json
import traceback

import six
import zmq

from . import string_helpers
from . import errors


class TCSEvent(object):
    """TCS event class that connects to the given list of urls and listens for
    events. See :meth:`next`

    .. warning::

        This object currently does not support concurrency.

    Examples
    --------
    >>> from tcs_lib import tcs_event
    >>> events = tcs_event.TCSEvent(["tcp://127.0.0.1:30301", ],
    ...                             topics=["tcs.tracker.position",
    ...                                     "tcs.root.ra_dec"])
    >>> for (h, e) in events:  # doctest: +SKIP
    ...     if e is not None:
    ...         handle(e)

    or

    >>> while True:  # doctest: +SKIP
    ...     (h, e) = events.next()
    ...     if e is not None:
    ...         handle(e)

    Parameters
    ----------
    urls : list of strings
        urls to monitor; the list is passed to the :meth:`connect` method
    topics : list of strings, optional
        topics to subscribe; the list is passed to the :meth:`subscribe`
        method
    dates_to_float : bool, optional
        convert ``__wire_time`` and ``__data_time`` to float before returning
        the event
    context : :class:`zmq.Context`, optional
        context to use when creating the sockets. If not given, uses the
        global instance returned by :meth:`zmq.Context.instance`
    """

    def __init__(self, urls, topics=None, dates_to_float=False, context=None):
        # Fall back to the global context when none (or a falsy value) is
        # given
        self._context = context or zmq.Context.instance()
        self._socket = self._context.socket(zmq.SUB)

        # ZeroMQ does not allow to access the subscribed topics and urls, so
        # we save them here
        self._topics = []
        self._urls = []

        self._dates_to_float = dates_to_float

        # subscribe to the topics and connect to the urls
        self.subscribe(topics)
        self.connect(urls)

    def subscribe(self, topics):
        """Subscribe to each of the topics in the given list.

        The subscription mechanism works in the following way:

        * if ``topics`` is ``None`` or an empty list (``[]``) and:

          * no topic has ever been subscribed, subscribe to all (empty
            string);
          * there are already subscribed topics, do nothing;

        * if ``topics`` is a non empty list and:

          * no topic has ever been subscribed, subscribe to the given topics;
          * all topics are subscribed (empty string), unsubscribe it and then
            subscribe to the given topics;
          * otherwise subscribe to the new topics, avoiding duplicates

        Parameters
        ----------
        topics : list of strings
            topics to subscribe
        """
        all_topics = b''

        if not topics:
            # No topic given: subscribe to everything, but only if nothing
            # has been subscribed yet
            if not self._topics:
                self._socket.setsockopt(zmq.SUBSCRIBE, all_topics)
                self._topics.append(all_topics)
            return

        if all_topics in self._topics:
            # get rid of the "subscribe all" to be able to subscribe to
            # other topics
            self._socket.setsockopt(zmq.UNSUBSCRIBE, all_topics)
            self._topics.remove(all_topics)

        # subscribe to the topics avoiding replicates
        for t in topics:
            t = string_helpers.string_to_bytes(t)
            if t not in self._topics:
                self._socket.setsockopt(zmq.SUBSCRIBE, t)
                self._topics.append(t)

    def connect(self, urls):
        """Connect to each of the urls in the given list.

        Note that this does not affect any prior connections. However,
        receipt order is still at the whim of the producers.

        Each url is added to the internal list of connected urls right after
        its connection succeeds. This keeps the list correct when ``urls`` is
        a one-shot iterable (e.g. a generator) and when connecting to one of
        the urls raises an exception after others have already been
        connected.

        Parameters
        ----------
        urls : list of strings
            urls to connect the socket to; if ``None`` or empty, nothing is
            done
        """
        if not urls:
            return

        for url in urls:
            self._socket.connect(url)
            # record only after a successful connection
            self._urls.append(url)

    def next(self):
        """Returns a tuple containing the next event topic and dictionary
        received. Blocks indefinitely.

        .. todo::

            add timeout?

        Returns
        -------
        tuple (topic, dict)
            topic: string with the topic;
            dict: dictionary with the event information

        Raises
        ------
        TCSEventIndexError
            if the incoming multipart event does not have two elements
        TCSJSONDecodeError
            if the json cannot be correctly decoded
        """
        message = self._socket.recv_multipart(copy=True)
        message = string_helpers.bytes_to_string(message)

        if len(message) != 2:
            msg = ('The incoming multipart message {0} must have two'
                   ' elements, not {1}'.format(message, len(message)))
            raise errors.TCSEventIndexError(msg)

        topic = message[0]
        try:
            event = json.loads(message[1])
        except ValueError as e:
            msg = ('The event "{}" is not a valid JSON and cannot be'
                   ' decoded because of: "{}: {}"')
            msg = msg.format(message[1], e.__class__.__name__, e)
            # keep the original decoding error as the cause
            six.raise_from(errors.TCSJSONDecodeError(msg), e)

        if self._dates_to_float:
            event = self._convert_dates_to_float(event)

        return topic, event

    def __next__(self):
        '''alias of :meth:`next`, for iteration in python>=3'''
        return self.next()

    def close(self):
        '''Close the socket'''
        self._socket.close()

    def __iter__(self):
        '''Return the instance itself for use as iterator'''
        return self

    def _convert_dates_to_float(self, event):
        '''Convert __wire_time and __data_time to floats. If they are not in
        the event dictionary, skip

        Parameters
        ----------
        event : dict
            event dictionary; it is modified in place

        Returns
        -------
        event : dict
            modified event
        '''
        for key in ['__wire_time', '__data_time']:
            try:
                event[key] = float(event[key])
            except KeyError:
                pass  # ignore missing keys
        return event
class SafeTCSEvent(TCSEvent):
    '''Fail-safe :class:`TCSEvent`: :meth:`TCSEvent.next` is wrapped in a
    try/except block so that exceptions are logged and listening continues.

    :exc:`StopIteration` and the exceptions passed to ``re_raise`` are raised
    again. Every other exception is trapped: a message with the traceback is
    passed to ``log_func``, so that the caller can handle it as needed, and
    :meth:`TCSEvent.next` is called again.

    .. important::

        Never initialise :class:`SafeTCSEvent` with a function that drops the
        messages: doing so is asking for big trouble. The suggested ways are
        to use either the TCS logging capability (e.g. via the
        :class:`tcs_proxy.tcs_log` class) or the :mod:`standard python
        logging module <logging>` to report error messages.

    Examples
    --------
    >>> from __future__ import print_function
    >>> from tcs_lib import tcs_event
    >>> from tcs_lib import errors
    >>> def printer(msg):
    ...     print('[Error message]', msg)
    >>> events = tcs_event.SafeTCSEvent(printer, [],
    ...                                 ["tcp://127.0.0.1:30301", ],
    ...                                 topics=["tcs.tracker.position",
    ...                                         "tcs.root.ra_dec"])
    >>> for (h, e) in events:  # doctest: +SKIP
    ...     if e is not None:
    ...         handle(e)

    Prints to the standard output the error message and the traceback. If
    e.g. one of the events is ``['test', ]``, the following is printed but
    ``events`` keeps listening for events::

        [Error message] It was not possible to retrieve an event because of the following error.
        If you think that it is a bug an error that should handle implicitly, please report this with **the full traceback** to the developers.
        Traceback (most recent call last):
          File "/data01/montefra/HETDEX/Code/tcs_lib/tcs_lib/tcs_event.py", line 271, in next
            return super(SafeTCSEvent, self).next()
          File "/data01/montefra/HETDEX/Code/tcs_lib/tcs_lib/tcs_event.py", line 165, in next
            raise errors.TCSEventIndexError(msg)
        tcs_lib.errors.TCSEventIndexError: The incoming multipart message ['test'] must have two elements, not 1

    If instead the same event comes in and the :class:`SafeTCSEvent` is
    initialised in the following way:

    >>> events = tcs_event.SafeTCSEvent(printer, [errors.TCSEventIndexError, ],
    ...                                 ["tcp://127.0.0.1:30301", ],
    ...                                 topics=["tcs.tracker.position",
    ...                                         "tcs.root.ra_dec"])

    ``events`` would crash with the above traceback.

    Parameters
    ----------
    log_func : callable
        function accepting one argument:

        * message (string): the error message with the traceback
    re_raise : iterable of exceptions
        exceptions to re-raise; :exc:`StopIteration` is always re-raised
    args, kwargs :
        positional and keyword arguments passed to :class:`TCSEvent`

    Attributes
    ----------
    log_func
        same as input
    re_raise : tuple
        exceptions to re-raise
    '''

    def __init__(self, log_func, re_raise, *args, **kwargs):
        super(SafeTCSEvent, self).__init__(*args, **kwargs)

        self.log_func = log_func
        # StopIteration is always part of the exceptions to propagate; going
        # through a set removes duplicates
        self.re_raise = tuple({StopIteration}.union(re_raise))

    def next(self):
        '''Fail-safe version of :meth:`TCSEvent.next`.'''
        template = ('It was not possible to retrieve an event because'
                    ' of the following error.\n'
                    'If you think that it is a bug an error'
                    ' that should handle implicitly, please report'
                    ' this with **the full traceback** to the'
                    ' developers.\n{}'
                    )

        while True:
            try:
                received = super(SafeTCSEvent, self).next()
            except self.re_raise:
                # At the time this class has been implemented (09.02.2018)
                # TCSEvent doesn't raise a StopIteration. However, since
                # this is the way to break out of iterations, the guard has
                # been added to make this class future proofed
                raise
            except Exception:
                # report the failure with its traceback and try again
                self.log_func(template.format(traceback.format_exc()))
            else:
                return received
class TCSDict(dict):
    '''Dictionary guaranteeing the basic keys required by the TCS event API.

    On construction the following keys are added if they are missing:

    * ``__wire_time`` (string): the time at which the messaging api put the
      message on the wire (default: '0');
    * ``__data_time`` (string): a time provided by a developer, ideally this
      is the temporal reference point for the contents (default:
      :func:`.string_helpers.time_str`);
    * ``__data`` (string): boolean indicating whether a data payload follows
      (default: 'false');
    * ``__system`` (string): the system that generated the event (first token
      in the event topic: "system.source.key") (default: empty string)
    * ``__source`` (string): the source that generated the event (second
      token in the event topic: "system.source.key") (default: empty string)
    * ``__key`` (string): the key that generated the event (third token in
      the event topic: "system.source.key") (default: empty string)

    Once the defaults are in place, :meth:`ensure_string_times` is executed.
    The same method can be called again before sending out events, e.g. using
    :class:`.server.ZMQServer`, to make sure that the times are stored in the
    correct format.

    Parameters
    ----------
    args, kwargs:
        arguments to initialise the underlying :class:`dict`

    Attributes
    ----------
    topic
    '''

    def __init__(self, *args, **kwargs):
        # let the parent class build the dictionary
        super(TCSDict, self).__init__(*args, **kwargs)

        # keys holding times and keys making up the topic
        self._string_dates = ['__wire_time', '__data_time']
        self._topic_keys = ['__system', '__source', '__key']

        defaults = {'__wire_time': '0',
                    '__data_time': string_helpers.time_str(),
                    '__data': 'false',
                    '__system': '',
                    '__source': '',
                    '__key': ''}

        # fill in only the mandatory keys that are not already there
        for key in defaults:
            if key in self:
                continue
            self[key] = defaults[key]

        self.ensure_string_times()

    @property
    def topic(self):
        '''Get or set the topic associated with the current dictionary event
        (as stored in the ``__system``, ``__source`` and ``__key`` keys).

        Raises
        ------
        AttributeError
            if, when retrieving the topic, the above keys are empty
        TypeError
            if the new topic is not a string
        ValueError
            if the new topic is not in the form 'system.source.key'
        '''
        tokens = [self[key] for key in self._topic_keys]
        joined = '.'.join(tokens)
        if joined != '..':
            return joined

        # all three tokens are empty: behave as if the attribute is missing
        template = "'{}' object has no attribute 'topic'"
        raise AttributeError(template.format(self.__class__.__name__))

    @topic.setter
    def topic(self, value):
        if not isinstance(value, six.string_types):
            raise TypeError('The "topic" must be string, not'
                            ' "{}"'.format(type(value)))

        tokens = value.split('.')
        if len(tokens) != len(self._topic_keys):
            raise ValueError('The topic must be a string with format'
                             ' "system.source.key"')

        for key, token in zip(self._topic_keys, tokens):
            self[key] = token

    def set_data_time(self):
        '''reset the ``__data_time`` value to
        :func:`.string_helpers.time_str`'''
        self['__data_time'] = string_helpers.time_str()

    def ensure_string_times(self):
        '''Ensure that the ``__wire_time`` and ``__data_time`` are strings
        using :data:`.string_helpers.TIME_FMT`'''
        for key in self._string_dates:
            if isinstance(self[key], six.string_types):
                continue
            self[key] = string_helpers.TIME_FMT.format(t=self[key])