# Library with high level interface to HET's TCS
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''ZMQ based server to stream content and TCS-like event generators.
'''
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import json
from random import random
import sqlite3
import time
import six
import zmq
from .string_helpers import string_to_bytes, time_str
from .errors import DBOrderingError
class ZMQServer(object):
    '''Create a ZeroMQ server that publishes content on the given ``url``.

    Sending events is performed by the :meth:`send_event` method using
    :meth:`zmq.Socket.send_multipart`.

    :meth:`send_event` and :meth:`start` allow to send generic events.
    :meth:`send_tcs_event` is designed to send events that reflect TCS
    expectations: the ``__wire_time`` key of the outgoing event is always
    set to the value returned by
    :func:`~tcs_lib.string_helpers.time_str`, overriding any value already
    present in the event.

    Parameters
    ----------
    url : string
        url and port to bind the socket to
    context : :class:`zmq.Context`, optional
        context to use when creating the sockets. If not given, uses the
        global instance returned by :meth:`zmq.Context.instance`

    Attributes
    ----------
    context : :class:`zmq.Context`
        context used to create the socket
    socket : :class:`zmq.Socket`
        ``PUB`` socket bound to ``url``
    '''

    def __init__(self, url, context=None):
        if context:
            self.context = context
        else:
            # no context given: share the global one
            self.context = zmq.Context.instance()
        self.socket = self.context.socket(zmq.PUB)
        self.socket.bind(url)

    def send_event(self, event):
        '''Send the event via the socket.

        Parameters
        ----------
        event : list of string
            event to send using :meth:`zmq.Socket.send_multipart`; the
            events are converted to byte string using
            :func:`~tcs_lib.string_helpers.string_to_bytes`.
        '''
        event = string_to_bytes(event)
        self.socket.send_multipart(event)

    def send_tcs_event(self, tcs_topic, tcs_event):
        '''Version of :meth:`send_event` specialized to send TCS-like events.

        The ``__wire_time`` key of the outgoing event is set to the value
        returned by :func:`~tcs_lib.string_helpers.time_str`. If the event
        is a string, it is decoded into a dictionary using
        :func:`json.loads`. The event is then encoded with
        :func:`json.dumps` and sent, together with the topic, via
        :meth:`send_event`.

        The ``tcs_event`` passed by the caller is not modified: the wire
        time is set on a shallow copy.

        Parameters
        ----------
        tcs_topic : string
            topic of the event
        tcs_event : dict or string
            event to send
        '''
        if isinstance(tcs_event, six.string_types):
            event = json.loads(tcs_event)
        else:
            # stamp the wire time on a copy, to avoid leaking the change
            # into the dictionary owned by the caller
            event = dict(tcs_event)
        event['__wire_time'] = time_str()
        self.send_event([tcs_topic, json.dumps(event)])

    def start(self, events):
        '''Start serving events via the socket

        Parameters
        ----------
        events : generator yielding lists of strings or bytes
            each event retrieved in a loop and sent using
            :meth:`send_event`. If one event is ``None``, it is not sent.
        '''
        for event in events:
            if event is not None:
                self.send_event(event)

    def close(self):
        '''Close the socket. The context is left untouched.'''
        self.socket.close()
class TCSDBReplay(six.Iterator):
    '''Open a TCS sqlite3 database, query it and return one entry at a time
    when looping or using the :func:`next` builtin function.

    Parameters
    ----------
    db_name : string
        file containing the database
    sort_by : string, optional
        whether to sort or not the event_ids. Accepted values: ``'none'``,
        ``'__data_time'``, ``'__wire_time'``
    speedup : float, optional
        speedup to use to replay the database. A value larger than 1
        fast-forwards the replay, a smaller value slows the replay down
    topics : list, optional
        list of topics to return when iterating. If ``None`` or ``[]``, all
        topics are returned.
    convert_number, convert_bool : bool, optional
        try to convert database entries to number (int or float) and to
        boolean, according to the value of the ``data_type`` column. The
        conversion is fail-safe

    Attributes
    ----------
    event_ids : iterator
        results of the query for the ``event_id``. It is ``None`` until
        :meth:`_reset_iter` fills it
    start_wire_time, start_iter_time : float
        wire time and current time when calling :meth:`__next__` the first
        time. Call :meth:`_reset_iter` to reset them before starting a new
        iteration

    Raises
    ------
    tcs_lib.DBOrderingError
        if ``sort_by`` is not one of the accepted values
    '''

    # query used to retrieve the event ids for each accepted ``sort_by``.
    # String literals use single quotes: double quoted strings are
    # identifiers in SQL and are accepted as literals by SQLite only as a
    # legacy behaviour that can be disabled.
    _ID_QUERIES = {
        'none': "SELECT event_id FROM event",
        '__data_time': "SELECT event_id FROM event ORDER BY origts",
        '__wire_time': ("SELECT event_id FROM attribute"
                        " WHERE keyname='__wire_time'"
                        " ORDER BY CAST(value as decimal)"),
    }

    def __init__(self, db_name, sort_by='none', speedup=1., topics=None,
                 convert_number=False, convert_bool=False):
        super(TCSDBReplay, self).__init__()

        # pick the query to get the IDs before opening the database, so
        # that no connection is left behind if the ordering is not known
        try:
            self._id_query = self._ID_QUERIES[sort_by]
        except (KeyError, TypeError):
            raise DBOrderingError('Unknown sorting "{}"'.format(sort_by))

        # initialize the database
        self._conn = sqlite3.connect(db_name)
        self._conn.text_factory = str

        # save the other variables
        self._speedup = speedup
        self._topics = topics
        self._convert_number = convert_number
        self._convert_bool = convert_bool

        # iteration status: the query is executed by _reset_iter
        self.event_ids = None
        self.start_wire_time = self.start_iter_time = None

    def _reset_iter(self):
        '''Reset the status and allow restarting the iterations.

        * Execute the query to retrieve the ``event_id`` and save the
          result in :attr:`event_ids`. The query is done on the ``event``
          table, if no ordering ``'none'`` or the ``'__data_time'``
          ordering is required, or on the ``attribute`` table, if the
          ``'__wire_time'`` ordering is required.
        * Unset the :attr:`start_wire_time` and :attr:`start_iter_time`

        This method is called by :meth:`__iter__` and by :meth:`__next__`,
        if the latter is used before starting an iteration.
        It can be called to reinitialize the iterator.
        '''
        self.event_ids = self._conn.cursor().execute(self._id_query)
        self.start_wire_time = self.start_iter_time = None

    def __iter__(self):
        '''Reset the iteration status and return the instance for use as
        iterator'''
        self._reset_iter()
        return self

    def __next__(self):
        '''Get the next element of :attr:`event_ids`, build the TCS object
        and return it.

        Before returning, sleep until the rescaled ``__wire_time`` of the
        event is reached; the sleep happens also if the topic is not
        accepted.

        Returns
        -------
        list of strings or ``None``
            if the topic is accepted returns ``[topic, json(event)]``,
            otherwise returns ``None``

        Raises
        ------
        StopIteration
            when there are no more event ids
        '''
        if self.event_ids is None:
            # next() used without calling iter() first: run the query now
            self._reset_iter()

        # get the event ID and create the event dictionary
        event_id = next(self.event_ids)[0]
        event_dict = self._event_dict(event_id)

        # the wire time has already been rescaled by _update_times: sleep
        # for the time necessary to reach it
        delta_time = float(event_dict['__wire_time']) - time.time()
        if delta_time > 0:
            time.sleep(delta_time)

        topic = '{}.{}.{}'.format(event_dict['__system'],
                                  event_dict['__source'],
                                  event_dict['__key'])
        if self._topics and topic not in self._topics:
            return None
        else:
            return [topic, json.dumps(event_dict)]

    def _event_dict(self, event_id):
        '''Create dictionary representing the event with id ``event_id``

        All the rows of the ``attribute`` table with the given
        ``event_id`` are collected, optionally converted with
        :meth:`_convert_to_type`, and the times are rescaled with
        :meth:`_update_times`.

        Parameters
        ----------
        event_id : string
            id of the event

        Returns
        -------
        result : dictionary
            event information
        '''
        result = {}
        result["__event_id"] = event_id

        event_query = "SELECT * FROM attribute WHERE event_id=?"
        query = self._conn.cursor().execute(event_query, [str(event_id), ])
        # NOTE(review): the unpacking requires exactly four columns per
        # row, presumably (event_id, keyname, value, data_type) -- confirm
        # against the database schema
        for _, keyname, value, type_ in query:
            result[keyname] = self._convert_to_type(value, type_)

        return self._update_times(result)

    def _convert_to_type(self, value, type_):
        '''Try to convert value to the given type, if the conversion is
        required

        Parameters
        ----------
        value : string
            value to convert
        type_ : string
            type of value. Known types: "number", "boolean", "string". Any
            unknown type is treated as a string

        Returns
        -------
        value : int, float, bool or string
            converted value
        '''
        if type_ == 'number' and self._convert_number:
            return self._convert_to_number(value)
        elif type_ == 'boolean' and self._convert_bool:
            return self._convert_to_bool(value)
        else:
            return value

    def _convert_to_number(self, value):
        '''Try to convert the input ``value`` from string to ``int`` or
        ``float``, in that order. If it fails, returns ``value`` unchanged'''
        # TypeError is caught together with ValueError to keep the
        # conversion fail-safe also for values that are not strings, e.g.
        # ``None``
        try:
            return int(value)
        except (TypeError, ValueError):
            try:
                return float(value)
            except (TypeError, ValueError):
                return value

    def _convert_to_bool(self, value):
        '''Try to convert the input ``value`` from ``"true"``/``"false"`` to
        ``True``/``False``. If it fails, returns ``value`` unchanged'''
        if value == 'true':
            return True
        elif value == 'false':
            return False
        else:
            return value

    def _update_times(self, event_dict):
        '''Update the ``'__data_time'`` and ``'__wire_time'`` as an offset
        from :attr:`start_iter_time` according to the required speedup.

        The first time that the method is called after a reset, the wire
        time of the event and the current time are saved in
        :attr:`start_wire_time` and :attr:`start_iter_time`.

        Parameters
        ----------
        event_dict : dictionary
            event information; it must contain the ``'__data_time'`` and
            ``'__wire_time'`` keys. The dictionary is modified in place

        Returns
        -------
        event_dict : dictionary
            updated input
        '''
        wire_time = float(event_dict["__wire_time"])
        data_time = float(event_dict["__data_time"])

        if self.start_wire_time is None:
            self.start_wire_time = wire_time
            self.start_iter_time = time.time()

        # compute the new wire time offset, considering the speedup, and then
        # put it on top of the start_iter_time
        new_wire_time = (wire_time - self.start_wire_time) / self._speedup
        new_wire_time += self.start_iter_time
        # the distance between data and wire time is rescaled as well
        new_data_time = (data_time - wire_time) / self._speedup + new_wire_time

        # reset the data time and the wire time rescaled to the new wire time
        event_dict["__data_time"] = '{0:f}'.format(new_data_time)
        event_dict["__wire_time"] = '{0:f}'.format(new_wire_time)

        return event_dict

    def close(self):
        '''Close the connection to the database. After calling it, the
        instance cannot be used to iterate anymore.'''
        self._conn.close()
class TCSMockEvent(six.Iterator):
    '''Iterator class that always return a mock event
    ``'lrs2.hardware.status'``

    Parameters
    ----------
    sleep : float, optional
        sleep ``sleep`` seconds between events
    '''

    def __init__(self, sleep=1):
        super(TCSMockEvent, self).__init__()
        self._sleep = sleep

    def __iter__(self):
        '''Return the instance for use as iterator'''
        return self

    def __next__(self):
        '''Return a mock event after sleeping for the number of seconds
        given when creating the instance.

        The two ``temp`` entries of controller ``086`` are randomized and
        ``__data_time`` and ``__wire_time`` are set to the current time;
        all the other entries are fixed.

        It never raises a StopIteration exception.

        Returns
        -------
        list of strings
            topic and mock event as ``[topic, json(event)]``
        '''
        # sleep before building the event, so that the time stamps are
        # taken when the event is returned and are not ``sleep`` seconds
        # old
        time.sleep(self._sleep)

        event = {"__pid": "12498",
                 "lrs2.mux.000.cntl.086.cryo_pressure": "-999999.900000",
                 "__data": "false",
                 "lrs2.mux.000.cntl.086.ccd.15590.temp":
                 str(-115+random()*2),
                 "lrs2.mux.000.cntl.033.cryo_pressure_valid": "false",
                 "lrs2.mux.000.cntl.033.ccd.15612.heater_voltage":
                 "0.919246",
                 "__system": "lrs2",
                 "lrs2.mux.000.cntl.086.ccd.15537.temp":
                 str(-115+random()*3),
                 "lrs2.mux.000.cntl.086.ccd.15590.amp.UR.bias": "3300",
                 "lrs2.mux.000.cntl.086.ccd.15590.amp.LL.bias": "3316",
                 "lrs2.mux.000.cntl.033.ccd.15609.amp.UR.bias": "1925",
                 "lrs2.mux.000.cntl.086.ccd.15537.amp.LL.bias": "3376",
                 "lrs2.mux.000.cntl.033.ccd.15609.temp": "-115.001992",
                 "lrs2.mux.000.cntl.033.cryo_temp": "-168.084658",
                 "lrs2.mux.000.cntl.033.ccd.15612.temp": "-115.001992",
                 "lrs2.mux.000.cntl.033.ccd.15612.amp.LL.bias": "2173",
                 "lrs2.mux.000.cntl.033.ccd.15609.heater_voltage":
                 "0.743898",
                 "__key": "status",
                 "__data_time": str(time.time()),
                 "__wire_time": str(time.time()),
                 "lrs2.mux.000.cntl.086.cryo_temp": "-168.088844",
                 "lrs2.mux.000.cntl.086.ccd.15537.amp.UR.bias": "3402",
                 "lrs2.mux.000.cntl.033.cryo_pressure": "-999999.900000",
                 "lrs2.mux.000.cntl.086.cryo_pressure_valid": "false",
                 "lrs2.mux.000.cntl.086.ccd.15590.heater_voltage":
                 "0.547796",
                 "lrs2.mux.000.cntl.033.ccd.15612.amp.UR.bias": "1713",
                 "__source": "hardware",
                 "lrs2.mux.000.cntl.086.ccd.15537.heater_voltage":
                 "0.701238",
                 "lrs2.mux.000.cntl.033.ccd.15609.amp.LL.bias": "1684"}

        return ['lrs2.hardware.status', json.dumps(event)]