Skip to content

Commit

Permalink
Merge branch 'develop' into feature/commodity-spot-prices
Browse files Browse the repository at this point in the history
  • Loading branch information
IgorWounds committed Sep 27, 2024
2 parents 61bf827 + a14e0cf commit 776b7a9
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 0 deletions.
12 changes: 12 additions & 0 deletions examples/openbb-apachebeam/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# OBB Dataflow Sample


This is a sample how to invoke OBB fetchers in an Apache Beam pipeline. (GCP dataflow is build on Apache Beam)
Pre-requisites
- You need to create a Conda environment (or a virtual env) using requirements.txt in this directory
- The script exercise 3 OBB endpoints, all of which require no credentials
- Run the test from the main directory
python -m unittest .\tests\test_obb_pipeline.py

The script will run a pipeline consisting of 3 task which will fetch AAPL quote, profile and news
This is just a very basic sample which can be used as building block to create more complex scenarios
2 changes: 2 additions & 0 deletions examples/openbb-apachebeam/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
apache-beam
openbb-yfinance
Empty file.
48 changes: 48 additions & 0 deletions examples/openbb-apachebeam/tests/test_obb_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import unittest
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.options.pipeline_options import PipelineOptions
import asyncio
import apache_beam as beam
from openbb_yfinance.models.equity_quote import YFinanceEquityQuoteFetcher as quote_fetcher
from openbb_yfinance.models.equity_profile import YFinanceEquityProfileFetcher as profile_fetcher
from openbb_yfinance.models.company_news import YFinanceCompanyNewsFetcher as news_fetcher


class AsyncProcess(beam.DoFn):

def __init__(self, credentials, fetcher):
self.credentials = credentials
self.fetcher = fetcher

async def fetch_data(self, element: str):
params = dict(symbol=element)
data = await self.fetcher.fetch_data(params, self.credentials)
return [d.model_dump(exclude_none=True) for d in data]

def process(self, element: str):
return asyncio.run(self.fetch_data(element))

class MyTestCase(unittest.TestCase):


def test_sample_pipeline(self):
credentials = {} # Running OBB endpoints which do not require credentials
debug_sink = beam.Map(print)
ticker = 'AAPL'

with TestPipeline(options=PipelineOptions()) as p:
quote = (p | 'Start Quote' >> beam.Create([ticker])
| 'Run Quote' >> beam.ParDo(AsyncProcess(credentials, quote_fetcher))
| 'Print quote' >> debug_sink)

profile = (p | 'Start Profile' >> beam.Create([ticker])
| 'Run Profile' >> beam.ParDo(AsyncProcess(credentials, profile_fetcher))
| 'Print profile' >> debug_sink)

news = (p | 'Start News' >> beam.Create([ticker])
| 'Run News' >> beam.ParDo(AsyncProcess(credentials, news_fetcher))
| 'Print nes' >> debug_sink)


if __name__ == '__main__':
unittest.main()

0 comments on commit 776b7a9

Please sign in to comment.