D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcagefslib
/
Filename :
domain.py
back
Copy
#!/opt/cloudlinux/venv/bin/python3 -sbb
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
"""Per-website (docroot) isolation management for CageFS users.

Provides the server-wide enable/disable switches, the per-user and
per-domain enable/disable entry points, and generation of the per-user
jail mount configuration for isolated websites.
"""
import logging
import os
import subprocess
from collections import defaultdict
from pathlib import Path

from clcommon import ClPwd
from clcommon.clcagefs import setup_mount_dir_cagefs, _remount_cagefs
from clcommon.cpapi import cpusers, userdomains
from clcommon.cpapi import docroot as get_domain_docroot
from clcommon.cpapi.cpapiexceptions import NoDomain, NoPanelUser

from .io import write_via_tmp
from .webisolation import config, jail_utils
from .webisolation.config import DOCROOTS_ISOLATED_BASE
from .webisolation.jail_config import MountConfig
from .webisolation.mount_config import IsolatedRootConfig
from .webisolation.mount_types import MountType
from .webisolation.mount_ordering import build_docroot_tree, process_ordered_mounts
from .webisolation.php import reload_processes_with_docroots
from .webisolation.service import start_monitoring_service, stop_monitoring_service

# Flag file whose presence means website isolation is allowed server-wide.
WEBSITE_ISOLATION_MARKER = "/opt/cloudlinux/flags/enabled-flags.d/website-isolation.flag"
# Flag file whose presence means the website isolation feature is available.
WEBSITE_ISOLATION_AVAILABLE_MARKER = (
    "/opt/cloudlinux/flags/available-flags.d/website-isolation.flag"
)


class WebsiteMountsConfig:
    """Collects a user's docroots and renders the jail mount configuration.

    All known docroots are registered with :meth:`add_docroot`; the subset
    that must be isolated is registered with :meth:`enable_isolation`.
    :meth:`generate_mounts_conf` then renders one configuration section per
    isolated docroot.
    """

    def __init__(self, user):
        self.user = user
        # Every docroot of the user (isolated or not); used to build the tree.
        self._all_docroots: set[str] = set()
        # Docroots for which a configuration section is generated.
        self._isolated_docroots: set[str] = set()

    def add_docroot(self, docroot):
        """Register *docroot* as one of the user's document roots."""
        self._all_docroots.add(docroot)

    def enable_isolation(self, docroot):
        """Mark *docroot* as isolated so a config section is rendered for it."""
        self._isolated_docroots.add(docroot)

    def generate_mounts_conf(self):
        """Render the mount configuration for all isolated docroots.

        Returns:
            The rendered per-docroot sections joined with newlines; an empty
            string when no docroot is marked as isolated.
        """
        pw = ClPwd().get_pw_by_name(self.user)
        homedir = pw.pw_dir
        uid, gid = pw.pw_uid, pw.pw_gid

        # Build docroot tree once for all isolated docroots
        tree = build_docroot_tree(self._all_docroots)

        # Shorter docroots first, as before. The path itself is the
        # tie-breaker: without it, docroots of equal length would come out in
        # set iteration order, which is not stable between runs.
        ordered_docroots = sorted(
            self._isolated_docroots, key=lambda path: (len(path), path)
        )

        # Generate config for each isolated docroot
        generated_configs = []
        for docroot in ordered_docroots:
            split_storage_base = jail_utils.full_website_path(homedir, docroot)
            home_overlay = IsolatedRootConfig(
                root_path=f"{split_storage_base}/home", target=homedir, persistent=True
            )

            # Process ordered mounts for this isolated docroot
            docroot_mounts = process_ordered_mounts(
                active_docroot=docroot, tree=tree, uid=uid, gid=gid
            )

            jail_config = MountConfig(uid=uid, gid=gid)
            # Add storage for the overlay'ed dir
            jail_config.add_overlay(home_overlay)

            # open .clwpos directory to make redis.sock available
            awp_path = f"{homedir}/.clwpos"
            home_overlay.mount(MountType.BIND, awp_path, awp_path, ("mkdir",))

            # Add docroot mounts (from tree processing)
            # Mount them into already created overlay
            for mount in docroot_mounts:
                home_overlay.mount(mount.type, mount.source, mount.target, mount.options)

            # Apply mounts from isolated root and close target directory
            jail_config.close_overlay(home_overlay)

            # Home directory is already overlayed, we can apply per-domain mounts directly
            jail_config.add(MountType.USER_MOUNTS, "/")

            # Override proxyexec token with website specific folder
            jail_config.add(
                MountType.BIND,
                source=f"/var/.cagefs/website/{jail_utils.get_website_id(docroot)}",
                target="/var/.cagefs",
            )

            generated_configs.append(jail_config.render(docroot))

        return "\n".join(generated_configs)


def is_website_isolation_allowed_server_wide():
    """Return True when the server-wide "enabled" flag file exists."""
    return os.path.isfile(WEBSITE_ISOLATION_MARKER)


def is_website_isolation_feature_available():
    """Return True when the "available" flag file exists."""
    return os.path.isfile(WEBSITE_ISOLATION_AVAILABLE_MARKER)


def allow_website_isolation_server_wide():
    """Set up the isolated-docroots mount dir and create the enabled flag.

    The CageFS remount is requested in the foreground
    (``remount_in_background=False``) before the flag file is touched.
    """
    setup_mount_dir_cagefs(
        str(DOCROOTS_ISOLATED_BASE),
        prefix="*",
        remount_cagefs=True,
        remount_in_background=False,
    )
    Path(WEBSITE_ISOLATION_MARKER).touch()


def disallow_website_isolation_server_wide():
    """Disable isolation for every user that has it, then drop the flag.

    Per-user failures are logged (with traceback) and do not stop processing
    of the remaining users. Afterwards the monitoring service is stopped and
    the server-wide flag file is removed.
    """
    users_with_domain_isolation = users_with_enabled_domain_isolation()
    for user in users_with_domain_isolation:
        try:
            with_isolation = config.load_user_config(user).enabled_websites
            # passing config=None deletes the stored user config
            config.save_user_config(user, config=None)
            _regenerate_mounts_configuration(user, None)

            domain_docroot_map = {
                domain: _get_docroot_or_none(domain) for domain in with_isolation
            }
            reload_processes_with_docroots(
                user, filter_by_docroots=list(domain_docroot_map.values())
            )

            for domain in with_isolation:
                document_root = domain_docroot_map[domain]
                if document_root is None:
                    logging.error(
                        "Unable to detect document root for domain %s, "
                        "configuration cleanup failed. Contact CloudLinux support "
                        "if the error repeats.",
                        domain,
                    )
                    continue
                jail_utils.remove_website_token_directory(user, document_root)
        except Exception:
            # Best-effort per user: keep going, but record the traceback so the
            # failure can actually be diagnosed.
            logging.exception(
                "Unable to disallow website isolation for user %s, skipping.", user
            )
            continue

    stop_monitoring_service()
    Path(WEBSITE_ISOLATION_MARKER).unlink(missing_ok=True)


def _get_docroot_or_none(domain: str):
    """Return the first element of the docroot lookup for *domain*, or None.

    None is returned when the lookup raises ``NoDomain`` or yields an empty
    result (``IndexError``).
    """
    try:
        return get_domain_docroot(domain)[0]
    except (NoDomain, IndexError):
        return None


def is_isolation_enabled(user):
    """Return True when isolation is allowed server-wide and *user* has a jail config file."""
    if not is_website_isolation_allowed_server_wide():
        return False
    domains_config_path = jail_utils.get_jail_config_path(user)
    return os.path.exists(domains_config_path)


def users_with_enabled_domain_isolation() -> dict:
    """Return ``{user: enabled_websites}`` for users with at least one isolated website."""
    users = [u for u in cpusers() if is_isolation_enabled(u)]
    user_domain_pairs = {}
    for user in users:
        domains_with_isolation = get_websites_with_enabled_isolation(user)
        if domains_with_isolation:
            user_domain_pairs[user] = domains_with_isolation
    return user_domain_pairs


def get_websites_with_enabled_isolation(user: str):
    """Return the websites listed as isolation-enabled in *user*'s config.

    Returns an empty list when no user config is loaded.
    """
    user_config = config.load_user_config(user)
    # NOTE(review): other code in this module treats the loaded config as
    # possibly missing; confirm whether load_user_config can return None.
    if user_config is None:
        return []
    return user_config.enabled_websites


def get_docroots_of_isolated_websites() -> dict:
    """
    Returns pairs user: set(docroots) for all users with website isolation enabled

    Used by monitoring service to watch docroots changes
    to load actual list of docroot paths instead of stale storage

    Domains whose docroot cannot be resolved are skipped.
    """
    users_with_isolation = users_with_enabled_domain_isolation()
    pairs = defaultdict(set)
    for user, domains in users_with_isolation.items():
        for domain in domains:
            try:
                dr = get_domain_docroot(domain)[0]
            except (NoDomain, IndexError):
                continue
            pairs[user].add(dr)
    return pairs


def enable_website_isolation(user, domain):
    """Enable isolation of *domain* for *user*.

    Creates the website token directory and overlay storage, saves the user
    config, remounts CageFS when this is the user's first isolated website,
    regenerates the mount configuration, reloads the matching processes and
    starts the monitoring service.

    Raises:
        NoDomain: propagated from the docroot lookup for an unknown domain.
        subprocess.CalledProcessError: when ``cagefsctl --rebuild-alt-php-ini``
            fails (first website only).
    """
    user_config = config.load_user_config(user) or config.UserConfig()
    # Must be evaluated before the domain is appended below.
    is_first_website = not user_config.enabled_websites
    if domain not in user_config.enabled_websites:
        user_config.enabled_websites.append(domain)

    # if it crashes just let the command fail with NoDomain
    # exception, it should be a very rare case because
    # we validate input domain name in cagefsctl.py
    document_root = get_domain_docroot(domain)[0]

    # Create website token directory and overlay storage
    jail_utils.create_website_token_directory(user, document_root)
    jail_utils.create_overlay_storage_directory(user, document_root)

    config.save_user_config(user, user_config)

    # Remount cagefs for the user if this is the first
    # website to ensure split mount is created
    if is_first_website:
        homedir = ClPwd().get_pw_by_name(user).pw_dir
        # Check for the socket before remounting: the remount kills redis.
        is_redis_running = os.path.exists(f"{homedir}/.clwpos/redis.sock")
        _remount_cagefs(user)
        # regenerate alt-php ini configuration for selector
        subprocess.run(["cagefsctl", "--rebuild-alt-php-ini", user], check=True)
        # remount kills redis process
        if is_redis_running and os.path.exists("/usr/share/cloudlinux/wpos/redis_reloader.py"):
            r = subprocess.run(
                ["/usr/share/cloudlinux/wpos/redis_reloader.py", user, "None"],
                capture_output=True,
                text=True,
            )
            logging.info("Reloaded redis for user %s, output: %s", user, r.stdout)

    _regenerate_mounts_configuration(user, user_config)
    reload_processes_with_docroots(user, filter_by_docroots=[_get_docroot_or_none(domain)])
    start_monitoring_service()


def regenerate_isolation_configuration(user):
    """Regenerate mounts config and recreate token/storage dirs for *user*.

    Domains without a resolvable docroot are logged and skipped; failures to
    recreate token/storage directories are logged per domain. Processes for
    the resolved docroots are reloaded at the end.
    """
    user_config = config.load_user_config(user)
    _regenerate_mounts_configuration(user, user_config)

    # _regenerate_mounts_configuration accepts a missing config, so tolerate
    # it here as well instead of failing on attribute access.
    enabled_websites = user_config.enabled_websites if user_config is not None else []

    document_roots = []
    for domain in enabled_websites:
        document_root = _get_docroot_or_none(domain)
        if document_root is None:
            logging.warning(
                "Unable to find document root for domain %s, "
                "please contact CloudLinux support if the issue persists.",
                domain,
            )
            continue
        document_roots.append(document_root)
        try:
            # recreate tokens and storage e.g. when username changes
            jail_utils.create_website_token_directory(user, document_root)
            jail_utils.create_overlay_storage_directory(user, document_root)
        except Exception as e:
            logging.error("Unable to recreate token/storage for domain=%s, Error=%s", domain, e)
            continue

    reload_processes_with_docroots(user, filter_by_docroots=document_roots)


def _regenerate_mounts_configuration(user: str, user_config: config.UserConfig | None) -> None:
    """Write (or remove) the jail mounts configuration file for *user*.

    The file is removed when *user_config* is None or lists no enabled
    websites. Otherwise it is regenerated from the user's current
    domain-to-docroot mapping and written via a temporary file.
    """
    jail_isolates_config_path = Path(jail_utils.get_jail_config_path(user))
    if user_config is None or not user_config.enabled_websites:
        jail_isolates_config_path.unlink(missing_ok=True)
        return

    jail_config = WebsiteMountsConfig(user)
    try:
        domain_to_docroot_map = dict(userdomains(user))
    except NoPanelUser:
        logging.warning("Cannot regenerate mount configuration, no panel user=%s", user)
        return

    # add docroot information for isolations
    for docroot in domain_to_docroot_map.values():
        jail_config.add_docroot(docroot)

    # add information about which websites should have isolation enabled
    for domain in user_config.enabled_websites:
        try:
            docroot = domain_to_docroot_map[domain]
        except KeyError:
            logging.warning("Docroot not found for domain %s", domain)
            continue
        jail_config.enable_isolation(docroot)

    result = jail_config.generate_mounts_conf()
    jail_isolates_config_path.parent.mkdir(exist_ok=True, mode=0o755)
    write_via_tmp(str(jail_isolates_config_path.parent), str(jail_isolates_config_path), result)


def disable_website_isolation(user: str, domain: str | None = None):
    """Disable isolation for one *domain* of *user*, or for all when None.

    Saves the updated user config, regenerates the mounts configuration,
    reloads processes of the affected docroots and removes their token
    directories. Stops the monitoring service when no user has isolation
    enabled afterwards.
    """
    user_config = config.load_user_config(user)
    reload_docroots = None

    # NOTE(review): a missing config is treated as "nothing to disable";
    # confirm whether load_user_config can actually return None.
    if user_config is not None:
        if domain is None:
            reload_docroots = [
                _get_docroot_or_none(website) for website in user_config.enabled_websites
            ]
            user_config.enabled_websites = []
        elif domain in user_config.enabled_websites:
            reload_docroots = [_get_docroot_or_none(domain)]
            user_config.enabled_websites.remove(domain)

        config.save_user_config(user, user_config)

    _regenerate_mounts_configuration(user, user_config)

    if reload_docroots:
        reload_processes_with_docroots(user, filter_by_docroots=reload_docroots)
        for document_root in reload_docroots:
            if document_root is None:
                continue
            jail_utils.remove_website_token_directory(user, document_root)

    # get actual docroots for all users with website isolation enabled
    users_with_isolation = users_with_enabled_domain_isolation()
    if not users_with_isolation:
        stop_monitoring_service()