D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
sqlalchemy
/
testing
/
Filename :
replay_fixture.py
back
Copy
from collections import deque
import contextlib
import types

from . import config
from . import fixtures
from . import profiling
from .. import create_engine
from .. import MetaData
from .. import util
from ..orm import Session


def _builtin_namespace():
    """Return the builtins namespace as a dictionary.

    ``__builtins__`` is an implementation detail: depending on how the
    module's globals were set up it is either the builtins module itself
    or that module's ``__dict__``.  Normalizing here avoids an
    ``AttributeError`` on ``.values()`` when it is the module object.
    """
    namespace = __builtins__
    if isinstance(namespace, dict):
        return namespace
    return vars(namespace)


class ReplayFixtureTest(fixtures.TestBase):
    """Test base that runs ``_run_steps()`` twice: recorded, then replayed.

    The first run uses an engine whose connections are wrapped by
    :meth:`ReplayableSession.recorder`; the second run uses an engine whose
    "connections" are :meth:`ReplayableSession.player` objects fed from the
    buffer filled by the first run.  Subclasses implement
    :meth:`_run_steps` and may override :meth:`setup_engine` /
    :meth:`teardown_engine`.
    """

    @contextlib.contextmanager
    def _dummy_ctx(self, *arg, **kw):
        """Context manager that does nothing; used as ``ctx`` when recording."""
        yield

    def test_invocation(self):
        """Record a run of ``_run_steps()``, then replay it.

        The replay pass receives ``profiling.count_functions`` as its
        ``ctx`` argument, the recording pass receives the no-op
        ``_dummy_ctx``.
        """
        dbapi_session = ReplayableSession()
        creator = config.db.pool._creator

        def recorder():
            # Wrap each real connection so every access is recorded.
            return dbapi_session.recorder(creator())

        engine = create_engine(
            config.db.url, creator=recorder, use_native_hstore=False
        )
        self.metadata = MetaData(engine)
        self.engine = engine
        self.session = Session(engine)

        self.setup_engine()
        try:
            self._run_steps(ctx=self._dummy_ctx)
        finally:
            self.teardown_engine()
            engine.dispose()

        def player():
            # No real connection is created; results come from the buffer.
            return dbapi_session.player()

        engine = create_engine(
            config.db.url, creator=player, use_native_hstore=False
        )
        self.metadata = MetaData(engine)
        self.engine = engine
        self.session = Session(engine)

        self.setup_engine()
        try:
            self._run_steps(ctx=profiling.count_functions)
        finally:
            self.session.close()
            engine.dispose()

    def setup_engine(self):
        """Hook called after ``self.engine`` is set, before each pass."""
        pass

    def teardown_engine(self):
        """Hook called after the recording pass only."""
        pass

    def _run_steps(self, ctx):
        """Run the steps under test; must be implemented by subclasses.

        :param ctx: callable passed in by :meth:`test_invocation`; either
         ``_dummy_ctx`` or ``profiling.count_functions``.
        """
        raise NotImplementedError()


class ReplayableSession(object):
    """A simple record/playback tool.

    This is *not* a mock testing class.  It only records a session for later
    playback and makes no assertions on call consistency whatsoever.  It's
    unlikely to be suitable for anything other than DB-API recording.

    Every call and delegated attribute access made through a
    :class:`Recorder` appends one entry to a shared ``deque``; a
    :class:`Player` pops those entries in the same order.

    """

    # Buffer marker: the recorded result was not a "native" value and was
    # wrapped in another Recorder (the Player returns itself in its place).
    Callable = object()

    # Buffer marker: the recorded attribute lookup raised AttributeError.
    NoAttribute = object()

    # ``Natives`` holds the types whose instances are stored in the buffer
    # verbatim.  Function / method types are excluded so that callables get
    # wrapped and their invocations are recorded as well.
    if util.py2k:
        Natives = set(
            [getattr(types, t) for t in dir(types) if not t.startswith("_")]
        ).difference(
            [
                getattr(types, t)
                for t in (
                    "FunctionType",
                    "BuiltinFunctionType",
                    "MethodType",
                    "BuiltinMethodType",
                    "LambdaType",
                    "UnboundMethodType",
                )
            ]
        )
    else:
        Natives = (
            set(
                [
                    getattr(types, t)
                    for t in dir(types)
                    if not t.startswith("_")
                ]
            )
            .union(
                [
                    type(t) if not isinstance(t, type) else t
                    # Use the normalized namespace rather than calling
                    # ``__builtins__.values()`` directly; see
                    # _builtin_namespace().
                    for t in _builtin_namespace().values()
                ]
            )
            .difference(
                [
                    getattr(types, t)
                    for t in (
                        "FunctionType",
                        "BuiltinFunctionType",
                        "MethodType",
                        "BuiltinMethodType",
                        "LambdaType",
                    )
                ]
            )
        )

    def __init__(self):
        self.buffer = deque()

    def recorder(self, base):
        """Return a :class:`Recorder` wrapping ``base`` on this buffer."""
        return self.Recorder(self.buffer, base)

    def player(self):
        """Return a :class:`Player` reading from this buffer."""
        return self.Player(self.buffer)

    class Recorder(object):
        """Proxy that forwards to a subject and records every result."""

        def __init__(self, buffer, subject):
            self._buffer = buffer
            self._subject = subject

        def __call__(self, *args, **kw):
            """Call the subject and record what it returned.

            Native results are stored and returned as-is; anything else is
            recorded as ``Callable`` and returned wrapped in a new recorder.
            """
            # Fetch via object.__getattribute__ so the lookup never falls
            # through to the recording path of our own __getattribute__.
            subject, buffer = [
                object.__getattribute__(self, x)
                for x in ("_subject", "_buffer")
            ]

            result = subject(*args, **kw)
            if type(result) not in ReplayableSession.Natives:
                buffer.append(ReplayableSession.Callable)
                return type(self)(buffer, result)
            else:
                buffer.append(result)
                return result

        @property
        def _sqla_unwrap(self):
            # NOTE(review): presumably read elsewhere in SQLAlchemy to reach
            # the underlying DB-API object; the consumer is not visible here.
            return self._subject

        def __getattribute__(self, key):
            """Resolve ``key`` on the recorder, else on the subject (recorded).

            Lookups satisfied by the recorder itself are not recorded.  A
            failed lookup on the subject records ``NoAttribute`` and
            re-raises the ``AttributeError``.
            """
            try:
                return object.__getattribute__(self, key)
            except AttributeError:
                pass

            subject, buffer = [
                object.__getattribute__(self, x)
                for x in ("_subject", "_buffer")
            ]
            try:
                result = type(subject).__getattribute__(subject, key)
            except AttributeError:
                buffer.append(ReplayableSession.NoAttribute)
                raise
            else:
                if type(result) not in ReplayableSession.Natives:
                    buffer.append(ReplayableSession.Callable)
                    return type(self)(buffer, result)
                else:
                    buffer.append(result)
                    return result

    class Player(object):
        """Stand-in object that replays results recorded by a Recorder."""

        def __init__(self, buffer):
            self._buffer = buffer

        def __call__(self, *args, **kw):
            """Return the next recorded entry; the arguments are ignored.

            A ``Callable`` marker yields this player itself.
            """
            buffer = object.__getattribute__(self, "_buffer")
            result = buffer.popleft()
            if result is ReplayableSession.Callable:
                return self
            else:
                return result

        @property
        def _sqla_unwrap(self):
            # There is no underlying object during playback.
            return None

        def __getattribute__(self, key):
            """Resolve ``key`` on the player, else pop the next entry.

            ``Callable`` yields this player, ``NoAttribute`` raises
            ``AttributeError(key)``, and any other entry is returned as the
            attribute value.
            """
            try:
                return object.__getattribute__(self, key)
            except AttributeError:
                pass

            buffer = object.__getattribute__(self, "_buffer")
            result = buffer.popleft()
            if result is ReplayableSession.Callable:
                return self
            elif result is ReplayableSession.NoAttribute:
                raise AttributeError(key)
            else:
                return result