theniceboy/bin/upgrade-all
2025-08-15 09:23:22 -05:00

139 lines
No EOL
5.2 KiB
Python
Executable file

#!/usr/bin/env python3
import subprocess
import threading
import time
import sys
from queue import Queue
class Colors:
RED = '\033[0;31m'
GREEN = '\033[0;32m'
YELLOW = '\033[1;33m'
BLUE = '\033[0;34m'
RESET = '\033[0m'
class ParallelTaskRunner:
def __init__(self):
self.tasks = []
self.results = Queue()
self.task_positions = {}
self.lock = threading.Lock()
def add_task(self, name, command):
"""Add a task to the execution queue."""
self.tasks.append({'name': name, 'command': command})
def _update_task_line(self, task_id, text):
"""Update a specific task's display line with thread-safe cursor movement."""
with self.lock:
if task_id in self.task_positions:
lines_up = len(self.task_positions) - self.task_positions[task_id]
print(f"\033[{lines_up}A\r{text}\033[K\033[{lines_up}B", end="", flush=True)
else:
self.task_positions[task_id] = len(self.task_positions)
print(text, flush=True)
def _execute_task(self, task):
"""Execute a single task with real-time status updates."""
name, command = task['name'], task['command']
task_id = threading.get_ident()
self._update_task_line(task_id, f"{Colors.YELLOW}[STARTING]{Colors.RESET} {name}")
try:
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True
)
# Animated progress indicator
spinner_chars = "|/-\\"
spinner_idx = 0
while process.poll() is None:
spinner = f"{Colors.BLUE}[RUNNING]{Colors.RESET} {name} [{spinner_chars[spinner_idx]}]"
self._update_task_line(task_id, spinner)
spinner_idx = (spinner_idx + 1) % len(spinner_chars)
time.sleep(0.1)
stdout, _ = process.communicate()
if process.returncode == 0:
self._update_task_line(task_id, f"{Colors.GREEN}[COMPLETED]{Colors.RESET} {name}")
self.results.put({'name': name, 'success': True, 'output': stdout})
else:
self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}")
if stdout:
with self.lock:
print(f"\nError output for {name}:\n{stdout}")
self.results.put({'name': name, 'success': False, 'output': stdout})
except Exception as e:
self._update_task_line(task_id, f"{Colors.RED}[FAILED]{Colors.RESET} {name}")
with self.lock:
print(f"\nException for {name}: {e}")
self.results.put({'name': name, 'success': False, 'output': str(e)})
def run_all(self):
"""Execute all tasks in parallel and return overall success status."""
print("Starting parallel upgrade tasks...\n")
# Start all tasks
threads = [
threading.Thread(target=self._execute_task, args=(task,))
for task in self.tasks
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
# Display summary
print(f"\n{'=' * 47}")
# Count results properly
all_results = []
while not self.results.empty():
all_results.append(self.results.get())
failed_tasks = [result for result in all_results if not result['success']]
if not failed_tasks:
print(f"{Colors.GREEN}All upgrade tasks completed successfully!{Colors.RESET}")
return True
else:
print(f"{Colors.RED}FAILED TASKS:{Colors.RESET}")
for task in failed_tasks:
print(f"\n{Colors.RED}{task['name']}{Colors.RESET}")
if task['output']:
# Print first 10 lines of error output
output_lines = task['output'].strip().split('\n')
error_lines = output_lines[:10]
for line in error_lines:
print(f" {line}")
if len(output_lines) > 10:
print(f" ... ({len(output_lines) - 10} more lines)")
print(f"\n{Colors.RED}Total failed: {len(failed_tasks)} out of {len(all_results)} tasks{Colors.RESET}")
return False
def main():
runner = ParallelTaskRunner()
runner.add_task("Claude Code Update", "npm install -g @anthropic-ai/claude-code")
runner.add_task("CCUSAGE Update", "npm install -g ccusage")
runner.add_task("Homebrew Update", "brew update && brew upgrade --greedy")
runner.add_task("Config Git Pull", "cd ~/.config && git pull")
runner.add_task("Neovim Config Git Pull", "cd ~/.config/nvim && git pull")
runner.add_task("SConfig Git Pull", "cd ~/.sconfig && git pull")
success = runner.run_all()
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()