Skip to content
This repository has been archived by the owner on Apr 12, 2024. It is now read-only.

add stream API #42

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bridgeql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"""

__title__ = 'BridgeQL'
__version__ = '0.2.2'
__version__ = '0.3.0'
__license__ = 'BSD 2-Clause'
__copyright__ = 'Copyright © 2023 VMware, Inc. All rights reserved.'

Expand Down
41 changes: 40 additions & 1 deletion bridgeql/django/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@

import json

from django.http import StreamingHttpResponse
from django.utils.decorators import method_decorator
from django.views.decorators.http import require_http_methods
from django.views.decorators.csrf import csrf_exempt
from django.views import View

from bridgeql.django.auth import read_auth_decorator, write_auth_decorator
from bridgeql.django.exceptions import BridgeqlException
from bridgeql.django.helpers import JSONResponse, get_json_request_body
from bridgeql.django.helpers import JSONResponse, JSONEncoder, get_json_request_body
from bridgeql.django.models import ModelBuilder, ModelObject
from bridgeql.django.query import Query


@csrf_exempt
Expand All @@ -33,6 +37,41 @@ def create_django_model(request, db_name, app_label, model_name):
return JSONResponse(res, status=e.status_code)


@method_decorator(require_http_methods(['GET']), name='dispatch')
class ReadView(View):
STREAM_SEPERATOR = ','
def stream_response(self, qset_stream):
try:
for x in qset_stream:
yield "%s%s" % (json.dumps(x, cls=JSONEncoder), self.STREAM_SEPERATOR)
except BridgeqlException as e:
e.log()
yield json.dumps({'message': str(e.detail), 'error': True})

def get(self, request, db_name, app_label, model_name, pk=None):
params = request.GET.get('payload', None)
stream = request.GET.get('stream', False)
try:
if pk:
params = {
'filter': {
'pk': pk
}
}
else:
params = json.loads(params)
mb = ModelBuilder(db_name, app_label, model_name, params)
qset = mb.queryset(stream=stream)
if not stream:
res = {'data': qset, 'message': '', 'success': True}
return JSONResponse(res)
return StreamingHttpResponse(self.stream_response(qset), content_type='application/json')
except BridgeqlException as e:
e.log()
res = {'data': [], 'message': str(e.detail), 'success': False}
return JSONResponse(res, status=e.status_code)


@require_http_methods(['GET'])
@read_auth_decorator
def read_django_model(request, db_name, app_label, model_name, pk=None):
Expand Down
61 changes: 42 additions & 19 deletions bridgeql/django/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ def __init__(self, db_name, app_name, model_name, params):
requested_fields.extend(self.params.fields)
requested_fields.extend(self.params.order_by)
self.model_config.validate_fields(set(requested_fields))
self.qset_stream = None

def _apply_opts(self):
def _apply_opts(self, stream=False):
for opt, qset_opt, opt_type in ModelBuilder._QUERYSET_OPTS:
# offset and limit operation will return None
func = getattr(self.qset, qset_opt, None)
Expand Down Expand Up @@ -269,7 +270,9 @@ def _apply_opts(self):
self.qset = self.qset[:self.params.limit]
elif isinstance(value, list):
# handle values case where property is passed in fields
if qset_opt == 'values' and self.query_has_properties():
if qset_opt == 'values' and stream:
self.qset_stream = self.yield_fields()
elif qset_opt == 'values' and self.query_has_properties():
# returns DBRows instance
self.qset = self._add_fields()
else:
Expand All @@ -292,36 +295,56 @@ def query_has_properties(self):
return bool(set(self.params.fields).intersection(
self.model_config.get_properties()))

def _get_model_fields(self, row):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change the method name to _get_model_row_dict to make it more readable.

model_fields = {}
for field in self.params.fields:
attr = row
for ref in field.split('__'):
try:
attr = getattr(attr, ref)
if attr is None:
break
except AttributeError:
raise InvalidModelFieldName(
'Invalid query for field %s in %s.' % (ref, attr))
model_fields[field] = attr
return model_fields

def _add_fields(self):
qset_values = DBRows()
self.qset = self.qset.select_related(*self.params.fk_refs_in_fields)
logger.debug('Request parameters: %s \nQuery: %s\n',
self.params.params, self.qset.query)
for row in self.qset:
model_fields = {}
for field in self.params.fields:
attr = row
for ref in field.split('__'):
try:
attr = getattr(attr, ref)
if attr is None:
break
except AttributeError:
raise InvalidModelFieldName(
'Invalid query for field %s in %s.' % (ref, attr))
model_fields[field] = attr
qset_values.append(model_fields)
return qset_values
try:
for row in self.qset.iterator():
model_fields = self._get_model_fields(row)
qset_values.append(model_fields)
return qset_values
except FieldError as e:
raise InvalidModelFieldName(str(e))

def yield_fields(self):
logger.debug('Request parameters: %s \nQuery: %s\n',
self.params.params, self.qset.query)
self.qset = self.qset.select_related(*self.params.fk_refs_in_fields)
try:
for row in self.qset.iterator():
model_fields = self._get_model_fields(row)
yield model_fields
except FieldError as e:
raise InvalidModelFieldName(str(e))

def queryset(self):
def queryset(self, stream=False):
# construct Q object from dictionary
query = Query(self.params.filter)
if self.params.db_name:
self.qset = self.model_config.model.objects.using(
self.params.db_name).filter(query.Q)
else:
self.qset = self.model_config.model.objects.filter(query.Q)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle the stream part using self.yqset

self._apply_opts()
self._apply_opts(stream=stream)
if self.qset_stream:
return self.qset_stream
if isinstance(self.qset, QuerySet):
logger.debug('Request parameters: %s \nQuery: %s\n',
self.params.params, self.qset.query)
Expand Down
10 changes: 5 additions & 5 deletions bridgeql/django/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@

urlpatterns = [
url(r'^create/(?P<db_name>\w+)/(?P<app_label>\w+)/(?P<model_name>\w+)/$',
bridge.create_django_model, name='bridgeql_django_create'),
bridge.create_django_model, name='bridgeql_django_create'),
url(r'^read/(?P<db_name>\w+)/(?P<app_label>\w+)/(?P<model_name>\w+)/(?P<pk>\w+)/$',
bridge.read_django_model, name='bridgeql_django_read_pk'),
bridge.ReadView.as_view(), name='bridgeql_django_read_pk'),
url(r'^read/(?P<db_name>\w+)/(?P<app_label>\w+)/(?P<model_name>\w+)/$',
bridge.read_django_model, name='bridgeql_django_read'),
bridge.ReadView.as_view(), name='bridgeql_django_read'),
url(r'^update/(?P<db_name>\w+)/(?P<app_label>\w+)/(?P<model_name>\w+)/(?P<pk>\w+)/$',
bridge.update_django_model, name='bridgeql_django_update'),
bridge.update_django_model, name='bridgeql_django_update'),
url(r'^delete/(?P<db_name>\w+)/(?P<app_label>\w+)/(?P<model_name>\w+)/(?P<pk>\w+)/$',
bridge.delete_django_model, name='bridgeql_django_delete'),
bridge.delete_django_model, name='bridgeql_django_delete'),
url(r'^schema/$', generate_bridgeql_schema, name='generate_bridgeql_schema'),
url(r'', index, name='bridgeql_django_index'),
]
77 changes: 77 additions & 0 deletions tests/server/machine/tests/test_api_streamer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
# Copyright © 2023 VMware, Inc. All rights reserved.
# SPDX-License-Identifier: BSD-2-Clause

import json
import os

from django.urls import reverse as url_reverse
from django.test import TestCase, override_settings
from django.test.client import Client
from django.conf import settings

from machine.models import OperatingSystem, Machine


class TestAPIStreamer(TestCase):
fixtures = [os.path.join(settings.BASE_DIR, 'machine_tests.json'), ]

def setUp(self):
self.client = Client()

def getURL(self, **kwargs):
db_name = kwargs.get('db_name', 'default')
app_label = kwargs.get('app_label', 'machine')
# default model name is Machine
model_name = kwargs.get('model_name', 'Machine')
url_name = 'bridgeql_django_read'
url_kwargs = {
'db_name': db_name,
'app_label': app_label,
'model_name': model_name
}
return url_reverse(url_name, kwargs=url_kwargs)

def test_stream_one_field(self):
self.params = {
'filter': {
'name__startswith': 'machine',
},
'fields': ['os__name', 'pk']
}
resp = self.client.get(
self.getURL(), {'payload': json.dumps(self.params), 'stream': True})
if resp.status_code == 200:
streaming_content = []
for chunk in resp.streaming_content:
streaming_content.append(chunk)
self.assertEqual(len(streaming_content), 100)

def test_stream_invalid_model_name(self):
self.params = {
'filter': {
'name__startswith': 'os-name-',
},
'fields': ['name', 'arch'],
}
resp = self.client.get(self.getURL(model_name='InvalidModel'), {
'payload': json.dumps(self.params), 'stream': True})
self.assertEqual(resp.status_code, 400)
self.assertFalse(resp.json()['success'])

def test_stream_invalid_field(self):
self.params = {
'filter': {
'name__startswith': 'machine',
},
'fields': ['os1__name', 'pk']
}
resp = self.client.get(
self.getURL(), {'payload': json.dumps(self.params), 'stream': True})
if resp.status_code == 200:
streaming_content = b""
for chunk in resp.streaming_content:
streaming_content += chunk
streaming_content = streaming_content.decode('utf-8')
err_json = json.loads(streaming_content)
self.assertTrue(err_json['error'])
Loading