WIP: Added es and cerebro services, rudimentary unstract-metrics lib added #473

Open · wants to merge 5 commits into `main`
27 changes: 26 additions & 1 deletion docker/docker-compose-dev-essentials.yaml
@@ -30,6 +30,31 @@ services:
labels:
- traefik.enable=false

es:
image: "docker.elastic.co/elasticsearch/elasticsearch:8.14.1"
container_name: unstract-es
restart: unless-stopped
ports:
- "9200:9200"
environment:
node.name: es
discovery.seed_hosts: es
cluster.initial_master_nodes: es
cluster.name: unstract-metrics
bootstrap.memory_lock: "true"
xpack.security.enabled: "false"
ES_JAVA_OPTS: -Xms256m -Xmx256m
volumes:
- es_data:/usr/share/elasticsearch/data
ulimits:
memlock:
soft: -1
hard: -1
labels:
- traefik.enable=false
profiles:
- unstract-metrics

minio:
image: 'minio/minio:latest'
container_name: unstract-minio
@@ -123,8 +148,8 @@ services:
- ./essentials.env

volumes:
flipt_data:
minio_data:
postgres_data:
qdrant_data:
redis_data:
es_data:
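Not part of this PR, but a compose healthcheck along these lines (a sketch; the timings and the assumption that the ES 8.x image ships `curl` are mine) would let dependents such as `cerebro` wait for Elasticsearch to be ready rather than merely started:

```yaml
es:
  healthcheck:
    # Poll cluster health until the node responds.
    test: ["CMD-SHELL", "curl -sf http://localhost:9200/_cluster/health || exit 1"]
    interval: 10s
    timeout: 5s
    retries: 12
```

With this in place, `cerebro`'s `depends_on` entry could use `condition: service_healthy`.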
61 changes: 38 additions & 23 deletions docker/docker-compose.yaml
@@ -51,29 +51,6 @@ services:
volumes:
- ./workflow_data:/data

# Celery Flower
celery-flower:
image: unstract/backend:${VERSION}
container_name: unstract-celery-flower
restart: unless-stopped
entrypoint: .venv/bin/celery
command: "-A backend flower --port=5555 --purge_offline_workers=5"
env_file:
- ../backend/.env
depends_on:
- execution-consumer
- redis
labels:
- traefik.enable=false
ports:
- "5555:5555"
environment:
- ENVIRONMENT=development
volumes:
- unstract_data:/data
profiles:
- optional

# Celery Beat
celery-beat:
image: unstract/backend:${VERSION}
@@ -164,6 +141,44 @@ services:
labels:
- traefik.enable=false

# Web Admin tools to monitor services
# Celery Flower
celery-flower:
image: unstract/backend:${VERSION}
container_name: unstract-celery-flower
restart: unless-stopped
entrypoint: .venv/bin/celery
command: "-A backend flower --port=5555 --purge_offline_workers=5"
env_file:
- ../backend/.env
depends_on:
- execution-consumer
- redis
labels:
- traefik.enable=false
ports:
- "5555:5555"
environment:
- ENVIRONMENT=development
volumes:
- unstract_data:/data
profiles:
- optional

# Cerebro
cerebro:
image: lmenezes/cerebro:0.9.4
container_name: unstract-cerebro
restart: unless-stopped
ports:
- 9201:9000
depends_on:
- es
labels:
- "traefik.enable=false"
profiles:
- unstract-metrics

document-service:
profiles:
- optional
22 changes: 22 additions & 0 deletions unstract/metrics/README.md
@@ -0,0 +1,22 @@
# Unstract Metrics Aggregator

Collects metrics from Unstract and its adapters and pushes them to Elasticsearch.

Run the services below with the compose profile `unstract-metrics`:
- elasticsearch
- cerebro (a UI for managing the ES instance)

```shell
VERSION=<version> docker compose -f docker-compose.yaml --profile unstract-metrics up -d
```


## Using Cerebro: An Elasticsearch Web Admin tool
Review thread on this section:

- **Contributor:** @chandrasekharan-zipstack Create a Confluence page for Cerebro; no need to include it here.
- **Contributor:** @jaseemjaskp Wondering if we should mention it in the README so that users can install it for admin tasks if they wish? Are there any other recommended tools?
- **Author:** @hari-kuriakose Kibana, maybe?
- **Contributor:** @hari-kuriakose @chandrasekharan-zipstack Elasticsearch's official one is Kibana. We can mention that in the README if required. https://github.com/lmenezes/cerebro is not up to date with official ES.
- **Author:** @jaseemjaskp If Cerebro is not up to date with official ES, do you think we should also try Kibana?
- **Contributor:** @chandrasekharan-zipstack Try out Kibana, but no need to include it in the docker-compose file.


- Run Cerebro with

```shell
VERSION=<version> docker compose -f docker-compose.yaml --profile unstract-metrics up -d cerebro
```

- Connect to `http://localhost:9201/` and enter `http://es:9200/` as the node address
507 changes: 507 additions & 0 deletions unstract/metrics/pdm.lock

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions unstract/metrics/pyproject.toml
@@ -0,0 +1,38 @@
[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"

[project]
name = "unstract-metrics"
version = "0.0.1"
description = "Helps with collection of metrics from Unstract's adapters"
authors = [{ name = "Zipstack Inc.", email = "[email protected]" }]
dependencies = ["elasticsearch-dsl~=8.14.0"]
# <3.11.1 due to resolution error from Unstract SDK
requires-python = ">=3.9,<3.11.1"
readme = "README.md"
classifiers = ["Programming Language :: Python"]

[tool.pdm.dev-dependencies]
test = [
"pytest>=8.2.2",
"pytest-mock>=3.14.0",
"pytest-dotenv>=0.5.2",
"pytest-cov>=5.0.0",
"pytest-md-report>=0.6.2",
]

[tool.pdm.build]
includes = ["src"]
package-dir = "src"

[tool.pytest.ini_options]
env_files = ["tests/.env"]
addopts = "-s"
log_level = "INFO"
log_cli = true

[tool.pdm.scripts]
test.cmd = "pytest -s -v"
test.env_file = "tests/.env"
test.help = "Runs pytests for Unstract Metrics"
16 changes: 16 additions & 0 deletions unstract/metrics/src/unstract/metrics/__init__.py
@@ -0,0 +1,16 @@
import os

from elasticsearch_dsl import connections

from .metrics import MetricsAggregator # noqa: F401

ES_URL = os.getenv("ES_URL")
ES_CLOUD_ID = os.getenv("ES_CLOUD_ID")
ES_API_KEY = os.getenv("ES_API_KEY")
if not (ES_URL or (ES_CLOUD_ID and ES_API_KEY)):
    raise ValueError(
        "Either env ES_URL or both ES_CLOUD_ID and ES_API_KEY "
        "are required to import unstract-metrics"
    )

connections.create_connection(hosts=[ES_URL], cloud_id=ES_CLOUD_ID, api_key=ES_API_KEY)
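The env validation above accepts either `ES_URL` alone or both `ES_CLOUD_ID` and `ES_API_KEY` together; as a pure function (a hypothetical helper, not part of the PR), the accepted combinations are:

```python
from typing import Optional


def has_valid_es_config(
    es_url: Optional[str],
    es_cloud_id: Optional[str],
    es_api_key: Optional[str],
) -> bool:
    """True if either a node URL, or a cloud id plus API key, is configured."""
    return bool(es_url or (es_cloud_id and es_api_key))
```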
20 changes: 20 additions & 0 deletions unstract/metrics/src/unstract/metrics/metrics.py
@@ -0,0 +1,20 @@
from typing import Optional

from unstract.metrics.models.metrics import Metrics


class MetricsAggregator:

def __init__(self, index_to_clone: Optional[str] = None) -> None:
# TODO: Create index with dynamic templates through a separate command
if not Metrics._index.exists():
Metrics.init(index=index_to_clone)

def add_metrics(self, metrics, index: str = "unstract-metrics-0"):
metrics_doc = Metrics(**metrics)
metrics_doc.save(index=index)

def query_metrics(self, run_id: str, index: str = "unstract-metrics-0"):
s = Metrics.search(index=index).query("match", run_id=run_id)
response = s.execute()
return response.to_dict()
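Since `add_metrics` accepts a plain dict, a hedged example of what a caller might pass — field names come from the `Metrics` document in this PR, while the concrete values and the shape of each operation (a `sub_process` keyword, inferred from the dynamic templates) are assumptions:

```python
# Hypothetical payload for MetricsAggregator.add_metrics(); values are
# illustrative, and the operation shape is inferred from the dynamic templates.
sample_metrics = {
    "org_id": "org-1",
    "run_id": "run-42",
    "start_time": "2024-07-01T10:00:00Z",
    "end_time": "2024-07-01T10:00:05Z",
    "agent": "WF",
    "status": "SUCCESS",
    "operations": [{"sub_process": "LLM"}],
}

# Fields marked required=True on the Metrics document.
REQUIRED_FIELDS = {"org_id", "run_id", "start_time", "end_time"}
missing = REQUIRED_FIELDS - sample_metrics.keys()
assert not missing, f"missing required fields: {missing}"
```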
7 changes: 7 additions & 0 deletions unstract/metrics/src/unstract/metrics/models/log.py
@@ -0,0 +1,7 @@
from elasticsearch_dsl import Date, InnerDoc, Keyword, Text


class Log(InnerDoc):
level = Keyword()
time = Date()
message = Text()
162 changes: 162 additions & 0 deletions unstract/metrics/src/unstract/metrics/models/metrics.py
@@ -0,0 +1,162 @@
from elasticsearch_dsl import (
Date,
Document,
Float,
InnerDoc,
Integer,
Keyword,
Nested,
Object,
Text,
)

from .operation import Operation


class LLMOperation(InnerDoc):
prompt = Text()
generated_response = Text()
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"model": Text(),
"max_retries": Integer(doc_values=False),
"max_output_tokens": Integer(doc_values=False),
}
)
metrics = Object(
properties={
"input_tokens": Integer(),
"output_tokens": Integer(),
"latency": Float(),
"input_tokens_cost": Float(),
"output_tokens_cost": Float(),
"total_cost": Float(),
}
)


class VectorDBOperation(InnerDoc):
doc_id = Keyword()
retrieved_docs = Keyword(multi=True)
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"dimension": Integer(doc_values=False),
}
)
metrics = Object(
properties={"operation": Keyword(), "count": Integer(), "latency": Float()}
)


class EmbeddingOperation(InnerDoc):
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"model": Text(),
"embed_batch_size": Integer(),
}
)
metrics = Object(
properties={"tokens": Integer(), "latency": Float(), "cost": Float()}
)


class X2TextOperation(InnerDoc):
adapter_metadata = Object(
properties={
"adapter_instance_id": Keyword(),
"type": Keyword(),
"name": Text(),
"mode": Text(),
}
)
metrics = Object(
properties={
"pages_extracted": Integer(),
"latency": Float(),
}
)


class Metrics(Document):
org_id = Keyword(required=True)
run_id = Keyword(required=True)
start_time = Date(required=True)
end_time = Date(required=True)
owner = Keyword()
agent = Keyword() # TODO: Enum - WF | API | PS
agent_name = Text()
agent_id = Keyword()
status = Keyword() # TODO: Make enum
api_key = Text()
operations = Nested(Operation)

class Index:
name = "unstract-metrics-*"
settings = {"number_of_replicas": 0, "number_of_shards": 1}

def save(
self,
using=None,
index=None,
validate=True,
skip_empty=True,
return_doc_meta=False,
**kwargs,
):
self.meta.id = self.run_id
return super().save(
using, index, validate, skip_empty, return_doc_meta, **kwargs
)

@classmethod
def create_index(cls):
cls.init()
# Add dynamic templates for sub_process specific mappings
cls._index.put_mapping(
body={
"dynamic_templates": [
{
"llm_template": {
"path_match": "operations.sub_process",
"match_mapping_type": "string",
"mapping": {
"type": "nested",
"properties": LLMOperation._doc_type.mapping.properties.to_dict(), # noqa: E501
},
"match": "LLM",
}
},
{
"vectordb_template": {
"path_match": "operations.sub_process",
"match_mapping_type": "string",
"mapping": {
"type": "nested",
"properties": VectorDBOperation._doc_type.mapping.properties.to_dict(), # noqa: E501
},
"match": "VECTORDB",
}
},
{
"embedding_template": {
"path_match": "operations.sub_process",
"match_mapping_type": "string",
"mapping": {
"type": "nested",
"properties": EmbeddingOperation._doc_type.mapping.properties.to_dict(), # noqa: E501
},
"match": "EMBEDDING",
}
},
]
}
)
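The three dynamic templates differ only in their `match` value, so the mapping a given `sub_process` string would receive can be sketched in plain Python (a deliberate simplification of Elasticsearch's matching rules; template names are taken from `create_index` above):

```python
from typing import Optional

# match value -> dynamic template name, as declared in create_index().
SUB_PROCESS_TEMPLATES = {
    "LLM": "llm_template",
    "VECTORDB": "vectordb_template",
    "EMBEDDING": "embedding_template",
}


def template_for(sub_process: str) -> Optional[str]:
    """Return the dynamic template that would claim this sub_process, if any."""
    return SUB_PROCESS_TEMPLATES.get(sub_process)
```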