File: //proc/self/root/opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/clcagefs.py
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import logging
import os
import re
import subprocess
import sys
CAGEFS_MP_FILENAME = "/etc/cagefs/cagefs.mp"
CAGEFSCTL_TOOL = "/usr/sbin/cagefsctl"
CAGEFS_SKELETON_PATH = "/usr/share/cagefs-skeleton"
CAGEFS_DIR_PATH = "/var/.cagefs"
logger = logging.getLogger(__name__)
class CagefsMpConflict(Exception):
    def __init__(self, new_item, existing_item):
        self._msg = (
            f"Conflict in adding '{new_item}' to {CAGEFS_MP_FILENAME} "
            f"because of pre-existing alternative specification: '{existing_item}'"
        )
    def __str__(self):
        return self._msg
class CagefsNotSupportedError(Exception):
    """Cagefs Not Supported Exception"""
    def __init__(self, message):
        Exception.__init__(self, message)
class CagefsMpItem:
    PREFIX_LIST = "@!%*"
    _PREFIX_MOUNT_RW = ""
    _PREFIX_MOUNT_RO = "!"
    def __init__(self, arg):
        """Constructor
        :param arg: Is either path to add to cagefs.mp or a raw line is read from cagefs.mp
        :param prefix: The same as adding prefix '!' to arg before passing it to ctor"""
        if arg[:1] == "#":  # is a comment? then init as dummy
            self._path_spec = None
        elif arg.strip() == "":  # init as dummy for empty lines
            self._path_spec = None
        else:
            self._path_spec = arg
    def mode(self, mode):
        """Specify mode as in fluent constructor"""
        if self.prefix() == "@" and mode is not None:
            self._path_spec = f"{self._path_spec},{mode:03o}"
        return self
    def __str__(self):
        return self._path_spec
    @staticmethod
    def _add_slash(path):
        if path == "":
            return "/"
        if path[-1] != "/":
            return path + "/"
        return path
    def pre_exist_in(self, another):
        adopted = CagefsMpItem._adopt(another)
        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False
        this_path = CagefsMpItem._add_slash(self.path())
        test_preexist_in_path = CagefsMpItem._add_slash(adopted.path())
        return this_path.startswith(test_preexist_in_path)
    def is_compatible_by_prefix_with(self, existing):
        adopted = CagefsMpItem._adopt(existing)
        # overkill: just to keep strictly to comparing NULL objects principle
        if self.is_dummy() or adopted.is_dummy():
            return False
        if self.prefix() == adopted.prefix():
            return True
        prefix_compatibility_map = {CagefsMpItem._PREFIX_MOUNT_RW: [CagefsMpItem._PREFIX_MOUNT_RO]}
        null_options = []
        return self.prefix() in prefix_compatibility_map.get(adopted.prefix(), null_options)
    def is_dummy(self):
        return self._path_spec is None
    @staticmethod
    def _adopt(x):
        if isinstance(x, CagefsMpItem):
            return x
        else:
            return CagefsMpItem(x)
    @staticmethod
    def _cut_off_mode(path_spec):
        """Cut off mode from path spec like @/var/run/screen,777
        Only one comma per path spec is allowed ;-)"""
        return path_spec.split(",")[0]
    @staticmethod
    def _cut_off_prefix(path_spec):
        return path_spec.lstrip(CagefsMpItem.PREFIX_LIST)
    def path(self):
        return CagefsMpItem._cut_off_prefix(CagefsMpItem._cut_off_mode(self._path_spec))
    def prefix(self):
        if self._path_spec != self.path():
            return self._path_spec[0]
        else:
            return ""
def is_cagefs_present():
    return os.path.exists(CAGEFSCTL_TOOL)
def _mk_mount_dir_setup_perm(path, mode=0o755, owner_id=None, group_id=None):
    # -1 means 'unchanged'
    if group_id is None:
        group_id = -1
    if owner_id is None:
        owner_id = -1
    if not os.path.isdir(path):
        os.mkdir(path)
    if mode is not None:
        os.chmod(path, mode)
    os.chown(path, owner_id, group_id)
def _remount_cagefs(user=None, remount_in_background=False):
    if not is_cagefs_present():
        return
    if user is None:
        command = [CAGEFSCTL_TOOL, "--wait-lock", "--remount-all"]
    else:
        command = [CAGEFSCTL_TOOL, "--remount", user]
    if remount_in_background:
        subprocess.Popen(  # pylint: disable=consider-using-with
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    else:
        subprocess.run(
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
def setup_mount_dir_cagefs(
    path,
    added_by=None,
    mode=0o755,
    owner_id=None,
    group_id=None,
    prefix="",
    remount_cagefs=False,
    remount_in_background=False,
):
    """
    Add mount point to /etc/cagefs/cagefs.mp
    :param path: Directory path to be added in cagefs.mp and mounted
                 from within setup_mount_dir_cagefs().
                 If this directory does not exist, then it is created.
    :param added_by: package or component, mount dir relates to, or whatever will
                     stay in cagefs.mp with "# added by..." comment
    :param mode: If is not None: Regardless of whether directory exists or not prior this call,
                 it's permissions will be set to mode.
    :param owner_id: Regardless of whether directory exists or not prior this call,
                     it's owner id will be set to.
                     If None, the owner won't be changed.
    :param group_id: Regardless of whether directory exists or not prior this call,
                     it's group id will be set to.
                     If None, the group won't be changed.
    :param prefix: Mount point prefix. Default is mount as RW.
                   Pass '!' to add read-only mount point.
                   Refer CageFS section at http://docs.cloudlinux.com/ for more options.
    :param remount_cagefs: If True, cagefs skeleton will be automatically
                           remounted to apply changes.
    :param remount_in_background: If True, cagefs remount will be done in separate
                    background process, without waiting for completion
    :returns: None
    Propagates native EnvironmentError if no CageFS installed or something else goes wrong.
    Raises CagefsMpConflict if path is already specified in cagefs.mp, but in a way which is opposite
    to mount_as_readonly param.
    """
    _mk_mount_dir_setup_perm(path, mode, owner_id, group_id)
    if not is_cagefs_present():
        return
    # Create cagefs.mp if absent. It will be merged when cagefsctl --init.
    if not os.path.exists(CAGEFS_MP_FILENAME):
        subprocess.run(
            [CAGEFSCTL_TOOL, "--create-mp"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    add_new_line_to_cagefs_mp()
    # ^^
    # Hereafter we will not care if there was
    # 'no newline at the end of file'
    with open(CAGEFS_MP_FILENAME, "r+", encoding="utf-8") as f:
        new_item = CagefsMpItem(prefix + path).mode(mode)
        trim_nl_iter = (file_line.rstrip() for file_line in f)
        pre_exist_option = [x for x in trim_nl_iter if new_item.pre_exist_in(x)]
        if not pre_exist_option:
            f.seek(0, 2)  # 2: seek to the end of file
            if added_by is not None:
                # no newline is allowed
                added_by = added_by.strip()
                print("# next line is added by ", added_by, file=f)
            print(new_item, file=f)
            if remount_cagefs:
                _remount_cagefs(remount_in_background=remount_in_background)
        elif not new_item.is_compatible_by_prefix_with(pre_exist_option[-1]):
            raise CagefsMpConflict(new_item, pre_exist_option[-1])
def _get_cagefs_mp_lines():
    with open(CAGEFS_MP_FILENAME, "r", encoding="utf-8") as f:
        return f.readlines()
def _write_cagefs_mp_lines(lines):
    with open(CAGEFS_MP_FILENAME, "w", encoding="utf-8") as f:
        return f.writelines(lines)
# next line is added by
def add_new_line_to_cagefs_mp():
    """
    Add new line to the end of /etc/cagefs/cagefs.mp file if it is not there
    """
    lines = _get_cagefs_mp_lines()
    # file is not empty and last line in the file does not end with new line symbol ?
    if lines and lines[0] != "" and lines[-1][-1] != "\n":
        lines[-1] += "\n"
        _write_cagefs_mp_lines(lines)
def remove_mount_dir_cagefs(path, remount_cagefs=False, remount_in_background=False):
    """
    Remove mount points matching given path from cagefs.mp file
    :param str path: Path that should be removed from file.
    :param bool remount_cagefs: Remount cagefs skeleton or not
    :param remount_in_background: If True, cagefs remount will be done in separate
           background process, without waiting for completion
    :return: Nothing
    """
    lines = _get_cagefs_mp_lines()
    r = re.compile(rf"^[{CagefsMpItem.PREFIX_LIST}]?{re.escape(path)}(,\d+)?$")
    lines_with_excluded_path = [line for line in lines if not r.match(line)]
    # nothing to do if nothing found by pattern
    if len(lines) == len(lines_with_excluded_path):
        return
    _write_cagefs_mp_lines(lines_with_excluded_path)
    if remount_cagefs:
        _remount_cagefs(remount_in_background=remount_in_background)
def _is_directory_mounted(path: str) -> bool:
    """Check the fact that directory is mounted i.e. listed in mountinfo"""
    with open("/proc/self/mountinfo", encoding="utf-8") as mountinfo_file:
        for line in mountinfo_file:
            mountinfo_entry_parts = line.split(maxsplit=5)
            # See the following link about /proc/<pid>/mountinfo format:
            # https://docs.kernel.org/filesystems/proc.html#proc-pid-mountinfo-information-about-mounts
            mount_point = mountinfo_entry_parts[4]
            if mount_point == path:
                return True
    return False
def in_cagefs() -> bool:
    """Recommended way to check whether process is running inside CageFS
    Please avoid testing for /var/.cagefs existence.
    Replace with this function call when possible.
    """
    in_cagefs_ = _is_directory_mounted(CAGEFS_DIR_PATH)
    if not in_cagefs_ and os.path.isdir(CAGEFS_DIR_PATH):
        logger.warning("%s should not exist in the real file system", CAGEFS_DIR_PATH)
    return in_cagefs_
def _is_cagefs_enabled(user):
    """
    Check that cagefs enabled for user
    """
    try:
        cagefs_lib_dir = "/usr/share/cagefs/"
        if cagefs_lib_dir not in sys.path:
            sys.path.append(cagefs_lib_dir)
        import cagefsctl  # pylint: disable=import-outside-toplevel
    except ImportError:
        return False
    try:
        if not cagefsctl.is_user_enabled(user):
            return False
    except AttributeError as e:
        raise CagefsNotSupportedError("ERROR: CageFS version is unsupported. Please update CageFS.") from e
    return True