Source code for craft_parts.utils.maven.common

# Copyright 2025 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Utilities for Maven projects and settings."""

from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, cast
from urllib.parse import urlparse

from lxml import etree
from semver import Version
from typing_extensions import Self, override

from ._xml import (
    CRAFT_REPO_TEMPLATE,
    DISTRIBUTION_REPO_TEMPLATE,
    LOCAL_REPO_TEMPLATE,
    MIRROR_REPO,
    PLUGIN_TEMPLATE,
    PROXIES_TEMPLATE,
    PROXY_CREDENTIALS_TEMPLATE,
    PROXY_TEMPLATE,
    SETTINGS_TEMPLATE,
)

if TYPE_CHECKING:
    from craft_parts.infos import PartInfo

logger = logging.getLogger(__name__)

# Artifacts of a single group, keyed by artifact ID. Each value is the set of
# known artifacts sharing that ID.
ArtifactDict = dict[str, set["MavenArtifact"]]
# Every known artifact, keyed first by group ID and then by artifact ID.
GroupDict = dict[str, ArtifactDict]
# An lxml namespace map of prefix -> URI. The default namespace uses the key `None`.
Namespaces = dict[str | None, str]

# Created so these lint silencers don't need to be repeated every single time.
# lxml's `Element` is actually a free-form function that returns the private
# `_Element` class, so for typing's sake, a "private" type needs to be referenced.
Element = etree._Element  # type: ignore[reportPrivateUsage] # noqa: SLF001

# Shared parser used for every POM and XML snippet handled by this module.
_XML_PARSER = etree.XMLParser(
    # Attempt to recover if an error is encountered
    recover=True,
    # Entities can resolve into zip-bomb-like packages, so ignore them
    resolve_entities=False,
    # Removing blank text allows the `pretty_print` kwarg to work later
    remove_blank_text=True,
    # Never reach out to the network while looking up external documents
    no_network=True,
    # Keep comments in the parsed tree rather than discarding them
    remove_comments=False,
)


def create_maven_settings(*, part_info: PartInfo, set_mirror: bool) -> Path:
    """Write a Maven ``settings.xml`` for the given part and return its path.

    The generated file carries the local repository location and, when
    applicable, the proxy configuration, the backstage repository and the
    mirror configuration.

    :param part_info: The part info for the part invoking Maven.
    :param set_mirror: Whether to include the mirror repository element.

    :return: The path of the settings file that was written.
    """
    m2_dir = part_info.part_build_subdir / ".parts/.m2"
    settings_path = m2_dir / "settings.xml"
    m2_dir.mkdir(parents=True, exist_ok=True)

    # Proxy settings are only emitted when the environment asks for a proxy.
    proxies_element = ""
    if _needs_proxy_config():
        proxies_element = _get_proxy_config()

    # The shared repository in the backstage holds artifacts generated by other
    # parts; reference it only if it actually exists.
    backstage_repo = cast("Path", part_info.backstage_dir) / "maven-use"
    craft_element = (
        CRAFT_REPO_TEMPLATE.format(repo_uri=backstage_repo.as_uri())
        if backstage_repo.is_dir()
        else ""
    )

    # The "cache" directory that is local to this Maven run.
    local_element = LOCAL_REPO_TEMPLATE.format(repo_dir=m2_dir / "repository")

    settings_path.write_text(
        SETTINGS_TEMPLATE.format(
            local_repository_element=local_element,
            craft_repository_element=craft_element,
            mirror_repository_element=MIRROR_REPO if set_mirror else "",
            proxies_element=proxies_element,
        )
    )
    return settings_path
def _get_proxy_config() -> str:
    """Generate an XML string for proxy configurations.

    Reads the environment for information on desired proxy settings and
    transforms those variables into Maven XML settings entries.

    :return: The rendered ``PROXIES_TEMPLATE`` containing one proxy entry for
        each of ``http_proxy`` and ``https_proxy`` that is set.
    """
    # Transform all environment variable names to their lowercase form to
    # support HTTPS_PROXY vs. https_proxy and such.
    case_insensitive_env = {name.lower(): value for name, value in os.environ.items()}

    # The exclusion list is the same for every proxy entry, so compute it once.
    non_proxy_hosts = _get_no_proxy_string()

    proxies: list[str] = []
    for protocol in ("http", "https"):
        env_name = f"{protocol}_proxy"
        if env_name not in case_insensitive_env:
            continue

        proxy_url = urlparse(case_insensitive_env[env_name])
        # Credentials are only emitted when both halves are present.
        if proxy_url.username is not None and proxy_url.password is not None:
            credentials = PROXY_CREDENTIALS_TEMPLATE.format(
                username=proxy_url.username, password=proxy_url.password
            )
        else:
            credentials = ""

        # NOTE(review): `hostname` and `port` are None when the URL omits them,
        # which renders as the literal text "None" - confirm whether a default
        # port should be substituted here.
        proxies.append(
            PROXY_TEMPLATE.format(
                id=env_name,
                protocol=protocol,
                host=proxy_url.hostname,
                port=proxy_url.port,
                credentials=credentials,
                non_proxy_hosts=non_proxy_hosts,
            )
        )

    return PROXIES_TEMPLATE.format(proxies="\n".join(proxies))


def _needs_proxy_config() -> bool:
    """Determine whether or not proxy configuration is necessary for Maven."""
    proxy_vars = ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY")
    return any(key in os.environ for key in proxy_vars)


def _get_no_proxy_string() -> str:
    """Build the ``|``-separated list of hosts that must bypass the proxy.

    The comma-separated ``no_proxy`` variable is honoured first, then its
    upper-case spelling ``NO_PROXY`` (consistent with how the proxy URLs
    themselves are looked up). If neither is set, ``localhost`` is used.
    Surrounding whitespace and empty entries are dropped.
    """
    raw = os.environ.get("no_proxy")
    if raw is None:
        raw = os.environ.get("NO_PROXY", "localhost")
    hosts = (host.strip() for host in raw.split(","))
    return "|".join(host for host in hosts if host)
def update_pom(
    *,
    part_info: PartInfo,
    deploy_to: Path | None,
    self_contained: bool,
    pom_file: Path | None = None,
) -> None:
    """Patch the POM file of a Maven project, and those of its submodules.

    :param part_info: Information about the invoking part.
    :param deploy_to: The path to configure the `mvn deploy` location. If None,
        no path is configured.
    :param self_contained: Whether or not to patch version numbers with what is
        actually available.
    :param pom_file: The optional Maven POM file to update. If ``None``, the function
        will try to use ``pom.xml`` on the part's build subdir.
    """
    available = _get_existing_artifacts(part_info)

    for pom_path in _get_poms(pom_file, part_info, available):
        document = etree.parse(pom_path, parser=_XML_PARSER)
        root = document.getroot()
        nsmap = _get_namespaces(root)

        if deploy_to is not None:
            # A distributionManagement element tells "maven deploy" to place the
            # artifacts (jars, poms, etc) in the export dir.
            repo_uri = (deploy_to / "maven-use").as_uri()
            distribution = etree.fromstring(
                DISTRIBUTION_REPO_TEMPLATE.format(repo_uri=repo_uri),
                parser=_XML_PARSER,
            )

            # Any distributionManagement tag already present is dropped: the build
            # only needs to "distribute" to the backstage directory, so other
            # project-specific config is irrelevant here.
            previous = root.find("distributionManagement", nsmap)
            if previous is not None:
                root.remove(previous)

            root.append(distribution)

        if self_contained:
            for artifact_kind in (MavenArtifact, MavenPlugin, MavenParent):
                artifact_kind.update_versions(root, nsmap, available)

        # Leave a marker recording that this file was modified by us.
        root.insert(0, etree.Comment("This project was modified by craft-parts"))

        document.write(pom_path, pretty_print=True)
@dataclass(frozen=True)
class MavenArtifact:
    """An immutable description of a Maven artifact read from XML."""

    group_id: str
    artifact_id: str
    version: str | None
    packaging_type: str | None
    # Human-readable name of the kind of entry, used in log messages.
    field_name: str = "Dependency"

    @classmethod
    def from_element(cls, element: Element, namespaces: Namespaces) -> Self:
        """Build an artifact from an XML element describing it.

        :param element: The element holding the artifact's fields.
        :param namespaces: A mapping of namespaces to use while searching.

        :raises MavenXMLError: if the artifactId is missing, or if the groupId
            is missing and cannot be taken from a ``parent`` element.

        :return: The parsed artifact.
        """
        # A missing (or empty) version is fine, as one can always be set later.
        version_ele = element.find("version", namespaces)
        version = None if version_ele is None else version_ele.text

        # The packaging type is optional as well.
        packaging_ele = element.find("packaging", namespaces)
        packaging = None if packaging_ele is None else packaging_ele.text

        try:
            group_ele = _find_element(element, "groupId", namespaces)
            group_id = _get_element_text(group_ele)
        except MavenXMLError:
            # Try to recover by borrowing the groupId of a parent element. A
            # malformed parent raises its own error, blaming the parent instead.
            parent = cls._get_parent(element, namespaces)
            if not parent:
                raise
            group_id = parent.group_id

        artifact_ele = _find_element(element, "artifactId", namespaces)
        artifact_id = _get_element_text(artifact_ele)

        return cls(group_id, artifact_id, version, packaging)

    @classmethod
    def from_pom(cls, pom: Path) -> Self:
        """Build an artifact from the root element of a pom file."""
        root = etree.parse(pom, parser=_XML_PARSER).getroot()
        return cls.from_element(root, _get_namespaces(root))

    @classmethod
    def _collect_elements(
        cls, project: Element, namespaces: Namespaces
    ) -> list[Element]:
        """List the ``dependency`` elements under the project's ``dependencies``."""
        container = project.find("dependencies", namespaces)
        return [] if container is None else container.findall("dependency", namespaces)

    @classmethod
    def update_versions(
        cls, project: Element, namespaces: Namespaces, existing: GroupDict
    ) -> None:
        """Update all of the versions for this project as necessary."""
        for dependency in cls._collect_elements(project, namespaces):
            dep = cls.from_element(dependency, namespaces)
            versions = _get_available_versions(existing, dep)
            if not versions:
                logger.debug(
                    f"{cls.field_name} {dep.artifact_id} has no available version, skipping."
                )
                continue
            _set_version(dependency, namespaces, versions)

    @classmethod
    def _get_parent(
        cls, project: Element, namespaces: Namespaces
    ) -> MavenParent | None:
        """Parse the ``parent`` element of ``project``, if it has one."""
        parent_ele = project.find("parent", namespaces)
        if parent_ele is None:
            return None
        return MavenParent.from_element(parent_ele, namespaces)
@dataclass(frozen=True)
class MavenParent(MavenArtifact):
    """The artifact referenced by a project's ``parent`` tag."""

    field_name: str = "Parent"

    @classmethod
    @override
    def _collect_elements(
        cls, project: Element, namespaces: Namespaces
    ) -> list[Element]:
        """Return the project's ``parent`` element as a list (empty if absent)."""
        parent_ele = project.find("parent", namespaces)
        return [] if parent_ele is None else [parent_ele]
@dataclass(frozen=True)
class MavenPlugin(MavenArtifact):
    """A dataclass for Maven plugins.

    These are different because plugins have a default groupId.
    """

    field_name: str = "Plugin"

    @classmethod
    @override
    def from_element(cls, element: Element, namespaces: Namespaces) -> Self:
        """Create a MavenPlugin from an XML plugin element.

        If no groupId is found, 'org.apache.maven.plugins' will be used.

        For more information on the default plugin group, see:
        https://maven.apache.org/guides/mini/guide-configuring-plugins.html

        :param element: The ``plugin`` element to read.
        :param namespaces: A mapping of namespaces to use while searching.

        :raises MavenXMLError: if the artifactId is missing or empty, or if a
            groupId element is present but empty.

        :return: The parsed plugin, with packaging type ``maven-plugin``.
        """
        try:
            group_id_element = _find_element(element, "groupId", namespaces)
        except MavenXMLError:
            # Attempt to recover by retrieving the groupId from a parent element.
            # Otherwise, use the default groupId for plugins.
            parent = cls._get_parent(element, namespaces)
            group_id = parent.group_id if parent else "org.apache.maven.plugins"
        else:
            group_id = _get_element_text(group_id_element)

        # We can always just set the version if it's missing, so don't raise
        try:
            version = _get_element_text(_find_element(element, "version", namespaces))
        except MavenXMLError:
            version = None

        artifact_id = _get_element_text(
            _find_element(element, "artifactId", namespaces)
        )

        return cls(group_id, artifact_id, version, "maven-plugin")

    @classmethod
    @override
    def _collect_elements(
        cls, project: Element, namespaces: Namespaces
    ) -> list[Element]:
        """Collect every ``plugin`` element declared under ``<build>``.

        Plugins declared at ``<build><plugins>`` come first, followed by those
        declared at ``<build><pluginManagement><plugins>``.
        """
        build = project.find("build", namespaces)
        if build is None:
            return []

        containers = [build.find("plugins", namespaces)]
        plugin_mgmt = build.find("pluginManagement", namespaces)
        if plugin_mgmt is not None:
            containers.append(plugin_mgmt.find("plugins", namespaces))

        all_plugins: list[Element] = []
        for container in containers:
            if container is not None:
                all_plugins.extend(container.findall("plugin", namespaces))
        return all_plugins

    @classmethod
    @override
    def update_versions(
        cls, project: Element, namespaces: Namespaces, existing: GroupDict
    ) -> None:
        """Update all of the versions for this project as necessary.

        Declared plugins are patched to an available version. Every other
        plugin found on disk is then declared explicitly in
        ``<build><pluginManagement><plugins>``.
        """
        declared_plugins = cls._collect_elements(project, namespaces)
        existing_plugins = cls._get_existing_plugins(existing)

        # Patch the declared plugins. They are tracked by coordinates rather than
        # by object: a plugin parsed from the project never compares equal to the
        # artifact parsed from disk (different class, field name and possibly
        # version), so a plain set difference would not remove anything.
        patched_coordinates: set[tuple[str, str]] = set()
        for plugin_ele in declared_plugins:
            plugin = cls.from_element(plugin_ele, namespaces)
            versions = _get_available_versions(existing, plugin)
            if versions is None:
                logger.warning(
                    "Plugin '%s.%s' is declared, but is not available",
                    plugin.group_id,
                    plugin.artifact_id,
                )
                continue
            _set_version(plugin_ele, namespaces, versions)
            patched_coordinates.add((plugin.group_id, plugin.artifact_id))

        # Explicitly declare the version of every other plugin on disk to be safe
        remaining_plugins = {
            plugin
            for plugin in existing_plugins
            if (plugin.group_id, plugin.artifact_id) not in patched_coordinates
        }
        cls._set_remaining_plugins(remaining_plugins, project, namespaces)

    @classmethod
    def _get_existing_plugins(cls, existing: GroupDict) -> set[MavenArtifact]:
        """Get one representative artifact for every plugin on disk.

        For each group/artifact pair, the first artifact found whose packaging
        type is ``maven-plugin`` is used.
        """
        existing_plugins: set[MavenArtifact] = set()
        for group in existing.values():
            for arts in group.values():
                plugin = next(
                    (art for art in arts if art.packaging_type == "maven-plugin"),
                    None,
                )
                if plugin is not None:
                    existing_plugins.add(plugin)
        return existing_plugins

    @classmethod
    def _set_remaining_plugins(
        cls,
        remaining_plugins: set[MavenArtifact],
        project: Element,
        namespaces: Namespaces,
    ) -> None:
        """Append remaining plugin entries to the project's pluginManagement.

        Plugins without a known version are skipped, since there is no version
        to pin them to. Entries are appended in a stable order so that the
        resulting file does not depend on set iteration order.
        """
        pinnable = sorted(
            (plugin for plugin in remaining_plugins if plugin.version is not None),
            key=lambda plugin: (
                plugin.group_id,
                plugin.artifact_id,
                plugin.version or "",
            ),
        )
        if not pinnable:
            # Nothing to declare, so don't create empty container elements.
            return

        plugins_ele = cls._get_plugins_ele(project, namespaces)
        for plugin in pinnable:
            plugin_str = PLUGIN_TEMPLATE.format(
                artifact_id=plugin.artifact_id,
                group_id=plugin.group_id,
                version=plugin.version,
            )
            plugins_ele.append(etree.fromstring(plugin_str, parser=_XML_PARSER))

    @classmethod
    def _get_plugins_ele(cls, project: Element, namespaces: Namespaces) -> Element:
        """Find ``<build><pluginManagement><plugins>``, creating missing levels."""
        build = _find_or_create_ele(project, "build", namespaces)
        plugin_mgmt = _find_or_create_ele(build, "pluginManagement", namespaces)
        return _find_or_create_ele(plugin_mgmt, "plugins", namespaces)
def _find_or_create_ele(element: Element, tag: str, namespaces: Namespaces) -> Element:
    """Find a subelement within a given element, creating it if it is missing.

    :param element: The element to search in.
    :param tag: The tag of the subelement to find.
    :param namespaces: A mapping of namespaces to use during the search.

    :return: The existing subelement, or a new one appended to ``element``.
    """
    try:
        result = _find_element(element, tag, namespaces)
    except MavenXMLError:
        result = etree.Element(tag)
        element.append(result)
    return result


def _get_existing_artifacts(part_info: PartInfo) -> GroupDict:
    """Index every artifact described by a ``.pom`` file in the known repositories.

    The backstage ``maven-use`` directory and ``/usr/share/maven-repo`` are
    searched recursively; locations that are not directories are skipped.
    """
    result: GroupDict = {}

    search_locations = [
        part_info.backstage_dir / "maven-use",
        Path("/usr/share/maven-repo"),
    ]
    for loc in search_locations:
        if not loc.is_dir():
            continue
        for pom in loc.glob("**/*.pom"):
            art = MavenArtifact.from_pom(pom)
            _insert_into_existing(result, art)

    return result


class _Versions:
    """Convenience type for versions available on-disk."""

    # Versions that could be parsed as semantic versions.
    semvers: set[Version]
    # Raw version strings that could not be parsed as semantic versions.
    fallbacks: set[str]

    def __init__(self, artifacts: set[MavenArtifact]) -> None:
        """Parse out the versions of a set of artifacts.

        This function maps a set of artifacts into two sets. One is a set of all
        valid semantic versions for easy handling, and the other is a set of
        unparsable version strings. Artifacts without a version are ignored.

        :raises ValueError: if ``artifacts`` is empty.
        """
        if not artifacts:
            raise ValueError("No versions were specified.")

        available: set[Version] = set()
        fallbacks: set[str] = set()
        for art in artifacts:
            if art.version is None:
                continue
            try:
                available.add(Version.parse(art.version))
            except ValueError:
                fallbacks.add(art.version)

        self.semvers = available
        self.fallbacks = fallbacks

    def _max_fallback(self) -> str:
        """Return the alphabetically highest non-semantic version."""
        if not self.fallbacks:
            raise ValueError("No usable versions are available.")
        return max(self.fallbacks)

    def nearest_to(self, target: str) -> str:
        """Calculate the nearest available version to `target`.

        This method will make a best-effort attempt at matching the version
        specified by `target`. It will always prefer exact version matches.

        If the target is a semantic version, it will use the oldest version that
        is newer than the target, followed by the newest version that is older
        than the target. If no semantic version is available at all, the
        alphabetically highest non-semantic version is used.

        If `target` is not a semantic version, it will first attempt to use the
        newest semantic version available, then fall back to the alphabetically
        highest non-semantic version available.

        :raises ValueError: if no version is available at all.
        """
        # If this succeeds, the target is a semantic version. If not, we can't
        # understand the target beyond equality, so just do our best.
        try:
            parsed_target = Version.parse(target)
        except ValueError:
            logger.debug("Requested version was not a semantic version.")
            # If there is an exact match in fallbacks, just use that
            if target in self.fallbacks:
                logger.debug("Exact match was found.")
                return target
            # The target isn't semver, just get the latest version we can
            if self.semvers:
                logger.debug(
                    "Using maximum semantic version for unknown requested version."
                )
                return str(max(self.semvers))
            # If there weren't any successfully parsed versions on-disk, use
            # fallback versions
            logger.debug(
                "No package versions using semver were found - falling back to alphabetically highest."
            )
            return self._max_fallback()

        # A semantic target cannot be compared against non-semantic versions, so
        # when only those are available, take the highest of them.
        if not self.semvers:
            logger.debug(
                "No package versions using semver were found - falling back to alphabetically highest."
            )
            return self._max_fallback()

        # If there is an exact match available, just use that
        if parsed_target in self.semvers:
            logger.debug("Exact match was found.")
            return target

        # Sort the available versions into those that are newer than the target
        semvers_newer = {ver for ver in self.semvers if ver > parsed_target}
        if semvers_newer:
            logger.debug("Using the closest newer version.")
            return str(min(semvers_newer))

        # What remains must then be older than the target. It is non-empty, as
        # there is at least one semantic version and none of them are newer.
        semvers_older = self.semvers - semvers_newer
        logger.debug("Using the closest older version.")
        return str(max(semvers_older))

    def max(self) -> str:
        """Get the latest version on-disk.

        :raises ValueError: if no version is available at all.
        """
        if self.semvers:
            return str(max(self.semvers))
        return self._max_fallback()


def _get_available_versions(
    existing: GroupDict, dependency: MavenArtifact
) -> _Versions | None:
    """Look up the versions available for ``dependency``.

    :return: The available versions, or None if the artifact is unknown or if
        none of the known artifacts carries a version.
    """
    artifacts = existing.get(dependency.group_id, {}).get(dependency.artifact_id)
    if not artifacts:
        return None

    versions = _Versions(artifacts)
    if not (versions.semvers or versions.fallbacks):
        # Every matching artifact lacks a version, so there is nothing that
        # could be selected.
        logger.debug(
            "No version is recorded for '%s.%s'.",
            dependency.group_id,
            dependency.artifact_id,
        )
        return None
    return versions


def _set_version(element: Element, namespaces: Namespaces, versions: _Versions) -> None:
    """Set or update the ``version`` of an artifact element.

    If the element has no version, the latest available one is appended. If it
    has one, it is replaced by the nearest available version when they differ.
    In both cases an XML comment recording the change is added.

    :raises MavenXMLError: if the artifactId is missing, or if an existing
        version element is empty.
    """
    # The groupId is only used for logging and may legitimately be absent (for
    # example on plugins, which have a default group), so don't require it.
    group_ele = element.find("groupId", namespaces)
    group_id = (group_ele.text if group_ele is not None else None) or "<unspecified>"
    artifact_id = _get_element_text(_find_element(element, "artifactId", namespaces))

    version_element = element.find("version", namespaces)

    # If no version is specified at all, always set it
    if version_element is None:
        new_version = versions.max()
        new_version_element = etree.Element("version")
        new_version_element.text = new_version
        comment = etree.Comment(f"Version set by craft-parts to '{new_version}'")
        element.append(comment)
        element.append(new_version_element)
        logger.debug(
            "Setting version of '%s.%s' to '%s'",
            group_id,
            artifact_id,
            new_version,
        )
        return

    current_version = _get_element_text(version_element)
    logger.debug(f"Getting nearest version number for {artifact_id!r}.")
    new_version = versions.nearest_to(current_version)

    if current_version == new_version:
        return

    version_element.text = new_version
    comment = etree.Comment(
        f"Version updated by craft-parts from '{current_version}' to '{new_version}'"
    )
    logger.debug(
        "Updating version of '%s.%s' from '%s' to '%s'",
        group_id,
        artifact_id,
        current_version,
        new_version,
    )
    version_element.addprevious(comment)
[docs] @dataclass class MavenXMLError(BaseException): """An error encountered while parsing XML for Maven projects.""" message: str details: str | None = None def __str__(self) -> str: return f"{self.message}\n{self.details}"
def _find_element(element: Element, path: str, namespaces: Namespaces) -> Element:
    """Find a field within an element.

    This is equivalent to `element.find(path, namespaces)`, except that
    an exception is raised if the needle isn't found to reduce boilerplate.

    :param element: The haystack to search.
    :param path: The needle to find in the haystack.
    :param namespaces: A mapping of namespaces to use during the search.

    :raises MavenXMLError: if the needle can't be found.

    :return: The discovered element.
    """
    if (needle := element.find(path, namespaces)) is not None:
        return needle

    raise MavenXMLError(
        message=f"Could not find path {path!r} in element {element.tag!r}",
        details=f"Could not find path {path!r} in the following XML element:\n{_format_xml_str(element)}",
    )


def _get_element_text(element: Element) -> str:
    """Extract the text field from an element.

    This is equivalent to `element.text`, except that an exception is
    raised if the text field is empty to reduce boilerplate.

    :param element: The element to read from.

    :raises MavenXMLError: if there is no text field.

    :return: The content of the text field.
    """
    if (text := element.text) is not None:
        return text

    raise MavenXMLError(
        message=f"No text field found on {element.tag!r}",
        details=f"No text field found on {element.tag!r} in the following XML element:\n{_format_xml_str(element)}",
    )


def _format_xml_str(element: Element) -> str:
    """Get a nicely-formatted string for displaying an XML element."""
    return etree.tostring(element, pretty_print=True).decode(errors="replace")


def _get_namespaces(project: Element) -> Namespaces:
    """Register the XML namespaces of a project and return its namespace map.

    The default (prefix-less) namespace is registered under the prefix
    ``default``.
    """
    namespaces = project.nsmap
    for prefix, uri in namespaces.items():
        try:
            etree.register_namespace(prefix or "default", uri)
        # Some pom files, such as those found in the apt package libbsh-java, have
        # malformed namespace URIs. Ignoring the error allows everything else to
        # work, so just catch this particular case and move on.
        except ValueError as ve:  # noqa: PERF203
            if "Invalid namespace URI" in str(ve):
                continue
            raise

    return namespaces


def _display_path(path: Path, base: Path) -> str:
    """Format ``path`` relative to ``base`` for logging, if it lies within it.

    Paths outside of ``base`` (such as an arbitrary custom pom file, or a
    submodule path that was resolved through a symlink) are shown in full
    instead of raising.
    """
    try:
        return str(path.relative_to(base))
    except ValueError:
        return str(path)


def _get_poms(
    base_pom: Path | None, part_info: PartInfo, existing: GroupDict
) -> list[Path]:
    """Get a list of poms on a project.

    Each submodule is added to the list of existing artifacts because Maven's
    build process will determine the correct order to build dependencies before
    their consuming binaries need them - we do not need to figure this out
    ourselves. If a build somehow fails due to one of these missing from the
    backstage, it is due to a malformed pom.xml.

    :param base_pom: The top-level pom file. If None, ``pom.xml`` in the part's
        build subdir is used.
    :param part_info: Information about the invoking part.
    :param existing: The known artifacts, extended in place with the submodules.

    :raises MavenXMLError: if the top-level pom file does not exist.

    :return: The top-level pom followed by every submodule pom that was found.
    """
    poms: list[Path] = []

    if base_pom is None:
        base_pom = part_info.part_build_subdir / "pom.xml"
    if not base_pom.is_file():
        # Name the file that was actually looked for, which is not necessarily
        # called 'pom.xml' when a custom pom file was requested.
        raise MavenXMLError(
            message=f"'{base_pom.name}' does not exist",
            details=f"Expected a Maven POM file at '{base_pom}'",
        )
    poms.append(base_pom)

    _recurse_submodules(part_info, base_pom, poms, existing)

    build_dir = part_info.part_build_subdir
    logger.debug(
        "Discovered poms for part '%s': [%s]",
        part_info.part_name,
        ", ".join(_display_path(path, build_dir) for path in poms),
    )

    return poms


def _recurse_submodules(
    part_info: PartInfo, parent_pom: Path, all_poms: list[Path], existing: GroupDict
) -> None:
    """Recursively find submodule poms and add them to the existing artifacts.

    :param part_info: Information about the invoking part.
    :param parent_pom: The pom file whose ``modules`` are inspected.
    :param all_poms: The list of discovered poms, extended in place.
    :param existing: The known artifacts, extended in place.
    """
    tree = etree.parse(parent_pom, parser=_XML_PARSER)
    project = tree.getroot()
    namespaces = _get_namespaces(project)

    # Check if there are any modules and end recursion early if not
    try:
        modules = _find_element(project, "modules", namespaces)
    except MavenXMLError:
        return

    # For every module found:
    for module in modules.findall("module", namespaces):
        # - Work out where its pom should be
        path_str = _get_element_text(module)
        pom_path = (parent_pom.parent / path_str / "pom.xml").resolve()

        # - Validate that it is a legitimate dependency file, but only log if not
        if not pom_path.is_file():
            logger.debug(
                "The pom '%s' declares a submodule at '%s', but this submodule could not be found.",
                _display_path(parent_pom, part_info.part_build_subdir),
                path_str,
            )
            continue

        # - Append it to the list of poms that need patching
        all_poms.append(pom_path)

        # - Add it to the list of existing artifacts
        art = MavenArtifact.from_pom(pom_path)
        _insert_into_existing(existing, art)

        # - Recurse on its pom.xml for more submodules
        _recurse_submodules(part_info, pom_path, all_poms, existing)


def _insert_into_existing(existing: GroupDict, art: MavenArtifact) -> None:
    """Insert an artifact into the index of existing artifacts."""
    group_artifacts = existing.setdefault(art.group_id, {})
    versions = group_artifacts.setdefault(art.artifact_id, set())
    versions.add(art)