mirror of
https://github.com/theniceboy/.config.git
synced 2025-12-26 14:44:57 +08:00
947 lines
32 KiB
Python
Executable file
947 lines
32 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import json
|
|
import os
|
|
import shutil
|
|
import stat
|
|
import subprocess
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
import sys
|
|
from queue import Queue
|
|
|
|
class Colors:
    """ANSI escape sequences used to colorize terminal status output."""

    RED = '\033[0;31m'     # failure markers
    GREEN = '\033[0;32m'   # success markers
    YELLOW = '\033[1;33m'  # bold yellow, used for the [STARTING] marker
    BLUE = '\033[0;34m'    # used for the [RUNNING] marker
    RESET = '\033[0m'      # restores the terminal's default attributes
|
|
|
|
class ParallelTaskRunner:
    """Run shell commands and Python callables concurrently with live status lines.

    Every task owns one terminal row that is rewritten in place (using ANSI
    cursor movement) as the task goes through STARTING -> RUNNING ->
    COMPLETED/FAILED.  Outcomes are pushed onto ``self.results`` and
    summarized by :meth:`run_all`.
    """

    SPINNER_CHARS = "|/-\\"
    POLL_INTERVAL = 0.1  # seconds between spinner frames

    def __init__(self):
        self.tasks = []
        self.results = Queue()
        self.task_positions = {}
        self.lock = threading.Lock()
        # Number of terminal rows this runner has printed so far (task rows
        # plus any extra diagnostic output).  The distance from the cursor to
        # a task's row is derived from this, so extra output does not make
        # later in-place updates land on the wrong row.
        self._rows_printed = 0

    def add_task(self, name, command=None, action=None):
        """Add a task to the execution queue.

        Args:
            name: Label shown on the task's status line.
            command: Shell command string to run (executed with ``shell=True``).
            action: Zero-argument callable to run instead of a command.

        Raises:
            ValueError: If neither ``command`` nor ``action`` is given.
        """
        if command is None and action is None:
            raise ValueError("Task must define a command or action")
        self.tasks.append({'name': name, 'command': command, 'action': action})

    def _update_task_line(self, task_id, text):
        """Update a specific task's display line with thread-safe cursor movement."""
        with self.lock:
            if task_id in self.task_positions:
                lines_up = self._rows_printed - self.task_positions[task_id]
                # Move up to the task's row, rewrite it, clear the remainder,
                # then return to the bottom row at column 0 so that anything
                # printed afterwards starts on a clean line.
                print(f"\033[{lines_up}A\r{text}\033[K\033[{lines_up}B\r", end="", flush=True)
            else:
                self.task_positions[task_id] = self._rows_printed
                self._rows_printed += 1
                print(text, flush=True)

    def _print_extra(self, text):
        """Print diagnostic text below the task rows and account for its height."""
        with self.lock:
            print(text, flush=True)
            # print() appends one newline on top of those embedded in text.
            # Lines wider than the terminal wrap and are not accounted for.
            self._rows_printed += text.count("\n") + 1

    def _show_running(self, task_id, name, frame):
        """Render the [RUNNING] line with the spinner frame for this tick."""
        glyph = self.SPINNER_CHARS[frame % len(self.SPINNER_CHARS)]
        self._update_task_line(
            task_id, f"{Colors.BLUE}[RUNNING]{Colors.RESET} {name} [{glyph}]"
        )

    def _record_result(self, task_id, name, success, note, output):
        """Render the final status line and queue the task's outcome."""
        if success:
            final_text = f"{Colors.GREEN}[COMPLETED]{Colors.RESET} {name}"
            if note:
                final_text += f" {note}"
            self._update_task_line(task_id, final_text)
        else:
            self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}")
            if output:
                self._print_extra(f"\nError output for {name}:\n{output}")
        self.results.put({'name': name, 'success': bool(success), 'output': output})

    def _record_exception(self, task_id, name, exc):
        """Mark the task as failed because running it raised ``exc``."""
        self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}")
        self._print_extra(f"\nException for {name}: {exc}")
        self.results.put({'name': name, 'success': False, 'output': str(exc)})

    @staticmethod
    def _interpret_result(result):
        """Normalize an action's return value to ``(success, note, output)``.

        Accepted shapes: a 3-tuple ``(success, note, output)``, a 2-tuple
        ``(success, output)``, a dict with optional ``success``/``note``/
        ``output`` keys, a bare bool, ``None`` (treated as success), or any
        other object (stringified into ``output``).
        """
        success, note, output = True, '', ''
        if isinstance(result, tuple):
            if len(result) == 3:
                success, note, output = result
            elif len(result) == 2:
                success, output = result
        elif isinstance(result, dict):
            success = bool(result.get('success', True))
            note = result.get('note', '')
            output = result.get('output', '')
        elif isinstance(result, bool):
            success = result
        elif result is not None:
            output = str(result)
        return bool(success), note, output

    def _run_command(self, task_id, name, command):
        """Run a shell command, animating the spinner until it exits."""
        try:
            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            frame = 0
            while True:
                self._show_running(task_id, name, frame)
                frame += 1
                try:
                    # communicate() keeps draining the pipe while we wait.
                    # Merely polling would deadlock as soon as the child
                    # fills the OS pipe buffer, because nothing reads it.
                    stdout, _ = process.communicate(timeout=self.POLL_INTERVAL)
                    break
                except subprocess.TimeoutExpired:
                    continue
        except Exception as e:
            self._record_exception(task_id, name, e)
            return
        self._record_result(task_id, name, process.returncode == 0, '', stdout)

    def _run_action(self, task_id, name, action):
        """Run a callable on a worker thread, animating the spinner meanwhile."""
        outcome = {}
        done_event = threading.Event()

        def run_action():
            try:
                outcome['result'] = action()
            except Exception as exc:
                outcome['exception'] = exc
            finally:
                done_event.set()

        worker = threading.Thread(target=run_action)
        worker.start()

        frame = 0
        while not done_event.is_set():
            self._show_running(task_id, name, frame)
            frame += 1
            # Wakes up early when the action finishes.
            done_event.wait(self.POLL_INTERVAL)
        worker.join()

        if 'exception' in outcome:
            self._record_exception(task_id, name, outcome['exception'])
            return

        success, note, output = self._interpret_result(outcome.get('result'))
        self._record_result(task_id, name, success, note, output)

    def _execute_task(self, task):
        """Execute a single task with real-time status updates."""
        name = task['name']
        command = task.get('command')
        action = task.get('action')
        # Thread identifiers may be recycled once a thread exits, which could
        # make a later task overwrite an earlier task's row.  The task dicts
        # all stay alive in self.tasks, so id(task) is unique per task.
        task_id = id(task)

        self._update_task_line(task_id, f"{Colors.YELLOW}[STARTING]{Colors.RESET} {name}")

        # A command takes precedence when both are supplied.
        if command is not None:
            self._run_command(task_id, name, command)
        elif action is not None:
            self._run_action(task_id, name, action)

    def run_all(self):
        """Execute all tasks in parallel and return overall success status.

        Returns:
            bool: True when every task succeeded, False otherwise.  A summary
            (including up to ten lines of output per failed task) is printed.
        """
        print("Starting parallel upgrade tasks...\n")

        # Start all tasks
        threads = [
            threading.Thread(target=self._execute_task, args=(task,))
            for task in self.tasks
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        # Display summary
        print(f"\n{'=' * 47}")

        # All producers have been joined, so draining until empty is safe.
        all_results = []
        while not self.results.empty():
            all_results.append(self.results.get())

        failed_tasks = [result for result in all_results if not result['success']]
        if not failed_tasks:
            print(f"{Colors.GREEN}All upgrade tasks completed successfully!{Colors.RESET}")
            return True

        print(f"{Colors.RED}FAILED TASKS:{Colors.RESET}")
        for task in failed_tasks:
            print(f"\n{Colors.RED}• {task['name']}{Colors.RESET}")
            if task['output']:
                # Print first 10 lines of error output
                output_lines = task['output'].strip().split('\n')
                for line in output_lines[:10]:
                    print(f" {line}")
                if len(output_lines) > 10:
                    print(f" ... ({len(output_lines) - 10} more lines)")
        print(f"\n{Colors.RED}Total failed: {len(failed_tasks)} out of {len(all_results)} tasks{Colors.RESET}")
        return False
|
|
|
|
# Optional version pins for global npm packages, as {package name: version}.
# Leave empty to let every package follow its latest release.
NPM_PACKAGE_LOCKS = {
    # "@openai/codex": "0.40.0",
}
|
|
|
|
def ensure_homebrew():
    """Make sure Homebrew is usable from this process, installing it if needed.

    Looks for ``brew`` on PATH, then in the well-known install locations
    (importing the environment printed by ``brew shellenv``), and finally
    downloads and runs the official installer non-interactively.

    Returns:
        tuple: ``(success, note, output)`` where ``note`` is ``"(found)"`` or
        ``"(installed)"`` on success and ``output`` holds any installer
        output (and the exception text when one was raised).
    """
    if shutil.which("brew") is not None:
        return True, "(found)", ""

    candidate_paths = [
        "/opt/homebrew/bin/brew",
        "/usr/local/bin/brew",
        "/home/linuxbrew/.linuxbrew/bin/brew",
        os.path.expanduser("~/.linuxbrew/bin/brew"),
    ]

    def configure_env(brew_path):
        """Import the ``export`` lines printed by ``brew shellenv`` into os.environ."""
        env_result = subprocess.run(
            [brew_path, "shellenv"], capture_output=True, text=True
        )
        for line in env_result.stdout.splitlines():
            if not line.startswith("export "):
                continue
            key, sep, val = line[len("export "):].partition("=")
            if not sep:
                continue
            # shellenv prints shell statements such as `export NAME="value";`.
            # Drop the statement terminator before unquoting, otherwise a
            # stray `";` stays glued to the end of the value.
            val = val.strip().rstrip(";").strip().strip('"')
            if "${PATH" in val or "$PATH" in val:
                current_path = os.environ.get("PATH", "")
                val = val.replace("${PATH+:$PATH}", f":{current_path}" if current_path else "")
                val = val.replace("$PATH", current_path)
            # Expand a self-reference with an empty default, e.g. the
            # `${INFOPATH:-}` suffix, instead of storing it literally.
            val = val.replace("${" + key + ":-}", os.environ.get(key, ""))
            os.environ[key] = val
        if shutil.which("brew") is None:
            # shellenv did not yield a usable PATH; prepend brew's directory.
            os.environ["PATH"] = f"{os.path.dirname(brew_path)}:{os.environ.get('PATH','')}"

    for path in candidate_paths:
        if os.path.exists(path):
            configure_env(path)
            return True, "(found)", ""

    install_output_parts = []
    script_url = "https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh"
    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            tmp_path = tmp.name
        fetch = subprocess.run(
            ["curl", "-fsSL", script_url, "-o", tmp_path],
            capture_output=True,
            text=True,
        )
        install_output_parts.append(fetch.stdout or "")
        install_output_parts.append(fetch.stderr or "")
        if fetch.returncode != 0:
            return False, "", "".join(install_output_parts)

        env = os.environ.copy()
        env["NONINTERACTIVE"] = "1"
        run_install = subprocess.run(
            ["/bin/bash", tmp_path],
            capture_output=True,
            text=True,
            env=env,
        )
        install_output_parts.append(run_install.stdout or "")
        install_output_parts.append(run_install.stderr or "")

        if run_install.returncode != 0:
            return False, "", "".join(install_output_parts)
    except Exception as exc:
        return False, "", f"{''.join(install_output_parts)}\n{exc}"
    finally:
        # Always remove the downloaded installer, including on exceptions.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    for path in candidate_paths:
        if os.path.exists(path):
            configure_env(path)
            return True, "(installed)", "".join(install_output_parts)

    return False, "", "".join(install_output_parts)
|
|
|
|
def ensure_git_available():
    """Ensure ``git`` is on PATH, installing it through Homebrew when absent.

    Returns:
        tuple: ``(success, note, output)``; ``note`` is ``"(found)"`` when git
        already exists and ``"(installed)"`` after a successful brew install.
    """
    if shutil.which("git"):
        return True, "(found)", ""

    brew_install = subprocess.run(
        ["brew", "install", "git"], capture_output=True, text=True
    )
    transcript = (brew_install.stdout or '') + (brew_install.stderr or '')
    installed = brew_install.returncode == 0
    return installed, ("(installed)" if installed else ""), transcript
|
|
|
|
|
|
def ensure_npm_available():
    """Ensure a working ``npm`` exists, (re)installing node via Homebrew if not.

    Returns:
        tuple: ``(success, note, output)``.  On failure ``output`` is the
        combined brew output followed by the reason npm is still unusable.
    """
    def probe_npm():
        """Return ``(ok, detail)``: the npm version on success, else the reason."""
        if shutil.which("npm") is None:
            return False, "npm missing"
        version_run = subprocess.run(["npm", "--version"], capture_output=True, text=True)
        if version_run.returncode == 0:
            return True, version_run.stdout.strip()
        return False, f"npm --version failed: {version_run.stdout}{version_run.stderr}"

    working, info = probe_npm()
    if working:
        return True, f"(found {info})", ""

    # Try a plain install first, then a reinstall, re-probing after each.
    collected = []
    attempts = (("install", "(installed node)"), ("reinstall", "(reinstalled node)"))
    for brew_verb, success_note in attempts:
        attempt = subprocess.run(["brew", brew_verb, "node"], capture_output=True, text=True)
        collected.extend([attempt.stdout or '', attempt.stderr or ''])
        working, info = probe_npm()
        if working:
            return True, success_note, ''.join(collected)

    return False, "", ''.join(collected) + info
|
|
|
|
|
|
def ensure_github_known_host():
    """Ensure github.com has an entry in ``~/.ssh/known_hosts``.

    Uses ``ssh-keygen -F`` to look for an existing entry and, when none is
    found, appends the hashed keys reported by ``ssh-keyscan -H``.

    Returns:
        bool: True when an entry exists or was added, False when the scan
        failed or produced no keys.
    """
    ssh_dir = os.path.expanduser("~/.ssh")
    known_hosts = os.path.join(ssh_dir, "known_hosts")
    os.makedirs(ssh_dir, exist_ok=True)

    check = subprocess.run(
        ["ssh-keygen", "-F", "github.com"],
        capture_output=True,
        text=True,
    )
    if check.returncode == 0 and check.stdout:
        return True

    scan = subprocess.run(
        ["ssh-keyscan", "-H", "github.com"],
        capture_output=True,
        text=True,
    )
    # A zero exit status with no key lines means nothing was learned;
    # reporting success here would leave known_hosts unchanged.
    if scan.returncode != 0 or not scan.stdout.strip():
        return False

    with open(known_hosts, "a", encoding="utf-8") as f:
        f.write(scan.stdout)
        if not scan.stdout.endswith("\n"):
            # Keep the file line-terminated so later appends start cleanly.
            f.write("\n")
    return True
|
|
|
|
def check_github_ssh():
    """Verify that SSH authentication against github.com works.

    Returns:
        tuple: ``(success, note, output)``.  Recognized failure modes are
        reported with a short message; anything else returns the raw ssh
        output.
    """
    if not ensure_github_known_host():
        return False, "", "failed to add github.com host key"

    # BatchMode makes ssh fail instead of prompting for input.
    probe = subprocess.run(
        ["ssh", "-o", "BatchMode=yes", "-T", "git@github.com"],
        capture_output=True,
        text=True,
    )
    transcript = (probe.stdout or '') + (probe.stderr or '')

    # Exit status 1 is accepted as long as the success phrase is present.
    if probe.returncode in (0, 1) and "successfully authenticated" in transcript:
        return True, "(auth ok)", transcript

    known_failures = (
        ("Permission denied", "GitHub SSH key rejected (Permission denied)"),
        ("Could not resolve hostname", "Cannot reach github.com"),
    )
    for marker, message in known_failures:
        if marker in transcript:
            return False, "", message
    return False, "", transcript
|
|
|
|
def install_brew_packages():
    """Install missing brew packages.

    Returns:
        bool: True when every wanted formula is present afterwards, False if
        the installed list could not be read or any install failed.
    """
    print("📦 Checking brew packages...")

    # Get list of installed packages
    try:
        listing = subprocess.run(
            ['brew', 'list', '--formula', '-1'],
            capture_output=True, text=True, check=True,
        )
    except subprocess.CalledProcessError:
        print("❌ Failed to get brew package list")
        return False
    already_installed = set(listing.stdout.strip().split('\n'))

    base_packages = [
        # System utilities
        "htop", "dust", "ncdu", "fswatch", "pipx", "uv",
        # macOS GNU utilities / common core tools
        "coreutils", "gnu-tar", "gnu-getopt", "gnu-sed",
        # Development tools
        "node", "go", "gcc", "gdb", "automake", "cmake", "jsdoc3", "oven-sh/bun/bun",
        # Text processing and search
        "ripgrep", "the_silver_searcher", "fd", "fzf", "bat", "ccat", "tree", "jq", "yq",
        # File management
        "yazi", "sevenzip",
        # Git and version control
        "git", "git-delta", "git-flow", "jesseduffield/lazygit/lazygit", "gh",
        # Media and graphics
        "ffmpeg", "imagemagick", "poppler", "yt-dlp",
        # Network and web
        "wget", "speedtest-cli", "cliproxyapi",
        # Terminal and shell
        "tmux", "neovim", "starship", "rainbarf", "neofetch", "onefetch", "tldr", "bmon", "loc",
        # Applications
        "awscli", "azure-cli", "hashicorp/tap/terraform",
    ]
    mac_only = ["terminal-notifier"]
    linux_skip = ["oven-sh/bun/bun"]

    system = os.uname().sysname
    wanted = base_packages + (mac_only if system == "Darwin" else [])
    if system == "Linux":
        wanted = [pkg for pkg in wanted if pkg not in linux_skip]

    # Tap-qualified names (owner/tap/formula) are compared against the
    # installed list by their last path component only.
    missing_packages = [
        pkg for pkg in wanted if pkg.split("/")[-1] not in already_installed
    ]
    if not missing_packages:
        print("✅ All packages already installed")
        return True

    print(f"Installing {len(missing_packages)} missing packages...")
    failed = []
    for package in missing_packages:
        try:
            if package.startswith("hashicorp/tap/"):
                subprocess.run(
                    ['brew', 'tap', 'hashicorp/tap'],
                    capture_output=True,
                    text=True,
                    check=True,
                )
            subprocess.run(['brew', 'install', package], check=True)
        except subprocess.CalledProcessError:
            print(f"❌ Failed to install {package}")
            failed.append(package)
        else:
            print(f"✅ Installed {package}")

    if failed:
        print(f"❌ Failed packages: {', '.join(failed)}")
        return False
    return True
|
|
|
|
def get_brew_outdated():
    """List pending Homebrew upgrades as ``"name from->to"`` strings.

    Runs ``brew outdated --greedy --verbose --json=v2`` and flattens the
    ``formulae`` and ``casks`` sections.  Returns an empty list when brew
    prints nothing or the output is not valid JSON.
    """
    proc = subprocess.run(
        ['brew', 'outdated', '--greedy', '--verbose', '--json=v2'],
        capture_output=True,
        text=True,
    )
    raw = proc.stdout.strip()
    if not raw:
        return []
    try:
        report = json.loads(raw)
    except json.JSONDecodeError:
        return []

    pending = []
    for section in ('formulae', 'casks'):
        for entry in report.get(section) or []:
            label = entry.get('name')
            # A cask's name may arrive as a list; use its first element.
            if section == 'casks' and isinstance(label, list):
                label = label[0] if label else None
            versions = entry.get('installed_versions') or []
            latest = entry.get('current_version')
            if label and versions and latest:
                # The last installed version is reported as the "from" side.
                pending.append(f"{label} {versions[-1]}->{latest}")
    return pending
|
|
|
|
def create_brew_update_action():
    """Build an action that runs ``brew update`` and ``brew upgrade --greedy``.

    The returned callable yields ``(success, note, output)``.  ``note`` lists
    the packages that were outdated before the run and no longer are
    afterwards (at most six are shown), or says brew was already up to date.
    """
    max_shown = 6

    def action():
        if shutil.which('brew') is None:
            return False, "", "brew not found"

        before = get_brew_outdated()
        output_parts = []

        # Stop at the first failing step and hand back what was captured.
        for brew_cmd in (['brew', 'update'], ['brew', 'upgrade', '--greedy']):
            step = subprocess.run(brew_cmd, capture_output=True, text=True)
            output_parts.extend([step.stdout or "", step.stderr or ""])
            if step.returncode != 0:
                return False, "", "".join(output_parts)

        after = get_brew_outdated()

        # An entry counts as updated when its name is no longer outdated.
        still_outdated = {entry.split()[0] for entry in after if entry.split()}
        updated_entries = [
            entry for entry in before
            if entry.split() and entry.split()[0] not in still_outdated
        ]

        if not updated_entries:
            note = "(brew already up to date)"
        elif len(updated_entries) > max_shown:
            shown = ", ".join(updated_entries[:max_shown])
            note = f"(updated: {shown}, +{len(updated_entries) - max_shown} more)"
        else:
            note = f"(updated: {', '.join(updated_entries)})"

        return True, note, "".join(output_parts)

    return action
|
|
|
|
def get_npm_updates(packages, locks=None):
    """Work out which global npm packages need installing or updating.

    Args:
        packages: Package names to manage; duplicates are dropped, order kept.
        locks: Optional ``{name: version}`` pins.  A pinned package is
            (re)installed whenever its installed version differs from the pin
            and is never checked with ``npm outdated``.

    Returns:
        tuple: ``(to_install, updates, installs)`` — every spec to pass to
        ``npm install``, followed by the subsets that are updates of
        installed packages and fresh installs respectively.

    Raises:
        subprocess.CalledProcessError: If ``npm outdated`` exits with a
            status other than 0 or 1.
    """
    packages = list(dict.fromkeys(packages))
    locks = locks or {}

    listing = subprocess.run(
        ['npm', 'list', '-g', '--json', '--depth=0'],
        capture_output=True,
        text=True,
    )
    installed_versions = {}
    if listing.stdout.strip():
        try:
            deps = json.loads(listing.stdout).get('dependencies', {})
        except json.JSONDecodeError:
            deps = None
        if isinstance(deps, dict):
            installed_versions = {
                name: info['version']
                for name, info in deps.items()
                if isinstance(info, dict) and info.get('version')
            }

    # Only check "outdated" for packages that are not locked.
    unlocked_packages = [pkg for pkg in packages if pkg not in locks]
    outdated_info = {}
    if unlocked_packages:
        outdated_cmd = ['npm', 'outdated', '-g', '--json'] + unlocked_packages
        outdated_result = subprocess.run(outdated_cmd, capture_output=True, text=True)

        # Exit status 1 is tolerated alongside 0; anything else is an error.
        if outdated_result.returncode not in (0, 1):
            raise subprocess.CalledProcessError(
                outdated_result.returncode,
                outdated_cmd,
                output=outdated_result.stdout,
                stderr=outdated_result.stderr,
            )

        raw = outdated_result.stdout.strip()
        if raw and raw != 'null':
            try:
                parsed = json.loads(raw)
            except json.JSONDecodeError:
                parsed = None
            if isinstance(parsed, dict):
                outdated_info = parsed

    to_install, updates, installs = [], [], []
    for package in packages:
        pinned = locks.get(package)
        if pinned:
            have = installed_versions.get(package)
            if have != pinned:
                spec = f"{package}@{pinned}"
                to_install.append(spec)
                (updates if have else installs).append(spec)
            # Skip normal outdated handling when locked
            continue

        if package in outdated_info:
            to_install.append(package)
            updates.append(package)
        elif package not in installed_versions:
            to_install.append(package)
            installs.append(package)

    return to_install, updates, installs
|
|
|
|
def format_package_for_install(package):
    """Return an npm install spec, appending ``@latest`` when no version is given.

    The first character is skipped when looking for ``@`` so that the leading
    ``@`` of a scoped package name is not mistaken for a version separator.
    """
    already_versioned = package.find('@', 1) != -1
    if already_versioned:
        return package
    return f"{package}@latest"
|
|
|
|
|
|
def create_npm_update_action(packages, locks=None):
    """Build an action that installs or updates the given global npm packages.

    Args:
        packages: Iterable of package names (copied when the action is built).
        locks: Optional ``{name: version}`` pins forwarded to
            ``get_npm_updates``.

    Returns:
        A zero-argument callable yielding ``(success, note, output)``.
    """
    packages = list(packages)
    locks = locks or {}

    def action():
        to_install, updates, installs = get_npm_updates(packages, locks=locks)
        if not to_install:
            return True, "(up to date)", ""

        argv = ['npm', 'install', '-g', '-f']
        argv.extend(format_package_for_install(pkg) for pkg in to_install)

        try:
            result = subprocess.run(argv, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as err:
            captured = "".join(filter(None, (err.stdout, err.stderr)))
            return False, "", captured or str(err)

        labelled = (("updated", updates), ("installed", installs))
        note_parts = [
            f"{label}: {', '.join(names)}" for label, names in labelled if names
        ]
        note = f"({'; '.join(note_parts)})" if note_parts else ''
        return True, note, "".join(filter(None, (result.stdout, result.stderr)))

    return action
|
|
|
|
def schedule_npm_updates(runner):
    """Queue the "Node Apps Update" task on ``runner``.

    Prints a warning and queues nothing when ``npm`` is not on PATH.
    """
    if not shutil.which('npm'):
        print(f"{Colors.RED}❌ npm not found; skipping npm package checks{Colors.RESET}")
        return

    node_apps = [
        "@anthropic-ai/claude-code",
        "ccusage",
        "@ccusage/codex",
        "ccstatusline",
        "@openai/codex",
        "instant-markdown-d",
        "mcp-proxy",
        "opencode-ai",
        "yarn",
        "@google/gemini-cli",
    ]
    update_action = create_npm_update_action(node_apps, locks=NPM_PACKAGE_LOCKS)
    runner.add_task("Node Apps Update", action=update_action)
|
|
|
|
def interpret_action_result(result):
    """Normalize an action's return value to ``(success, note, output)``.

    Accepted shapes:
      * 3-tuple ``(success, note, output)``
      * 2-tuple ``(success, output)``
      * dict with optional ``success`` / ``note`` / ``output`` keys
      * bare bool
      * ``None`` (treated as success) or any other object (stringified into
        ``output``)

    Tuples of any other length are treated as a plain success.
    """
    if isinstance(result, tuple):
        if len(result) == 3:
            ok, note, output = result
            return bool(ok), note, output
        if len(result) == 2:
            ok, output = result
            return bool(ok), '', output
        return True, '', ''

    if isinstance(result, dict):
        return (
            bool(result.get('success', True)),
            result.get('note', ''),
            result.get('output', ''),
        )

    if isinstance(result, bool):
        return result, '', ''

    if result is None:
        return True, '', ''
    return True, '', str(result)
|
|
|
|
def ensure_symlink(target, link_name):
    """Point ``link_name`` at ``target``, preserving anything already there.

    Both paths have ``~`` expanded.  A symlink pointing elsewhere is replaced;
    an existing regular file or directory is renamed to ``<link>.backup``
    (with a timestamp suffix appended while that name is taken).

    Returns:
        tuple: ``(True, note, "")`` describing what was done.
    """
    source = os.path.expanduser(target)
    link_path = os.path.expanduser(link_name)

    if os.path.islink(link_path):
        if os.readlink(link_path) == source:
            return True, "(already linked)", ""
        # Stale link pointing somewhere else: drop it and relink below.
        os.unlink(link_path)
    elif os.path.exists(link_path):
        backup_path = f"{link_path}.backup"
        while os.path.exists(backup_path):
            backup_path = f"{backup_path}.{int(time.time())}"
        os.rename(link_path, backup_path)

    os.symlink(source, link_path)
    return True, f"(linked to {source})", ""
|
|
|
|
def create_symlink_action(target, link_name):
    """Return a zero-argument action that calls ``ensure_symlink(target, link_name)``."""
    return lambda: ensure_symlink(target, link_name)
|
|
|
|
def ensure_repo(path, slug, must_exist=False):
    """Clone or fast-forward the GitHub repository ``slug`` at ``path``.

    Args:
        path: Checkout location (``~`` is expanded).
        slug: ``owner/name`` of the repository; cloned over SSH.
        must_exist: When True, a missing ``path`` is an error instead of a
            trigger to clone.

    Returns:
        tuple: ``(success, note, output)`` with note ``"(cloned)"`` or
        ``"(updated)"`` on success.
    """
    path = os.path.expanduser(path)
    repo_url = f"git@github.com:{slug}.git"

    def canonical(url):
        """Strip trailing slashes and a ``.git`` suffix for comparison."""
        return url.rstrip("/").removesuffix(".git")

    def git(*args):
        """Run git with the given arguments, capturing text output."""
        return subprocess.run(["git", *args], capture_output=True, text=True)

    def combined(proc):
        """Concatenate a finished process's stdout and stderr."""
        return f"{proc.stdout or ''}{proc.stderr or ''}"

    if not os.path.exists(path):
        if must_exist:
            return False, "", f"{path} missing"
        os.makedirs(os.path.dirname(path), exist_ok=True)
        clone = git("clone", repo_url, path)
        if clone.returncode != 0:
            return False, "", combined(clone)
        return True, "(cloned)", combined(clone)

    if not os.path.isdir(os.path.join(path, ".git")):
        return False, "", f"{path} exists but is not a git repo"

    remote = git("-C", path, "config", "--get", "remote.origin.url")
    if remote.returncode != 0:
        return False, "", f"cannot read remote for {path}"
    remote_url = (remote.stdout or "").strip()
    if canonical(remote_url) != canonical(repo_url):
        return False, "", f"remote mismatch: {remote_url}"

    fetch = git("-C", path, "fetch", "--all", "--prune")
    fetch_output = combined(fetch)
    if fetch.returncode != 0:
        return False, "", fetch_output

    # --ff-only: the pull fails instead of creating a merge commit.
    pull = git("-C", path, "pull", "--ff-only")
    transcript = f"{fetch_output}{combined(pull)}"
    if pull.returncode != 0:
        return False, "", transcript
    return True, "(updated)", transcript
|
|
|
|
def create_repo_action(path, slug, must_exist=False):
    """Return a zero-argument action that calls ``ensure_repo`` with these arguments."""
    return lambda: ensure_repo(path, slug, must_exist=must_exist)
|
|
|
|
def is_writable(path):
    """Return True if a file can be created (and removed) inside ``path``.

    The check is a real write: a ``.writetest`` file is created in the
    directory and deleted again.  Any ``OSError`` along the way means False.
    """
    probe = os.path.join(os.path.expanduser(path), ".writetest")
    try:
        with open(probe, "w", encoding="utf-8") as handle:
            handle.write("ok")
        os.remove(probe)
    except OSError:
        return False
    return True
|
|
|
|
def ensure_paths_writable(paths):
    """Check that every directory in ``paths`` is writable.

    Returns:
        tuple: ``(True, "(writable)", "")`` when all pass, otherwise a failure
        tuple naming the first path that is not writable.
    """
    blocked = next((p for p in paths if not is_writable(p)), None)
    if blocked is not None:
        return False, "", f"path not writable: {os.path.expanduser(blocked)}"
    return True, "(writable)", ""
|
|
|
|
def ensure_system_build_tools():
    """Install ``build-essential`` through apt-get when running on Linux.

    On any other system nothing is done and success is reported.

    Returns:
        tuple: ``(success, note, output)`` with the combined apt-get output.
    """
    if os.uname().sysname != "Linux":
        return True, "(not needed on macOS)", ""

    apt_steps = (
        ["update", "-y"],
        ["install", "-y", "build-essential"],
    )
    transcript = []
    for apt_args in apt_steps:
        step = subprocess.run(
            ["sudo", "apt-get", *apt_args],
            capture_output=True,
            text=True,
        )
        transcript.extend([step.stdout or "", step.stderr or ""])
        if step.returncode != 0:
            # Abort at the first failing step, returning what was captured.
            return False, "", "".join(transcript)
    return True, "(build-essential ok)", "".join(transcript)
|
|
|
|
def create_zshrc_action():
    """Build an action ensuring ``~/.zshrc`` sources ``~/.config/zsh/zshrc``.

    The action creates the file when missing, leaves it alone when the source
    line is already present on any line, and otherwise appends the line.
    """
    def action():
        rc_file = os.path.expanduser("~/.zshrc")
        directive = "source ~/.config/zsh/zshrc"

        if not os.path.exists(rc_file):
            with open(rc_file, "w", encoding="utf-8") as handle:
                handle.write(f"{directive}\n")
            return True, "(created ~/.zshrc)", ""

        with open(rc_file, "r", encoding="utf-8") as handle:
            existing = handle.read().splitlines()
        for row in existing:
            if directive in row:
                return True, "(already sources config)", ""

        # Emit a newline first unless the file is empty or ends in a blank line.
        lead = "\n" if existing and existing[-1].strip() else ""
        with open(rc_file, "a", encoding="utf-8") as handle:
            handle.write(f"{lead}{directive}\n")
        return True, "(added zsh source line)", ""

    return action
|
|
|
|
def create_command_action(command):
    """Wrap ``command`` in an action returning ``(exit_ok, "", combined_output)``.

    ``command`` is passed to ``subprocess.run`` unchanged; ``exit_ok`` is True
    when the process exits with status 0.
    """
    def action():
        proc = subprocess.run(command, capture_output=True, text=True)
        transcript = (proc.stdout or '') + (proc.stderr or '')
        return proc.returncode == 0, "", transcript

    return action
|
|
|
|
def create_agent_tracker_service_action():
    """Build an action that runs the agent-tracker brew-service install script.

    The script at ``~/.config/agent-tracker/scripts/install_brew_service.sh``
    is run with bash, with ``HOMEBREW_NO_AUTO_UPDATE=1`` set in its
    environment.
    """
    def action():
        script = os.path.expanduser(
            "~/.config/agent-tracker/scripts/install_brew_service.sh"
        )
        child_env = dict(os.environ, HOMEBREW_NO_AUTO_UPDATE="1")

        service = subprocess.run(
            ["bash", script],
            capture_output=True,
            text=True,
            env=child_env,
        )
        transcript = (service.stdout or '') + (service.stderr or '')
        if service.returncode == 0:
            return True, "(service installed)", transcript
        return False, "", transcript

    return action
|
|
|
|
def create_agent_tracker_build_action():
    """Build an action that runs agent-tracker's ``install.sh`` and checks the result.

    The action fails when ``~/.config/agent-tracker`` is missing, when the
    install script exits non-zero, or when ``bin/tracker-server`` does not
    exist afterwards.
    """
    def action():
        base = os.path.expanduser("~/.config/agent-tracker")
        if not os.path.isdir(base):
            return False, "", f"{base} missing"

        # Runs in a login shell (-l).
        build = subprocess.run(
            ["bash", "-lc", "cd ~/.config/agent-tracker && ./install.sh"],
            capture_output=True,
            text=True,
        )
        transcript = (build.stdout or '') + (build.stderr or '')
        if build.returncode != 0:
            return False, "", transcript

        binary = os.path.join(base, "bin", "tracker-server")
        if os.path.isfile(binary):
            return True, "(built)", transcript
        return False, "", f"tracker-server still missing after build ({binary})"

    return action
|
|
|
|
def run_inline(name, action):
    """Run ``action`` synchronously and print a one-line [OK]/[FAIL] status.

    On failure up to ten lines of the action's output are printed underneath.

    Returns:
        bool: Whether the action succeeded.
    """
    success, note, output = interpret_action_result(action())

    if success:
        badge = f"{Colors.GREEN}[OK]{Colors.RESET}"
    else:
        badge = f"{Colors.RED}[FAIL]{Colors.RESET}"
    suffix = f" {note}" if note else ""
    print(f"{badge} {name}{suffix}")

    if output and not success:
        report = output.strip().splitlines()
        for row in report[:10]:
            print(f" {row}")
        hidden = len(report) - 10
        if hidden > 0:
            print(f" ... ({hidden} more lines)")
    return success
|
|
|
|
def main():
    """Run the bootstrap: sequential checks first, then parallel upgrades.

    Pre-flight checks and environment setup run one at a time, and the
    process exits with status 1 at the first failure.  The npm, Homebrew and
    agent-tracker tasks then run in parallel; the exit status reflects
    whether all of them succeeded.
    """
    def run_steps_or_exit(steps):
        """Run (name, action) pairs in order; exit(1) on the first failure."""
        for step_name, step_action in steps:
            if not run_inline(step_name, step_action):
                sys.exit(1)

    runner = ParallelTaskRunner()

    run_steps_or_exit([
        ("Homebrew Available", ensure_homebrew),
        ("Git Available", ensure_git_available),
        ("GitHub SSH Auth", check_github_ssh),
        ("Paths Writable", lambda: ensure_paths_writable(["~", "~/.config", "~/.config/bin"])),
        ("System Build Tools", ensure_system_build_tools),
        ("NPM Available", ensure_npm_available),
        ("Repo ~/.config", create_repo_action("~/.config", "theniceboy/.config", must_exist=True)),
        ("Repo ~/.config/nvim", create_repo_action("~/.config/nvim", "theniceboy/nvim")),
        ("Repo ~/.sconfig", create_repo_action("~/.sconfig", "theniceboy/.sconfig")),
        ("Repo ~/Github/mac-ctrl", create_repo_action("~/Github/mac-ctrl", "theniceboy/mac-ctrl")),
    ])

    # Install missing brew packages first
    if not install_brew_packages():
        print("❌ Brew package installation failed")
        sys.exit(1)

    run_steps_or_exit([
        ("Zsh Config Ensure", create_zshrc_action()),
        ("Symlink Tmux Config", create_symlink_action("~/.config/.tmux.conf", "~/.tmux.conf")),
        ("Symlink Claude Config", create_symlink_action("~/.config/claude", "~/.claude")),
        ("Symlink Codex Config", create_symlink_action("~/.config/codex", "~/.codex")),
        ("Symlink Ripgrep Config", create_symlink_action("~/.config/rg/ripgreprc", "~/.ripgreprc")),
        ("Agent Tracker Build", create_agent_tracker_build_action()),
        ("Agent Tracker Brew Service", create_agent_tracker_service_action()),
    ])

    schedule_npm_updates(runner)
    runner.add_task("Homebrew Update", action=create_brew_update_action())
    # NOTE(review): the agent-tracker build already ran as an inline step
    # above, so this task runs install.sh a second time — confirm intended.
    runner.add_task("Agent Tracker Build", "cd ~/.config/agent-tracker && ./install.sh")

    all_ok = runner.run_all()
    sys.exit(0 if all_ok else 1)
|
|
|
|
# Run the bootstrap only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|