D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
pytest_subprocess
/
Filename :
process_dispatcher.py
back
Copy
"""ProcessDispatcher class declaration""" import asyncio import subprocess import sys import typing from collections import deque from copy import deepcopy from functools import partial from typing import Any as AnyType from typing import Awaitable from typing import Callable from typing import Deque from typing import Dict from typing import List from typing import Optional from typing import Tuple from typing import Type from typing import Union from . import asyncio_subprocess from . import exceptions from .fake_popen import AsyncFakePopen from .fake_popen import FakePopen from .types import COMMAND from .utils import Command if typing.TYPE_CHECKING: from .fake_process import FakeProcess class ProcessDispatcher: """Main class for handling processes.""" process_list: List["FakeProcess"] = [] built_in_popen: Optional[Callable] = None built_in_async_subprocess: Optional[AnyType] = None _allow_unregistered: bool = False _cache: Dict["FakeProcess", Dict["FakeProcess", AnyType]] = dict() _keep_last_process: bool = False _pid: int = 0 @classmethod def register(cls, process: "FakeProcess") -> None: if not cls.process_list: cls.built_in_popen = subprocess.Popen subprocess.Popen = cls.dispatch # type: ignore cls.built_in_async_subprocess = asyncio.subprocess asyncio.create_subprocess_shell = cls.async_shell # type: ignore asyncio.create_subprocess_exec = cls.async_exec # type: ignore asyncio.subprocess = asyncio_subprocess cls._cache[process] = { proc: deepcopy(proc.definitions) for proc in cls.process_list } cls.process_list.append(process) @classmethod def deregister(cls, process: "FakeProcess") -> None: cls.process_list.remove(process) cache = cls._cache.pop(process) for proc, processes in cache.items(): proc.definitions = processes if not cls.process_list: subprocess.Popen = cls.built_in_popen # type: ignore cls.built_in_popen = None if cls.built_in_async_subprocess is None: raise exceptions.PluginInternalError asyncio.subprocess = cls.built_in_async_subprocess asyncio.create_subprocess_shell = ( cls.built_in_async_subprocess.create_subprocess_shell ) asyncio.create_subprocess_exec = ( cls.built_in_async_subprocess.create_subprocess_exec ) cls.built_in_async_subprocess = None @classmethod def dispatch( cls, command: COMMAND, **kwargs: Optional[Dict] ) -> Union[FakePopen, subprocess.Popen]: """This method will be used instead of the subprocess.Popen()""" process = cls.__dispatch(command) if process is None: if cls.built_in_popen is None: raise exceptions.PluginInternalError popen: subprocess.Popen = cls.built_in_popen(command, **kwargs) return popen result = cls._prepare_instance(FakePopen, command, kwargs, process) if not isinstance(result, FakePopen): raise exceptions.PluginInternalError result.run_thread() return result @classmethod async def async_shell( cls, cmd: Union[str, bytes], **kwargs: Dict ) -> Union[AsyncFakePopen, asyncio.subprocess.Process]: """Replacement of asyncio.create_subprocess_shell()""" if not isinstance(cmd, (str, bytes)): raise ValueError("cmd must be a string") method = partial( cls.built_in_async_subprocess.create_subprocess_shell, # type: ignore cmd, **kwargs ) if isinstance(cmd, bytes): cmd = cmd.decode() return await cls._call_async(cmd, method, kwargs) @classmethod async def async_exec( cls, program: Union[str, bytes], *args: Union[str, bytes], **kwargs: Dict ) -> Union[AsyncFakePopen, asyncio.subprocess.Process]: """Replacement of asyncio.create_subprocess_exec()""" if not isinstance(program, (str, bytes)): raise ValueError("program must be a string") method = partial( cls.built_in_async_subprocess.create_subprocess_exec, # type: ignore program, *args, **kwargs ) if isinstance(program, bytes): program = program.decode() command = [ program, *[arg.decode() if isinstance(arg, bytes) else arg for arg in args], ] return await cls._call_async(command, method, kwargs) @classmethod async def _call_async( cls, command: COMMAND, async_method: Callable, kwargs: Dict, ) -> Union[AsyncFakePopen, asyncio.subprocess.Process]: process = cls.__dispatch(command) if process is None: if cls.built_in_async_subprocess is None: raise exceptions.PluginInternalError async_shell: Awaitable[asyncio.subprocess.Process] = async_method() return await async_shell if sys.platform == "win32" and isinstance( asyncio.get_event_loop_policy().get_event_loop(), asyncio.SelectorEventLoop ): raise NotImplementedError( "The SelectorEventLoop doesn't support subprocess" ) result = cls._prepare_instance(AsyncFakePopen, command, kwargs, process) if not isinstance(result, AsyncFakePopen): raise exceptions.PluginInternalError result.run_thread() return result @classmethod def _prepare_instance( cls, klass: Union[Type[FakePopen], Type[AsyncFakePopen]], command: COMMAND, kwargs: dict, process: dict, ) -> Union[FakePopen, AsyncFakePopen]: # Update the command with the actual command specified by the caller. # This will ensure that Command objects do not end up unexpectedly in # caller's objects (e.g. proc.args, CalledProcessError.cmd). Take care # to preserve the dict that may still be referenced when using # keep_last_process. fake_popen_kwargs = process.copy() fake_popen_kwargs["command"] = command recorder = fake_popen_kwargs.pop("recorder") result = klass(**fake_popen_kwargs) recorder.calls.append(result) result.pid = cls._pid result.configure(**kwargs) return result @classmethod def __dispatch(cls, command: COMMAND) -> Optional[dict]: command_instance, processes, process_instance = cls._get_process(command) if process_instance: process_instance.calls.append(command) if not processes: if not cls.process_list[-1]._allow_unregistered: raise exceptions.ProcessNotRegisteredError( "The process '%s' was not registered." % ( command if isinstance(command, str) else " ".join(str(item) for item in command), ) ) else: return None process = processes.popleft() if not processes and process_instance is not None: if cls.process_list[-1]._keep_last_process: processes.append(process) elif command_instance: del process_instance.definitions[command_instance] cls._pid += 1 if isinstance(process, bool): # real process will be called return None return process @classmethod def _get_process( cls, command: COMMAND ) -> Tuple[ Optional[Command], Optional[Deque[Union[dict, bool]]], Optional["FakeProcess"] ]: for proc in reversed(cls.process_list): command_instance, processes = next( ( (key, value) for key, value in proc.definitions.items() if key == command ), (None, None), ) process_instance = proc if processes and isinstance(processes, deque): return command_instance, processes, process_instance return None, None, None