D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
thread-self
/
root
/
proc
/
self
/
root
/
proc
/
thread-self
/
root
/
usr
/
local
/
lsws
/
lsns
/
bin
/
Filename :
lsnsctl
back
Copy
#!/usr/bin/python3
"""LiteSpeed Namespace Control Program (lsnsctl).

Command-line front end for namespace administration: querying and setting the
minimum UID, listing namespace mounts, maintaining the disabled-UID list and
unmounting namespaces.  Most of the real work is delegated to the
project-local ``common`` module.
"""
import argparse
import json
import logging
import os
import subprocess
import sys
from stat import *
from subprocess import PIPE

import common

# Path prefix that marks a namespace mount in the output of ``mount``.
# The path components that follow it are read as <uid>/<vhost>/...
_LSNS_MOUNT_PREFIX = "/var/lsns/"


def command_requires_uid(command):
    """Return True if *command* cannot run without a ``--uid`` argument."""
    return command in ['disable-uid', 'enable-uid', 'set-min-uid', 'unmount']


def command_uses_uid(command):
    """Return True if *command* looks at ``--uid`` when one is supplied."""
    return command in ['disable-uid', 'enable-uid', 'list-disabled-uids',
                       'list-mount', 'set-min-uid', 'unmount']


def validate_environment():
    """Abort unless running as root, then run ``common.ls_ok()``."""
    if os.getuid() != 0:
        common.fatal_error("this program must be run as root")
    # NOTE(review): ls_ok() is defined in `common`; presumably it verifies the
    # LiteSpeed installation -- confirm there.
    common.ls_ok()


def init_pgm():
    """Parse the command line, configure logging and resolve the user.

    The ``version`` command is handled here and exits the process.

    Returns:
        A ``(args, user)`` tuple: the parsed argparse namespace, and the
        object returned by ``common.get_user`` for ``--uid`` (``None`` when
        the command does not use a uid or none was given).
    """
    common.init_logging()
    parser = argparse.ArgumentParser(
        prog="lsnsctl",
        description='LiteSpeed Namespace Control Program')
    parser.add_argument(
        "command", type=str, help="The namespace command",
        choices=['disable-uid', 'enable-uid', 'get-min-uid',
                 'list-disabled-uids', 'list-mount', 'set-min-uid',
                 'unmount', 'unmount-all', 'version'])
    parser.add_argument(
        "--uid", type=str,
        help="uid or user name for set-min-uid and unmount commands")
    parser.add_argument(
        '-l', '--log', type=int,
        help='set logging level, 10=Debug, 20=Info, 30=Warning, 40=Error. '
             'Default is Info')
    parser.add_argument(
        '-q', '--quiet', action='store_true',
        help='turns off all logging and only outputs what is requested.')
    args = parser.parse_args()

    # An explicit --log level always wins; otherwise default to INFO unless
    # --quiet was given, in which case the level set by init_logging() stays.
    if args.log is not None:
        logging.getLogger().setLevel(args.log)
    elif not args.quiet:
        logging.getLogger().setLevel(logging.INFO)

    # The original message named "lscgctl", a different program.
    logging.debug("Entering lsnsctl")
    validate_environment()

    command = args.command
    if command_requires_uid(command) and args.uid is None:
        common.fatal_error(
            "You must specify a --uid for the %s command" % command)

    if command_uses_uid(command) and args.uid is not None:
        # NOTE(review): the second return value is never checked here;
        # presumably get_user() reports lookup failures itself -- confirm.
        user, _found = common.get_user(args.uid)
    else:
        user = None

    if command == 'version':
        logging.info("Version: %s", common.VERSION)
        if args.quiet:
            print(common.VERSION)
        sys.exit(0)
    return args, user


def command_get_min_uid(args):
    """Report the minimum UID: printed when quiet, logged otherwise."""
    uid = common.get_min_uid()
    if args.quiet:
        print('%d' % uid)
    else:
        logging.info("Minimum UID: %d", uid)


def command_set_min_uid(args, user):
    """Write ``user.pw_uid`` to the ``lsns.conf`` file, then report it."""
    fullfile = common.get_conf_file('lsns.conf')
    try:
        f = open(fullfile, 'w')
    except Exception as err:
        common.fatal_error('Error opening %s: %s' % (fullfile, err))
    try:
        # `with` closes the handle even when the write itself raises.
        with f:
            f.write('%d\n' % user.pw_uid)
    except Exception as err:
        common.fatal_error('Error writing %s: %s' % (fullfile, err))
    command_get_min_uid(args)


def get_mounted():
    """Return the namespace mounts found in the output of ``mount``.

    Returns:
        A dict mapping each UID path component (a string) to the list of
        distinct vhost path components seen for it, in order of appearance.
    """
    result = subprocess.run(['mount'], stdout=PIPE, stderr=PIPE)
    if result.returncode != 0:
        common.fatal_error('Error in running: mount, errors: '
                           + result.stdout.decode('utf-8') + ' '
                           + result.stderr.decode('utf-8'))
    mounted = {}
    for line in result.stdout.decode('utf-8').split('\n'):
        logging.debug('mount line: %s', line)
        index = line.find(_LSNS_MOUNT_PREFIX)
        if index == -1:
            continue
        remainder = line[index + len(_LSNS_MOUNT_PREFIX):]
        logging.debug(' After prefix: %s', remainder)
        components = remainder.split('/')
        if len(components) < 2:
            continue
        uid, vhost = components[0], components[1]
        logging.debug(' UID: %s, vhost: %s', uid, vhost)
        if uid not in mounted:
            logging.debug(' New UID/vhost')
            mounted[uid] = [vhost]
        elif vhost in mounted[uid]:
            logging.debug(' Existing UID/vhost')
        else:
            logging.debug(' New vhost only')
            mounted[uid].append(vhost)
    return mounted


def command_list_mount(args, user):
    """Print the mounts as JSON, for *user* only when one was given."""
    mounted = get_mounted()
    if user is None:
        print(json.dumps(mounted))
        return
    uid = str(user.pw_uid)
    mount = {uid: mounted[uid]} if uid in mounted else {}
    print(json.dumps(mount))


def get_disabled_uids(missing_ok):
    """Read the disabled-UID file into a dict keyed by UID string.

    Args:
        missing_ok: when true, a file that cannot be read yields ``{}``;
            when false, an unreadable or empty file is reported through
            ``common.fatal_error``.

    Returns:
        A dict mapping each non-empty line of the file to ``True``.
    """
    try:
        with open(common.get_disabled_uid_file(), "r") as f:
            uids_raw = f.read()
    except Exception as err:
        if not missing_ok:
            common.fatal_error('Error opening UID file: %s' % err)
        else:
            return {}
    # A dict keeps first-seen order while collapsing duplicate lines.
    uids = {line: True for line in uids_raw.split('\n') if line != ''}
    if not missing_ok and not uids:
        common.fatal_error("No uids in disabled UIDs file")
    return uids


def write_disabled_uids(uids):
    """Rewrite the disabled-UID file with one key of *uids* per line."""
    try:
        with open(common.get_disabled_uid_file(), "w") as f:
            for k in uids:
                f.write("%s\n" % k)
    except Exception as err:
        common.fatal_error('Error writing UID file: %s' % err)


def command_disable_uid(args, user):
    """Add *user*'s UID to the disabled list; exit 0 if already there."""
    uid = str(user.pw_uid)
    uids = get_disabled_uids(True)
    if uid in uids:
        if not args.quiet:
            logging.info('UID %s already disabled', uid)
        sys.exit(0)
    uids[uid] = True
    write_disabled_uids(uids)
    common.restart_external([user], False)
    if not args.quiet:
        logging.info("Write %s to disabled uid list", uid)


def command_list_disabled_uids(args, user):
    """Print the disabled UIDs as a JSON list, sorted as strings."""
    print(json.dumps(sorted(get_disabled_uids(True))))


def command_enable_uid(args, user):
    """Remove *user*'s UID from the disabled list; exit 0 if not listed.

    When the last entry is removed the file itself is deleted rather than
    rewritten empty.
    """
    uid = str(user.pw_uid)
    uids = get_disabled_uids(True)
    if uid not in uids:
        if not args.quiet:
            logging.info('UID %s already enabled', uid)
        sys.exit(0)
    del uids[uid]
    if not uids:
        os.remove(common.get_disabled_uid_file())
        if not args.quiet:
            logging.info('Deleting last disabled uid %s from file', uid)
    else:
        write_disabled_uids(uids)
        if not args.quiet:
            logging.info("Deleting %s from disabled uid list", uid)
    common.restart_external([user], False)


def command_unmount(args, user):
    """Run the ``unmount_ns`` helper for one UID or for all (``-a``)."""
    unmount_all = args.command == 'unmount-all'
    unmount_args = [common.get_bin_file('unmount_ns')]
    if unmount_all:
        unmount_args.append('-a')
    else:
        unmount_args.extend(['-u', str(user.pw_uid)])
    result = subprocess.run(unmount_args, stdout=PIPE, stderr=PIPE)
    if result.returncode != 0:
        common.fatal_error('Error in running: unmount, errors: '
                           + result.stdout.decode('utf-8') + ' '
                           + result.stderr.decode('utf-8'))
    if not args.quiet:
        print(result.stdout.decode('utf-8'))
        print(result.stderr.decode('utf-8'))
    if unmount_all:
        common.restart_external([], True)
    else:
        common.restart_external([user], False)


def do_pgm(args, user):
    """Dispatch ``args.command`` to its handler.

    ``version`` never reaches this point because ``init_pgm`` exits for it.
    """
    logging.debug("Entering lsnsctl, command: %s", args.command)
    handlers = {
        'get-min-uid': lambda: command_get_min_uid(args),
        'list-mount': lambda: command_list_mount(args, user),
        'disable-uid': lambda: command_disable_uid(args, user),
        'enable-uid': lambda: command_enable_uid(args, user),
        'list-disabled-uids': lambda: command_list_disabled_uids(args, user),
        'set-min-uid': lambda: command_set_min_uid(args, user),
        'unmount': lambda: command_unmount(args, user),
        'unmount-all': lambda: command_unmount(args, user),
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler()
    logging.debug("Exiting lsnsctl")


def main():
    """Program entry point: initialise, then run the requested command."""
    args, user = init_pgm()
    return do_pgm(args, user)


if __name__ == "__main__":
    main()