Actual true parallel non-blocking updating with asyncio

This commit is contained in:
Gavin John 2026-01-29 20:53:33 -08:00
parent 3692fecb54
commit 8d3292af3a
5 changed files with 125 additions and 61 deletions

View file

@ -1,5 +1,5 @@
#!/usr/bin/env nix-shell
#!nix-shell -p python3 -p nix-prefetch-git -p nix -i python3
#!nix-shell -p python3 -p python3Packages.aiohttp -p nix-prefetch-git -p nix -i python3
import sys
import os

View file

@ -4,3 +4,7 @@ class NurError(Exception):
class EvalError(NurError):
    """Raised when evaluating a repository's Nix expression fails (or times out)."""
    pass
class RepositoryDeletedError(NurError):
    """Raised when the upstream repository appears to be gone.

    Signalled when fetching the smart-HTTP refs endpoint returns 401,
    which the updater treats as a deleted/unavailable repository.
    """
    pass

View file

@ -2,6 +2,7 @@ import logging
import os
import subprocess
import tempfile
import asyncio
from argparse import Namespace
from pathlib import Path
from urllib.parse import urlparse
@ -13,7 +14,7 @@ from .path import EVALREPO_PATH, nixpkgs_path
logger = logging.getLogger(__name__)
def eval_repo(repo: Repo, repo_path: Path) -> None:
async def eval_repo(repo: Repo, repo_path: Path) -> None:
with tempfile.TemporaryDirectory() as d:
eval_path = Path(d).joinpath("default.nix")
with open(eval_path, "w") as f:
@ -49,15 +50,20 @@ import {EVALREPO_PATH} {{
]
# fmt: on
logger.info(f"Evaluate repository {repo.name}")
env = dict(PATH=os.environ["PATH"], NIXPKGS_ALLOW_UNSUPPORTED_SYSTEM="1")
proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL)
proc = await asyncio.create_subprocess_exec(
*cmd,
env=env,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
res = proc.wait(15)
except subprocess.TimeoutExpired:
stdout, stderr = await asyncio.wait_for(proc.communicate(), 15)
except TimeoutError:
proc.kill()
raise EvalError(f"evaluation for {repo.name} timed out after 15 seconds")
if res != 0:
raise EvalError(f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}")
if proc.returncode != 0:
raise EvalError(f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}\n\n{stdout.decode()}")
async def eval_command(args: Namespace) -> None:

View file

@ -3,12 +3,13 @@ import os
import re
import subprocess
import asyncio
import aiohttp
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import urlparse, ParseResult
from .error import NurError
from .manifest import LockedVersion, Repo, RepoType
from .error import NurError, RepositoryDeletedError
from .manifest import Repo, RepoType
Url = ParseResult
@ -29,25 +30,51 @@ async def nix_prefetch_zip(url: str) -> Tuple[str, Path]:
return sha256, Path(path)
def parse_pkt_lines(data: bytes) -> list[bytes]:
    """Split a git smart-protocol pkt-line stream into payload lines.

    Each pkt-line starts with a 4-hex-digit length that counts the 4-byte
    header itself; "0000" is a flush-pkt carrying no payload.  Returns the
    payload bytes of every data pkt, in order.  A truncated trailing pkt
    yields whatever bytes remain (lenient, matching prior behaviour).

    Args:
        data: raw response body of e.g. ``info/refs?service=git-upload-pack``.

    Returns:
        List of payload byte strings (length header stripped).
    """
    lines: list[bytes] = []
    i = 0
    end = len(data)
    # Need at least 4 bytes for a length header; stop on a truncated one.
    while i + 4 <= end:
        length = int(data[i:i + 4], 16)
        i += 4
        # BUG FIX: lengths 1-3 (e.g. the "0001" delim-pkt of protocol v2)
        # previously made `i += length - 4` negative, re-reading the same
        # bytes and emitting garbage.  Flush- and special pkts carry no
        # payload, so skip anything shorter than the header itself.
        if length < 4:
            continue
        lines.append(data[i:i + length - 4])
        i += length - 4
    return lines
class GitPrefetcher:
def __init__(self, repo: Repo) -> None:
self.repo = repo
async def latest_commit(self) -> str:
proc = await asyncio.create_subprocess_exec(
*["git", "ls-remote", self.repo.url.geturl(), self.repo.branch or "HEAD"],
env={**os.environ, "GIT_ASKPASS": "", "GIT_TERMINAL_PROMPT": "0"},
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()
info_url = f"{self.repo.url.geturl()}/info/refs?service=git-upload-pack"
if proc.returncode != 0:
raise NurError(
f"Failed to prefetch git repository {self.repo.url.geturl()}: {stderr.decode()}"
)
async with aiohttp.ClientSession() as session:
async with session.get(info_url) as resp:
if resp.status == 401:
raise RepositoryDeletedError(f"Repository deleted!")
elif resp.status != 200:
raise NurError(f"Failed to get refs for {self.repo.url.geturl()}: {(await resp.read()).decode()}")
raw = await resp.read()
return stdout.decode().split(maxsplit=1)[0]
lines = parse_pkt_lines(raw)
wanted = b"HEAD" if self.repo.branch is None else f"refs/heads/{self.repo.branch}".encode()
for line in lines:
# Strip capabilities after NUL
if b"\x00" in line:
line = line.split(b"\x00", 1)[0]
parts = line.strip().split()
if len(parts) == 2 and parts[1] == wanted:
return parts[0].decode()
raise NurError(f"Ref not found: {wanted.decode()}")
async def prefetch(self, ref: str) -> Tuple[str, Path]:
cmd = ["nix-prefetch-git"]
@ -63,7 +90,7 @@ class GitPrefetcher:
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), 30)
except subprocess.TimeoutExpired:
except TimeoutError:
proc.kill()
raise NurError(
f"Timeout expired while prefetching git repository {self.repo.url.geturl()}"
@ -77,8 +104,15 @@ class GitPrefetcher:
metadata = json.loads(stdout)
lines = stderr.decode("utf-8").split("\n")
repo_path = re.search("path is (.+)", lines[-5])
assert repo_path is not None
if not repo_path:
raise NurError(
f"Failed to prefetch git repository {self.repo.url.geturl()}"
)
path = Path(repo_path.group(1))
if not path:
raise NurError(
f"Failed to prefetch git repository {self.repo.url.geturl()}"
)
sha256 = metadata["sha256"]
return sha256, path
@ -99,21 +133,11 @@ class GitlabPrefetcher(GitPrefetcher):
url = f"https://{hostname}/api/v4/projects/{escaped_path}/repository/archive.tar.gz?sha={ref}"
return await nix_prefetch_zip(url)
async def prefetch(repo: Repo) -> Tuple[Repo, LockedVersion, Optional[Path]]:
prefetcher: GitPrefetcher
if repo.type == RepoType.GITHUB:
prefetcher = GithubPrefetcher(repo)
elif repo.type == RepoType.GITLAB:
prefetcher = GitlabPrefetcher(repo)
else:
prefetcher = GitPrefetcher(repo)
commit = await prefetcher.latest_commit()
locked_version = repo.locked_version
if locked_version is not None:
if locked_version.rev == commit:
return repo, locked_version, None
sha256, path = await prefetcher.prefetch(commit)
return repo, LockedVersion(repo.url, commit, sha256, repo.submodules), path
def prefetcher_for(repo: Repo) -> GitPrefetcher:
match repo.type:
case RepoType.GITHUB:
return GithubPrefetcher(repo)
case RepoType.GITLAB:
return GithubPrefetcher(repo)
case _:
return GitPrefetcher(repo)

View file

@ -1,22 +1,27 @@
import logging
import asyncio
from argparse import Namespace
from concurrent.futures import ThreadPoolExecutor, as_completed
from .eval import EvalError, eval_repo
from .manifest import Repo, load_manifest, update_lock_file
from .manifest import Repo, LockedVersion, load_manifest, update_lock_file
from .path import LOCK_PATH, MANIFEST_PATH
from .prefetch import prefetch
from .prefetch import prefetcher_for
logger = logging.getLogger(__name__)
async def update(repo: Repo) -> Repo:
repo, locked_version, repo_path = await prefetch(repo)
prefetcher = prefetcher_for(repo)
if repo_path:
eval_repo(repo, repo_path)
latest_commit = await prefetcher.latest_commit()
repo.locked_version = locked_version
if repo.locked_version is not None and repo.locked_version.rev == latest_commit:
return repo
sha256, repo_path = await prefetcher.prefetch(latest_commit)
eval_repo(repo, repo_path)
repo.locked_version = LockedVersion(repo.url, latest_commit, sha256, repo.submodules)
return repo
@ -25,17 +30,42 @@ async def update_command(args: Namespace) -> None:
manifest = load_manifest(MANIFEST_PATH, LOCK_PATH)
for repo in manifest.repos:
try:
await update(repo)
except EvalError as err:
if repo.locked_version is None:
logger.error(
f"repository {repo.name} failed to evaluate: {err}. This repo is not yet in our lock file!!!!"
)
raise
logger.error(f"repository {repo.name} failed to evaluate: {err}")
except Exception:
logger.exception(f"Failed to update repository {repo.name}")
log_lock = asyncio.Lock() # serialize success/error output
update_lock_file(manifest.repos, LOCK_PATH)
results: List[Tuple[int, Optional[Repo], Optional[BaseException]]] = []
async def run_one(i: int, repo: Repo) -> None:
try:
updated = await update(repo)
results.append((i, updated, None))
async with log_lock:
if updated.locked_version is not None:
logger.info(f"Updated repository {repo.name} -> {updated.locked_version.rev}")
else:
logger.info(f"Updated repository {repo.name}")
except BaseException as e:
results.append((i, None, e))
async with log_lock:
if isinstance(e, EvalError) and repo.locked_version is None:
logger.error(
f"repository {repo.name} failed to evaluate: {e}. "
"This repo is not yet in our lock file!!!!"
)
elif isinstance(e, EvalError):
logger.error(f"repository {repo.name} failed to evaluate: {e}")
else:
logger.exception(f"Failed to update repository {repo.name}", exc_info=e)
tasks = [asyncio.create_task(run_one(i, repo)) for i, repo in enumerate(manifest.repos)]
await asyncio.gather(*tasks)
updated_repos: List[Repo] = list(manifest.repos)
for i, updated, err in results:
if err is None and updated is not None:
updated_repos[i] = updated
update_lock_file(updated_repos, LOCK_PATH)