D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clquota
/
Filename :
cl_quota.py
back
Copy
#!/opt/cloudlinux/venv/bin/python3 -bb # -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import os import sys import re import getopt import pwd import logging import logging.handlers from cldetectlib import get_boolean_param, CL_CONFIG_FILE from clquota import ( QuotaWrapper, NoSuchPackageException, NoSuchUserException, IncorrectLimitFormatException, InsufficientPrivilegesException, GeneralException, QuotaDisabledException ) from clquota.utils import ( print_text, print_csv, print_json, print_text_error, print_csv_error, print_json_error ) import logging logger_d = logging.getLogger(__name__) logger_d.setLevel(logging.DEBUG) # fh = logging.FileHandler('/root/cl_quota.log') # fh.setLevel(logging.DEBUG) # fh.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) # logger_d.addHandler(fh) def check_username_and_convert_to_uid(username=None): ''' Checks if username is valid one and returns UID as string ''' if username.lower() == 'default': return '0' try: uid = pwd.getpwnam(username).pw_uid return str(uid) except KeyError: raise NoSuchUserException(username) def check_uid(uid=None): ''' Checks if UID is valid one and retuns it as string ''' p = re.compile(r'(\d+)') pm = p.search(uid) if not pm: raise NoSuchUserException(uid) uid = pm.group(1) if uid == '0': # this is default. No need to check further return uid try: pwd.getpwuid(int(uid)) except KeyError: raise NoSuchUserException(uid) return uid def setup_system_log(): """ Sets syslog logger and returns its handler """ logger = logging.getLogger('clquota') logger.setLevel(logging.INFO) syslogger = logging.handlers.SysLogHandler(address='/dev/log') formatter = logging.Formatter(fmt='%(name)s:%(levelname)s %(message)s') syslogger.setFormatter(formatter) logger.addHandler(syslogger) return logger def usage(): print('') print('Usage: ' + sys.argv[0] + ' [OPTIONS]') print('If no options given prints quota statistics for all users') print('Options:') print('') print(' -u | --user : specifies the user') print(' -U | --user-id : specifies the user ID') print(' -S | --soft-limit : sets the soft limit for a user. ' 'Pass 0 or \'default\' to set package default limit. Pass -1 or \'unlimited\' to cancel limit') print(' -H | --hard-limit : sets the hard limit for a user. ' 'Pass 0 or \'default\' to set package default limit. Pass -1 or \'unlimited\' to cancel limit') print(' -V | --csv : returns data as comma separated values') print(' -J | --json : returns data as json') print(' -p | --package : specifies a package to set or get limits') print(' -P | --package-limits : prints package limits') print(' -a | --all-package-limits : prints all package limits (including packages without limits)') print(' -Y | --sync : synchronizes packages and users limits with the database') print(' -C | --cache-content : cache quota data to a file the database') print(' -F | --force : save user quotas even when they are equal to defaults') print(' --check : Deprecated. Check if quotas is enabled/activated/suported; ' 'if disabled show diagnostic information; using with --user or --user-id options') def quota_sync(): """ Quota synchronization :return: """ q = QuotaWrapper() q.synchronize() q.save_user_cache() def check_autosync_disabled(): """ Check autosynchronization parameter in config file and exit if it`s disabled :return: """ if not get_boolean_param(CL_CONFIG_FILE, 'cl_quota_limits_autosync', default_val=True): print("cl-quota limits autosynchronization is disabled in config " + CL_CONFIG_FILE) sys.exit(0) def parse_args(argv_): try: opts, args = getopt.getopt(argv_, 'PaVJYCp:u:U:S:H:F', ['package-limits', 'all-package-limits', 'csv', 'json', 'package=', 'user-id=', 'user=', 'soft-limit=', 'hard-limit=', 'sync', 'cache', 'check', 'profiling', 'force']) except getopt.GetoptError: usage() sys.exit(1) config = {} config['soft'] = None config['hard'] = None config['user'] = None config['user-id'] = None config['format'] = 'text' config['check'] = False config['profiling'] = False config['force_save'] = False for o, a in opts: if o in ['-P', '--package-limits']: config['package'] = None elif o in ['-a', '--all-package-limits']: config['all-package'] = None elif o in ['-p', '--package']: config['package'] = a elif o in ['-u', '--user']: config['user'] = a elif o in ['-U', '--user-id']: config['user-id'] = a elif o in ['-S', '--soft-limit']: config['soft'] = a elif o in ['-H', '--hard-limit']: config['hard'] = a elif o in ['-Y', '--sync']: config['sync'] = True elif o in ['-C', '--cache']: config['cache'] = True elif o == '--check': config['check'] = True elif o in ['-V', '--csv']: config['format'] = 'csv' elif o in ['-J', '--json']: config['format'] = 'json' elif o in ['--profiling']: config['profiling'] = True elif o in ['-F', '--force']: config['force_save'] = True return config def main(*argv_): output_dispatcher = { 'text': print_text, 'csv': print_csv, 'json': print_json, 'err_text': print_text_error, 'err_csv': print_csv_error, 'err_json': print_json_error } config = parse_args(argv_) try: if 'sync' in config and 'cache' in config: # exit in case synchronization is disabled in config file check_autosync_disabled() # We don't want to do anything if no saved data if not os.path.exists(QuotaWrapper.DATAFILE): return if config['profiling']: # profiler mode profiler_log = 'quota-profiling.log' print("Profiling end; result saved to " + profiler_log) import profile profile.run('quota_sync()', profiler_log) # Profiler result decode import pstats p = pstats.Stats(profiler_log) print('--------------------------------------') print() print('Cumulative time:') # p.sort_stats('cumulative').print_stats(20) p.sort_stats('calls').print_stats(20) print('--------------------------------------') print() print('Total time:') # p.sort_stats('time').print_stats(20) p.sort_stats('calls').print_stats(20) else: # work without profiler quota_sync() return q = QuotaWrapper() if 'package' in config and not (config['soft'] or config['hard']): if config['package'] is None: # Show limits for all used packages output_dispatcher[config['format']](q.get_package_limits(None), title='package') else: # Show limits only for given package output_dispatcher[config['format']](q.get_all_packages_limits(config['package']), title='package') return if 'all-package' in config: # Show all packages output_dispatcher[config['format']](q.get_all_packages_limits(), title='package') return uid = None if config['user']: uid = check_username_and_convert_to_uid(config['user']) elif config['user-id']: uid = check_uid(config['user-id']) if config['check'] and uid is not None and int(uid) != 0: q._check_if_quota_enabled(uid) if config['soft'] or config['hard']: if 'package' in config and config['package'] is not None: # Set package limits q.set_package_limit( package=config['package'], soft=config['soft'], hard=config['hard']) return # Set user's limits q.set_user_limit(uid=uid, soft=config['soft'], hard=config['hard'], force_save=config['force_save']) return if 'sync' in config: q.synchronize() return if 'cache' in config: q.save_user_cache() return if uid: if not config['check']: output_dispatcher[config['format']](q.get_user_limits(uid), title='id', sort_key=int) else: output_dispatcher[config['format']](q.get_all_users_limits(), title='id', sort_key=int) except QuotaDisabledException as e: if config['check']: print(str(e)) except (NoSuchPackageException, NoSuchUserException, InsufficientPrivilegesException, IncorrectLimitFormatException, GeneralException) as e: output_dispatcher['err_' + config['format']](e) setup_system_log().info(e) except Exception as e: output_dispatcher['err_' + config['format']](e) setup_system_log().error(e) if __name__ == '__main__': main(*sys.argv[1:])