# Source: sudo/plugins/python/example_policy_plugin.py
# (file-viewer metadata: 173 lines, 6.0 KiB, Python)

import sudo
import errno
import sys
import os
import pwd
import grp
import shutil
# Version of this example plugin; printed by SudoPolicyPlugin.show_version().
VERSION = 1.0
class SudoPolicyPlugin(sudo.Plugin):
    """Example sudo policy plugin

    Demonstrates how to use the sudo policy plugin API. All functions are added
    as an example on their syntax, but note that most of them are optional
    (except check_policy).

    For a detailed description of the functions refer to the sudo_plugin
    manual (man sudo_plugin).

    Most functions can express error or reject through their "int" return value
    as documented in the manual. The sudo module also has constants for these:
        sudo.RC.ACCEPT / sudo.RC.OK  1
        sudo.RC.REJECT               0
        sudo.RC.ERROR               -1
        sudo.RC.USAGE_ERROR         -2

    If the plugin encounters an error, instead of just returning sudo.RC.ERROR
    result code it can also add a message describing the problem.
    This can be done by raising the special exception:
        raise sudo.PluginError("Message")
    This added message will be used by the audit plugins.

    If the function returns "None" (for example does not call return), it will
    be considered sudo.RC.OK. If an exception other than sudo.PluginError is
    raised, its backtrace will be shown to the user and the plugin function
    returns sudo.RC.ERROR. If that is not acceptable, catch it.
    """

    # Commands (matched by basename) that this example policy lets through.
    _allowed_commands = ("id", "whoami")
    # Not referenced by any method of this class.
    _safe_password = "12345"

    # -- Plugin API functions --

    def __init__(self, user_env: tuple, settings: tuple,
                 version: str, **kwargs):
        """The constructor matches the C sudo plugin API open() call

        Other variables you can currently use as arguments are:
            user_info: tuple
            plugin_options: tuple

        For their detailed description, see the open() call of the C plugin API
        in the sudo_plugin manual ("man sudo_plugin").

        Raises sudo.PluginError if ``version`` does not start with "1.".
        """
        if not version.startswith("1."):
            # Note the trailing space: the two literals are concatenated.
            raise sudo.PluginError(
                "This plugin is not compatible with python plugin "
                "API version {}".format(version))

        # Keep the "key=value" option tuples as dictionaries for easy lookup.
        self.user_env = sudo.options_as_dict(user_env)
        self.settings = sudo.options_as_dict(settings)

    def check_policy(self, argv: tuple, env_add: tuple):
        """Decide whether the requested command may be executed

        argv is the command line to run (argv[0] is the command itself) and
        env_add holds extra environment entries which are appended to the
        user's environment.

        Returns sudo.RC.REJECT if the command is not one of the allowed
        commands. Otherwise returns the tuple
            (sudo.RC.ACCEPT, command_info, argv, user_env)
        where command_info carries the resolved command path and the
        uid / gid to run it as.

        Raises sudo.PluginError (from the helper functions) if the command
        can not be found on PATH or the runas user / group is unknown.
        """
        cmd = argv[0]

        # Example for a simple reject. An error (as opposed to a reject) could
        # be reported instead with:
        #     raise sudo.PluginError("You are not allowed to run this command!")
        if not self._is_command_allowed(cmd):
            sudo.log_error("You are not allowed to run this command!")
            return sudo.RC.REJECT

        # The environment the command will be executed with (we allow any here)
        user_env_out = sudo.options_from_dict(self.user_env) + env_add

        command_info_out = sudo.options_from_dict({
            "command": self._find_on_path(cmd),  # Absolute path of command
            "runas_uid": self._runas_uid(),      # The user id
            "runas_gid": self._runas_gid(),      # The group id
        })

        return (sudo.RC.ACCEPT, command_info_out, argv, user_env_out)

    def init_session(self, user_pwd: tuple, user_env: tuple):
        """Perform session setup

        Beware that user_pwd can be None if user is not present in the password
        database. Otherwise it is a tuple convertible to pwd.struct_passwd.

        Returns (sudo.RC.OK, new_user_env) where new_user_env is user_env with
        "PLUGIN_EXAMPLE_ENV=1" appended.
        """
        # conversion example (the converted value is not used any further):
        user_pwd = pwd.struct_passwd(user_pwd) if user_pwd else None

        # This is how you change the user_env:
        return (sudo.RC.OK, user_env + ("PLUGIN_EXAMPLE_ENV=1",))

        # If you do not want to change user_env, you can just return (or None):
        # return sudo.RC.OK

    def list(self, argv: tuple, is_verbose: int, user: str):
        """Tell the user what they are allowed to run

        If argv names a command, report whether that command is allowed.
        If no command is given, or is_verbose is set, also print the complete
        list of allowed commands. When user is set, it is mentioned in the
        messages.
        """
        cmd = argv[0] if argv else None
        # Leading space so the text can be appended directly to a message.
        as_user_text = " as user '{}'".format(user) if user else ""

        if cmd:
            allowed_text = "" if self._is_command_allowed(cmd) else "NOT "
            sudo.log_info("You are {}allowed to execute command '{}'{}"
                          .format(allowed_text, cmd, as_user_text))

        if not cmd or is_verbose:
            sudo.log_info(
                "Only the following commands are allowed: {}{}".format(
                    ", ".join(self._allowed_commands), as_user_text))

    def validate(self):
        """Nothing to do: this plugin keeps no cache."""
        pass  # we have no cache

    def invalidate(self, remove: int):
        """Nothing to do: this plugin keeps no cache."""
        pass  # we have no cache

    def show_version(self, is_verbose: int):
        """Print the plugin version and, if verbose, the Python version."""
        sudo.log_info("Python Example Policy Plugin "
                      "version: {}".format(VERSION))
        if is_verbose:
            sudo.log_info("Python interpreter version:", sys.version)

    def close(self, exit_status: int, error: int) -> None:
        """Report how the command ended

        If error is 0 the exit_status of the command is logged, otherwise
        error is treated as an errno value and logged together with its
        symbolic name ("???" if the value is unknown to the errno module).
        """
        if error == 0:
            sudo.log_info("The command returned with exit_status {}".format(
                exit_status))
        else:
            error_name = errno.errorcode.get(error, "???")
            sudo.log_error(
                "Failed to execute command, execve syscall returned "
                "{} ({})".format(error, error_name))

    # -- Helper functions --

    def _is_command_allowed(self, cmd):
        """Return True if the basename of cmd is in _allowed_commands.

        NOTE: only the last path component is compared, so any path ending
        in an allowed name (for example "/tmp/id") is accepted as well.
        """
        return os.path.basename(cmd) in self._allowed_commands

    def _find_on_path(self, cmd):
        """Resolve cmd to the path of the executable to run

        An absolute cmd is returned unchanged. Otherwise it is looked up with
        shutil.which() using PATH from the user's environment (falling back to
        "/usr/bin:/bin" if PATH is not set).

        Raises sudo.PluginError if the command can not be found.
        """
        if os.path.isabs(cmd):
            return cmd

        path = self.user_env.get("PATH", "/usr/bin:/bin")
        absolute_cmd = shutil.which(cmd, path=path)

        if not absolute_cmd:
            raise sudo.PluginError("Can not find cmd '{}' on PATH".format(cmd))
        return absolute_cmd

    def _runas_pwd(self):
        """Return the password database entry of the user to run as

        The user name is taken from the "runas_user" setting, defaulting to
        "root" if it is missing or empty.

        Raises sudo.PluginError if the user does not exist.
        """
        runas_user = self.settings.get("runas_user") or "root"
        try:
            return pwd.getpwnam(runas_user)
        except KeyError:
            raise sudo.PluginError("Could not find user "
                                   "'{}'".format(runas_user))

    def _runas_uid(self):
        """Return the user id of the user to run as."""
        return self._runas_pwd().pw_uid

    def _runas_gid(self):
        """Return the group id to run the command with

        If the "runas_group" setting is absent, the primary group of the
        runas user is used. Otherwise the named group is looked up.

        Raises sudo.PluginError if the named group (or the runas user) does
        not exist.
        """
        runas_group = self.settings.get("runas_group")
        if runas_group is None:
            return self._runas_pwd().pw_gid

        try:
            return grp.getgrnam(runas_group).gr_gid
        except KeyError:
            raise sudo.PluginError(
                "Could not find group '{}'".format(runas_group))