#!/usr/bin/env python3
"""Run a set of shell commands in parallel with live, per-task status lines.

Each task is a shell command executed in its own thread. Every task owns one
terminal line that is rewritten in place (using ANSI cursor movement) as the
task moves through STARTING -> RUNNING -> COMPLETED / FAILED. After all tasks
finish, failure details and a summary are printed, and the process exit code
reflects overall success.
"""
import subprocess
import threading
import time
import sys
from queue import Queue


class Colors:
    """ANSI escape sequences used to colorize status output."""

    RED = '\033[0;31m'
    GREEN = '\033[0;32m'
    YELLOW = '\033[1;33m'
    BLUE = '\033[0;34m'
    RESET = '\033[0m'


class ParallelTaskRunner:
    """Execute registered shell commands concurrently and report the results.

    Attributes:
        tasks: Registered tasks, each a dict with 'name' and 'command' keys.
        results: Queue of result dicts ('name', 'success', 'output', and an
            optional 'detail' message that is printed once all tasks finish).
        task_positions: Maps a task's display id to the index of the terminal
            line it owns (0 is the first status line printed).
        lock: Serializes all terminal writes and task_positions updates.
    """

    SPINNER_FRAMES = "|/-\\"
    # Seconds between spinner frames while a command is still running.
    POLL_INTERVAL = 0.1
    # Maximum number of output lines shown per failed task in the summary.
    MAX_SUMMARY_LINES = 10

    def __init__(self):
        self.tasks = []
        self.results = Queue()
        self.task_positions = {}
        self.lock = threading.Lock()

    def add_task(self, name, command):
        """Add a task to the execution queue.

        Args:
            name: Human-readable label shown on the task's status line.
            command: Shell command line; it is run with ``shell=True``.
        """
        self.tasks.append({'name': name, 'command': command})

    def _update_task_line(self, task_id, text):
        """Update a specific task's display line with thread-safe cursor movement.

        The first call for a given ``task_id`` prints a new line and records
        its position. Later calls move the cursor up to that line, rewrite it,
        clear the remainder of the line, and move the cursor back down.

        Args:
            task_id: Hashable key identifying the task's display line.
            text: Full replacement text for the line.
        """
        with self.lock:
            if task_id in self.task_positions:
                lines_up = len(self.task_positions) - self.task_positions[task_id]
                print(f"\033[{lines_up}A\r{text}\033[K\033[{lines_up}B", end="", flush=True)
            else:
                self.task_positions[task_id] = len(self.task_positions)
                print(text, flush=True)

    def _wait_with_spinner(self, process, task_id, name):
        """Animate the task's status line until ``process`` exits.

        The child's pipe is drained with ``communicate(timeout=...)`` rather
        than by polling and sleeping: a child that writes more than the OS
        pipe buffer would otherwise block forever on a pipe nobody reads.
        Retrying ``communicate`` after ``TimeoutExpired`` does not lose output.

        Returns:
            The combined stdout/stderr text of the finished process.
        """
        frame_idx = 0
        while True:
            frame = self.SPINNER_FRAMES[frame_idx % len(self.SPINNER_FRAMES)]
            self._update_task_line(
                task_id, f"{Colors.BLUE}[RUNNING]{Colors.RESET} {name} [{frame}]"
            )
            try:
                stdout, _ = process.communicate(timeout=self.POLL_INTERVAL)
            except subprocess.TimeoutExpired:
                frame_idx += 1
                continue
            return stdout

    def _execute_task(self, task):
        """Execute a single task with real-time status updates.

        Puts exactly one result dict on ``self.results``. Failure details are
        stored under the 'detail' key instead of being printed immediately,
        because extra lines printed while other tasks are still running would
        shift the terminal and make their cursor-relative updates hit the
        wrong rows.

        Args:
            task: Dict with 'name' and 'command' keys (see ``add_task``).
        """
        name, command = task['name'], task['command']
        # A fresh sentinel object is a unique key for this invocation. Thread
        # identifiers may be recycled after a thread exits, which could make
        # two tasks share (and overwrite) one status line.
        task_id = object()
        failed_line = f"{Colors.RED}[FAILED]{Colors.RESET} {name}"

        self._update_task_line(task_id, f"{Colors.YELLOW}[STARTING]{Colors.RESET} {name}")

        try:
            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )
            stdout = self._wait_with_spinner(process, task_id, name)
        except Exception as e:
            # Thread boundary: record the failure instead of letting the
            # worker die without ever reporting a result.
            self._update_task_line(task_id, failed_line)
            self.results.put({
                'name': name,
                'success': False,
                'output': str(e),
                'detail': f"\nException for {name}: {e}",
            })
            return

        if process.returncode == 0:
            self._update_task_line(task_id, f"{Colors.GREEN}[COMPLETED]{Colors.RESET} {name}")
            self.results.put({'name': name, 'success': True, 'output': stdout})
            return

        self._update_task_line(task_id, failed_line)
        self.results.put({
            'name': name,
            'success': False,
            'output': stdout,
            'detail': f"\nError output for {name}:\n{stdout}" if stdout else None,
        })

    def _drain_results(self):
        """Remove and return every queued result (call after workers joined)."""
        collected = []
        while not self.results.empty():
            collected.append(self.results.get())
        return collected

    def _print_failure_summary(self, failed_tasks, total):
        """Print each failed task with the first lines of its output."""
        limit = self.MAX_SUMMARY_LINES
        print(f"{Colors.RED}FAILED TASKS:{Colors.RESET}")
        for task in failed_tasks:
            print(f"\n{Colors.RED}• {task['name']}{Colors.RESET}")
            if not task['output']:
                continue
            output_lines = task['output'].strip().split('\n')
            for line in output_lines[:limit]:
                print(f"  {line}")
            if len(output_lines) > limit:
                print(f"  ... ({len(output_lines) - limit} more lines)")
        print(f"\n{Colors.RED}Total failed: {len(failed_tasks)} out of {total} tasks{Colors.RESET}")

    def run_all(self):
        """Execute all tasks in parallel and return overall success status.

        Returns:
            True if every task exited with status 0, False otherwise.
        """
        print("Starting parallel upgrade tasks...\n")

        # Status-line positions are relative to this run's block of lines, so
        # entries left over from a previous run would produce wrong offsets.
        with self.lock:
            self.task_positions.clear()

        threads = [
            threading.Thread(target=self._execute_task, args=(task,))
            for task in self.tasks
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        all_results = self._drain_results()
        failed_tasks = [result for result in all_results if not result['success']]

        # All status lines are final now, so detailed output can be printed
        # without disturbing the in-place display.
        for result in failed_tasks:
            detail = result.get('detail')
            if detail:
                print(detail)

        print(f"\n{'=' * 47}")

        if not failed_tasks:
            print(f"{Colors.GREEN}All upgrade tasks completed successfully!{Colors.RESET}")
            return True

        self._print_failure_summary(failed_tasks, len(all_results))
        return False


def main():
    """Register the upgrade commands, run them, and exit 0 on full success."""
    runner = ParallelTaskRunner()

    runner.add_task("Claude Code Update", "npm install -g -f @anthropic-ai/claude-code ccusage ccstatusline")
    runner.add_task("Homebrew Update", "brew update && brew upgrade --greedy")
    runner.add_task("Config Git Pull", "cd ~/.config && git pull")
    runner.add_task("Neovim Config Git Pull", "cd ~/.config/nvim && git pull")
    runner.add_task("SConfig Git Pull", "cd ~/.sconfig && git pull")
    runner.add_task("Node Apps Update", "npm install -g -f instant-markdown-d")

    success = runner.run_all()
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()