ZerethShell
File Manager
SQL Manager
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
wmt
/
common
utils.py
#!/opt/cloudlinux/venv/bin/python3 -bb # coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # import contextlib import logging import fcntl import os import time from urllib.parse import urlparse import requests import subprocess from clcommon import cpapi from clcommon.utils import get_rhn_systemid_value from wmt.common.url_parser import parse from wmt.common.const import ( CLICKHOUSE_WMT_ENDPOINT, JWT_TOKEN, UNKNOWN_RHN_ID, SERVICE_BIN, CHKCONFIG_BIN, SYSTEMCTL_BIN ) # logic copied from get_domains script, that is used by go def get_domains(): domains = set() if cpapi.CP_NAME == cpapi.PLESK_NAME: users = [_cpinfo[0] for _cpinfo in cpapi.cpinfo(keyls=('cplogin',))] else: users = cpapi.cpusers() if not users: return domains try: suspended_users = cpapi.suspended_users_list() except Exception: logging.exception('Cannot obtain list of suspended users') suspended_users = [] # dirty hack to reload domains in runtime # TODO: replace with more suitable solution if cpapi.CP_NAME == 'cPanel': cpapi.plugins.cpanel._user_to_domains_map_cpanel = dict() for user in users: if user in suspended_users: logging.warning('User: %s is will not be pinged, because account is suspended', user) continue for domain, _ in cpapi.userdomains(user): domains.add(parse(domain)) return domains def setup_logger(logger_name): app_logger = logging.getLogger(logger_name) app_logger.setLevel(logging.DEBUG) try: old_umask = os.umask(0o137) try: fh = logging.FileHandler('/var/log/cl_wmt.log') finally: os.umask(old_umask) except IOError: pass else: # Use FileHandler.baseFilename so test fixtures that redirect the # handler at a tempfile chmod the same path the FileHandler opened, # and so future relocation of the log path doesn't drift the chmod. try: os.chmod(fh.baseFilename, 0o600) except OSError: pass fh.formatter = logging.Formatter('[%(levelname)s | %(asctime)s]: %(message)s') app_logger.addHandler(fh) return app_logger @contextlib.contextmanager def save_pid_and_lock(file: str, pid: str): # 'a+' creates the file if missing but does NOT truncate. Truncating # before flock would destroy a running daemon's PID if a second startup # raced in and lost the lock acquisition below. f = open(file, 'a+') try: fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) except OSError: f.close() raise OSError('Process %s already running!' % pid) try: f.seek(0) f.truncate() f.write(pid) f.flush() os.fsync(f) yield finally: fcntl.flock(f.fileno(), fcntl.LOCK_UN) f.close() os.remove(file) def intersect(d1, d2): result = {} for key in d1: if key in d2: result[key] = d1[key], d2[key] return result def send_report(url, report, headers=None): error = requests.RequestException('Error while sending report to ClickHouse') for i in range(5): try: response = requests.post(url, json=report, headers=headers, timeout=60) response.raise_for_status() except requests.RequestException as err: error = err time.sleep(min(4 ** i, 30)) else: break else: raise error def _strip_url_to_origin(url): parsed = urlparse(url) if parsed.scheme and parsed.netloc: return '{}://{}'.format(parsed.scheme, parsed.netloc) return url def _read_jwt_token(): # UnicodeDecodeError (subclass of ValueError, not OSError) covers a # corrupt or binary-overwritten token file; without it the daemon # would propagate an unhandled exception through send_report_to_clickhouse, # violating the "graceful fallback to unauthenticated" contract. try: with open(JWT_TOKEN, 'r') as f: return f.read().strip() except (IOError, OSError, UnicodeDecodeError): logging.warning('Cannot read JWT token from %s', JWT_TOKEN) return None def send_report_to_clickhouse(report): systemd_id = get_rhn_systemid_value('system_id') or UNKNOWN_RHN_ID # ID-XXXXXXXX -> XXXXXXXX report['server_id'] = systemd_id.replace('ID-', '') summary = report.pop('summary_report') # !17 — minimize disclosure: send scheme + hostname only for entry in report.get('error_report', []): entry['url'] = _strip_url_to_origin(entry['url']) for entry in report.get('duration_report', []): entry['url'] = _strip_url_to_origin(entry['url']) # !15 — attach JWT bearer if available; falls through with empty # headers when the token file is missing (defense-in-depth, not # enforced server-side). headers = {} token = _read_jwt_token() if token: headers['Authorization'] = 'Bearer ' + token send_report(CLICKHOUSE_WMT_ENDPOINT, {**report, **summary}, headers=headers) def enable_wmt_daemon(daemon_name, is_cl6): """ Enable cl_wmt_scanner service """ if is_cl6: # CL6 # Start wmt service subprocess.run([SERVICE_BIN, daemon_name, 'start'], capture_output=True) subprocess.run([CHKCONFIG_BIN, '--add', daemon_name], capture_output=True) else: # CL7, CL8 subprocess.run([SYSTEMCTL_BIN, 'daemon-reload'], capture_output=True) # Start wmt daemon subprocess.run([SYSTEMCTL_BIN, 'enable', daemon_name], capture_output=True) subprocess.run([SYSTEMCTL_BIN, 'start', daemon_name], capture_output=True) def disable_wmt_daemon(daemon_name, is_cl6): """ Disable WMT daemon :return: """ if is_cl6: # CL6 subprocess.run([SERVICE_BIN, daemon_name, 'stop'], capture_output=True) subprocess.run([CHKCONFIG_BIN, '--del', daemon_name], capture_output=True) else: # CL7, CL8 subprocess.run([SYSTEMCTL_BIN, 'kill', daemon_name], capture_output=True) subprocess.run([SYSTEMCTL_BIN, 'disable', daemon_name], capture_output=True) def manage_crons(status=True): cron_tool = '/usr/share/web-monitoring-tool/cron_control.py' if status: command = [cron_tool, '-i'] else: command = [cron_tool, '-d'] subprocess.run(command, capture_output=True, text=True)
Kaydet
Vazgeç