D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
lvestats
/
plugins
/
generic
/
burster
/
Filename :
__init__.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import atexit import contextlib import time import typing from datetime import timedelta from contextlib import ExitStack from typing import TypedDict, Callable, TYPE_CHECKING, Generator import sqlalchemy as sa if TYPE_CHECKING: from lvestat import LVEStat from lvestats.plugins.generic.analyzers import LVEUsage from lvestats.orm import BurstingEventType from ._logs import logger from .utils import bootstrap_gen from .config import ( StartupParams, PluginConfig, Config, ConfigUpdate, is_bursting_supported, MissingKeysInRawConfig, ) from .common import ( BurstingMultipliers, LveState, SerializedLveId, GetNormalLimits, ApplyLveSettings, AdjustStepData, read_normal_limits_from_proc, Timestamp, PyLveSettingsApplier, ) from .overload import OverloadChecker, GetStats, read_times_from_proc from .storage import ( init_db_schema, load_bursting_enabled_intervals_from_db, events_saver_running, cleanup_running, InBurstingEventRow, ) from .adjust import Adjuster, StepCalculator from .lve_sm import LveStateManager from .lves_tracker import LvesTracker, LveStateManagerFactory class _ExecutePayload(TypedDict): lve_active_ids: list[SerializedLveId] stats: dict[SerializedLveId, 'LVEStat'] lve_usage_5s: dict[SerializedLveId, 'LVEUsage'] class LveLimitsBurster: """ Limits Burster plugin """ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) if is_bursting_supported(): driver_gen = self._create_driver_gen() step = driver_gen.send # NOTE(vlebedev): It seems that plugins interface of lvestats does not contain any sane way # to get norified about server being stopped. Let's resort to hacks =/ @atexit.register def cleanup(): with contextlib.suppress(StopIteration, GeneratorExit): driver_gen.close() else: logger.info('Bursting Limits feature is not supported in current environment') def step(_, /): pass self._step: Callable[[sa.engine.Engine | ConfigUpdate | _ExecutePayload], None] = step @bootstrap_gen def _create_driver_gen(self): # NOTE(vlebedev): This import requires some shared library to be present in order to succeed, # so deffer it until it's really needed to make unittests writing/running easier. from lveapi import PyLve # pylint: disable=import-outside-toplevel # NOTE(vlebedev): This is supposed to be a composition root. # NOTE(vlebedev): Wait until all data required for proper startup is received. engine, initial_config = yield from StartupParams.wait() pylve = PyLve() if not pylve.initialize(): raise RuntimeError('Failed to initialize PyLve!') with adjuster_machinery_running( initial_config=initial_config, engine=engine, get_normal_limits=read_normal_limits_from_proc, apply_lve_settings=PyLveSettingsApplier( pylve=pylve, ), read_stats=read_times_from_proc, ) as adjuster: logger.info('LveLimitsBurster initialized') while True: msg = yield if not isinstance(msg, dict): logger.warning('Unexpected message type: %s', type(msg)) continue now = Timestamp(int(time.time())) lve_active_ids = msg.get('lve_active_ids', []) stats = msg.get('stats', {}) try: lve_usage_by_id = msg["lve_usages_5s"][-1] except (KeyError, IndexError): lve_usage_by_id = {} adjuster.step(AdjustStepData( now=now, lve_active_ids=lve_active_ids, stats=stats, lve_usages_by_id=lve_usage_by_id, )) def set_config(self, config: PluginConfig) -> None: # NOTE(vlebedev): Currently config dict contains all the keys from _all_ .cfg files parsed by # lvestats. So there is no point as report fields not present in `Confg` typing # as "unknown" or something like that - they might well belong to some other plugin =/ try: config_update = ConfigUpdate.from_plugin_config(config) except MissingKeysInRawConfig as e: logger.info('Missing config keys: %s', e.missing_raw_keys) else: self._step(config_update) def set_db_engine(self, engine: sa.engine.Engine) -> None: # NOTE(vlebedev): 'Engine' is thread safe, so there is no problem in requesting connections # from it on different threads. For more info have a look at this: # https://groups.google.com/g/sqlalchemy/c/t8i3RSKZGb0/m/QxWshAS3iKgJ self._step(engine) def execute(self, lve_data: _ExecutePayload) -> None: self._step(lve_data) @contextlib.contextmanager def adjuster_machinery_running( initial_config: Config, engine: sa.engine.Engine, apply_lve_settings: ApplyLveSettings, get_normal_limits: GetNormalLimits, read_stats: GetStats, ) -> Generator[Adjuster, None, None]: now = Timestamp(int(time.time())) cutoff = Timestamp(int(now - initial_config.bursting_quota_window.total_seconds())) init_db_schema(engine) lve_to_history = load_bursting_enabled_intervals_from_db( engine=engine, cutoff=typing.cast(Timestamp, cutoff), server_id=initial_config.server_id, ) logger.debug('Loaded intervals: %s', len(lve_to_history)) with ExitStack() as deffer: deffer.enter_context(cleanup_running( engine=engine, server_id=initial_config.server_id, cleanup_interval=timedelta(days=1), history_window=timedelta(days=30), fail_fast=initial_config.fail_fast, )) write_event = deffer.enter_context(events_saver_running( engine=engine, server_id=initial_config.server_id, dump_interval=initial_config.db_dump_period, )) adjuster = Adjuster( lves_tracker=(lves_tracker := LvesTracker( create_lve_manager=LveStateManagerFactory( _lve_to_history=lve_to_history, _apply_lve_settings=apply_lve_settings, _quota=initial_config.bursting_quota, _quota_window=initial_config.bursting_quota_window, _bursting_multipliers=BurstingMultipliers( initial_config.bursting_cpu_multiplier, initial_config.bursting_io_multiplier, ), _fail_fast=initial_config.fail_fast, ), fail_fast=initial_config.fail_fast, )), get_normal_limits=get_normal_limits, step_calculator=StepCalculator( overload_threshold=1.0 - initial_config.idle_time_threshold, ), is_server_overloaded=OverloadChecker( idle_time_threshold=initial_config.idle_time_threshold, get_stats=read_stats, max_samples_number=initial_config.idle_time_samples, ), fail_fast=initial_config.fail_fast, ) @lves_tracker.on_manager_added.register def on_new_lve_manager_created(manager: LveStateManager) -> None: lve_id = manager.lve_id @manager.on_state_changed.register def on_lve_state_chagned(old_state: LveState, new_state: LveState) -> None: assert old_state != new_state now = Timestamp(int(time.time())) if new_state == LveState.OVERUSING: write_event(InBurstingEventRow( lve_id=lve_id, timestamp=now, event_type=BurstingEventType.STARTED, )) elif old_state == LveState.OVERUSING: write_event(InBurstingEventRow( lve_id=lve_id, timestamp=now, event_type=BurstingEventType.STOPPED, )) yield adjuster