added missed delta changes (#760)
muhammad-ali-e authored Sep 30, 2024
1 parent e4a051d commit 05ab71c
Showing 8 changed files with 131 additions and 58 deletions.
16 changes: 13 additions & 3 deletions backend/workflow_manager/endpoint_v2/constants.py
@@ -44,7 +44,7 @@ class SourceKey:
FILE_EXTENSIONS = "fileExtensions"
PROCESS_SUB_DIRECTORIES = "processSubDirectories"
MAX_FILES = "maxFiles"
ROOT_FOLDER = "rootFolder"
FOLDERS = "folders"


class DestinationKey:
@@ -57,6 +57,8 @@ class DestinationKey:
PATH = "path"
OUTPUT_FOLDER = "outputFolder"
OVERWRITE_OUTPUT_DOCUMENT = "overwriteOutput"
FILE_PATH = "filePath"
EXECUTION_ID = "executionId"


class OutputJsonKey:
@@ -71,8 +73,16 @@ class FileType:

class FilePattern:
PDF_DOCUMENTS = ["*.pdf"]
TEXT_DOCUMENTS = ["*.txt"]
IMAGES = ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.bmp"]
TEXT_DOCUMENTS = ["*.txt", "*.doc", "*.docx"]
IMAGES = [
"*.jpg",
"*.jpeg",
"*.png",
"*.gif",
"*.bmp",
"*.tif",
"*.tiff",
]


class SourceConstant:
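
Note: the widened FilePattern lists now cover Word documents and TIFF images. Below is a minimal sketch of how such glob lists could be matched against incoming file names; it uses the standard fnmatch module, and the project's actual matching helper is not part of this diff.

    from fnmatch import fnmatch

    # Mirrors the updated FilePattern constants above.
    TEXT_DOCUMENTS = ["*.txt", "*.doc", "*.docx"]
    IMAGES = ["*.jpg", "*.jpeg", "*.png", "*.gif", "*.bmp", "*.tif", "*.tiff"]

    def matches_any(file_name: str, patterns: list[str]) -> bool:
        """Return True if the file name matches at least one glob pattern."""
        return any(fnmatch(file_name.lower(), pattern) for pattern in patterns)

    print(matches_any("report.docx", TEXT_DOCUMENTS))  # True
    print(matches_any("scan.tiff", IMAGES))            # True
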
9 changes: 7 additions & 2 deletions backend/workflow_manager/endpoint_v2/database_utils.py
@@ -59,7 +59,7 @@ def get_sql_values_for_query(
for column in values:
if cls_name == DBConnectionClass.SNOWFLAKE:
col = column.lower()
type_x = column_types[col]
type_x = column_types.get(col, "")
if type_x == "VARIANT":
values[column] = values[column].replace("'", "\\'")
sql_values[column] = f"parse_json($${values[column]}$$)"
@@ -162,6 +162,10 @@ def get_column_types(
def get_columns_and_values(
column_mode_str: str,
data: Any,
file_path: str,
execution_id: str,
file_path_name: str = "file_path",
execution_id_name: str = "execution_id",
include_timestamp: bool = False,
include_agent: bool = False,
agent_name: Optional[str] = AgentName.UNSTRACT_DBWRITER.value,
@@ -214,7 +218,8 @@ def get_columns_and_values(
values[single_column_name] = data
else:
values[single_column_name] = json.dumps(data)

values[file_path_name] = file_path
values[execution_id_name] = execution_id
return values

@staticmethod
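
Note: get_columns_and_values now also receives the source file path and execution id and records them in the row values under configurable column names. A simplified, standalone sketch of that behaviour follows (the single/multi column handling is abbreviated and the names are illustrative):

    import json
    from typing import Any

    def build_row_values(
        data: Any,
        file_path: str,
        execution_id: str,
        file_path_name: str = "file_path",
        execution_id_name: str = "execution_id",
    ) -> dict[str, Any]:
        # Serialized tool output goes into a data column; the new columns
        # record which input file and execution produced the row.
        values: dict[str, Any] = {
            "data": data if isinstance(data, str) else json.dumps(data)
        }
        values[file_path_name] = file_path
        values[execution_id_name] = execution_id
        return values

    row = build_row_values({"total": 42}, "input/invoice.pdf", "exec-001")
    # {'data': '{"total": 42}', 'file_path': 'input/invoice.pdf', 'execution_id': 'exec-001'}
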
61 changes: 35 additions & 26 deletions backend/workflow_manager/endpoint_v2/destination.py
@@ -3,7 +3,7 @@
import json
import logging
import os
from typing import Any, Optional
from typing import Any, Optional, Union

import fsspec
import magic
@@ -36,6 +36,9 @@
from workflow_manager.workflow_v2.models.file_history import FileHistory
from workflow_manager.workflow_v2.models.workflow import Workflow

from backend.exceptions import UnstractFSException
from unstract.connectors.exceptions import ConnectorError

logger = logging.getLogger(__name__)


@@ -182,16 +185,17 @@ def handle_output(
self.insert_into_db(input_file_path=input_file_path)
elif connection_type == WorkflowEndpoint.ConnectionType.API:
result = self.get_result(file_history)
metadata = self.get_metadata(file_history)
exec_metadata = self.get_metadata(file_history)
self._handle_api_result(
file_name=file_name, error=error, result=result, metadata=metadata
file_name=file_name, error=error, result=result, metadata=exec_metadata
)
elif connection_type == WorkflowEndpoint.ConnectionType.MANUALREVIEW:
self._push_data_to_queue(file_name, workflow, input_file_path)
if self.execution_service:
self.execution_service.publish_log(
message=f"File '{file_name}' processed successfully"
)

if use_file_history and not file_history:
FileHistoryHelper.create_file_history(
cache_key=file_hash.file_hash,
@@ -222,31 +226,33 @@ def copy_output_to_output_directory(self) -> None:
destination_volume_path = os.path.join(
self.execution_dir, ToolExecKey.OUTPUT_DIR
)
destination_fs.create_dir_if_not_exists(input_dir=output_directory)
destination_fsspec = destination_fs.get_fsspec_fs()

# Traverse local directory and create the same structure in the
# output_directory
for root, dirs, files in os.walk(destination_volume_path):
for dir_name in dirs:
destination_fsspec.mkdir(
os.path.join(

try:
destination_fs.create_dir_if_not_exists(input_dir=output_directory)

# Traverse local directory and create the same structure in the
# output_directory
for root, dirs, files in os.walk(destination_volume_path):
for dir_name in dirs:
current_dir = os.path.join(
output_directory,
os.path.relpath(root, destination_volume_path),
dir_name,
)
)
destination_fs.create_dir_if_not_exists(input_dir=current_dir)

for file_name in files:
source_path = os.path.join(root, file_name)
destination_path = os.path.join(
output_directory,
os.path.relpath(root, destination_volume_path),
file_name,
)
normalized_path = os.path.normpath(destination_path)
with open(source_path, "rb") as source_file:
destination_fsspec.write_bytes(normalized_path, source_file.read())
for file_name in files:
source_path = os.path.join(root, file_name)
destination_path = os.path.join(
output_directory,
os.path.relpath(root, destination_volume_path),
file_name,
)
destination_fs.upload_file_to_storage(
source_path=source_path, destination_path=destination_path
)
except ConnectorError as e:
raise UnstractFSException(core_err=e) from e

def insert_into_db(self, input_file_path: str) -> None:
"""Insert data into the database."""
@@ -276,7 +282,10 @@ def insert_into_db(self, input_file_path: str) -> None:
if not data:
return
# Remove metadata from result
data.pop("metadata", None)
# Tool text-extractor returns data in the form of string.
# Don't pop out metadata in this case.
if isinstance(data, dict):
data.pop("metadata", None)
values = DatabaseUtils.get_columns_and_values(
column_mode_str=column_mode,
data=data,
@@ -401,7 +410,7 @@ def get_result(self, file_history: Optional[FileHistory] = None) -> Optional[Any]:
output_file = os.path.join(self.execution_dir, WorkflowFileType.INFILE)
metadata: dict[str, Any] = self.get_workflow_metadata()
output_type = self.get_output_type(metadata)
result: Optional[Any] = None
result: Union[dict[str, Any], str] = ""
try:
# TODO: SDK handles validation; consider removing here.
mime = magic.Magic()
@@ -431,7 +440,7 @@ def get_metadata(
"""Get metadata from the output file.
Returns:
Union[dict[str, Any], str]: Meta data.
Union[dict[str, Any], str]: Metadata.
"""
if file_history and file_history.metadata:
return self.parse_string(file_history.metadata)
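
Note: copy_output_to_output_directory now creates remote directories and uploads files through the connector's filesystem helpers, and the whole traversal is wrapped so a ConnectorError surfaces as a single UnstractFSException. A rough sketch of that traversal pattern, with generic callables standing in for create_dir_if_not_exists and upload_file_to_storage:

    import os
    from typing import Callable

    def mirror_directory(
        local_root: str,
        remote_root: str,
        make_dir: Callable[[str], None],
        upload: Callable[[str, str], None],
    ) -> None:
        """Recreate the local directory tree under remote_root, uploading files.

        The caller is expected to wrap this in try/except and convert any
        connector error into its own filesystem exception.
        """
        make_dir(remote_root)
        for root, dirs, files in os.walk(local_root):
            rel_path = os.path.relpath(root, local_root)
            for dir_name in dirs:
                make_dir(os.path.join(remote_root, rel_path, dir_name))
            for file_name in files:
                upload(
                    os.path.join(root, file_name),
                    os.path.join(remote_root, rel_path, file_name),
                )
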
14 changes: 12 additions & 2 deletions backend/workflow_manager/endpoint_v2/queue_utils.py
@@ -1,7 +1,7 @@
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any
from typing import Any, Optional

from utils.constants import Common
from workflow_manager.endpoint_v2.exceptions import UnstractQueueException
@@ -34,8 +34,18 @@ def get_queue_inst(connector_settings: dict[str, Any] = {}) -> UnstractQueue:
@dataclass
class QueueResult:
file: str
whisper_hash: str
status: QueueResultStatus
result: Any
workflow_id: str
file_content: str
whisper_hash: Optional[str] = None

def to_dict(self) -> Any:
return {
"file": self.file,
"whisper_hash": self.whisper_hash,
"status": self.status,
"result": self.result,
"workflow_id": self.workflow_id,
"file_content": self.file_content,
}
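
Note: with workflow_id and file_content added and whisper_hash now optional, a queue payload could be built and serialized roughly as below. The field values and the SUCCESS member are illustrative assumptions, not taken from this diff.

    queued_item = QueueResult(
        file="invoice_1.pdf",
        status=QueueResultStatus.SUCCESS,  # assumed enum member
        result={"total": 42},
        workflow_id="wf-1234",
        file_content="<base64-encoded file bytes>",
    )
    print(queued_item.to_dict())  # whisper_hash stays None unless supplied
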
1 change: 1 addition & 0 deletions backend/workflow_manager/workflow_v2/constants.py
@@ -58,3 +58,4 @@ class WorkflowMessages:
)
FILE_MARKER_CLEAR_SUCCESS = "File marker cleared successfully."
FILE_MARKER_CLEAR_FAILED = "Failed to clear file marker."
WORKFLOW_EXECUTION_NOT_FOUND = "Workflow execution not found."
52 changes: 52 additions & 0 deletions backend/workflow_manager/workflow_v2/dto.py
@@ -41,6 +41,58 @@ def __post_init__(self) -> None:
self.message = self.message or None
self.status_api = self.status_api or None

def remove_result_metadata_keys(self, keys_to_remove: list[str] = []) -> None:
"""Removes specified keys from the 'metadata' dictionary within each
'result' dictionary in the 'result' list attribute of the instance. If
'keys_to_remove' is empty, the 'metadata' key itself is removed.
Args:
keys_to_remove (List[str]): List of keys to be removed from 'metadata'.
"""
if not isinstance(self.result, list):
return

for item in self.result:
if not isinstance(item, dict):
break

result = item.get("result")
if not isinstance(result, dict):
break

self._remove_specific_keys(result=result, keys_to_remove=keys_to_remove)

def _remove_specific_keys(self, result: dict, keys_to_remove: list[str]) -> None:
"""Removes specified keys from the 'metadata' dictionary within the
provided 'result' dictionary. If 'keys_to_remove' is empty, the
'metadata' dictionary is cleared.
Args:
result (dict): The dictionary containing the 'metadata' key.
keys_to_remove (List[str]): List of keys to be removed from 'metadata'.
"""
metadata = result.get("metadata", {})
if keys_to_remove:
for key in keys_to_remove:
metadata.pop(key, None)
else:
metadata = {}
self._update_metadata(result=result, metadata=metadata)

def _update_metadata(self, result: dict, metadata: dict) -> None:
"""Updates the 'metadata' key in the provided 'result' dictionary. If
'metadata' is empty, removes the 'metadata' key from 'result'.
Args:
result (dict): The dictionary to be updated.
metadata (dict): The new metadata dictionary to be set. If empty, 'metadata'
is removed.
"""
if metadata:
result["metadata"] = metadata
else:
result.pop("metadata", None)


@dataclass
class AsyncResultData:
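
Note: a standalone rendition of the new metadata-stripping logic, applied to a sample API result payload (the surrounding dataclass is omitted and the payload is made up for illustration):

    def strip_result_metadata(results: list, keys_to_remove: list[str]) -> None:
        """Remove selected keys from each result's metadata, or drop it entirely."""
        for item in results:
            if not isinstance(item, dict):
                break
            result = item.get("result")
            if not isinstance(result, dict):
                break
            metadata = result.get("metadata", {})
            if keys_to_remove:
                for key in keys_to_remove:
                    metadata.pop(key, None)
            else:
                metadata = {}
            if metadata:
                result["metadata"] = metadata
            else:
                result.pop("metadata", None)

    payload = [{"result": {"output": "ok", "metadata": {"run_id": "r-1", "cost": 3}}}]
    strip_result_metadata(payload, ["cost"])  # keeps run_id, drops cost
    strip_result_metadata(payload, [])        # removes the metadata block entirely
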
31 changes: 7 additions & 24 deletions backend/workflow_manager/workflow_v2/execution.py
@@ -242,14 +242,15 @@ def execute(self, run_id: str, file_name: str, single_step: bool = False) -> None:
execution_time = end_time - start_time
logger.info(f"Execution {self.execution_id} stopped")
raise exception
except Exception as exc:
except Exception as exception:
end_time = time.time()
execution_time = end_time - start_time
message = str(exc)[:EXECUTION_ERROR_LENGTH]
message = str(exception)[:EXECUTION_ERROR_LENGTH]
logger.error(
f"Execution {self.execution_id} ran for {execution_time:.4f}s, {exc}"
f"Execution {self.execution_id} ran for {execution_time:.4f}s, "
f" Error {exception}"
)
raise WorkflowExecutionError(message) from exc
raise WorkflowExecutionError(message) from exception

def publish_initial_workflow_logs(self, total_files: int) -> None:
"""Publishes the initial logs for the workflow.
@@ -320,26 +321,6 @@ def execute_input_file(
execution_type = ExecutionType.COMPLETE
if single_step:
execution_type = ExecutionType.STEP
self.execute_uncached_input(
run_id=run_id, file_name=file_name, single_step=single_step
)
self.publish_log(f"Tool executed successfully for '{file_name}'")
self._handle_execution_type(execution_type)

def execute_uncached_input(
self, run_id: str, file_name: str, single_step: bool
) -> None:
"""Executes the uncached input file.
Args:
run_id (str): UUID for a single run of a file
file_name (str): The name of the file to be executed.
single_step (bool): Flag indicating whether to execute in
single step mode.
Returns:
None
"""
self.publish_log(
"No entries found in cache, executing the tools"
f"running the tool(s) for {file_name}"
@@ -350,6 +331,8 @@ def execute_input_file(
component=LogComponent.SOURCE,
)
self.execute(run_id, file_name, single_step)
self.publish_log(f"Tool executed successfully for '{file_name}'")
self._handle_execution_type(execution_type)

def initiate_tool_execution(
self,
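
Note: the broadened handler truncates the error message before wrapping it in WorkflowExecutionError, so very long tool errors are not stored verbatim. A minimal sketch of that pattern, using stand-ins for the module's constant and exception type:

    EXECUTION_ERROR_LENGTH = 256  # assumed cap, the real constant lives in the module

    class WorkflowExecutionError(Exception):
        """Stand-in for the project's exception type."""

    def run_tools() -> None:
        raise RuntimeError("tool failed: " + "x" * 1000)

    try:
        run_tools()
    except Exception as exception:
        # Truncate the message, then chain the original exception.
        message = str(exception)[:EXECUTION_ERROR_LENGTH]
        raise WorkflowExecutionError(message) from exception
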
5 changes: 4 additions & 1 deletion backend/workflow_manager/workflow_v2/workflow_helper.py
@@ -542,6 +542,7 @@ def execute_workflow(
scheduled: bool = False,
execution_mode: Optional[tuple[str, str]] = None,
pipeline_id: Optional[str] = None,
use_file_history: bool = True,
**kwargs: dict[str, Any],
) -> Optional[list[Any]]:
"""Asynchronous Execution By celery.
@@ -557,7 +558,8 @@
WorkflowExecution Mode. Defaults to None.
pipeline_id (Optional[str], optional): Id of pipeline.
Defaults to None.
include_metadata (bool): Whether to include metadata in the prompt output
use_file_history (bool): Use FileHistory table to return results on already
processed files. Defaults to True
Kwargs:
log_events_id (str): Session ID of the user, helps establish
@@ -598,6 +600,7 @@
workflow_execution=workflow_execution,
execution_mode=execution_mode,
hash_values_of_files=hash_values,
use_file_history=use_file_history,
)
except Exception as error:
error_message = traceback.format_exc()
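
Note: use_file_history only controls whether already-processed files short-circuit to their stored result instead of being executed again. A conceptual sketch of that decision, not the project's actual FileHistory lookup:

    from typing import Optional

    def resolve_cached_result(
        file_hash: str, use_file_history: bool, history: dict[str, str]
    ) -> Optional[str]:
        """Return a previously stored result when file history is enabled."""
        if use_file_history and file_hash in history:
            return history[file_hash]
        return None  # caller proceeds to execute the workflow for this file

    history = {"abc123": "cached output"}
    print(resolve_cached_result("abc123", True, history))   # cached output
    print(resolve_cached_result("abc123", False, history))  # None
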
