tcs_event – ZMQ based listener of TCS events
This module provides functionality to interface with TCS events.
TCSEvent is a listener for ZeroMQ events generated by the HETDEX Telescope Control System (TCS). URLs and topics to subscribe to can be provided when creating a new instance or afterwards, using the TCSEvent.connect() and TCSEvent.subscribe() methods. Events can be received either by calling the TCSEvent.next() method or by iterating over the instance in a for loop.
SafeTCSEvent is a fail-safe version of TCSEvent.
TCSDict is a dictionary class that, on initialisation, makes sure that the keywords that TCS needs to parse events are initialised. It also provides the TCSDict.topic property to get and set the topic associated with the event.
Examples
>>> from __future__ import unicode_literals
>>> tcs_dict = TCSDict()
>>> print(*sorted(tcs_dict.keys()))
__data __data_time __key __source __system __wire_time
>>> # get the topic
>>> tcs_dict.topic
Traceback (most recent call last):
...
AttributeError: 'TCSDict' object has no attribute 'topic'
>>> tcs_dict = TCSDict({'__system': 'tcs_lib', '__source': 'tcs_event',
... '__key': 'test'})
>>> print(tcs_dict.topic)
tcs_lib.tcs_event.test
>>> # set topic
>>> tcs_dict.topic = 'tcs_lib1.tcs_event1.test1'
>>> print(tcs_dict['__system'], tcs_dict['__source'], tcs_dict['__key'])
tcs_lib1 tcs_event1 test1
>>> tcs_dict.topic = 42
Traceback (most recent call last):
...
TypeError: The "topic" must be string, not "<... 'int'>"
>>> tcs_dict.topic = 'tcs_lib.tcs_event'
Traceback (most recent call last):
...
ValueError: The topic must be a string with format "system.source.key"
class tcs_lib.tcs_event.TCSEvent(urls, topics=None, dates_to_float=False, context=None)
Bases: object
TCS event class that connects to the given list of URLs and listens for events. See next().
Warning
This object currently does not support concurrency.
Parameters:
- urls : list of strings
  urls to monitor; the list is passed to the connect() method
- topics : list of strings, optional
  topics to subscribe to; the list is passed to the subscribe() method
- dates_to_float : bool, optional
  convert __wire_time and __data_time to float before returning the event
- context : zmq.Context, optional
  context to use when creating the sockets. If not given, uses the global instance returned by zmq.Context.instance()
Examples
>>> from tcs_lib import tcs_event
>>> events = tcs_event.TCSEvent(["tcp://127.0.0.1:30301", ],
...                             topics=["tcs.tracker.position",
...                                     "tcs.root.ra_dec"])
>>> for (h, e) in events:
...     if e is not None:
...         handle(e)
or
>>> while True:
...     (h, e) = events.next()
...     if e is not None:
...         handle(e)
subscribe(topics)
Subscribe to each of the topics in the given list.
The subscription mechanism works in the following way:
if topics is None or an empty list ([]) and:
- no topic has ever been subscribed, subscribe to all (empty string);
- there are already subscribed topics, do nothing;
if topics is a non-empty list and:
- no topic has ever been subscribed, subscribe to the given topics;
- all topics are subscribed (empty string), unsubscribe from it and then subscribe to the given topics;
- otherwise, subscribe to the new topics, avoiding duplicates.
Parameters:
- topics : list of strings
  topics to subscribe to
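The subscription rules above can be modelled without any ZeroMQ socket. The following is a minimal sketch, not the library's implementation: the `SubscriptionModel` class and its `subscribed` attribute are hypothetical names, and a plain set stands in for the socket's subscription state.

```python
class SubscriptionModel:
    """Toy model of the subscription rules; no ZeroMQ socket is involved."""

    ALL = ""  # the empty string stands for "subscribe to everything"

    def __init__(self):
        self.subscribed = set()

    def subscribe(self, topics=None):
        if not topics:
            # None or []: subscribe to all only if nothing was subscribed yet
            if not self.subscribed:
                self.subscribed.add(self.ALL)
            return
        # a non-empty list replaces the catch-all subscription, if present
        self.subscribed.discard(self.ALL)
        # a set drops duplicates automatically
        self.subscribed.update(topics)


model = SubscriptionModel()
model.subscribe()                      # nothing subscribed yet: subscribe to all
model.subscribe(["tcs.root.ra_dec"])   # replaces the catch-all subscription
model.subscribe(["tcs.root.ra_dec", "tcs.tracker.position"])
model.subscribe([])                    # topics already subscribed: no-op
print(sorted(model.subscribed))
# ['tcs.root.ra_dec', 'tcs.tracker.position']
```

Using a set keeps the "avoiding duplicates" rule trivial; a real listener would additionally have to issue the matching subscribe and unsubscribe calls on its socket.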
connect(urls)
Connect to each of the urls in the given list.
Note that this does not affect any prior connections. However, receipt order is still at the whim of the producers.
Parameters:
- urls : list of strings
  urls to monitor
next()
Returns a tuple containing the next event topic and dictionary received. Blocks indefinitely.
Todo
add timeout?
Returns:
- tuple (topic, dict)
  topic: string with the topic; dict: dictionary with the event information
Raises:
- TCSEventIndexError
  if the incoming multipart event does not have two elements
- TCSJSONDecodeError
  if the JSON cannot be correctly decoded
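The two error conditions listed above can be illustrated with a small decoding helper. This is only a sketch under assumptions: `decode_event`, `EventIndexError` and `EventJSONDecodeError` are hypothetical stand-ins defined here, and the frames are assumed to be UTF-8 encoded bytes with a JSON payload in the second frame.

```python
import json


class EventIndexError(IndexError):
    """Stand-in for TCSEventIndexError (hypothetical, defined here)."""


class EventJSONDecodeError(ValueError):
    """Stand-in for TCSJSONDecodeError (hypothetical, defined here)."""


def decode_event(parts):
    """Turn a two-element multipart message into a (topic, dict) tuple."""
    if len(parts) != 2:
        raise EventIndexError(
            "The incoming multipart message {!r} must have two elements, "
            "not {}".format(parts, len(parts)))
    topic, payload = parts
    try:
        # assumption: frames are UTF-8 encoded bytes
        event = json.loads(payload.decode("utf-8"))
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        raise EventJSONDecodeError(str(exc))
    return topic.decode("utf-8"), event


topic, event = decode_event([b"tcs.root.ra_dec", b'{"ra": 1.5}'])
print(topic, event)
# tcs.root.ra_dec {'ra': 1.5}
```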
class tcs_lib.tcs_event.SafeTCSEvent(log_func, re_raise, *args, **kwargs)
Bases: tcs_lib.tcs_event.TCSEvent
Wrap TCSEvent.next() in a try/except block to log exceptions and keep going. StopIteration and the exceptions passed to re_raise are raised again. All other exceptions are trapped: a message is passed to log_func, so that the caller can handle the exception as needed, and the TCSEvent.next() method is called again.
Important
Never initialise SafeTCSEvent with a function that drops the messages: if you do this, you are asking for big trouble. The suggested ways to report error messages are either the TCS logging capability (e.g. via the tcs_proxy.tcs_log class) or the standard Python logging module.
Parameters:
- log_func : callable
  function accepting one argument: message (string), the error message with the traceback
- re_raise : iterable of exceptions
  list of exceptions to re-raise; StopIteration is always re-raised
- args, kwargs :
  positional and keyword arguments passed to TCSEvent
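As an illustration of the second suggestion, a bound method of a standard `logging` logger already has the one-argument shape described for log_func. The sketch below is self-contained and does not touch tcs_lib; the logger name and the in-memory stream are assumptions made for the example.

```python
import io
import logging

# Send records to an in-memory stream so the example is self-contained;
# a real application would configure a file or console handler instead.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s:%(name)s:%(message)s"))

logger = logging.getLogger("tcs_events_example")  # hypothetical logger name
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
logger.propagate = False

# A bound method taking one positional string fits the log_func signature.
log_func = logger.error

log_func("It was not possible to retrieve an event")
print(stream.getvalue(), end="")
# ERROR:tcs_events_example:It was not possible to retrieve an event
```

Passing `logger.error` (or `logger.exception`) rather than a custom printer means the messages end up wherever the application's logging configuration sends them, so none are dropped silently.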
Examples
>>> from __future__ import print_function
>>> from tcs_lib import tcs_event
>>> from tcs_lib import errors
>>> def printer(msg):
...     print('[Error message]', msg)
>>> events = tcs_event.SafeTCSEvent(printer, [],
...                                 ["tcp://127.0.0.1:30301", ],
...                                 topics=["tcs.tracker.position",
...                                         "tcs.root.ra_dec"])
>>> for (h, e) in events:
...     if e is not None:
...         handle(e)
Prints the error message and the traceback to the standard output. If, for example, one of the events is ['test', ], the following is printed but events keeps listening for events:
[Error message] It was not possible to retrieve an event because of the following error. If you think that it is a bug an error that should handle implicitly, please report this with **the full traceback** to the developers.
Traceback (most recent call last):
  File "/data01/montefra/HETDEX/Code/tcs_lib/tcs_lib/tcs_event.py", line 271, in next
    return super(SafeTCSEvent, self).next()
  File "/data01/montefra/HETDEX/Code/tcs_lib/tcs_lib/tcs_event.py", line 165, in next
    raise errors.TCSEventIndexError(msg)
tcs_lib.errors.TCSEventIndexError: The incoming multipart message ['test'] must have two elements, not 1
If instead the same event comes in and the SafeTCSEvent is initialised in the following way:
>>> events = tcs_event.SafeTCSEvent(printer, [errors.TCSEventIndexError, ],
...                                 ["tcp://127.0.0.1:30301", ],
...                                 topics=["tcs.tracker.position",
...                                         "tcs.root.ra_dec"])
events would crash with the above traceback.
Attributes:
- log_func
  same as input
- re_raise : tuple
  exceptions to re-raise
next()
Fail-safe version of TCSEvent.next().
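The log-and-retry behaviour can be sketched independently of ZeroMQ. This is an illustration under assumptions, not the library's code: `FlakySource` is a hypothetical stand-in for TCSEvent that replays a scripted list, and `SafeSource` mimics the wrapping described for SafeTCSEvent.

```python
import traceback


class FlakySource:
    """Hypothetical stand-in for TCSEvent: replays a scripted list."""

    def __init__(self, items):
        self._items = iter(items)

    def next(self):
        item = next(self._items)  # raises StopIteration when exhausted
        if isinstance(item, Exception):
            raise item
        return item


class SafeSource(FlakySource):
    """Log-and-retry wrapper around next(), as described above."""

    def __init__(self, log_func, re_raise, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.log_func = log_func
        # StopIteration is always re-raised
        self.re_raise = tuple(re_raise) + (StopIteration,)

    def next(self):
        while True:
            try:
                return super().next()
            except self.re_raise:
                raise
            except Exception:
                # hand the full traceback to the caller, then try again
                self.log_func(traceback.format_exc())


messages = []
source = SafeSource(messages.append, [KeyError],
                    [("a", {}), ValueError("bad event"), ("b", {})])
print(source.next()[0], source.next()[0], len(messages))
# a b 1
```

The `except self.re_raise` clause comes first so that listed exceptions bypass the logging branch; everything else is reported through `log_func` and the loop asks for the next event.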
class tcs_lib.tcs_event.TCSDict(*args, **kwargs)
Bases: dict
Dictionary providing the basic keys necessary to comply with the TCS event API.
When constructing the object, makes sure that the following keys are present:
- __wire_time (string): the time at which the messaging API put the message on the wire (default: ‘0’);
- __data_time (string): a time provided by a developer, ideally the temporal reference point for the contents (default: string_helpers.time_str());
- __data (string): boolean indicating whether a data payload follows (default: ‘false’);
- __system (string): the system that generated the event (first token in the event topic: “system.source.key”) (default: empty string);
- __source (string): the source that generated the event (second token in the event topic: “system.source.key”) (default: empty string);
- __key (string): the key that generated the event (third token in the event topic: “system.source.key”) (default: empty string).
After initialisation, ensure_string_times() is executed. This method can also be executed before sending out events, e.g. using server.ZMQServer, to make sure that the times are stored in the correct format.
Parameters:
- args, kwargs :
  arguments to initialise the underlying dict
Attributes:
- topic
  Get or set the topic associated with the current dictionary event (as stored in the __system, __source and __key keys).
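The "make sure the keys are present" step can be pictured with a small dict subclass. This is a sketch only: `DefaultsDict` is a hypothetical stand-in, and because the format produced by string_helpers.time_str() is not shown here, a plain `time.time()` string is used as a placeholder for the __data_time default.

```python
import time


class DefaultsDict(dict):
    """Hypothetical stand-in for TCSDict: fills in the required keys."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        defaults = {
            "__wire_time": "0",
            # Assumption: the real default comes from
            # string_helpers.time_str(), whose format is not shown here;
            # a plain timestamp string stands in for it.
            "__data_time": str(time.time()),
            "__data": "false",
            "__system": "",
            "__source": "",
            "__key": "",
        }
        for key, value in defaults.items():
            # setdefault keeps any value the caller already supplied
            self.setdefault(key, value)


event = DefaultsDict({"__system": "tcs_lib", "ra": 1.5})
print(event["__system"], event["__wire_time"], event["__data"], event["ra"])
# tcs_lib 0 false 1.5
```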
topic
Get or set the topic associated with the current dictionary event (as stored in the __system, __source and __key keys).
Raises:
- AttributeError
  if, when retrieving the topic, the above keys are empty
- TypeError
  if the new topic is not a string
- ValueError
  if the new topic is not in the form ‘system.source.key’
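One way to obtain this behaviour is a property that joins and splits the three keys. The sketch below is an assumption about how such a property could look, not the library's source: `TopicDict` is a hypothetical class, and it assumes that a topic always consists of exactly three dot-separated tokens.

```python
class TopicDict(dict):
    """Hypothetical stand-in for TCSDict, showing only the topic property."""

    _KEYS = ("__system", "__source", "__key")

    @property
    def topic(self):
        tokens = [self.get(key, "") for key in self._KEYS]
        if not all(tokens):
            # assumption: any empty token makes the topic unavailable
            raise AttributeError(
                "'{}' object has no attribute 'topic'".format(
                    type(self).__name__))
        return ".".join(tokens)

    @topic.setter
    def topic(self, value):
        if not isinstance(value, str):
            raise TypeError('The "topic" must be string, not "{}"'.format(
                type(value)))
        tokens = value.split(".")
        if len(tokens) != 3:
            raise ValueError(
                'The topic must be a string with format "system.source.key"')
        # store the three tokens under their respective keys
        self.update(zip(self._KEYS, tokens))


event = TopicDict()
event.topic = "tcs_lib.tcs_event.test"
print(event["__system"], event["__source"], event["__key"])
# tcs_lib tcs_event test
print(event.topic)
# tcs_lib.tcs_event.test
```

Raising AttributeError from the getter means that `hasattr(obj, "topic")` reports False while the keys are empty, which matches the traceback shown in the module examples.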
set_data_time()
Reset the __data_time value to string_helpers.time_str().
ensure_string_times()
Ensure that __wire_time and __data_time are strings, using string_helpers.TIME_FMT.