diff --git a/backend/workflow_manager/endpoint_v2/constants.py b/backend/workflow_manager/endpoint_v2/constants.py
index d9553245d..6a23900a0 100644
--- a/backend/workflow_manager/endpoint_v2/constants.py
+++ b/backend/workflow_manager/endpoint_v2/constants.py
@@ -44,7 +44,7 @@ class SourceKey:
     FILE_EXTENSIONS = "fileExtensions"
     PROCESS_SUB_DIRECTORIES = "processSubDirectories"
     MAX_FILES = "maxFiles"
-    ROOT_FOLDER = "rootFolder"
+    FOLDERS = "folders"


 class DestinationKey:
@@ -57,6 +57,8 @@ class DestinationKey:
     PATH = "path"
     OUTPUT_FOLDER = "outputFolder"
     OVERWRITE_OUTPUT_DOCUMENT = "overwriteOutput"
+    FILE_PATH = "filePath"
+    EXECUTION_ID = "executionId"


 class OutputJsonKey:
@@ -71,8 +73,16 @@ class FileType:

 class FilePattern:
     PDF_DOCUMENTS = ["*.pdf"]
-    TEXT_DOCUMENTS = ["*.txt"]
-    IMAGES = ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.bmp"]
+    TEXT_DOCUMENTS = ["*.txt", "*.doc", "*.docx"]
+    IMAGES = [
+        "*.jpg",
+        "*.jpeg",
+        "*.png",
+        "*.gif",
+        "*.bmp",
+        "*.tif",
+        "*.tiff",
+    ]


 class SourceConstant:
diff --git a/backend/workflow_manager/endpoint_v2/database_utils.py b/backend/workflow_manager/endpoint_v2/database_utils.py
index 00b9fbf73..69f7e60c8 100644
--- a/backend/workflow_manager/endpoint_v2/database_utils.py
+++ b/backend/workflow_manager/endpoint_v2/database_utils.py
@@ -59,7 +59,7 @@ def get_sql_values_for_query(
         for column in values:
             if cls_name == DBConnectionClass.SNOWFLAKE:
                 col = column.lower()
-                type_x = column_types[col]
+                type_x = column_types.get(col, "")
                 if type_x == "VARIANT":
                     values[column] = values[column].replace("'", "\\'")
                     sql_values[column] = f"parse_json($${values[column]}$$)"
@@ -162,6 +162,10 @@ def get_column_types(
     def get_columns_and_values(
         column_mode_str: str,
         data: Any,
+        file_path: str,
+        execution_id: str,
+        file_path_name: str = "file_path",
+        execution_id_name: str = "execution_id",
         include_timestamp: bool = False,
         include_agent: bool = False,
         agent_name: Optional[str] = AgentName.UNSTRACT_DBWRITER.value,
@@ -214,7 +218,8 @@ def get_columns_and_values(
             values[single_column_name] = data
         else:
             values[single_column_name] = json.dumps(data)
-
+        values[file_path_name] = file_path
+        values[execution_id_name] = execution_id
         return values

     @staticmethod
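Reviewer note: with the widened `get_columns_and_values` signature above, every database write now carries the source file path and execution ID alongside the extracted data. A minimal sketch of the expected call shape, assuming a single-column JSON write; the mode string, sample data, and resulting column names are illustrative, and only `file_path`/`execution_id` plus their `*_name` defaults come from this diff:

```python
from workflow_manager.endpoint_v2.database_utils import DatabaseUtils

# Hypothetical call against the new signature (mode string and data are
# made up for illustration).
values = DatabaseUtils.get_columns_and_values(
    column_mode_str="WRITE_JSON_TO_A_SINGLE_COLUMN",  # assumed mode value
    data={"invoice_no": "INV-42", "total": 118.0},
    file_path="input/invoice_42.pdf",
    execution_id="a1b2c3d4",
)
# The returned mapping now ends with the two audit columns, e.g.:
# {..., "file_path": "input/invoice_42.pdf", "execution_id": "a1b2c3d4"}
```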
diff --git a/backend/workflow_manager/endpoint_v2/destination.py b/backend/workflow_manager/endpoint_v2/destination.py
index 39a989d70..88f45b312 100644
--- a/backend/workflow_manager/endpoint_v2/destination.py
+++ b/backend/workflow_manager/endpoint_v2/destination.py
@@ -3,7 +3,7 @@
 import json
 import logging
 import os
-from typing import Any, Optional
+from typing import Any, Optional, Union

 import fsspec
 import magic
@@ -36,6 +36,9 @@ from workflow_manager.workflow_v2.models.file_history import FileHistory
 from workflow_manager.workflow_v2.models.workflow import Workflow

+from backend.exceptions import UnstractFSException
+from unstract.connectors.exceptions import ConnectorError
+
 logger = logging.getLogger(__name__)
@@ -182,9 +185,9 @@ def handle_output(
             self.insert_into_db(input_file_path=input_file_path)
         elif connection_type == WorkflowEndpoint.ConnectionType.API:
             result = self.get_result(file_history)
-            metadata = self.get_metadata(file_history)
+            exec_metadata = self.get_metadata(file_history)
             self._handle_api_result(
-                file_name=file_name, error=error, result=result, metadata=metadata
+                file_name=file_name, error=error, result=result, metadata=exec_metadata
             )
         elif connection_type == WorkflowEndpoint.ConnectionType.MANUALREVIEW:
             self._push_data_to_queue(file_name, workflow, input_file_path)
@@ -192,6 +195,7 @@ def handle_output(
         self.execution_service.publish_log(
             message=f"File '{file_name}' processed successfully"
         )
+
         if use_file_history and not file_history:
             FileHistoryHelper.create_file_history(
                 cache_key=file_hash.file_hash,
@@ -222,31 +226,33 @@ def copy_output_to_output_directory(self) -> None:
         destination_volume_path = os.path.join(
             self.execution_dir, ToolExecKey.OUTPUT_DIR
         )
-        destination_fs.create_dir_if_not_exists(input_dir=output_directory)
-        destination_fsspec = destination_fs.get_fsspec_fs()
-
-        # Traverse local directory and create the same structure in the
-        # output_directory
-        for root, dirs, files in os.walk(destination_volume_path):
-            for dir_name in dirs:
-                destination_fsspec.mkdir(
-                    os.path.join(
+
+        try:
+            destination_fs.create_dir_if_not_exists(input_dir=output_directory)
+
+            # Traverse local directory and create the same structure in the
+            # output_directory
+            for root, dirs, files in os.walk(destination_volume_path):
+                for dir_name in dirs:
+                    current_dir = os.path.join(
                         output_directory,
                         os.path.relpath(root, destination_volume_path),
                         dir_name,
                     )
-                )
+                    destination_fs.create_dir_if_not_exists(input_dir=current_dir)

-            for file_name in files:
-                source_path = os.path.join(root, file_name)
-                destination_path = os.path.join(
-                    output_directory,
-                    os.path.relpath(root, destination_volume_path),
-                    file_name,
-                )
-                normalized_path = os.path.normpath(destination_path)
-                with open(source_path, "rb") as source_file:
-                    destination_fsspec.write_bytes(normalized_path, source_file.read())
+                for file_name in files:
+                    source_path = os.path.join(root, file_name)
+                    destination_path = os.path.join(
+                        output_directory,
+                        os.path.relpath(root, destination_volume_path),
+                        file_name,
+                    )
+                    destination_fs.upload_file_to_storage(
+                        source_path=source_path, destination_path=destination_path
+                    )
+        except ConnectorError as e:
+            raise UnstractFSException(core_err=e) from e

     def insert_into_db(self, input_file_path: str) -> None:
         """Insert data into the database."""
@@ -276,7 +282,10 @@ def insert_into_db(self, input_file_path: str) -> None:
         if not data:
             return
         # Remove metadata from result
-        data.pop("metadata", None)
+        # The text-extractor tool returns data as a string;
+        # don't pop out metadata in that case.
+        if isinstance(data, dict):
+            data.pop("metadata", None)
         values = DatabaseUtils.get_columns_and_values(
             column_mode_str=column_mode,
             data=data,
@@ -401,7 +410,7 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any
         output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
         metadata: dict[str, Any] = self.get_workflow_metadata()
         output_type = self.get_output_type(metadata)
-        result: Optional[Any] = None
+        result: Union[dict[str, Any], str] = ""
         try:
             # TODO: SDK handles validation; consider removing here.
             mime = magic.Magic()
@@ -431,7 +440,7 @@ def get_metadata(
         """Get metadata from the output file.

         Returns:
-            Union[dict[str, Any], str]: Meta data.
+            Union[dict[str, Any], str]: Metadata.
         """
         if file_history and file_history.metadata:
             return self.parse_string(file_history.metadata)
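Reviewer note: `copy_output_to_output_directory` now goes through the connector's own helpers instead of a raw fsspec handle, and any `ConnectorError` raised during the traversal surfaces as `UnstractFSException`. The underlying mirroring pattern, as a standalone sketch; `fs` is a stand-in for the destination connector, and only `create_dir_if_not_exists` and `upload_file_to_storage` are helper names taken from this diff:

```python
import os


def mirror_tree(fs, local_root: str, remote_root: str) -> None:
    """Recreate local_root's directory layout under remote_root and upload
    every file. `fs` is assumed to expose the two helpers used above."""
    fs.create_dir_if_not_exists(input_dir=remote_root)
    for root, dirs, files in os.walk(local_root):
        rel = os.path.relpath(root, local_root)
        for dir_name in dirs:
            # Recreate each subdirectory under the remote root.
            fs.create_dir_if_not_exists(
                input_dir=os.path.join(remote_root, rel, dir_name)
            )
        for file_name in files:
            # Upload each file to the mirrored remote path.
            fs.upload_file_to_storage(
                source_path=os.path.join(root, file_name),
                destination_path=os.path.join(remote_root, rel, file_name),
            )

# Example call (paths illustrative):
# mirror_tree(destination_fs, "/tmp/execution/COPY_TO_FOLDER", "bucket/workflow-out")
```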
""" if file_history and file_history.metadata: return self.parse_string(file_history.metadata) diff --git a/backend/workflow_manager/endpoint_v2/queue_utils.py b/backend/workflow_manager/endpoint_v2/queue_utils.py index fde790e42..2d26a3ad5 100644 --- a/backend/workflow_manager/endpoint_v2/queue_utils.py +++ b/backend/workflow_manager/endpoint_v2/queue_utils.py @@ -1,7 +1,7 @@ import logging from dataclasses import dataclass from enum import Enum -from typing import Any +from typing import Any, Optional from utils.constants import Common from workflow_manager.endpoint_v2.exceptions import UnstractQueueException @@ -34,8 +34,18 @@ def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue: @dataclass class QueueResult: file: str - whisper_hash: str status: QueueResultStatus result: Any workflow_id: str file_content: str + whisper_hash: Optional[str] = None + + def to_dict(self) -> Any: + return { + "file": self.file, + "whisper_hash": self.whisper_hash, + "status": self.status, + "result": self.result, + "workflow_id": self.workflow_id, + "file_content": self.file_content, + } diff --git a/backend/workflow_manager/workflow_v2/constants.py b/backend/workflow_manager/workflow_v2/constants.py index 95aab13e3..2b9d3cf8e 100644 --- a/backend/workflow_manager/workflow_v2/constants.py +++ b/backend/workflow_manager/workflow_v2/constants.py @@ -58,3 +58,4 @@ class WorkflowMessages: ) FILE_MARKER_CLEAR_SUCCESS = "File marker cleared successfully." FILE_MARKER_CLEAR_FAILED = "Failed to clear file marker." + WORKFLOW_EXECUTION_NOT_FOUND = "Workflow execution not found." diff --git a/backend/workflow_manager/workflow_v2/dto.py b/backend/workflow_manager/workflow_v2/dto.py index 5f2e0db43..b1d8fab12 100644 --- a/backend/workflow_manager/workflow_v2/dto.py +++ b/backend/workflow_manager/workflow_v2/dto.py @@ -41,6 +41,58 @@ def __post_init__(self) -> None: self.message = self.message or None self.status_api = self.status_api or None + def remove_result_metadata_keys(self, keys_to_remove: list[str] = []) -> None: + """Removes specified keys from the 'metadata' dictionary within each + 'result' dictionary in the 'result' list attribute of the instance. If + 'keys_to_remove' is empty, the 'metadata' key itself is removed. + + Args: + keys_to_remove (List[str]): List of keys to be removed from 'metadata'. + """ + if not isinstance(self.result, list): + return + + for item in self.result: + if not isinstance(item, dict): + break + + result = item.get("result") + if not isinstance(result, dict): + break + + self._remove_specific_keys(result=result, keys_to_remove=keys_to_remove) + + def _remove_specific_keys(self, result: dict, keys_to_remove: list[str]) -> None: + """Removes specified keys from the 'metadata' dictionary within the + provided 'result' dictionary. If 'keys_to_remove' is empty, the + 'metadata' dictionary is cleared. + + Args: + result (dict): The dictionary containing the 'metadata' key. + keys_to_remove (List[str]): List of keys to be removed from 'metadata'. + """ + metadata = result.get("metadata", {}) + if keys_to_remove: + for key in keys_to_remove: + metadata.pop(key, None) + else: + metadata = {} + self._update_metadata(result=result, metadata=metadata) + + def _update_metadata(self, result: dict, metadata: dict) -> None: + """Updates the 'metadata' key in the provided 'result' dictionary. If + 'metadata' is empty, removes the 'metadata' key from 'result'. + + Args: + result (dict): The dictionary to be updated. + metadata (dict): The new metadata dictionary to be set. 
diff --git a/backend/workflow_manager/workflow_v2/execution.py b/backend/workflow_manager/workflow_v2/execution.py
index e71b198f4..7d302f03a 100644
--- a/backend/workflow_manager/workflow_v2/execution.py
+++ b/backend/workflow_manager/workflow_v2/execution.py
@@ -242,14 +242,15 @@ def execute(self, run_id: str, file_name: str, single_step: bool = False) -> Non
             execution_time = end_time - start_time
             logger.info(f"Execution {self.execution_id} stopped")
             raise exception
-        except Exception as exc:
+        except Exception as exception:
             end_time = time.time()
             execution_time = end_time - start_time
-            message = str(exc)[:EXECUTION_ERROR_LENGTH]
+            message = str(exception)[:EXECUTION_ERROR_LENGTH]
             logger.error(
-                f"Execution {self.execution_id} ran for {execution_time:.4f}s, {exc}"
+                f"Execution {self.execution_id} ran for {execution_time:.4f}s, "
+                f"error: {exception}"
             )
-            raise WorkflowExecutionError(message) from exc
+            raise WorkflowExecutionError(message) from exception

     def publish_initial_workflow_logs(self, total_files: int) -> None:
         """Publishes the initial logs for the workflow.
@@ -320,26 +321,6 @@ def execute_input_file(
         execution_type = ExecutionType.COMPLETE
         if single_step:
             execution_type = ExecutionType.STEP
-        self.execute_uncached_input(
-            run_id=run_id, file_name=file_name, single_step=single_step
-        )
-        self.publish_log(f"Tool executed successfully for '{file_name}'")
-        self._handle_execution_type(execution_type)
-
-    def execute_uncached_input(
-        self, run_id: str, file_name: str, single_step: bool
-    ) -> None:
-        """Executes the uncached input file.
-
-        Args:
-            run_id (str): UUID for a single run of a file
-            file_name (str): The name of the file to be executed.
-            single_step (bool): Flag indicating whether to execute in
-                single step mode.
-
-        Returns:
-            None
-        """
         self.publish_log(
             "No entries found in cache, executing the tools"
             f"running the tool(s) for {file_name}"
         )
@@ -350,6 +331,8 @@
             component=LogComponent.SOURCE,
         )
         self.execute(run_id, file_name, single_step)
+        self.publish_log(f"Tool executed successfully for '{file_name}'")
+        self._handle_execution_type(execution_type)

     def initiate_tool_execution(
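Reviewer note: the error path in `execute` keeps the persisted message and the log line consistent: the message attached to `WorkflowExecutionError` is truncated, while the log carries the full exception. A condensed sketch of the pattern; the constant's value and the `run_guarded` wrapper are assumptions for illustration:

```python
import logging
import time

EXECUTION_ERROR_LENGTH = 256  # assumed value; the real constant lives in the backend

logger = logging.getLogger(__name__)


class WorkflowExecutionError(Exception):
    pass


def run_guarded(execution_id: str, fn) -> None:
    start_time = time.time()
    try:
        fn()
    except Exception as exception:
        execution_time = time.time() - start_time
        # Persisted message is capped; the log line keeps the full error.
        message = str(exception)[:EXECUTION_ERROR_LENGTH]
        logger.error(
            f"Execution {execution_id} ran for {execution_time:.4f}s, "
            f"error: {exception}"
        )
        raise WorkflowExecutionError(message) from exception
```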
diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py
index ca41c9f95..9b2c432de 100644
--- a/backend/workflow_manager/workflow_v2/workflow_helper.py
+++ b/backend/workflow_manager/workflow_v2/workflow_helper.py
@@ -542,6 +542,7 @@ def execute_workflow(
         scheduled: bool = False,
         execution_mode: Optional[tuple[str, str]] = None,
         pipeline_id: Optional[str] = None,
+        use_file_history: bool = True,
         **kwargs: dict[str, Any],
     ) -> Optional[list[Any]]:
         """Asynchronous Execution By celery.
@@ -557,7 +558,8 @@
                 WorkflowExecution Mode. Defaults to None.
             pipeline_id (Optional[str], optional): Id of pipeline.
                 Defaults to None.
-            include_metadata (bool): Whether to include metadata in the prompt output
+            use_file_history (bool): Use FileHistory table to return results on already
+                processed files. Defaults to True.

         Kwargs:
             log_events_id (str): Session ID of the user, helps establish
@@ -598,6 +600,7 @@
             workflow_execution=workflow_execution,
             execution_mode=execution_mode,
             hash_values_of_files=hash_values,
+            use_file_history=use_file_history,
         )
     except Exception as error:
         error_message = traceback.format_exc()
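Reviewer note: `use_file_history` threads from `execute_workflow` down into the execution path so callers can bypass the FileHistory cache per run. A hedged sketch of the consumer-side gating; `FileHistoryHelper.get_file_history` and `run_tools_and_collect_result` are assumed names for illustration (only `create_file_history` appears in this diff):

```python
# Hypothetical gating inside the per-file execution path.
file_history = None
if use_file_history:
    # Assumed lookup helper; serves previously computed results.
    file_history = FileHistoryHelper.get_file_history(cache_key=file_hash.file_hash)

if file_history:
    result = destination.get_result(file_history)  # served from history
else:
    result = run_tools_and_collect_result()  # hypothetical fresh run
    # New results are then recorded via FileHistoryHelper.create_file_history(...)
```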