Skip to content

Python SQL ORM 操作

该指南以 Python 的 SQLAlchemy 和 MySQL 数据库 为例,因为 MySQL 是日常开发最常使用的数据库,同时,Python 的很多 ORM 框架都是由 SQLAlchemy 衍生而来。

SQLAlchemy基础

SQLAlchemy是一个实现了ORM的框架。使用SQLAlchemy就无需自己构建ORM,无需自己编写SQL语句,也无需自己思考如何遵照规范设计,并且可以随时修改。

使用pip install mysqlclient安装MySQL驱动,也可用pymysql替代;使用pip install SQLAlchemy安装SQLAlchemy。

SQLAlchemy驱动和Flask-SQLAlchemy比较像,都是先配置驱动,然后创建模型类,使模型类继承自Model类,然后编写成员变量,这些成员变量就是对应着数据表的字段。下面以一个学生和成绩的例子演示如何构建ORM。

python
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import sessionmaker

engine = create_engine(
    'mysql://root:254456@127.0.0.1:3306/db?charset=utf8', pool_size=20, max_overflow=0, pool_recycle=3600, echo=True
)
Session = sessionmaker(bind=engine)  # 创建Session会话,连接数据库
BaseModel = declarative_base()  # 创建数据库基类

class Student(BaseModel):
    __tablename__ = 'tb_student'  # 数据表名
    id = Column(Integer, primary_key=True)  # 主键
    name = Column(String(10), nullable=False)
    _class = Column(String(10), nullable=False)

class Score(BaseModel):
    __tablename__ = 'tb_score'
    id = Column(Integer, primary_key=True)
    subject = Column(String(10), nullable=False)
    score = Column(Integer, nullable=False)
    student_id = Column(Integer, ForeignKey('tb_student.id'))  # 外键

def create_tables():
    BaseModel.metadata.create_all(engine)  # 令所有继承自基类的模型类创建数据库  

if __name__ == '__main__':
    create_tables()

运行这段代码后,程序就会在db数据表中创建tb_student和tb_score这两个数据表。

接下来使用上方的模型创建几条数据。

python
if __name__ == '__main__':
    session = Session()
    student = Student(name='小明', _class='三年一班')
    score = Score(subject='数学', score=100, student_id=1)
    session.add_all([student, score])
    session.commit()
    session.close()

执行这段代码后,数据库中会多两条数据。若插入数据时报了外键约束的错误,建议将这两个对象分开添加。

下面将详细讲解SQLAlchemy每一步都该如何配置。

引擎配置

SQLAlchemy第一步就是创建引擎。create_engine()函数通过解析URL,确定要连接的数据库、用户名、密码、端口、地址等信息,并连接数据库。单一的引擎代表进程管理多个数据库连接,以并发方式调用。引擎只会创建一次,不会再在每个对象或函数调用时重复创建。

  1. 给API配置引擎
python
sqlalchemy.create_engine(*args, **kwargs)

该方法用于创建新的引擎实例,常常第一个用于传URL,后面再跟关键字参数。

python
engine = create_engine(
    'mysql://root:254456@127.0.0.1:3306/db?charset=utf8', pool_size=20, max_overflow=0, pool_recycle=3600, echo=True
)

这部分代码不会真正创建连接。

create_engine需要如下参数:

  • url:字符串形式为dialect[+driver]://user:password@host/dbname[?key=value...]。其中dialect是数据库名称,如mysql、oracle、postgrasql等,driver是DBAPI的名称,如pymysql、mysqlclient。
  • case_sensitive:表示列名是否区分大小写。
  • connect_args:将直接传递给DBAPI的字典。其作为connect方法的其他关键字参数,常常用于自定义底层连接。
  • echo:是否开启日志记录器。
  • echo_pool:是否记录连接信息并在默认日志记录器中输出。
  • encoding:用于字符串编码和解码。
  • implicit_returning:是否在发出没有现有RETURNING()子句的单行INSERT语句时使用与返回兼容的构造来获取新生成的主键值。
  • label_length:可选的整数值,将动态生成的列标签的大小限制为多个字符。
  • logging_name:将sqlalchemy.engine中生成日志记录的name字段改为自定义的标识符。
  • max_overflow:允许在连接池中溢出的连接数。
  • module:指定引擎直接使用的DBAPI模块,应为模块引用,而不是字符串。
  • pool_pre_ping:是否启用连接池预ping功能。
  • pool_size:在连接池中保持打开的连接数。
  • pool_recycle:设置回收时间,默认为-1,不回收。
  • pool_timeout:获取连接超时等待的时间。
  1. 常用连接配置

连接MySQL:

python
MYSQL_URL = 'mysql://root:123456@127.0.0.1:3306/teaching?charset=utf8'
engine = create_engine(MYSQL_URL, pool_size=20, max_overflow=20, pool_recycle=3600,pool_pre_ping=True, encoding='UTF-8')

连接到SQLite:

python
engine = create_engine('sqlite:////home/path/data.db')  # Linux
engine = create_engine(r'sqlite:///C:\path\data.db')  # Windows

创建会话

ORM和Core是SQLAlchemy核心,会话则是ORM中的核心。会话用于管理数据库及数据对象的操作。

  1. 会话的创建

与数据库交互前需要创建会话。Session是ORM操作数据库的句柄,但是Session并未真正地与数据库创建链接,只有发生数据库操作时才会创建连接。

python
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

engine = create_engine('mysql://root:254456@127.0.0.1:3306/db?charset=utf8', pool_size=20, max_overflow=20, pool_recycle=3600, echo='debug')
Session = sessionmaker(bind=engine)

上面是提前配置引擎后创建Session对象,还可以延迟绑定引擎。

python
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine

Session = sessionmaker()
engine = create_engine('mysql://root:254456@127.0.0.1:3306/db?charset=utf8', pool_size=20, max_overflow=20, pool_recycle=3600, echo='debug')
Session.configure(bind=engine)

Session配置一次即可,然后由其他模块导入。使用前先实例化Session,最后将Session关闭。

Session接口类为sqlalchemy.orm.session.Session,下面列举常用的内部方法和配置参数。

  • __init__():初始化Session,其有如下参数:

    • autocommit:默认为False。若为True,则会话不会保持持久的事务运行,将自动根据需要从引擎中获取数据库连接,并在使用后立即返回连接。若为False,则每次需要使用Session.begin()开始事务,使用commit()提交事务。

    • autoflush:默认为False。若为True则执行查询SQL之前都会先提交事务,以便数据库返回结果。autoflush通常与autocommit一起使用,很少单独使用flush。

    • bind:将会话绑定到指定的引擎。

    • binds:字典类型,为每种不同的table对象或Mapper对象指定不同的引擎,执行SQL时根据这种映射关系启用不同的引擎。如Session = sessionmaker(binds={someMappedClass:create_engine('postgresql://engine1', someDaclarativeClass:'postgresql://engine2)})

    • expire_on_commit:默认为True,每次调用commit()之后,所有实例都将过期,完成事务之后的所有对象属性的访问都将从数据库加载。

注:在最新版本的SQLAlchemy中,autocommit方法已不再支持。这里由于只在记笔记而没有运行代码,因此到了自动提交事务时才发现该参数已经废弃。由于篇幅巨大不能进行改正,因此下方凡是涉及到自动提交事务的只能改为使用begin和commit方法手动开启事务。特此说明。

  • add(instance, _warn=True):在会话中放置对象,它的状态将在下次刷新操作时持久化到数据库。重复调用将被忽略。

  • add_all(instances):接收列表参数。在会话中放置列表中的所有对象。

  • begin(substransactions=False, nested=False):在会话中标记事务的开始。Session.begin方法和autocommit一起使用。

    • substransactions:若为True则代表开启子事务。
    • nested:若为True,则标记一个事务保存点。
python
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker

Base = declarative_base()  # 创建一个声明性基类
engine = create_engine("mysql://root:254456@127.0.0.1:3306/db?charset=utf8", pool_size=20, max_overflow=0,
                       pool_recycle=3600, echo="debug")  # 配置引擎
Session = sessionmaker(engine)

class User(Base):
    """
    创建一个声明性数据表类
    """
    __tablename__ = 'users'  # 对应数据库中的表名

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    fullname = Column(String(50))
    nickname = Column(String(50))

    def __repr__(self):
        """
        自定义类描述
        :return:
        """
        return f"<User(name={self.name}, fullname={self.fullname}, nickname={self.nickname})>"

if __name__ == '__main__':
    Base.metadata.create_all(engine)  # 在数据库生成相应表
    session = Session()
    user_one = User(name="明", fullname="李明", nickname="小明")
    session.add(user_one)
    session.autocommit = True  # 使用autocommit模式
    session.begin(nested=True)  # 标记事务开始,并且是事务保存点
    user_two = User(name="杰克", fullname="杰克·斯帕罗", nickname="杰克船长")
    user_three = User(name="彼得", fullname="彼得·帕克", nickname="蜘蛛侠")
    session.add_all([user_two, user_three])
    print(session.new)
    session.rollback()
    session.commit()
运行结果
2023-07-09 21:18:25,738 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2023-07-09 21:18:25,738 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:18:25,738 DEBUG sqlalchemy.engine.Engine Col ('DATABASE()',)
2023-07-09 21:18:25,739 DEBUG sqlalchemy.engine.Engine Row ('db',)
2023-07-09 21:18:25,739 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2023-07-09 21:18:25,739 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:18:25,739 DEBUG sqlalchemy.engine.Engine Col ('@@sql_mode',)
2023-07-09 21:18:25,739 DEBUG sqlalchemy.engine.Engine Row ('ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION',)
2023-07-09 21:18:25,739 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2023-07-09 21:18:25,739 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:18:25,739 DEBUG sqlalchemy.engine.Engine Col ('@@lower_case_table_names',)
2023-07-09 21:18:25,739 DEBUG sqlalchemy.engine.Engine Row (1,)
2023-07-09 21:18:25,740 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-07-09 21:18:25,740 INFO sqlalchemy.engine.Engine DESCRIBE `db`.`users`
2023-07-09 21:18:25,740 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:18:25,742 INFO sqlalchemy.engine.Engine 
CREATE TABLE users (
	id INTEGER NOT NULL AUTO_INCREMENT, 
	name VARCHAR(50), 
	fullname VARCHAR(50), 
	nickname VARCHAR(50), 
	PRIMARY KEY (id)
)

2023-07-09 21:18:25,742 INFO sqlalchemy.engine.Engine [no key 0.00009s] ()
2023-07-09 21:18:25,748 INFO sqlalchemy.engine.Engine COMMIT
2023-07-09 21:18:25,749 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-07-09 21:18:25,750 INFO sqlalchemy.engine.Engine INSERT INTO users (name, fullname, nickname) VALUES (%s, %s, %s)
2023-07-09 21:18:25,750 INFO sqlalchemy.engine.Engine [generated in 0.00014s] ('明', '李明', '小明')
IdentitySet([<User(name=杰克, fullname=杰克·斯帕罗, nickname=杰克船长)>, <User(name=彼得, fullname=彼得·帕克, nickname=蜘蛛侠)>])
2023-07-09 21:18:25,752 INFO sqlalchemy.engine.Engine ROLLBACK

运行结果为日志输出,因为设置了echo参数为debug。

  • begin_nested():标记嵌套事务的开始。允许Session.begin_nested使用with块,则不必显式使用commit()方法或flush()方法提交事务。
  • bind_mapper(mapper, bind):将映射对象绑定到指定引擎或连接,可参考Session.binds、Session.bind_table()。参数说明如下。
    • mapper:表示映射器对象、映射类的实例、映射类的基类。
    • bind:表示引擎或数据库连接。
  • bind_table(table, bind):把声明性类绑定到指定引擎或连接,参数说明如下。
    • table:是Table对象,通常是ORM映射的目标,或者存在于映射的可选对象中。
    • bind:表示引擎或数据库连接。
  • bulk_insert_mappings(mapper, mappings, return_defaults=False, render_nulls=False):为给定映射的声明性类和大量的Table构建参数的字典的列表指定批量插入,参数解释如下。
    • mapper:表示映射类或实际mapper对象,表示映射列表中的创建对象。
    • mappings:表示字典列表,每个字典都包含要插入的映射行键值对。若映射引用多个表,则每个字典必须包含要填充的每个表的所有键。
    • render_defaults:若为True,则将一次插入一行缺少默认值的行,如自增主键。允许连接继承和其他多表映射正确插入,且无需提前提供主键值。
    • render_nulls:若为True,则值为None的列将导致INSERT语句中包含空值,而不是在INSERT中省略该列。若数据库有默认值则默认值不生效,仍为null。
python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

Base = declarative_base()  # 创建一个声明性基类
engine = create_engine("mysql://root:254456@127.0.0.1:3306/db?charset=utf8", pool_size=20, max_overflow=0,
                       pool_recycle=3600, echo="debug")  # 配置引擎
Session = sessionmaker(engine)


class User(Base):
    """
    创建一个声明性数据表类
    """
    __tablename__ = 'users'  # 对应数据库中的表名

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    fullname = Column(String(50))
    nickname = Column(String(50))

    def __repr__(self):
        """
        自定义类描述
        :return:
        """
        return f"<User(name={self.name}, fullname={self.fullname}, nickname={self.nickname})>"


if __name__ == '__main__':
    # Base.metadata.create_all(engine)  # 在数据库生成相应表
    session = Session()
    session.bulk_insert_mappings(User, [{"name": "杰克", "fullname": "杰克·斯帕罗", "nickname": "杰克船长"},
                                        {"name": "彼得", "fullname": "彼得·帕克", "nickname": "蜘蛛侠"}])
    # 将批量向数据库插入值
运行结果
2023-07-09 21:34:49,489 INFO sqlalchemy.engine.Engine SELECT DATABASE()
2023-07-09 21:34:49,489 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:34:49,490 DEBUG sqlalchemy.engine.Engine Col ('DATABASE()',)
2023-07-09 21:34:49,490 DEBUG sqlalchemy.engine.Engine Row ('db',)
2023-07-09 21:34:49,490 INFO sqlalchemy.engine.Engine SELECT @@sql_mode
2023-07-09 21:34:49,490 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:34:49,490 DEBUG sqlalchemy.engine.Engine Col ('@@sql_mode',)
2023-07-09 21:34:49,490 DEBUG sqlalchemy.engine.Engine Row ('ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION',)
2023-07-09 21:34:49,490 INFO sqlalchemy.engine.Engine SELECT @@lower_case_table_names
2023-07-09 21:34:49,490 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:34:49,491 DEBUG sqlalchemy.engine.Engine Col ('@@lower_case_table_names',)
2023-07-09 21:34:49,491 DEBUG sqlalchemy.engine.Engine Row (1,)
2023-07-09 21:34:49,491 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-07-09 21:34:49,491 INFO sqlalchemy.engine.Engine DESCRIBE `db`.`users`
2023-07-09 21:34:49,491 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-07-09 21:34:49,493 DEBUG sqlalchemy.engine.Engine Col ('Field', 'Type', 'Null', 'Key', 'Default', 'Extra')
2023-07-09 21:34:49,493 DEBUG sqlalchemy.engine.Engine Row ('id', 'int', 'NO', 'PRI', None, 'auto_increment')
2023-07-09 21:34:49,493 INFO sqlalchemy.engine.Engine COMMIT
2023-07-09 21:34:49,494 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-07-09 21:34:49,495 INFO sqlalchemy.engine.Engine INSERT INTO users (name, fullname, nickname) VALUES (%s, %s, %s)
2023-07-09 21:34:49,495 INFO sqlalchemy.engine.Engine [generated in 0.00012s] [('杰克', '杰克·斯帕罗', '杰克船长'), ('彼得', '彼得·帕克', '蜘蛛侠')]

  • bulk_update_mappings(mapper, mappings):为给定映射的字典执行批量更新。mapping中不是主键的值将用于UPDATE的SET子句中,主键将应用于WHERE子句中。如session.bulk_update_mapping({'id':22,'fullname':'杰克·斯帕罗','nickname':'杰克船长'})

  • close():关闭此会话。这将清除所有项目病结束正在进行的任何事务。若此会话使用了autocommit模式,则立即开启新的事务。

  • commit():提交事务。

  • delete(instance):将实例标记为已删除。在这之后调用commit()方法或flush()方法会从数据库中删除此数据。

  • deleted:包含所有已标记为已删除的对象的集合。

  • dirty:返回数据发生了持久化改变对象集合。所有属性设置或集合修改操作都会将实例标记为dirty,并将其放置在此集合中。

  • execute(clause, params=None, mapper=None, bind=None, **kw):执行SQL语句。参数解释如下。

    • clause:可执行语句。
    • param:绑定参数值。
    • mapper:标记适当的绑定。
    • bind:绑定要操作的引擎。
python
# 举例
result = session.execute(User.__table__.select().where(User.__table__.c.id==5))
result = session.execute('SELECT * FROM user WHERE id = :param', {'param':5})
result = session.execute(text('SELECT * FROM user WHERE id = :param'), {'param':5})

它的第二个参数为可选参数集,可以作为单个字典传递还是作为字典列表传递。

python
from sqlalchemy import insert

result = session.execute(User.__table__.insert(), {'id':6,'name':'someone'})
result = session.execute(insert(User), {'id':7,'name':'someone'})
# 插入多条记录
result  =session.execute(insert(User), [{'id':8,'name':'someone'},{'id':9,'name':'someone'}, {'id':10,'name':'someone'}])
  • expire(instance, attribute_name=None):标记Session中持久性实例的过期属性,下次再访问过期属性时将向会话对象的当前事务上下文发起查询,以便更新给定实例的所有过期属性。
  • expite_all():标记Session对象中所有过期的持久性实例。
  • expunge(instance):移除Session中的指定实例。
  • expunge_all():移除Session对象的全部实例。
  • flush(objects=None):将所有对对象的更改写入数据库,若发生错误则回滚整个事务。
  • rollback():回滚事务。

会话使用需要遵循规范,如及时关闭资源、不要每次执行操作时都新创建一个Session,而是将Session作为一个局部参数,使其尽量具有一个生命周期。SQLAlchemy官网提供了一个比较全面的案例。

python
from contextlib import contextmanager

@contextmanager
def session_scope():
    """Provice a transactional scope around a series of operations."""
    session = Session()
    try:
        yield session
        session.commit
   	except:
        session.rollback()
        raise
    finally:
        sesson.close

def run_my_programm():
    with session.scope() as session:
        ThingOne().go(session)
        ThingTwo().go(session)

Session也不是线程安全的。使用Session应该确保每个事务中的单个操作序列存在一个实例。应该确保Session一次只在一个线程中工作,实现适当的锁定方案。

创造声明性类

将数据表的描述和数据表抽象类映射到数据表这两个操作是同步进行的,在SQLAlchemy中由声明性系统完成。声明性类需要继承自一个特定基类,类中需要有__tablename__属性、定义表中字段等操作。

python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base

Base = declarative_base()  # 创建声明性基类

class User(Base):  # 令声明类继承自声明性基类
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True)
    name = Column(String(5))
    fullname = Column(String(5))
    nickname = Column(String(5))
    
    def __repr__(self):
        return f'<User(name={self.name}, fullname={self.name}, nickmame={self.nickname})'

当声明类时,声明性系统使用了一个Python元类,以便在类生命完成后执行其他活动。元类将根据定制的规范创建一个Table对象,并通过构造一个Mapper对象将其与类关联,Mapper定义了类属性与数据表列的映射关系。

Table对象时较大集合MetaData的成员,MetaData是一个注册表,可以使用metadata属性获取声明性类中的MetaData对象,其中包括向数据库发出一组有限的数据表生成命令的方法即metadata.create_all()方法。接着,用上述定义的声明性数据表对象在数据库中生成对应的数据表,后面继承声明性基类的类统称为声明性类或数据类。

python
engine = create_engine('mysql://root:123456@127.0.0.1:3306/db?charset=utf8', pool_size=20, max_overflow=0, pool_recycle=3600, echo='debug')
Base.metadata.create_all(engine)

定义数据列及类型

在声明性类中除了__table__变量代表表名称,使用Column定义的变量就为列。当执行MetaData.create_all()方法时就会创建这些列。上方的许多例子已经演示了如何使用Column定义列。这里说明一下Column接收的参数及意义。

  • name:数据中name表示此列的名称,该参数可作为第一个位置参数,也可省略,列名将跟随字段名。
  • type:列的类型,该参数可作为第二个位置参数。可选的列类型以及在Python和MySQL中对应的类型如下表。
SQLAlchemy列类型Python类型MySQL类型说明
BIGINTint或longBIGINT极大整数值
BINARYbin()BINARY固定长度二进制字符串
BOOLEANboolBOOLEAN布尔值,MySQL中使用TINYINT代替
CHARstrCHAR固定长度字符串,大小0~225字节
DATEdatetime.datetimeDATE日期和时间,格式为YYYY-MM-DD HH:MM:SS
DECIMALfloatDECIMAL高精度的原始数值
ENUMstrENUM字符串类型。ENUM是一个字符串对象,其值是从允许值的列表中选择的值
FLOATfloatFLOAT单精度浮点数值
INT、INTEGERintINT或INGEGERInteger的别名
JSONJSONJSON从MySQL5.7.8开始,MySQL支持RFC7159标准中的JSON定义数据类型
NUMERICdecimal.DecimalNUMERIC在MySQL中,NUMERIC时限为DECIMAL
PICKLETYPE可序列化对象BLOBBLOB是一个可变对象,可以容纳可变数量的数据
REALfloatREAL不精确数值数据类型
SMALLINTEGERintSMALLINT大整数值
STRINGstrVARVHAR变长字符串
TEXTstrTEXT长文本数据
TIMEdatetime.timeTIME时间值
TIMESTAMPtime.timeTIMESTAMP时间戳
VARBINARYbin()VARBINARY可变长度二进制的字符串
VARCHARstrVARCHAR变长字符串

除此之外,还可对数据表中的字段增加约束,也是通过参数限定。这些参数有autoincrement、default、doc(类似Python的文档属性)、key(用于标识此类对象)、index、nullable、onupdate、primary_key、unique、comment。

增删改查

这里先预定义一个类,代码如下。

python
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.orm import sessionmaker, declarative_base

engine = create_engine('mysql://root:254456@127.0.0.1:3306/db?charset=utf8', pool_size=20, pool_recycle=3600,
                       max_overflow=0, echo='debug')
Base = declarative_base()
Session = sessionmaker(engine)

class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, autoincrement=True, primary_key=True)
    username = Column(String(10), nullable=False)
    password = Column(String(10), nullable=False)

    def __repr__(self):
        return f'<User(username={self.username}, password={self.password})'

def create_tables():
    Base.metadata.create_all(engine)

if __name__ == '__main__':
    create_tables()

添加数据

添加数据时常常用到Session对象的add()方法和add_all()、session.bulk_insert_mappings()、execute执行可执行子句的方法。使用add和add_all两个方法时,Table实例不会向数据库发送指令,只有需要的时候Session才使用一个flush()方法保存更改。下面就是用前面定义的User来添加一条数据。

python
>>> from main import *
>>> session = Session()
>>> user1 = User(username='qi', password='qi')
>>> session.add(user1)
>>> session.new
IdentitySet([<User(username=qi, password=qi)])

add()方法用于添加一个实例,若想添加多个实例则可以使用add_all()方法。

python
>>> user2 = User(username='qi2', password='qi2') 
>>> user3 = User(username='qi3', password='qi3')
>>> # 使用add_all方法添加多个实例
>>> session.add_all([user2, user3])
>>> session.new
IdentitySet([<User(username=qi, password=qi), <User(username=qi2, password=qi2), <User(username=qi3, password=qi3)])
>>> # 使用数据字典的方式批量插入
>>> session.bulk_insert_mappings(User, [{'username': 'qi3', 'password': 'qi3'}, {'username': 'qi4', 'password': 'qi4'}])
2023-07-10 19:56:00,613 ...(日志输出结果)
>>> # 用execute批量插入
>>> session.execute(User.__table__.insert(), [{'username': 'qi5', 'password': 'qi5'}, {'username': 'qi6', 'password': 'qi6'}])
2023-07-10 20:01:56,940 ...(日志输出结果)
>>> session.commit()

更新数据

更新数据采用先更新后查询的方式:单条数据可以先查询后再提交,多条数据可以通过update提交,session.bulk_update_mappings、execute方法可以执行可执行子句方法。

python
>>> # 先查询后更新
>>> user = session.query(User).filter(User.username == 'qi').first()
2023-07-10 21:25:12,532 ...(日志输出结果)
>>> user.username   
'qi'
>>> user.username = 'qi666'
>>> session.dirty  # 查看修改的数据
IdentitySet([<User(username=qi666, password=qi)])
>>> # 以字典方式更新单条数据
>>> session.query(User).filter(User.name == 'qi2').update({User.name: 'qi777', User.password: 'qi777'})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: type object 'User' has no attribute 'name'
>>> session.query(User).filter(User.username == 'qi2').update({User.username: 'qi777', User.password: 'qi777'}) 
2023-07-10 21:41:39,134 ...(日志输出结果)
>>> # 以数据字典方式批量更新
>>> session.bulk_update_mappings(User, [{'id': 4, 'username': 'qi3', 'password': 'qi888'},{'id': 5, 'username': 'qi999', 'password': 'qi999'}]) 
2023-07-10 21:53:23,947 ...(日志输出结果)

查询及过滤

以下结果已将日志输出省略。

  • 查询的方式是使用query()方法返回一个查询对象,少数情况下直接关联Session实例化Query对象。
python
 >>> # 常用方式
>>> session.query(User)  
<sqlalchemy.orm.query.Query object at 0x0000025F5633F650>
>>> # 不常用方式
>>> from sqlalchemy.orm.query import Query
>>> Query(User, session)
<sqlalchemy.orm.query.Query object at 0x0000025F5630D0D0>
  • 当多个类实体或基于列的实体做query方法的参数时,返回结果为元组,但是这个不是Python内置包的元组,是兼顾了Python元组的特性也具有对象的特性。下文称其为result元组。
python
>>> query = session.query(User.username, User.password)                                   
>>> list(query)
[('qi3', 'qi3'), ('qi4', 'qi4'), ('qi5', 'qi5'), ('qi6', 'qi6'), ('qi', 'qi'), ('qi2', 'qi2'), ('qi3', 'qi3')]
  • 使用label方法将列属性值映射到一个新的属性名。
python
>>> query = session.query(User.username.label('username'))
>>> for item in query:
...     print(item.username)                               
...
qi3
qi4
qi5
qi6
qi
qi2
qi3
  • 使用order_by方法对查询结果进行排序。
python
>>> query = session.query(User.id).order_by(User.id)  # 以id正序排序
>>> list(query)
[(12,), (13,), (14,), (15,), (16,), (17,), (18,)]
>>> query = session.query(User.id).order_by(User.id.desc())  # 以id倒序排序
>>> list(query)
[(18,), (17,), (16,), (15,), (14,), (13,), (12,)]
>>> query = session.query(User.id).order_by('id')  # 以id正序排序>>> from sqlalchemy import desc                 
>>> query = session.query(User.id).order_by(desc('id'))
>>> list(query)
[(18,), (17,), (16,), (15,), (14,), (13,), (12,)]
>>> list(query)
[(12,), (13,), (14,), (15,), (16,), (17,), (18,)]
>>> from sqlalchemy import desc                 
>>> query = session.query(User.id).order_by(desc('id'))  # 以id倒序排序
>>> list(query)
[(18,), (17,), (16,), (15,), (14,), (13,), (12,)]
  • 使用group_by方法对查询结果进行分组。
python
>>> query = session.query(User.username).group_by(User.username)
>>> query.all()
[('qi3',), ('qi4',), ('qi5',), ('qi6',), ('qi',), ('qi2',)]
  • 使用get方法查询指定主键的数据。
python
>>> session.query(User).get(13)          
<User(username=qi4, password=qi4)
>>> session.get(User, 13)                
<User(username=qi4, password=qi4)
  • 使用filter_by方法加关键字参数筛选结果。该方法的特点是不支持运算符,多个参数并列查询,支持多个filter_by方法叠加。
python
>>> query = session.query(User.id, User.username).filter_by(id=12) 
>>> query.all()
[(12, 'qi3')]
>>> # 多个参数并列查询
>>> query = session.query(User.id, User.username).filter_by(id=12, username='qi13')
>>> # filte_by重复调用
>>> query = session.query(User.id, User.username).filter_by(id=12).filter_by(username='qi13')
  • 使用filter方法加SQL表达式语句筛选构造结果。其特点是支持Python运算符。filter方法支持很多运算符,其中>>===(相等)<<=!=等运算符可直接放在函数里作为判断条件,如查询id大于10的数据,表达式为filter(User.id>10);其他运算符功能如下表所示。
运算符含义示例说明
LIKE模糊查询filter(User.name.like('%word'))筛选名字以word结尾的数据
NOT LIKE模糊查询的非filter(User.name.notlike('%word'))筛选名字不以wor的结尾的数据
ILIKE不匹配大小写的模糊查询filter(User.name.ilike('%word'))筛选名字以word结尾的数据,不区分大小写
NOT ILIKE不匹配大小写的模糊查询的非filter(User.name.notilike('%word'))筛选名字不以word结尾的数据,不区分大小写
IN在给定范围内查询filter(User,id.in_([13,14]))筛选id为13和14的数据
NOT IN在给定范围外查询filter(User.id.notin_([13,14]))筛选id不为13和14的数据
IS NULL值为NULLfilter(User.username.is_(None))筛选username值为None的数据
IS NOT NULL值为非NULLfilter(User.username.isnot(None))筛选username为非空的数据
STARTSWITH匹配值以字符串开头filter(User.username.startwith('word'))筛选username值以word开头的数据
ENDSWITH匹配值以字符串结尾filter(User.username.endswith('word'))筛选username值以word结尾的数据
CONTAINS匹配值包含指定字符串filter(User.username.contains('word'))筛选username值包含word的数据
BETWEEN匹配值在指定范围filter(User.id.between(10,14))筛选id值在10和14之间的数据
AND逻辑与filter(and_(User.id==1, User.username == 'qi'))筛选id为1和username为qi的数据
OR逻辑或filter(or_(User.id==10, User.username=='qi'))筛选id为10或username为10的数据
python
>>> from sqlalchemy import or_              
>>> query = session.query(User).filter(or_(User.id == 13, User.username == 'qi6'))
>>> list(query)
[<User(username=qi4, password=qi4), <User(username=qi6, password=qi6)]
  • 多个filter或filter_by语法可以混用,顺序没有标准。
  • 需要注意,query得到的对象是一个Query对象,而不是Python内置的元组或列表。使用Query对象的all()方法可以获取所有结果。使用first()可以获取第一个运算结果。
python
>>> query = session.query(User)
>>> query.all()
[<User(username=qi3, password=qi3), <User(username=qi4, password=qi4), <User(username=qi5, password=qi5), <User(username=qi6, password=qi6), <User(username=qi, password=qi), <User(username=qi2, password=qi2), <User(username=qi3, password=qi3)]
>>> query.first()
<User(username=qi3, password=qi3)
  • 使用one()方法获取唯一一个结果,当结果集没有结果或有多个结果时都将引发错误,只有一个结果才会返回该结果。one_or_None()方法在只有一个结果时返回该结果,无结果时返回None,有多个结果时会引发错误。
python
>>> query = session.query(User)  # 存在多个结果
>>> query.one()
Traceback (most recent call last):
  ...
sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when exactly one was required
>>> query = session.query(User).where(User.id==13)  # 只有一个结果
>>> query.one()
<User(username=qi4, password=qi4)
>>> query = session.query(User).where(User.id==-1)  # 无结果
>>> query.one()
Traceback (most recent call last):
...
sqlalchemy.exc.NoResultFound: No row was found when one was required
  • 使用offset()、limit()、slice()限定查询范围。offset用于设置索引偏移量,limit用于设置查询数量,slice用于设置查询索引及查询量。
python
>>> query = session.query(User)                        
>>> query.all()                 
[<User(username=qi3, password=qi3), <User(username=qi4, password=qi4), <User(username=qi5, password=qi5), <User(username=qi6, password=qi6), <User(username=qi, password=qi), <User(username=qi2, password=qi2), <User(username=qi3, password=qi3)]
>>> query = session.query(User).offset(2)
>>> query.all()
[<User(username=qi5, password=qi5), <User(username=qi6, password=qi6), <User(username=qi, password=qi), <User(username=qi2, password=qi2), <User(username=qi3, password=qi3)]
>>> query = session.query(User).slice(2, 4)  # 返回索引位于2和4之间的数据,不包括4
>>> query.all()
[<User(username=qi5, password=qi5), <User(username=qi6, password=qi6)]
>>> query = session.query(User).limit(3)  # 返回3条数据                           
>>> query.all()
[<User(username=qi3, password=qi3), <User(username=qi4, password=qi4), <User(username=qi5, password=qi5)]
  • 使用count方法返回结果个数,使用func、count方法构造具体的统计字段。
python
>>> query = session.query(User) 
>>> query.count()
7
>>> from sqlalchemy import func
>>> query = session.query(func.count(User.username), User.username).group_by(User.username)  # 对指定字段计数,需要根据该字段先分组
>>> query.all()
[(2, 'qi3'), (1, 'qi4'), (1, 'qi5'), (1, 'qi6'), (1, 'qi'), (1, 'qi2')]

删除数据

delete方法用于删除数据。同时也可以使用Session.execute方法和Query.delete方法删除一个数据对象。Session.delete方法适用于删除多个元素,Session.execute和Query.delete适用于批量删除。

python
>>> session.query(User).all()
[<User(username=qi3, password=qi3), <User(username=qi4, password=qi4), <User(username=qi5, password=qi5), <User(username=qi6, password=qi6), <User(username=qi, password=qi), <User(username=qi2, password=qi2), <User(username=qi3, password=qi3)]
>>> obj = session.query(User).first()    
>>> session.delete(obj)
>>> session.commit()
>>> session.query(User).all() 
[<User(username=qi4, password=qi4), <User(username=qi5, password=qi5), <User(username=qi6, password=qi6), <User(username=qi, password=qi), <User(username=qi2, password=qi2), <User(username=qi3, password=qi3)]
>>> session.query(User).filter_by(username='qi').delete()       
1
>>> session.commit()
>>> session.query(User).all()                                   
[<User(username=qi4, password=qi4), <User(username=qi5, password=qi5), <User(username=qi6, password=qi6), <User(username=qi2, password=qi2), <User(username=qi3, password=qi3)]

书中还有第三种方式,为session.execute(User.__table__.delete(User.__table__.c.name=='qi3')),但是我运行失败了,而这个为命令行运行,没有代码补全,我也就不想再继续研究了。

ORM事务操作

当初始化Session时,session处于无事务状态。当Session接收到需要执行SQL请求,如commit方法、flush方法,请求会传递给引擎,引擎通过query方法、execute方法执行SQL。收到这些请求后,Session配置的引擎会与会话维护的、正在进行的事务相关联。书中有更多关于ORM事务的介绍,这里不介绍了。

Session提供了一个rollback方法用于回滚事务,若当前事务有嵌套事务则回滚嵌套事务。会话中已提交的更改都将被取消,不会提交到数据库中,使用begin(subtransaction=True)方法开启的子事务也会全部关闭。如果使用了begin(nested=True)事务保存点,那么rollback方法会回滚到最近的保存点。

下面将介绍几种SQLAlchemy事务的使用场景和方式。

使用保存点

通过begin_nested方法可以设置任意数量的保存点,但是每个保存点必须有对应的rollback方法或commit方法来发布事务。一般常用begin_nested方法与begin方法,它们都可用于上下文管理器中。

python
# 使用方式一
session = Session()
session.add(u1)
session.add(u2)
session.begin_nested()  # 设置一个保存点
session.add(u3)
session.rollback()  # 将撤销u1对象,保留u1、u2
session.commit()  # 提交u1、u2操作

# 使用方式二
for user in users:
    try:
        with session.begin_nested():  # 隐式提交事务
            session.merge(user)  # 将给定实例的状态复制到Session
    except:
        print(f'跳过{user}')
session.commit()

自动提交模式

该模式比较老,使用Session.begin方法启动一个事务,或者再需要数据库操作时自动开启事务,比如调用session.execute()。使用时需先设置autocommit=True,然后调用session.begin方法开启新事务,最后使用session.commit()方法提交事务或使用session.rollback()方法回滚事务,一个完整的事务就结束了。

python
Session = sessionmaker(bind=engine, autocommit=True)
session = Session()

# 使用try-except
session.begin()  # 标记事务开始
try:
    user1 = session.query(User).get(1)
    user2 = session.query(User).get(2)
    user1.username = '棋'
    user2.username = '棋2'
    session.commit()
except:
    session.rollback()
    raise 

# 使用with子句
with session.begin():
    user1 = session.query(User).get(1)
    user2 = session.query(User).get(2)
    user1.username = '棋'
    user2.username = '棋2'

自动提交子事务

子事务由Session.begin产生,这是一个非事务性的定界结构,该结构允许嵌套调用begin和commit方法,让独立于启动事务的外部代码在事务中执行,也可以在已经划分事务的块内进行。

subtransactions一般只和autocommit使用,达到事务块嵌套的效果,使得任意数量的函数都可以调用Connection.begin和Transaction.commit()。

python
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, create_engine, update
from sqlalchemy.orm import sessionmaker

Base = declarative_base()  # 创建一个声明性基类
engine = create_engine("mysql://root:123456@127.0.0.1:3306/teaching?charset=utf8", pool_size=20, max_overflow=0,
                       pool_recycle=3600, echo="debug")  # 配置引擎
Session = sessionmaker(engine)

class User(Base):
    """
    创建一个声明性数据表类
    """
    __tablename__ = 'users'  # 对应数据库中的表名

    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    fullname = Column(String(50))
    nickname = Column(String(50))

    def __repr__(self):
        """
        自定义类描述
        :return:
        """
        return f"<User(name={self.name}, fullname={self.fullname}, nickname={self.nickname})>"

def a(session):
    session.begin(subtransactions=True)
    try:
        b(session)
        session.commit()
        print("子事务a执行完成")
    except:
        session.rollback()
        raise

def b(session):
    session.begin(subtransactions=True)
    try:
        session.add(User(**{"name": "杰克", "fullname": "杰克·斯帕罗", "nickname": "杰克船长"}))
        session.commit()
        print("子事务b执行完成")
    except:
        session.rollback()
        raise

if __name__ == '__main__':
    # Base.metadata.create_all(engine)  # 在数据库生成相应表
    session = Session(autocommit=True)
    a(session)
    session.close()

为方便开发而创建的常用库指南