mirror of https://github.com/sudo-project/sudo.git
173 lines
6.0 KiB
Python
173 lines
6.0 KiB
Python
import sudo
|
|
|
|
import errno
|
|
import sys
|
|
import os
|
|
import pwd
|
|
import grp
|
|
import shutil
|
|
|
|
|
|
# Version of this example plugin; show_version() includes it in its output.
VERSION = 1.0
|
|
|
|
|
|
class SudoPolicyPlugin(sudo.Plugin):
    """Example sudo policy plugin

    Demonstrates how to use the sudo policy plugin API. All functions are added
    as an example of their syntax, but note that most of them are optional
    (except check_policy).

    For a detailed description of the functions refer to the sudo_plugin
    manual (man sudo_plugin).

    Most functions can express error or reject through their "int" return value
    as documented in the manual. The sudo module also has constants for these:
        sudo.RC.ACCEPT / sudo.RC.OK  1
        sudo.RC.REJECT               0
        sudo.RC.ERROR               -1
        sudo.RC.USAGE_ERROR         -2

    If the plugin encounters an error, instead of just returning the
    sudo.RC.ERROR result code it can also add a message describing the problem.
    This can be done by raising the special exception:
        raise sudo.PluginError("Message")
    This added message will be used by the audit plugins.

    If the function returns "None" (for example does not call return), it will
    be considered sudo.RC.OK. If an exception other than sudo.PluginError is
    raised, its backtrace will be shown to the user and the plugin function
    returns sudo.RC.ERROR. If that is not acceptable, catch it.
    """

    # Base names of the only commands this example policy accepts.
    _allowed_commands = ("id", "whoami")
    # NOTE(review): not referenced by any method of this class.
    _safe_password = "12345"

    # -- Plugin API functions --

    def __init__(self, user_env: tuple, settings: tuple,
                 version: str, **kwargs):
        """The constructor matches the C sudo plugin API open() call

        Other variables you can currently use as arguments are:
            user_info: tuple
            plugin_options: tuple

        For their detailed description, see the open() call of the C plugin API
        in the sudo manual ("man sudo").

        Raises sudo.PluginError if the API version does not start with "1.".
        """
        if not version.startswith("1."):
            # The trailing space matters: adjacent literals are concatenated
            # without any separator.
            raise sudo.PluginError(
                "This plugin is not compatible with python plugin "
                "API version {}".format(version))

        # Keep both option tuples as dicts for easy lookup in the helpers.
        self.user_env = sudo.options_as_dict(user_env)
        self.settings = sudo.options_as_dict(settings)

    def check_policy(self, argv: tuple, env_add: tuple):
        """Decide whether the command in argv may be executed

        Returns sudo.RC.REJECT if the base name of argv[0] is not one of
        _allowed_commands. Otherwise returns the tuple
            (sudo.RC.ACCEPT, command_info_out, argv, user_env_out)
        where command_info_out carries the absolute command path and the
        runas uid/gid, and user_env_out is the invoking environment plus
        env_add.

        Raises sudo.PluginError if the command can not be found on PATH or
        the runas user / group does not exist.
        """
        # NOTE(review): assumes argv is non-empty -- confirm with the caller.
        cmd = argv[0]

        # Example for a simple reject:
        if not self._is_command_allowed(cmd):
            sudo.log_error("You are not allowed to run this command!")
            # To report an error with a message instead of a plain reject,
            # replace the return below with:
            #   raise sudo.PluginError(
            #       "You are not allowed to run this command!")
            return sudo.RC.REJECT

        # The environment the command will be executed with (we allow any here)
        user_env_out = sudo.options_from_dict(self.user_env) + env_add

        command_info_out = sudo.options_from_dict({
            "command": self._find_on_path(cmd),  # Absolute path of command
            "runas_uid": self._runas_uid(),      # The user id
            "runas_gid": self._runas_gid(),      # The group id
        })

        return (sudo.RC.ACCEPT, command_info_out, argv, user_env_out)

    def init_session(self, user_pwd: tuple, user_env: tuple):
        """Perform session setup

        Beware that user_pwd can be None if user is not present in the password
        database. Otherwise it is a tuple convertible to pwd.struct_passwd.

        Returns (sudo.RC.OK, new_user_env) where new_user_env is user_env with
        "PLUGIN_EXAMPLE_ENV=1" appended.
        """
        # conversion example (the converted value is not used further here):
        user_pwd = pwd.struct_passwd(user_pwd) if user_pwd else None

        # This is how you change the user_env:
        return (sudo.RC.OK, user_env + ("PLUGIN_EXAMPLE_ENV=1",))

        # If you do not want to change user_env, you can just return (or None):
        # return sudo.RC.OK

    def list(self, argv: tuple, is_verbose: int, user: str):
        """Log what the policy allows

        If argv names a command, logs whether that command is allowed. If no
        command is given, or is_verbose is set, logs all allowed commands.
        When user is non-empty, the messages mention it as the target user.
        """
        cmd = argv[0] if argv else None
        as_user_text = "as user '{}'".format(user) if user else ""

        if cmd:
            allowed_text = "" if self._is_command_allowed(cmd) else "NOT "
            # Separate the "as user" part from the quoted command with a
            # space; without it the text runs together ("'id'as user ...").
            as_user_suffix = " " + as_user_text if as_user_text else ""
            sudo.log_info("You are {}allowed to execute command '{}'{}"
                          .format(allowed_text, cmd, as_user_suffix))

        if not cmd or is_verbose:
            sudo.log_info("Only the following commands are allowed:",
                          ", ".join(self._allowed_commands), as_user_text)

    def validate(self):
        """Nothing to validate: this plugin keeps no cache"""
        pass  # we have no cache

    def invalidate(self, remove: int):
        """Nothing to invalidate: this plugin keeps no cache"""
        pass  # we have no cache

    def show_version(self, is_verbose: int):
        """Log the plugin version, plus the interpreter version if verbose"""
        sudo.log_info("Python Example Policy Plugin "
                      "version: {}".format(VERSION))
        if is_verbose:
            sudo.log_info("Python interpreter version:", sys.version)

    def close(self, exit_status: int, error: int) -> None:
        """Log how the command ended

        With error == 0 the exit_status is logged; otherwise error is treated
        as an errno value and logged together with its symbolic name.
        """
        if error == 0:
            sudo.log_info("The command returned with exit_status {}".format(
                exit_status))
        else:
            # Unknown errno values are shown as "???".
            error_name = errno.errorcode.get(error, "???")
            sudo.log_error(
                "Failed to execute command, execve syscall returned "
                "{} ({})".format(error, error_name))

    # -- Helper functions --

    def _is_command_allowed(self, cmd):
        """Return True if the base name of cmd is in _allowed_commands"""
        return os.path.basename(cmd) in self._allowed_commands

    def _find_on_path(self, cmd):
        """Return the absolute path of cmd

        An absolute cmd is returned unchanged. Otherwise it is looked up on
        the PATH of the invoking user's environment ("/usr/bin:/bin" if PATH
        is not set). Raises sudo.PluginError if it can not be found.
        """
        if os.path.isabs(cmd):
            return cmd

        path = self.user_env.get("PATH", "/usr/bin:/bin")
        absolute_cmd = shutil.which(cmd, path=path)
        if not absolute_cmd:
            raise sudo.PluginError("Can not find cmd '{}' on PATH".format(cmd))
        return absolute_cmd

    def _runas_pwd(self):
        """Return the password database entry of the runas user

        The user is taken from the "runas_user" setting, falling back to
        "root". Raises sudo.PluginError if there is no such user.
        """
        runas_user = self.settings.get("runas_user") or "root"
        try:
            return pwd.getpwnam(runas_user)
        except KeyError:
            raise sudo.PluginError("Could not find user "
                                   "'{}'".format(runas_user))

    def _runas_uid(self):
        """Return the user id of the runas user"""
        return self._runas_pwd().pw_uid

    def _runas_gid(self):
        """Return the group id the command should run with

        Without a "runas_group" setting this is the primary group of the
        runas user. Raises sudo.PluginError if the named group does not exist.
        """
        runas_group = self.settings.get("runas_group")
        if runas_group is None:
            return self._runas_pwd().pw_gid

        try:
            return grp.getgrnam(runas_group).gr_gid
        except KeyError:
            raise sudo.PluginError(
                "Could not find group '{}'".format(runas_group))
|