tcs_event – ZMQ based listener of TCS events

This module provides tools to interface with TCS events.

TCSEvent is a listener for ZeroMQ events generated by the HETDEX Telescope Control System (TCS).

URLs to connect to and topics to subscribe to can be provided when creating a new instance, or later via the TCSEvent.connect() and TCSEvent.subscribe() methods.

Events can be received either using the TCSEvent.next() method or in a for loop.

SafeTCSEvent is a fail-safe version of TCSEvent.

TCSDict is a dictionary class that, on initialisation, makes sure that the keywords TCS needs to parse events are present. It also provides the TCSDict.topic property to get and set the topic associated with the event.

Examples

>>> from __future__ import unicode_literals
>>> tcs_dict = TCSDict()
>>> print(*sorted(tcs_dict.keys()))
__data __data_time __key __source __system __wire_time
>>> # get the topic
>>> tcs_dict.topic
Traceback (most recent call last):
...
AttributeError: 'TCSDict' object has no attribute 'topic'
>>> tcs_dict = TCSDict({'__system': 'tcs_lib', '__source': 'tcs_event',
...                     '__key': 'test'})
>>> print(tcs_dict.topic)
tcs_lib.tcs_event.test
>>> # set topic
>>> tcs_dict.topic = 'tcs_lib1.tcs_event1.test1'
>>> print(tcs_dict['__system'], tcs_dict['__source'], tcs_dict['__key'])
tcs_lib1 tcs_event1 test1
>>> tcs_dict.topic = 42
Traceback (most recent call last):
...
TypeError: The "topic" must be string, not "<... 'int'>"
>>> tcs_dict.topic = 'tcs_lib.tcs_event'
Traceback (most recent call last):
...
ValueError: The topic must be a string with format "system.source.key"

class tcs_lib.tcs_event.TCSEvent(urls, topics=None, dates_to_float=False, context=None)

Bases: object

TCS event class that connects to the given list of urls and listens for events. See next().

Warning

This object currently does not support concurrency.

Parameters:
urls : list of strings

urls to monitor; the list is passed to the connect() method

topics : list of strings, optional

topics to subscribe; the list is passed to the subscribe() method

dates_to_float : bool, optional

convert __wire_time and __data_time to float before returning the event

context : zmq.Context, optional

context to use when creating the sockets. If not given, uses the global instance returned by zmq.Context.instance()

Examples

>>> from tcs_lib import tcs_event
>>> events = tcs_event.TCSEvent(["tcp://127.0.0.1:30301", ],
...                              topics=["tcs.tracker.position",
...                                      "tcs.root.ra_dec"])
>>> for (h, e) in events:  
...     if e is not None:
...         handle(e)

or

>>> while True:  
...     (h, e) = events.next()
...     if e is not None:
...         handle(e)

subscribe(topics)

Subscribe to each of the topics in the given list.

The subscription mechanism works in the following way:

  • if topics is None or an empty list ([]) and:

    • no topic has ever been subscribed, subscribe to all topics (empty string);
    • there are already subscribed topics, do nothing;
  • if topics is a non-empty list and:

    • no topic has ever been subscribed, subscribe to the given topics;
    • all topics are subscribed (empty string), unsubscribe from the empty string and then subscribe to the given topics;
    • otherwise, subscribe to the new topics, avoiding duplicates.

Parameters:
topics : list of strings

topics to subscribe
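
The subscription rules above can be sketched in plain Python, without a real socket. The SubscriptionBook class and its active attribute are hypothetical names used only for illustration; how TCSEvent actually tracks its subscriptions is not shown in this documentation.

```python
class SubscriptionBook(object):
    """Hypothetical bookkeeping that mirrors the subscription rules."""

    ALL = ""  # an empty prefix matches every topic

    def __init__(self):
        self.active = []  # prefixes currently subscribed, in order

    def subscribe(self, topics=None):
        topics = list(topics or [])
        if not topics:
            # None or []: subscribe to everything only if nothing is there yet
            if not self.active:
                self.active.append(self.ALL)
            return
        if self.active == [self.ALL]:
            # drop the catch-all subscription before adding specific topics
            self.active.remove(self.ALL)
        for topic in topics:
            if topic not in self.active:  # avoid duplicates
                self.active.append(topic)


book = SubscriptionBook()
book.subscribe()                          # nothing yet: subscribe to all
book.subscribe(["tcs.tracker.position"])  # replaces the catch-all
book.subscribe(["tcs.tracker.position", "tcs.root.ra_dec"])  # adds the new one
print(book.active)
```

With a real ZeroMQ SUB socket, each append or remove in this sketch would correspond to a subscribe or unsubscribe call on the socket.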

connect(urls)

Connect to each of the urls in the given list.

Note that this does not affect any prior connections. However, receipt order is still at the whim of the producers.

Parameters:
urls : list of strings

urls to connect to and monitor

next()

Returns a tuple containing the next event topic and dictionary received. Blocks indefinitely.

Todo

add timeout?

Returns:
tuple (topic, dict)

topic: string with the topic; dict: dictionary with the event information

Raises:
TCSEventIndexError

if the incoming multipart event does not have two elements

TCSJSONDecodeError

if the json cannot be correctly decoded
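
The two failure modes above can be illustrated with a small parser. This is only a sketch: it assumes that the first frame carries the topic and the second a JSON payload, and it uses the built-in IndexError and ValueError as stand-ins for TCSEventIndexError and TCSJSONDecodeError. The parse_event name is hypothetical.

```python
import json


def parse_event(frames):
    """Turn a two-frame multipart message into a (topic, dict) tuple."""
    if len(frames) != 2:
        # stand-in for TCSEventIndexError
        raise IndexError("The incoming multipart message {} must have two"
                         " elements, not {}".format(frames, len(frames)))
    topic, payload = frames
    try:
        event = json.loads(payload)
    except ValueError as e:
        # stand-in for TCSJSONDecodeError; JSON decoding errors are ValueErrors
        raise ValueError("Cannot decode the event payload: {}".format(e))
    return topic, event


topic, event = parse_event(["tcs.root.ra_dec", '{"ra": 12.5, "dec": -3.0}'])
print(topic, event["ra"])
```

Frames received from a real socket would normally be bytes rather than strings, so a decoding step may be needed before or after this point.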

__next__()

Alias of next(), used for iteration in Python 3.

close()

Close the socket.

__iter__()

Return the instance itself for use as an iterator.
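
The combination of next(), __next__() and __iter__() is the usual way to make one class iterable under both Python 2 and Python 3. A minimal sketch of the same layout, using a hypothetical Countdown class instead of a socket:

```python
class Countdown(object):
    """Hypothetical iterator with the next()/__next__()/__iter__() layout."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        # the instance is its own iterator
        return self

    def next(self):
        if self.current <= 0:
            raise StopIteration
        self.current -= 1
        return self.current + 1

    # Python 3 looks up __next__, Python 2 looks up next
    __next__ = next


print(list(Countdown(3)))
```

Unlike this sketch, TCSEvent.next() is documented as blocking indefinitely, so a for loop over a TCSEvent instance does not end on its own.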

_convert_dates_to_float(event)

Convert __wire_time and __data_time to floats. Keys that are not in the event dictionary are skipped.

Parameters:
event : dict

event dictionary

Returns:
event : dict

modified event
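
A minimal sketch of such a conversion follows. It assumes that the time values are strings that float() can parse and that the dictionary is modified in place; neither detail is confirmed by this documentation, and the function name is a stand-in for the private method.

```python
def convert_dates_to_float(event):
    """Convert the two time keys to float, skipping the missing ones."""
    for key in ("__wire_time", "__data_time"):
        if key in event:
            # assumption: the value is a numeric string such as "1500000000.25"
            event[key] = float(event[key])
    return event


event = convert_dates_to_float({"__wire_time": "1500000000.25",
                                "__key": "test"})
print(event)
```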

class tcs_lib.tcs_event.SafeTCSEvent(log_func, re_raise, *args, **kwargs)

Bases: tcs_lib.tcs_event.TCSEvent

Wrap TCSEvent.next() in a try/except block to log exceptions and keep going.

StopIteration and the exceptions passed to re_raise are raised again. All other exceptions are trapped: a message is passed to log_func, so that the caller can handle the exception as needed, and TCSEvent.next() is called again.

Important

Never initialise SafeTCSEvent with a function that drops the messages: doing so is asking for serious trouble. The suggested approaches are to use either the TCS logging capability (e.g. via the tcs_proxy.tcs_log class) or the standard Python logging module to report error messages.
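
As an illustration of the second suggestion, the standard logging module can be wrapped in a one-argument callable. The logger name and the log_error function are hypothetical; only logging calls from the standard library are used.

```python
import logging

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger("tcs_event_listener")  # hypothetical logger name


def log_error(msg):
    """Forward the error message and traceback text to the logging module."""
    logger.error(msg)


# The callable would then be passed as the first argument, for example:
# events = tcs_event.SafeTCSEvent(log_error, [], ["tcp://127.0.0.1:30301"])
log_error("example error message")
```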

Parameters:
log_func : callable

function accepting one argument, message (string): the error message with the traceback

re_raise : iterable of exceptions

list of exceptions to re-raise; StopIteration is always re-raised

args, kwargs :

positional and keyword arguments passed to TCSEvent

Examples

>>> from __future__ import print_function
>>> from tcs_lib import tcs_event
>>> from tcs_lib import errors
>>> def printer(msg):
...     print('[Error message]', msg)
>>> events = tcs_event.SafeTCSEvent(printer, [],
...                                 ["tcp://127.0.0.1:30301", ],
...                                 topics=["tcs.tracker.position",
...                                         "tcs.root.ra_dec"])
>>> for (h, e) in events:  
...     if e is not None:
...         handle(e)

Prints to the standard output the error message and the traceback. If e.g. one of the events is ['test', ], the following is printed but events keeps listening for events:

[Error message] It was not possible to retrieve an event because of the following error.
If you think that it is a bug an error that should handle implicitly,
please report this with **the full traceback** to the developers.
Traceback (most recent call last):
File "/data01/montefra/HETDEX/Code/tcs_lib/tcs_lib/tcs_event.py", line 271, in next
    return super(SafeTCSEvent, self).next()
File "/data01/montefra/HETDEX/Code/tcs_lib/tcs_lib/tcs_event.py", line 165, in next
    raise errors.TCSEventIndexError(msg)
tcs_lib.errors.TCSEventIndexError: The incoming multipart message ['test'] must have two elements, not 1

If instead the same event comes in and the SafeTCSEvent is initialised in the following way:

>>> events = tcs_event.SafeTCSEvent(printer, [errors.TCSEventIndexError, ],
...                                 ["tcp://127.0.0.1:30301", ],
...                                 topics=["tcs.tracker.position",
...                                         "tcs.root.ra_dec"])

the exception would propagate out of the loop over events, which would terminate with the above traceback.

Attributes:
log_func

same as input

re_raise : tuple

exceptions to re-raise

next()

Fail-safe version of TCSEvent.next().
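
The retry behaviour described for SafeTCSEvent can be sketched independently of ZeroMQ. SafeNext and make_fetch are hypothetical names, and fetch stands in for TCSEvent.next(); the real class may structure this differently.

```python
import traceback


class SafeNext(object):
    """Hypothetical wrapper retrying a next()-like callable after failures."""

    def __init__(self, log_func, re_raise, fetch):
        self.log_func = log_func
        # StopIteration is always re-raised
        self.re_raise = tuple(re_raise) + (StopIteration,)
        self.fetch = fetch  # stand-in for TCSEvent.next()

    def next(self):
        while True:
            try:
                return self.fetch()
            except self.re_raise:
                raise
            except Exception:
                # report the failure, then try to fetch the next event
                self.log_func(traceback.format_exc())


def make_fetch(items):
    """Return a fake next(): raises queued exceptions, returns queued events."""
    pending = list(items)

    def fetch():
        item = pending.pop(0)
        if isinstance(item, Exception):
            raise item
        return item
    return fetch


# one malformed event followed by a good one
source = [IndexError("must have two elements, not 1"), ("tcs.root.ra_dec", {})]
messages = []
safe = SafeNext(messages.append, [], make_fetch(source))
print(safe.next())    # the malformed event is logged and skipped
print(len(messages))  # number of logged error messages
```

Passing [IndexError] as re_raise in this sketch would let the first exception propagate to the caller instead of being logged.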

class tcs_lib.tcs_event.TCSDict(*args, **kwargs)

Bases: dict

Dictionary providing the basic keys necessary to comply with the TCS event API.

When constructing the object, makes sure that the following keys are present:

  • __wire_time (string): the time at which the messaging api put the message on the wire (default: ‘0’);
  • __data_time (string): a time provided by a developer, ideally this is the temporal reference point for the contents (default: string_helpers.time_str());
  • __data (string): boolean indicating whether a data payload follows (default: ‘false’);
  • __system (string): the system that generated the event (first token in the event topic: “system.source.key”) (default: empty string)
  • __source (string): the source that generated the event (second token in the event topic: “system.source.key”) (default: empty string)
  • __key (string): the key that generated the event (third token in the event topic: “system.source.key”) (default: empty string)

After initialisation, the constructor executes ensure_string_times(). This method can also be called before sending out events, e.g. using server.ZMQServer, to make sure that the times are stored in the correct format.

Parameters:
args, kwargs:

arguments to initialise underlying dict

topic

Get or set the topic associated with the current event dictionary (as stored in the __system, __source and __key keys).

Raises:
AttributeError

if, when retrieving the topic, the above keys are empty

TypeError

if the new topic is not a string

ValueError

if the new topic is not in the form ‘system.source.key’
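
The mapping between the topic string and the three keys, together with the errors listed above, can be sketched with two helper functions. split_topic and join_topic are hypothetical names. The sketch assumes exactly three dot-separated tokens and treats any empty key as an error; how TCSDict handles extra dots or partially filled keys is not stated here.

```python
def split_topic(topic):
    """Split "system.source.key" into three tokens, validating the input."""
    if not isinstance(topic, str):
        raise TypeError('The "topic" must be string, not "{}"'
                        .format(type(topic)))
    tokens = topic.split(".")
    if len(tokens) != 3:  # assumption: exactly three tokens are required
        raise ValueError('The topic must be a string with format'
                         ' "system.source.key"')
    return tokens


def join_topic(event):
    """Build the topic from the three keys; fail if any of them is empty."""
    tokens = [event.get(k, "") for k in ("__system", "__source", "__key")]
    if not all(tokens):  # assumption: one empty key is enough to fail
        raise AttributeError("the topic keys are empty")
    return ".".join(tokens)


event = {}
(event["__system"], event["__source"],
 event["__key"]) = split_topic("tcs_lib.tcs_event.test")
print(join_topic(event))
```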

set_data_time()

Reset the __data_time value to string_helpers.time_str().

ensure_string_times()

Ensure that __wire_time and __data_time are strings formatted using string_helpers.TIME_FMT.