# Copyright 2025 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Utilities for Maven projects and settings."""
from __future__ import annotations
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, cast
from urllib.parse import urlparse
from lxml import etree
from semver import Version
from typing_extensions import Self, override
from ._xml import (
CRAFT_REPO_TEMPLATE,
DISTRIBUTION_REPO_TEMPLATE,
LOCAL_REPO_TEMPLATE,
MIRROR_REPO,
PLUGIN_TEMPLATE,
PROXIES_TEMPLATE,
PROXY_CREDENTIALS_TEMPLATE,
PROXY_TEMPLATE,
SETTINGS_TEMPLATE,
)
if TYPE_CHECKING:
from craft_parts.infos import PartInfo
logger = logging.getLogger(__name__)

# Maps an artifactId to the set of known artifacts sharing that id.
ArtifactDict = dict[str, set["MavenArtifact"]]
# Maps a groupId to the artifacts known within that group.
GroupDict = dict[str, ArtifactDict]
# Prefix-to-URI namespace mapping, in the shape returned by an element's
# ``nsmap``. The prefix may be ``None`` (the unprefixed, default namespace).
Namespaces = dict[str | None, str]

# Created so these lint silencers don't need to be repeated every single time.
# lxml's `Element` is actually a free-form function that returns the private
# `_Element` class, so for typing's sake, a "private" type needs to be referenced.
Element = etree._Element  # type: ignore[reportPrivateUsage]  # noqa: SLF001

# Shared parser used for every XML document and fragment parsed in this module.
_XML_PARSER = etree.XMLParser(
    # Attempt to recover if an error is encountered
    recover=True,
    # Entities can resolve into zip-bomb-like packages, so ignore them
    resolve_entities=False,
    # Removing blank text allows the `pretty_print` kwarg to work later
    remove_blank_text=True,
    # Never reach out to the network while parsing a document
    no_network=True,
    # Keep the comments that are present in the parsed document
    remove_comments=False,
)
[docs]
def create_maven_settings(*, part_info: PartInfo, set_mirror: bool) -> Path:
    """Write the Maven ``settings.xml`` used when building a part.

    The file is created at ``.parts/.m2/settings.xml`` inside the part's build
    subdirectory. It points Maven at a local repository in the same ``.m2``
    directory, at the shared ``maven-use`` backstage repository when that
    directory exists, and at any proxies configured in the environment.

    :param part_info: The part info for the part invoking Maven.
    :param set_mirror: Whether to include the ``MIRROR_REPO`` snippet in the
        generated settings.
    :return: The path of the settings file that was written.
    """
    build_dir = part_info.part_build_subdir
    settings_file = build_dir / ".parts/.m2/settings.xml"
    settings_file.parent.mkdir(parents=True, exist_ok=True)

    # Proxy entries are only generated when a proxy variable is present.
    proxies = ""
    if _needs_proxy_config():
        proxies = _get_proxy_config()

    # Artifacts produced by other parts live in the backstage; only reference
    # that repository if it has actually been created.
    shared_repo = cast("Path", part_info.backstage_dir) / "maven-use"
    if shared_repo.is_dir():
        craft_repo = CRAFT_REPO_TEMPLATE.format(repo_uri=shared_repo.as_uri())
    else:
        craft_repo = ""

    # The "cache" directory that is private to this Maven run.
    local_repo = LOCAL_REPO_TEMPLATE.format(
        repo_dir=build_dir / ".parts/.m2/repository"
    )

    if set_mirror:
        mirror = MIRROR_REPO
    else:
        mirror = ""

    settings_file.write_text(
        SETTINGS_TEMPLATE.format(
            local_repository_element=local_repo,
            craft_repository_element=craft_repo,
            mirror_repository_element=mirror,
            proxies_element=proxies,
        )
    )
    return settings_file
def _get_proxy_config() -> str:
    """Generate an XML string for proxy configurations.

    Reads the environment for information on desired proxy settings and
    transforms those variables into Maven XML settings entries. Variable names
    are matched case-insensitively (``HTTPS_PROXY`` and ``https_proxy`` are
    equivalent).

    A variable whose value has no parsable host is skipped with a warning
    instead of producing a ``None`` host entry. A proxy URL without an explicit
    port gets the default port of its own scheme. Values taken from the
    environment are XML-escaped before being placed into the templates.

    :raises ValueError: if a proxy URL contains an invalid port number (raised
        when the parsed URL's ``port`` attribute is read).
    :return: The proxies XML fragment rendered from ``PROXIES_TEMPLATE``.
    """
    # Local import: escaping is only needed by this function.
    from xml.sax.saxutils import escape  # noqa: PLC0415

    # Transform all environment variables to their lowercase form to support
    # HTTPS_PROXY vs. https_proxy and such
    case_insensitive_env = {name.lower(): value for name, value in os.environ.items()}

    # The exclusion list does not depend on the protocol, so compute it once.
    non_proxy_hosts = escape(_get_no_proxy_string())

    # Ports implied by the scheme of the proxy URL when none is written out.
    scheme_ports = {"http": 80, "https": 443}

    proxies: list[str] = []
    for protocol in ("http", "https"):
        env_name = f"{protocol}_proxy"
        raw_url = case_insensitive_env.get(env_name)
        if raw_url is None:
            continue

        proxy_url = urlparse(raw_url)
        host = proxy_url.hostname
        if host is None:
            # Covers empty values and URLs written without a scheme/netloc.
            logger.warning(
                "Ignoring proxy variable '%s': no host could be parsed from its value.",
                env_name,
            )
            continue

        port = proxy_url.port
        if port is None:
            # NOTE(review): 8080 is believed to be the default Maven itself
            # documents for <port>; it is only used for unknown schemes.
            port = scheme_ports.get(proxy_url.scheme, 8080)

        if proxy_url.username is not None and proxy_url.password is not None:
            credentials = PROXY_CREDENTIALS_TEMPLATE.format(
                username=escape(proxy_url.username),
                password=escape(proxy_url.password),
            )
        else:
            credentials = ""

        proxies.append(
            PROXY_TEMPLATE.format(
                id=env_name,
                protocol=protocol,
                host=escape(host),
                port=port,
                credentials=credentials,
                non_proxy_hosts=non_proxy_hosts,
            )
        )

    return PROXIES_TEMPLATE.format(proxies="\n".join(proxies))
def _needs_proxy_config() -> bool:
"""Determine whether or not proxy configuration is necessary for Maven."""
proxy_vars = ["http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY"]
return any(key in os.environ for key in proxy_vars)
def _get_no_proxy_string() -> str:
no_proxy = [k.strip() for k in os.environ.get("no_proxy", "localhost").split(",")]
return "|".join(no_proxy)
[docs]
def update_pom(
    *,
    part_info: PartInfo,
    deploy_to: Path | None,
    self_contained: bool,
    pom_file: Path | None = None,
) -> None:
    """Patch the POM of a Maven project, and the POMs of its submodules.

    Every discovered POM is rewritten in place and receives a leading comment
    noting that craft-parts modified it.

    :param part_info: Information about the invoking part.
    :param deploy_to: Directory whose ``maven-use`` subdirectory becomes the
        ``mvn deploy`` target. If None, no deploy location is configured.
    :param self_contained: Whether or not to patch dependency, plugin and
        parent version numbers with what is actually available.
    :param pom_file: The optional Maven POM file to update. If ``None``, the
        ``pom.xml`` in the part's build subdir is used.
    """
    available = _get_existing_artifacts(part_info)

    for pom_path in _get_poms(pom_file, part_info, available):
        document = etree.parse(pom_path, parser=_XML_PARSER)
        root = document.getroot()
        nsmap = _get_namespaces(root)

        if deploy_to is not None:
            # Tell "maven deploy" to put the artifacts (jars, poms, etc) into
            # the export directory through a distributionManagement element.
            repo_uri = (deploy_to / "maven-use").as_uri()
            new_distmgmt = etree.fromstring(
                DISTRIBUTION_REPO_TEMPLATE.format(repo_uri=repo_uri),
                parser=_XML_PARSER,
            )

            # Any project-specific distribution config is irrelevant to this
            # build, so an existing element is simply replaced.
            old_distmgmt = root.find("distributionManagement", nsmap)
            if old_distmgmt is not None:
                root.remove(old_distmgmt)
            root.append(new_distmgmt)

        if self_contained:
            # Dependencies first, then plugins, then the parent reference.
            for artifact_kind in (MavenArtifact, MavenPlugin, MavenParent):
                artifact_kind.update_versions(root, nsmap, available)

        # Leave a marker recording that this file was modified by us.
        root.insert(0, etree.Comment("This project was modified by craft-parts"))

        document.write(pom_path, pretty_print=True)
[docs]
@dataclass(frozen=True)
class MavenArtifact:
    """A dataclass for Maven artifacts."""

    # The artifact's groupId.
    group_id: str
    # The artifact's artifactId.
    artifact_id: str
    # The declared version, or None when no usable <version> was found.
    version: str | None
    # The declared packaging, or None when no usable <packaging> was found.
    packaging_type: str | None
    # Human-readable kind of artifact, used in log messages.
    field_name: str = "Dependency"

    @classmethod
    def from_element(cls, element: Element, namespaces: Namespaces) -> Self:
        """Create a MavenArtifact from an XML artifact element.

        :param element: The XML element describing the artifact.
        :param namespaces: A mapping of namespaces to use during lookups.
        :raises MavenXMLError: if no artifactId is found, or if no groupId is
            found on the element and none can be taken from a ``parent`` child.
        :return: The artifact described by the element.
        """
        # We can always just set the version if it's missing, so don't raise
        try:
            version = _get_element_text(_find_element(element, "version", namespaces))
        except MavenXMLError:
            version = None

        # Attempt to read a packaging type
        try:
            packaging = _get_element_text(
                _find_element(element, "packaging", namespaces)
            )
        except MavenXMLError:
            packaging = None

        try:
            group_id = _get_element_text(_find_element(element, "groupId", namespaces))
        except MavenXMLError:
            # Attempt to recover by retrieving the groupId from a parent element.
            # If the parent is malformed, this raises a different error blaming
            # the parent instead.
            parent = cls._get_parent(element, namespaces)
            if parent is None:
                raise
            group_id = parent.group_id

        artifact_id = _get_element_text(
            _find_element(element, "artifactId", namespaces)
        )

        return cls(group_id, artifact_id, version, packaging)

    @classmethod
    def from_pom(cls, pom: Path) -> Self:
        """Create a MavenArtifact from a pom file.

        :param pom: Path of the POM file to parse.
        :return: The artifact described by the POM's root element.
        """
        tree = etree.parse(pom, parser=_XML_PARSER)
        project = tree.getroot()
        namespaces = _get_namespaces(project)
        return cls.from_element(project, namespaces)

    @classmethod
    def _collect_elements(
        cls, project: Element, namespaces: Namespaces
    ) -> list[Element]:
        """Return the ``dependency`` elements under the project's ``dependencies``."""
        dependencies = project.find("dependencies", namespaces)
        if dependencies is None:
            return []
        return dependencies.findall("dependency", namespaces)

    @classmethod
    def update_versions(
        cls, project: Element, namespaces: Namespaces, existing: GroupDict
    ) -> None:
        """Update all of the versions for this project as necessary.

        :param project: The root element of the POM being patched.
        :param namespaces: A mapping of namespaces to use during lookups.
        :param existing: The artifacts that are known to be available.
        """
        for dependency in cls._collect_elements(project, namespaces):
            dep = cls.from_element(dependency, namespaces)
            versions = _get_available_versions(existing, dep)
            if versions is not None:
                _set_version(dependency, namespaces, versions)
            else:
                # Lazy %-style arguments, as used by the rest of this module.
                logger.debug(
                    "%s %s has no available version, skipping.",
                    cls.field_name,
                    dep.artifact_id,
                )

    @classmethod
    def _get_parent(
        cls, project: Element, namespaces: Namespaces
    ) -> MavenParent | None:
        """Return the artifact described by a ``parent`` child, or None if absent."""
        try:
            parent_ele = _find_element(project, "parent", namespaces)
        except MavenXMLError:
            return None
        return MavenParent.from_element(parent_ele, namespaces)
[docs]
@dataclass(frozen=True)
class MavenParent(MavenArtifact):
    """A dataclass for the Maven parent tag."""

    # Human-readable kind of artifact, used in log messages.
    field_name: str = "Parent"

    @classmethod
    @override
    def _collect_elements(
        cls, project: Element, namespaces: Namespaces
    ) -> list[Element]:
        """Return the project's ``parent`` element as a list of zero or one items."""
        parent_element = project.find("parent", namespaces)
        return [] if parent_element is None else [parent_element]
[docs]
@dataclass(frozen=True)
class MavenPlugin(MavenArtifact):
    """A dataclass for Maven plugins.

    These are different because plugins have a default groupId.
    """

    # Human-readable kind of artifact, used in log messages.
    field_name: str = "Plugin"

    @classmethod
    @override
    def from_element(cls, element: Element, namespaces: Namespaces) -> Self:
        """Create a MavenPlugin from an XML plugin element.

        If no groupId is found, 'org.apache.maven.plugins' will be used.
        For more information on the default plugin group, see:
        https://maven.apache.org/guides/mini/guide-configuring-plugins.html

        :param element: The XML element describing the plugin.
        :param namespaces: A mapping of namespaces to use during lookups.
        :raises MavenXMLError: if the element has no artifactId.
        :return: The plugin, always with the ``maven-plugin`` packaging type.
        """
        try:
            group_id_element = _find_element(element, "groupId", namespaces)
        except MavenXMLError:
            # Attempt to recover by retrieving the groupId from a parent element.
            # Otherwise, use the default groupId for plugins.
            parent = cls._get_parent(element, namespaces)
            group_id = (
                parent.group_id if parent is not None else "org.apache.maven.plugins"
            )
        else:
            group_id = _get_element_text(group_id_element)

        # We can always just set the version if it's missing, so don't raise
        try:
            version = _get_element_text(_find_element(element, "version", namespaces))
        except MavenXMLError:
            version = None

        artifact_id = _get_element_text(
            _find_element(element, "artifactId", namespaces)
        )

        return cls(group_id, artifact_id, version, "maven-plugin")

    @classmethod
    @override
    def _collect_elements(
        cls, project: Element, namespaces: Namespaces
    ) -> list[Element]:
        """Return every ``plugin`` element declared in the project's build section.

        Plugins are read from ``<build><plugins>`` first, then from
        ``<build><pluginManagement><plugins>``.
        """
        plugin_locations = (
            ("build", "plugins"),
            ("build", "pluginManagement", "plugins"),
        )

        all_plugins: list[Element] = []
        for location in plugin_locations:
            container = project
            try:
                for tag in location:
                    container = _find_element(container, tag, namespaces)
            except MavenXMLError:
                # This location is simply not declared in this POM.
                continue
            all_plugins.extend(container.findall("plugin", namespaces))

        return all_plugins

    @classmethod
    @override
    def update_versions(
        cls, project: Element, namespaces: Namespaces, existing: GroupDict
    ) -> None:
        """Update all of the versions for this project as necessary.

        Declared plugins that are available get their version patched. Every
        other plugin found on disk is then declared explicitly under
        ``<build><pluginManagement><plugins>``.

        :param project: The root element of the POM being patched.
        :param namespaces: A mapping of namespaces to use during lookups.
        :param existing: The artifacts that are known to be available.
        """
        declared_plugins = cls._collect_elements(project, namespaces)
        existing_plugins = cls._get_existing_plugins(existing)

        # Patch the declared plugins. They are tracked by coordinates because
        # the on-disk artifacts are a different class, and may carry a different
        # version, so comparing the dataclass instances would never match.
        patched_coordinates: set[tuple[str, str]] = set()
        for plugin_ele in declared_plugins:
            plugin = cls.from_element(plugin_ele, namespaces)
            versions = _get_available_versions(existing, plugin)
            if versions is None:
                logger.warning(
                    "Plugin '%s.%s' is declared, but is not available",
                    plugin.group_id,
                    plugin.artifact_id,
                )
                continue
            _set_version(plugin_ele, namespaces, versions)
            patched_coordinates.add((plugin.group_id, plugin.artifact_id))

        # Explicitly declare the version of every other plugin on disk to be safe
        remaining_plugins = {
            plugin
            for plugin in existing_plugins
            if (plugin.group_id, plugin.artifact_id) not in patched_coordinates
        }
        cls._set_remaining_plugins(remaining_plugins, project, namespaces)

    @classmethod
    def _get_existing_plugins(cls, existing: GroupDict) -> set[MavenArtifact]:
        """Get the set of plugins on disk.

        At most one artifact is taken per artifactId: the first one with the
        ``maven-plugin`` packaging type that is encountered.
        """
        existing_plugins: set[MavenArtifact] = set()
        for group in existing.values():
            for arts in group.values():
                for art in arts:
                    if art.packaging_type == "maven-plugin":
                        existing_plugins.add(art)
                        break
        return existing_plugins

    @classmethod
    def _set_remaining_plugins(
        cls,
        remaining_plugins: set[MavenArtifact],
        project: Element,
        namespaces: Namespaces,
    ) -> None:
        """Append remaining plugin entries to the project's plugin management.

        Plugins are written in a stable order so that repeated runs produce the
        same POM. Plugins without a known version are skipped, since there is
        no version to declare for them.
        """
        plugins_ele = cls._get_plugins_ele(project, namespaces)

        ordered_plugins = sorted(
            remaining_plugins,
            key=lambda plugin: (plugin.group_id, plugin.artifact_id, plugin.version or ""),
        )
        for plugin in ordered_plugins:
            if plugin.version is None:
                logger.debug(
                    "Plugin '%s.%s' has no known version, not declaring it.",
                    plugin.group_id,
                    plugin.artifact_id,
                )
                continue
            plugin_str = PLUGIN_TEMPLATE.format(
                artifact_id=plugin.artifact_id,
                group_id=plugin.group_id,
                version=plugin.version,
            )
            plugin_ele = etree.fromstring(plugin_str, parser=_XML_PARSER)
            plugins_ele.append(plugin_ele)

    @classmethod
    def _get_plugins_ele(cls, project: Element, namespaces: Namespaces) -> Element:
        """Return ``<build><pluginManagement><plugins>``, creating missing levels."""
        build = _find_or_create_ele(project, "build", namespaces)
        plugin_mgmt = _find_or_create_ele(build, "pluginManagement", namespaces)
        return _find_or_create_ele(plugin_mgmt, "plugins", namespaces)
def _find_or_create_ele(element: Element, tag: str, namespaces: Namespaces) -> Element:
    """Return the ``tag`` subelement of ``element``, creating it if absent.

    :param element: The element to search in.
    :param tag: The tag of the subelement to find or create.
    :param namespaces: A mapping of namespaces to use during the search.
    :return: The existing subelement, or a new empty one appended to ``element``.
    """
    found = element.find(tag, namespaces)
    if found is not None:
        return found

    created = etree.Element(tag)
    element.append(created)
    return created
def _get_existing_artifacts(part_info: PartInfo) -> GroupDict:
    """Index every artifact that has a ``.pom`` file in the known repositories.

    The backstage ``maven-use`` directory and ``/usr/share/maven-repo`` are
    searched recursively; locations that are not directories are ignored.

    :param part_info: Information about the invoking part.
    :return: The discovered artifacts, grouped by groupId and artifactId.
    """
    found: GroupDict = {}
    repositories = (
        part_info.backstage_dir / "maven-use",
        Path("/usr/share/maven-repo"),
    )
    for repository in repositories:
        if repository.is_dir():
            for pom_path in repository.glob("**/*.pom"):
                _insert_into_existing(found, MavenArtifact.from_pom(pom_path))
    return found
class _Versions:
    """Convenience type for versions available on-disk."""

    # Versions that could be parsed as semantic versions.
    semvers: set[Version]
    # Version strings that could not be parsed as semantic versions.
    fallbacks: set[str]

    def __init__(self, artifacts: set[MavenArtifact]) -> None:
        """Parse out the versions of a set of artifacts.

        This function maps a set of artifacts into two sets. One is a set of all
        valid semantic versions for easy handling, and the other is a set of
        unparsable version strings. Artifacts without a version are ignored.

        :param artifacts: The artifacts whose versions are available.
        :raises ValueError: if ``artifacts`` is empty.
        """
        if not artifacts:
            raise ValueError("No versions were specified.")

        available: set[Version] = set()
        fallbacks: set[str] = set()
        for art in artifacts:
            if art.version is None:
                continue
            try:
                available.add(Version.parse(art.version))
            except ValueError:
                fallbacks.add(art.version)

        self.semvers = available
        self.fallbacks = fallbacks

    def _highest_fallback(self) -> str:
        """Return the alphabetically highest non-semantic version.

        :raises ValueError: if no version of any kind is known.
        """
        if not self.fallbacks:
            raise ValueError("None of the available artifacts declare a version.")
        return max(self.fallbacks)

    def nearest_to(self, target: str) -> str:
        """Calculate the nearest available version to `target`.

        This method will make a best-effort attempt at matching the version
        specified by `target`. It will always prefer exact version matches.

        If the target is a semantic version, it will attempt to use the oldest
        version that is newer than the target, followed by the newest version
        that is older than the target. If no semantic version is available at
        all, the alphabetically highest non-semantic version is used.

        Finally, if `target` is not a semantic version at all, it will first attempt
        to use the newest semantic version available, then fall back to the
        alphabetically highest non-semantic version available.

        :param target: The version that was requested.
        :raises ValueError: if no version of any kind is known.
        :return: The version to use.
        """
        # If this succeeds, the target is a semantic version. If not, we can't
        # understand the target beyond equality, so just do our best.
        try:
            parsed_target = Version.parse(target)
        except ValueError:
            logger.debug("Requested version was not a semantic version.")
            # If there is an exact match in fallbacks, just use that
            if target in self.fallbacks:
                logger.debug("Exact match was found.")
                return target

            # The target isn't semver, just get the latest version we can
            if self.semvers:
                logger.debug(
                    "Using maximum semantic version for unknown requested version."
                )
                return str(max(self.semvers))

            # If there weren't any successfully parsed versions on-disk, use
            # fallback versions
            logger.debug(
                "No package versions using semver were found - falling back to alphabetically highest."
            )
            return self._highest_fallback()

        # If there is an exact match available, just use that
        if parsed_target in self.semvers:
            logger.debug("Exact match was found.")
            return target

        # Prefer the closest version that is newer than the target
        semvers_newer = {ver for ver in self.semvers if ver > parsed_target}
        if semvers_newer:
            logger.debug("Using the closest newer version.")
            return str(min(semvers_newer))

        # Every remaining semantic version must then be older than the target
        if self.semvers:
            logger.debug("Using the closest older version.")
            return str(max(self.semvers))

        # Only non-semantic versions exist on disk; they cannot be ordered
        # against the target, so use the same fallback as for unknown targets.
        logger.debug(
            "No package versions using semver were found - falling back to alphabetically highest."
        )
        return self._highest_fallback()

    def max(self) -> str:
        """Get the latest version on-disk.

        :raises ValueError: if no version of any kind is known.
        :return: The newest semantic version, or the alphabetically highest
            non-semantic version when there is no semantic version.
        """
        if self.semvers:
            return str(max(self.semvers))
        return self._highest_fallback()
def _get_available_versions(
existing: GroupDict, dependency: MavenArtifact
) -> _Versions | None:
if artifacts := existing.get(dependency.group_id, {}).get(
dependency.artifact_id, set()
):
# Guaranteed to be a non-empty set of versions
return _Versions(artifacts)
return None
def _set_version(element: Element, namespaces: Namespaces, versions: _Versions) -> None:
    """Set the version of an artifact element to one that is available.

    If the element has no ``version`` child, one is appended holding the latest
    available version. Otherwise the declared version is replaced by the
    nearest available one, unless it is already that version. A comment
    recording the change is added next to the version element.

    :param element: The artifact element (dependency, plugin or parent).
    :param namespaces: A mapping of namespaces to use during lookups.
    :param versions: The versions available for this artifact.
    :raises MavenXMLError: if the element has no artifactId, or if its version
        element has no text.
    """
    # The groupId is only used for log messages. Plugin elements may omit it
    # and rely on the default plugin group, so its absence must not abort the
    # version update.
    group_id_element = element.find("groupId", namespaces)
    group_id = None if group_id_element is None else group_id_element.text
    if not group_id:
        group_id = "<unspecified>"
    artifact_id = _get_element_text(_find_element(element, "artifactId", namespaces))

    version_element = element.find("version", namespaces)

    # If no version is specified at all, always set it
    if version_element is None:
        new_version = versions.max()
        new_version_element = etree.Element("version")
        new_version_element.text = new_version
        comment = etree.Comment(f"Version set by craft-parts to '{new_version}'")
        element.append(comment)
        element.append(new_version_element)
        logger.debug(
            "Setting version of '%s.%s' to '%s'",
            group_id,
            artifact_id,
            new_version,
        )
        return

    current_version = _get_element_text(version_element)
    logger.debug("Getting nearest version number for %r.", artifact_id)
    new_version = versions.nearest_to(current_version)

    # Nothing to record when the declared version is already the best match
    if current_version == new_version:
        return

    version_element.text = new_version
    comment = etree.Comment(
        f"Version updated by craft-parts from '{current_version}' to '{new_version}'"
    )
    logger.debug(
        "Updating version of '%s.%s' from '%s' to '%s'",
        group_id,
        artifact_id,
        current_version,
        new_version,
    )
    version_element.addprevious(comment)
[docs]
@dataclass
class MavenXMLError(BaseException):
"""An error encountered while parsing XML for Maven projects."""
message: str
details: str | None = None
def __str__(self) -> str:
return f"{self.message}\n{self.details}"
def _find_element(element: Element, path: str, namespaces: Namespaces) -> Element:
"""Find a field within an element.
This is equivalent to `element.find(path, namespaces)`, except that
an exception is raised if the needle isn't found to reduce boilerplate.
:param element: The haystack to search.
:param path: The needle to find in the haystack.
:param namespaces: A mapping of namespaces to use during the search.
:raises MavenXMLError: if the needle can't be found.
:return: The discovered element.
"""
if (needle := element.find(path, namespaces)) is not None:
return needle
raise MavenXMLError(
message=f"Could not find path {path!r} in element {element.tag!r}",
details=f"Could not find path {path!r} in the following XML element:\n{_format_xml_str(element)}",
)
def _get_element_text(element: Element) -> str:
"""Extract the text field from an element.
This is equivalent to `element.text`, except that an exception is
raised if the text field is empty to reduce boilerplate.
:param element: The element to read from.
:raises _MavenXMLError: if there is no text field.
:return: The content of the text field.
"""
if (text := element.text) is not None:
return text
raise MavenXMLError(
message=f"No text field found on {element.tag!r}",
details=f"No text field found on {element.tag!r} in the following XML element:\n{_format_xml_str(element)}",
)
def _format_xml_str(element: Element) -> str:
    """Render an XML element as a pretty-printed string for display.

    :param element: The element to render.
    :return: The serialized element, with undecodable bytes replaced.
    """
    serialized = etree.tostring(element, pretty_print=True)
    return serialized.decode(errors="replace")
def _get_namespaces(project: Element) -> Namespaces:
    """Register the namespaces declared on a project and return them.

    Each entry of the element's namespace map is registered with lxml; an entry
    without a prefix is registered under the ``default`` prefix.

    :param project: The element whose namespace map is read.
    :raises ValueError: if registering a namespace fails for any reason other
        than an invalid namespace URI.
    :return: The namespace map of ``project``.
    """
    nsmap = project.nsmap
    for prefix, uri in nsmap.items():
        registered_prefix = prefix or "default"
        try:
            etree.register_namespace(registered_prefix, uri)
        # Some pom files, such as those found in the apt package libbsh-java, have
        # malformed namespace URIs. Everything else still works without the
        # registration, so only that particular failure is tolerated.
        except ValueError as err:  # noqa: PERF203
            if "Invalid namespace URI" not in str(err):
                raise
    return nsmap
def _get_poms(
    base_pom: Path | None, part_info: PartInfo, existing: GroupDict
) -> list[Path]:
    """Get a list of poms on a project.

    Each submodule is added to the list of existing artifacts because Maven's build
    process will determine the correct order to build dependencies before their
    consuming binaries need them - we do not need to figure this out ourselves. If
    a build somehow fails due to one of these missing from the backstage, it is due
    to a malformed pom.xml.

    :param base_pom: The top-level POM. If None, ``pom.xml`` in the part's
        build subdirectory is used.
    :param part_info: Information about the invoking part.
    :param existing: The known artifacts; submodules are added to this mapping.
    :raises MavenXMLError: if the top-level POM is not a file.
    :return: The top-level POM followed by every discovered submodule POM.
    """
    poms: list[Path] = []

    if base_pom is None:
        base_pom = part_info.part_build_subdir / "pom.xml"

    if not base_pom.is_file():
        raise MavenXMLError(
            "'pom.xml' does not exist",
            details=f"No POM file was found at '{base_pom}'.",
        )

    poms.append(base_pom)
    _recurse_submodules(part_info, base_pom, poms, existing)

    # Only build the listing when it will actually be emitted.
    if logger.isEnabledFor(logging.DEBUG):
        build_dir = part_info.part_build_subdir

        def _display(path: Path) -> str:
            # A POM may live outside of the build directory (an explicitly
            # passed POM, or a resolved submodule path); logging must not fail
            # because of that, so fall back to the path as given.
            try:
                return str(path.relative_to(build_dir))
            except ValueError:
                return str(path)

        logger.debug(
            "Discovered poms for part '%s': [%s]",
            part_info.part_name,
            ", ".join(_display(path) for path in poms),
        )

    return poms
def _recurse_submodules(
    part_info: PartInfo, parent_pom: Path, all_poms: list[Path], existing: GroupDict
) -> None:
    """Recursively find submodule poms and add them to the existing artifacts.

    :param part_info: Information about the invoking part.
    :param parent_pom: The POM whose ``modules`` section is inspected.
    :param all_poms: The list of POMs to patch; discovered submodule POMs are
        appended to it.
    :param existing: The known artifacts; submodules are added to this mapping.
    :raises MavenXMLError: if a ``module`` element has no text, or if a
        submodule POM cannot be turned into an artifact.
    """
    tree = etree.parse(parent_pom, parser=_XML_PARSER)
    project = tree.getroot()
    namespaces = _get_namespaces(project)

    # Check if there are any modules and end recursion early if not
    try:
        modules = _find_element(project, "modules", namespaces)
    except MavenXMLError:
        return

    # For every module found:
    for module in modules.findall("module", namespaces):
        path_str = _get_element_text(module)
        pom_path = (parent_pom.parent / path_str / "pom.xml").resolve()

        # - Validate that it is a legitimate dependency file, but only log if not
        if not pom_path.is_file():
            # The parent may live outside of the build directory; logging must
            # not fail because of that, so fall back to the path as given.
            try:
                shown_parent = parent_pom.relative_to(part_info.part_build_subdir)
            except ValueError:
                shown_parent = parent_pom
            logger.debug(
                "The pom '%s' declares a submodule at '%s', but this submodule could not be found.",
                shown_parent,
                path_str,
            )
            continue

        # - Skip modules that were already visited, so that a module declared
        #   twice (or one that refers back to an ancestor) is patched only once
        #   and cannot cause endless recursion
        if pom_path in all_poms:
            continue

        # - Append it to the list of poms that need patching
        all_poms.append(pom_path)

        # - Add it to the list of existing artifacts
        art = MavenArtifact.from_pom(pom_path)
        _insert_into_existing(existing, art)

        # - Recurse on its pom.xml for more submodules
        _recurse_submodules(part_info, pom_path, all_poms, existing)
def _insert_into_existing(existing: GroupDict, art: MavenArtifact) -> None:
"""Insert a pom file into the list of existing artifacts."""
group_artifacts = existing.setdefault(art.group_id, {})
versions = group_artifacts.setdefault(art.artifact_id, set())
versions.add(art)