from copy import deepcopy
from fastapi import APIRouter, Body, Depends, Query
from typing import Any, Dict, List, Literal, Optional
from .defaults import *
from .managers import *
from .models import *
async def _no_current_user():
return None
[docs]def get_models_router(
bg_manager,
users_api=None,
route_settings=DEFAULT_MODELS_ROUTE_SETTINGS,
prefix='/models',
tags=['models'],
*args, **kwargs
):
"""
Get a models router.
Parameters
----------
bg_manager : :class:`msdss_models_api.managers.ModelsBackgroundManager` or :class:`msdss_models_api.managers.DBModelsBackgroundManager`
Models background manager for managing model operations.
users_api : :class:`msdss_users_api:msdss_users_api.core.UsersAPI` or None
Users API object to enable user authentication for routes.
If ``None``, user authentication will not be used for routes.
route_settings : dict
Dictionary of settings for the data routes. Each route consists of the following keys:
* ``path``: resource path for the route
* ``tags``: tags for open api spec
* ``_enable`` (bool): Whether this route should be included or not
* ``_restricted_tables`` (list(str)): List of table names not accessible by this route
* ``_get_user`` (dict or None): Additional arguments passed to the :meth:`msdss_users_api.msdss_users_api.core.UsersAPI.get_current_user` function for the route - only applies if parameter ``users_api`` is not ``None`` and this settings is not ``None``, otherwise no user authentication will be added for this route
* ``**kwargs``: Additional arguments passed to :meth:`fastapi:fastapi.FastAPI.get` for the id route
The default settings are:
.. jupyter-execute::
:hide-code:
from msdss_models_api.defaults import DEFAULT_MODELS_ROUTE_SETTINGS
from pprint import pprint
pprint(DEFAULT_MODELS_ROUTE_SETTINGS)
Any unspecified settings will be replaced by their defaults.
prefix : str
Prefix path to all routes belonging to this router.
tags : list(str)
Tags for all routes in this router.
*args, **kwargs
Additional arguments to accept any extra parameters passed to :class:`fastapi:fastapi.routing.APIRouter`.
Returns
-------
:class:`fastapi:fastapi.routing.APIRouter`
A router object used for model routes. See `FastAPI bigger apps <https://fastapi.tiangolo.com/tutorial/bigger-applications/>`_
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
import tempfile
from celery import Celery
from msdss_base_database import Database
from msdss_data_api.managers import DataManager
from msdss_models_api.managers import *
from msdss_models_api.defaults import *
from msdss_models_api.models import *
from msdss_models_api.routers import get_models_router
with tempfile.TemporaryDirectory() as folder_path:
# Setup available models
models = [Model]
# Setup database
db = Database()
# Check if the metadata table exists and drop if it does
if db.has_table(DEFAULT_METADATA_TABLE):
db.drop_table(DEFAULT_METADATA_TABLE)
# Create data manager
data_manager = DataManager(database=db)
# Create models manager
models_manager = ModelsManager(models, folder=folder_path)
# Create metadata manager
metadata_manager = ModelsMetadataManager(data_manager, models_manager)
# Create background manager
worker = Celery(broker='redis://localhost:6379/0', backend='redis://localhost:6379/0') # rabbitmq
bg_manager = ModelsBackgroundManager(worker, models_manager, metadata_manager=metadata_manager)
# Get models router
models_router = get_models_router(bg_manager)
"""
# (get_models_router_defaults) Merge defaults and user params
get_user = {}
settings = deepcopy(DEFAULT_MODELS_ROUTE_SETTINGS)
for k in settings:
if k in route_settings:
settings[k].update(route_settings[k])
# (get_models_router_apply) Apply settings to obtain dependencies
get_user = {}
enable = {}
for k, v in settings.items():
get_user[k] = users_api.get_current_user(**v['_get_user']) if users_api and '_get_user' in v else _no_current_user
del v['_get_user']
enable[k] = v.pop('_enable')
# (get_models_router_bg) Create bg manager func
def get_bg_manager():
yield bg_manager
# (get_models_router_api) Create api router for model routes
out = APIRouter(prefix=prefix, tags=tags, *args, **kwargs)
# (get_models_router_cancel) Add cancel route to models router
if enable['cancel']:
@out.post(**settings['cancel'])
async def cancel_model_instance_processing(
name: str = Query(..., description='Name of the model instance'),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['cancel'])
):
bg_manager.cancel(name)
response = bg_manager.get_status(name)
return response
# (get_models_router_create) Add create route to models router
if enable['create']:
@out.post(**settings['create'])
async def create_model_instance(
name: str = Query(..., description='Name of the model instance'),
model: str = Query(..., description='Type of model to create an instance of'),
body: ModelCreate = Body(
...,
example={
'title': 'Title for Model',
'description': 'Description for model...',
'tags': 'tag1 tag2 tag3',
'source': 'Source for model',
'settings': {}
}),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['create'])
):
metadata = body.dict()
parameters = {'settings': metadata.pop('settings', {})}
if user:
metadata['created_by'] = user.email
bg_manager.create(name, model, metadata=metadata, parameters=parameters)
# (get_models_router_delete) Add delete route to models router
if enable['delete']:
@out.delete(**settings['delete'])
async def delete_model_instance(
name: str = Query(..., description='Name of the model instance'),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['delete'])
):
bg_manager.delete(name)
# (get_models_router_status) Add status route to models router
if enable['status']:
@out.get(**settings['status'])
async def model_instance_status(
name: str = Query(..., description='Name of the model instance'),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['status'])
):
response = bg_manager.get_status(name)
return response
# (get_models_router_input) Add input route to models router
if enable['input']:
@out.post(**settings['input'])
async def model_instance_input(
name: str = Query(..., description='Name of the model instance - the request body is used to upload JSON data under the "data" key in the form of "[{col: val, col2: val2, ...}, {col: val, col2: val2, ...}]", where each key represents a column and its corresponding value. Objects in this list should have the same keys. The "settings" key is used for specific model input settings, where each key is a setting name.'),
data: List[Dict[str, Any]] = Body(...),
parameters: Dict[str, Any] = {},
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['input'])
):
bg_manager.input(name, data, parameters)
response = bg_manager.get_status(name)
return response
# (get_models_router_input_db) Add input db route to models router
if enable['input_db']:
@out.post(**settings['input_db'])
async def model_instance_input_with_dataset(
name: str = Query(..., description='Name of the model instance - the "settings" key in the response body is used for specific model input settings, where each key is a setting name.'),
dataset: str = Query(..., description='Dataset name to use for initializing the model instance'),
parameters: Dict[str, Any] = {},
select: Optional[List[str]] = Query('*', description='Columns to include - "*" means all columns and "None" means to omit selection (useful for aggregate queries)'),
where: Optional[List[str]] = Query(None, description='Where statements to filter data in the form of "column operator value" (e.g. "var < 3") - valid operators are: =, !=, >, >=, >, <, <=, !=, LIKE, ILIKE, NOTLIKE, NOTILIKE, CONTAINS, STARTSWITH, ENDSWITH'),
group_by: Optional[List[str]] = Query(None, alias='group-by', description='column names to group by - should be used with aggregate and aggregate_func parameters'),
aggregate: Optional[List[str]] = Query(None, description='Column names to aggregate with the same order as the aggregate_func parameter'),
aggregate_func: Optional[List[str]] = Query(None, alias='aggregate-func', description='Aggregate functions in the same order as the aggregate parameter'),
order_by: Optional[List[str]] = Query(None, alias='order-by', description='Column names to order by in the same order as parameter order_by_sort'),
order_by_sort: Optional[List[Literal['asc', 'desc']]] = Query(None, alias='order-by-sort', description='Either "asc" for ascending or "desc" for descending order in the same order as parameter order_by'),
limit: Optional[int] = Query(None, description='Number of items to return'),
where_boolean: Literal['AND', 'OR'] = Query('AND', alias='where-boolean', description='Either "AND" or "OR" to combine where statements'),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['input_db'])
):
bg_manager.input_db(
name,
dataset,
settings=settings,
select=select,
where=where,
group_by=group_by,
aggregate=aggregate,
aggregate_func=aggregate_func,
order_by=order_by,
order_by_sort=order_by_sort,
limit=limit,
where_boolean=where_boolean)
response = bg_manager.get_status(name)
return response
# (get_data_router_metadata) Add metadata route to data router
if enable['metadata']:
@out.get(**settings['metadata'])
async def get_model_instance_metadata(
name: str = Query(..., description='Name of the model to get metadata for'),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['metadata'])
):
response = bg_manager.metadata_manager.get(name=name)
return response
# (get_models_router_metadata) Add metadata route to models router
if enable['metadata_update']:
@out.put(**settings['metadata_update'])
async def update_model_instance_metadata(
name: str = Query(..., description='Name of the model to update metadata for. Upload user and creation/update times can not be updated.'),
body: ModelMetadataUpdate = Body(
...,
example={
'title': 'New Title to Replace Existing',
'description': 'New description to replace existing...',
'source': 'New source to replace existing...',
'tags': 'newtag1 newtag2 newtag3'
}
),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['metadata_update'])
):
bg_manager.metadata_manager.update(name=name, data=body.dict())
# (get_models_router_output) Add output route to models router
if enable['output']:
@out.post(**settings['output'])
async def model_instance_output(
name: str = Query(..., description='Name of the model instance - the request body is used to upload JSON data under the "data" key in the form of "[{col: val, col2: val2, ...}, {col: val, col2: val2, ...}]", where each key represents a column and its corresponding value. Objects in this list should have the same keys. The "settings" key is used for specific model output settings, where each key is a setting name.'),
data: List[Dict[str, Any]] = Body(...),
parameters: Dict[str, Any] = {},
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['output'])
):
response = bg_manager.output(name, data, parameters).to_dict('records')
return response
# (get_data_router_search) Add search route to data router
if enable['search']:
@out.get(**settings['search'])
async def search_models_and_instances(
select: Optional[List[str]] = Query('*', description='Columns to include in search - "*" means all columns and "None" means to omit selection (useful for aggregate queries).'),
where: Optional[List[str]] = Query(None, description='Where statements to filter data in the form of "column operator value" (e.g. "dataset = test_data") - valid operators are: =, !=, >, >=, >, <, <=, !=, LIKE, ILIKE, NOTLIKE, NOTILIKE, CONTAINS, STARTSWITH, ENDSWITH'),
order_by: Optional[List[str]] = Query(None, alias='order-by', description='column names to order by in the same order as parameter order_by_sort'),
order_by_sort: Optional[List[Literal['asc', 'desc']]] = Query(None, alias='order-by-sort', description='Either "asc" for ascending or "desc" for descending order in the same order as parameter order_by'),
limit: Optional[int] = Query(None, description='Number of items to return'),
offset: Optional[int] = Query(None, description='Number of items to skip'),
where_boolean: Literal['AND', 'OR'] = Query('AND', alias='where-boolean', description='Either "AND" or "OR" to combine where statements'),
what: str = Query('', description='What to search for (default is "instances") - one of: "instances", "models"'),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['search'])
):
select = None if select[0] == 'None' else select
if what.lower() == 'models':
response = bg_manager.metadata_manager.search_base_models(
select=select,
where=where,
order_by=order_by,
order_by_sort=order_by_sort,
limit=limit,
offset=offset,
where_boolean=where_boolean
)
else:
response = bg_manager.metadata_manager.search(
select=select,
where=where,
order_by=order_by,
order_by_sort=order_by_sort,
limit=limit,
offset=offset,
where_boolean=where_boolean
)
return response
# (get_models_router_update) Add update route to models router
if enable['update']:
@out.put(**settings['update'])
async def update_model_instance(
name: str = Query(..., description='Name of the model instance - the request body is used to upload JSON data under the "data" key in the form of "[{col: val, col2: val2, ...}, {col: val, col2: val2, ...}]", where each key represents a column and its corresponding value. Objects in this list should have the same keys. The "settings" key is used for specific model update settings, where each key is a setting name.'),
data: List[Dict[str, Any]] = Body(...),
parameters: Dict[str, Any] = {},
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['update'])
):
bg_manager.update(name, data, parameters)
response = bg_manager.get_status(name)
return response
# (get_models_router_update_db) Add update db route to models router
if enable['update_db']:
@out.put(**settings['update_db'])
async def update_model_instance_with_dataset(
name: str = Query(..., description='Name of the model instance - the "settings" key in the response body is used for specific model update settings, where each key is a setting name.'),
dataset: str = Query(..., description='Dataset name to use for updating the model instance'),
select: Optional[List[str]] = Query('*', description='Columns to include - "*" means all columns and "None" means to omit selection (useful for aggregate queries)'),
where: Optional[List[str]] = Query(None, description='Where statements to filter data in the form of "column operator value" (e.g. "var < 3") - valid operators are: =, !=, >, >=, >, <, <=, !=, LIKE, ILIKE, NOTLIKE, NOTILIKE, CONTAINS, STARTSWITH, ENDSWITH'),
group_by: Optional[List[str]] = Query(None, alias='group-by', description='Column names to group by - should be used with aggregate and aggregate_func parameters'),
aggregate: Optional[List[str]] = Query(None, description='Column names to aggregate with the same order as the aggregate_func parameter'),
aggregate_func: Optional[List[str]] = Query(None, alias='aggregate-func', description='Aggregate functions in the same order as the aggregate parameter'),
order_by: Optional[List[str]] = Query(None, alias='order-by', description='cClumn names to order by in the same order as parameter order_by_sort'),
order_by_sort: Optional[List[Literal['asc', 'desc']]] = Query(None, alias='order-by-sort', description='Either "asc" for ascending or "desc" for descending order in the same order as parameter order_by'),
limit: Optional[int] = Query(None, description='Number of items to return'),
offset: Optional[int] = Query(None, description='Number of items to skip'),
where_boolean: Literal['AND', 'OR'] = Query('AND', alias='where-boolean', description='Either "AND" or "OR" to combine where statements'),
bg_manager = Depends(get_bg_manager),
user = Depends(get_user['update_db'])
):
bg_manager.update_db(
name,
dataset,
settings=settings,
select=select,
where=where,
group_by=group_by,
aggregate=aggregate,
aggregate_func=aggregate_func,
order_by=order_by,
order_by_sort=order_by_sort,
limit=limit,
offset=offset,
where_boolean=where_boolean)
response = bg_manager.get_status(name)
return response
return out