apply trust to declarative plugin config (#85126)

* trust strings in loaded doc fragments
* added tests
* added hard_fail_context test mechanism

Co-authored-by: Matt Clay <matt@mystile.com>
This commit is contained in:
Matt Davis 2025-05-09 12:33:33 -07:00 committed by GitHub
parent 8a4fb78988
commit 9efba4f972
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 52 additions and 2 deletions

View File

@ -0,0 +1,2 @@
bugfixes:
- plugin loader - Apply template trust to strings loaded from plugin configuration definitions and doc fragments.

View File

@ -0,0 +1,26 @@
"""
Testing utilities for use in integration tests, not unit tests or non-test code.
Provides better error behavior than Python's `assert` statement.
"""
from __future__ import annotations
import contextlib
import typing as t
class _Checker:
@staticmethod
def check(value: object, msg: str | None = 'Value is not truthy.') -> None:
"""Raise an `AssertionError` if the given `value` is not truthy."""
if not value:
raise AssertionError(msg)
@contextlib.contextmanager
def hard_fail_context(msg: str) -> t.Generator[_Checker]:
"""Enter a context which converts all exceptions to `BaseException` and provides a `Checker` instance for making assertions."""
try:
yield _Checker()
except BaseException as ex:
raise BaseException(f"Hard failure: {msg}") from ex

View File

@ -313,6 +313,7 @@ class InventoryManager(object):
ex.obj = origin
failures.append({'src': source, 'plugin': plugin_name, 'exc': ex})
except Exception as ex:
# DTFIX-RELEASE: fix this error handling to correctly deal with messaging
try:
# omit line number to prevent contextual display of script or possibly sensitive info
raise AnsibleError(str(ex), obj=origin) from ex

View File

@ -505,7 +505,8 @@ class PluginLoader:
# if type name != 'module_doc_fragment':
if type_name in C.CONFIGURABLE_PLUGINS and not C.config.has_configuration_definition(type_name, name):
documentation_source = getattr(module, 'DOCUMENTATION', '')
# trust-tagged source propagates to loaded values; expressions and templates in config require trust
documentation_source = _tags.TrustedAsTemplate().tag(getattr(module, 'DOCUMENTATION', ''))
try:
dstring = yaml.load(_tags.Origin(path=path).tag(documentation_source), Loader=AnsibleLoader)
except ParserError as e:

View File

@ -154,7 +154,8 @@ def add_fragments(doc, filename, fragment_loader, is_module=False):
unknown_fragments.append(fragment_slug)
continue
fragment_yaml = getattr(fragment_class, fragment_var, None)
# trust-tagged source propagates to loaded values; expressions and templates in config require trust
fragment_yaml = _tags.TrustedAsTemplate().tag(getattr(fragment_class, fragment_var, None))
if fragment_yaml is None:
if fragment_var != 'DOCUMENTATION':
# if it's asking for something specific that's missing, that's an error

View File

@ -0,0 +1,10 @@
from __future__ import annotations
class ModuleDocFragment:
DOCUMENTATION = """
options:
fragment_expression:
description: a fragment hosted expression that must be trusted whose default resolves to 4
default: 2 + 2
"""

View File

@ -8,12 +8,17 @@ DOCUMENTATION = """
options:
plugin:
description: the load name of the plugin
plugin_expression:
description: an expression that must be trusted whose default resolves to 2
default: 1 + 1
extends_documentation_fragment:
- constructed
- fragment_with_expression
"""
from ansible.errors import AnsibleParserError
from ansible.plugins.inventory import BaseInventoryPlugin, Constructable
from ansible._internal import _testing
class InventoryModule(BaseInventoryPlugin, Constructable):
@ -27,6 +32,10 @@ class InventoryModule(BaseInventoryPlugin, Constructable):
super(InventoryModule, self).parse(inventory, loader, path, cache)
config = self._read_config_data(path)
with _testing.hard_fail_context("ensure config defaults are trusted and runnable as expressions") as ctx:
ctx.check(self._compose(self.get_option('plugin_expression'), variables={}) == 2)
ctx.check(self._compose(self.get_option('fragment_expression'), variables={}) == 4)
strict = self.get_option('strict')
try:
for host in inventory.hosts: