# coding:utf-8
"""
# State Machine for HEROs

This keeps a cache of the last known state of each datasource and
allows querying this cache via an HTTP metrics endpoint. This is
especially useful as a scrape target for prometheus or influxdb.

The following metrics are exported:

* {observable}_inbound {return_value.inbound}
* {observable}_value {return_value.value}
  if unit is not 'None' a label {unit='{{return_value.unit}}'} is attached
* {observable}_raw_value {return_value.raw_value}
  if raw_unit is not 'None' a label {unit='{{return_value.raw_unit}}'} is attached

A predefined set of labels is attached to the metrics.
Observable names are cleaned to match the metric name pattern [a-z0-9_]:
leading and trailing underscores are stripped and repeated underscores are collapsed.
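
For example, an observable "temperature" of an object "lab_thermo" could be
exported as (names, values and timestamps are illustrative):

    lab_thermo_temperature_inbound{prefix="lab_thermo",key="temperature"} 1699999999.5 1700000000000
    lab_thermo_temperature_value{prefix="lab_thermo",key="temperature",unit="C"} 21.4 1700000000000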
"""
import copy
import re
import time
from typing import Optional

from aiohttp import web
from six import integer_types

from heros import DatasourceObserver
from heros.datasource.types import DatasourceReturnSet
from herostools.helper import log

# characters that are not allowed in a metric name: non-word characters plus German umlauts/ß
no_metric = re.compile(r"[\Wäöüß]")
# collapses runs of underscores left over after the substitution above
minify_metric = re.compile(r"__+")

# python types whose values can be exported as prometheus samples
allowed_types = integer_types + (
    float,
    bool,
)

class HERODatasourceStateMachine(DatasourceObserver):
    def __init__(
        self,
        loop,
        *args,
        http_port: int = 9090,
        bind_address: str = "localhost",
        metrics_endpoint: str = "/metrics",
        object_selector: str = "*",
        labels: Optional[dict] = None,
        **kwargs,
    ):
"""
An actor to aggregate data from all available datasources and keep a cache that contains the
last known state.
A view on the cache is provided in the prometheus data export format. This can be used as a
target for scraping in prometheus or influxdb.
Args:
loop: asyncio event loop to which the webserver gets attached.
http_port: Port for the HTTP server to run on
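
        Example (illustrative port and labels)::

            loop = asyncio.new_event_loop()
            state_machine = HERODatasourceStateMachine(
                loop, http_port=9090, labels={"site": "lab1"}
            )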
"""
DatasourceObserver.__init__(self, object_selector=object_selector, *args, **kwargs)
self.cache = {}
self._http_port = http_port
self._bind_address = bind_address
self._metrics_endpoint = metrics_endpoint
self._global_labels = labels
self.register_callback(self._update)
loop.create_task(self._start_webserver())

    async def _http_handle_metrics(self, request):
        """Serve the cached state of all datasources in the prometheus text format."""
        text = (
            "\n\n".join(
                [
                    self._convert_to_metrics(return_value_set, prefix=obj_name)
                    for obj_name, return_value_set in self.cache.items()
                ]
            )
            + "\n"
        )
        return web.Response(text=text)
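
    # The rendered state can be inspected manually, e.g. with curl (host, port and
    # path are the defaults from __init__ and may differ in your setup):
    #
    #   curl http://localhost:9090/metrics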

    async def _start_webserver(self):
        """Set up the aiohttp application and start serving the metrics endpoint."""
        app = web.Application()
        app.add_routes([web.get(self._metrics_endpoint, self._http_handle_metrics)])
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, self._bind_address, self._http_port)
        await site.start()
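
    # A matching prometheus scrape job could look like this (a sketch; job name and
    # scrape interval are arbitrary choices):
    #
    #   scrape_configs:
    #     - job_name: "heros_state_machine"
    #       scrape_interval: 15s
    #       static_configs:
    #         - targets: ["localhost:9090"]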

    def _metric_name(self, key):
        """Create a clean name for a metric without special characters, matching [a-z0-9_]."""
        metric = no_metric.sub("_", key.lower())
        metric = minify_metric.sub("_", metric)
        metric = metric.strip("_")
        return metric
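
    # Examples of the cleaning performed above (illustrative inputs):
    #   "Beam Current (mA)"     -> "beam_current_ma"
    #   "Kühlwasser Temperatur" -> "k_hlwasser_temperatur"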

    def _unit_name(self, unit):
        """Clean units so that they do not contain problematic special characters."""
        # According to https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
        # label values (here, the value of the label "unit") may contain any unicode
        # character. In practice, however, a µ caused problems downstream, so it is
        # replaced with u, e.g. µA -> uA.
        out = unit.replace("µ", "u")
        # Nothing else is cleaned for now; in particular "/" is deliberately kept.
        return out

    def _update(self, source_name, data):
        """
        Update the cached values for source_name.
        """
        log.debug(f"updating values received from {source_name}")
        if isinstance(data, DatasourceReturnSet):
            self.cache.update({source_name: data})
        else:
            log.warning(f"data received from {source_name} is not of type DatasourceReturnSet")

    def _convert_to_metrics(self, dsrs: DatasourceReturnSet, prefix=None):
        """Convert a DatasourceReturnSet into lines in the prometheus text exposition format."""

        def label_string(labels) -> str:
            if len(labels) > 0:
                return "{" + ",".join([f'{key}="{value}"' for key, value in labels.items()]) + "}"
            else:
                return ""

        metrics = []
        for entry in dsrs:
            key = entry.id
            metric = f"{prefix}_{key}" if prefix is not None else key
            metric = self._metric_name(metric)
            # we should not expose metrics whose timestamp is too far from the current
            # time, since prometheus may otherwise reject the whole scrape
            if abs(entry.time - time.time()) > 600:
                continue
            timestamp = int(entry.time * 1000)  # milliseconds since epoch
            labels = copy.copy(self._global_labels)
            labels.update({"prefix": prefix if prefix else ""})
            labels.update({"key": key})
            metrics.append([f"{metric}_inbound", copy.copy(labels), entry.inbound, timestamp])
            if entry.value is not None and isinstance(entry.value, allowed_types):
                log.debug(f"setting quantity {prefix}")
                if entry.unit != "None":
                    log.debug(f"setting unit for {prefix} to {entry.unit}")
                    labels.update({"unit": self._unit_name(entry.unit)})
                metrics.append([f"{metric}_value", copy.copy(labels), float(entry.value), timestamp])
            if entry.raw_value is not None and isinstance(entry.raw_value, allowed_types):
                if entry.raw_unit != "None":
                    labels.update({"unit": self._unit_name(entry.raw_unit)})
                metrics.append([f"{metric}_raw_value", copy.copy(labels), float(entry.raw_value), timestamp])
        log.debug(metrics)
        return "\n".join([f"{name}{label_string(labels)} {value} {ts}" for name, labels, value, ts in metrics])

    def get_cache(self):
        """Return the cache of the last known datasource states."""
        return self.cache

    def clear(self):
        """Empty the state cache."""
        self.cache.clear()
if __name__ == "__main__":
import asyncio
log.setLevel("debug")
loop = asyncio.new_event_loop()
p = HERODatasourceStateMachine(loop, http_port=9090)
async def push_loop():
while True:
log.info("waiting..")
await asyncio.sleep(5)
loop.create_task(push_loop())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.stop()
p._session_manager.force_close()