File: //opt/imunify360/venv/share/imunify360/scripts/setup_cagefs.py
#!/opt/imunify360/venv/bin/python3
# coding: utf-8
"""
WARNING
cagefs --remount-all can cause high load on customers server and even
temporary outage (See DEF-9491)
Please only add anything to cagefs.mp only if absolutely necessary
"""
import logging
import os
import subprocess
import sys
from defence360agent import sentry
from defence360agent.subsys import clcagefs
CAGEFSMP = "/etc/cagefs/cagefs.mp"
CAGEFSCTL_TOOL = "/usr/sbin/cagefsctl"
ACTUAL_DIRS = {
b"/var/imunify360/files/sigs": {"added_by": "imunify360", "prefix": b"!"},
b"/etc/imunify360/user_config": {
"added_by": "imunify360",
"mode": 0o755,
# every user has his own isolated dir in this dir
"prefix": b"%",
},
b"/usr/share/imunify360/wp-plugins": {
"added_by": "imunify360",
"mode": 0o1755,
},
}
OBSOLETES_DIRS = [
b"/var/imunify360/malware/signatures",
b"/var/imunify360/scan_report",
b"/var/imunify360/web_quar",
b"/var/lib/clamav",
b"/var/run/defence360agent",
b"/var/run/imunify360_user",
]
def setup_logging(level=logging.INFO) -> logging.Logger:
"""Set up logging with Sentry integration"""
logger = logging.getLogger("setup-cagefs")
logger.setLevel(level)
handler = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter("%(name)s: %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
sentry.configure_sentry()
return logger
def _cagefs_remountall(action_info):
def _read_mp_file():
try:
with open(CAGEFSMP) as f:
return f.read()
except FileNotFoundError:
print("%s file was not found", CAGEFSMP)
return ""
def decorator(fun):
def wrapper(*args, **kwargs):
before = _read_mp_file()
try:
return fun(*args, **kwargs)
finally:
after = _read_mp_file()
if before != after:
print(
"CageFS for Imunify has been %s, remounting..."
% action_info
)
try:
subprocess.call([CAGEFSCTL_TOOL, "--remount-all"])
except Exception as e:
print(
"Something went wrong while executing"
" --remount-all command: %s",
e,
)
else:
print("CageFS for Imunify: no update is required.")
return wrapper
return decorator
def _add_imunify360_dirs(logger):
for path, options in ACTUAL_DIRS.items():
path_str = path.decode("utf-8")
# Check if directory exists before adding to cagefs.mp
if not os.path.isdir(path_str):
logger.error(
"Cannot add %s to cagefs.mp: directory does not exist",
path_str
)
continue
try:
clcagefs.setup_mount_dir_cagefs(
path, remount_cagefs=False, **options
)
except (clcagefs.CagefsMpConflict, EnvironmentError) as e:
logger.error(
"Failed to setup CageFS with Imunify for path %s: %s",
path, e
)
@_cagefs_remountall(action_info="set up")
def _setup_cagefs(logger):
_add_imunify360_dirs(logger)
_remove_obsoleted(logger)
@_cagefs_remountall(action_info="reset")
def _revert_cagefs(logger):
# here we assume that OBSOLETES_DIRS have been already removed
# during the installation step
for path in ACTUAL_DIRS.keys():
try:
clcagefs.remove_mount_dir_cagefs(path, remount_cagefs=False)
except Exception as e:
logger.error(
"Error during removing %s from cagefs.mp: %s", path, e
)
def _remove_obsoleted(logger):
for path in OBSOLETES_DIRS:
try:
clcagefs.remove_mount_dir_cagefs(path, remount_cagefs=False)
except Exception as e:
logger.error(
"Error during removing %s from cagefs.mp: %s", path, e
)
def _main():
logger = setup_logging()
if sys.argv[-1] == "--revert":
_revert_cagefs(logger)
else:
_setup_cagefs(logger)
def _is_removemountdircagefs_supported():
return hasattr(clcagefs, "remove_mount_dir_cagefs")
if __name__ == "__main__":
if clcagefs.is_cagefs_present() and _is_removemountdircagefs_supported():
_main()