From a14e0cf9cc5d312c28cbbd046fe030e1455bcf87 Mon Sep 17 00:00:00 2001 From: mmistroni Date: Fri, 27 Sep 2024 00:20:46 +0100 Subject: [PATCH] Feature/openbb apachebeam (#6679) * adding beam sample * adding senate discl * removingn commits on wrong branch * amended according to latest comments * Delete openbb_platform/providers/government_us/openbb_government_us/utils/senate_constants.py file in another bramnch * Delete openbb_platform/providers/government_us/openbb_government_us/utils/senate_helpers.py file belong to antothe rbranch * Update README.md fixed comment * Update README.md --------- Co-authored-by: Danglewood <85772166+deeleeramone@users.noreply.github.com> Co-authored-by: Theodore Aptekarev --- examples/openbb-apachebeam/README.md | 12 +++++ examples/openbb-apachebeam/requirements.txt | 2 + examples/openbb-apachebeam/tests/__init__.py | 0 .../tests/test_obb_pipeline.py | 48 +++++++++++++++++++ 4 files changed, 62 insertions(+) create mode 100644 examples/openbb-apachebeam/README.md create mode 100644 examples/openbb-apachebeam/requirements.txt create mode 100644 examples/openbb-apachebeam/tests/__init__.py create mode 100644 examples/openbb-apachebeam/tests/test_obb_pipeline.py diff --git a/examples/openbb-apachebeam/README.md b/examples/openbb-apachebeam/README.md new file mode 100644 index 000000000000..3e2d1d4ad21e --- /dev/null +++ b/examples/openbb-apachebeam/README.md @@ -0,0 +1,12 @@ +# OBB Dataflow Sample + + +This is a sample how to invoke OBB fetchers in an Apache Beam pipeline. (GCP dataflow is build on Apache Beam) +Pre-requisites +- You need to create a Conda environment (or a virtual env) using requirements.txt in this directory +- The script exercise 3 OBB endpoints, all of which require no credentials +- Run the test from the main directory + python -m unittest .\tests\test_obb_pipeline.py + +The script will run a pipeline consisting of 3 task which will fetch AAPL quote, profile and news +This is just a very basic sample which can be used as building block to create more complex scenarios diff --git a/examples/openbb-apachebeam/requirements.txt b/examples/openbb-apachebeam/requirements.txt new file mode 100644 index 000000000000..57071e8de92b --- /dev/null +++ b/examples/openbb-apachebeam/requirements.txt @@ -0,0 +1,2 @@ +apache-beam +openbb-yfinance \ No newline at end of file diff --git a/examples/openbb-apachebeam/tests/__init__.py b/examples/openbb-apachebeam/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/examples/openbb-apachebeam/tests/test_obb_pipeline.py b/examples/openbb-apachebeam/tests/test_obb_pipeline.py new file mode 100644 index 000000000000..097df4237037 --- /dev/null +++ b/examples/openbb-apachebeam/tests/test_obb_pipeline.py @@ -0,0 +1,48 @@ +import unittest +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.options.pipeline_options import PipelineOptions +import asyncio +import apache_beam as beam +from openbb_yfinance.models.equity_quote import YFinanceEquityQuoteFetcher as quote_fetcher +from openbb_yfinance.models.equity_profile import YFinanceEquityProfileFetcher as profile_fetcher +from openbb_yfinance.models.company_news import YFinanceCompanyNewsFetcher as news_fetcher + + +class AsyncProcess(beam.DoFn): + + def __init__(self, credentials, fetcher): + self.credentials = credentials + self.fetcher = fetcher + + async def fetch_data(self, element: str): + params = dict(symbol=element) + data = await self.fetcher.fetch_data(params, self.credentials) + return [d.model_dump(exclude_none=True) for d in data] + + def process(self, element: str): + return asyncio.run(self.fetch_data(element)) + +class MyTestCase(unittest.TestCase): + + + def test_sample_pipeline(self): + credentials = {} # Running OBB endpoints which do not require credentials + debug_sink = beam.Map(print) + ticker = 'AAPL' + + with TestPipeline(options=PipelineOptions()) as p: + quote = (p | 'Start Quote' >> beam.Create([ticker]) + | 'Run Quote' >> beam.ParDo(AsyncProcess(credentials, quote_fetcher)) + | 'Print quote' >> debug_sink) + + profile = (p | 'Start Profile' >> beam.Create([ticker]) + | 'Run Profile' >> beam.ParDo(AsyncProcess(credentials, profile_fetcher)) + | 'Print profile' >> debug_sink) + + news = (p | 'Start News' >> beam.Create([ticker]) + | 'Run News' >> beam.ParDo(AsyncProcess(credentials, news_fetcher)) + | 'Print nes' >> debug_sink) + + +if __name__ == '__main__': + unittest.main()