Source code for msdss_data_api.routers

from copy import deepcopy
from datetime import datetime
from fastapi import APIRouter, Body, Depends, Query
from typing import Any, Dict, List, Literal, Optional

from .managers import *
from .models import *
from .tools import *

async def _no_current_user():
    return None

[docs]def get_data_router( users_api=None, database=None, route_settings=DEFAULT_DATA_ROUTE_SETTINGS, prefix='/data', tags=['data'], *args, **kwargs): """ Get a data router. Parameters ---------- users_api : :class:`msdss_users_api:msdss_users_api.core.UsersAPI` or None Users API object to enable user authentication for data routes. If ``None``, user authentication will not be used for data routes. database : :class:`msdss_base_database:msdss_base_database.core.Database` or None A :class:`msdss_base_database:msdss_base_database.core.Database` object for managing data. If ``None``, a default database will be used. route_settings : dict Dictionary of settings for the data routes. Each route consists of the following keys: * ``path``: resource path for the route * ``tags``: tags for open api spec * ``_enable`` (bool): Whether this route should be included or not * ``_restricted_tables`` (list(str)): List of table names not accessible by this route * ``_get_user`` (dict or None): Additional arguments passed to the :meth:`msdss_users_api.msdss_users_api.core.UsersAPI.get_current_user` function for the route - only applies if parameter ``users_api`` is not ``None`` and this settings is not ``None``, otherwise no user authentication will be added for this route * ``**kwargs``: Additional arguments passed to :meth:`fastapi:fastapi.FastAPI.get` for the id route The default settings are: .. jupyter-execute:: :hide-code: from msdss_data_api.defaults import DEFAULT_DATA_ROUTE_SETTINGS from pprint import pprint pprint(DEFAULT_DATA_ROUTE_SETTINGS) Any unspecified settings will be replaced by their defaults. prefix : str Prefix path to all routes belonging to this router. tags : list(str) Tags for all routes in this router. *args, **kwargs Additional arguments to accept any extra parameters passed to :class:`fastapi:fastapi.routing.APIRouter`. Returns ------- :class:`fastapi:fastapi.routing.APIRouter` A router object used for data routes. See `FastAPI bigger apps <https://fastapi.tiangolo.com/tutorial/bigger-applications/>`_ Author ------ Richard Wen <rrwen.dev@gmail.com> Example ------- .. jupyter-execute:: from msdss_base_database import Database from msdss_base_api import API from msdss_users_api import UsersAPI from msdss_data_api.routers import get_data_router # Create database object database = Database( driver='postgresql', user='msdss', password='msdss123', host='localhost', port='5432', database='msdss' ) # Create an app app = API() # Add the data router router = get_data_router(database=database) app.add_router(router) # Add the data router with users # CHANGE SECRETS TO STRONG PHRASES app = API() users_api = UsersAPI( 'cookie-secret', 'jwt-secret', 'reset-secret', 'verification-secret', database=database ) router = get_data_router(users_api, database=database) app.add_router(router) # Host app at https://localhost:8000 # Try it at https://localhost:8000/docs # app.start() """ database = database if database else Database() # (get_data_router_defaults) Merge defaults and user params get_user = {} settings = deepcopy(DEFAULT_DATA_ROUTE_SETTINGS) for k in settings: if k in route_settings: settings[k].update(route_settings[k]) # (get_data_router_apply) Apply settings to obtain dependencies get_user = {} get_data_manager = {} enable = {} for k, v in settings.items(): get_user[k] = users_api.get_current_user(**v['_get_user']) if users_api and '_get_user' in v else _no_current_user del v['_get_user'] get_data_manager[k] = create_data_manager_func(database=database, restricted_tables=v.pop('_restricted_tables')) enable[k] = v.pop('_enable') # (get_data_router_metamanager) Create metadata manager func get_metadata_manager = create_metadata_manager_func(database=database) # (get_data_router_create) Create api router for data routes out = APIRouter(prefix=prefix, tags=tags, *args, **kwargs) # (get_data_router_columns) Add columns route to data router if enable['columns']: @out.get(**settings['columns']) async def get_columns( dataset: str = Query(..., description='Name of the dataset'), data_manager = Depends(get_data_manager['query']), user = Depends(get_user['columns']) ): response = data_manager.get_columns(dataset) return response # (get_data_router_create) Add create route to data router if enable['create']: @out.post(**settings['create']) async def create_data( dataset: str = Query(..., description='Name of the dataset to create - the request body is used to upload JSON data under the "data" key in the form of "[{col: val, col2: val2, ...}, {col: val, col2: val2, ...}]", where each key represents a column and its corresponding value. Objects in this list should have the same keys.'), body: DataCreate = Body( ..., example={ 'title': 'Title for Dataset', 'description': 'Description for dataset...', 'source': 'Data source for dataset', 'data': [ {'col_one': 1, 'col_two': 'a'}, {'col_one': 2, 'col_two': 'b'}, {'col_one': 3, 'col_two': 'c'} ] } ), data_manager = Depends(get_data_manager['create']), metadata_manager = Depends(get_metadata_manager), user = Depends(get_user['create']) ): # (get_data_router_create_data) Get data body = body.dict() data = body.pop('data') # (get_data_router_create_metadata) Format metadata metadata = body metadata['dataset'] = dataset metadata['created_at'] = datetime.now() metadata['updated_at'] = datetime.now() # (get_data_router_create_users) Add user operations if available if user: metadata['created_by'] = user.email # (get_data_router_create_run) Create dataset and metadata data_manager.create(name=dataset, data=data) metadata_manager.create(name=dataset, data=metadata) # (get_data_router_delete) Add delete route to data router if enable['delete']: @out.delete(**settings['delete']) async def delete_data( dataset: str = Query(..., description='Name of the dataset to delete data from'), where: Optional[List[str]] = Query(None, description='Where statements to filter data to remove in the form of "column operator value" (e.g. "var < 3") - valid operators are: =, !=, >, >=, >, <, <=, !=, LIKE, ILIKE, NOTLIKE, NOTILIKE, CONTAINS, STARTSWITH, ENDSWITH'), where_boolean: Literal['AND', 'OR'] = Query('AND', alias='where-boolean', description='Either "AND" or "OR" to combine where statements'), delete_all: Optional[bool] = Query(False, description='Whether to remove the entire dataset or not'), data_manager = Depends(get_data_manager['delete']), metadata_manager = Depends(get_metadata_manager), user = Depends(get_user['delete']) ): data_manager.delete(name=dataset, where=where, where_boolean=where_boolean, delete_all=delete_all) metadata_manager.updated_at(name=dataset) if delete_all: metadata_manager.delete(name=dataset) # (get_data_router_id) Add id route to data router if enable['id']: @out.get(**settings['id']) async def get_data_by_id( dataset: str = Query(..., description='Name of the dataset'), id: str = Query(..., description='Identifier value to retrieve a specific document in the dataset'), id_column: Optional[str] = Query('id', description='Identifier column name for the dataset'), data_manager = Depends(get_data_manager['id']), user = Depends(get_user['id']) ): where = [f'{id_column} = {id}'] response = data_manager.get(name=dataset, where=where) return response # (get_data_router_insert) Add insert route to data router if enable['insert']: @out.put(**settings['insert']) async def insert_data( dataset: str = Query(..., description='Name of the dataset to insert - the request body is used to upload JSON data in the form of "[{key: value, key2: value2, ... }, {key: value, key2: value2, ...}]" where each key is a column name'), data: List[Dict[str, Any]] = Body(...), data_manager = Depends(get_data_manager['insert']), metadata_manager = Depends(get_metadata_manager), user = Depends(get_user['insert']) ): data_manager.insert(name=dataset, data=data) metadata_manager.updated_at(dataset) # (get_data_router_metadata) Add metadata route to data router if enable['metadata']: @out.get(**settings['metadata']) async def get_metadata( dataset: str = Query(..., description='Name of the dataset to get metadata for'), metadata_manager = Depends(get_metadata_manager), user = Depends(get_user['metadata']) ): response = metadata_manager.get(name=dataset) return response # (get_data_router_metadata) Add metadata route to data router if enable['metadata_update']: @out.put(**settings['metadata_update']) async def update_metadata( dataset: str = Query(..., description='Name of the dataset to update metadata for. Upload user and creation/update times can not be updated.'), body: MetadataUpdate = Body( ..., example={ 'title': 'New Title to Replace Existing', 'description': 'New description to replace existing...', 'source': 'New data source to replace existing' } ), metadata_manager = Depends(get_metadata_manager), user = Depends(get_user['metadata_update']) ): response = metadata_manager.update(name=dataset, data=body.dict()) return response # (get_data_router_query) Add query route to data router if enable['query']: @out.get(**settings['query']) async def query_data( dataset: str = Query(..., description='Name of the dataset to query'), select: Optional[List[str]] = Query('*', description='columns to include - "*" means all columns and "None" means to omit selection (useful for aggregate queries)'), where: Optional[List[str]] = Query(None, description='Where statements to filter data in the form of "column operator value" (e.g. "var < 3") - valid operators are: =, !=, >, >=, >, <, <=, !=, LIKE, ILIKE, NOTLIKE, NOTILIKE, CONTAINS, STARTSWITH, ENDSWITH'), group_by: Optional[List[str]] = Query(None, alias='group-by', description='column names to group by - should be used with aggregate and aggregate_func parameters'), aggregate: Optional[List[str]] = Query(None, description='column names to aggregate with the same order as the aggregate_func parameter'), aggregate_func: Optional[List[str]] = Query(None, alias='aggregate-func', description='Aggregate functions in the same order as the aggregate parameter'), order_by: Optional[List[str]] = Query(None, alias='order-by', description='column names to order by in the same order as parameter order_by_sort'), order_by_sort: Optional[List[Literal['asc', 'desc']]] = Query(None, alias='order-by-sort', description='Either "asc" for ascending or "desc" for descending order in the same order as parameter order_by'), limit: Optional[int] = Query(None, description='Number of items to return'), offset: Optional[int] = Query(None, description='Number of items to skip'), where_boolean: Literal['AND', 'OR'] = Query('AND', alias='where-boolean', description='Either "AND" or "OR" to combine where statements'), data_manager = Depends(get_data_manager['query']), user = Depends(get_user['query']) ): select = None if select[0] == 'None' else select response = data_manager.get( name=dataset, select=select, where=where, group_by=group_by, aggregate=aggregate, aggregate_func=aggregate_func, order_by=order_by, order_by_sort=order_by_sort, limit=limit, offset=offset, where_boolean=where_boolean ) return response # (get_data_router_rows) Add rows route to data router if enable['rows']: @out.get(**settings['rows']) async def get_rows( dataset: str = Query(..., description='Name of the dataset'), data_manager = Depends(get_data_manager['query']), user = Depends(get_user['rows']) ): response = data_manager.get_rows(dataset) return response # (get_data_router_search) Add search route to data router if enable['search']: @out.get(**settings['search']) async def search_data( select: Optional[List[str]] = Query('*', description='columns to include in search - "*" means all columns and "None" means to omit selection (useful for aggregate queries).'), where: Optional[List[str]] = Query(None, description='Where statements to filter data in the form of "column operator value" (e.g. "dataset = test_data") - valid operators are: =, !=, >, >=, >, <, <=, !=, LIKE, ILIKE, NOTLIKE, NOTILIKE, CONTAINS, STARTSWITH, ENDSWITH'), order_by: Optional[List[str]] = Query(None, alias='order-by', description='column names to order by in the same order as parameter order_by_sort'), order_by_sort: Optional[List[Literal['asc', 'desc']]] = Query(None, alias='order-by-sort', description='Either "asc" for ascending or "desc" for descending order in the same order as parameter order_by'), limit: Optional[int] = Query(None, description='Number of items to return'), offset: Optional[int] = Query(None, description='Number of items to skip'), where_boolean: Literal['AND', 'OR'] = Query('AND', alias='where-boolean', description='Either "AND" or "OR" to combine where statements'), metadata_manager = Depends(get_metadata_manager), user = Depends(get_user['search']) ): select = None if select[0] == 'None' else select response = metadata_manager.search( select=select, where=where, order_by=order_by, order_by_sort=order_by_sort, limit=limit, offset=offset, where_boolean=where_boolean ) return response # (get_data_router_update) Add update route to data router if enable['update']: @out.put(**settings['update']) async def update_data( dataset: str = Query(..., description='Name of the dataset to update - the request body is used to upload JSON data in the form of "{key: value, key2: value2, ... }" where each key is a column name and each value is the new value to use (matching the where parameter)'), body: Dict[str, Any] = Body( ..., example={'col_one': 1, 'col_two': 'a'} ), where: List[str] = Query(..., description='Where statements to filter data to update in the form of "column operator value" (e.g. "var < 3") - valid operators are: =, !=, >, >=, >, <, <=, !=, LIKE, ILIKE, NOTLIKE, NOTILIKE, CONTAINS, STARTSWITH, ENDSWITH'), data_manager = Depends(get_data_manager['update']), metadata_manager = Depends(get_metadata_manager), user = Depends(get_user['update']) ): data_manager.update(name=dataset, data=body, where=where) metadata_manager.updated_at(dataset) return out