D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
lvestats
/
plugins
/
generic
/
Filename :
cm_collector.py
back
Copy
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import logging
import pwd
import json
import time

from clcommon import clpwd
from clcommon.lib.jwt_token import jwt_token_check
from clcommon.lib.cmt_utils import is_cmt_disabled, is_client_enabled
from clcommon.clproc import ProcLve, LIMIT_LVP_ID
from clcommon.lib import MySQLGovernor, MySQLGovException, MySQLGovernorAbsent
from lvestat import LVEStat
from lvestats.core.plugin import LveStatsPlugin
from lvestats.lib.commons.sizeutil import mempages_to_bytes
from lvestats.lib.commons.func import get_domains
from lvestats.plugins.generic.analyzers import LVEUsage
from secureio import write_file_via_tempfile

GOVERNOR_ENABLED_MSG = (
    "There was detected MySQL Governor and information about limits will be collected starting from now"
)
GOVERNOR_BASIC_ERROR_MSG = 'Error while getting Mysql Governor limits: %s. '
GOVERNOR_ABSENT_ERROR_MSG = (
    GOVERNOR_BASIC_ERROR_MSG + "Since the governor doesn't seem to be well "
    "configured, the limits statistics won't be "
    "collected. This warning won't be repeated to "
    "prevent annoying spam in logs."
)
CM_DISABLED_MSG = (
    "Centralized Monitoring isn't enabled. "
    "CMCollector plugin won't collect data. It's expected "
    "behaviour for non CL+ clients"
)
CM_ENABLED_MSG = (
    "Centralized Monitoring has been detected! From now, "
    "CMCollector will gather limits statistics for it gently "
    "and affectionate."
)


class CMCollector(LveStatsPlugin):
    """
    Collector plugin for Cloudlinux+ Centralized Monitoring.

    On every run it builds a per-LVE list of usage/limit metrics (LVE and
    MySQL Governor) and writes it as JSON to ``self._filename_to_write``.
    """

    class DomainsCache:
        """
        Sub-class for keeping username:domain pairs.

        The whole cache is dropped once it is older than
        ``cache_expiration_interval`` seconds and is then refilled lazily,
        one user at a time, on lookup.
        """

        def __init__(self, logger):
            self.log = logger
            self.cache = {}
            self.cache_creation_time = 0
            self.cache_expiration_interval = 60 * 60 * 24  # 1 day

        def get(self, username):
            """
            Return the cached domain of ``username``, resolving it on a miss.

            Returns a falsy value (``None``) when no domain could be
            resolved; such negative results are not cached, so the lookup
            is retried on the next call.
            """
            if self._is_expired():
                self.log.info('Domains cache is expired, going to regenerate user-domains cache')
                self.cache = {}
                # Restart the expiration window right away. Otherwise, while
                # no domain can be resolved (and set() is therefore never
                # called), the cache would look expired on every lookup and
                # the message above would be logged each time.
                self.cache_creation_time = int(time.time())

            if username in self.cache:
                return self.cache[username]

            domain = self._get_real_value(username)
            if domain:
                self.set(username, domain)
            return domain

        def set(self, username, domain):
            """
            Store ``domain`` for ``username``.

            The first entry put into an empty cache (re)starts the
            expiration window.
            """
            if not self.cache:
                self.cache_creation_time = int(time.time())
                self.log.info('Cache creation timestamp: %s', self.cache_creation_time)
            self.cache[username] = domain

        def _is_expired(self):
            """Tell whether the cache is older than the expiration interval."""
            age = int(time.time()) - self.cache_creation_time
            return age > self.cache_expiration_interval

        def _get_real_value(self, username):
            """
            Return the first item of ``get_domains(username)`` or ``None``
            when the lookup fails for any reason.
            """
            try:
                return get_domains(username, raise_exception=False)[0]
            except Exception:
                # skip unavailable user or domain
                # We ignore all exceptions to reduce failure risks on custom panels
                return None

    def __init__(self):
        self.log = logging.getLogger('CL+CM_Collector')
        self.now = 0  # This changes in MainLoop
        self.period = 60  # Run this plugin every minute
        self._filename_to_write = '/var/lve/cm_lve.json'

        # "Already reported" flags: each one makes sure the corresponding
        # problem is logged once instead of on every run.
        self._is_write_error = False
        self._is_jwt_token_error = False
        self._is_governor_absent_error = False
        self._is_cm_enabled_error = False

        self.log.info("CM Collector plugin init")
        self.proc_lve = ProcLve()
        self.governor = MySQLGovernor()
        # NOTE(review): min_uid is not read by any method of this class.
        self.min_uid = clpwd.ClPwd().get_sys_min_uid()
        self.domains_cache = self.DomainsCache(self.log)

    def _prepare_dbgov_limits(self, username):
        """
        Gets Mysql Governor limits and returns ready-to-send dict
        with limits and zero-usage.

        Returns ``None`` when the limits can not be obtained
        (governor is absent or reported an error).
        """
        try:
            cpu, io = self.governor.get_limits_by_user(username)
        # Catch specific governor absence error to prevent logs spam
        except MySQLGovernorAbsent as e:
            if not self._is_governor_absent_error:
                self.log.warning(GOVERNOR_ABSENT_ERROR_MSG, str(e))
                self._is_governor_absent_error = True
            return None
        except MySQLGovException as e:
            self.log.warning(GOVERNOR_BASIC_ERROR_MSG, str(e))
            return None
        else:
            # Governor answered again after having been reported absent.
            if self._is_governor_absent_error:
                self.log.info(GOVERNOR_ENABLED_MSG)
                self._is_governor_absent_error = False

        return {
            'mysql_cpu': {'l': cpu},
            'mysql_io': {
                'l': io * 1024  # convert values (in KB) from governor to bytes
            },
        }

    def _parse_limits(self, procs, lvp_id=0):
        """
        Parse lve limits from /proc/lve via ProcLve instance.

        Returns a dict ``{lve_id: limits-only metrics dict}``; entries with
        id ``0`` or ``LIMIT_LVP_ID`` are skipped.
        """
        limits = {}
        for line in self.proc_lve.lines(lvp_id, without_limits=False):
            # Queried per line, as before; whether version() may be hoisted
            # out of the loop depends on ProcLve internals not visible here.
            version = self.proc_lve.version()
            stat = LVEStat(line, version)
            if stat.id in (0, LIMIT_LVP_ID):
                continue
            usage = LVEUsage(version).init_limits_from_lvestat(stat, procs)
            limits[stat.id] = self._lveusage_to_dict(usage, only_limits=True)
        return limits

    def _get_lve_users(self, procs):
        """
        Returns dict with lve_id and it's limits from /proc/lve.

        When resellers are supported, every lvp_id is parsed first; the
        default (``lvp_id=0``) listing is applied last and therefore wins
        on duplicate ids.
        """
        users = {}
        if self.proc_lve.resellers_supported():
            for lvp_id in self.proc_lve.lvp_id_list():
                users.update(self._parse_limits(procs, lvp_id=lvp_id))
        users.update(self._parse_limits(procs))
        return users

    @staticmethod
    def _lveusage_to_dict(usage: LVEUsage, only_limits: bool = False):
        """
        Return ready-to-send dict with metrics by passed lve usage.

        Keys of every metric: ``'a'`` - taken from the usage attribute,
        ``'l'`` - from the limit attribute, ``'f'`` - from the ``*_fault``
        attribute. With ``only_limits=True`` only ``'l'`` is filled in.
        """
        if only_limits:
            return {
                'cpu': {'l': usage.lcpu / 100},
                'ep': {'l': usage.lep},
                'pmem': {'l': mempages_to_bytes(usage.lmemphy)},
                'nproc': {'l': usage.lnproc},
                'io': {'l': usage.io},
                'iops': {'l': usage.liops},
            }

        return {
            'cpu': {
                'a': int(round(usage.cpu_usage)) / 100,
                'l': usage.lcpu / 100,
                'f': usage.cpu_fault,
            },
            'ep': {'a': usage.mep, 'l': usage.lep, 'f': usage.mep_fault},
            'pmem': {
                'a': mempages_to_bytes(int(round(usage.memphy))),
                'l': mempages_to_bytes(usage.lmemphy),
                'f': usage.memphy_fault,
            },
            'nproc': {
                'a': int(round(usage.nproc)),
                'l': usage.lnproc,
                'f': usage.nproc_fault,
            },
            'io': {
                'a': int(round(usage.io_usage)),
                'l': usage.io,
                'f': usage.io_fault,
            },
            'iops': {
                'a': int(round(usage.iops)),
                'l': usage.liops,
                'f': usage.iops_fault,
            },
        }

    def _prepare_metrics_to_send(self, lve_data):
        """
        Returns list with metrics (lve&mysql) by each lve_id.

        In case there is some usage for lve_id -> it will be used,
        otherwise -> zero-usage values with actual lve/mysql limits are used.

        Note: the ``dbgov_data_for_cm`` key is removed from ``lve_data``.
        Users that can not be resolved by uid or that have no domain
        are skipped.
        """
        lve_data_list = []
        dbgov_data_for_cm = lve_data.pop("dbgov_data_for_cm", {})
        users = self._get_lve_users(lve_data.get('procs', 1))
        lve_usage = lve_data.get('lve_usage', {})

        for lve_id, limits_data in users.items():
            try:
                username = pwd.getpwuid(lve_id).pw_name
            except Exception:
                # skip unavailable user
                continue

            domain = self.domains_cache.get(username)
            if not domain:
                continue

            lve_data_dict = {"lve_id": lve_id, "username": username, "domain": domain}
            if lve_id in lve_usage:
                lve_data_dict.update(self._lveusage_to_dict(lve_usage[lve_id]))
            else:
                lve_data_dict.update(limits_data)

            # process MySQL governor data
            # dbgov_data_for_cm example
            # {1001: {'cpu_limit': 100, 'io_limit': 562036736, 'cpu_usage': 99.498342, 'io_usage': 2055473}}
            if lve_id in dbgov_data_for_cm:
                dbgov = dbgov_data_for_cm[lve_id]
                lve_data_dict["mysql_cpu"] = {
                    "a": dbgov['cpu_usage'],
                    "l": dbgov['cpu_limit'],
                }
                lve_data_dict["mysql_io"] = {
                    "a": dbgov['io_usage'],
                    "l": dbgov['io_limit'],
                }
            else:
                mysql_limits = self._prepare_dbgov_limits(username)
                # append mysql limits only if governor present
                if mysql_limits is not None:
                    lve_data_dict.update(mysql_limits)

            lve_data_list.append(lve_data_dict)
        return lve_data_list

    def execute(self, lve_data):
        """
        Main plugin entrypoint.

        Does nothing (apart from a one-time log message) when the JWT token
        is invalid or Centralized Monitoring is not enabled; otherwise dumps
        the collected metrics as JSON into ``self._filename_to_write``.
        """
        # Check and read JWT token
        token_is_valid, token_error_msg, _ = jwt_token_check()
        if not token_is_valid:
            # Token absent or invalid
            if not self._is_jwt_token_error:
                self.log.info(
                    "JWT token error: '%s'. CM Collector plugin will not work, "
                    "it is expected behaviour for non CL+ clients",
                    token_error_msg,
                )
                self._is_jwt_token_error = True
            return
        # Token is fine: re-arm the token message immediately, so that a
        # later token failure is reported even if CM turns out to be
        # disabled below.
        self._is_jwt_token_error = False

        # CMCollector should be executed only if CM for client is enabled
        if is_cmt_disabled() or not is_client_enabled():
            # CM isn't enabled
            if not self._is_cm_enabled_error:
                self.log.info(CM_DISABLED_MSG)
                self._is_cm_enabled_error = True
            return

        if self._is_cm_enabled_error:
            # Welcome message after plugin start
            self.log.info(CM_ENABLED_MSG)
            self._is_cm_enabled_error = False

        json_result = json.dumps({'lve_stats': self._prepare_metrics_to_send(lve_data)})
        try:
            write_file_via_tempfile(json_result, self._filename_to_write, 0o600)
            self._is_write_error = False
        except OSError as e:  # IOError is an alias of OSError in Python 3
            # Report a write failure once until a write succeeds again.
            if not self._is_write_error:
                self.log.error(str(e))
                self._is_write_error = True