Coverage for controller / converter_controller.py: 100.00%

220 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-07 00:07 +0000

1""" 

2Controller for document conversion workflow. 

3 

4This controller orchestrates the conversion process, handling: 

5- Batch file processing 

6- File routing to appropriate converters 

7- Merge functionality 

8- Progress tracking coordination 

9""" 

10from typing import List, Optional, Dict, Type, Callable 

11import time 

12from controller.path_protocol import PathLike 

13from view.merge_mode import MergeMode 

14from view.interface import UIInterface 

15from view.output_format import OutputFormat 

16from domain.core.output_handler import OutputHandler 

17from domain.core.base_converter import BaseConverter 

18from domain.model.file import File 

19from domain.adapters.file_factories import file_from_path 

20from controller.workflow.state_machine import WorkflowState, ConversionWorkflow 

21from view.interface import ActionResult, ActionKind 

22 

23 

# Maps a lowercase file suffix (the controller looks entries up with
# `path.suffix.lower()`) to the converter class that handles that file type.
ConverterMap = Dict[str, Type[BaseConverter]]
# Maps an output format to the handler class; the controller instantiates the
# selected class with no arguments before processing.
HandlerMap = Dict[OutputFormat, Type[OutputHandler]]
# Builds a path object from the raw string returned by the UI path prompt.
PathFactory = Callable[[str], PathLike]

# Header placed in front of each source's content when merging; `{source}` is
# filled with the source file's name via `str.format`.
MERGE_SOURCE_DELIMITER = "\n--- start source: {source} ---\n"

29 

class ConverterController:
    """Controller that orchestrates document conversion workflow.

    Every workflow state is served by one `_handle_*` method that returns an
    `ActionResult`. `run()` dispatches on the current state and interprets the
    result (back, terminate, error, or proceed).
    """

    def __init__(
        self,
        ui: UIInterface,
        converters: ConverterMap,
        handlers: HandlerMap,
        path_factory: PathFactory,
        file_factory: Callable[[PathLike], File] = file_from_path,
    ):
        """Store the collaborators and create the conversion workflow.

        Args:
            ui: User interface used for all prompts, progress and summaries.
            converters: Converter classes keyed by lowercase file suffix.
            handlers: Output handler classes keyed by output format.
            path_factory: Builds a path object from the user's input string.
            file_factory: Builds a `File` from a path; used to describe
                selectable files to the UI.
        """
        self.ui = ui
        self.converters = converters
        self.handlers = handlers
        self.path_factory = path_factory
        self.file_factory = file_factory
        self.workflow = ConversionWorkflow(on_state_change=self._update_breadcrumb_state)

        # Publish the breadcrumb for the initial state right away.
        self._update_breadcrumb_state()

    def run(self, loop: bool = True):
        """Run the workflow.

        Args:
            loop: If True, run until completion; if False, execute a single state step and
                return a boolean indicating whether the workflow should continue.

        Returns:
            When `loop` is False, returns True to indicate the caller may continue, or
            False to indicate the workflow should stop. When `loop` is True, returns None.
        """
        def run_once() -> bool:
            # Rebuilt on each step, so the bound methods are resolved at call time.
            handlers: Dict[WorkflowState, Callable[[], ActionResult]] = {
                WorkflowState.SOURCE_INPUT: self._handle_source_input,
                WorkflowState.ERROR: self._handle_error,
                WorkflowState.FORMAT_SELECTION: self._handle_format_selection,
                WorkflowState.MERGE_MODE_SELECTION: self._handle_merge_mode_selection,
                WorkflowState.FILES_SELECTION: self._handle_files_selection,
                WorkflowState.PROCESSING: self._handle_processing,
                WorkflowState.COMPLETE: self._handle_complete,
            }

            current_state = self.workflow.get_state()
            handler = handlers.get(current_state)
            if handler is None:
                # A state without a handler has nothing left to execute. Stop
                # the loop instead of crashing on `None()`.
                return False
            result = handler()

            if result.kind == ActionKind.BACK and self.workflow.can_go_back():
                self.workflow.back()
                return True
            if result.kind == ActionKind.TERMINATE:
                return False
            if result.kind == ActionKind.ERROR:
                # Set error in context and transition to ERROR state
                self.workflow.context.error_message = result.message
                self.workflow.context.error_origin = current_state
                self.workflow.state = WorkflowState.ERROR
                return True

            # PROCEED, and any other kind (including BACK when there is nothing
            # to go back to), keeps the workflow running.
            return True

        if not loop:
            return run_once()

        while run_once():
            pass

    def _get_files_to_process(self, input_path: PathLike) -> List[PathLike]:
        """
        Determine which files to process based on input path.

        Args:
            input_path: User-provided path (file or directory)

        Returns:
            List of files to process. For a directory this is the user's
            selection among the compatible files stored in the workflow
            context, or an empty list when the selection prompt does not
            return a value. For a file it is that single path.
        """
        if not input_path.is_dir():
            return [input_path]

        compatible_files = self.workflow.context.compatible_files
        file_data = [self.file_factory(path).to_dict() for path in compatible_files]
        result = self.ui.select_files(file_data)
        if result.kind != ActionKind.VALUE:
            return []
        # The payload holds indices into the list that was offered to the UI.
        return [compatible_files[i] for i in result.payload]

    def _process_files(
        self,
        files: List[PathLike],
        handler: OutputHandler,
        merge_mode: MergeMode
    ) -> tuple[List[str], int, int]:
        """
        Process all files with progress tracking.

        Args:
            files: List of files to process
            handler: Output format handler
            merge_mode: MergeMode enum value (NO_MERGE, MERGE, or PER_PAGE)

        Returns:
            Tuple of (accumulator, output_count, total_output_size)
            - accumulator: List of accumulated content (empty unless merge_mode == MergeMode.MERGE)
            - output_count: Number of output files/pages/chapters created
            - total_output_size: Total size of all output files created
        """
        accumulator = []
        output_count = 0
        total_output_size = 0

        with self.ui.get_progress_bar() as progress:
            # Register every file up front so all of them show as pending.
            tasks = {
                file: progress.add_task(
                    "",
                    total=100,
                    status="pending",
                    filename=file.name,
                )
                for file in files
            }

            for file in files:
                content, file_output_count, file_output_size = self._process_single_file(
                    file,
                    tasks[file],
                    progress,
                    handler,
                    merge_mode,
                )

                output_count += file_output_count
                total_output_size += file_output_size

                if content and merge_mode == MergeMode.MERGE:
                    # Prefix each source's content so it stays identifiable
                    # inside the merged output.
                    accumulator.append(
                        MERGE_SOURCE_DELIMITER.format(source=file.name) + content
                    )

        return accumulator, output_count, total_output_size

    def _process_single_file(
        self,
        file: PathLike,
        task_id: int,
        progress,
        handler: OutputHandler,
        merge_mode: MergeMode
    ) -> tuple[Optional[str], int, int]:
        """
        Process a single file with progress tracking.

        Args:
            file: File to process
            task_id: Progress bar task ID
            progress: Progress bar instance
            handler: Output format handler
            merge_mode: MergeMode enum value (NO_MERGE, MERGE, or PER_PAGE)

        Returns:
            Tuple of (content, output_count, output_size)
            - content: Extracted content for NO_MERGE and MERGE, None for PER_PAGE
            - output_count: Number of output files/pages/chapters created for this file
              (0 in MERGE mode, where nothing is saved here)
            - output_size: Total size of output files created for this file
        """
        file_start = time.perf_counter()

        # Update progress to converting
        progress.update(
            task_id,
            status="converting",
            filename=file.name,
            completed=0,
            start_time=file_start,
        )

        converter = self.converters[file.suffix.lower()](file)

        def progress_callback(current, total):
            # Deliberately best-effort: a failing progress display must not
            # abort the conversion itself.
            try:
                pct = int((current / total) * 100) if total else 100
                progress.update(
                    task_id,
                    completed=pct,
                    status="converting",
                    filename=file.name,
                    start_time=file_start,
                )
            except Exception:
                pass

        # Extract and save based on merge mode
        output_count = 0
        output_size = 0
        if merge_mode == MergeMode.PER_PAGE:
            # Per-page output: extract and save individual items as separate files
            contents = converter.extract_content_per_item(progress_callback=progress_callback)
            output_size = handler.save_multiple(contents, file, file.name)
            output_count = len(contents)
            content = None  # Nothing to accumulate for a merge
        else:
            # Extract content as single string
            content = converter.extract_content(progress_callback=progress_callback)

            if merge_mode == MergeMode.NO_MERGE:
                # One output file per input file
                output_size = handler.save(content, file)
                output_count = 1
            # In MERGE mode nothing is saved here; the caller writes the
            # merged file and sets the output count itself.

        # Mark as complete
        file_elapsed = time.perf_counter() - file_start
        progress.update(
            task_id,
            completed=100,
            status="done",
            filename=file.name,
            conversion_time=file_elapsed,
        )

        return content, output_count, output_size

    def _save_merged_output(
        self,
        input_path: PathLike,
        handler: OutputHandler,
        accumulator: List[str],
        format_choice: OutputFormat,
        merged_filename: str,
    ) -> tuple[str, int]:
        """
        Save merged output to single file.

        Args:
            input_path: Original input path
            handler: Output format handler
            accumulator: List of content strings to merge
            format_choice: OutputFormat choice; its `extension` is used to
                compute the reported filename
            merged_filename: Filename for the merged output (without extension)

        Returns:
            Tuple of (filename, output_size)
            - filename: Name of the merged output file with extension
            - output_size: Size of the merged output file
        """
        # Place merged file inside `input_path` if it's a directory,
        # otherwise place it adjacent to the input file using `with_name()`
        # so this works with Path-like mocks used in tests.
        if input_path.is_dir():
            output_name = input_path / merged_filename
        else:
            output_name = input_path.with_name(merged_filename)

        output_size = handler.save("\n\n".join(accumulator), output_name)
        # Compute the actual filename with extension
        actual_filename = output_name.with_suffix(format_choice.extension).name

        return actual_filename, output_size

    def _get_compatible_files(self, directory: PathLike) -> List[PathLike]:
        """Return the entries of `directory` whose suffix has a registered converter.

        The suffix comparison is case-insensitive on the entry side (the
        entry's suffix is lowercased before the lookup).
        """
        return [
            entry for entry in directory.iterdir()
            if entry.suffix.lower() in self.converters
        ]

    def _update_breadcrumb_state(self):
        """Update UI breadcrumb state based on current workflow state and context.

        Builds a list of simple, UI-friendly labels (strings) that describe the
        path taken through the workflow and assigns it to `self.ui.breadcrumb`.
        The breadcrumb is derived from the workflow history plus the current
        state, and uses values from the workflow context when available (input
        path, selected format, merge mode or merged filename, file count).
        The FILES_SELECTION state never contributes a segment.

        Returns:
            None. Nothing is changed when the current state is not one of the
            source/format/merge-mode/files/processing states.
        """
        ctx = self.workflow.context
        current = self.workflow.get_state()
        # Only these states refresh the breadcrumb; in any other state
        # (e.g. complete or error) the previous breadcrumb is left as it is.
        if current not in (
            WorkflowState.SOURCE_INPUT,
            WorkflowState.FORMAT_SELECTION,
            WorkflowState.MERGE_MODE_SELECTION,
            WorkflowState.FILES_SELECTION,
            WorkflowState.PROCESSING,
        ):
            return

        def label_for_state(state: WorkflowState) -> str:
            if state == WorkflowState.SOURCE_INPUT:
                return str(ctx.input_path) if ctx.input_path else state.display_name

            if state == WorkflowState.FORMAT_SELECTION:
                return ctx.format_choice.value if ctx.format_choice else state.display_name

            if state == WorkflowState.MERGE_MODE_SELECTION:
                if ctx.merged_filename:
                    return f"merged to: {ctx.merged_filename}"
                return ctx.merge_mode.display_name if ctx.merge_mode else state.display_name

            if state == WorkflowState.PROCESSING:
                count = len(ctx.files)
                plural = "s" if count > 1 else ""
                return f"({count}) file{plural}"

            return state.display_name

        segments = self.workflow.get_history() + [current]
        self.ui.breadcrumb = [
            label_for_state(state)
            for state in segments
            if state != WorkflowState.FILES_SELECTION
        ]

    def _handle_source_input(self) -> ActionResult:
        """Ask for the source path, validate it and store it in the context.

        Returns:
            The UI's own result when it is not a value, an error result when
            the input is empty, missing, unsupported or a directory without
            compatible files, otherwise a proceed result after advancing the
            workflow.
        """
        result = self.ui.get_path_input()
        if result.kind != ActionKind.VALUE:
            return result

        input_str = result.payload
        if not input_str:
            return ActionResult.error("please provide a source file or directory")

        input_path = self.path_factory(input_str)

        if not input_path.exists():
            return ActionResult.error("path not found")

        if input_path.is_dir():
            compatible_files = self._get_compatible_files(input_path)
            if not compatible_files:
                return ActionResult.error("no compatible files found in directory")
        else:
            if input_path.suffix.lower() not in self.converters:
                return ActionResult.error("selected file type is not supported")
            compatible_files = [input_path]

        self.workflow.context.compatible_files = compatible_files
        self.workflow.context.input_path = input_path
        self.workflow.next()
        return ActionResult.proceed()

    def _handle_format_selection(self) -> ActionResult:
        """Ask for the output format and store the choice in the context.

        Returns:
            The UI's own result when it is not a value, otherwise a proceed
            result after advancing the workflow.
        """
        result = self.ui.select_output_format()
        if result.kind != ActionKind.VALUE:
            return result

        self.workflow.context.format_choice = result.payload
        self.workflow.next()
        return ActionResult.proceed()

    def _handle_merge_mode_selection(self) -> ActionResult:
        """Ask for the merge mode and, for MERGE, the merged output filename.

        Returns:
            The UI's own result when either prompt does not return a value,
            otherwise a proceed result after advancing the workflow.
        """
        result = self.ui.select_merge_mode()
        if result.kind != ActionKind.VALUE:
            return result
        merge_mode = result.payload

        self.workflow.context.merge_mode = merge_mode
        if merge_mode == MergeMode.MERGE:
            merged_result = self.ui.prompt_merged_filename()
            if merged_result.kind != ActionKind.VALUE:
                return merged_result
            self.workflow.context.merged_filename = merged_result.payload
        else:
            # Drop a filename left over from an earlier MERGE choice; the
            # breadcrumb would otherwise still show "merged to: ...".
            self.workflow.context.merged_filename = None

        self.workflow.next()
        return ActionResult.proceed()

    def _handle_files_selection(self) -> ActionResult:
        """Determine the files to convert and store them in the context.

        For a directory input the user picks among the compatible files; for a
        file input that file is used directly.

        Returns:
            The UI's own result when the selection prompt does not return a
            value, an error result when nothing was selected, otherwise a
            proceed result after advancing the workflow.
        """
        input_path = self.workflow.context.input_path
        compatible_files = self.workflow.context.compatible_files

        if input_path.is_dir():
            file_data = [self.file_factory(path).to_dict() for path in compatible_files]
            result = self.ui.select_files(file_data)
            if result.kind != ActionKind.VALUE:
                return result
            # The payload holds indices into the list that was offered to the UI.
            files = [compatible_files[i] for i in result.payload]
        else:
            files = [input_path]

        if not files:
            # NOTE(review): `back()` moves the workflow before `run()` records
            # this state as the error origin and switches to ERROR; confirm
            # the history stays consistent when the user retries.
            self.workflow.back()
            return ActionResult.error("no files selected")

        self.workflow.context.files = files
        self.workflow.next()
        return ActionResult.proceed()

    def _handle_processing(self) -> ActionResult:
        """Convert the selected files, save the output and show the summary.

        Returns:
            A proceed result after advancing the workflow and displaying the
            conversion summary.
        """
        context = self.workflow.context
        handler = self.handlers[context.format_choice]()
        context.handler = handler

        total_input_size = sum(file.stat().st_size for file in context.files)

        start_time = time.perf_counter()
        accumulator, output_count, total_output_size = self._process_files(
            context.files, handler, context.merge_mode
        )

        merged_output_filename = None
        if context.merge_mode == MergeMode.MERGE and accumulator:
            merged_output_filename, merge_output_size = self._save_merged_output(
                context.input_path, handler, accumulator, context.format_choice, context.merged_filename
            )
            total_output_size += merge_output_size
            # The merge produces exactly one output file.
            output_count = 1

        elapsed = time.perf_counter() - start_time

        single_output_filename = None
        if context.merge_mode == MergeMode.NO_MERGE and len(context.files) == 1:
            single_output_filename = context.files[0].with_suffix(context.format_choice.extension).name

        self.workflow.next()
        self._update_breadcrumb_state()

        self.ui.show_conversion_summary(
            total_files=len(context.files),
            output_count=output_count,
            merge_mode=context.merge_mode,
            merged_filename=merged_output_filename,
            total_runtime=elapsed,
            total_input_size_formatted=File.format_file_size(total_input_size),
            total_output_size_formatted=File.format_file_size(total_output_size),
            single_output_filename=single_output_filename,
        )
        return ActionResult.proceed()

    def _handle_complete(self) -> ActionResult:
        """Ask whether to start another conversion.

        Returns:
            A proceed result after resetting the workflow when the user wants
            to go again, otherwise a terminate result after advancing the
            workflow.
        """
        if self.ui.ask_again().kind == ActionKind.PROCEED:
            self.workflow.reset()
            return ActionResult.proceed()

        self.workflow.next()
        return ActionResult.terminate()

    def _handle_error(self) -> ActionResult:
        """Handle transient errors: show message and use `ask_again()` for retry/quit.

        Returns:
            A proceed result to continue (returning to the state the error
            came from, or resetting the workflow when no origin was recorded),
            or a terminate result to stop the run loop.
        """
        self.ui.show_error(self.workflow.context.error_message)

        if self.ui.ask_again().kind != ActionKind.PROCEED:
            return ActionResult.terminate()

        # On retry, return to the originating state if available.
        origin = self.workflow.context.error_origin
        # Clear error fields
        self.workflow.context.error_message = None
        self.workflow.context.error_origin = None

        if origin is not None:
            self.workflow.state = origin
        else:
            # Fallback: reset workflow
            self.workflow.reset()
        return ActionResult.proceed()