diff --git a/bin/nur b/bin/nur index 998edbeb0..62b8b462d 100755 --- a/bin/nur +++ b/bin/nur @@ -1,5 +1,5 @@ #!/usr/bin/env nix-shell -#!nix-shell -p python3 -p nix-prefetch-git -p nix -i python3 +#!nix-shell -p python3 -p python3Packages.aiohttp -p nix-prefetch-git -p nix -i python3 import sys import os diff --git a/ci/nur/__init__.py b/ci/nur/__init__.py index b9ba402ae..f298b4fff 100644 --- a/ci/nur/__init__.py +++ b/ci/nur/__init__.py @@ -1,4 +1,5 @@ import argparse +import asyncio import logging import sys from typing import List @@ -60,4 +61,4 @@ def main() -> None: args = parse_arguments(sys.argv) logging.basicConfig(level=LOG_LEVELS[args.log_level]) - args.func(args) + asyncio.run(args.func(args)) diff --git a/ci/nur/combine.py b/ci/nur/combine.py index 9ca754f93..07d77b479 100644 --- a/ci/nur/combine.py +++ b/ci/nur/combine.py @@ -174,7 +174,7 @@ def setup_combined() -> None: commit_files(vcs_files, "update code") -def combine_command(args: Namespace) -> None: +async def combine_command(args: Namespace) -> None: combined_path = Path(args.directory) with chdir(combined_path): diff --git a/ci/nur/error.py b/ci/nur/error.py index fa2665cce..8ac6b0d59 100644 --- a/ci/nur/error.py +++ b/ci/nur/error.py @@ -4,3 +4,7 @@ class NurError(Exception): class EvalError(NurError): pass + + +class RepositoryDeletedError(NurError): + pass diff --git a/ci/nur/eval.py b/ci/nur/eval.py index b74a41386..789b08e0c 100644 --- a/ci/nur/eval.py +++ b/ci/nur/eval.py @@ -1,6 +1,6 @@ +import asyncio import logging import os -import subprocess import tempfile from argparse import Namespace from pathlib import Path @@ -13,7 +13,7 @@ from .path import EVALREPO_PATH, nixpkgs_path logger = logging.getLogger(__name__) -def eval_repo(repo: Repo, repo_path: Path) -> None: +async def eval_repo(repo: Repo, repo_path: Path) -> None: with tempfile.TemporaryDirectory() as d: eval_path = Path(d).joinpath("default.nix") with open(eval_path, "w") as f: @@ -49,18 +49,25 @@ import {EVALREPO_PATH} {{ ] 
# fmt: on - logger.info(f"Evaluate repository {repo.name}") env = dict(PATH=os.environ["PATH"], NIXPKGS_ALLOW_UNSUPPORTED_SYSTEM="1") - proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL) + proc = await asyncio.create_subprocess_exec( + *cmd, + env=env, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) try: - res = proc.wait(15) - except subprocess.TimeoutExpired: + stdout, stderr = await asyncio.wait_for(proc.communicate(), 15) + except TimeoutError: + proc.kill() raise EvalError(f"evaluation for {repo.name} timed out of after 15 seconds") - if res != 0: - raise EvalError(f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}") + if proc.returncode != 0: + raise EvalError( + f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}\n\n{stdout.decode()}" + ) -def eval_command(args: Namespace) -> None: +async def eval_command(args: Namespace) -> None: logging.basicConfig(level=logging.INFO) repo_path = Path(args.directory) @@ -74,5 +81,5 @@ def eval_command(args: Namespace) -> None: None, None, ) - eval_repo(repo, repo_path) + await eval_repo(repo, repo_path) print("OK") diff --git a/ci/nur/format_manifest.py b/ci/nur/format_manifest.py index bc397c25f..e35c1d76e 100644 --- a/ci/nur/format_manifest.py +++ b/ci/nur/format_manifest.py @@ -6,7 +6,7 @@ from argparse import Namespace from .path import ROOT -def format_manifest_command(args: Namespace) -> None: +async def format_manifest_command(args: Namespace) -> None: path = ROOT.joinpath("repos.json") manifest = json.load(open(path)) for name, repo in manifest.get("repos", []).items(): diff --git a/ci/nur/index.py b/ci/nur/index.py index 496229993..434725b8a 100644 --- a/ci/nur/index.py +++ b/ci/nur/index.py @@ -94,7 +94,7 @@ callPackage (nur.repo-sources."%s" + "/%s") {} return pkgs -def index_command(args: Namespace) -> None: +async def index_command(args: Namespace) -> None: directory = Path(args.directory) manifest_path = directory.joinpath("repos.json") with open(manifest_path) as f: 
diff --git a/ci/nur/prefetch.py b/ci/nur/prefetch.py index 31313d791..7e37eb16b 100644 --- a/ci/nur/prefetch.py +++ b/ci/nur/prefetch.py @@ -1,47 +1,100 @@ +import asyncio import json -import os import re -import subprocess from pathlib import Path -from typing import Optional, Tuple +from typing import List, Tuple from urllib.parse import ParseResult -from .error import NurError -from .manifest import LockedVersion, Repo, RepoType +import aiohttp + +from .error import NurError, RepositoryDeletedError +from .manifest import Repo, RepoType Url = ParseResult -def nix_prefetch_zip(url: str) -> Tuple[str, Path]: - data = subprocess.check_output( - ["nix-prefetch-url", "--name", "source", "--unpack", "--print-path", url] +async def nix_prefetch_zip(url: str) -> Tuple[str, Path]: + proc = await asyncio.create_subprocess_exec( + *["nix-prefetch-url", "--name", "source", "--unpack", "--print-path", url], + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, ) - sha256, path = data.decode().strip().split("\n") + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: + raise NurError(f"Failed to prefetch git repository {url}: {stderr.decode()}") + + sha256, path = stdout.decode().strip().split("\n") return sha256, Path(path) +def parse_pkt_lines(data: bytes) -> List[bytes]: + i = 0 + lines = [] + while i < len(data): + if i + 4 > len(data): + break + length = int(data[i : i + 4], 16) + i += 4 + if length == 0: + continue + line = data[i : i + length - 4] + i += length - 4 + lines.append(line) + return lines + + class GitPrefetcher: def __init__(self, repo: Repo) -> None: self.repo = repo - def latest_commit(self) -> str: - data = subprocess.check_output( - ["git", "ls-remote", self.repo.url.geturl(), self.repo.branch or "HEAD"], - env={**os.environ, "GIT_ASKPASS": "", "GIT_TERMINAL_PROMPT": "0"}, - ) - return data.decode().split(maxsplit=1)[0] + async def latest_commit(self) -> str: + info_url = 
f"{self.repo.url.geturl()}/info/refs?service=git-upload-pack" - def prefetch(self, ref: str) -> Tuple[str, Path]: + async with aiohttp.ClientSession() as session: + async with session.get(info_url) as resp: + if resp.status == 401: + raise RepositoryDeletedError("Repository deleted!") + elif resp.status != 200: + raise NurError( + f"Failed to get refs for {self.repo.url.geturl()}: {(await resp.read()).decode()}" + ) + raw = await resp.read() + + lines = parse_pkt_lines(raw) + + wanted = ( + b"HEAD" + if self.repo.branch is None + else f"refs/heads/{self.repo.branch}".encode() + ) + + for line in lines: + # Strip capabilities after NUL + if b"\x00" in line: + line = line.split(b"\x00", 1)[0] + + parts = line.strip().split() + if len(parts) == 2 and parts[1] == wanted: + return parts[0].decode() + + raise NurError(f"Ref not found: {wanted.decode()}") + + async def prefetch(self, ref: str) -> Tuple[str, Path]: cmd = ["nix-prefetch-git"] if self.repo.submodules: cmd += ["--fetch-submodules"] if self.repo.branch: cmd += ["--rev", f"refs/heads/{self.repo.branch}"] cmd += [self.repo.url.geturl()] - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) try: - stdout, stderr = proc.communicate(timeout=30) - except subprocess.TimeoutExpired: + stdout, stderr = await asyncio.wait_for(proc.communicate(), 30) + except TimeoutError: proc.kill() raise NurError( f"Timeout expired while prefetching git repository {self. 
repo.url.geturl()}" @@ -55,19 +108,26 @@ class GitPrefetcher: metadata = json.loads(stdout) lines = stderr.decode("utf-8").split("\n") repo_path = re.search("path is (.+)", lines[-5]) - assert repo_path is not None + if not repo_path: + raise NurError( + f"Failed to prefetch git repository {self.repo.url.geturl()}" + ) path = Path(repo_path.group(1)) + if not path: + raise NurError( + f"Failed to prefetch git repository {self.repo.url.geturl()}" + ) sha256 = metadata["sha256"] return sha256, path class GithubPrefetcher(GitPrefetcher): - def prefetch(self, ref: str) -> Tuple[str, Path]: - return nix_prefetch_zip(f"{self.repo.url.geturl()}/archive/{ref}.tar.gz") + async def prefetch(self, ref: str) -> Tuple[str, Path]: + return await nix_prefetch_zip(f"{self.repo.url.geturl()}/archive/{ref}.tar.gz") class GitlabPrefetcher(GitPrefetcher): - def prefetch(self, ref: str) -> Tuple[str, Path]: + async def prefetch(self, ref: str) -> Tuple[str, Path]: hostname = self.repo.url.hostname assert ( hostname is not None @@ -75,23 +135,14 @@ class GitlabPrefetcher(GitPrefetcher): path = Path(self.repo.url.path) escaped_path = "%2F".join(path.parts[1:]) url = f"https://{hostname}/api/v4/projects/{escaped_path}/repository/archive.tar.gz?sha={ref}" - return nix_prefetch_zip(url) + return await nix_prefetch_zip(url) -def prefetch(repo: Repo) -> Tuple[Repo, LockedVersion, Optional[Path]]: - prefetcher: GitPrefetcher - if repo.type == RepoType.GITHUB: - prefetcher = GithubPrefetcher(repo) - elif repo.type == RepoType.GITLAB: - prefetcher = GitlabPrefetcher(repo) - else: - prefetcher = GitPrefetcher(repo) - - commit = prefetcher.latest_commit() - locked_version = repo.locked_version - if locked_version is not None: - if locked_version.rev == commit: - return repo, locked_version, None - - sha256, path = prefetcher.prefetch(commit) - return repo, LockedVersion(repo.url, commit, sha256, repo.submodules), path +def prefetcher_for(repo: Repo) -> GitPrefetcher: + match repo.type: + case 
RepoType.GITHUB: + return GithubPrefetcher(repo) + case RepoType.GITLAB: + return GitlabPrefetcher(repo) + case _: + return GitPrefetcher(repo) diff --git a/ci/nur/update.py b/ci/nur/update.py index 4834b3aec..6de323e0d 100644 --- a/ci/nur/update.py +++ b/ci/nur/update.py @@ -1,61 +1,79 @@ +import asyncio import logging from argparse import Namespace -from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import List, Optional, Tuple from .eval import EvalError, eval_repo -from .manifest import Repo, load_manifest, update_lock_file +from .manifest import LockedVersion, Repo, load_manifest, update_lock_file from .path import LOCK_PATH, MANIFEST_PATH -from .prefetch import prefetch +from .prefetch import prefetcher_for logger = logging.getLogger(__name__) -def update(repo: Repo) -> Repo: - repo, locked_version, repo_path = prefetch(repo) +async def update(repo: Repo) -> Repo: + prefetcher = prefetcher_for(repo) - if repo_path: - eval_repo(repo, repo_path) + latest_commit = await prefetcher.latest_commit() - repo.locked_version = locked_version + if repo.locked_version is not None and repo.locked_version.rev == latest_commit: + return repo + + sha256, repo_path = await prefetcher.prefetch(latest_commit) + await eval_repo(repo, repo_path) + repo.locked_version = LockedVersion( + repo.url, latest_commit, sha256, repo.submodules + ) return repo -def update_command(args: Namespace) -> None: +async def update_command(args: Namespace) -> None: logging.basicConfig(level=logging.INFO) manifest = load_manifest(MANIFEST_PATH, LOCK_PATH) - if getattr(args, "debug", False): - for repo in manifest.repos: - try: - update(repo) - except EvalError as err: - if repo.locked_version is None: - logger.error( - f"repository {repo.name} failed to evaluate: {err}. This repo is not yet in our lock file!!!!" 
+ log_lock = asyncio.Lock() # serialize success/error output + + results: List[Tuple[int, Optional[Repo], Optional[BaseException]]] = [] + + async def run_one(i: int, repo: Repo) -> None: + try: + updated = await update(repo) + + results.append((i, updated, None)) + + async with log_lock: + if updated.locked_version is not None: + logger.info( + f"Updated repository {repo.name} -> {updated.locked_version.rev}" ) - raise - logger.error(f"repository {repo.name} failed to evaluate: {err}") - except Exception: - logger.exception(f"Failed to update repository {repo.name}") - else: - with ThreadPoolExecutor() as executor: - future_to_repo = { - executor.submit(update, repo): repo for repo in manifest.repos - } + else: + logger.info(f"Updated repository {repo.name}") + except BaseException as e: + results.append((i, None, e)) - for future in as_completed(future_to_repo): - repo = future_to_repo[future] - try: - future.result() - except EvalError as err: - if repo.locked_version is None: - logger.error( - f"repository {repo.name} failed to evaluate: {err}. This repo is not yet in our lock file!!!!" - ) - raise - logger.error(f"repository {repo.name} failed to evaluate: {err}") - except Exception: - logger.exception(f"Failed to update repository {repo.name}") + async with log_lock: + if isinstance(e, EvalError) and repo.locked_version is None: + logger.error( + f"repository {repo.name} failed to evaluate: {e}. " + "This repo is not yet in our lock file!!!!" 
+ ) + elif isinstance(e, EvalError): + logger.error(f"repository {repo.name} failed to evaluate: {e}") + else: + logger.exception( + f"Failed to update repository {repo.name}", exc_info=e + ) - update_lock_file(manifest.repos, LOCK_PATH) + tasks = [ + asyncio.create_task(run_one(i, repo)) for i, repo in enumerate(manifest.repos) + ] + await asyncio.gather(*tasks) + + updated_repos: List[Repo] = list(manifest.repos) + + for i, updated, err in results: + if err is None and updated is not None: + updated_repos[i] = updated + + update_lock_file(updated_repos, LOCK_PATH) diff --git a/ci/test.sh b/ci/test.sh index d419b0730..f01ff6597 100755 --- a/ci/test.sh +++ b/ci/test.sh @@ -1,5 +1,5 @@ #!/usr/bin/env nix-shell -#!nix-shell -p bash -i bash -p mypy -p black -p ruff -p nix +#!nix-shell -p bash -i bash -p mypy -p black -p ruff -p nix -p python3Packages.aiohttp set -eux -o pipefail # Exit with nonzero exit code if anything fails