Source code for herostools.actor.prometheus
# coding:utf-8
"""
# Prometheus Actor
Use a prometheus push-gateway as target for prometheus scrape target.
docker pull prom/pushgateway
docker run -p 9091:9091 prom/pushgateway
A PrometheusActor aggregates data from labsurveillance
and pushes on Timeout to a prometheus push gateway.
Thereby observables are exposed as metrics
* {observable}_time {return_value.time}
* {observable}_value {return_value.value}
if unit is not 'None' a label {unit='{{return_value.unit}}'} is attached
* {observable}_raw_value {return_value.raw_value}
if unit is not 'None' a label {unit='{{return_value.raw_unit}}'} is attached
A predefined set of labels is attached to the metrics.
Observable names are cleaned to match metric names [a-z0-9_]
trailing _ and repetitions _ are removed.
"""
import re
import requests
from heros import DatasourceObserver
from herostools.helper import log
from six import iteritems, integer_types
no_metric = re.compile(r"[\Wäöüß]")
minify_metric = re.compile(r"__+")
allowed_types = integer_types + (
float,
bool,
)
[docs]
class PrometheusActor(DatasourceObserver):
def __init__(self, *args, metrics_path="http://localhost:9091/metrics", labels=dict(job="labsuv"), **kwargs):
"""
An actor to cache and push date to a prometheus push gateway.
:param metrics_path: (str) path to pushgateway metrics
:param labels: (dict<str, str>) a dictionary with additional labels.
"""
DatasourceObserver.__init__(self, object_selector="*", *args, **kwargs)
metrics_path = metrics_path.strip("/")
metrics_path = metrics_path if metrics_path.endswith("/metrics") else "%s/metrics" % metrics_path
metrics_path = metrics_path if metrics_path.startswith("http") else "http://%s" % metrics_path
self.metrics_path = metrics_path
self.labels = labels
self.path = "/".join(
"%s/%s" % (self._metric_name(label), self._metric_name(value)) for label, value in iteritems(labels)
)
self.cache = {}
self._session = None
self.register_callback(self.update)
[docs]
def _ensure_session(self):
if self._session is None:
self._session = requests.Session()
return self._session
[docs]
def _metric_name(self, key):
"""create a clean name for a metric without special characters matching [a-z0-9_]."""
metric = no_metric.sub("_", key.lower())
metric = minify_metric.sub("_", metric)
metric = metric.strip("_")
return metric
[docs]
def _unit_name(self, unit):
"""cleans units to not contain special characters"""
# according to https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
# the values of a label (in this case the value of the label "unit") can contain any
# Unicode character. However, stuff went havoc when confronted with a µ...
# it's useful to replace µ with u, e.g. µA -> uA
out = unit.replace("µ", "u")
# I don't know what else to clear and want to keep "/", so that's it for now...
return out
[docs]
def update(self, source_name, data):
"""convert a labsuv data dictionary to values"""
log.debug(f"updating values received from {source_name}")
for entry in data:
key = entry.id
metric = self._metric_name(key)
self.cache["%s_inbound" % metric] = entry.inbound
self.cache["%s_time" % metric] = entry.time
if entry.value is not None and isinstance(entry.value, allowed_types):
key = "%s_value" % metric
if entry.unit != "None":
key += '{unit="%s"}' % (self._unit_name(entry.unit),)
self.cache[key] = float(entry.value)
if entry.raw_value is not None and isinstance(entry.raw_value, allowed_types):
key = "%s_raw_value" % metric
if entry.raw_unit != "None":
key += '{unit="%s"}' % (self._unit_name(entry.raw_unit),)
self.cache[key] = float(entry.raw_value)
[docs]
def clear(self):
self.cache.clear()
[docs]
def push(self):
"""push current buffer to pushgateway."""
if not self.cache:
log.debug("PrometheusActor has nothing to push.")
return
data = "\n".join("%s %s" % (k, v) for k, v in iteritems(self.cache))
data += "\n"
self.clear()
url = "/".join((self.metrics_path, self.path))
request = requests.Request(method="POST", url=url, data=data, headers={"Content-Type": "text"})
prep = request.prepare()
session = self._ensure_session()
result = session.send(prep)
assert result.status_code < 300, "Cannot push data to %s due to %s \n %s " % (
url,
result.status_code,
result.content,
)
if __name__ == "__main__":
import asyncio
log.setLevel("debug")
loop = asyncio.new_event_loop()
p = PrometheusActor(metrics_path="127.0.0.1:9091")
async def push_loop():
while True:
p.push()
await asyncio.sleep(5)
loop.create_task(push_loop())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.stop()
p._session_manager.force_close()
# data = dict(test=ReturnValue(value=1, unit="K"))
# p(data)
# p.push()