D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
lvestats
/
plugins
/
other
/
Filename :
v1_db_migrator.py
back
Copy
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import logging import os import time from datetime import datetime, timedelta import sqlalchemy from sqlalchemy import Column, Float, Integer, String, func, insert from sqlalchemy.exc import DatabaseError, SQLAlchemyError from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from lvestats.core.plugin import LveStatsPlugin, LveStatsPluginTerminated from lvestats.lib import uidconverter from lvestats.lib.commons.dateutil import gm_datetime_to_unixtimestamp from lvestats.lib.commons.func import get_chunks from lvestats.lib.dbengine import fix_lost_keep_alive, validate_database from lvestats.orm.history import history, history_x60 from lvestats.orm.history_gov import history_gov STATE_FILE = '/var/lve/v1_migration_last.ts' V2_KEYS = [ 'id', 'mem', 'mem_limit', 'mem_fault', 'memphy', 'lmemphy', 'memphy_fault', 'mep', 'mep_limit', 'mep_fault', 'nproc', 'lnproc', 'nproc_fault', 'iops', 'liops', ] V2_GOV_KEYS = [ 'username', 'sum_cpu', 'sum_write', 'sum_read', 'limit_cpu_on_period_end', 'limit_read_on_period_end', 'limit_write_on_period_end', 'cause_of_restrict', ] V1Base = declarative_base() class V1HistoryGov(V1Base): """ Mapping out v1 gov history table """ __tablename__ = 'history_gov' ts = Column('ts', Integer, primary_key=True) username = Column('username', String(64), primary_key=True) sum_cpu = Column('sum_cpu', Float) sum_write = Column('sum_write', Float) sum_read = Column('sum_read', Float) limit_cpu_on_period_end = Column('limit_cpu_on_period_end', Integer) limit_read_on_period_end = Column('limit_read_on_period_end', Integer) limit_write_on_period_end = Column('limit_write_on_period_end', Integer) cause_of_restrict = Column('cause_of_restrict', Integer) server_id = Column('server_id', String(10), primary_key=True) weight = Column('weight', Integer) class V1History(V1Base): """ Mapping out v1 history table """ __tablename__ = 'history' id = Column('id', Integer, primary_key=True) cpu = Column('cpu', Integer) cpu_limit = Column('cpu_limit', Integer) cpu_max = Column('cpu_max', Integer) ncpu = Column('ncpu', Integer) mep = Column('mep', Integer) mep_limit = Column('mep_limit', Integer) mep_max = Column('mep_max', Integer) io = Column('io', Integer) io_max = Column('io_max', Integer) io_limit = Column('io_limit', Integer) mem = Column('mem', Integer) mem_limit = Column('mem_limit', Integer) mem_max = Column('mem_max', Integer) mem_fault = Column('mem_fault', Integer) mep_fault = Column('mep_fault', Integer) created = Column('created', sqlalchemy.types.DateTime, primary_key=True) weight = Column('weight', Integer) server_id = Column('server_id', String(10)) lmemphy = Column('lmemphy', Integer) memphy = Column('memphy', Integer) memphy_max = Column('memphy_max', Integer) memphy_fault = Column('memphy_fault', Integer) lnproc = Column('lnproc', Integer) nproc = Column('nproc', Integer) nproc_max = Column('nproc_max', Integer) nproc_fault = Column('nproc_fault', Integer) iops = Column('iops', Integer) iops_max = Column('iops_max', Integer) liops = Column('liops', Integer) class V1TimeInterval(object): """ The way it would work - on first run, the /var/lve/v1_migration_last.ts will be non-existant, and we will use latest timestamp from V1 db as the 'starting point' After that on each call of get_data we will use that 'starting point' to get_period from start point to 1 hour before. As soon as our start point is > 30 days old -- we will return as part of get_period third parameter true which means that ok, the rest of data is too old, lets move on. V1DBMigrator will convert data for that period, and then will call save_state(from) -- this will be new starting point for the next plugin run. We will store it in a property (last_ts), and save it to the file. So, that even if software restarted, we don't just ignore it. """ def __init__(self, v1session, ts_file=STATE_FILE, server_id='localhost'): self.ts_file = ts_file self.server_id = server_id self.last_ts = None self.last_uid = -1 self.v1session = v1session self.read_state() def save_ts_to_file(self, ts, uid=None): with open(self.ts_file, 'w', encoding='utf-8') as f: f.write(ts.strftime(self.get_ts_format())) self.last_ts = ts if uid is not None: f.write('\n' + str(uid)) self.last_uid = uid or -1 f.close() @staticmethod def get_ts_format(): return "%Y-%m-%d %H:%M:%S.%f" def save_timestamp(self, ts): self._save_state(ts) def save_uid(self, uid=None): self._save_state(self.last_ts, uid) def _save_state(self, ts, uid=None): try: self.save_ts_to_file(ts, uid) except IOError as e: logging.getLogger('plugin.V1DBMigrator.TimeInterval').error("Unable to save v1 migration TS %s", str(e)) def _read_state(self): ts = None try: with open(self.ts_file, 'r', encoding='utf-8') as f: ts = datetime.strptime(f.readline().rstrip(), self.get_ts_format()) uid = int(f.readline().rstrip() or -1) return ts, uid except IOError: return ts, -1 except ValueError as e: logging.getLogger('plugin.V1DBMigrator.TimeInterval').warning( "Unable to read %s (%s)", self.ts_file, e, ) return ts, -1 def read_state(self): self.last_ts, self.last_uid = self._read_state() if self.last_ts is None: res = ( self.v1session.query(func.max(V1History.created)) .filter(V1History.server_id == self.server_id).first() ) # set very old datetime if no rows in database last_ts_from_db = res[0] or datetime(1, 1, 1) self.last_ts = last_ts_from_db + timedelta(microseconds=1) def _to_ts(self): self.read_state() return self.last_ts - timedelta(microseconds=1) def is_too_old(self): return datetime.now() - timedelta(days=30) > self._to_ts() def get_uid(self): self.read_state() return self.last_uid def convert_username_to_uid(self, username): pass def _get_history_gov_users(self): from_ts, to_ts = self.get_period() from_ts_ = gm_datetime_to_unixtimestamp(from_ts) to_ts_ = gm_datetime_to_unixtimestamp(to_ts) usernames_ = ( self.v1session.query(V1HistoryGov) .filter(V1HistoryGov.ts.between(from_ts_, to_ts_), V1HistoryGov.server_id == self.server_id) .distinct(V1HistoryGov.username) .group_by(V1HistoryGov.username) ) return [item.username for item in usernames_] def _get_history_uids(self): from_ts, to_ts = self.get_period() uids_ = ( self.v1session.query(V1History) .filter( V1History.created.between(from_ts, to_ts), V1History.server_id == self.server_id, V1History.id > self.last_uid, ) .distinct(V1History.id) .group_by(V1History.id) ) return [item.id for item in uids_] def get_uids(self): uids_list = self._get_history_uids() for username in self._get_history_gov_users(): uid = self.convert_username_to_uid(username) if uid is not None and uid > self.last_uid and uid not in uids_list: uids_list.append(uid) return sorted(uids_list) def get_period(self): """We want to go 1 hour at a time, up to 1 month back, starting from now""" to_ts = self._to_ts() from_ts = self.last_ts - timedelta(hours=1) return from_ts, to_ts class Break(Exception): pass class V1DBMigrator(LveStatsPlugin): PLUGIN_LOCATION = '/usr/share/lve-stats/plugins/v1_db_migrator.py' timeout = 18 # change default timeout is_done = False period = 60 # every minute order = 9500 # We pretty much want to be last one standing v1_connect_string = None V1Session = None # We will need it to create session on each execution time_interval = None debug = True skip_on_error = True # What if we cannot save data for some reason, if True, skip it v2_server_id = 'localhost' v1_server_id = 'localhost' def __init__(self): self.log = logging.getLogger('plugin.V1DBMigrator') self._username_to_uid_cache = {} self._no_such_uid_cache = [] self._procs = 1 self.now = 0 # This changes in MainLoop self.log.info("V1 Migration Started") self._time_commit = self.timeout * 0.5 # time limit for stopping plugin self.control_time = True self._conn = None self._database_does_not_exist = False def set_config(self, config): self.v1_server_id = config.get('v1_server_id', 'localhost') self.v2_server_id = config.get('server_id', 'localhost') self.v1_connect_string = config.get('v1_connect_string') self.debug = config.get('debug', 'F').lower() in ('t', 'y', 'true', 'yes', 1) self.init_v1_db() def init_v1_db(self, ts=STATE_FILE): if self.v1_connect_string is None: self._database_does_not_exist = True return # check present sqlite database sqlite = 'sqlite:///' if self.v1_connect_string.startswith(sqlite) and not os.path.exists(self.v1_connect_string[len(sqlite):]): self.log.warning('Database "%s" does not exist', self.v1_connect_string) self._database_does_not_exist = True return # create database engine try: v1_db_engine = sqlalchemy.engine.create_engine(self.v1_connect_string, echo=self.debug) except SQLAlchemyError as e: self.log.warning(str(e)) self._database_does_not_exist = True return # check present history table if not v1_db_engine.dialect.has_table(v1_db_engine, V1History.__tablename__): self.log.warning( 'Table "%s" in database "%s" does not exist', V1History.__tablename__, self.v1_connect_string, ) self._database_does_not_exist = True return result = validate_database(v1_db_engine, hide_logging=True, base=V1Base) if result['column_error'] or result['table_error']: self.log.warning('V1 database malformed, migration skipped.') self._database_does_not_exist = True return self.V1Session = sessionmaker(bind=v1_db_engine) self.time_interval = V1TimeInterval(self.get_v1_session(), ts, self.v1_server_id) self.time_interval.convert_username_to_uid = self.convert_username_to_uid def get_v1_session(self): return self.V1Session() def execute(self, lve_data): self._procs = lve_data.get('procs', 1) if self.is_done: # all data had been migrated return if self._database_does_not_exist or self.time_interval.is_too_old(): self.log.warning("V1 Migration Done") self.cleanup() self.fix_lost_keep_alive_records() else: self.convert_all() def fix_lost_keep_alive_records(self): session = sessionmaker(bind=self.engine)() fix_lost_keep_alive(session, server_id=self.v2_server_id, log_=self.log) session.close() def cleanup(self): """ There is not much to do on clean up. Lets just set flag done = True, and remove plugin so that on next restart it would't be running any more :return: """ self.is_done = True try: os.remove(V1DBMigrator.PLUGIN_LOCATION) # remove compiled python code os.remove(V1DBMigrator.PLUGIN_LOCATION + 'c') except (IOError, OSError) as e: self.log.error("Unable to remove %s: %s", V1DBMigrator.PLUGIN_LOCATION, str(e)) session = sessionmaker(bind=self.engine)() try: session.query(history_x60).filter(history_x60.server_id == self.v2_server_id).delete() session.commit() except SQLAlchemyError: session.rollback() def get_v1_gov_data(self, from_ts, to_ts, username): from_ts_ = gm_datetime_to_unixtimestamp(from_ts) to_ts_ = gm_datetime_to_unixtimestamp(to_ts) return ( self.get_v1_session() .query(V1HistoryGov) .filter( V1HistoryGov.ts.between(from_ts_, to_ts_), V1HistoryGov.username == username, V1HistoryGov.server_id == self.v1_server_id, ) .all() ) def get_v1_data(self, from_ts, to_ts, uid): return ( self.get_v1_session() .query(V1History) .filter( V1History.created.between(from_ts, to_ts), V1History.server_id == self.v1_server_id, V1History.id == uid ) .order_by(V1History.id) .all() ) def _convert_data(self, from_ts, to_ts, uid, trans): username = self.convert_uid_to_username(uid) try: v2_rows_insert_list = [] for row in self.get_v1_data(from_ts, to_ts, uid): v2_rows = self.convert_row(row, self._procs) v2_rows_insert_list.extend(v2_rows) if v2_rows_insert_list: for chunk in get_chunks(v2_rows_insert_list): self._conn.execute(insert(history), chunk) v2_gov_rows_insert_list = [] if username and username != 'root': # ignore uid 0 (root) for row in self.get_v1_gov_data(from_ts, to_ts, username): v2_gov_rows = self.convert_gov_row(row) v2_gov_rows_insert_list.extend(v2_gov_rows) if v2_gov_rows_insert_list: for chunk in get_chunks(v2_gov_rows_insert_list): self._conn.execute(insert(history_gov), chunk) except (SQLAlchemyError, DatabaseError) as e: trans.rollback() self.log.warning('Can not save data to database: %s', str(e)) if not self.skip_on_error: raise e except LveStatsPluginTerminated as e: trans.commit() self.log.debug("Plugin is terminated.") raise Break() from e def _work_time(self): return time.time() - self.now # calculate plugin working time def _need_break(self): return self.timeout - self._work_time() < self._time_commit * 1.2 def convert_data(self, from_ts, to_ts): self.log.debug('Start converting from %s to %s', from_ts, to_ts) uids = self.time_interval.get_uids() # obtain uids need convert if not uids: return trans = self._conn.begin() for uid in uids: self._convert_data(from_ts, to_ts, uid, trans) self.time_interval.save_uid(uid) self.log.debug( 'Converted from %s to %s uid: %s; plugin work time %s', from_ts, to_ts, uid, self._work_time(), ) # control plugin work time if self.control_time and self._need_break(): self.log.debug( 'Stop converting; plugin work time %s', self._work_time(), ) raise Break() if trans.is_active: trans.commit() def convert_all(self): with self.engine.begin() as self._conn: try: while not self._need_break() and not self.time_interval.is_too_old(): from_ts, to_ts = self.time_interval.get_period() self.convert_data(from_ts, to_ts) self.time_interval.save_timestamp(from_ts) # save timestamp if not breacke cycle only except Break: # for break all cycles pass time_start = time.time() commit_time = time.time() - time_start self._time_commit = max(self._time_commit, commit_time) self.log.debug('Commit time %s', commit_time) @staticmethod def fault_count(limit, _max): if limit == _max: return 1 else: return 0 @staticmethod def convert_iops_faults(v1_row, v2_row): # v1 & v2 store IOPS the same way, but faults are not tracked in v1 v2_row['iops_fault'] = V1DBMigrator.fault_count(v1_row.liops, v1_row.iops_max) @staticmethod def convert_io(v1_row, v2_row): # v1 stores IO in KB/s, v2 in B/s v2_row['io'] = v1_row.io * 1024 v2_row['io_limit'] = v1_row.io_limit * 1024 v2_row['io_fault'] = V1DBMigrator.fault_count(v1_row.io_limit, v1_row.io_max) @staticmethod def convert_cpu_(procs, cpu, cpu_limit, cpu_max, ncpu): """ v1 holds CPU relative to total cores, where on 4 core system 1 core is 25% it also limits by ncpu (whatever is less), so on 4 cores system 2 ncpu and 30% is 30% of all cores (as 2ncpu = 50%, and we take smaller), and 2 ncpu and 70% is 50%, as 2ncpu = 50% / we take smaller To switch to new limit, we need to talke old limit and multiply it by 100 So 25% on 4 core system in v1 (1 core), is 25 * 4 * 100 = 10,000 """ v2_cpu_limit = min(100 * cpu_limit * procs, ncpu * 100 * 100) # no matter what mistake we make, lets not ever set CPU usage > CPU limit v2_cpu = min(v2_cpu_limit, cpu * procs * 100) # if cpu_limit == cpu_max, lets consider it to be a fault, note we loose precision # anyway, so if weight was 60, we will add 60 faults... oh well. v2_cpu_faults = V1DBMigrator.fault_count(v2_cpu_limit, 100 * cpu_max * procs) return v2_cpu, v2_cpu_limit, v2_cpu_faults def convert_cpu(self, row, v2_row, procs): v2_row['cpu'], v2_row['cpu_limit'], v2_row['cpu_fault'] = self.convert_cpu_( procs, row.cpu, row.cpu_limit, row.cpu_max, row.ncpu ) def convert_username_to_uid(self, username): if username in self._username_to_uid_cache: return self._username_to_uid_cache[username] uid = uidconverter.username_to_uid_local(username) self._username_to_uid_cache[username] = uid if uid is None: self.log.warning('Can not find uid for user %s', username) return uid def convert_uid_to_username(self, uid): if uid in self._no_such_uid_cache: return for username_, uid_ in self._username_to_uid_cache.items(): if uid == uid_: return username_ username_ = uidconverter.uid_to_username_local(uid) if username_ is None: self._no_such_uid_cache.append(uid) self.log.warning('Can not find user name for uid %s', uid) else: self._username_to_uid_cache[username_] = uid return username_ def convert_gov_row(self, row): to_ts = row.ts result = [] for i in range(0, row.weight): v2_gov_row = {'server_id': self.v2_server_id, 'ts': to_ts - 60 * i} for key in V2_GOV_KEYS: v2_gov_row[key] = getattr(row, key) uid = self.convert_username_to_uid(v2_gov_row.pop('username')) if uid: v2_gov_row['uid'] = uid result.append(v2_gov_row) return result def convert_row(self, row, procs): to_ts = gm_datetime_to_unixtimestamp(row.created) result = [] for i in range(0, row.weight): v2_row = {'server_id': self.v2_server_id, 'created': to_ts - 60 * i} for key in V2_KEYS: v2_row[key] = getattr(row, key) self.convert_cpu(row, v2_row, procs) self.convert_io(row, v2_row) self.convert_iops_faults(row, v2_row) result.append(v2_row) return result