D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcommon
/
Filename :
clfunc.py
back
Copy
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT # import os import re import signal import subprocess import sys LVE_FILE = '/proc/lve/list' # GET VERSION from /proc/lve/list def get_lve_version(): """ Obtain lve process filesystem version """ try: with open(LVE_FILE, encoding='utf-8') as f: line = f.read(3) # we need only first three symbols for parse version lve_procfs_version = [int(line.rsplit(':', 1)[0]), 'OK'] except IOError: lve_procfs_version = [None, f'clcommon: get_lve_version: Can`t open file {LVE_FILE}'] except IndexError: lve_procfs_version = [None, f'clcommon: get_lve_version: Can`t get data from {LVE_FILE}'] except ValueError: lve_procfs_version = [None, f'clcommon: get_lve_version: Can`t parse {LVE_FILE}'] return lve_procfs_version BYTES_CONVERSION_TABLE = { 'K': 1, 'M': 1024, 'G': 1024 * 1024, 'T': 1024 * 1024 * 1024 } def validate_cpu(val: str | int) -> str | int | None: """ Check that the value is a valid CPU limit (0-100 int or speed (% or MHz/GHz)). Return the value if valid, otherwise None. If the value contains decimal part, return only integer part. """ data = str(val) # Validate integer CPU limits regexp_int = re.compile(r'^([1-9][0-9]?|100)(?:\.(\d+))?$') regex_match = regexp_int.match(data) if regex_match: if isinstance(val, (float, int)): return int(val) return regex_match.group(1) # return only integer part # Validate percentage speeds regexp_speedp = re.compile(r'^([1-9]|[1-9][0-9]*)(?:\.(\d+))?%$') regex_match = regexp_speedp.match(data) if regex_match: return f'{regex_match.group(1)}%' # Validate frequency speeds regexp_speedf = re.compile(r'^([1-9]|[1-9][0-9]*)(?:\.(\d+))?(mhz|ghz)$', re.IGNORECASE) regex_match = regexp_speedf.match(data) if regex_match: integer_part, _, unit = regex_match.groups() return f'{integer_part}{unit}' return None def validate_int(val, min_val=0, max_val=sys.maxsize): """ Check that val - is a string number return val as a string """ try: dig_val = int(val) except ValueError: return None if max_val >= dig_val >= min_val: return val def memory_to_page(val, min_val=0, max_val=sys.maxsize): try: suffix = val[-1] if suffix.isdigit(): suffix = 'K' val = val + suffix result = int(float(val[:-1]) * BYTES_CONVERSION_TABLE[suffix.upper()] / 4) if max_val >= result >= min_val: return result except (IndexError, ValueError, KeyError): pass return None def page_to_memory(pages): if pages < 256: # KB return str(pages * 4) + 'K' if pages < 262144: # MB return str(round(float(pages) * 4 / 1024, 2)) + 'M' return str(round(float(pages) * 4 / (1024 * 1024), 2)) + 'G' def reload_processes(item, username): with subprocess.Popen( ['/bin/ps', '-U', username, '-u', username], stdout=subprocess.PIPE, ) as proc: lines = proc.communicate()[0].split(b"\n") for row in lines: parts = row.rstrip().split() try: parts[-1].index(item.encode()) os.kill(int(parts[0]), signal.SIGHUP) except (IndexError, ValueError, OSError): continue def login_defs(key, default=None, _path='/etc/login.defs'): with open(_path, encoding='utf-8') as login_defs: for raw_line in login_defs: if raw_line.startswith('#'): continue # ignore commented lines line = raw_line.split('#', 1)[0] # clear line of comments line_splited = line.split() if len(line_splited) >= 2 and line_splited[0] == key: return line_splited[1] return default def uid_max(default=60000): try: uid_max_ = int(login_defs('UID_MAX')) except (IOError, ValueError, TypeError): uid_max_ = default return uid_max_ def exit_with_error(message, status=1): sys.stderr.write(f"{message}\n") sys.exit(status) def safe_escaped_unicode_to_utf(s): # transforms unicode-escaped string into utf-8 encoded str if '\\u' in s: # str should have escaped unicode symbols try: s = s.decode('unicode_escape').encode('utf-8') except UnicodeDecodeError: pass return s # Copy/paste helper to convert unicode results of json.load back to str # https://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-of-unicode-from-json/13105359#13105359 def byteify(data): if isinstance(data, dict): return {byteify(key): byteify(value) for key, value in data.items()} elif isinstance(data, list): return [byteify(element) for element in data] elif isinstance(data, tuple): return tuple(byteify(element) for element in data) elif isinstance(data, str): return data.encode() else: return data def unicodeify(data): if isinstance(data, dict): return {unicodeify(key): unicodeify(value) for key, value in data.items()} elif isinstance(data, list): return [unicodeify(element) for element in data] elif isinstance(data, tuple): return tuple(unicodeify(element) for element in data) elif isinstance(data, bytes): return data.decode() else: return data def is_ascii_string(s): """ Check is string contains only ASCII characters :param s: string to check :return: True - string contains only ASCII characters """ try: s.encode(encoding='utf-8').decode('ascii') except UnicodeDecodeError: return False else: return True def escape_formatting_chars(text: str) -> str: """ Escape '%' characters inside text, except '%' followed by '(' """ def replace(match_obj): """ Generate string to replace from matched string '% ' -> '%% ' '%%c' -> '%%%%c' """ return match_obj.group()[:-1] * 2 + match_obj.group()[-1] # twice all '%' inside text except '%' followed by '(' and '%' at the end # '% hello' -> '%% hello' # '%% hello' -> '%%%% hello' text = re.sub(r"%+([^(])", replace, text) # replace '%' at the end text = re.sub(r"%$", r"%%", text) return text