Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Added es and cerebro services, rudimentary unstract-metrics lib added #473

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion docker/docker-compose-dev-essentials.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,31 @@ services:
labels:
- traefik.enable=false

es:
image: "docker.elastic.co/elasticsearch/elasticsearch:8.14.1"
container_name: unstract-es
restart: unless-stopped
ports:
- "9200:9200"
environment:
node.name: es
discovery.seed_hosts: es
cluster.initial_master_nodes: es
cluster.name: unstract-metrics
bootstrap.memory_lock: "true"
xpack.security.enabled: "false"
ES_JAVA_OPTS: -Xms256m -Xmx256m
volumes:
- es_data:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
labels:
- traefik.enable=false
profiles:
- unstract-metrics

minio:
image: 'minio/minio:latest'
container_name: unstract-minio
Expand Down Expand Up @@ -121,8 +146,8 @@ services:
- ./essentials.env

volumes:
flipt_data:
minio_data:
postgres_data:
qdrant_data:
redis_data:
es_data:
9 changes: 9 additions & 0 deletions unstract/metrics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Unstract Metrics Aggregator

Helps collect metrics from Unstract and its adapters and pushes them to Elasticsearch.

Run `elasticsearch` with the compose profile `unstract-metrics`.

```shell
VERSION=<version> docker compose -f docker-compose.yaml --profile unstract-metrics up -d
```
507 changes: 507 additions & 0 deletions unstract/metrics/pdm.lock

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions unstract/metrics/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"

[project]
name = "unstract-metrics"
version = "0.0.1"
description = "Helps with collection of metrics from Unstract's adapters"
authors = [{ name = "Zipstack Inc.", email = "[email protected]" }]
dependencies = ["elasticsearch-dsl~=8.14.0"]
# <3.11.1 due to resolution error from Unstract SDK
requires-python = ">=3.9,<3.11.1"
readme = "README.md"
classifiers = ["Programming Language :: Python"]

[tool.pdm.dev-dependencies]
test = [
"pytest>=8.2.2",
"pytest-mock>=3.14.0",
"pytest-dotenv>=0.5.2",
"pytest-cov>=5.0.0",
"pytest-md-report>=0.6.2",
]

[tool.pdm.build]
includes = ["src"]
package-dir = "src"

[tool.pytest.ini_options]
env_files = ["tests/.env"]
addopts = "-s"
log_level = "INFO"
log_cli = true

[tool.pdm.scripts]
test.cmd = "pytest -s -v"
test.env_file = "tests/.env"
test.help = "Runs pytests for Unstract Metrics"
16 changes: 16 additions & 0 deletions unstract/metrics/src/unstract/metrics/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import os

from elasticsearch_dsl import connections

from .metrics import MetricsAggregator, capture_metrics # noqa: F401

ES_URL = os.getenv("ES_URL")
ES_CLOUD_ID = os.getenv("ES_CLOUD_ID")
ES_API_KEY = os.getenv("ES_API_KEY")
if not ES_URL or (ES_CLOUD_ID and ES_API_KEY):
raise ValueError(
"Either env ES_URL or ES_CLOUD_ID and ES_API_KEY "
"is required to import unstract-metrics"
)

connections.create_connection(hosts=[ES_URL], cloud_id=ES_CLOUD_ID, api_key=ES_API_KEY)
6 changes: 6 additions & 0 deletions unstract/metrics/src/unstract/metrics/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class MetricsConstants:
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f%z"


class MetricsEnv:
COLLECT_UNSTRACT_METRICS = "COLLECT_UNSTRACT_METRICS"
63 changes: 63 additions & 0 deletions unstract/metrics/src/unstract/metrics/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import functools
import json
import logging
import os
from datetime import datetime
from typing import Optional
from uuid import uuid4

from unstract.metrics.constants import MetricsConstants, MetricsEnv
from unstract.metrics.models.metrics import Metrics

logger = logging.getLogger(__name__)


class MetricsAggregator:

def __init__(self, index_to_clone: Optional[str] = None) -> None:
# TODO: Create index with dynamic templates through a separate command
if not Metrics._index.exists():
Metrics.init(index=index_to_clone)

def add_metrics(self, metrics, index: str = "unstract-metrics-0"):
metrics_doc = Metrics(**metrics)
metrics_doc.save(index=index)

def query_metrics(self, run_id: str, index: str = "unstract-metrics-0"):
s = Metrics.search(index=index).query("match", run_id=run_id)
response = s.execute()
return response.to_dict()


def capture_metrics(index="unstract-metrics-0", **metric_kwargs):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):

if (
os.getenv(MetricsEnv.COLLECT_UNSTRACT_METRICS, "False").lower()
== "false"
):
return func(*args, **kwargs)

logger.debug(
f"Collecting metrics with kwargs: {json.dumps(metric_kwargs, indent=2)}"
)
metrics = Metrics(**metric_kwargs)
if not metrics.run_id:
metrics.run_id = uuid4()
metrics.start_time = datetime.now().strftime(
MetricsConstants.DATETIME_FORMAT
)
try:
result = func(*args, **kwargs)
finally:
metrics.end_time = datetime.now().strftime(
MetricsConstants.DATETIME_FORMAT
)
metrics.save(index=index)
return result

return wrapper

return decorator
7 changes: 7 additions & 0 deletions unstract/metrics/src/unstract/metrics/models/log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from elasticsearch_dsl import Date, InnerDoc, Keyword, Text


class Log(InnerDoc):
level = Keyword()
time = Date()
message = Text()
100 changes: 100 additions & 0 deletions unstract/metrics/src/unstract/metrics/models/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging

from elasticsearch_dsl import Date, Document, Keyword, Nested, Text

from .operation import (
EmbeddingOperation,
LLMOperation,
Operation,
VectorDBOperation,
X2TextOperation,
)

logger = logging.getLogger(__name__)


class Metrics(Document):
org_id = Keyword(required=True)
run_id = Keyword()
start_time = Date(required=True)
end_time = Date()
owner = Keyword()
agent = Keyword() # TODO: Enum - WF | API | PS
agent_name = Text()
agent_id = Keyword()
status = Keyword() # TODO: Make enum
api_key = Text()
operations = Nested(Operation)

class Index:
name = "unstract-metrics-*"
settings = {"number_of_replicas": 0, "number_of_shards": 1}

def save(
self,
using=None,
index=None,
validate=True,
skip_empty=True,
return_doc_meta=False,
**kwargs,
):
self.meta.id = self.run_id
return super().save(
using, index, validate, skip_empty, return_doc_meta, **kwargs
)

@classmethod
def create_index(cls):
cls.init()
# Add dynamic templates for sub_process specific mappings
cls._index.put_mapping(
body={
"dynamic_templates": [
{
"llm_template": {
"path_match": "operations.sub_process",
"match_mapping_type": "string",
"mapping": {
"type": "nested",
"properties": LLMOperation._doc_type.mapping.properties.to_dict(), # noqa: E501
},
"match": "LLM",
}
},
{
"vectordb_template": {
"path_match": "operations.sub_process",
"match_mapping_type": "string",
"mapping": {
"type": "nested",
"properties": VectorDBOperation._doc_type.mapping.properties.to_dict(), # noqa: E501
},
"match": "VECTORDB",
}
},
{
"embedding_template": {
"path_match": "operations.sub_process",
"match_mapping_type": "string",
"mapping": {
"type": "nested",
"properties": EmbeddingOperation._doc_type.mapping.properties.to_dict(), # noqa: E501
},
"match": "EMBEDDING",
}
},
{
"embedding_template": {
"path_match": "operations.sub_process",
"match_mapping_type": "string",
"mapping": {
"type": "nested",
"properties": X2TextOperation._doc_type.mapping.properties.to_dict(), # noqa: E501
},
"match": "X2TEXT",
}
},
]
}
)
102 changes: 102 additions & 0 deletions unstract/metrics/src/unstract/metrics/models/operation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from elasticsearch_dsl import (
Date,
Float,
InnerDoc,
Integer,
Keyword,
Nested,
Object,
Text,
)

from unstract.metrics.models.log import Log


class Operation(InnerDoc):
operation_id = Keyword()
process = Keyword() # TODO: Specify enum
sub_process = Keyword() # LLM | VECTORDB | EMBEDDING | X2TEXT
context = Text() # REVIEW: Make Keyword() if we wish to search by filename
status = Keyword()
start_time = Date()
end_time = Date()
chunk_size = Integer(doc_values=False)
chunk_overlap = Integer(doc_values=False)
prompt_key_name = Text()
# adapter_metadata = Object()
connector_metadata = Object()
metrics = Object()
logs = Nested(Log)


class LLMOperation(InnerDoc):
prompt = Text()
generated_response = Text()
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"model": Text(),
"max_retries": Integer(doc_values=False),
"max_output_tokens": Integer(doc_values=False),
}
)
metrics = Object(
properties={
"input_tokens": Integer(),
"output_tokens": Integer(),
"latency": Float(),
"input_tokens_cost": Float(),
"output_tokens_cost": Float(),
"total_cost": Float(),
}
)


class VectorDBOperation(InnerDoc):
doc_id = Keyword()
retrieved_docs = Keyword(multi=True)
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"dimension": Integer(doc_values=False),
}
)
metrics = Object(
properties={"operation": Keyword(), "count": Integer(), "latency": Float()}
)


class EmbeddingOperation(InnerDoc):
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"model": Text(),
"embed_batch_size": Integer(),
}
)
metrics = Object(
properties={"tokens": Integer(), "latency": Float(), "cost": Float()}
)


class X2TextOperation(InnerDoc):
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"mode": Text(),
}
)
metrics = Object(
properties={
"pages_extracted": Integer(),
"latency": Float(),
}
)
Loading
Loading