Source code for tcs_lib.server

# Library with high level interface to HET's TCS
# Copyright (C) 2017, 2018 "The HETDEX collaboration"
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
'''ZMQ based server to stream content and TCS-like event generators.
'''
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

import json
from random import random
import sqlite3
import time

import six
import zmq

from .string_helpers import string_to_bytes, time_str
from .errors import DBOrderingError


[docs]class ZMQServer(object): '''Create a ZeroMQ server that publishes content on the give ``ulr``. Sending events is performed by the :meth:`send_event` method using :meth:`zmq.Socket.send_multipart`. :meth:`send_event` and :meth:`start` allow to send generic events. :meth:`send_tcs_event` is designed to send events that reflect TCS expectations. If the ``tcs_event`` passed to the method does not contain the ``__wire_time`` key, set it to :func:`time.time`. Parameters ---------- url : string url and port to bind the socket to context : :class:`zmq.Context`, optional context to use when creating the sockets. If not given, uses the global instance returned by :meth:`zmq.Context.instance` ''' def __init__(self, url, context=None): if context: self.context = context else: self.context = zmq.Context.instance() self.socket = self.context.socket(zmq.PUB) self.socket.bind(url)
[docs] def send_event(self, event): '''Send the event via the socket. Parameters ---------- event : list of string event to send using :meth:`zmq.Socket.send_multipart`; the events are converted to byte string using :func:`~tcs_lib.string_helpers.string_to_bytes`. ''' event = string_to_bytes(event) self.socket.send_multipart(event)
[docs] def send_tcs_event(self, tcs_topic, tcs_event): '''Version of :meth:`send_event` specialized to send TCS-like events. Set ``__wire_time`` in the ``tcs_event`` to :func:`time.time`. If event is a string, decode it into a dictionary using :func:`json.loads`. Parameters ---------- tcs_topic : string topic of the event tcs_event : dict or string event to send ''' if isinstance(tcs_event, six.string_types): tcs_event = json.loads(tcs_event) wire_time = '__wire_time' tcs_event[wire_time] = time_str() tcs_event = json.dumps(tcs_event) self.send_event([tcs_topic, tcs_event])
[docs] def start(self, events): '''Start serving events via the socket Parameters ---------- events : generator yielding lists of strings or bytes each event retrieved in a loop and sent using :meth:`send_event`. If one event is ``None``, it is not sent. ''' for event in events: if event is not None: self.send_event(event)
[docs] def close(self): '''Close the socket''' self.socket.close()
[docs]class TCSDBReplay(six.Iterator): '''Open a TCS sqlite3 database, query it and return one entry at a time when looping or using the :func:`next` builtin function. Parameters ---------- db_name : string file containing the database sort_by : string, optional whether to sort or not the event_ids. Accepted values: ``'none'``, ``'__data_time'``, ``'__wire_time'`` speedup : float, optional speedup to use to replay the database. A value larger than 1 fast-forwards the replay, a smaller value slows the replay down topics : list, optional list of topics to return when iterating. If ``None`` or ``[]``, all topics are returned. convert_number, convert_bool : bool, optional try to convert database entries to number (int or float) and to boolean, according to the value of the ``data_type`` column. The conversion is fail-safe Attributes ---------- event_ids : iterator results of the query for the ``event_id``. The attribute is filled by the :meth:`_reset_iter` start_wire_time, start_iter_time : float wire time and current time when calling :meth:`__next__` the first time. Call :meth:`_reset_iter` to reset them before starting a new iteration ''' def __init__(self, db_name, sort_by='none', speedup=1., topics=None, convert_number=False, convert_bool=False): super(TCSDBReplay, self).__init__() # initialize the database self._conn = sqlite3.connect(db_name) self._conn.text_factory = str # self._cursor = self._conn.cursor() # save the other variables self._speedup = speedup self._topics = topics self._convert_number = convert_number self._convert_bool = convert_bool # query to get the IDs if sort_by == 'none': self._id_query = "SELECT event_id FROM event" elif sort_by == "__data_time": self._id_query = "SELECT event_id FROM event ORDER BY origts" elif sort_by == "__wire_time": self._id_query = ('SELECT event_id FROM attribute' ' WHERE keyname="__wire_time"' ' ORDER BY CAST(value as decimal)') else: raise DBOrderingError('Unknown sorting "{}"'.format(sort_by))
[docs] def _reset_iter(self): '''Reset the status and allow restarting the iterations. * Create and execute the query to retrieve the ``event_id`` and save it in :attr:`event_ids`. The query is done on the ``event`` table, if no ordering ``'none'`` or the ``'__data_time'`` ordering is required, or on the ``attribute`` table, if the ``'__wire_time'`` ordering is required. * Unset the :attr:`start_wire_time` and :attr:`start_iter_time` This method is called when initializing the class. It can be called to reinitialize the iterator. Raises ------ tcs_lib.DBOrderingError if the ordering is not known ''' self.event_ids = self._conn.cursor().execute(self._id_query) self.start_wire_time = self.start_iter_time = None
[docs] def __iter__(self): '''Return the instance for use as iterator''' self._reset_iter() return self
[docs] def __next__(self): '''Get the next element of :attr:`event_id`, build the TCS object and return it. Returns ------- list of strings or ``None`` if the topic is accepted returns ``[topic, json(event)]``, otherwise returns ``None`` ''' # get the event ID and create the event dictionary event_id = next(self.event_ids)[0] event_dict = self._event_dict(event_id) # sleep for the time necessary delta_time = float(event_dict['__wire_time']) - time.time() if delta_time > 0: time.sleep(delta_time) topic = '{}.{}.{}'.format(event_dict['__system'], event_dict['__source'], event_dict['__key']) if self._topics and topic not in self._topics: return None else: return [topic, json.dumps(event_dict)]
[docs] def _event_dict(self, event_id): '''Create dictionary representing the event with id ``event_id`` Parameters ---------- event_id : string id of the event Returns ------- result : dictionary event information ''' result = {} result["__event_id"] = event_id event_query = "SELECT * FROM attribute WHERE event_id=?" query = self._conn.cursor().execute(event_query, [str(event_id), ]) for _, keyname, value, type_ in query: result[keyname] = self._convert_to_type(value, type_) return self._update_times(result)
[docs] def _convert_to_type(self, value, type_): '''Try to convert value to the given type, if the conversion is required Parameters ---------- value : string value to convert type_ : string type of value. Known types: "number", "boolean", "string". Any unknown type is treated as a string Returns ------- value : int, float, bool or string converted value ''' if type_ == 'number' and self._convert_number: return self._convert_to_number(value) elif type_ == 'boolean' and self._convert_bool: return self._convert_to_bool(value) else: return value
[docs] def _convert_to_number(self, value): '''Try to convert the input ``value`` from string to ``int`` or ``float``, in that order. If it fails, returns ``value`` unchanged''' try: return int(value) except ValueError: try: return float(value) except ValueError: return value
[docs] def _convert_to_bool(self, value): '''Try to convert the input ``value`` from ``"true"``/``"false"`` to ``True``/``False``. If it fails, returns ``value`` unchanged''' if value == 'true': return True elif value == 'false': return False else: return value
[docs] def _update_times(self, event_dict): '''Update the ``'__data_time'`` and ``'__wire_time'`` as an offset from :attr:`start_iter_time` according to the required speedup. Parameters ---------- event_dict : dictionary event information Returns ------- event_dict : dictionary updated input ''' wire_time = float(event_dict["__wire_time"]) data_time = float(event_dict["__data_time"]) if self.start_wire_time is None: self.start_wire_time = wire_time self.start_iter_time = time.time() # compute the new wire time offset, considering the speedup, and then # put it on top of the start_iter_time new_wire_time = (wire_time - self.start_wire_time) / self._speedup new_wire_time += self.start_iter_time new_data_time = (data_time - wire_time) / self._speedup + new_wire_time # reset the data time and the wire time rescaled to the new wire time event_dict["__data_time"] = '{0:f}'.format(new_data_time) event_dict["__wire_time"] = '{0:f}'.format(new_wire_time) return event_dict
[docs]class TCSMockEvent(six.Iterator): '''Iterator class that always return a mock event ``'lrs2.hardware.status'`` Parameters ---------- sleep : float, optional sleep ``sleep`` seconds between events ''' def __init__(self, sleep=1): super(TCSMockEvent, self).__init__() self._sleep = sleep
[docs] def __iter__(self): '''Return the instance for use as iterator''' return self
[docs] def __next__(self): '''Return a mock event after sleeping for one second. It never raises a StopIteration exception. Returns ------- list of strings topic and mock event as ``[topic, json(event)]`` ''' event = {"__pid": "12498", "lrs2.mux.000.cntl.086.cryo_pressure": "-999999.900000", "__data": "false", "lrs2.mux.000.cntl.086.ccd.15590.temp": str(-115+random()*2), "lrs2.mux.000.cntl.033.cryo_pressure_valid": "false", "lrs2.mux.000.cntl.033.ccd.15612.heater_voltage": "0.919246", "__system": "lrs2", "lrs2.mux.000.cntl.086.ccd.15537.temp": str(-115+random()*3), "lrs2.mux.000.cntl.086.ccd.15590.amp.UR.bias": "3300", "lrs2.mux.000.cntl.086.ccd.15590.amp.LL.bias": "3316", "lrs2.mux.000.cntl.033.ccd.15609.amp.UR.bias": "1925", "lrs2.mux.000.cntl.086.ccd.15537.amp.LL.bias": "3376", "lrs2.mux.000.cntl.033.ccd.15609.temp": "-115.001992", "lrs2.mux.000.cntl.033.cryo_temp": "-168.084658", "lrs2.mux.000.cntl.033.ccd.15612.temp": "-115.001992", "lrs2.mux.000.cntl.033.ccd.15612.amp.LL.bias": "2173", "lrs2.mux.000.cntl.033.ccd.15609.heater_voltage": "0.743898", "__key": "status", "__data_time": str(time.time()), "__wire_time": str(time.time()), "lrs2.mux.000.cntl.086.cryo_temp": "-168.088844", "lrs2.mux.000.cntl.086.ccd.15537.amp.UR.bias": "3402", "lrs2.mux.000.cntl.033.cryo_pressure": "-999999.900000", "lrs2.mux.000.cntl.086.cryo_pressure_valid": "false", "lrs2.mux.000.cntl.086.ccd.15590.heater_voltage": "0.547796", "lrs2.mux.000.cntl.033.ccd.15612.amp.UR.bias": "1713", "__source": "hardware", "lrs2.mux.000.cntl.086.ccd.15537.heater_voltage": "0.701238", "lrs2.mux.000.cntl.033.ccd.15609.amp.LL.bias": "1684"} time.sleep(self._sleep) return ['lrs2.hardware.status', json.dumps(event)]