Merge pull request #1075 from nix-community/better-updates

`nur update` improvements

commit 110d3237a4

10 changed files with 178 additions and 97 deletions

bin/nur
@@ -1,5 +1,5 @@
 #!/usr/bin/env nix-shell
-#!nix-shell -p python3 -p nix-prefetch-git -p nix -i python3
+#!nix-shell -p python3 -p python3Packages.aiohttp -p nix-prefetch-git -p nix -i python3
 import sys
 import os

@@ -1,4 +1,5 @@
 import argparse
+import asyncio
 import logging
 import sys
 from typing import List

@@ -60,4 +61,4 @@ def main() -> None:
     args = parse_arguments(sys.argv)
     logging.basicConfig(level=LOG_LEVELS[args.log_level])

-    args.func(args)
+    asyncio.run(args.func(args))

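The two hunks above are the whole entry-point change: subcommand handlers become coroutines, and a single asyncio.run at the top level drives whichever handler argparse selected. A minimal self-contained sketch of that dispatch shape (the subcommand name and flag are illustrative, not the real `nur` interface):

import argparse
import asyncio


async def update_command(args: argparse.Namespace) -> None:
    # each handler is now a coroutine; exactly one of them runs per invocation
    print(f"updating {args.directory}")


def main() -> None:
    parser = argparse.ArgumentParser(prog="nur-sketch")
    subparsers = parser.add_subparsers(dest="command", required=True)
    update = subparsers.add_parser("update")
    update.add_argument("--directory", default=".")
    update.set_defaults(func=update_command)
    args = parser.parse_args()
    # args.func is a coroutine function, so the entry point owns the event loop
    asyncio.run(args.func(args))


if __name__ == "__main__":
    main()  # e.g. python sketch.py update --directory /tmp
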
@@ -174,7 +174,7 @@ def setup_combined() -> None:
     commit_files(vcs_files, "update code")


-def combine_command(args: Namespace) -> None:
+async def combine_command(args: Namespace) -> None:
    combined_path = Path(args.directory)

    with chdir(combined_path):

@@ -4,3 +4,7 @@ class NurError(Exception):

 class EvalError(NurError):
     pass
+
+
+class RepositoryDeletedError(NurError):
+    pass

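A short sketch of how the widened exception hierarchy is meant to be consumed: callers can react to the specific deletion case while keeping NurError as the catch-all (the fetch helper and handler bodies are illustrative):

class NurError(Exception):
    pass


class RepositoryDeletedError(NurError):
    pass


def fetch(deleted: bool) -> None:
    # stand-in for a prefetch that may discover the upstream repo is gone
    if deleted:
        raise RepositoryDeletedError("Repository deleted!")
    raise NurError("some other failure")


try:
    fetch(deleted=True)
except RepositoryDeletedError:
    print("specific case: drop or skip the repository")
except NurError as err:
    print(f"generic failure: {err}")
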
@@ -1,6 +1,6 @@
+import asyncio
 import logging
 import os
-import subprocess
 import tempfile
 from argparse import Namespace
 from pathlib import Path

@@ -13,7 +13,7 @@ from .path import EVALREPO_PATH, nixpkgs_path
 logger = logging.getLogger(__name__)


-def eval_repo(repo: Repo, repo_path: Path) -> None:
+async def eval_repo(repo: Repo, repo_path: Path) -> None:
     with tempfile.TemporaryDirectory() as d:
         eval_path = Path(d).joinpath("default.nix")
         with open(eval_path, "w") as f:

@@ -49,18 +49,25 @@ import {EVALREPO_PATH} {{
     ]
     # fmt: on

     logger.info(f"Evaluate repository {repo.name}")
     env = dict(PATH=os.environ["PATH"], NIXPKGS_ALLOW_UNSUPPORTED_SYSTEM="1")
-    proc = subprocess.Popen(cmd, env=env, stdout=subprocess.DEVNULL)
+    proc = await asyncio.create_subprocess_exec(
+        *cmd,
+        env=env,
+        stdout=asyncio.subprocess.PIPE,
+        stderr=asyncio.subprocess.PIPE,
+    )
     try:
-        res = proc.wait(15)
-    except subprocess.TimeoutExpired:
+        stdout, stderr = await asyncio.wait_for(proc.communicate(), 15)
+    except TimeoutError:
         proc.kill()
         raise EvalError(f"evaluation for {repo.name} timed out after 15 seconds")
-    if res != 0:
-        raise EvalError(f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}")
+    if proc.returncode != 0:
+        raise EvalError(
+            f"{repo.name} does not evaluate:\n$ {' '.join(cmd)}\n\n{stdout.decode()}"
+        )


-def eval_command(args: Namespace) -> None:
+async def eval_command(args: Namespace) -> None:
     logging.basicConfig(level=logging.INFO)

     repo_path = Path(args.directory)

@@ -74,5 +81,5 @@ def eval_command(args: Namespace) -> None:
         None,
         None,
     )
-    eval_repo(repo, repo_path)
+    await eval_repo(repo, repo_path)
     print("OK")

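The subprocess handling above is the recurring pattern of this PR: create_subprocess_exec instead of Popen, communicate() wrapped in wait_for, and kill() on expiry. As a standalone sketch (the command and the error type are placeholders; the 15-second limit mirrors the hunk):

import asyncio


async def run_with_timeout(cmd: list[str], timeout: float = 15) -> bytes:
    # spawn without blocking the event loop; capture both streams
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        # on Python 3.11+ wait_for raises the built-in TimeoutError on expiry
        stdout, _stderr = await asyncio.wait_for(proc.communicate(), timeout)
    except TimeoutError:
        proc.kill()
        raise RuntimeError(f"{cmd[0]} timed out after {timeout} seconds")
    if proc.returncode != 0:
        raise RuntimeError(f"{cmd[0]} exited with code {proc.returncode}")
    return stdout


print(asyncio.run(run_with_timeout(["echo", "hello"])).decode())
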
@@ -6,7 +6,7 @@ from argparse import Namespace
 from .path import ROOT


-def format_manifest_command(args: Namespace) -> None:
+async def format_manifest_command(args: Namespace) -> None:
     path = ROOT.joinpath("repos.json")
     manifest = json.load(open(path))
     for name, repo in manifest.get("repos", {}).items():

@@ -94,7 +94,7 @@ callPackage (nur.repo-sources."%s" + "/%s") {}
     return pkgs


-def index_command(args: Namespace) -> None:
+async def index_command(args: Namespace) -> None:
    directory = Path(args.directory)
    manifest_path = directory.joinpath("repos.json")
    with open(manifest_path) as f:

@@ -1,47 +1,100 @@
+import asyncio
 import json
 import os
 import re
 import subprocess
 from pathlib import Path
-from typing import Optional, Tuple
+from typing import List, Tuple
 from urllib.parse import ParseResult

-from .error import NurError
-from .manifest import LockedVersion, Repo, RepoType
+import aiohttp
+
+from .error import NurError, RepositoryDeletedError
+from .manifest import Repo, RepoType

 Url = ParseResult


-def nix_prefetch_zip(url: str) -> Tuple[str, Path]:
-    data = subprocess.check_output(
-        ["nix-prefetch-url", "--name", "source", "--unpack", "--print-path", url]
-    )
-    sha256, path = data.decode().strip().split("\n")
+async def nix_prefetch_zip(url: str) -> Tuple[str, Path]:
+    proc = await asyncio.create_subprocess_exec(
+        *["nix-prefetch-url", "--name", "source", "--unpack", "--print-path", url],
+        stdout=asyncio.subprocess.PIPE,
+        stderr=asyncio.subprocess.PIPE,
+    )
+    stdout, stderr = await proc.communicate()
+
+    if proc.returncode != 0:
+        raise NurError(f"Failed to prefetch git repository {url}: {stderr.decode()}")
+
+    sha256, path = stdout.decode().strip().split("\n")
     return sha256, Path(path)

+def parse_pkt_lines(data: bytes) -> List[bytes]:
+    i = 0
+    lines = []
+    while i < len(data):
+        if i + 4 > len(data):
+            break
+        length = int(data[i : i + 4], 16)
+        i += 4
+        if length == 0:
+            continue
+        line = data[i : i + length - 4]
+        i += length - 4
+        lines.append(line)
+    return lines
+
+
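For readers unfamiliar with the format: Git's smart-HTTP ref advertisement is framed in pkt-lines, a 4-hex-digit length prefix (which counts the prefix itself) followed by the payload, with "0000" as a flush packet carrying no data. A usage sketch restating the parser above, with fabricated sample bytes:

def parse_pkt_lines(data: bytes) -> list[bytes]:
    # the 4 hex digits give the frame length *including* the length field;
    # "0000" (flush-pkt) delimits sections and carries no payload
    i, lines = 0, []
    while i + 4 <= len(data):
        length = int(data[i : i + 4], 16)
        i += 4
        if length == 0:
            continue
        lines.append(data[i : i + length - 4])
        i += length - 4
    return lines


# "000a" frames the 6-byte payload b"hello\n"; "0000" is a flush-pkt
sample = b"000ahello\n0000000eworld 123\n"
assert parse_pkt_lines(sample) == [b"hello\n", b"world 123\n"]
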
 class GitPrefetcher:
     def __init__(self, repo: Repo) -> None:
         self.repo = repo

-    def latest_commit(self) -> str:
-        data = subprocess.check_output(
-            ["git", "ls-remote", self.repo.url.geturl(), self.repo.branch or "HEAD"],
-            env={**os.environ, "GIT_ASKPASS": "", "GIT_TERMINAL_PROMPT": "0"},
-        )
-        return data.decode().split(maxsplit=1)[0]
-
-    def prefetch(self, ref: str) -> Tuple[str, Path]:
+    async def latest_commit(self) -> str:
+        info_url = f"{self.repo.url.geturl()}/info/refs?service=git-upload-pack"
+
+        async with aiohttp.ClientSession() as session:
+            async with session.get(info_url) as resp:
+                if resp.status == 401:
+                    raise RepositoryDeletedError("Repository deleted!")
+                elif resp.status != 200:
+                    raise NurError(
+                        f"Failed to get refs for {self.repo.url.geturl()}: {(await resp.read()).decode()}"
+                    )
+                raw = await resp.read()
+
+        lines = parse_pkt_lines(raw)
+
+        wanted = (
+            b"HEAD"
+            if self.repo.branch is None
+            else f"refs/heads/{self.repo.branch}".encode()
+        )
+
+        for line in lines:
+            # Strip capabilities after NUL
+            if b"\x00" in line:
+                line = line.split(b"\x00", 1)[0]
+
+            parts = line.strip().split()
+            if len(parts) == 2 and parts[1] == wanted:
+                return parts[0].decode()
+
+        raise NurError(f"Ref not found: {wanted.decode()}")
+
+    async def prefetch(self, ref: str) -> Tuple[str, Path]:
         cmd = ["nix-prefetch-git"]
         if self.repo.submodules:
             cmd += ["--fetch-submodules"]
         if self.repo.branch:
             cmd += ["--rev", f"refs/heads/{self.repo.branch}"]
         cmd += [self.repo.url.geturl()]
-        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        proc = await asyncio.create_subprocess_exec(
+            *cmd,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+        )
         try:
-            stdout, stderr = proc.communicate(timeout=30)
-        except subprocess.TimeoutExpired:
+            stdout, stderr = await asyncio.wait_for(proc.communicate(), 30)
+        except TimeoutError:
             proc.kill()
             raise NurError(
                 f"Timeout expired while prefetching git repository {self.repo.url.geturl()}"

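latest_commit now reads the ref advertisement over plain HTTP instead of shelling out to `git ls-remote`. A minimal sketch of that fetch, assuming only a public repository URL (error handling reduced to raise_for_status; the response is pkt-line framed, so parse_pkt_lines from above applies):

import asyncio

import aiohttp


async def advertised_refs(repo_url: str) -> bytes:
    # the smart-HTTP ref advertisement; no git binary and no clone required
    url = f"{repo_url}/info/refs?service=git-upload-pack"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            resp.raise_for_status()
            return await resp.read()


raw = asyncio.run(advertised_refs("https://github.com/nix-community/nur"))
print(raw[:80])
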
@@ -55,19 +108,26 @@ class GitPrefetcher:
         metadata = json.loads(stdout)
         lines = stderr.decode("utf-8").split("\n")
         repo_path = re.search("path is (.+)", lines[-5])
-        assert repo_path is not None
+        if not repo_path:
+            raise NurError(
+                f"Failed to prefetch git repository {self.repo.url.geturl()}"
+            )
         path = Path(repo_path.group(1))
+        if not path:
+            raise NurError(
+                f"Failed to prefetch git repository {self.repo.url.geturl()}"
+            )
         sha256 = metadata["sha256"]
         return sha256, path


 class GithubPrefetcher(GitPrefetcher):
-    def prefetch(self, ref: str) -> Tuple[str, Path]:
-        return nix_prefetch_zip(f"{self.repo.url.geturl()}/archive/{ref}.tar.gz")
+    async def prefetch(self, ref: str) -> Tuple[str, Path]:
+        return await nix_prefetch_zip(f"{self.repo.url.geturl()}/archive/{ref}.tar.gz")


 class GitlabPrefetcher(GitPrefetcher):
-    def prefetch(self, ref: str) -> Tuple[str, Path]:
+    async def prefetch(self, ref: str) -> Tuple[str, Path]:
         hostname = self.repo.url.hostname
         assert (
             hostname is not None

@@ -75,23 +135,14 @@ class GitlabPrefetcher(GitPrefetcher):
         path = Path(self.repo.url.path)
         escaped_path = "%2F".join(path.parts[1:])
         url = f"https://{hostname}/api/v4/projects/{escaped_path}/repository/archive.tar.gz?sha={ref}"
-        return nix_prefetch_zip(url)
+        return await nix_prefetch_zip(url)


-def prefetch(repo: Repo) -> Tuple[Repo, LockedVersion, Optional[Path]]:
-    prefetcher: GitPrefetcher
-    if repo.type == RepoType.GITHUB:
-        prefetcher = GithubPrefetcher(repo)
-    elif repo.type == RepoType.GITLAB:
-        prefetcher = GitlabPrefetcher(repo)
-    else:
-        prefetcher = GitPrefetcher(repo)
-
-    commit = prefetcher.latest_commit()
-    locked_version = repo.locked_version
-    if locked_version is not None:
-        if locked_version.rev == commit:
-            return repo, locked_version, None
-
-    sha256, path = prefetcher.prefetch(commit)
-    return repo, LockedVersion(repo.url, commit, sha256, repo.submodules), path
+def prefetcher_for(repo: Repo) -> GitPrefetcher:
+    match repo.type:
+        case RepoType.GITHUB:
+            return GithubPrefetcher(repo)
+        case RepoType.GITLAB:
+            return GitlabPrefetcher(repo)
+        case _:
+            return GitPrefetcher(repo)

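prefetch(repo) is replaced by a pure factory; the prefetch-then-lock logic moves into update(). A small illustration of the same match/case dispatch shape (Python 3.10+), with a stand-in enum and return strings:

from enum import Enum, auto


class RepoType(Enum):  # stand-in; the real enum lives in the manifest module
    GITHUB = auto()
    GITLAB = auto()
    GIT = auto()


def describe(repo_type: RepoType) -> str:
    # same dispatch shape as prefetcher_for: one case per hosting flavor,
    # plain git as the wildcard fallback
    match repo_type:
        case RepoType.GITHUB:
            return "tarball from /archive/<ref>.tar.gz"
        case RepoType.GITLAB:
            return "tarball from the /api/v4 archive endpoint"
        case _:
            return "nix-prefetch-git against the remote"


print(describe(RepoType.GITLAB))
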
@@ -1,61 +1,79 @@
+import asyncio
 import logging
 from argparse import Namespace
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from typing import List, Optional, Tuple

 from .eval import EvalError, eval_repo
-from .manifest import Repo, load_manifest, update_lock_file
+from .manifest import LockedVersion, Repo, load_manifest, update_lock_file
 from .path import LOCK_PATH, MANIFEST_PATH
-from .prefetch import prefetch
+from .prefetch import prefetcher_for

 logger = logging.getLogger(__name__)


-def update(repo: Repo) -> Repo:
-    repo, locked_version, repo_path = prefetch(repo)
+async def update(repo: Repo) -> Repo:
+    prefetcher = prefetcher_for(repo)

-    if repo_path:
-        eval_repo(repo, repo_path)
+    latest_commit = await prefetcher.latest_commit()

-    repo.locked_version = locked_version
+    if repo.locked_version is not None and repo.locked_version.rev == latest_commit:
+        return repo
+
+    sha256, repo_path = await prefetcher.prefetch(latest_commit)
+    await eval_repo(repo, repo_path)
+    repo.locked_version = LockedVersion(
+        repo.url, latest_commit, sha256, repo.submodules
+    )
     return repo


-def update_command(args: Namespace) -> None:
+async def update_command(args: Namespace) -> None:
     logging.basicConfig(level=logging.INFO)

     manifest = load_manifest(MANIFEST_PATH, LOCK_PATH)

-    if getattr(args, "debug", False):
-        for repo in manifest.repos:
-            try:
-                update(repo)
-            except EvalError as err:
-                if repo.locked_version is None:
-                    logger.error(
-                        f"repository {repo.name} failed to evaluate: {err}. This repo is not yet in our lock file!!!!"
-                    )
-                    raise
-                logger.error(f"repository {repo.name} failed to evaluate: {err}")
-            except Exception:
-                logger.exception(f"Failed to update repository {repo.name}")
-    else:
-        with ThreadPoolExecutor() as executor:
-            future_to_repo = {
-                executor.submit(update, repo): repo for repo in manifest.repos
-            }
-
-            for future in as_completed(future_to_repo):
-                repo = future_to_repo[future]
-                try:
-                    future.result()
-                except EvalError as err:
-                    if repo.locked_version is None:
-                        logger.error(
-                            f"repository {repo.name} failed to evaluate: {err}. This repo is not yet in our lock file!!!!"
-                        )
-                        raise
-                    logger.error(f"repository {repo.name} failed to evaluate: {err}")
-                except Exception:
-                    logger.exception(f"Failed to update repository {repo.name}")
-
-    update_lock_file(manifest.repos, LOCK_PATH)
+    log_lock = asyncio.Lock()  # serialize success/error output
+
+    results: List[Tuple[int, Optional[Repo], Optional[BaseException]]] = []
+
+    async def run_one(i: int, repo: Repo) -> None:
+        try:
+            updated = await update(repo)
+            results.append((i, updated, None))
+            async with log_lock:
+                if updated.locked_version is not None:
+                    logger.info(
+                        f"Updated repository {repo.name} -> {updated.locked_version.rev}"
+                    )
+                else:
+                    logger.info(f"Updated repository {repo.name}")
+        except BaseException as e:
+            results.append((i, None, e))
+            async with log_lock:
+                if isinstance(e, EvalError) and repo.locked_version is None:
+                    logger.error(
+                        f"repository {repo.name} failed to evaluate: {e}. "
+                        "This repo is not yet in our lock file!!!!"
+                    )
+                elif isinstance(e, EvalError):
+                    logger.error(f"repository {repo.name} failed to evaluate: {e}")
+                else:
+                    logger.exception(
+                        f"Failed to update repository {repo.name}", exc_info=e
+                    )
+
+    tasks = [
+        asyncio.create_task(run_one(i, repo)) for i, repo in enumerate(manifest.repos)
+    ]
+    await asyncio.gather(*tasks)
+
+    updated_repos: List[Repo] = list(manifest.repos)
+
+    for i, updated, err in results:
+        if err is None and updated is not None:
+            updated_repos[i] = updated
+
+    update_lock_file(updated_repos, LOCK_PATH)

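The rewritten update_command fans out one task per repository and records each outcome instead of letting a single failure abort the batch. The same shape reduced to a runnable sketch (work() and the item list are placeholders):

import asyncio
from typing import List, Optional, Tuple


async def work(name: str) -> str:
    if name == "bad":
        raise ValueError("boom")
    await asyncio.sleep(0)  # stand-in for prefetch + eval
    return name.upper()


async def main() -> None:
    items = ["alpha", "bad", "beta"]
    results: List[Tuple[int, Optional[str], Optional[BaseException]]] = []

    async def run_one(i: int, name: str) -> None:
        try:
            results.append((i, await work(name), None))
        except BaseException as e:  # keep the batch going; record the failure
            results.append((i, None, e))

    tasks = [asyncio.create_task(run_one(i, n)) for i, n in enumerate(items)]
    await asyncio.gather(*tasks)
    for i, value, err in sorted(results):
        print(i, value if err is None else f"failed: {err}")


asyncio.run(main())
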
@@ -1,5 +1,5 @@
 #!/usr/bin/env nix-shell
-#!nix-shell -p bash -i bash -p mypy -p black -p ruff -p nix
+#!nix-shell -p bash -i bash -p mypy -p black -p ruff -p nix -p python3Packages.aiohttp

 set -eux -o pipefail # Exit with nonzero exit code if anything fails