import pandas
import sqlalchemy
from .defaults import *
from .env import *
from .tools import *
[docs]class Database:
"""
Class for MSDSS database management.
Parameters
----------
driver : str
The driver name of the database connection, which are commonly ``postgresql``, ``sqlite``, ``mysql``, ``oracle`` or ``mssql`` (see `SQLAlchemy supported databases <https://docs.sqlalchemy.org/en/14/core/engines.html#supported-databases>`_).
user : str
User name for the connection.
password : str
Password for the user.
host : str
Host address of the connection.
port : str
Port number of the connection.
database : str
Database name of the connection.
load_env : bool
Whether to load the environmental variables using parameter ``env`` or not. The environment will only be loaded if the ``env_file`` exists.
env : :class:`msdss_base_database.env.DatabaseDotEnv` or bool
An object to set environment variables related to database configuration.
These environment variables will overwrite the parameters above if they exist.
By default, the parameters above are assigned to each of the environment variables below:
.. jupyter-execute::
:hide-code:
from msdss_base_database.defaults import DEFAULT_DOTENV_KWARGS
defaults = {k:v for k, v in DEFAULT_DOTENV_KWARGS.items() if k not in ['defaults', 'env_file', 'key_path']}
print('<parameter> = <environment variable>\\n')
for k, v in defaults.items():
print(k + ' = ' + v)
*args, **kwargs
Additional arguments passed to :func:`sqlalchemy:sqlalchemy.create_engine`.
Attributes
----------
_connection : :class:`sqlalchemy.engine.base.Engine`
The database engine object from `sqlalchemy <https://www.sqlalchemy.org/>`_.
_inspector: :class:`sqlalchemy.engine.reflection.Inspector`
The database inspector object from `sqlalchemy <https://www.sqlalchemy.org/>`_.
_metadata : :class:`sqlalchemy.schema.MetaData`
The metadata object from `sqlalchemy <https://www.sqlalchemy.org/>`_.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database import Database
# Initiate a connection with the default test user and database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Write sample data
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
# Read the table to a pandas dataframe
df = db.select('test_table')
# Skip a row
df_skip = db.select('test_table', offset=1)
# Insert values into the table
new = {
'id': [5, 6, 7],
'column_one': ['e', 'f', 'g'],
'column_two': [10, 12, 14]
}
db.insert('test_table', new)
df_insert = db.select('test_table')
# Delete rows from the table
db.delete(
'test_table',
where=('id', '=', 5)
)
df_delete = db.select('test_table')
# Update values in table
db.update(
'test_table',
where=('id', '>', 3),
values={'column_one': 'AA'})
df_update = db.select('test_table')
# Display results
print('df:\\n')
print(df)
print('\\ndf_skip:\\n')
print(df_skip)
print('\\ndf_insert:\\n')
print(df_insert)
print('\\ndf_delete:\\n')
print(df_delete)
print('\\ndf_update:\\n')
print(df_update)
"""
def __init__(
self,
driver=DEFAULT_DOTENV_KWARGS['defaults']['driver'],
user=DEFAULT_DOTENV_KWARGS['defaults']['user'],
password=DEFAULT_DOTENV_KWARGS['defaults']['password'],
host=DEFAULT_DOTENV_KWARGS['defaults']['host'],
port=DEFAULT_DOTENV_KWARGS['defaults']['port'],
database=DEFAULT_DOTENV_KWARGS['defaults']['database'],
load_env=True,
env=DatabaseDotEnv(),
*args, **kwargs):
# (Database_connect_str) Build connection str from parameters
connection_str = get_database_url(
driver=driver,
user=user,
password=password,
host=host,
port=port,
database=database,
load_env=load_env,
env=env
)
# (Database_attr) Create attributes for database obj
self._connection = sqlalchemy.create_engine(connection_str, *args, **kwargs)
self._inspector = sqlalchemy.inspect(self._connection)
self._metadata = sqlalchemy.MetaData(bind=self._connection)
[docs] def _build_query(
self,
table,
select='*',
where=None,
group_by=None,
aggregate=None,
aggregate_func='count',
order_by=None,
order_by_sort='asc',
limit=None,
offset=None,
where_boolean='AND',
update=False,
delete=False,
values=None,
*args, **kwargs):
"""
Get a SQL select statement using sqlalchemy functions.
Parameters
----------
table : str
Name of the database table.
select : str or list(str) or list(:class:`sqlalchemy:sqlalchemy.schema.Column`) or None
List of column names or a single column name to filter or select from the table.
* If ``None``, columns will not be added to the select statement.
* If ``'*'``, then all columns will be selected
where : list of list or list of tuple or None
list of where statements the form of ``['column_name', 'operator', value]`` to further filter individual values or rows.
* Operators are one of:
.. jupyter-execute::
:hide-code:
from msdss_base_database.defaults import DEFAULT_SUPPORTED_OPERATORS
for operator in DEFAULT_SUPPORTED_OPERATORS:
print(operator)
* Values can be any single value such as ``int`` or ``str``
* Examples: ``['column_two', '>', 2]``, ``['column_one', 'LIKE', 'a']``, ``[['column_two', '>', 2], ['column_one', 'LIKE', 'a']]``
group_by : str or list(str) or None
Single or list of column names to group by. This should be used with ``aggregate`` and ``aggregate_func``.
aggregate : str or list(str) or None
Single or list of column names to aggregate using the ``aggregate_func``. This should be used with ``aggregate_func``.
aggregate_func : str or list(str)
Function name (such as 'count' or 'sum') from :class:`sqlalchemy:sqlalchemy.sql.functions.Function` for aggregating records from each ``aggregate`` column.
If a list of str, then it must have the same number of elements as ``aggregate`` or else only the shortest length list will be used.
order_by : str or list(str) or None
Single or list of column names to order or sort by.
order_by_sort : str or list(str)
Sort the records in increasing or decreasing order by each column in ``order_by``, where the value can be one of 'asc' for ascending or 'desc' for descending'.
If a list of str, then it must have the same number of elements as ``order_by`` or else only the shortest length list will be used.
limit : int or None
Integer number to limit the number of rows returned.
offset : int or None
Number of rows to skip for the query.
where_boolean : str
One of ``AND`` or ``OR`` to combine ``where`` statements with. Defaults to ``AND`` if not one of ``AND`` or ``OR``.
update : bool
Whether to update rows from the table matching the query or not. Overrides the ``select`` parameter.
delete : bool
Whether to delete rows from the table matching the query or not. Overrides the ``select`` and ``update`` parameters.
values : dict
A dictionary of values to use for update if the ``update`` parameter is ``True`` and not overridden.
*args, **kwargs
Additional arguments to accept any extra parameters passed through.
Returns
-------
:class:`sqlalchemy:sqlalchemy.sql.expression.Select`
sqlalchemy object that represents a table in the database.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
# Setup database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Select columns and limit to 6 rows
sql_limit = db._build_query(
'test_table',
select = ['column_one', 'column_two'],
limit=6
)
# Select columns with where statement
sql_where = db._build_query(
'test_table',
select='column_one',
where=[('column_two', '<', 3), ('column_one', '=', 'b')],
where_boolean='AND'
)
# Select columns and order each column
sql_order = db._build_query(
'test_table',
select=['column_one', 'column_two'],
order_by=['column_one', 'column_two'],
order_by_sort=['asc', 'desc']
)
# Select columns, group, and aggregate data
sql_agg = db._build_query(
'test_table',
select='column_one',
group_by='column_one',
aggregate=['column_two', 'column_one'],
aggregate_func=['sum', 'count']
)
# Update rows
sql_update = db._build_query(
'test_table',
where=[('id', '=', 1)],
values=dict(column_one='A'),
update=True
)
# Delete rows
sql_delete = db._build_query(
'test_table',
where=[('column_two', '<', 2), ('column_one', '=', 'a')],
where_boolean='OR',
delete=True
)
# Display gen statements
print('sql_limit:\\n\\n' + str(sql_limit))
print('\\nsql_where:\\n\\n' + str(sql_where))
print('\\nsql_order:\\n\\n' + str(sql_order))
print('\\nsql_agg:\\n\\n' + str(sql_agg))
print('\\nsql_update:\\n\\n' + str(sql_update))
print('\\nsql_delete:\\n\\n' + str(sql_delete))
"""
# (Database_build_query_var_list) Format single variables into lists
select = [select] if isinstance(select, str) else select
group_by = [group_by] if isinstance(group_by, str) else group_by
aggregate = [aggregate] if isinstance(aggregate, str) else aggregate
order_by = [order_by] if isinstance(order_by, str) else order_by
# (Database_build_query_table) Get the table object
target = self._get_table(table)
# (Database_build_query_select) Gather columns to select
if select is None: # no cols
select_columns = []
elif select[0] == '*': # all cols
select_columns = [target]
else: # specified cols
select_columns = [target.c[c] for c in select]
# (Database_build_query_aggregate) Gather aggregation columns to select
if aggregate is not None:
aggregate_columns = [target.c[c] for c in aggregate]
if isinstance(aggregate_func, list):
aggregate_labels = [a + '_' + f for a, f in zip(aggregate, aggregate_func)]
aggregate_columns = [getattr(sqlalchemy.func, f.lower())(c).label(l) for c, f, l in zip(aggregate_columns, aggregate_func, aggregate_labels)]
else:
aggregate_labels = [a + '_' + aggregate_func for a in aggregate]
aggregate_columns = [getattr(sqlalchemy.func, aggregate_func)(c).label(l) for c, l in zip(aggregate_columns, aggregate_labels)]
select_columns = select_columns + aggregate_columns
# (Database_build_query_operation) Add select, update, or delete statement
if delete:
out = target.delete()
elif update:
out = target.update()
else:
out = sqlalchemy.select(select_columns)
# (Database_build_query_where) Add where statement
if where is not None:
# (Database_build_query_where_format) Get where boolean operator and ensure two-dimensional list
where = [where] if not any(isinstance(w, list) or isinstance(w, tuple) for w in where) else where
where_boolean = sqlalchemy.or_ if where_boolean.lower() == 'or' else sqlalchemy.and_
# (Database_build_query_where_convert) Convert list to where clauses
where_clauses = []
for clause_list in where:
# (Database_build_query_where_convert_vars) Get clause parts from list
clause_col = clause_list[0]
clause_op = clause_list[1]
clause_val = clause_list[2]
# (Database_build_query_where_convert_op) Convert to clause based on operator
if clause_op in ('=', '=='):
clause = target.c[clause_col] == clause_val
elif clause_op in ('!=', '!=='):
clause = target.c[clause_col] != clause_val
elif clause_op == '>':
clause = target.c[clause_col] > clause_val
elif clause_op == '>=':
clause = target.c[clause_col] >= clause_val
elif clause_op == '<':
clause = target.c[clause_col] < clause_val
elif clause_op == '<=':
clause = target.c[clause_col] <= clause_val
elif clause_op.lower() == 'like':
clause = target.c[clause_col].like(clause_val)
elif clause_op.lower() == 'notlike':
clause = target.c[clause_col].notlike(clause_val)
elif clause_op.lower() == 'ilike':
clause = target.c[clause_col].ilike(clause_val)
elif clause_op.lower() == 'notilike':
clause = target.c[clause_col].notilike(clause_val)
elif clause_op.lower() == 'contains':
clause = target.c[clause_col].contains(clause_val)
elif clause_op.lower() == 'startswith':
clause = target.c[clause_col].startswith(clause_val)
elif clause_op.lower() == 'endswith':
clause = target.c[clause_col].endswith(clause_val)
else:
raise ValueError(clause_op + ' is not supported')
where_clauses.append(clause)
# (Database_build_query_where_add) Add where clauses to select query
out = out.where(where_boolean(*where_clauses))
# (Database_build_query_group) Add group by statement
if group_by is not None:
group_by_columns = [target.c[c] for c in group_by]
out = out.group_by(*group_by_columns)
# (Database_build_query_order) Add order by statement
if order_by is not None:
order_by_columns = [target.c[c] for c in order_by]
if isinstance(order_by_sort, list):
order_by_columns = [getattr(c, s.lower())() for c, s in zip(order_by_columns, order_by_sort)]
else:
order_by_columns = [getattr(c, order_by_sort)() for c in order_by_columns]
out = out.order_by(*order_by_columns)
# (Database_build_query_offset) Add offset statement
if offset is not None:
out = out.offset(offset)
# (Database_build_query_limit) Add limit statement
if limit is not None:
out = out.limit(limit)
# (Database_build_query_values) Add values statement
if values is not None and update:
out = out.values(**values)
# (Database_build_query_return) Return the sqlalchemy query
return out
[docs] def _execute_query(self, sql, *args, **kwargs):
"""
Executes a query statement.
Parameters
----------
sql : str
SQL str representing the query statement to execute.
*args, **kwargs
Additional parameters passed to :func:`sqlalchemy:sqlalchemy.engine.Connection.execute`.
Returns
-------
:class:`sqlalchemy:sqlalchemy.engine.CursorResult`
Cursor result from query.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Write sample data
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
# Get data from query
cursor = db._execute_query('SELECT * FROM test_table LIMIT 5;')
cursor = db._execute_query('SELECT column_one, column_two FROM test_table WHERE column_two > 3;')
"""
with self._connection.connect() as connection:
out = connection.execute(sql, *args, **kwargs)
return out
[docs] def _get_table(self, table, *args, **kwargs):
"""
Get a table object from the database.
Parameters
----------
table : str
Name of the table to remove.
*args, **kwargs
Additional arguments passed to :class:`sqlalchemy.schema.Table`.
Return
------
:class:`sqlalchemy.schema.Table`
A ``Table`` object from ``sqlalchemy``.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Get the table object
tb = db._get_table('test_table')
print(str(tb))
"""
out = sqlalchemy.Table(table, self._metadata, autoload_with=self._connection, *args, **kwargs)
return out
[docs] def _list_to_columns(self, clist):
"""
Converts a list of dict or list to ``sqlalchemy`` columns.
Parameters
----------
clist : list(dict) or list(list)
List of dict (kwargs) or lists (positional args) that are passed to :class:`sqlalchemy.schema.Column`. Data types can also be specified as str.
* Example list(dict): ``[dict(name='id', type_='Integer', autoincrement=True, primary_key=True), dict(name='col', type='String')]``
* Example list(list): ``[('id', 'Integer'), ('col', 'String')]``
*args, **kwargs
Additional arguments passed to :class:`sqlalchemy.schema.Table`.
Return
------
list(:class:`sqlalchemy.schema.Column`)
A list of ``Column`` objects from ``sqlalchemy``.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
db = Database()
# Create column definitions
clist = [
dict(name='id', type_='Integer', primary_key=True),
dict(name='column_one', type_='String'),
dict(name='column_two', type_='Integer')
]
# Convert to sqlalchemy column objects
columns = db._list_to_columns(clist)
print(str(columns))
"""
# (Database_list_to_columns_convert) Convert data types from str
clist = [list(c) if isinstance(c, tuple) else c for c in clist] # conv tuples to list
for i, c in enumerate(clist):
# (Database_list_to_columns_convert_type) Get data type depending on list or dict
if isinstance(c, dict):
cidx = 'type_'
elif isinstance(c, list):
cidx = 1
ctype = c[cidx]
# (Databse_list_columns_convert_str) Convert to column if str
if isinstance(ctype, str):
clist[i][cidx] = getattr(sqlalchemy, ctype)
# (Database_list_to_columns_return) Return the list of sqlalchemy columns
out = [sqlalchemy.Column(*c) if isinstance(c, list) else sqlalchemy.Column(**c) for c in clist]
return out
[docs] def _write_data(self, table, data, schema=None, if_exists='append', index=False, *args, **kwargs):
"""
Write data to the database.
Parameters
----------
table : str
Name of the table to write to.
data : dict or list or :class:`pandas:pandas.DataFrame`
Dataframe with the data to write to the database. If ``dict`` or ``list`` see :class:`pandas:pandas.DataFrame`.
schema : str
Name of the schema to for the table.
if_exists : 'fail' or 'replace' or 'append'
String indicating whether to throw an error (fail), replace the data, or append the data to the table if it already exists.
index : bool
Set to True to include the row indices as a column and False to omit them.
*args, **kwargs
Additional arguments passed to :class:`pandas:pandas.DataFrame` if parameter ``data`` is ``dict`` or ``list``.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database import Database
# Setup database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Write data to table
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db._write_data('test_table', data, if_exists = 'replace')
"""
data = pandas.DataFrame(data, *args, **kwargs) if not isinstance(data, pandas.DataFrame) else data
data.to_sql(table, con = self._connection, schema = schema, if_exists = if_exists, index = index)
[docs] def columns(self, table):
"""
Get number of columns for a table.
Parameters
----------
table : str
Name of the table to get columns for.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database import Database
# Setup database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Write data to table
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
df = db.select('test_table')
# Get number of rows in table
columns = db.columns('test_table')
# Display results
print('df:\\n')
print(df)
print(f'\\ncolumns: {columns}')
"""
table = self._get_table(table)
out = len(table.c)
return out
[docs] def create_table(self, table, columns):
"""
Create a table in the database.
Parameters
----------
table : str
Name of the table to create.
columns : list(dict) or list(list)
List of dict (kwargs) or lists (positional args) that are passed to :class:`sqlalchemy.schema.Column`. Data types can also be specified as str.
* Example list(dict): ``[dict(name='id', type_='Integer', autoincrement=True, primary_key=True), dict(name='col', type='String')]``
* Example list(list): ``[('id', 'Integer'), ('col', 'String')]``
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database import Database
# Setup database
db = Database()
# Drop table if exists
if db.has_table('test_table'):
db.drop_table('test_table')
# Create column definitions
columns = [
dict(name='id', type_='Integer', primary_key=True),
dict(name='column_one', type_='String'),
dict(name='column_two', type_='Integer')
]
# Create the table
db.create_table('test_table', columns)
# View the table
df = db.select('test_table')
print(df)
"""
columns = self._list_to_columns(columns)
tb = sqlalchemy.Table(table, self._metadata, *columns, extend_existing=True)
tb.create(self._connection)
[docs] def delete(self, table, where, where_boolean='AND', *args, **kwargs):
"""
Remove rows from a table in the database.
Parameters
----------
table : str
Name of the table to remove rows from.
where : list of list or list of tuple or None
list of where statements the form of ``['column_name', 'operator', value]`` to further filter individual values or rows.
See parameter ``where`` in :meth:`msdss_base_database.core.Database._build_query`.
where_boolean : str
One of ``AND`` or ``OR`` to combine ``where`` statements with. Defaults to ``AND`` if not one of ``AND`` or ``OR``.
* Operators are one of:
.. jupyter-execute::
:hide-code:
from msdss_base_database.defaults import DEFAULT_SUPPORTED_OPERATORS
for operator in DEFAULT_SUPPORTED_OPERATORS:
print(operator)
* Values can be any single value such as ``int`` or ``str``
* Examples: ``['column_two', '>', 2]``, ``['column_one', 'LIKE', 'a']``, ``[['column_two', '>', 2], ['column_one', 'LIKE', 'a']]``
*args, **kwargs
Additional arguments passed to :meth:`msdss_base_database.core.Database._execute_query`.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Write sample data
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
df = db.select('test_table')
# Delete a row with matching id
db.delete(
'test_table',
where=('id', '=', 1)
)
df_delete_id = db.select('test_table')
# Delete a row with matching query
db.delete(
'test_table',
where=[('id', '<', 3), ('column_one', '=', 'c')],
where_boolean='OR'
)
df_delete_where = db.select('test_table')
# Print results
print('df:\\n')
print(df)
print('\\ndf_delete_id:\\n')
print(df_delete_id)
print('\\ndf_delete_where:\\n')
print(df_delete_where)
"""
sql = self._build_query(table=table, where=where, where_boolean=where_boolean, delete=True)
cursor = self._execute_query(sql, *args, **kwargs)
[docs] def drop_table(self, table, *args, **kwargs):
"""
Remove a table from the database.
Parameters
----------
table : str
Name of the table to remove.
*args, **kwargs
Additional arguments passed to :meth:`sqlalchemy.schema.Table.drop`.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
before_drop = db.has_table('test_table')
# Drop the table
db.drop_table('test_table')
after_drop = db.has_table('test_table')
# Print results
print('before_drop: ' + str(before_drop))
print('after_drop: ' + str(after_drop))
"""
tb = self._get_table(table)
tb.drop(*args, **kwargs)
[docs] def has_table(self, table, *args, **kwargs):
"""
Check if a table exists.
Parameters
----------
table : str
Name of the table to check.
*args, **kwargs
Additional arguments passed to :meth:`sqlalchemy.engine.reflection.Inspector.has_table`.
Return
------
bool
Returns ``True`` if the table exists and ``False`` otherwise.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
db = Database()
# Drop the table if it exists
if db.has_table('test_table'):
db.drop_table('test_table')
before_create = db.has_table('test_table') # False
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Check if table exists again
after_create = db.has_table('test_table') # True
# Print results
print('before_create: ' + str(before_create))
print('after_create: ' + str(after_create))
"""
out = self._inspector.has_table(table, *args, **kwargs)
return out
[docs] def insert(self, table, data, *args, **kwargs):
"""
Insert additional data to the database.
Parameters
----------
table : str
Name of the table to insert additional data to.
data : dict or list or :class:`pandas:pandas.DataFrame`
Dataframe with the data to write to the database. If ``dict`` or ``list`` see :class:`pandas:pandas.DataFrame`.
*args, **kwargs
Additional arguments passed to :meth:`msdss_base_database.core.Database._write`. Except that ``if_exists`` is always set to ``append``.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database import Database
# Setup database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
df = db.select('test_table')
# Write data to table
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
df_insert = db.select('test_table')
# Insert more new values to table
more_new = {
'id': [5, 6, 7],
'column_one': ['e', 'f', 'g'],
'column_two': [10, 12, 14]
}
db.insert('test_table', more_new)
df_insert_more = db.select('test_table')
# Display results
print('df:\\n')
print(df)
print('\\ndf_insert:\\n')
print(df_insert)
print('\\ndf_insert_more:\\n')
print(df_insert_more)
"""
self._write_data(table=table, data=data, if_exists='append', *args, **kwargs)
[docs] def rows(self, table):
"""
Get number of rows for a table.
Parameters
----------
table : str
Name of the table to get rows for.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database import Database
# Setup database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Write data to table
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
df = db.select('test_table')
# Get number of columns in table
rows = db.rows('test_table')
# Display results
print('df:\\n')
print(df)
print(f'\\nrows: {rows}')
"""
table = self._get_table(table)
out = sqlalchemy.select([sqlalchemy.func.count()]).select_from(table).scalar()
return out
[docs] def select(
self,
table,
select='*',
where=None,
group_by=None,
aggregate=None,
aggregate_func='count',
order_by=None,
order_by_sort='asc',
limit=None,
offset=None,
where_boolean='AND',
*args, **kwargs):
"""
Query data from a table in the database.
Parameters
----------
table : str
Name of the database table to query from.
select : str or list(str) or list(:class:`sqlalchemy:sqlalchemy.schema.Column`) or None
List of column names or a single column name to filter or select from the table.
* If ``None``, columns will not be added to the select statement.
* If ``'*'``, then all columns will be selected
where : list of list or list of tuple or None
list of where statements the form of ``['column_name', 'operator', value]`` to further filter individual values or rows.
* Operators are one of:
.. jupyter-execute::
:hide-code:
from msdss_base_database.defaults import DEFAULT_SUPPORTED_OPERATORS
for operator in DEFAULT_SUPPORTED_OPERATORS:
print(operator)
* Values can be any single value such as ``int`` or ``str``
* Examples: ``['column_two', '>', 2]``, ``['column_one', 'LIKE', 'a']``, ``[['column_two', '>', 2], ['column_one', 'LIKE', 'a']]``
group_by : str or list(str) or None
Single or list of column names to group by. This should be used with ``aggregate`` and ``aggregate_func``.
aggregate : str or list(str) or None
Single or list of column names to aggregate using the ``aggregate_func``. This should be used with ``aggregate_func``.
aggregate_func : str or list(str)
Function name (such as 'count' or 'sum') from :class:`sqlalchemy:sqlalchemy.sql.functions.Function` for aggregating records from each ``aggregate`` column.
If a list of str, then it must have the same number of elements as ``aggregate`` or else only the shortest length list will be used.
order_by : str or list(str) or None
Single or list of column names to order or sort by.
order_by_sort : str or list(str)
Sort the records in increasing or decreasing order by each column in ``order_by``, where the value can be one of 'asc' for ascending or 'desc' for descending'.
If a list of str, then it must have the same number of elements as ``order_by`` or else only the shortest length list will be used.
limit : int or None
Integer number to limit the number of rows returned.
offset : int or None
Number of rows to skip for the query.
where_boolean : str
One of ``AND`` or ``OR`` to combine ``where`` statements with. Defaults to ``AND`` if not one of ``AND`` or ``OR``.
*args, **kwargs
Additional parameters passed to :meth:`pandas:pandas.read_sql`.
Returns
-------
:class:`pandas:pandas.DataFrame`
pandas dataframe containing the queried data.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database.core import Database
# Setup database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Write sample data
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
df = db.select('test_table')
# Read data from table using select and limit
data = db.select(
'test_table',
select = ['column_one', 'column_two'],
limit = 5
)
df_select = db.select('test_table')
# Use where statements
data = db.select(
'test_table',
select = 'column_one',
where = [('column_two', '<', '15'), ('column_one', '=', 'b')],
where_boolean = 'AND'
)
df_where = db.select('test_table')
# Order results
data = db.select(
'test_table',
select = ['column_one', 'column_two'],
order_by = ['column_one', 'column_two'],
order_by_sort = ['asc', 'desc']
)
df_order = db.select('test_table')
# Aggregate read
data = db.select(
'test_table',
select = 'column_one',
group_by = 'column_one',
aggregate = 'column_two',
aggregate_func = ['count', 'sum']
)
df_agg = db.select('test_table')
# Display results
print('df:\\n')
print(df)
print('\\ndf_select:\\n')
print(df_select)
print('\\ndf_where:\\n')
print(df_where)
print('\\ndf_order:\\n')
print(df_order)
print('\\ndf_agg:\\n')
print(df_agg)
"""
sql = self._build_query(
table,
select=select,
where=where,
group_by=group_by,
aggregate=aggregate,
aggregate_func=aggregate_func,
order_by=order_by,
order_by_sort=order_by_sort,
limit=limit,
offset=offset,
where_boolean=where_boolean
)
out = pandas.read_sql(sql = sql, con = self._connection, *args, **kwargs)
return out
[docs] def update(self, table, where, values, *args, **kwargs):
"""
Update a table from the database.
Parameters
----------
table : str
Name of the table to update.
where : list of list or list of tuple or None
list of where statements the form of ``['column_name', 'operator', value]`` to update individual values or rows.
* Operators are one of:
.. jupyter-execute::
:hide-code:
from msdss_base_database.defaults import DEFAULT_SUPPORTED_OPERATORS
for operator in DEFAULT_SUPPORTED_OPERATORS:
print(operator)
* Values can be any single value such as ``int`` or ``str``
* Examples: ``['column_two', '>', 2]``, ``['column_one', 'LIKE', 'a']``, ``[['column_two', '>', 2], ['column_one', 'LIKE', 'a']]``
values : dict
Dictionary representing values to update if they match the ``where`` parameter requirements.
*args, **kwargs
Additional arguments passed to :meth:`msdss_base_database.core.Database._execute_query`.
Author
------
Richard Wen <rrwen.dev@gmail.com>
Example
-------
.. jupyter-execute::
from msdss_base_database import Database
# Setup database
db = Database()
# Check if the table exists and drop if it does
if db.has_table("test_table"):
db.drop_table("test_table")
# Create sample table
columns = [
dict(name='id', type_='Integer', primary_key=True),
('column_one', 'String'),
('column_two', 'Integer')
]
db.create_table('test_table', columns)
# Write sample data
data = {
'id': [1, 2, 3],
'column_one': ['a', 'b', 'c'],
'column_two': [2, 4, 6]
}
db.insert('test_table', data)
df = db.select('test_table')
# Update values in table
db.update(
'test_table',
where=('id', '>', 1),
values={'column_one': 'AA'})
df_update = db.select('test_table')
# Print results
print('df:\\n')
print(df)
print('\\ndf_update:\\n')
print(df_update)
"""
sql = self._build_query(table=table, where=where, values=values, update=True)
cursor = self._execute_query(sql, *args, **kwargs)