Coverage for controller / converter_controller.py: 100.00%
220 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 00:07 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-07 00:07 +0000
1"""
2Controller for document conversion workflow.
4This controller orchestrates the conversion process, handling:
5- Batch file processing
6- File routing to appropriate converters
7- Merge functionality
8- Progress tracking coordination
9"""
10from typing import List, Optional, Dict, Type, Callable
11import time
12from controller.path_protocol import PathLike
13from view.merge_mode import MergeMode
14from view.interface import UIInterface
15from view.output_format import OutputFormat
16from domain.core.output_handler import OutputHandler
17from domain.core.base_converter import BaseConverter
18from domain.model.file import File
19from domain.adapters.file_factories import file_from_path
20from controller.workflow.state_machine import WorkflowState, ConversionWorkflow
21from view.interface import ActionResult, ActionKind
# Converter classes keyed by lower-cased file suffix; the controller looks a
# class up with ``path.suffix.lower()`` and instantiates it with the path.
ConverterMap = dict[str, type[BaseConverter]]

# Output handler classes keyed by the output format chosen by the user; the
# controller instantiates the class without arguments.
HandlerMap = dict[OutputFormat, type[OutputHandler]]

# Turns the raw path string entered by the user into a path object.
PathFactory = Callable[[str], PathLike]

# Header written in front of each source's content in merged output;
# ``{source}`` is replaced with the source file's name.
MERGE_SOURCE_DELIMITER = "\n--- start source: {source} ---\n"
30class ConverterController:
31 """Controller that orchestrates document conversion workflow."""
def __init__(
    self,
    ui: UIInterface,
    converters: ConverterMap,
    handlers: HandlerMap,
    path_factory: PathFactory,
    file_factory: Callable[[PathLike], File] = file_from_path,
):
    """Store the collaborators and start a fresh conversion workflow.

    Args:
        ui: Front end used for every prompt, progress display and message.
        converters: Converter classes keyed by lower-cased file suffix.
        handlers: Output handler classes keyed by output format.
        path_factory: Builds a path object from the string the user typed.
        file_factory: Builds the ``File`` model whose ``to_dict()`` output is
            handed to the file picker. Defaults to ``file_from_path``.
    """
    # The UI is stored before the workflow is created because the
    # state-change hook registered below writes to ``self.ui``.
    self.ui = ui
    self.path_factory = path_factory
    self.file_factory = file_factory
    self.converters, self.handlers = converters, handlers

    workflow = ConversionWorkflow(on_state_change=self._update_breadcrumb_state)
    self.workflow = workflow

    # Publish the breadcrumb for the initial state right away.
    self._update_breadcrumb_state()
def run(self, loop: bool = True):
    """Run the workflow.

    Each step calls the handler registered for the current workflow state
    and reacts to the kind of the ``ActionResult`` it returns:

    - ``BACK``: step the workflow back when ``can_go_back()`` allows it.
    - ``TERMINATE``: stop.
    - ``ERROR``: store the message and the originating state in the
      workflow context and switch to the ``ERROR`` state.
    - anything else, ``PROCEED`` included: continue. Handlers advance the
      workflow themselves before returning.

    Args:
        loop: If True, run until completion; if False, execute a single state
            step and return a boolean indicating whether the workflow should
            continue.

    Returns:
        When `loop` is False, returns True to indicate the caller may continue,
        or False to indicate the workflow should stop. When `loop` is True,
        returns None.

    Raises:
        TypeError: If the workflow is in a state that has no handler.
    """
    def run_once() -> bool:
        # Looked up on every step so handlers are resolved at call time.
        handlers: Dict[WorkflowState, Callable[[], ActionResult]] = {
            WorkflowState.SOURCE_INPUT: self._handle_source_input,
            WorkflowState.ERROR: self._handle_error,
            WorkflowState.FORMAT_SELECTION: self._handle_format_selection,
            WorkflowState.MERGE_MODE_SELECTION: self._handle_merge_mode_selection,
            WorkflowState.FILES_SELECTION: self._handle_files_selection,
            WorkflowState.PROCESSING: self._handle_processing,
            WorkflowState.COMPLETE: self._handle_complete,
        }

        current_state = self.workflow.get_state()
        handler = handlers.get(current_state)
        if handler is None:
            # Previously this surfaced as "'NoneType' object is not callable";
            # the exception type is kept, only the message is made useful.
            raise TypeError(
                f"no handler registered for workflow state {current_state!r}"
            )

        result = handler()
        kind = result.kind

        if kind == ActionKind.TERMINATE:
            return False

        if kind == ActionKind.BACK and self.workflow.can_go_back():
            self.workflow.back()
        elif kind == ActionKind.ERROR:
            # Remember where the error came from so the error handler can
            # send the user back there on retry.
            self.workflow.context.error_message = result.message
            self.workflow.context.error_origin = current_state
            self.workflow.state = WorkflowState.ERROR

        return True

    if not loop:
        return run_once()

    while run_once():
        pass
def _get_files_to_process(self, input_path: PathLike) -> List[PathLike]:
    """Resolve the user's input path to the concrete files to convert.

    A single file is returned as a one-element list. For a directory, the
    compatible files stored in the workflow context are offered to the user
    through ``ui.select_files`` and the chosen entries are returned.

    Args:
        input_path: User-provided path (file or directory).

    Returns:
        The files to process. Empty when the file picker's result is not of
        kind ``VALUE``.
    """
    if not input_path.is_dir():
        return [input_path]

    candidates = self.workflow.context.compatible_files

    # The picker works on plain dicts, not on path objects.
    rows = []
    for candidate in candidates:
        rows.append(self.file_factory(candidate).to_dict())

    picked = self.ui.select_files(rows)
    if picked.kind == ActionKind.VALUE:
        # The payload holds indices into the list that was offered.
        return [candidates[index] for index in picked.payload]
    return []
def _process_files(
    self,
    files: List[PathLike],
    handler: OutputHandler,
    merge_mode: MergeMode
) -> tuple[List[str], int, int]:
    """Convert every file in turn while driving the UI progress display.

    Args:
        files: Files to process, in processing order.
        handler: Output format handler passed on to each conversion.
        merge_mode: MergeMode enum value (NO_MERGE, MERGE, or PER_PAGE).

    Returns:
        Tuple of (accumulator, output_count, total_output_size)
        - accumulator: one entry per file that produced non-empty content
          in MERGE mode, each prefixed with the source delimiter; empty in
          every other mode
        - output_count: sum of the per-file output counts
        - total_output_size: sum of the per-file output sizes
    """
    merged_parts = []
    produced = 0
    bytes_written = 0

    with self.ui.get_progress_bar() as progress:
        # Register every file before converting the first one, so each gets
        # its own "pending" task from the start.
        task_ids = {}
        for path in files:
            task_ids[path] = progress.add_task(
                "",
                total=100,
                status="pending",
                filename=path.name,
            )

        for path in files:
            content, count, size = self._process_single_file(
                path, task_ids[path], progress, handler, merge_mode
            )
            produced += count
            bytes_written += size

            # Files without content contribute nothing to a merged document.
            if content and merge_mode == MergeMode.MERGE:
                header = MERGE_SOURCE_DELIMITER.format(source=path.name)
                merged_parts.append(header + content)

    return merged_parts, produced, bytes_written
def _process_single_file(
    self,
    file: PathLike,
    task_id: int,
    progress,
    handler: OutputHandler,
    merge_mode: MergeMode
) -> tuple[Optional[str], int, int]:
    """Convert one file and keep its progress task up to date.

    Args:
        file: File to process; its lower-cased suffix selects the converter.
        task_id: Progress bar task ID of this file.
        progress: Progress bar instance.
        handler: Output format handler.
        merge_mode: MergeMode enum value (NO_MERGE, MERGE, or PER_PAGE).

    Returns:
        Tuple of (content, output_count, output_size)
        - content: None in PER_PAGE mode, otherwise the extracted content
        - output_count: number of items in PER_PAGE mode, 1 in NO_MERGE
          mode, 0 in MERGE mode (nothing is written here in that mode)
        - output_size: value returned by the handler's save call, 0 in
          MERGE mode
    """
    started_at = time.perf_counter()

    progress.update(
        task_id,
        status="converting",
        filename=file.name,
        completed=0,
        start_time=started_at,
    )

    converter_cls = self.converters[file.suffix.lower()]
    converter = converter_cls(file)

    def report_progress(current, total):
        # Best effort on purpose: a failing progress display must not abort
        # the conversion itself.
        try:
            percent = int((current / total) * 100) if total else 100
            progress.update(
                task_id,
                completed=percent,
                status="converting",
                filename=file.name,
                start_time=started_at,
            )
        except Exception:
            pass

    content = None
    output_count = 0
    output_size = 0

    if merge_mode == MergeMode.PER_PAGE:
        # One output per extracted item (page/chapter); nothing to merge.
        items = converter.extract_content_per_item(progress_callback=report_progress)
        output_size = handler.save_multiple(items, file, file.name)
        output_count = len(items)
    else:
        content = converter.extract_content(progress_callback=report_progress)
        if merge_mode == MergeMode.NO_MERGE:
            output_size = handler.save(content, file)
            output_count = 1
        # In MERGE mode the content is only handed back; the caller writes
        # the merged file and sets the final output count.

    elapsed = time.perf_counter() - started_at
    progress.update(
        task_id,
        completed=100,
        status="done",
        filename=file.name,
        conversion_time=elapsed,
    )

    return content, output_count, output_size
def _save_merged_output(
    self,
    input_path: PathLike,
    handler: OutputHandler,
    accumulator: List[str],
    format_choice: OutputFormat,
    merged_filename: str,
) -> tuple[str, int]:
    """Join the accumulated content and write it as one merged output.

    The target sits inside ``input_path`` when that is a directory and next
    to it (via ``with_name()``) otherwise, which also works with the
    Path-like mocks used in tests.

    Args:
        input_path: Original input path (file or directory).
        handler: Output format handler that performs the save.
        accumulator: Content strings to merge; joined with a blank line.
        format_choice: OutputFormat choice; supplies the reported extension.
        merged_filename: Filename for the merged output (without extension).

    Returns:
        Tuple of (filename, output_size)
        - filename: name of the target with the format's extension applied
        - output_size: value returned by ``handler.save``
    """
    target = (
        input_path / merged_filename
        if input_path.is_dir()
        else input_path.with_name(merged_filename)
    )

    merged_text = "\n\n".join(accumulator)
    written = handler.save(merged_text, target)

    # The handler receives the extension-less target; the name reported to
    # the user is derived here from the chosen format.
    reported_name = target.with_suffix(format_choice.extension).name
    return reported_name, written
def _get_compatible_files(self, directory: PathLike) -> List[PathLike]:
    """List the entries of a directory that a registered converter handles.

    An entry qualifies when its lower-cased suffix is a key of
    ``self.converters``. Only what ``directory.iterdir()`` yields is
    considered, and the order it yields is kept.

    Args:
        directory: Directory to scan.

    Returns:
        The matching entries; empty when nothing matches.
    """
    # Test membership on the mapping itself: no throwaway key list and the
    # same check the single-file path uses.
    converters = self.converters
    return [
        entry for entry in directory.iterdir()
        if entry.suffix.lower() in converters
    ]
def _update_breadcrumb_state(self):
    """Refresh the UI breadcrumb from the workflow history and context.

    Builds one UI-friendly label per state on the path taken so far (the
    workflow history plus the current state) and assigns the list to
    ``self.ui.breadcrumb``. Labels use values from the workflow context
    when available (input path, selected format, merge mode or merged
    filename, number of files) and fall back to the state's display name.
    The files-selection step never gets a segment of its own.

    The breadcrumb is left untouched when the current state is anything
    other than source input, format selection, merge mode selection, files
    selection or processing.

    Returns:
        None. The result is published through ``self.ui.breadcrumb``.
    """
    ctx = self.workflow.context
    current = self.workflow.get_state()
    if current not in (
        WorkflowState.SOURCE_INPUT,
        WorkflowState.FORMAT_SELECTION,
        WorkflowState.MERGE_MODE_SELECTION,
        WorkflowState.FILES_SELECTION,
        WorkflowState.PROCESSING,
    ):
        return

    def label_for_state(state: WorkflowState) -> str:
        if state == WorkflowState.SOURCE_INPUT:
            return str(ctx.input_path) if ctx.input_path else state.display_name

        if state == WorkflowState.FORMAT_SELECTION:
            return ctx.format_choice.value if ctx.format_choice else state.display_name

        if state == WorkflowState.MERGE_MODE_SELECTION:
            # A merged filename is more informative than the mode's name.
            if ctx.merged_filename:
                return f"merged to: {ctx.merged_filename}"
            return ctx.merge_mode.display_name if ctx.merge_mode else state.display_name

        if state == WorkflowState.PROCESSING:
            # Tolerate an unset file list and pluralise everything but 1.
            count = len(ctx.files or ())
            plural = "" if count == 1 else "s"
            return f"({count}) file{plural}"

        return state.display_name

    segments = self.workflow.get_history() + [self.workflow.get_state()]
    self.ui.breadcrumb = [
        label_for_state(state)
        for state in segments
        if state != WorkflowState.FILES_SELECTION
    ]
def _handle_source_input(self):
    """Ask for the source path, validate it and record it in the context.

    A directory must contain at least one compatible file; a single file
    must have a suffix with a registered converter.

    Returns:
        The UI's own result when it carries no value, an error result when
        validation fails, otherwise ``ActionResult.proceed()`` after the
        context has been filled and the workflow advanced.
    """
    result = self.ui.get_path_input()
    if result.kind != ActionKind.VALUE:
        return result

    input_str = result.payload
    if not input_str:
        return ActionResult.error("please provide a source file or directory")

    input_path = self.path_factory(input_str)
    if not input_path.exists():
        return ActionResult.error("path not found")

    if not input_path.is_dir():
        if input_path.suffix.lower() not in self.converters:
            return ActionResult.error("selected file type is not supported")
        compatible_files = [input_path]
    else:
        compatible_files = self._get_compatible_files(input_path)
        if not compatible_files:
            return ActionResult.error("no compatible files found in directory")

    context = self.workflow.context
    context.compatible_files = compatible_files
    context.input_path = input_path
    self.workflow.next()
    return ActionResult.proceed()
def _handle_format_selection(self):
    """Ask for the output format and store the choice in the context.

    Returns:
        The UI's own result when it carries no value, otherwise
        ``ActionResult.proceed()`` after the workflow has advanced.
    """
    result = self.ui.select_output_format()
    if result.kind == ActionKind.VALUE:
        self.workflow.context.format_choice = result.payload
        self.workflow.next()
        return ActionResult.proceed()
    return result
def _handle_merge_mode_selection(self):
    """Ask for the merge mode and, for a full merge, the merged filename.

    The chosen mode is stored in the workflow context. For
    ``MergeMode.MERGE`` the user is also asked for the merged output's
    filename. For every other mode a filename left over from an earlier
    pass through this step is cleared, because the breadcrumb shows
    ``merged to: <name>`` whenever ``context.merged_filename`` is set.

    Returns:
        The UI's own result when either prompt carries no value, otherwise
        ``ActionResult.proceed()`` after the workflow has advanced.
    """
    result = self.ui.select_merge_mode()
    if result.kind != ActionKind.VALUE:
        return result

    context = self.workflow.context
    merge_mode = result.payload
    context.merge_mode = merge_mode

    if merge_mode == MergeMode.MERGE:
        merged_result = self.ui.prompt_merged_filename()
        if merged_result.kind != ActionKind.VALUE:
            return merged_result
        context.merged_filename = merged_result.payload
    else:
        # NOTE(review): protects against a stale name after going back and
        # picking a different mode; redundant if the workflow already
        # clears it on back() - confirm.
        context.merged_filename = None

    self.workflow.next()
    return ActionResult.proceed()
def _handle_files_selection(self):
    """Determine the files to convert and store them in the context.

    For a directory the compatible files are offered in the file picker;
    a single input file is taken as is, without prompting.

    Returns:
        The UI's own result when the picker carries no value, an error
        result when nothing was selected, otherwise
        ``ActionResult.proceed()`` after the workflow has advanced.
    """
    context = self.workflow.context
    input_path = context.input_path
    compatible_files = context.compatible_files

    if not input_path.is_dir():
        files = [input_path]
    else:
        rows = [self.file_factory(path).to_dict() for path in compatible_files]
        result = self.ui.select_files(rows)
        if result.kind != ActionKind.VALUE:
            return result
        # The payload holds indices into the offered list.
        files = [compatible_files[index] for index in result.payload]

    if not files:
        # NOTE(review): the run loop records this state as the error origin
        # and the error handler restores it by assigning workflow.state
        # directly; confirm the history stays consistent after this back().
        self.workflow.back()
        return ActionResult.error("no files selected")

    context.files = files
    self.workflow.next()
    return ActionResult.proceed()
def _handle_processing(self):
    """Convert the selected files, write merged output and show a summary.

    Instantiates the handler for the chosen format (also stored on the
    context), processes every file, saves the merged document in MERGE
    mode when there is content, advances the workflow and reports totals
    through ``ui.show_conversion_summary``.

    Returns:
        ``ActionResult.proceed()``.
    """
    context = self.workflow.context
    handler = self.handlers[context.format_choice]()
    context.handler = handler

    input_bytes = sum(path.stat().st_size for path in context.files)

    # The measured runtime covers conversion and the merged save.
    started_at = time.perf_counter()
    accumulator, output_count, output_bytes = self._process_files(
        context.files, handler, context.merge_mode
    )

    merged_name = None
    if context.merge_mode == MergeMode.MERGE and accumulator:
        merged_name, merged_bytes = self._save_merged_output(
            context.input_path,
            handler,
            accumulator,
            context.format_choice,
            context.merged_filename,
        )
        output_bytes += merged_bytes
        # A merge produces exactly one output file.
        output_count = 1

    elapsed = time.perf_counter() - started_at

    # With a single unmerged input the summary can name the output file.
    single_name = None
    if context.merge_mode == MergeMode.NO_MERGE and len(context.files) == 1:
        only_file = context.files[0]
        single_name = only_file.with_suffix(context.format_choice.extension).name

    self.workflow.next()
    self._update_breadcrumb_state()

    self.ui.show_conversion_summary(
        total_files=len(context.files),
        output_count=output_count,
        merge_mode=context.merge_mode,
        merged_filename=merged_name,
        total_runtime=elapsed,
        total_input_size_formatted=File.format_file_size(input_bytes),
        total_output_size_formatted=File.format_file_size(output_bytes),
        single_output_filename=single_name,
    )
    return ActionResult.proceed()
def _handle_complete(self) -> ActionResult:
    """Ask whether to run another conversion after one has finished.

    Returns:
        ``ActionResult.proceed()`` after resetting the workflow when the
        user wants to go again; otherwise ``ActionResult.terminate()``
        after advancing the workflow one more step.
    """
    if self.ui.ask_again().kind == ActionKind.PROCEED:
        self.workflow.reset()
        return ActionResult.proceed()

    self.workflow.next()
    return ActionResult.terminate()
def _handle_error(self) -> ActionResult:
    """Show the pending error and let the user retry or quit.

    The message comes from the workflow context and is displayed through
    ``ui.show_error``; ``ui.ask_again()`` then decides what happens next.

    Returns:
        ``ActionResult.terminate()`` when the user declines to retry.
        Otherwise ``ActionResult.proceed()`` after clearing the error
        fields and either restoring the state the error came from or, when
        no origin was recorded, resetting the workflow.
    """
    context = self.workflow.context
    self.ui.show_error(context.error_message)

    if self.ui.ask_again().kind != ActionKind.PROCEED:
        return ActionResult.terminate()

    # Read the origin before wiping the error fields.
    origin = context.error_origin
    context.error_message = None
    context.error_origin = None

    if origin is not None:
        # Retry from the step that reported the error.
        self.workflow.state = origin
    else:
        # Nothing to return to: start over.
        self.workflow.reset()
    return ActionResult.proceed()