D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
clcagefslib
/
webisolation
/
crontab
/
Filename :
libhooks.py
back
Copy
# -*- coding: utf-8 -*- # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2025 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # """Hook integration functions for crontab operations.""" import logging import shlex import subprocess from clcommon.cpapi import userdomains from .constants import ISOLATION_WRAPPER from .parser import parse_crontab_structure, write_crontab_structure from .structure import ParsedCrontabLine CRONTAB_BIN = "/usr/bin/crontab" logger = logging.getLogger(__name__) def migrate_crontab_docroot( username: str, old_docroot: str, new_docroot: str, ) -> bool: """ Migrate crontab entries from old document root to new document root. When a document root path changes (e.g., domain rename, docroot move), this function updates all crontab entries that reference the old path to use the new path instead. Args: username: Unix username whose crontab to migrate old_docroot: Previous document root path new_docroot: New document root path Returns: bool: True if migration was successful, False if no changes needed or error Raises: Exception: If crontab operations fail critically """ # Get current crontab for the user list_result = subprocess.run( [CRONTAB_BIN, "-u", username, "-l"], capture_output=True, ) # No existing crontab - nothing to migrate if list_result.returncode != 0: logger.debug( "No crontab to migrate for user %s (returncode: %d)", username, list_result.returncode, ) return False existing_data = list_result.stdout if not existing_data: logger.debug("Empty crontab for user %s, nothing to migrate", username) return False # Parse existing crontab into structure structure = parse_crontab_structure(existing_data) # Check if old docroot section exists if old_docroot not in structure.docroot_sections: logger.debug( "No crontab entries found for old docroot %s for user %s", old_docroot, username, ) return False # Migrate: move entries from old docroot to new docroot old_entries = structure.docroot_sections.pop(old_docroot) # Update wrapper commands in the entries to use new docroot migrated_entries = [] for entry in old_entries: if isinstance(entry, ParsedCrontabLine): # Get the clean command (without wrapper) clean_command = entry.get_clean_command() has_newline = clean_command.endswith(b"\n") command_str = clean_command.rstrip(b"\n").decode("utf-8", errors="replace") # Rebuild with new docroot quoted_command = shlex.quote(command_str) prefix_parts = shlex.join([ISOLATION_WRAPPER, new_docroot, "bash", "-c"]) wrapped_command_str = f"{prefix_parts} {quoted_command}" wrapped_command = wrapped_command_str.encode("utf-8") if has_newline: wrapped_command += b"\n" migrated_entries.append( ParsedCrontabLine(schedule=entry.schedule, command=wrapped_command) ) else: # Keep comments as-is migrated_entries.append(entry) # Add migrated entries to new docroot section (merge with existing if any) if new_docroot in structure.docroot_sections: structure.docroot_sections[new_docroot].extend(migrated_entries) else: structure.docroot_sections[new_docroot] = migrated_entries # Write updated crontab modified_data = write_crontab_structure(structure) save_result = subprocess.run( [CRONTAB_BIN, "-u", username, "-"], input=modified_data, capture_output=True, ) if save_result.returncode != 0: logger.error( "Failed to save migrated crontab for user %s: %s", username, save_result.stderr.decode("utf-8", errors="replace"), ) return False logger.info( "Successfully migrated crontab entries for user %s from %s to %s", username, old_docroot, new_docroot, ) return True def cleanup_stale_crontab_sections( username: str, ) -> bool: """ Remove crontab sections for docroots that no longer exist. After a domain is deleted, this function cleans up crontab entries for docroots that are no longer valid (not in the current domain list). Args: username: Unix username whose crontab to clean up Returns: bool: True if cleanup was performed, False if no changes needed Raises: Exception: If crontab operations fail critically """ # Get current valid docroots from userdomains valid_docroots = {docroot for _, docroot in userdomains(username)} # Get current crontab for the user list_result = subprocess.run( [CRONTAB_BIN, "-u", username, "-l"], capture_output=True, ) # No existing crontab - nothing to clean if list_result.returncode != 0: logger.debug( "No crontab to clean for user %s (returncode: %d)", username, list_result.returncode, ) return False existing_data = list_result.stdout if not existing_data: logger.debug("Empty crontab for user %s, nothing to clean", username) return False structure = parse_crontab_structure(existing_data) stale_docroots = [ docroot for docroot in structure.docroot_sections.keys() if docroot not in valid_docroots ] if not stale_docroots: logger.debug( "No stale crontab sections found for user %s", username, ) return False for stale_docroot in stale_docroots: logger.info( "Removing stale crontab section for user %s, docroot: %s", username, stale_docroot, ) del structure.docroot_sections[stale_docroot] modified_data = write_crontab_structure(structure) save_result = subprocess.run( [CRONTAB_BIN, "-u", username, "-"], input=modified_data, capture_output=True, ) if save_result.returncode != 0: logger.error( "Failed to save cleaned crontab for user %s: %s", username, save_result.stderr.decode("utf-8", errors="replace"), ) return False logger.info( "Successfully cleaned up %d stale crontab section(s) for user %s: %s", len(stale_docroots), username, ", ".join(stale_docroots), ) return True