theniceboy/bin/upgrade-all
2025-09-17 11:07:49 -07:00

397 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
import json
import shutil
import subprocess
import threading
import time
import sys
from queue import Queue
class Colors:
    """ANSI SGR escape sequences used to colorize terminal status output."""

    RED = "\033[0;31m"     # regular red
    GREEN = "\033[0;32m"   # regular green
    YELLOW = "\033[1;33m"  # bold yellow
    BLUE = "\033[0;34m"    # regular blue
    RESET = "\033[0m"      # clear all attributes
class ParallelTaskRunner:
    """Run shell commands and Python callables in parallel with a live status board.

    Every task gets its own terminal line, rewritten in place with ANSI cursor
    movement as the task goes through STARTING, RUNNING and COMPLETED/FAILED.
    Each task's outcome is pushed onto ``results`` as a dict with ``name``,
    ``success`` and ``output`` keys; ``run_all`` drains that queue to print
    the final summary.
    """

    SPINNER_FRAMES = "|/-\\"
    SPINNER_INTERVAL = 0.1  # seconds between spinner redraws

    def __init__(self):
        self.tasks = []
        self.results = Queue()
        # Task id -> index of the terminal line (counted from the first line
        # this runner printed) that displays the task's status.
        self.task_positions = {}
        # Serializes every terminal write so escape sequences emitted by
        # different task threads never interleave.
        self.lock = threading.Lock()
        # Lines printed so far (status lines plus any diagnostic text); the
        # cursor always rests at the start of the line just below them.
        self._lines_printed = 0

    def add_task(self, name, command=None, action=None):
        """Add a task to the execution queue.

        Args:
            name: Label shown on the task's status line and in the summary.
            command: Shell command string, run with ``shell=True``.
            action: Zero-argument callable. Only used when ``command`` is None.

        Raises:
            ValueError: If neither ``command`` nor ``action`` is given.
        """
        if command is None and action is None:
            raise ValueError("Task must define a command or action")
        self.tasks.append({'name': name, 'command': command, 'action': action})

    def _update_task_line(self, task_id, text):
        """Draw ``text`` on the status line owned by ``task_id``.

        The first call for a task id appends a new line at the bottom; later
        calls move the cursor up to that line, overwrite it, and come back.
        """
        with self.lock:
            position = self.task_positions.get(task_id)
            if position is None:
                self.task_positions[task_id] = self._lines_printed
                self._lines_printed += 1
                print(text, flush=True)
                return
            lines_up = self._lines_printed - position
            # Up N lines, column 0, text, erase rest of line, down N lines,
            # then column 0 again so whatever prints next starts at the margin.
            print(
                f"\033[{lines_up}A\r{text}\033[K\033[{lines_up}B\r",
                end="",
                flush=True,
            )

    def _print_details(self, text):
        """Print diagnostic text below the board and account for the lines used.

        Without this bookkeeping, later in-place updates would move up too few
        lines and overwrite the diagnostics instead of the status lines.
        Lines wrapped by the terminal are not accounted for.
        """
        with self.lock:
            print(text, flush=True)
            self._lines_printed += text.count("\n") + 1

    def _running_text(self, name, tick):
        """Return the RUNNING status text with the spinner frame for ``tick``."""
        frame = self.SPINNER_FRAMES[tick % len(self.SPINNER_FRAMES)]
        return f"{Colors.BLUE}[RUNNING]{Colors.RESET} {name} [{frame}]"

    def _record_success(self, task_id, name, output, note=''):
        """Mark the task COMPLETED (with an optional note) and queue its result."""
        final_text = f"{Colors.GREEN}[COMPLETED]{Colors.RESET} {name}"
        if note:
            final_text += f" {note}"
        self._update_task_line(task_id, final_text)
        self.results.put({'name': name, 'success': True, 'output': output})

    def _record_failure(self, task_id, name, output, details=None):
        """Mark the task FAILED, optionally print details, and queue its result."""
        self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}")
        if details:
            self._print_details(details)
        self.results.put({'name': name, 'success': False, 'output': output})

    @staticmethod
    def _interpret_result(result):
        """Normalize an action's return value to ``(success, note, output)``.

        Accepted shapes: a 3-tuple ``(success, note, output)``, a 2-tuple
        ``(success, output)``, a dict with optional ``success``/``note``/
        ``output`` keys, a bool, or any other non-None value (used as output).
        Anything else counts as success with empty note and output.
        """
        success, note, output = True, '', ''
        if isinstance(result, tuple):
            if len(result) == 3:
                success, note, output = result
            elif len(result) == 2:
                success, output = result
        elif isinstance(result, dict):
            success = bool(result.get('success', True))
            note = result.get('note', '')
            output = result.get('output', '')
        elif isinstance(result, bool):
            success = result
        elif result is not None:
            output = str(result)
        return success, note, output

    def _run_command(self, task_id, name, command):
        """Run a shell command, animating the spinner until it exits."""
        try:
            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                # Undecodable bytes in the child's output must not fail the task.
                errors="replace",
            )
            tick = 0
            while True:
                self._update_task_line(task_id, self._running_text(name, tick))
                tick += 1
                try:
                    # communicate() keeps draining the pipe while we wait.
                    # Merely polling would deadlock once the child fills the
                    # pipe buffer, because nobody would be reading it.
                    # Retrying after a timeout loses no output.
                    stdout, _ = process.communicate(timeout=self.SPINNER_INTERVAL)
                    break
                except subprocess.TimeoutExpired:
                    continue
        except Exception as e:
            self._record_failure(
                task_id, name, str(e), details=f"\nException for {name}: {e}"
            )
            return

        if process.returncode == 0:
            self._record_success(task_id, name, stdout)
            return
        details = f"\nError output for {name}:\n{stdout}" if stdout else None
        self._record_failure(task_id, name, stdout, details=details)

    def _run_action(self, task_id, name, action):
        """Run a callable in a helper thread, animating the spinner meanwhile."""
        outcome = {}

        def run_action():
            try:
                outcome['result'] = action()
            except Exception as exc:
                outcome['exception'] = exc

        worker = threading.Thread(target=run_action)
        worker.start()
        tick = 0
        while worker.is_alive():
            self._update_task_line(task_id, self._running_text(name, tick))
            tick += 1
            # join() with a timeout doubles as the spinner delay.
            worker.join(timeout=self.SPINNER_INTERVAL)

        if 'exception' in outcome:
            exception = outcome['exception']
            self._record_failure(
                task_id,
                name,
                str(exception),
                details=f"\nException for {name}: {exception}",
            )
            return

        success, note, output = self._interpret_result(outcome.get('result'))
        if success:
            self._record_success(task_id, name, output, note)
            return
        details = f"\nError output for {name}:\n{output}" if output else None
        self._record_failure(task_id, name, output, details=details)

    def _execute_task(self, task):
        """Execute a single task with real-time status updates.

        A task's ``command`` takes precedence over its ``action``.
        """
        name = task['name']
        command = task.get('command')
        action = task.get('action')
        # Key the status line by the task object, not the thread ident: idents
        # may be recycled once a thread exits, which would let a late-starting
        # task take over a finished task's line.
        task_id = id(task)
        self._update_task_line(task_id, f"{Colors.YELLOW}[STARTING]{Colors.RESET} {name}")
        if command is not None:
            self._run_command(task_id, name, command)
        elif action is not None:
            self._run_action(task_id, name, action)

    def run_all(self):
        """Execute all tasks in parallel and return overall success status.

        Returns:
            True if every task succeeded (or there were none), False otherwise.
        """
        print("Starting parallel upgrade tasks...\n")
        # Start each run with a fresh board so a repeated run_all() does not
        # reuse line positions from the previous run.
        with self.lock:
            self.task_positions.clear()
            self._lines_printed = 0

        threads = [
            threading.Thread(target=self._execute_task, args=(task,))
            for task in self.tasks
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        print(f"\n{'=' * 47}")
        # All workers have been joined, so the queue can be drained safely.
        all_results = []
        while not self.results.empty():
            all_results.append(self.results.get())
        failed_tasks = [result for result in all_results if not result['success']]

        if not failed_tasks:
            print(f"{Colors.GREEN}All upgrade tasks completed successfully!{Colors.RESET}")
            return True

        print(f"{Colors.RED}FAILED TASKS:{Colors.RESET}")
        for task in failed_tasks:
            print(f"\n{Colors.RED}{task['name']}{Colors.RESET}")
            if task['output']:
                # Show only the first 10 lines of each failure's output.
                output_lines = str(task['output']).strip().split('\n')
                for line in output_lines[:10]:
                    print(f" {line}")
                if len(output_lines) > 10:
                    print(f" ... ({len(output_lines) - 10} more lines)")
        print(f"\n{Colors.RED}Total failed: {len(failed_tasks)} out of {len(all_results)} tasks{Colors.RESET}")
        return False
def install_brew_packages(packages=None):
    """Install any wanted Homebrew formulae that are not installed yet.

    Args:
        packages: Optional iterable of formula names to ensure are installed.
            Tap-qualified names ("owner/tap/formula") are matched against the
            installed list by their last path component. Defaults to the
            built-in list below.

    Returns:
        False if the list of installed formulae could not be obtained (brew is
        not runnable or ``brew list`` failed). True otherwise; individual
        install failures are reported but deliberately do not fail the run.
    """
    print("📦 Checking brew packages...")
    try:
        listing = subprocess.run(
            ['brew', 'list', '--formula', '-1'],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers brew being absent from PATH (FileNotFoundError) or
        # not executable, which would otherwise escape as a traceback.
        print("❌ Failed to get brew package list")
        return False

    if packages is None:
        packages = [
            # System utilities
            "htop", "dust", "ncdu", "fswatch", "pipx", "uv", "terminal-notifier",
            # macOS GNU utilities
            "coreutils", "gnu-tar", "gnu-getopt", "gnu-sed",
            # Development tools
            "node", "go", "gcc", "gdb", "automake", "cmake", "jsdoc3", "oven-sh/bun/bun",
            # Text processing and search
            "ripgrep", "the_silver_searcher", "fd", "fzf", "bat", "ccat", "tree", "jq", "yq",
            # File management
            "yazi", "sevenzip",
            # Git and version control
            "git", "git-delta", "git-flow", "jesseduffield/lazygit/lazygit", "gh",
            # Media and graphics
            "ffmpeg", "imagemagick", "poppler", "yt-dlp",
            # Network and web
            "wget", "speedtest-cli",
            # Terminal and shell
            "tmux", "neovim", "starship", "rainbarf", "neofetch", "onefetch", "tldr", "bmon", "loc",
            # Applications
            "awscli", "azure-cli", "terraform",
        ]

    # One formula name per line; a set gives O(1) membership tests.
    installed = set(listing.stdout.split())
    # rsplit keeps plain names intact and reduces "owner/tap/formula" to
    # "formula", which is how brew lists tap formulae.
    missing_packages = [
        package for package in packages
        if package.rsplit("/", 1)[-1] not in installed
    ]

    if not missing_packages:
        print("✅ All packages already installed")
        return True

    print(f"Installing {len(missing_packages)} missing packages...")
    for package in missing_packages:
        try:
            subprocess.run(['brew', 'install', package], check=True)
        except (subprocess.CalledProcessError, OSError):
            # Best effort: report and carry on with the remaining packages.
            print(f"❌ Failed to install {package}")
        else:
            print(f"✅ Installed {package}")
    return True
def get_npm_updates(packages):
    """Work out which global npm packages need to be installed or updated.

    Queries ``npm list -g`` for what is installed and ``npm outdated -g`` for
    what is stale, then classifies each requested package.

    Args:
        packages: Iterable of package names; duplicates are dropped while
            keeping first-seen order.

    Returns:
        A ``(to_install, updates, installs)`` tuple of lists: every package
        needing action, those reported as outdated, and those absent from the
        installed list, each in request order.

    Raises:
        subprocess.CalledProcessError: If ``npm outdated`` exits with a status
            other than 0 or 1.
    """
    wanted = list(dict.fromkeys(packages))

    listing = subprocess.run(
        ['npm', 'list', '-g', '--json', '--depth=0'],
        capture_output=True,
        text=True,
    )
    installed_versions = {}
    if listing.stdout.strip():
        try:
            dependencies = json.loads(listing.stdout).get('dependencies', {})
        except json.JSONDecodeError:
            # Unparseable listing: treat as "nothing known to be installed".
            dependencies = None
        if isinstance(dependencies, dict):
            installed_versions = {
                name: info['version']
                for name, info in dependencies.items()
                if isinstance(info, dict) and info.get('version')
            }

    outdated_cmd = ['npm', 'outdated', '-g', '--json', *wanted]
    outdated = subprocess.run(outdated_cmd, capture_output=True, text=True)
    # Exit statuses 0 and 1 are both accepted; anything else is an error.
    if outdated.returncode not in (0, 1):
        raise subprocess.CalledProcessError(
            outdated.returncode,
            outdated_cmd,
            output=outdated.stdout,
            stderr=outdated.stderr,
        )

    outdated_info = {}
    report = outdated.stdout.strip()
    if report and report != 'null':
        try:
            parsed = json.loads(report)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            outdated_info = parsed

    updates = [name for name in wanted if name in outdated_info]
    installs = [
        name for name in wanted
        if name not in outdated_info and name not in installed_versions
    ]
    to_install = [
        name for name in wanted
        if name in outdated_info or name not in installed_versions
    ]
    return to_install, updates, installs
def format_package_for_install(package):
    """Return the npm install spec for ``package``.

    A name that already carries a version or tag ("name@1.2.3") is returned
    unchanged; otherwise "@latest" is appended.
    """
    # Search from index 1 so the leading '@' of a scoped name such as
    # "@scope/name" is not mistaken for a version separator.
    has_version_suffix = package.find('@', 1) != -1
    if has_version_suffix:
        return package
    return f"{package}@latest"
def create_npm_update_action(packages):
    """Build a zero-argument callable that installs/updates global npm packages.

    Args:
        packages: Iterable of npm package names; copied when the action is
            created.

    Returns:
        A callable returning ``(success, note, output)``: ``success`` is a
        bool, ``note`` is a short parenthesized summary, and ``output`` is the
        combined stdout/stderr of ``npm install`` (or the error text).
    """
    wanted = list(packages)

    def action():
        to_install, updates, installs = get_npm_updates(wanted)
        if not to_install:
            return True, "(up to date)", ""

        specs = [format_package_for_install(pkg) for pkg in to_install]
        install_args = ['npm', 'install', '-g', '-f', *specs]
        try:
            completed = subprocess.run(
                install_args,
                capture_output=True,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as err:
            captured = "".join(filter(None, (err.stdout, err.stderr)))
            # Fall back to the exception text when npm printed nothing.
            return False, "", captured or str(err)

        summary = []
        if updates:
            summary.append(f"updated: {', '.join(updates)}")
        if installs:
            summary.append(f"installed: {', '.join(installs)}")
        note = f"({'; '.join(summary)})" if summary else ''
        captured = "".join(filter(None, (completed.stdout, completed.stderr)))
        return True, note, captured

    return action
def schedule_npm_updates(runner):
    """Queue the global npm package refresh on ``runner``.

    Prints a notice and queues nothing when no ``npm`` executable is found
    on PATH.
    """
    npm_executable = shutil.which('npm')
    if npm_executable is None:
        print(f"{Colors.RED}❌ npm not found; skipping npm package checks{Colors.RESET}")
        return

    node_apps = [
        "@anthropic-ai/claude-code",
        "ccusage",
        "ccstatusline",
        "@openai/codex",
        "instant-markdown-d",
        "mcp-proxy",
    ]
    update_action = create_npm_update_action(node_apps)
    runner.add_task("Node Apps Update", action=update_action)
def main():
    """Install missing brew packages, then run every upgrade task in parallel.

    Exits with status 0 when all tasks succeed. Exits with status 1 when the
    brew package step fails or any task fails.
    """
    runner = ParallelTaskRunner()

    # Missing formulae are installed first, before any parallel task starts.
    brew_ready = install_brew_packages()
    if not brew_ready:
        print("❌ Brew package installation failed")
        sys.exit(1)

    schedule_npm_updates(runner)

    # (display name, shell command) pairs, queued in this order.
    shell_tasks = (
        ("Homebrew Update", "brew update && brew upgrade --greedy"),
        ("Config Git Pull", "cd ~/.config && git pull"),
        ("Neovim Config Git Pull", "cd ~/.config/nvim && git pull"),
        ("SConfig Git Pull", "cd ~/.sconfig && git pull"),
        ("Agent Tracker Build", "cd ~/.config/agent-tracker && ./install.sh"),
    )
    for title, shell_command in shell_tasks:
        runner.add_task(title, shell_command)

    exit_code = 0 if runner.run_all() else 1
    sys.exit(exit_code)


if __name__ == "__main__":
    main()