#!/usr/bin/env python3 import json import shutil import subprocess import threading import time import sys from queue import Queue class Colors: RED = '\033[0;31m' GREEN = '\033[0;32m' YELLOW = '\033[1;33m' BLUE = '\033[0;34m' RESET = '\033[0m' class ParallelTaskRunner: def __init__(self): self.tasks = [] self.results = Queue() self.task_positions = {} self.lock = threading.Lock() def add_task(self, name, command=None, action=None): """Add a task to the execution queue.""" if command is None and action is None: raise ValueError("Task must define a command or action") self.tasks.append({'name': name, 'command': command, 'action': action}) def _update_task_line(self, task_id, text): """Update a specific task's display line with thread-safe cursor movement.""" with self.lock: if task_id in self.task_positions: lines_up = len(self.task_positions) - self.task_positions[task_id] print(f"\033[{lines_up}A\r{text}\033[K\033[{lines_up}B", end="", flush=True) else: self.task_positions[task_id] = len(self.task_positions) print(text, flush=True) def _execute_task(self, task): """Execute a single task with real-time status updates.""" name = task['name'] command = task.get('command') action = task.get('action') task_id = threading.get_ident() self._update_task_line(task_id, f"{Colors.YELLOW}[STARTING]{Colors.RESET} {name}") if command is not None: try: process = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True ) # Animated progress indicator spinner_chars = "|/-\\" spinner_idx = 0 while process.poll() is None: spinner = f"{Colors.BLUE}[RUNNING]{Colors.RESET} {name} [{spinner_chars[spinner_idx]}]" self._update_task_line(task_id, spinner) spinner_idx = (spinner_idx + 1) % len(spinner_chars) time.sleep(0.1) stdout, _ = process.communicate() if process.returncode == 0: self._update_task_line(task_id, f"{Colors.GREEN}[COMPLETED]{Colors.RESET} {name}") self.results.put({'name': name, 'success': True, 'output': stdout}) else: self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}") if stdout: with self.lock: print(f"\nError output for {name}:\n{stdout}") self.results.put({'name': name, 'success': False, 'output': stdout}) except Exception as e: self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}") with self.lock: print(f"\nException for {name}: {e}") self.results.put({'name': name, 'success': False, 'output': str(e)}) return if action is None: return # Animated progress for callable actions spinner_chars = "|/-\\" spinner_idx = 0 result_container = {} exception_container = {} done_event = threading.Event() def run_action(): try: result_container['result'] = action() except Exception as exc: exception_container['exception'] = exc finally: done_event.set() worker = threading.Thread(target=run_action) worker.start() while not done_event.is_set(): spinner = f"{Colors.BLUE}[RUNNING]{Colors.RESET} {name} [{spinner_chars[spinner_idx]}]" self._update_task_line(task_id, spinner) spinner_idx = (spinner_idx + 1) % len(spinner_chars) time.sleep(0.1) worker.join() if exception_container: exception = exception_container['exception'] self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}") with self.lock: print(f"\nException for {name}: {exception}") self.results.put({'name': name, 'success': False, 'output': str(exception)}) return result = result_container.get('result') success = True note = '' output = '' if isinstance(result, tuple): if len(result) == 3: success, note, output = result elif len(result) == 2: success, output = result elif isinstance(result, dict): success = bool(result.get('success', True)) note = result.get('note', '') output = result.get('output', '') elif isinstance(result, bool): success = result elif result is not None: output = str(result) if success: final_text = f"{Colors.GREEN}[COMPLETED]{Colors.RESET} {name}" if note: final_text += f" {note}" self._update_task_line(task_id, final_text) self.results.put({'name': name, 'success': True, 'output': output}) else: self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}") if output: with self.lock: print(f"\nError output for {name}:\n{output}") self.results.put({'name': name, 'success': False, 'output': output}) def run_all(self): """Execute all tasks in parallel and return overall success status.""" print("Starting parallel upgrade tasks...\n") # Start all tasks threads = [ threading.Thread(target=self._execute_task, args=(task,)) for task in self.tasks ] for thread in threads: thread.start() for thread in threads: thread.join() # Display summary print(f"\n{'=' * 47}") # Count results properly all_results = [] while not self.results.empty(): all_results.append(self.results.get()) failed_tasks = [result for result in all_results if not result['success']] if not failed_tasks: print(f"{Colors.GREEN}All upgrade tasks completed successfully!{Colors.RESET}") return True else: print(f"{Colors.RED}FAILED TASKS:{Colors.RESET}") for task in failed_tasks: print(f"\n{Colors.RED}• {task['name']}{Colors.RESET}") if task['output']: # Print first 10 lines of error output output_lines = task['output'].strip().split('\n') error_lines = output_lines[:10] for line in error_lines: print(f" {line}") if len(output_lines) > 10: print(f" ... ({len(output_lines) - 10} more lines)") print(f"\n{Colors.RED}Total failed: {len(failed_tasks)} out of {len(all_results)} tasks{Colors.RESET}") return False NPM_PACKAGE_LOCKS = { # "@openai/codex": "0.40.0", } def install_brew_packages(): """Install missing brew packages.""" print("📦 Checking brew packages...") # Get list of installed packages try: result = subprocess.run(['brew', 'list', '--formula', '-1'], capture_output=True, text=True, check=True) installed_packages = result.stdout.strip() except subprocess.CalledProcessError: print("❌ Failed to get brew package list") return False packages = [ # System utilities "htop", "dust", "ncdu", "fswatch", "pipx", "uv", "terminal-notifier", # macOS GNU utilities "coreutils", "gnu-tar", "gnu-getopt", "gnu-sed", # Development tools "node", "go", "gcc", "gdb", "automake", "cmake", "jsdoc3", "oven-sh/bun/bun", # Text processing and search "ripgrep", "the_silver_searcher", "fd", "fzf", "bat", "ccat", "tree", "jq", "yq", # File management "yazi", "sevenzip", # Git and version control "git", "git-delta", "git-flow", "jesseduffield/lazygit/lazygit", "gh", # Media and graphics "ffmpeg", "imagemagick", "poppler", "yt-dlp", # Network and web "wget", "speedtest-cli", "cliproxyapi", # Terminal and shell "tmux", "neovim", "starship", "rainbarf", "neofetch", "onefetch", "tldr", "bmon", "loc", # Applications "awscli", "azure-cli", "terraform" ] missing_packages = [] installed_list = installed_packages.split('\n') for package in packages: # Handle tap packages (contains /) if "/" in package: package_name = package.split("/")[-1] # Extract package name after last / if package_name not in installed_list: missing_packages.append(package) else: if package not in installed_list: missing_packages.append(package) if missing_packages: print(f"Installing {len(missing_packages)} missing packages...") for package in missing_packages: try: subprocess.run(['brew', 'install', package], check=True) print(f"✅ Installed {package}") except subprocess.CalledProcessError: print(f"❌ Failed to install {package}") return True else: print("✅ All packages already installed") return True def get_brew_outdated(): result = subprocess.run( ['brew', 'outdated', '--greedy', '--verbose', '--json=v2'], capture_output=True, text=True ) stdout = result.stdout.strip() if not stdout: return [] try: data = json.loads(stdout) except json.JSONDecodeError: return [] upgrades = [] for formula in data.get('formulae') or []: name = formula.get('name') installed = formula.get('installed_versions') or [] current = formula.get('current_version') if name and installed and current: from_version = installed[-1] upgrades.append(f"{name} {from_version}->{current}") for cask in data.get('casks') or []: name = cask.get('name') installed = cask.get('installed_versions') or [] current = cask.get('current_version') if isinstance(name, list): name = name[0] if name else None if name and installed and current: from_version = installed[-1] upgrades.append(f"{name} {from_version}->{current}") return upgrades def create_brew_update_action(): def action(): if shutil.which('brew') is None: return False, "", "brew not found" before = get_brew_outdated() output_parts = [] update_result = subprocess.run( ['brew', 'update'], capture_output=True, text=True ) output_parts.extend([update_result.stdout or "", update_result.stderr or ""]) if update_result.returncode != 0: return False, "", "".join(output_parts) upgrade_result = subprocess.run( ['brew', 'upgrade', '--greedy'], capture_output=True, text=True ) output_parts.extend([upgrade_result.stdout or "", upgrade_result.stderr or ""]) if upgrade_result.returncode != 0: return False, "", "".join(output_parts) after = get_brew_outdated() note = "" if before: before_names = {entry.split()[0] for entry in before if entry.split()} after_names = {entry.split()[0] for entry in after if entry.split()} if after else set() updated_entries = [entry for entry in before if entry.split() and entry.split()[0] not in after_names] if updated_entries: if len(updated_entries) > 6: shown = ", ".join(updated_entries[:6]) note = f"(updated: {shown}, +{len(updated_entries) - 6} more)" else: note = f"(updated: {', '.join(updated_entries)})" else: note = "(brew already up to date)" else: note = "(brew already up to date)" return True, note, "".join(output_parts) return action def get_npm_updates(packages, locks=None): packages = list(dict.fromkeys(packages)) locks = locks or {} list_result = subprocess.run( ['npm', 'list', '-g', '--json', '--depth=0'], capture_output=True, text=True ) installed_versions = {} if list_result.stdout.strip(): try: data = json.loads(list_result.stdout) dependencies = data.get('dependencies', {}) if isinstance(dependencies, dict): for name, info in dependencies.items(): if isinstance(info, dict): version = info.get('version') if version: installed_versions[name] = version except json.JSONDecodeError: pass # Only check "outdated" for packages that are not locked. unlocked_packages = [p for p in packages if p not in locks] outdated_info = {} if unlocked_packages: outdated_cmd = ['npm', 'outdated', '-g', '--json'] + unlocked_packages outdated_result = subprocess.run( outdated_cmd, capture_output=True, text=True ) if outdated_result.returncode not in (0, 1): raise subprocess.CalledProcessError( outdated_result.returncode, outdated_cmd, output=outdated_result.stdout, stderr=outdated_result.stderr ) stdout = outdated_result.stdout.strip() if stdout and stdout != 'null': try: data = json.loads(stdout) if isinstance(data, dict): outdated_info = data except json.JSONDecodeError: pass to_install = [] updates = [] installs = [] for package in packages: lock_version = locks.get(package) if lock_version: installed = installed_versions.get(package) if installed != lock_version: to_install.append(f"{package}@{lock_version}") if installed: updates.append(f"{package}@{lock_version}") else: installs.append(f"{package}@{lock_version}") # Skip normal outdated handling when locked continue if package in outdated_info: to_install.append(package) updates.append(package) continue if package not in installed_versions: to_install.append(package) installs.append(package) return to_install, updates, installs def format_package_for_install(package): return package if '@' in package[1:] else f"{package}@latest" def create_npm_update_action(packages, locks=None): packages = list(packages) locks = locks or {} def action(): to_install, updates, installs = get_npm_updates(packages, locks=locks) if not to_install: return True, "(up to date)", "" install_args = ['npm', 'install', '-g', '-f'] + [format_package_for_install(pkg) for pkg in to_install] try: result = subprocess.run( install_args, capture_output=True, text=True, check=True ) note_parts = [] if updates: note_parts.append(f"updated: {', '.join(updates)}") if installs: note_parts.append(f"installed: {', '.join(installs)}") note = f"({'; '.join(note_parts)})" if note_parts else '' output_parts = [part for part in [result.stdout, result.stderr] if part] return True, note, "".join(output_parts) except subprocess.CalledProcessError as err: output_parts = [part for part in [err.stdout, err.stderr] if part] output = "".join(output_parts) or str(err) return False, "", output return action def schedule_npm_updates(runner): if shutil.which('npm') is None: print(f"{Colors.RED}❌ npm not found; skipping npm package checks{Colors.RESET}") return packages = [ "@anthropic-ai/claude-code", "ccusage", "@ccusage/codex", "ccstatusline", "@openai/codex", "instant-markdown-d", "mcp-proxy", "yarn", "@google/gemini-cli", ] runner.add_task("Node Apps Update", action=create_npm_update_action(packages, locks=NPM_PACKAGE_LOCKS)) def main(): runner = ParallelTaskRunner() # Install missing brew packages first if not install_brew_packages(): print("❌ Brew package installation failed") sys.exit(1) schedule_npm_updates(runner) runner.add_task("Homebrew Update", action=create_brew_update_action()) runner.add_task("Config Git Pull", "cd ~/.config && git pull") runner.add_task("Neovim Config Git Pull", "cd ~/.config/nvim && git pull") runner.add_task("SConfig Git Pull", "cd ~/.sconfig && git pull") runner.add_task( "Agent Tracker Build", "cd ~/.config/agent-tracker && ./install.sh") success = runner.run_all() sys.exit(0 if success else 1) if __name__ == "__main__": main()