feat: load rosdistro yaml from local clone and persist path in ~/.config/buildfarm

This commit is contained in:
Maik Knof
2025-12-29 16:50:47 +00:00
parent c4ec2bb6c1
commit ebe3e1059e
3 changed files with 119 additions and 51 deletions

View File

@@ -3,6 +3,8 @@ import sys
from ros2cli.command import CommandExtension from ros2cli.command import CommandExtension
from .config import (default_config_path, get_rosdistro_repo_path, load_config,
save_config, set_rosdistro_repo_path)
from .rosdistro_loader import fetch_rosdistro_packages from .rosdistro_loader import fetch_rosdistro_packages
from .verb.list import add_list_verb from .verb.list import add_list_verb
@@ -11,6 +13,10 @@ class BuildfarmCommand(CommandExtension):
"""https://git.maikknof.de/ros2/buildfarm""" """https://git.maikknof.de/ros2/buildfarm"""
def add_arguments(self, parser, cli_name): def add_arguments(self, parser, cli_name):
cfg_path = default_config_path()
cfg = load_config(cfg_path)
default_repo_path = get_rosdistro_repo_path(cfg)
parser.add_argument( parser.add_argument(
"--base-url", "--base-url",
default="https://git.maikknof.de", default="https://git.maikknof.de",
@@ -30,18 +36,30 @@ class BuildfarmCommand(CommandExtension):
help="ROS distro name (defaults to env ROS_DISTRO/ROSDISTRO)", help="ROS distro name (defaults to env ROS_DISTRO/ROSDISTRO)",
) )
parser.add_argument(
"--rosdistro-repo-path",
default=default_repo_path,
help="Path to a local clone of the rosdistro repo (stored in ~/.config/buildfarm/config.yaml if provided)",
)
subparsers = parser.add_subparsers(dest="verb", metavar="verb") subparsers = parser.add_subparsers(dest="verb", metavar="verb")
subparsers.required = True subparsers.required = True
add_list_verb(subparsers) add_list_verb(subparsers)
self._parser = parser self._parser = parser
self._cfg_path = cfg_path
self._cfg = cfg
def main(self, *, parser, args): def main(self, *, parser, args):
if not hasattr(args, "main"): if not hasattr(args, "main"):
self._parser.print_help() self._parser.print_help()
return 0 return 0
if getattr(args, "rosdistro_repo_path", None):
set_rosdistro_repo_path(self._cfg, args.rosdistro_repo_path)
save_config(self._cfg_path, self._cfg)
try: try:
packages = fetch_rosdistro_packages( packages = fetch_rosdistro_packages(
base_url=args.base_url, base_url=args.base_url,
@@ -49,6 +67,7 @@ class BuildfarmCommand(CommandExtension):
repo=args.repo, repo=args.repo,
branch=args.branch, branch=args.branch,
rosdistro=args.rosdistro, rosdistro=args.rosdistro,
rosdistro_repo_path=getattr(args, "rosdistro_repo_path", None),
) )
except Exception as e: except Exception as e:
print(f"Error loading rosdistro packages: {e}", file=sys.stderr) print(f"Error loading rosdistro packages: {e}", file=sys.stderr)

34
buildfarm/config.py Normal file
View File

@@ -0,0 +1,34 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, Optional
import yaml
def default_config_path() -> Path:
xdg = os.environ.get("XDG_CONFIG_HOME")
base = Path(xdg) if xdg else (Path.home() / ".config")
return base / "buildfarm" / "config.yaml"
def load_config(path: Path) -> Dict[str, Any]:
if not path.exists():
return {}
data = yaml.safe_load(path.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
def save_config(path: Path, data: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(yaml.safe_dump(data, sort_keys=True), encoding="utf-8")
def get_rosdistro_repo_path(cfg: Dict[str, Any]) -> Optional[str]:
val = cfg.get("rosdistro_repo_path")
return val if isinstance(val, str) and val.strip() else None
def set_rosdistro_repo_path(cfg: Dict[str, Any], repo_path: str) -> None:
cfg["rosdistro_repo_path"] = repo_path

View File

@@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
import re import re
from pathlib import Path
from typing import Dict from typing import Dict
import requests import requests
@@ -42,57 +43,7 @@ def _parse_package_ref(value: str) -> tuple[str, str, str]:
return owner, repo, ref return owner, repo, ref
def _fetch_text(url: str, timeout_s: float) -> str: def _parse_yaml_text(text: str, used_path: str) -> Dict[str, Package]:
resp = requests.get(url, timeout=timeout_s)
if resp.status_code != 200:
raise RuntimeError(
f"Failed to fetch '{url}' (HTTP {resp.status_code}): {resp.text}"
)
return resp.text
def fetch_rosdistro_packages(
*,
base_url: str,
owner: str,
repo: str,
branch: str,
rosdistro: str,
timeout_s: float = 20.0,
) -> Dict[str, Package]:
"""
Tries these paths (in order):
1) <rosdistro>/distro.yaml (your current layout)
2) rosdistro/<rosdistro>/distro.yaml (fallback / older layout)
Returns:
{ package_name: Package(name, owner, repo, ref) }
"""
candidates = [
f"{rosdistro}/distro.yaml",
f"rosdistro/{rosdistro}/distro.yaml",
]
last_err: Exception | None = None
text: str | None = None
used_path: str | None = None
for file_path in candidates:
url = _raw_url(base_url, owner, repo, branch, file_path)
try:
text = _fetch_text(url, timeout_s)
used_path = file_path
break
except Exception as e:
last_err = e
if text is None or used_path is None:
raise (
last_err
if last_err is not None
else RuntimeError("Failed to fetch rosdistro YAML")
)
data = yaml.safe_load(text) data = yaml.safe_load(text)
if not isinstance(data, dict): if not isinstance(data, dict):
raise ValueError( raise ValueError(
@@ -110,3 +61,67 @@ def fetch_rosdistro_packages(
packages[name] = Package(name=name, owner=pkg_owner, repo=pkg_repo, ref=pkg_ref) packages[name] = Package(name=name, owner=pkg_owner, repo=pkg_repo, ref=pkg_ref)
return packages return packages
def _try_read_local(repo_path: Path, candidates: list[str]) -> tuple[str, str] | None:
for rel in candidates:
p = repo_path / rel
if p.is_file():
return p.read_text(encoding="utf-8"), str(p)
return None
def _fetch_text(url: str, timeout_s: float) -> str:
resp = requests.get(url, timeout=timeout_s)
if resp.status_code != 200:
raise RuntimeError(
f"Failed to fetch '{url}' (HTTP {resp.status_code}): {resp.text}"
)
return resp.text
def fetch_rosdistro_packages(
*,
base_url: str,
owner: str,
repo: str,
branch: str,
rosdistro: str,
rosdistro_repo_path: str | None = None,
timeout_s: float = 20.0,
) -> Dict[str, Package]:
"""
Reads rosdistro YAML from:
1) local clone (if rosdistro_repo_path is set)
2) otherwise from Gitea via raw URL
Tries these paths (in order):
- <rosdistro>/distro.yaml
- rosdistro/<rosdistro>/distro.yaml
"""
candidates = [
f"{rosdistro}/distro.yaml",
f"rosdistro/{rosdistro}/distro.yaml",
]
if rosdistro_repo_path:
repo_path = Path(rosdistro_repo_path).expanduser()
local = _try_read_local(repo_path, candidates)
if local is not None:
text, used_path = local
return _parse_yaml_text(text, used_path)
last_err: Exception | None = None
for file_path in candidates:
url = _raw_url(base_url, owner, repo, branch, file_path)
try:
text = _fetch_text(url, timeout_s)
return _parse_yaml_text(text, url)
except Exception as e:
last_err = e
raise (
last_err
if last_err is not None
else RuntimeError("Failed to load rosdistro YAML")
)