D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
ssa
/
modules
/
Filename :
storage.py
back
Copy
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
Represents storage where ssa data is collected, stored and extracted
"""
import itertools
from dataclasses import dataclass
from typing import List, Iterator, Tuple, Dict

import sqlalchemy
from sqlalchemy import func, cast, case, literal_column, distinct, text

from ssa.db import session_scope, RequestResult


@dataclass
class DomainData:
    """
    Aggregated per-domain statistics yielded by iter_domains_data.
    """
    # value of RequestResult.domain
    domain_name: str
    # 24 items, index is the hour of day, value is the number of requests
    domain_total_reqs: List[int]
    # True when any hourly row of the domain carries a truthy wordpress flag
    is_a_wordpress_domain: bool
    # largest per-hour count of distinct paths seen for the domain
    urls_number: int


def iter_domains_data(engine) -> Iterator[DomainData]:
    """
    Iterates data from database domain-by-domain.

    The query returns one row per (domain, hour); consecutive rows
    of the same domain are folded into a single DomainData item.

    :param engine: database engine passed to session_scope
    :return: iterator of DomainData, ordered by domain name
    """
    with session_scope(engine) as db:
        results_by_hour = db.query(
            RequestResult.domain,
            func.strftime('%H', RequestResult.created_at),
            func.Count(RequestResult.id),
            func.max(RequestResult.wordpress),
            func.count(distinct(RequestResult.path))
        ).group_by(
            RequestResult.domain,
            func.strftime('%H', RequestResult.created_at)
        ).order_by(
            RequestResult.domain,
            func.strftime('%H', RequestResult.created_at)
        )

        # rows are ordered by domain, so groupby sees each domain exactly once
        results_by_hour_grouped = itertools.groupby(
            results_by_hour, key=lambda item: item[0])

        for domain_name, group in results_by_hour_grouped:
            # at some hours there may be no requests
            # so we must normalize data to match 24h data format
            requests_number_by_hour = [0] * 24
            urls_number = 0
            is_wordpress = False

            for _, hour, requests_num, wordpress_flag, urls in group:
                requests_number_by_hour[int(hour)] = requests_num
                # NOTE: this keeps the biggest hourly distinct-path count,
                # it is not the distinct count over all hours together
                urls_number = max(urls_number, urls)
                # the flag is computed per hour, so it must be accumulated
                # over all hours instead of taking the last row only
                is_wordpress = is_wordpress or bool(wordpress_flag)

            yield DomainData(
                domain_name=domain_name,
                domain_total_reqs=requests_number_by_hour,
                is_a_wordpress_domain=is_wordpress,
                urls_number=urls_number
            )


def iter_urls_data(engine, domain_name,
                   all_paths) -> Iterator[Tuple[str, dict]]:
    """
    Iterates urls data from database url-by-url.

    :param engine: database engine passed to session_scope
    :param domain_name: only requests of this domain are selected
    :param all_paths: paths (strings) to select data for
    :return: iterator of (path, data) pairs, where data is a dict with
        keys 'path', 'url_throttled_reqs', 'url_total_reqs' and
        'url_slow_reqs'; each of the three counters is a 24-item list
        indexed by hour. Nothing is yielded when no rows match.
    """
    with session_scope(engine) as db:
        # the IN clause below is compiled to a plain SQL string and wrapped
        # into text(); text() treats ':name' as a bind parameter, that is
        # why colons inside paths are backslash-escaped beforehand
        all_paths_escaped = [path.replace(":", "\\:") for path in all_paths]
        paths_filter = RequestResult.path.in_(
            all_paths_escaped
        ).expression.compile(compile_kwargs={"literal_binds": True}).string

        urls_data = db.query(
            RequestResult.path,
            func.strftime('%H', RequestResult.created_at),
            func.Sum(cast(
                RequestResult.hitting_limits, sqlalchemy.Integer
            )).label('url_throttled_reqs'),
            func.Count(
                RequestResult.id
            ).label('url_total_reqs'),
            func.Sum(cast(
                RequestResult.is_slow_request, sqlalchemy.Integer)
            ).label('url_slow_reqs')
        ).filter(
            RequestResult.domain == domain_name
        ).filter(
            text(paths_filter)
        ).group_by(
            RequestResult.path,
            func.strftime('%H', RequestResult.created_at)
        ).order_by(
            RequestResult.path,
            func.strftime('%H', RequestResult.created_at)
        )

        # rows are ordered by path, so every path forms one consecutive group;
        # an empty result simply produces no groups
        for path, rows in itertools.groupby(urls_data,
                                            key=lambda item: item[0]):
            # hours without requests stay 0 to keep the 24h data format
            url_throttled_reqs, url_total_reqs, url_slow_reqs = \
                [0] * 24, [0] * 24, [0] * 24

            for _, hour, url_throttled_req, url_total_req, url_slow_req \
                    in rows:
                url_throttled_reqs[int(hour)] = url_throttled_req
                url_total_reqs[int(hour)] = url_total_req
                url_slow_reqs[int(hour)] = url_slow_req

            yield path, dict(
                path=path,
                url_throttled_reqs=url_throttled_reqs,
                url_total_reqs=url_total_reqs,
                url_slow_reqs=url_slow_reqs
            )


def get_url_durations(engine, domain_name) -> Iterator[Tuple[str, List[int]]]:
    """
    Get information about durations of requests url-by-url.

    :param engine: database engine passed to session_scope
    :param domain_name: only requests of this domain are selected
    :return: iterator of (path, durations) pairs, where durations is
        the list of RequestResult.duration values of that path
    """
    with session_scope(engine) as db:
        urls_data = db.query(
            RequestResult.path, RequestResult.duration
        ).filter(
            RequestResult.domain == domain_name
        ).order_by(
            RequestResult.path
        )

        # all rows are fetched before grouping; they are ordered by path,
        # so groupby yields each path exactly once
        durations_by_path = itertools.groupby(
            list(urls_data), lambda item: item[0])
        for key, group in durations_by_path:
            yield key, [duration for _, duration in group]