D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
hc_python
/
lib
/
python3.8
/
site-packages
/
mysql
/
connector
/
fabric
/
Filename :
connection.py
back
Copy
# MySQL Connector/Python - MySQL driver written in Python. # Copyright (c) 2013, 2015, Oracle and/or its affiliates. All rights reserved. # MySQL Connector/Python is licensed under the terms of the GPLv2 # <http://www.gnu.org/licenses/old-licenses/gpl-2.0.html>, like most # MySQL Connectors. There are special exceptions to the terms and # conditions of the GPLv2 as it is applied to this software, see the # FOSS License Exception # <http://www.mysql.com/about/legal/licensing/foss-exception.html>. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """Implementing communication with MySQL Fabric""" import sys import datetime import time import uuid from base64 import b16decode from bisect import bisect from hashlib import md5 import logging import socket import collections # pylint: disable=F0401,E0611 try: from xmlrpclib import Fault, ServerProxy, Transport import urllib2 from httplib import BadStatusLine except ImportError: # Python v3 from xmlrpc.client import Fault, ServerProxy, Transport import urllib.request as urllib2 from http.client import BadStatusLine if sys.version_info[0] == 2: try: from httplib import HTTPSConnection except ImportError: HAVE_SSL = False else: HAVE_SSL = True else: try: from http.client import HTTPSConnection except ImportError: HAVE_SSL = False else: HAVE_SSL = True # pylint: enable=F0401,E0611 import mysql.connector from ..connection import MySQLConnection from ..conversion import MySQLConverter from ..pooling import MySQLConnectionPool from ..errors import ( Error, InterfaceError, NotSupportedError, MySQLFabricError, InternalError, DatabaseError ) from ..cursor import ( MySQLCursor, MySQLCursorBuffered, MySQLCursorRaw, MySQLCursorBufferedRaw ) from .. import errorcode from . import FabricMySQLServer, FabricShard from .caching import FabricCache from .balancing import WeightedRoundRobin from .. import version from ..catch23 import PY2, isunicode, UNICODE_TYPES RESET_CACHE_ON_ERROR = ( errorcode.CR_SERVER_LOST, errorcode.ER_OPTION_PREVENTS_STATEMENT, ) # Errors to be reported to Fabric REPORT_ERRORS = ( errorcode.CR_SERVER_LOST, errorcode.CR_SERVER_GONE_ERROR, errorcode.CR_CONN_HOST_ERROR, errorcode.CR_CONNECTION_ERROR, errorcode.CR_IPSOCK_ERROR, ) REPORT_ERRORS_EXTRA = [] DEFAULT_FABRIC_PROTOCOL = 'xmlrpc' MYSQL_FABRIC_PORT = { 'xmlrpc': 32274, 'mysql': 32275 } FABRICS = {} # For attempting to connect with Fabric _CNX_ATTEMPT_DELAY = 1 _CNX_ATTEMPT_MAX = 3 _GETCNX_ATTEMPT_DELAY = 1 _GETCNX_ATTEMPT_MAX = 3 MODE_READONLY = 1 MODE_WRITEONLY = 2 MODE_READWRITE = 3 STATUS_FAULTY = 0 STATUS_SPARE = 1 STATUS_SECONDARY = 2 STATUS_PRIMARY = 3 SCOPE_GLOBAL = 'GLOBAL' SCOPE_LOCAL = 'LOCAL' _SERVER_STATUS_FAULTY = 'FAULTY' _CNX_PROPERTIES = { # name: ((valid_types), description, default) 'group': ((str,), "Name of group of servers", None), 'key': (tuple([int, str, datetime.datetime, datetime.date] + list(UNICODE_TYPES)), "Sharding key", None), 'tables': ((tuple, list), "List of tables in query", None), 'mode': ((int,), "Read-Only, Write-Only or Read-Write", MODE_READWRITE), 'shard': ((str,), "Identity of the shard for direct connection", None), 'mapping': ((str,), "", None), 'scope': ((str,), "GLOBAL for accessing Global Group, or LOCAL", SCOPE_LOCAL), 'attempts': ((int,), "Attempts for getting connection", _GETCNX_ATTEMPT_MAX), 'attempt_delay': ((int,), "Seconds to wait between each attempt", _GETCNX_ATTEMPT_DELAY), } _LOGGER = logging.getLogger('myconnpy-fabric') class MySQLRPCProtocol(object): """Class using MySQL protocol to query Fabric. """ def __init__(self, fabric, host, port, connect_attempts, connect_delay): self.converter = MySQLConverter() self.handler = FabricMySQLConnection(fabric, host, port, connect_attempts, connect_delay) self.handler.connect() def _process_params_dict(self, params): """Process query parameters given as dictionary""" try: res = [] for key, value in list(params.items()): conv = value conv = self.converter.to_mysql(conv) conv = self.converter.escape(conv) conv = self.converter.quote(conv) res.append("{0}={1}".format(key, str(conv))) except Exception as err: raise mysql.connector.errors.ProgrammingError( "Failed processing pyformat-parameters; %s" % err) else: return res def _process_params(self, params): """Process query parameters.""" try: res = params res = [self.converter.to_mysql(i) for i in res] res = [self.converter.escape(i) for i in res] res = [self.converter.quote(i) for i in res] res = [str(i) for i in res] except Exception as err: raise mysql.connector.errors.ProgrammingError( "Failed processing format-parameters; %s" % err) else: return tuple(res) def _execute_cmd(self, stmt, params=None): """Executes the given query Returns a list containing response from Fabric """ if not params: params = () cur = self.handler.connection.cursor(dictionary=True) results = [] for res in cur.execute(stmt, params, multi=True): results.append(res.fetchall()) return results def create_params(self, *args, **kwargs): """Process arguments to create query parameters. """ params = [] if args: args = self._process_params(args) params.extend(args) if kwargs: kwargs = self._process_params_dict(kwargs) params.extend(kwargs) params = ', '.join(params) return params def execute(self, group, command, *args, **kwargs): """Executes the given command with MySQL protocol Executes the given command with the given parameters. Returns an iterator to navigate to navigate through the result set returned by Fabric """ params = self.create_params(*args, **kwargs) cmd = "CALL {0}.{1}({2})".format(group, command, params) fab_set = None try: data = self._execute_cmd(cmd) fab_set = FabricMySQLSet(data) except (Fault, socket.error, InterfaceError) as exc: msg = "Executing {group}.{command} failed: {error}".format( group=group, command=command, error=str(exc)) raise InterfaceError(msg) return fab_set class XMLRPCProtocol(object): """Class using XML-RPC protocol to query Fabric. """ def __init__(self, fabric, host, port, connect_attempts, connect_delay): self.handler = FabricXMLRPCConnection(fabric, host, port, connect_attempts, connect_delay) self.handler.connect() def execute(self, group, command, *args, **kwargs): """Executes the given command with XML-RPC protocol Executes the given command with the given parameters Returns an iterator to navigate to navigate through the result set returned by Fabric """ try: grp = getattr(self.handler.proxy, group) cmd = getattr(grp, command) except AttributeError as exc: raise ValueError("{group}.{command} not available ({err})".format( group=group, command=command, err=str(exc))) fab_set = None try: data = cmd(*args, **kwargs) fab_set = FabricSet(data) except (Fault, socket.error, InterfaceError) as exc: msg = "Executing {group}.{command} failed: {error}".format( group=group, command=command, error=str(exc)) raise InterfaceError(msg) return fab_set class FabricMySQLResponse(object): """Class used to parse a response got from Fabric with MySQL protocol. """ def __init__(self, data): info = data[0][0] (fabric_uuid_str, ttl, error) = (info['fabric_uuid'], info['ttl'], info['message']) if error: raise InterfaceError(error) self.fabric_uuid_str = fabric_uuid_str self.ttl = ttl self.coded_rows = data[1] class FabricMySQLSet(FabricMySQLResponse): """Iterator to navigate through the result set returned from Fabric with MySQL Protocol. """ def __init__(self, data): """Initialize the FabricSet object. """ super(FabricMySQLSet, self).__init__(data) self.__names = self.coded_rows[0].keys() self.__rows = self.coded_rows self.__result = collections.namedtuple('ResultSet', self.__names) def rowcount(self): """The number of rows in the result set. """ return len(self.__rows) def rows(self): """Iterate over the rows of the result set. Each row is a named tuple. """ for row in self.__rows: yield self.__result(**row) def row(self, index): """Indexing method for a row. Each row is a named tuple. """ return self.__result(**self.__rows[index]) class FabricResponse(object): """Class used to parse a response got from Fabric. """ SUPPORTED_VERSION = 1 def __init__(self, data): """Initialize the FabricResponse object """ (format_version, fabric_uuid_str, ttl, error, rows) = data if error: raise InterfaceError(error) if format_version != FabricResponse.SUPPORTED_VERSION: raise InterfaceError( "Supported protocol has version {sversion}. Got a response " "from MySQL Fabric with version {gversion}.".format( sversion=FabricResponse.SUPPORTED_VERSION, gversion=format_version) ) self.format_version = format_version self.fabric_uuid_str = fabric_uuid_str self.ttl = ttl self.coded_rows = rows class FabricSet(FabricResponse): """Iterator to navigate through the result set returned from Fabric """ def __init__(self, data): """Initialize the FabricSet object. """ super(FabricSet, self).__init__(data) assert len(self.coded_rows) == 1 self.__names = self.coded_rows[0]['info']['names'] self.__rows = self.coded_rows[0]['rows'] assert all(len(self.__names) == len(row) for row in self.__rows) or \ len(self.__rows) == 0 self.__result = collections.namedtuple('ResultSet', self.__names) def rowcount(self): """The number of rows in the result set. """ return len(self.__rows) def rows(self): """Iterate over the rows of the result set. Each row is a named tuple. """ for row in self.__rows: yield self.__result(*row) def row(self, index): """Indexing method for a row. Each row is a named tuple. """ return self.__result(*self.__rows[index]) def extra_failure_report(error_codes): """Add MySQL error to be reported to Fabric This function adds error_codes to the error list to be reported to Fabric. To reset the custom error reporting list, pass None or empty list. The error_codes argument can be either a MySQL error code defined in the errorcode module, or list of error codes. Raises AttributeError when code is not an int. """ global REPORT_ERRORS_EXTRA # pylint: disable=W0603 if not error_codes: REPORT_ERRORS_EXTRA = [] if not isinstance(error_codes, (list, tuple)): error_codes = [error_codes] for code in error_codes: if not isinstance(code, int) or not (code >= 1000 and code < 3000): raise AttributeError("Unknown or invalid error code.") REPORT_ERRORS_EXTRA.append(code) def _fabric_xmlrpc_uri(host, port): """Create an XMLRPC URI for connecting to Fabric This method will create a URI using the host and TCP/IP port suitable for connecting to a MySQL Fabric instance. Returns a URI. """ return 'http://{host}:{port}'.format(host=host, port=port) def _fabric_server_uuid(host, port): """Create a UUID using host and port""" return uuid.uuid3(uuid.NAMESPACE_URL, _fabric_xmlrpc_uri(host, port)) def _validate_ssl_args(ssl_ca, ssl_key, ssl_cert): """Validate the SSL argument. Raises AttributeError is required argument is not set. Returns dict or None. """ if not HAVE_SSL: raise InterfaceError("Python does not support SSL") if any([ssl_ca, ssl_key, ssl_cert]): if not ssl_ca: raise AttributeError("Missing ssl_ca argument.") if (ssl_key or ssl_cert) and not (ssl_key and ssl_cert): raise AttributeError( "ssl_key and ssl_cert need to be both " "specified, or neither." ) return { 'ca': ssl_ca, 'key': ssl_key, 'cert': ssl_cert, } return None if HAVE_SSL: class FabricHTTPSHandler(urllib2.HTTPSHandler): """Class handling HTTPS connections""" def __init__(self, ssl_config): #pylint: disable=E1002 """Initialize""" if PY2: urllib2.HTTPSHandler.__init__(self) else: super().__init__() # pylint: disable=W0104 self._ssl_config = ssl_config def https_open(self, req): """Open HTTPS connection""" return self.do_open(self.get_https_connection, req) def get_https_connection(self, host, timeout=300): """Returns a HTTPSConnection""" return HTTPSConnection( host, key_file=self._ssl_config['key'], cert_file=self._ssl_config['cert'] ) class FabricTransport(Transport): """Custom XMLRPC Transport for Fabric""" user_agent = 'MySQL Connector Python/{0}'.format(version.VERSION_TEXT) def __init__(self, username, password, #pylint: disable=E1002 verbose=0, use_datetime=False, https_handler=None): """Initialize""" if PY2: Transport.__init__(self, use_datetime=False) else: super().__init__(use_datetime=False) self._username = username self._password = password self._use_datetime = use_datetime self.verbose = verbose self._username = username self._password = password self._handlers = [] if self._username and self._password: self._passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm() self._auth_handler = urllib2.HTTPDigestAuthHandler(self._passmgr) else: self._auth_handler = None self._passmgr = None if https_handler: self._handlers.append(https_handler) self._scheme = 'https' else: self._scheme = 'http' if self._auth_handler: self._handlers.append(self._auth_handler) def request(self, host, handler, request_body, verbose=0): """Send XMLRPC request""" uri = '{scheme}://{host}{handler}'.format(scheme=self._scheme, host=host, handler=handler) if self._passmgr: self._passmgr.add_password(None, uri, self._username, self._password) if self.verbose: _LOGGER.debug("FabricTransport: {0}".format(uri)) opener = urllib2.build_opener(*self._handlers) headers = { 'Content-Type': 'text/xml', 'User-Agent': self.user_agent, } req = urllib2.Request(uri, request_body, headers=headers) try: return self.parse_response(opener.open(req)) except (urllib2.URLError, urllib2.HTTPError) as exc: try: code = -1 if exc.code == 400: reason = 'Permission denied' code = exc.code else: reason = exc.reason msg = "{reason} ({code})".format(reason=reason, code=code) except AttributeError: if 'SSL' in str(exc): msg = "SSL error" else: msg = str(exc) raise InterfaceError("Connection with Fabric failed: " + msg) except BadStatusLine: raise InterfaceError("Connection with Fabric failed: check SSL") class Fabric(object): """Class managing MySQL Fabric instances""" def __init__(self, host, username=None, password=None, port=None, connect_attempts=_CNX_ATTEMPT_MAX, connect_delay=_CNX_ATTEMPT_DELAY, report_errors=False, ssl_ca=None, ssl_key=None, ssl_cert=None, user=None, protocol=DEFAULT_FABRIC_PROTOCOL): """Initialize""" if protocol == 'xmlrpc': self._protocol_class = XMLRPCProtocol elif protocol == 'mysql': self._protocol_class = MySQLRPCProtocol else: raise InterfaceError( "Protocol not supported by MySQL Fabric," " was '{}'".format(protocol)) if not port: port = MYSQL_FABRIC_PORT[protocol] self._fabric_instances = {} self._fabric_uuid = None self._ttl = 1 * 60 # one minute by default self._version_token = None self._connect_attempts = connect_attempts self._connect_delay = connect_delay self._cache = FabricCache() self._group_balancers = {} self._init_host = host self._init_port = port self._ssl = _validate_ssl_args(ssl_ca, ssl_key, ssl_cert) self._report_errors = report_errors self._protocol = protocol if user and username: raise ValueError("can not specify both user and username") self._username = user or username self._password = password @property def username(self): """Return username used to authenticate with Fabric""" return self._username @property def password(self): """Return password used to authenticate with Fabric""" return self._password @property def ssl_config(self): """Return the SSL configuration""" return self._ssl def seed(self, host=None, port=None): """Get MySQL Fabric Instances This method uses host and port to connect to a MySQL Fabric server and get all the instances managing the same metadata. Raises InterfaceError on errors. """ host = host or self._init_host port = port or self._init_port fabinst = self._protocol_class(self, host, port, connect_attempts=self._connect_attempts, connect_delay=self._connect_delay) fabric_uuid, fabric_version, ttl, fabrics = self.get_fabric_servers( fabinst) if not fabrics: # Raise, something went wrong. raise InterfaceError("Failed getting list of Fabric servers") if self._version_token == fabric_version: return _LOGGER.info( "Loading Fabric configuration version {version}".format( version=fabric_version)) self._fabric_uuid = fabric_uuid self._version_token = fabric_version if ttl > 0: self._ttl = ttl # Update the Fabric servers for fabric in fabrics: inst = self._protocol_class(self, fabric['host'], fabric['port'], connect_attempts=self._connect_attempts, connect_delay=self._connect_delay) inst_uuid = inst.handler.uuid if inst_uuid not in self._fabric_instances: self._fabric_instances[inst_uuid] = inst _LOGGER.debug( "Added new Fabric server {host}:{port}".format( host=inst.handler.host, port=inst.handler.port)) def reset_cache(self, group=None): """Reset cached information This method destroys all cached information. """ if group: _LOGGER.debug("Resetting cache for group '{group}'".format( group=group)) self.get_group_servers(group, use_cache=False) else: _LOGGER.debug("Resetting cache") self._cache = FabricCache() def get_instance(self): """Get a MySQL Fabric Instance This method will get the next available MySQL Fabric Instance. Raises InterfaceError when no instance is available or connected. """ nxt = 0 errmsg = "No MySQL Fabric instance available" if not self._fabric_instances: raise InterfaceError(errmsg + " (not seeded?)") if PY2: instance_list = self._fabric_instances.keys() inst = self._fabric_instances[instance_list[nxt]] else: inst = self._fabric_instances[list(self._fabric_instances)[nxt]] if not inst.handler.is_connected: inst.handler.connect() return inst def report_failure(self, server_uuid, errno): """Report failure to Fabric This method sets the status of a MySQL server identified by server_uuid. """ if not self._report_errors: return errno = int(errno) current_host = socket.getfqdn() if errno in REPORT_ERRORS or errno in REPORT_ERRORS_EXTRA: _LOGGER.debug("Reporting error %d of server %s", errno, server_uuid) inst = self.get_instance() try: data = inst.execute('threat', 'report_failure', server_uuid, current_host, errno) FabricResponse(data) except (Fault, socket.error) as exc: _LOGGER.debug("Failed reporting server to Fabric (%s)", str(exc)) # Not requiring further action def get_fabric_servers(self, fabric_cnx=None): """Get all MySQL Fabric instances This method looks up the other MySQL Fabric instances which uses the same metadata. The returned list contains dictionaries with connection information such ass host and port. For example: [ {'host': 'fabric_prod_1.example.com', 'port': 32274 }, {'host': 'fabric_prod_2.example.com', 'port': 32274 }, ] Returns a list of dictionaries """ inst = fabric_cnx or self.get_instance() result = [] err_msg = "Looking up Fabric servers failed using {host}:{port}: {err}" try: fset = inst.execute('dump', 'fabric_nodes', "protocol." + self._protocol) for row in fset.rows(): result.append({'host': row.host, 'port': row.port}) except (Fault, socket.error) as exc: msg = err_msg.format(err=str(exc), host=inst.handler.host, port=inst.handler.port) raise InterfaceError(msg) except (TypeError, AttributeError) as exc: msg = err_msg.format( err="No Fabric server available ({0})".format(exc), host=inst.handler.host, port=inst.handler.port) raise InterfaceError(msg) try: fabric_uuid = uuid.UUID(fset.fabric_uuid_str) except TypeError: fabric_uuid = uuid.uuid4() fabric_version = 0 return fabric_uuid, fabric_version, fset.ttl, result def get_group_servers(self, group, use_cache=True): """Get all MySQL servers in a group This method returns information about all MySQL part of the given high-availability group. When use_cache is set to True, the cached information will be used. Raises InterfaceError on errors. Returns list of FabricMySQLServer objects. """ # Get group information from cache if use_cache: entry = self._cache.group_search(group) if entry: # Cache group information return entry.servers inst = self.get_instance() result = [] try: fset = inst.execute('dump', 'servers', self._version_token, group) except (Fault, socket.error) as exc: msg = ("Looking up MySQL servers failed for group " "{group}: {error}").format(error=str(exc), group=group) raise InterfaceError(msg) weights = [] for row in fset.rows(): # We make sure, when using local groups, we skip the global group if row.group_id == group: mysqlserver = FabricMySQLServer( row.server_uuid, row.group_id, row.host, row.port, row.mode, row.status, row.weight ) result.append(mysqlserver) if mysqlserver.status == STATUS_SECONDARY: weights.append((mysqlserver.uuid, mysqlserver.weight)) self._cache.cache_group(group, result) if weights: self._group_balancers[group] = WeightedRoundRobin(*weights) return result def get_group_server(self, group, mode=None, status=None): """Get a MySQL server from a group The method uses MySQL Fabric to get the correct MySQL server for the specified group. You can specify mode or status, but not both. The mode argument will decide whether the primary or a secondary server is returned. When no secondary server is available, the primary is returned. Status is used to force getting either a primary or a secondary. The returned tuple contains host, port and uuid. Raises InterfaceError on errors; ValueError when both mode and status are given. Returns a FabricMySQLServer object. """ if mode and status: raise ValueError( "Either mode or status must be given, not both") errmsg = "No MySQL server available for group '{group}'" servers = self.get_group_servers(group, use_cache=True) if not servers: raise InterfaceError(errmsg.format(group=group)) # Get the Master and return list (host, port, UUID) primary = None secondary = [] for server in servers: if server.status == STATUS_SECONDARY: secondary.append(server) elif server.status == STATUS_PRIMARY: primary = server if mode in (MODE_WRITEONLY, MODE_READWRITE) or status == STATUS_PRIMARY: if not primary: self.reset_cache(group=group) raise InterfaceError((errmsg + ' {query}={value}').format( query='status' if status else 'mode', group=group, value=status or mode)) return primary # Return primary if no secondary is available if not secondary and primary: return primary elif group in self._group_balancers: next_secondary = self._group_balancers[group].get_next()[0] for mysqlserver in secondary: if next_secondary == mysqlserver.uuid: return mysqlserver self.reset_cache(group=group) raise InterfaceError(errmsg.format(group=group, mode=mode)) def get_sharding_information(self, tables=None, database=None): """Get and cache the sharding information for given tables This method is fetching sharding information from MySQL Fabric and caches the result. The tables argument must be sequence of sequences contain the name of the database and table. If no database is given, the value for the database argument will be used. Examples: tables = [('salary',), ('employees',)] get_sharding_information(tables, database='employees') tables = [('salary', 'employees'), ('employees', employees)] get_sharding_information(tables) Raises InterfaceError on errors; ValueError when something is wrong with the tables argument. """ if not isinstance(tables, (list, tuple)): raise ValueError("tables should be a sequence") patterns = [] for table in tables: if not isinstance(table, (list, tuple)) and not database: raise ValueError("No database specified for table {0}".format( table)) if isinstance(table, (list, tuple)): dbase = table[1] tbl = table[0] else: dbase = database tbl = table patterns.append("{0}.{1}".format(dbase, tbl)) inst = self.get_instance() try: fset = inst.execute( 'dump', 'sharding_information', self._version_token, ','.join(patterns) ) except (Fault, socket.error) as exc: msg = "Looking up sharding information failed : {error}".format( error=str(exc)) raise InterfaceError(msg) for row in fset.rows(): self._cache.sharding_cache_table( FabricShard(row.schema_name, row.table_name, row.column_name, row.lower_bound, row.shard_id, row.type_name, row.group_id, row.global_group) ) def get_shard_server(self, tables, key, scope=SCOPE_LOCAL, mode=None): """Get MySQL server information for a particular shard Raises DatabaseError when the table is unknown or when tables are not on the same shard. ValueError is raised when there is a problem with the methods arguments. InterfaceError is raised for other errors. """ if not isinstance(tables, (list, tuple)): raise ValueError("tables should be a sequence") groups = [] for dbobj in tables: try: database, table = dbobj.split('.') except ValueError: raise ValueError( "tables should be given as <database>.<table>, " "was {0}".format(dbobj)) entry = self._cache.sharding_search(database, table) if not entry: self.get_sharding_information((table,), database) entry = self._cache.sharding_search(database, table) if not entry: raise DatabaseError( errno=errorcode.ER_BAD_TABLE_ERROR, msg="Unknown table '{database}.{table}'".format( database=database, table=table)) if scope == 'GLOBAL': return self.get_group_server(entry.global_group, mode=mode) if entry.shard_type == 'RANGE': try: range_key = int(key) except ValueError: raise ValueError("Key must be an integer for RANGE") partitions = entry.keys index = partitions[bisect(partitions, range_key) - 1] partition = entry.partitioning[index] elif entry.shard_type == 'RANGE_DATETIME': if not isinstance(key, (datetime.date, datetime.datetime)): raise ValueError( "Key must be datetime.date or datetime.datetime for " "RANGE_DATETIME") index = None for partkey in entry.keys_reversed: if key >= partkey: index = partkey break try: partition = entry.partitioning[index] except KeyError: raise ValueError("Key invalid; was '{0}'".format(key)) elif entry.shard_type == 'RANGE_STRING': if not isunicode(key): raise ValueError("Key must be a unicode value") index = None for partkey in entry.keys_reversed: if key >= partkey: index = partkey break try: partition = entry.partitioning[index] except KeyError: raise ValueError("Key invalid; was '{0}'".format(key)) elif entry.shard_type == 'HASH': md5key = md5(str(key)) index = entry.keys_reversed[-1] for partkey in entry.keys_reversed: if md5key.digest() >= b16decode(partkey): index = partkey break partition = entry.partitioning[index] else: raise InterfaceError( "Unsupported sharding type {0}".format(entry.shard_type)) groups.append(partition['group']) if not all(group == groups[0] for group in groups): raise DatabaseError( "Tables are located in different shards.") return self.get_group_server(groups[0], mode=mode) def execute(self, group, command, *args, **kwargs): """Execute a Fabric command from given group This method will execute the given Fabric command from the given group using the given arguments. It returns an instance of FabricSet. Raises ValueError when group.command is not valid and raises InterfaceError when an error occurs while executing. Returns FabricSet. """ inst = self.get_instance() return inst.execute(group, command, *args, **kwargs) class FabricConnection(object): """Base Class for a class holding a connection to a MySQL Fabric server """ def __init__(self, fabric, host, port=MYSQL_FABRIC_PORT[DEFAULT_FABRIC_PROTOCOL], connect_attempts=_CNX_ATTEMPT_MAX, connect_delay=_CNX_ATTEMPT_DELAY): """Initialize""" if not isinstance(fabric, Fabric): raise ValueError("fabric must be instance of class Fabric") self._fabric = fabric self._host = host self._port = port self._connect_attempts = connect_attempts self._connect_delay = connect_delay @property def host(self): """Returns server IP or name of current Fabric connection""" return self._host @property def port(self): """Returns TCP/IP port of current Fabric connection""" return self._port @property def uuid(self): """Returns UUID of the Fabric server we are connected with""" return _fabric_server_uuid(self._host, self._port) def connect(self): """Connect with MySQL Fabric""" pass @property def is_connected(self): """Check whether connection with Fabric is valid Return True if we can still interact with the Fabric server; False if Not. Returns True or False. """ pass def __repr__(self): return "{class_}(host={host}, port={port})".format( class_=self.__class__, host=self._host, port=self._port, ) class FabricXMLRPCConnection(FabricConnection): """Class holding a connection to a MySQL Fabric server through XML-RPC""" def __init__(self, fabric, host, port=MYSQL_FABRIC_PORT['xmlrpc'], connect_attempts=_CNX_ATTEMPT_MAX, connect_delay=_CNX_ATTEMPT_DELAY): """Initialize""" super(FabricXMLRPCConnection, self).__init__( fabric, host, port, connect_attempts, connect_delay ) self._proxy = None @property def proxy(self): """Returns the XMLRPC Proxy of current Fabric connection""" return self._proxy @property def uri(self): """Returns the XMLRPC URI for current Fabric connection""" return _fabric_xmlrpc_uri(self._host, self._port) def _xmlrpc_get_proxy(self): """Return the XMLRPC server proxy instance to MySQL Fabric This method tries to get a valid connection to a MySQL Fabric server. Returns a XMLRPC ServerProxy instance. """ if self.is_connected: return self._proxy attempts = self._connect_attempts delay = self._connect_delay proxy = None counter = 0 while counter != attempts: counter += 1 try: if self._fabric.ssl_config: if not HAVE_SSL: raise InterfaceError("Python does not support SSL") https_handler = FabricHTTPSHandler(self._fabric.ssl_config) else: https_handler = None transport = FabricTransport(self._fabric.username, self._fabric.password, verbose=0, https_handler=https_handler) proxy = ServerProxy(self.uri, transport=transport, verbose=0) proxy._some_nonexisting_method() # pylint: disable=W0212 except Fault: # We are actually connected return proxy except socket.error as exc: if counter == attempts: raise InterfaceError( "Connection to MySQL Fabric failed ({0})".format(exc)) _LOGGER.debug( "Retrying {host}:{port}, attempts {counter}".format( host=self.host, port=self.port, counter=counter)) if delay > 0: time.sleep(delay) def connect(self): """Connect with MySQL Fabric""" self._proxy = self._xmlrpc_get_proxy() @property def is_connected(self): """Check whether connection with Fabric is valid Return True if we can still interact with the Fabric server; False if Not. Returns True or False. """ try: self._proxy._some_nonexisting_method() # pylint: disable=W0212 except Fault: return True except (TypeError, AttributeError): return False else: return False class FabricMySQLConnection(FabricConnection): """ Class holding a connection to a MySQL Fabric server through MySQL protocol """ def __init__(self, fabric, host, port=MYSQL_FABRIC_PORT['mysql'], connect_attempts=_CNX_ATTEMPT_MAX, connect_delay=_CNX_ATTEMPT_DELAY): """Initialize""" super(FabricMySQLConnection, self).__init__( fabric, host, port=port, connect_attempts=connect_attempts, connect_delay=connect_delay ) self._connection = None @property def connection(self): """Returns the MySQL RPC Connection to Fabric""" return self._connection def _get_connection(self): """Return the connection instance to MySQL Fabric through MySQL RPC This method tries to get a valid connection to a MySQL Fabric server. Returns a MySQLConnection instance. """ if self.is_connected: return self._connection attempts = self._connect_attempts delay = self._connect_delay counter = 0 while counter != attempts: counter += 1 try: dbconfig = { 'host': self._host, 'port': self._port, 'user': self._fabric.username, 'password': self._fabric.password } if self._fabric.ssl_config: if not HAVE_SSL: raise InterfaceError("Python does not support SSL") dbconfig['ssl_key'] = self._fabric.ssl_config['key'] dbconfig['ssl_cert'] = self._fabric.ssl_config['cert'] return MySQLConnection(**dbconfig) except AttributeError as exc: if counter == attempts: raise InterfaceError( "Connection to MySQL Fabric failed ({0})".format(exc)) _LOGGER.debug( "Retrying {host}:{port}, attempts {counter}".format( host=self.host, port=self.port, counter=counter)) if delay > 0: time.sleep(delay) def connect(self): """Connect with MySQL Fabric""" self._connection = self._get_connection() @property def is_connected(self): """Check whether connection with Fabric is valid Return True if we can still interact with the Fabric server; False if Not. Returns True or False. """ try: return self._connection.is_connected() except AttributeError: return False class MySQLFabricConnection(object): """Connection to a MySQL server through MySQL Fabric""" def __init__(self, **kwargs): """Initialize""" self._mysql_cnx = None self._fabric = None self._fabric_mysql_server = None self._mysql_config = None self._cnx_properties = {} self.reset_properties() # Validity of fabric-argument is checked in config()-method if 'fabric' not in kwargs: raise ValueError("Configuration parameters for Fabric missing") if kwargs: self.store_config(**kwargs) def __getattr__(self, attr): """Return the return value of the MySQLConnection instance""" if attr.startswith('cmd_'): raise NotSupportedError( "Calling {attr} is not supported for connections managed by " "MySQL Fabric.".format(attr=attr)) return getattr(self._mysql_cnx, attr) @property def fabric_uuid(self): """Returns the Fabric UUID of the MySQL server""" if self._fabric_mysql_server: return self._fabric_mysql_server.uuid return None @property def properties(self): """Returns connection properties""" return self._cnx_properties def reset_cache(self, group=None): """Reset cache for this connection's group""" if not group and self._fabric_mysql_server: group = self._fabric_mysql_server.group self._fabric.reset_cache(group=group) def is_connected(self): """Check whether we are connected with the MySQL server Returns True or False """ return self._mysql_cnx is not None def reset_properties(self): """Resets the connection properties This method can be called to reset the connection properties to their default values. """ self._cnx_properties = {} for key, attr in _CNX_PROPERTIES.items(): self._cnx_properties[key] = attr[2] def set_property(self, **properties): """Set one or more connection properties Arguments to the set_property() method will be used as properties. They are validated against the _CNX_PROPERTIES constant. Raise ValueError in case an invalid property is being set. TypeError is raised when the type of the value is not correct. To unset a property, set it to None. """ try: self.close() except Error: # We tried, but it's OK when we fail. pass props = self._cnx_properties for name, value in properties.items(): if name not in _CNX_PROPERTIES: raise ValueError( "Invalid property connection {0}".format(name)) elif value and not isinstance(value, _CNX_PROPERTIES[name][0]): valid_types_str = ' or '.join( [atype.__name__ for atype in _CNX_PROPERTIES[name][0]]) raise TypeError( "{name} is not valid, excepted {typename}".format( name=name, typename=valid_types_str)) if (name == 'group' and value and (props['key'] or props['tables'])): raise ValueError( "'group' property can not be set when 'key' or " "'tables' are set") elif name in ('key', 'tables') and value and props['group']: raise ValueError( "'key' and 'tables' property can not be " "set together with 'group'") elif name == 'scope' and value not in (SCOPE_LOCAL, SCOPE_GLOBAL): raise ValueError("Invalid value for 'scope'") elif name == 'mode' and value not in ( MODE_READWRITE, MODE_READONLY): raise ValueError("Invalid value for 'mode'") if value is None: # Set the default props[name] = _CNX_PROPERTIES[name][2] else: props[name] = value def _configure_fabric(self, config): """Configure the Fabric connection The config argument can be either a dictionary containing the necessary information to setup the connection. Or config can be an instance of Fabric. """ if isinstance(config, Fabric): self._fabric = config else: required_keys = ['host'] for required_key in required_keys: if required_key not in config: raise ValueError( "Missing configuration parameter '{parameter}' " "for fabric".format(parameter=required_key)) host = config['host'] protocol = config.get('protocol', DEFAULT_FABRIC_PROTOCOL) try: port = config.get('port', MYSQL_FABRIC_PORT[protocol]) except KeyError: raise InterfaceError( "{0} protocol is not available".format(protocol)) server_uuid = _fabric_server_uuid(host, port) try: self._fabric = FABRICS[server_uuid] except KeyError: _LOGGER.debug("New Fabric connection") self._fabric = Fabric(**config) self._fabric.seed() # Cache the new connection FABRICS[server_uuid] = self._fabric def store_config(self, **kwargs): """Store configuration of MySQL connections to use with Fabric The configuration found in the dictionary kwargs is used when instanciating a MySQLConnection object. The host and port entries are used to connect to MySQL Fabric. Raises ValueError when the Fabric configuration parameter is not correct or missing; AttributeError is raised when when a paramater is not valid. """ config = kwargs.copy() # Configure the Fabric connection if 'fabric' in config: self._configure_fabric(config['fabric']) del config['fabric'] if 'unix_socket' in config: _LOGGER.warning("MySQL Fabric does not use UNIX sockets.") config['unix_socket'] = None # Try to use the configuration test_config = config.copy() if 'pool_name' in test_config: del test_config['pool_name'] if 'pool_size' in test_config: del test_config['pool_size'] if 'pool_reset_session' in test_config: del test_config['pool_reset_session'] try: pool = MySQLConnectionPool(pool_name=str(uuid.uuid4())) pool.set_config(**test_config) except AttributeError as err: raise AttributeError( "Connection configuration not valid: {0}".format(err)) self._mysql_config = config def _connect(self): """Get a MySQL server based on properties and connect This method gets a MySQL server from MySQL Fabric using already properties set using the set_property() method. You can specify how many times and the delay between trying using attempts and attempt_delay. Raises ValueError when there are problems with arguments or properties; InterfaceError on connectivity errors. """ if self.is_connected(): return props = self._cnx_properties attempts = props['attempts'] attempt_delay = props['attempt_delay'] dbconfig = self._mysql_config.copy() counter = 0 while counter != attempts: counter += 1 try: group = None if props['tables']: if props['scope'] == 'LOCAL' and not props['key']: raise ValueError( "Scope 'LOCAL' needs key property to be set") mysqlserver = self._fabric.get_shard_server( props['tables'], props['key'], scope=props['scope'], mode=props['mode']) elif props['group']: group = props['group'] mysqlserver = self._fabric.get_group_server( group, mode=props['mode']) else: raise ValueError( "Missing group or key and tables properties") except InterfaceError as exc: _LOGGER.debug( "Trying to get MySQL server (attempt {0}; {1})".format( counter, exc)) if counter == attempts: raise InterfaceError("Error getting connection: {0}".format( exc)) if attempt_delay > 0: _LOGGER.debug("Waiting {0}".format(attempt_delay)) time.sleep(attempt_delay) continue # Make sure we do not change the stored configuration dbconfig['host'] = mysqlserver.host dbconfig['port'] = mysqlserver.port try: self._mysql_cnx = mysql.connector.connect(**dbconfig) except Error as exc: if counter == attempts: self.reset_cache(mysqlserver.group) self._fabric.report_failure(mysqlserver.uuid, exc.errno) raise InterfaceError( "Reported faulty server to Fabric ({0})".format(exc)) if attempt_delay > 0: time.sleep(attempt_delay) continue else: self._fabric_mysql_server = mysqlserver break def disconnect(self): """Close connection to MySQL server""" try: self.rollback() self._mysql_cnx.close() except AttributeError: pass # There was no connection except Error: raise finally: self._mysql_cnx = None self._fabric_mysql_server = None close = disconnect def cursor(self, buffered=None, raw=None, prepared=None, cursor_class=None): """Instantiates and returns a cursor This method is similar to MySQLConnection.cursor() except that it checks whether the connection is available and raises an InterfaceError when not. cursor_class argument is not supported and will raise a NotSupportedError exception. Returns a MySQLCursor or subclass. """ self._connect() if cursor_class: raise NotSupportedError( "Custom cursors not supported with MySQL Fabric") if prepared: raise NotSupportedError( "Prepared Statements are not supported with MySQL Fabric") if self._unread_result is True: raise InternalError("Unread result found.") buffered = buffered or self._buffered raw = raw or self._raw cursor_type = 0 if buffered is True: cursor_type |= 1 if raw is True: cursor_type |= 2 types = ( MySQLCursor, # 0 MySQLCursorBuffered, MySQLCursorRaw, MySQLCursorBufferedRaw, ) return (types[cursor_type])(self) def handle_mysql_error(self, exc): """Handles MySQL errors This method takes a mysql.connector.errors.Error exception and checks the error code. Based on the value, it takes certain actions such as clearing the cache. """ if exc.errno in RESET_CACHE_ON_ERROR: self.reset_cache() self.disconnect() raise MySQLFabricError( "Temporary error ({error}); " "retry transaction".format(error=str(exc))) raise exc def commit(self): """Commit current transaction Raises whatever MySQLConnection.commit() raises, but raises MySQLFabricError when MySQL returns error ER_OPTION_PREVENTS_STATEMENT. """ try: self._mysql_cnx.commit() except Error as exc: self.handle_mysql_error(exc) def rollback(self): """Rollback current transaction Raises whatever MySQLConnection.rollback() raises, but raises MySQLFabricError when MySQL returns error ER_OPTION_PREVENTS_STATEMENT. """ try: self._mysql_cnx.rollback() except Error as exc: self.handle_mysql_error(exc) def cmd_query(self, statement): """Send a statement to the MySQL server Raises whatever MySQLConnection.cmd_query() raises, but raises MySQLFabricError when MySQL returns error ER_OPTION_PREVENTS_STATEMENT. Returns a dictionary. """ self._connect() try: return self._mysql_cnx.cmd_query(statement) except Error as exc: self.handle_mysql_error(exc) def cmd_query_iter(self, statements): """Send one or more statements to the MySQL server Raises whatever MySQLConnection.cmd_query_iter() raises, but raises MySQLFabricError when MySQL returns error ER_OPTION_PREVENTS_STATEMENT. Returns a dictionary. """ self._connect() try: return self._mysql_cnx.cmd_query_iter(statements) except Error as exc: self.handle_mysql_error(exc)