D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
hc_python
/
lib64
/
python3.8
/
site-packages
/
prometheus_client
/
Filename :
registry.py
back
Copy
from abc import ABC, abstractmethod import copy from threading import Lock from typing import Dict, Iterable, List, Optional from .metrics_core import Metric # Ideally this would be a Protocol, but Protocols are only available in Python >= 3.8. class Collector(ABC): @abstractmethod def collect(self) -> Iterable[Metric]: pass class _EmptyCollector(Collector): def collect(self) -> Iterable[Metric]: return [] class CollectorRegistry(Collector): """Metric collector registry. Collectors must have a no-argument method 'collect' that returns a list of Metric objects. The returned metrics should be consistent with the Prometheus exposition formats. """ def __init__(self, auto_describe: bool = False, target_info: Optional[Dict[str, str]] = None): self._collector_to_names: Dict[Collector, List[str]] = {} self._names_to_collectors: Dict[str, Collector] = {} self._auto_describe = auto_describe self._lock = Lock() self._target_info: Optional[Dict[str, str]] = {} self.set_target_info(target_info) def register(self, collector: Collector) -> None: """Add a collector to the registry.""" with self._lock: names = self._get_names(collector) duplicates = set(self._names_to_collectors).intersection(names) if duplicates: raise ValueError( 'Duplicated timeseries in CollectorRegistry: {}'.format( duplicates)) for name in names: self._names_to_collectors[name] = collector self._collector_to_names[collector] = names def unregister(self, collector: Collector) -> None: """Remove a collector from the registry.""" with self._lock: for name in self._collector_to_names[collector]: del self._names_to_collectors[name] del self._collector_to_names[collector] def _get_names(self, collector): """Get names of timeseries the collector produces and clashes with.""" desc_func = None # If there's a describe function, use it. try: desc_func = collector.describe except AttributeError: pass # Otherwise, if auto describe is enabled use the collect function. if not desc_func and self._auto_describe: desc_func = collector.collect if not desc_func: return [] result = [] type_suffixes = { 'counter': ['_total', '_created'], 'summary': ['_sum', '_count', '_created'], 'histogram': ['_bucket', '_sum', '_count', '_created'], 'gaugehistogram': ['_bucket', '_gsum', '_gcount'], 'info': ['_info'], } for metric in desc_func(): result.append(metric.name) for suffix in type_suffixes.get(metric.type, []): result.append(metric.name + suffix) return result def collect(self) -> Iterable[Metric]: """Yields metrics from the collectors in the registry.""" collectors = None ti = None with self._lock: collectors = copy.copy(self._collector_to_names) if self._target_info: ti = self._target_info_metric() if ti: yield ti for collector in collectors: yield from collector.collect() def restricted_registry(self, names: Iterable[str]) -> "RestrictedRegistry": """Returns object that only collects some metrics. Returns an object which upon collect() will return only samples with the given names. Intended usage is: generate_latest(REGISTRY.restricted_registry(['a_timeseries'])) Experimental.""" names = set(names) return RestrictedRegistry(names, self) def set_target_info(self, labels: Optional[Dict[str, str]]) -> None: with self._lock: if labels: if not self._target_info and 'target_info' in self._names_to_collectors: raise ValueError('CollectorRegistry already contains a target_info metric') self._names_to_collectors['target_info'] = _EmptyCollector() elif self._target_info: self._names_to_collectors.pop('target_info', None) self._target_info = labels def get_target_info(self) -> Optional[Dict[str, str]]: with self._lock: return self._target_info def _target_info_metric(self): m = Metric('target', 'Target metadata', 'info') m.add_sample('target_info', self._target_info, 1) return m def get_sample_value(self, name: str, labels: Optional[Dict[str, str]] = None) -> Optional[float]: """Returns the sample value, or None if not found. This is inefficient, and intended only for use in unittests. """ if labels is None: labels = {} for metric in self.collect(): for s in metric.samples: if s.name == name and s.labels == labels: return s.value return None class RestrictedRegistry: def __init__(self, names: Iterable[str], registry: CollectorRegistry): self._name_set = set(names) self._registry = registry def collect(self) -> Iterable[Metric]: collectors = set() target_info_metric = None with self._registry._lock: if 'target_info' in self._name_set and self._registry._target_info: target_info_metric = self._registry._target_info_metric() for name in self._name_set: if name != 'target_info' and name in self._registry._names_to_collectors: collectors.add(self._registry._names_to_collectors[name]) if target_info_metric: yield target_info_metric for collector in collectors: for metric in collector.collect(): m = metric._restricted_metric(self._name_set) if m: yield m REGISTRY = CollectorRegistry(auto_describe=True)