diff --git a/bin/nur b/bin/nur
index 998edbeb0..62b8b462d 100755
--- a/bin/nur
+++ b/bin/nur
@@ -1,5 +1,5 @@
 #!/usr/bin/env nix-shell
-#!nix-shell -p python3 -p nix-prefetch-git -p nix -i python3
+#!nix-shell -p python3 -p python3Packages.aiohttp -p nix-prefetch-git -p nix -i python3
 
 import sys
 import os
diff --git a/ci/nur/error.py b/ci/nur/error.py
index fa2665cce..8ac6b0d59 100644
--- a/ci/nur/error.py
+++ b/ci/nur/error.py
@@ -4,3 +4,7 @@ class NurError(Exception):
 
 class EvalError(NurError):
     pass
+
+
+class RepositoryDeletedError(NurError):
+    pass
diff --git a/ci/nur/eval.py b/ci/nur/eval.py
index 69963526f..e33808a85 100644
--- a/ci/nur/eval.py
+++ b/ci/nur/eval.py
@@ -2,6 +2,7 @@ import logging
 import os
 import subprocess
 import tempfile
+import asyncio
 from argparse import Namespace
 from pathlib import Path
 from urllib.parse import urlparse
@@ -13,7 +14,7 @@ from .path import EVALREPO_PATH, nixpkgs_path
 logger = logging.getLogger(__name__)
 
 
-def eval_repo(repo: Repo, repo_path: Path) -> None:
+async def eval_repo(repo: Repo, repo_path: Path) -> None:
     with tempfile.TemporaryDirectory() as d:
         eval_path = Path(d).joinpath("default.nix")
         with open(eval_path, "w") as f:
@@ -49,15 +50,20 @@ import {EVALREPO_PATH} {{
         ]
         # fmt: on
 
-        logger.info(f"Evaluate repository {repo.name}")
         env = dict(PATH=os.environ["PATH"], NIXPKGS_ALLOW_UNSUPPORTED_SYSTEM="1")
-        proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL)
+        proc = await asyncio.create_subprocess_exec(
+            *cmd,
+            env=env,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
         try:
-            res = proc.wait(15)
-        except subprocess.TimeoutExpired:
+            stdout, stderr = await asyncio.wait_for(proc.communicate(), 15)
+        except asyncio.TimeoutError:
+            proc.kill()
             raise EvalError(f"evaluation for {repo.name} timed out of after 15 seconds")
-        if res != 0:
-            raise EvalError(f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}")
+        if proc.returncode != 0:
+            raise EvalError(f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}\n\n{stdout.decode()}{stderr.decode()}")
 
 
 async def eval_command(args: Namespace) -> None:
diff --git a/ci/nur/prefetch.py b/ci/nur/prefetch.py
index fa56735bc..822b2308e 100644
--- a/ci/nur/prefetch.py
+++ b/ci/nur/prefetch.py
@@ -3,12 +3,13 @@ import os
 import re
 import subprocess
 import asyncio
+import aiohttp
 from pathlib import Path
 from typing import Optional, Tuple
 from urllib.parse import urlparse, ParseResult
 
-from .error import NurError
-from .manifest import LockedVersion, Repo, RepoType
+from .error import NurError, RepositoryDeletedError
+from .manifest import Repo, RepoType
 
 Url = ParseResult
 
@@ -29,25 +30,58 @@ async def nix_prefetch_zip(url: str) -> Tuple[str, Path]:
     return sha256, Path(path)
 
 
+def parse_pkt_lines(data: bytes):
+    """Split a git pkt-line stream into the payload of each packet.
+
+    Every packet starts with a 4-digit hex length that counts the header
+    itself; special packets (length < 4, e.g. the ``0000`` flush) carry no
+    payload and are skipped. A non-hex length raises ``ValueError``.
+    """
+    i = 0
+    lines = []
+    while i < len(data):
+        if i + 4 > len(data):
+            break
+        length = int(data[i:i+4], 16)
+        i += 4
+        if length < 4:  # skipping also keeps the cursor from moving backwards
+            continue
+        line = data[i:i+length-4]
+        i += length - 4
+        lines.append(line)
+    return lines
+
+
 class GitPrefetcher:
     def __init__(self, repo: Repo) -> None:
         self.repo = repo
 
     async def latest_commit(self) -> str:
-        proc = await asyncio.create_subprocess_exec(
-            *["git", "ls-remote", self.repo.url.geturl(), self.repo.branch or "HEAD"],
-            env={**os.environ, "GIT_ASKPASS": "", "GIT_TERMINAL_PROMPT": "0"},
-            stdout=asyncio.subprocess.PIPE,
-            stderr=asyncio.subprocess.PIPE,
-        )
-        stdout, stderr = await proc.communicate()
+        """Return the commit hash of the tracked ref, read from the remote's info/refs."""
+        info_url = f"{self.repo.url.geturl()}/info/refs?service=git-upload-pack"
 
-        if proc.returncode != 0:
-            raise NurError(
-                f"Failed to prefetch git repository {self.repo.url.geturl()}: {stderr.decode()}"
-            )
+        async with aiohttp.ClientSession() as session:
+            async with session.get(info_url) as resp:
+                if resp.status == 401:
+                    raise RepositoryDeletedError("Repository deleted!")
+                elif resp.status != 200:
+                    raise NurError(f"Failed to get refs for {self.repo.url.geturl()}: {(await resp.read()).decode()}")
+                raw = await resp.read()
 
-        return stdout.decode().split(maxsplit=1)[0]
+        lines = parse_pkt_lines(raw)
+
+        wanted = b"HEAD" if self.repo.branch is None else f"refs/heads/{self.repo.branch}".encode()
+
+        for line in lines:
+            # Strip capabilities after NUL
+            if b"\x00" in line:
+                line = line.split(b"\x00", 1)[0]
+
+            parts = line.strip().split()
+            if len(parts) == 2 and parts[1] == wanted:
+                return parts[0].decode()
+
+        raise NurError(f"Ref not found: {wanted.decode()}")
 
     async def prefetch(self, ref: str) -> Tuple[str, Path]:
         cmd = ["nix-prefetch-git"]
@@ -63,7 +97,7 @@ class GitPrefetcher:
         )
         try:
             stdout, stderr = await asyncio.wait_for(proc.communicate(), 30)
-        except subprocess.TimeoutExpired:
+        except asyncio.TimeoutError:
             proc.kill()
             raise NurError(
                 f"Timeout expired while prefetching git repository {self. repo.url.geturl()}"
@@ -77,8 +111,11 @@ class GitPrefetcher:
         metadata = json.loads(stdout)
         lines = stderr.decode("utf-8").split("\n")
         repo_path = re.search("path is (.+)", lines[-5])
-        assert repo_path is not None
+        if not repo_path:
+            raise NurError(
+                f"Failed to prefetch git repository {self.repo.url.geturl()}"
+            )
         path = Path(repo_path.group(1))
         sha256 = metadata["sha256"]
         return sha256, path
 
@@ -99,21 +136,13 @@ class GitlabPrefetcher(GitPrefetcher):
         url = f"https://{hostname}/api/v4/projects/{escaped_path}/repository/archive.tar.gz?sha={ref}"
         return await nix_prefetch_zip(url)
 
 
-async def prefetch(repo: Repo) -> Tuple[Repo, LockedVersion, Optional[Path]]:
-    prefetcher: GitPrefetcher
-    if repo.type == RepoType.GITHUB:
-        prefetcher = GithubPrefetcher(repo)
-    elif repo.type == RepoType.GITLAB:
-        prefetcher = GitlabPrefetcher(repo)
-    else:
-        prefetcher = GitPrefetcher(repo)
-
-    commit = await prefetcher.latest_commit()
-    locked_version = repo.locked_version
-    if locked_version is not None:
-        if locked_version.rev == commit:
-            return repo, locked_version, None
-
-    sha256, path = await prefetcher.prefetch(commit)
-    return repo, LockedVersion(repo.url, commit, sha256, repo.submodules), path
+def prefetcher_for(repo: Repo) -> GitPrefetcher:
+    """Return the prefetcher implementation matching the repository's type."""
+    match repo.type:
+        case RepoType.GITHUB:
+            return GithubPrefetcher(repo)
+        case RepoType.GITLAB:
+            return GitlabPrefetcher(repo)
+        case _:
+            return GitPrefetcher(repo)
diff --git a/ci/nur/update.py b/ci/nur/update.py
index f30835b43..40e91a482 100644
--- a/ci/nur/update.py
+++ b/ci/nur/update.py
@@ -1,22 +1,29 @@
 import logging
+import asyncio
 from argparse import Namespace
 from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import List, Optional, Tuple
 
 from .eval import EvalError, eval_repo
-from .manifest import Repo, load_manifest, update_lock_file
+from .manifest import Repo, LockedVersion, load_manifest, update_lock_file
 from .path import LOCK_PATH, MANIFEST_PATH
-from .prefetch import prefetch
+from .prefetch import prefetcher_for
 
 logger = logging.getLogger(__name__)
 
 
 async def update(repo: Repo) -> Repo:
-    repo, locked_version, repo_path = await prefetch(repo)
+    """Re-lock ``repo`` to its latest upstream commit after it evaluates."""
+    prefetcher = prefetcher_for(repo)
 
-    if repo_path:
-        eval_repo(repo, repo_path)
+    latest_commit = await prefetcher.latest_commit()
 
-    repo.locked_version = locked_version
+    if repo.locked_version is not None and repo.locked_version.rev == latest_commit:
+        return repo
+
+    sha256, repo_path = await prefetcher.prefetch(latest_commit)
+    await eval_repo(repo, repo_path)
+    repo.locked_version = LockedVersion(repo.url, latest_commit, sha256, repo.submodules)
     return repo
 
 
@@ -25,17 +32,42 @@ async def update_command(args: Namespace) -> None:
 
     manifest = load_manifest(MANIFEST_PATH, LOCK_PATH)
 
-    for repo in manifest.repos:
-        try:
-            await update(repo)
-        except EvalError as err:
-            if repo.locked_version is None:
-                logger.error(
-                    f"repository {repo.name} failed to evaluate: {err}. This repo is not yet in our lock file!!!!"
-                )
-                raise
-            logger.error(f"repository {repo.name} failed to evaluate: {err}")
-        except Exception:
-            logger.exception(f"Failed to update repository {repo.name}")
+    log_lock = asyncio.Lock()  # serialize success/error output
 
-    update_lock_file(manifest.repos, LOCK_PATH)
+    results: List[Tuple[int, Optional[Repo], Optional[BaseException]]] = []
+
+    async def run_one(i: int, repo: Repo) -> None:
+        try:
+            updated = await update(repo)
+
+            results.append((i, updated, None))
+
+            async with log_lock:
+                if updated.locked_version is not None:
+                    logger.info(f"Updated repository {repo.name} -> {updated.locked_version.rev}")
+                else:
+                    logger.info(f"Updated repository {repo.name}")
+        except Exception as e:
+            results.append((i, None, e))
+
+            async with log_lock:
+                if isinstance(e, EvalError) and repo.locked_version is None:
+                    logger.error(
+                        f"repository {repo.name} failed to evaluate: {e}. "
+                        "This repo is not yet in our lock file!!!!"
+                    )
+                elif isinstance(e, EvalError):
+                    logger.error(f"repository {repo.name} failed to evaluate: {e}")
+                else:
+                    logger.exception(f"Failed to update repository {repo.name}", exc_info=e)
+
+    tasks = [asyncio.create_task(run_one(i, repo)) for i, repo in enumerate(manifest.repos)]
+    await asyncio.gather(*tasks)
+
+    updated_repos: List[Repo] = list(manifest.repos)
+
+    for i, updated, err in results:
+        if err is None and updated is not None:
+            updated_repos[i] = updated
+
+    update_lock_file(updated_repos, LOCK_PATH)