22Case status spinner for fz package - visual progress indicator for running cases
33"""
44import os
5+ import shutil
56import sys
67import threading
78import time
@@ -19,16 +20,10 @@ class CaseStatus(Enum):
1920
2021class CaseSpinner :
2122 """
22- A visual spinner that shows status of multiple cases in a single line
23-
24- Symbols:
25- - □ (empty square): Case not yet started
26- - |/-\\ (rotating bar): Case currently running
27- - ✓ (check mark): Case completed successfully
28- - ✗ (cross mark): Case failed
23+ A visual progress bar that shows completion percentage in a single line
2924
3025 Example output:
31- [□□|/- \\ ✓✗] ETA: 2m 30s
26+ ◢ [████████>░░░░░░░░░░░] 35% (7/20) ETA: 1m 45s
3227 """
3328
3429 def __init__ (self , num_cases : int , num_calculators : int = 1 ):
@@ -78,17 +73,17 @@ def stop(self, clear: bool = False):
7873 if self .thread :
7974 self .thread .join (timeout = 1.0 )
8075
81- # Render final status line to show "Total time:"
76+ # Clear any previous output first, then render final or clear
77+ if self .last_output :
78+ sys .stdout .write ('\r ' + ' ' * len (self .last_output ) + '\r ' )
79+ sys .stdout .flush ()
80+
8281 if not clear :
8382 final_status = self ._build_status_line ()
8483 sys .stdout .write ('\r ' + final_status )
8584 sys .stdout .flush ()
8685 self .last_output = final_status
87-
88- if clear and self .last_output :
89- # Clear the line
90- sys .stdout .write ('\r ' + ' ' * len (self .last_output ) + '\r ' )
91- sys .stdout .flush ()
86+ else :
9287 self .last_output = ""
9388
9489 def update_status (self , case_index : int , status : CaseStatus ):
@@ -122,19 +117,6 @@ def update_status(self, case_index: int, status: CaseStatus):
122117 duration = current_time - self .case_start_times [case_index ]
123118 self .case_durations .append (duration )
124119
125- def _get_status_char (self , status : CaseStatus ) -> str :
126- """Get the display character for a given status"""
127- if status == CaseStatus .PENDING :
128- return ' '
129- elif status == CaseStatus .RUNNING :
130- return self .spinner_chars [self .spinner_index % len (self .spinner_chars )]
131- elif status == CaseStatus .DONE :
132- return '■'
133- elif status == CaseStatus .FAILED :
134- return '□'
135- else :
136- return 'o'
137-
138120 def _format_eta (self , seconds : float ) -> str :
139121 """Format ETA in human-readable format"""
140122 if seconds < 60 :
@@ -149,50 +131,67 @@ def _format_eta(self, seconds: float) -> str:
149131 return f"{ hours } h { minutes } m"
150132
151133 def _build_status_line (self ) -> str :
152- """Build the status line showing all cases """
134+ """Build a fixed-width progress bar with percentage and ETA """
153135 with self .lock :
154- # Build character array
155- chars = [self ._get_status_char (status ) for status in self .statuses ]
156-
157- # For single case, just show the spinner bar
158- if self .num_cases == 1 :
159- status_line = f"[{ '' .join (chars )} ]"
136+ # Count statuses
137+ done = sum (1 for s in self .statuses if s == CaseStatus .DONE )
138+ failed = sum (1 for s in self .statuses if s == CaseStatus .FAILED )
139+ running = sum (1 for s in self .statuses if s == CaseStatus .RUNNING )
140+ completed = done + failed
141+ remaining = self .num_cases - completed
142+ pct = completed * 100 // self .num_cases if self .num_cases > 0 else 100
143+
144+ # Spinner character for visual feedback when cases are running
145+ spinner = self .spinner_chars [self .spinner_index % len (self .spinner_chars )] if running > 0 else ' '
146+
147+ # Calculate ETA or Total time
148+ if remaining > 0 and self .case_durations :
149+ avg_duration = sum (self .case_durations ) / len (self .case_durations )
150+ eta_seconds = (avg_duration * remaining ) / self .num_calculators
151+ time_text = f"ETA: { self ._format_eta (eta_seconds )} "
152+ elif remaining > 0 :
153+ time_text = "ETA: ..."
160154 else :
161- # Count statuses
162- completed = sum (1 for s in self .statuses if s in (CaseStatus .DONE , CaseStatus .FAILED ))
163- remaining = self .num_cases - completed
164-
165- # Calculate ETA or Total time
166- if remaining > 0 and self .case_durations :
167- # Use average duration of completed cases
168- avg_duration = sum (self .case_durations ) / len (self .case_durations )
169- # Divide by number of calculators for parallel execution
170- eta_seconds = (avg_duration * remaining ) / self .num_calculators
171- eta_text = f"ETA: { self ._format_eta (eta_seconds )} "
172- elif remaining > 0 :
173- # No completed cases yet, show calculating
174- eta_text = "ETA: ..."
155+ if self .start_time is not None :
156+ total_time = time .time () - self .start_time
157+ time_text = f"Total: { self ._format_eta (total_time )} "
175158 else :
176- # All cases completed - show total time
177- if self .start_time is not None :
178- total_time = time .time () - self .start_time
179- eta_text = f"Total time: { self ._format_eta (total_time )} "
180- else :
181- eta_text = "Done"
159+ time_text = "Done"
182160
183- # Build final line
184- status_line = f"[{ '' .join (chars )} ] { eta_text } "
161+ # Build suffix: " 35% (7/20) ETA: 1m 45s"
162+ if failed > 0 :
163+ suffix = f" { pct :3d} % ({ done } +{ failed } err/{ self .num_cases } ) { time_text } "
164+ else :
165+ suffix = f" { pct :3d} % ({ completed } /{ self .num_cases } ) { time_text } "
166+
167+ # Determine bar width from terminal, reserving space for brackets + spinner + suffix
168+ try :
169+ term_width = shutil .get_terminal_size ().columns
170+ except Exception :
171+ term_width = 80
172+ # Format: "S [=====> ] suffix" where S is spinner char
173+ overhead = 2 + 1 + 1 + 2 + len (suffix ) # spinner + space + [ + ] + suffix
174+ bar_width = max (10 , min (40 , term_width - overhead ))
175+
176+ # Build the bar
177+ filled = int (bar_width * completed / self .num_cases ) if self .num_cases > 0 else bar_width
178+ filled = min (filled , bar_width )
179+ if remaining > 0 and filled < bar_width :
180+ bar = '█' * filled + '>' + '░' * (bar_width - filled - 1 )
181+ else :
182+ bar = '█' * filled + '░' * (bar_width - filled )
185183
186- return status_line
184+ return f" { spinner } [ { bar } ] { suffix } "
187185
188186 def _animate (self ):
189187 """Animation loop that runs in background thread"""
190188 while not self .stop_event .is_set ():
191189 # Build and display status line
192190 status_line = self ._build_status_line ()
193191
194- # Update display (overwrite previous line)
195- sys .stdout .write ('\r ' + status_line )
192+ # Clear previous output and write new line
193+ clear_len = max (len (self .last_output ), len (status_line ))
194+ sys .stdout .write ('\r ' + ' ' * clear_len + '\r ' + status_line )
196195 sys .stdout .flush ()
197196 self .last_output = status_line
198197
0 commit comments