当这个操作完成之后,这个 User 实例的状态为 pending。目前实际上还没有执行SQL操作,也就是说数据库中还没有产生和这个 User 实例对应的行。Session 将会在需要的时候执行相应的SQL命令,这个过程称之为flush。如果试图查询 Ed Jones,所有处于 pending 状态的信息将会首先被 flush,然后负责进行查询的 SQL 语言在此之后立即被执行。
例如,创建一个查询来获取刚刚创建的用户。这个查询会返回一个和之前添加的用户相同的用户实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
>>> our_user = session.query(User).filter_by(name='ed').first() BEGIN (implicit) INSERT INTO users (name, fullname, password) VALUES (?, ?, ?) ('ed', 'Ed Jones', 'edspassword') SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password FROM users WHERE users.name = ? LIMIT ? OFFSET ? ('ed', 1, 0) >>> our_user <User(name='ed', fullname='Ed Jones', password='edspassword')>
Session 的 query 函数会返回一个 Query 对象。query 函数可以接受多种参数类型。可以是类,或者是类的instrumented descriptor。下面的这个例子取出了所有的 User 记录。
1 2 3 4 5 6
>>> for instance in session.query(User).order_by(User.id): ... print(instance.name, instance.fullname) ed Ed Jones wendy Wendy Williams mary Mary Contrary fred Fred Flinstone
>>> for name, fullname in session.query(User.name, User.fullname): ... print(name, fullname) ed Ed Jones wendy Wendy Williams mary Mary Contrary fred Fred Flinstone
>>> for u in session.query(User).order_by(User.id)[1:3]: ... print(u) <User(name='wendy', fullname='Wendy Williams', password='foobar')> <User(name='mary', fullname='Mary Contrary', password='xxg527')>
>>> for name, in session.query(User.name).filter_by(fullname='Ed Jones'): ... print(name) ed >>> for name, in session.query(User.name).filter(User.fullname=='Ed Jones'): ... print(name) ed
注意 Query 对象是 generative 的,这意味可以把他们串接起来调用,如下:
1 2 3 4 5
>>> for user in session.query(User).\ ... filter(User.name=='ed').\ ... filter(User.fullname=='Ed Jones'): ... print(user) <User(name='ed', fullname='Ed Jones', password='f8s7ccs')>
>>> user = query.one() Traceback (most recent call last): ... MultipleResultsFound: Multiple rows were found for one()
没有查找到结果时:
1 2 3 4
>>> user = query.filter(User.id == 99).one() Traceback (most recent call last): ... NoResultFound: No row was found for one()
one_or_none():从名称可以看出,当结果数量为0时返回 None, 多于1个时报错
scalar() 和 one() 类似,但是返回单项而不是 tuple
嵌入使用SQL
可以在 Query 中通过 text() 使用SQL语句。例如:
1 2 3 4 5 6 7 8 9
>>> from sqlalchemy import text >>> for user in session.query(User).\ ... filter(text("id<224")).\ ... order_by(text("id")).all(): ... print(user.name) ed wendy mary fred
>>> session.query(User).from_statement( ... text("SELECT * FROM users where name=:name")).\ ... params(name='ed').all() [<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>]
计数
Query 定义了一个很方便的计数函数 count()
1 2 3 4 5 6 7 8 9 10
>>> session.query(User).filter(User.name.like('%ed')).count() SELECT count(*) AS count_1 FROM (SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password FROM users WHERE users.name LIKE ?) AS anon_1 ('%ed',) 2
>>> from sqlalchemy import func >>> session.query(func.count(User.name), User.name).group_by(User.name).all() SELECT count(users.name) AS count_1, users.name AS users_name FROM users GROUP BY users.name () [(1, u'ed'), (1, u'fred'), (1, u'mary'), (1, u'wendy')]
为了实现最简单的 SELECT count(*) FROM table,可以如下调用
1 2 3 4 5
>>> session.query(func.count('*')).select_from(User).scalar() SELECT count(?) AS count_1 FROM users ('*',) 4
如果对 User 的主键进行计数,那么 select_from 也可以省略。
1 2 3 4 5
>>> session.query(func.count(User.id)).scalar() SELECT count(users.id) AS count_1 FROM users () 4
关系(Relationship)
『关系』是关系型数据库的一大特色,是在建模过程中的一个重要的抽象过程。
建立关系
之前已经建立了一个用户( User )表,现在考虑增加一个与用户关联的新的表。在系统里面,用户可以存储多个与之相关的 email 地址。这是一种基本的一对多的关系。这个新增加的存储 email 地址的表称为 addresses。应用 Declarative,按照如下方式定义这个新表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
from sqlalchemy import ForeignKey from sqlalchemy.orm import relationship
可以发现上面执行了三个 INSERT 命令,也就是说与 jack 关联的两个 Address 对象也被提交了。现在通过查询来取出jack。
1 2 3 4 5 6 7 8 9 10 11 12
>>> jack = session.query(User).filter_by(name='jack').one() BEGIN (implicit) SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password FROM users WHERE users.name = ? ('jack',)
>>> jack <User(name='jack', fullname='Jack Bean', password='gjffdd')>
可以发现目前只有针对 User 表的查询,而没有对 Address 表的查询。此时访问 addresses 属性,相关的SQL才会执行
1 2 3 4 5 6 7 8 9
>>> jack.addresses SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id FROM addresses WHERE ? = addresses.user_id ORDER BY addresses.id (5,) [<Address(email_address='jack@google.com')>, <Address(email_address='j25@yahoo.com')>]
为了在 User 和 Address 之间构造一个简单的 join,可以通过 Query.filter() 来连接其相关列(本质是隐式写法的JOIN)。下面是一个简单的例子:
1 2 3 4 5 6 7 8
>>> for u, a in session.query(User, Address).\ ... filter(User.id==Address.user_id).\ ... filter(Address.email_address=='jack@google.com').\ ... all(): ... print(u) ... print(a) <User(name='jack', fullname='Jack Bean', password='gjffdd')> <Address(email_address='jack@google.com')>
而实际的 SQL JOIN 语法,可以通过 Query.join() 来想实现
1 2 3 4 5 6 7 8 9 10 11
>>> session.query(User).join(Address).\ ... filter(Address.email_address=='jack@google.com').\ ... all() users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password FROM users JOIN addresses ON users.id = addresses.user_id WHERE addresses.email_address = ? ('jack@google.com',) [<User(name='jack', fullname='Jack Bean', password='gjffdd')>]
>>> from sqlalchemy.orm import aliased >>> adalias1 = aliased(Address) >>> adalias2 = aliased(Address) >>> for username, email1, email2 in \ ... session.query(User.name, adalias1.email_address, adalias2.email_address).\ ... join(adalias1, User.addresses).\ ... join(adalias2, User.addresses).\ ... filter(adalias1.email_address=='jack@google.com').\ ... filter(adalias2.email_address=='j25@yahoo.com'): ... print(username, email1, email2) SELECT users.name AS users_name, addresses_1.email_address AS addresses_1_email_address, addresses_2.email_address AS addresses_2_email_address FROM users JOIN addresses AS addresses_1 ON users.id = addresses_1.user_id JOIN addresses AS addresses_2 ON users.id = addresses_2.user_id WHERE addresses_1.email_address = ? AND addresses_2.email_address = ? ('jack@google.com', 'j25@yahoo.com') jack jack@google.com j25@yahoo.com
使用子查询(Subqueries)
Query 适合于用来构造子查询。假如想要取出 User 记录,并且同时计算各个用户的 Address 的数量。产生这种功能的 SQL 指令最好的办法是按照 user 的 id 分组统计地址的数量,然后 join 到外层查询。此时需要 LEFT JOIN,这样可以使得没有地址的用户也会出现在查询结果中(地址数量为0)。
期望的SQL命令是这样的:
1 2 3 4
SELECT users.*, adr_count.address_count FROM users LEFTOUTERJOIN (SELECT user_id, count(*) AS address_count FROM addresses GROUPBY user_id) AS adr_count ON users.id=adr_count.user_id
当生成了 statement 之后,其完全可以视为一个 Table 来使用。可以通过 c 来访问它的属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
>>> for u, count in session.query(User, stmt.c.address_count).\ ... outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id): ... print(u, count) SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password, anon_1.address_count AS anon_1_address_count FROM users LEFT OUTER JOIN (SELECT addresses.user_id AS user_id, count(?) AS address_count FROM addresses GROUP BY addresses.user_id) AS anon_1 ON users.id = anon_1.user_id ORDER BY users.id ('*',) <User(name='ed', fullname='Ed Jones', password='f8s7ccs')> None <User(name='wendy', fullname='Wendy Williams', password='foobar')> None <User(name='mary', fullname='Mary Contrary', password='xxg527')> None <User(name='fred', fullname='Fred Flinstone', password='blah')> None <User(name='jack', fullname='Jack Bean', password='gjffdd')> 2
>>> stmt = session.query(Address).\ ... filter(Address.email_address != 'j25@yahoo.com').\ ... subquery() >>> adalias = aliased(Address, stmt) >>> for user, address in session.query(User, adalias).\ ... join(adalias, User.addresses): ... print(user) ... print(address) SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password, anon_1.id AS anon_1_id, anon_1.email_address AS anon_1_email_address, anon_1.user_id AS anon_1_user_id FROM users JOIN (SELECT addresses.id AS id, addresses.email_address AS email_address, addresses.user_id AS user_id FROM addresses WHERE addresses.email_address != ?) AS anon_1 ON users.id = anon_1.user_id ('j25@yahoo.com',) <User(name='jack', fullname='Jack Bean', password='gjffdd')> <Address(email_address='jack@google.com')>
>>> from sqlalchemy.sql import exists >>> stmt = exists().where(Address.user_id==User.id) >>> for name, in session.query(User.name).filter(stmt): ... print(name) SELECT users.name AS users_name FROM users WHERE EXISTS (SELECT * FROM addresses WHERE addresses.user_id = users.id) () jack
Query 还定义了若干个自动使用了 EXISTS 的操作。上面的例子可以用 any() 来完成:
1 2 3 4 5 6 7 8 9 10
>>> for name, in session.query(User.name).\ ... filter(User.addresses.any()): ... print(name) SELECT users.name AS users_name FROM users WHERE EXISTS (SELECT 1 FROM addresses WHERE users.id = addresses.user_id) () jack
any()也接受筛选条件来限制匹配的行:
1 2 3 4
>>> for name, in session.query(User.name).\ ... filter(User.addresses.any(Address.email_address.like('%google%'))): ... print(name) jack
>>> from sqlalchemy.orm import subqueryload >>> jack = session.query(User).\ ... options(subqueryload(User.addresses)).\ ... filter_by(name='jack').one() SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password FROM users WHERE users.name = ? ('jack',) SELECT addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id, anon_1.users_id AS anon_1_users_id FROM (SELECT users.id AS users_id FROM users WHERE users.name = ?) AS anon_1 JOIN addresses ON anon_1.users_id = addresses.user_id ORDER BY anon_1.users_id, addresses.id ('jack',) >>> jack <User(name='jack', fullname='Jack Bean', password='gjffdd')>
>>> jack = session.query(User).\ ... options(joinedload(User.addresses)).\ ... filter_by(name='jack').one() SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password, addresses_1.id AS addresses_1_id, addresses_1.email_address AS addresses_1_email_address, addresses_1.user_id AS addresses_1_user_id FROM users LEFT OUTER JOIN addresses AS addresses_1 ON users.id = addresses_1.user_id WHERE users.name = ? ORDER BY addresses_1.id ('jack',)
>>> jack <User(name='jack', fullname='Jack Bean', password='gjffdd')>
>>> from sqlalchemy.orm import contains_eager >>> jacks_addresses = session.query(Address).\ ... join(Address.user).\ ... filter(User.name=='jack').\ ... options(contains_eager(Address.user)).\ ... all() SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password, addresses.id AS addresses_id, addresses.email_address AS addresses_email_address, addresses.user_id AS addresses_user_id FROM addresses JOIN users ON users.id = addresses.user_id WHERE users.name = ? ('jack',)
>>> session.delete(jack) >>> session.query(User).filter_by(name='jack').count() UPDATE addresses SET user_id=? WHERE addresses.id = ? ((None, 1), (None, 2)) DELETE FROM users WHERE users.id = ? (5,) SELECT count(*) AS count_1 FROM (SELECT users.id AS users_id, users.name AS users_name, users.fullname AS users_fullname, users.password AS users_password FROM users WHERE users.name = ?) AS anon_1 ('jack',) 0
此时可以开始查询了。先以 ‘firstpost’ 为关键字来检索所有的博文。使用 any 来查询拥有关键词 ‘firstpost’ 的博文:
1 2 3 4
>>> session.query(BlogPost).\ ... filter(BlogPost.keywords.any(keyword='firstpost')).\ ... all() [BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', password='foobar')>)]
如果希望将查询范围限制在wendy用户所拥有的博文之内,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
>>> session.query(BlogPost).\ ... filter(BlogPost.author==wendy).\ ... filter(BlogPost.keywords.any(keyword='firstpost')).\ ... all() SELECT posts.id AS posts_id, posts.user_id AS posts_user_id, posts.headline AS posts_headline, posts.body AS posts_body FROM posts WHERE ? = posts.user_id AND (EXISTS (SELECT 1 FROM post_keywords, keywords WHERE posts.id = post_keywords.post_id AND keywords.id = post_keywords.keyword_id AND keywords.keyword = ?)) (2, 'firstpost') [BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', password='foobar')>)]
或者可以直接在wendy的posts属性上进行查询:
1 2 3 4
>>> wendy.posts.\ ... filter(BlogPost.keywords.any(keyword='firstpost')).\ ... all() [BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', password='foobar')>)]