Quickstart
Let's see alchy in action. We'll start with some model definitions.
from alchy import ModelBase, make_declarative_base
from sqlalchemy import orm, Column, types, ForeignKey
class Base(ModelBase):
    """Project-wide model base; extend or override ModelBase here if necessary."""


# Declarative base class that the models in this walkthrough inherit from.
Model = make_declarative_base(Base=Base)
class User(Model):
    """A user record stored in the ``user`` table."""

    __tablename__ = 'user'

    # Primary key.
    _id = Column(types.Integer(), primary_key=True)

    # Plain data columns.
    name = Column(types.String())
    email = Column(types.String())
    level = Column(types.Integer())

    # Relationship to the UserItem model.
    items = orm.relationship('UserItem')
class UserItem(Model):
    """An item that references a ``User`` through ``user_id``."""

    # No __tablename__ is declared here: when it is omitted, one is
    # autogenerated from the class name, as if this had been written:
    #     __tablename__ = 'user_item'

    _id = Column(types.Integer(), primary_key=True)
    # Foreign key to the ``_id`` column of the ``user`` table.
    user_id = Column(types.Integer(), ForeignKey('user._id'))
    name = Column(types.String())

    # Relationship back to the User model.
    user = orm.relationship('User')
Next, we need to interact with our database. For that we will use an alchy.manager.Manager.
from alchy import Manager
# Settings may be supplied as (1) a dict, (2) a class, or (3) a module;
# a dict is used here.
config = {'SQLALCHEMY_DATABASE_URI': 'sqlite://'}

# Hand the Manager the declarative base defined previously. Without it,
# Model.metadata operations such as create_all(), drop_all(), and
# reflect() would not work.
db = Manager(config=config, Model=Model)
Create our database tables.
# A Model.metadata operation: it relies on Model having been passed to Manager.
db.create_all()
Now, create some records.
# Build two User instances and change one of them. They are only added to
# the database in a later step.
# initialize using keyword args
user1 = User(name='Fred', email='fred@example.com')
# print('user1:', user1)
# ...or initialize using a dict
user2 = User({'name': 'Barney'})
# print('user2:', user2)
# update() accepts the same two forms: keyword args or a dict
user2.update(email='barney@example.org')
user2.update({'email': 'barney@example.com'})
# print('user2 updated:', user2)
Add them to the database.
# There are several options for adding records. Each variant below is an
# alternative, shown with the same two users.
# 1. add and commit in one step using positional args
db.add_commit(user1, user2)
# 2. ...or add and commit in one step using a list
users = [user1, user2]
db.add_commit(users)
# 3. ...or separate add and commit calls
db.add(user1, user2)
db.commit()
# 4. ...or separate add and commit calls with a list
db.add(users)
db.commit()
# 5. ...or one add per record followed by a single commit
db.add(user1)
db.add(user2)
db.commit()
Fetch model and operate.
# create user
db.add_commit(User(name='Wilma', email='wilma@example.com'))
# fetch from database, looking up by user1's primary-key value
user = User.get(user1._id)
# print('user:', user)
# convert to dict
user_dict = user.to_dict()
# print('user dict:', user_dict)
# ...or just pass object directly to dict()
user_dict = dict(user)
# make some changes
user.update(level=5)
# and refresh
user.refresh()
# or flush
user.flush()
# access the session that loaded the model instance
assert user.session() == db.object_session(user)
# The three delete forms below are alternatives shown back to back.
# NOTE(review): run in sequence they delete the same instance repeatedly;
# confirm that is harmless, or keep only one form when copying this.
# delete user via the model instance, then commit
user.delete()
db.commit()
# ...or via db
db.delete(user)
db.commit()
# ...or all-in-one step
db.delete_commit(user)
Query records from the database.
# Add five more users, each created with a single UserItem.
db.add_commit(*[User(items=[UserItem()]) for _ in range(5)])

# The same query can be spelled in several ways.

# 1. through db.session directly
# print('all users:', db.session.query(User).all())

# 2. ...or through db itself (i.e. the db.session proxy)
assert db.query(User).all() == db.session.query(User).all()

# 3. ...or through the query property on the model class
assert User.query.all() == db.session.query(User).all()
Use features from the enhanced query class.
# Base query for the examples below: User joined to UserItem.
q = User.query.join(UserItem)

# entities: the query exposes its primary, joined, and combined entities
assert q.entities == [User]
assert q.join_entities == [UserItem]
assert q.all_entities == [User, UserItem]

# paging: page(n, per_page=k) renders the same SQL as
# limit(k).offset((n - 1) * k)
assert str(q.page(2, per_page=2)) == str(q.limit(2).offset((2 - 1) * 2))

# pagination: paginate() returns an object describing one page of results
page2 = q.paginate(2, per_page=2)
assert str(page2.query) == str(q)
assert page2.page == 2
assert page2.per_page == 2
assert page2.total == q.count()
assert page2.items == q.limit(2).offset((2 - 1) * 2).all()
assert page2.prev_num == 1
# Boolean flags are asserted by truthiness rather than compared to True
# with ``==`` (PEP 8).
assert page2.has_prev
assert page2.next_num == 3
assert page2.has_next
# prev() / next() step to the neighbouring pages
page_1 = page2.prev()
page_3 = page2.next()
# searching
# Searching is enabled by attaching search definitions to the model classes.
# Each definition maps a search key to a callable that turns the searched
# value into a filter expression.
User.__advanced_search__ = User.__simple_search__ = {
    'user_email': lambda value: User.email.like('%{0}%'.format(value)),
    'user_name': lambda value: User.name.like('%{0}%'.format(value)),
}
UserItem.__advanced_search__ = {
    'item_name': lambda value: UserItem.name.like('%{0}%'.format(value)),
}

# Search with a plain string plus a dict keyed by the names defined above.
search = User.query.search('example.com', {'user_name': 'wilma'})
# print('search:', str(search))
assert search.count() > 0
# entity loading
# Each call below applies one loading option for the User.items relationship.
# The returned queries are discarded; the lines only show the available methods.
User.query.join_eager(User.items)
User.query.joinedload(User.items)
User.query.lazyload(User.items)
User.query.immediateload(User.items)
User.query.noload(User.items)
User.query.subqueryload(User.items)
# column loading
# Same pattern, for column-level loading options; columns are named by string.
User.query.load_only('_id', 'name')
User.query.defer('email')
User.query.undefer('email') # only relevant if User.email is deferred in the class definition
User.query.undefer_group('group1', 'group2') # assumes undefer groups with these names are defined in the class
# utilities
# Collection-style helpers available on the query; return values are
# discarded here because the calls are shown for illustration only.
User.query.map(lambda user: user.level)
User.query.pluck('level')
User.query.index_by('email')
User.query.chain().value()
# Count the users whose level is above 5, starting from 0.
# ``level`` is None for users created without one, and ``None > 5`` raises
# TypeError on Python 3, so a missing level is treated as 0.
User.query.reduce(
    lambda result, user: (result + 1) if (user.level or 0) > 5 else result,
    initial=0
)
For more details regarding the chaining API (i.e. Query.chain()), see the pydash documentation.
Utilize ORM events.
from alchy import events
class User(Model):
    """Replacement ``User`` model that demonstrates the ORM event decorators."""

    __table_args__ = {
        # this is needed since we're replacing the ``User`` class defined above
        'extend_existing': True,
    }

    _id = Column(types.Integer(), primary_key=True)
    name = Column(types.String())
    email = Column(types.String())
    level = Column(types.Integer())

    @events.before_insert_update()
    def validate(self, *args, **kwargs):
        """Validate the model instance before it is inserted or updated."""
        # do validation
        return

    @events.on_set('email')
    def on_set_email(self, value, oldvalue, initiator):
        """Reject an email address that another user row already uses.

        Args:
            value: the email being assigned.
            oldvalue: the previous value of the attribute (unused).
            initiator: the event initiator supplied by the ORM (unused).

        Raises:
            ValueError: if a row with a different ``_id`` already has
                ``value`` as its email.
        """
        duplicates = self.query.filter(
            User.email == value,
            User._id != self._id,
        )
        if duplicates.count() > 0:
            raise ValueError('Email already exists in database')
# Store a first user so that its email address is taken.
user = User(email='one@example.com')
db.add_commit(user)

# Creating a second user with the same email assigns ``email`` again, which
# the on_set_email handler rejects with ValueError. The error is swallowed
# on purpose: triggering it is the whole point of this step.
try:
    User(email=user.email)
except ValueError:
    pass
Finally, clean up after ourselves.
# Counterpart of db.create_all(); like it, a Model.metadata operation.
db.drop_all()
See also
For further details consult API Reference.