sqlalchemy介绍和快速使用,创建操作数据表,scoped_session线程安全,基本增删查改,一对多,多对多,flask-sqlalchemy使用和flask-migrate使用
2022/8/13 2:22:55
本文主要是介绍sqlalchemy介绍和快速使用,创建操作数据表,scoped_session线程安全,基本增删查改,一对多,多对多,flask-sqlalchemy使用和flask-migrate使用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
1 sqlalchemy介绍和快速使用
# django 的orm框架,对象关系映射,只能在djagno中用 # sqlalchemy:独立的orm框架,轻松的集成到任意项目中去,SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果 # djagno 的orm,sqlalchemy,peewee。。。 目前异步的orm框架,没有特别好的 # 安装:pip3 install sqlalchemy # 组成部分(了解) Engine,框架的引擎 Connection Pooling ,数据库连接池 Dialect,选择连接数据库的DB API种类 Schema/Types,架构和类型 SQL Exprression Language,SQL表达式语言 # SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件 pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] cx_Oracle oracle+cx_oracle://user:[email protected]:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
1.1 原生操作的快速使用
# 原生操作,写原生sql,用的比较少 import time import threading import sqlalchemy from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine engine = create_engine( "mysql+pymysql://root:[email protected]:3306/luffy_api", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) def task(arg): # 从连接池中拿一个链接 conn = engine.raw_connection() cursor = conn.cursor() cursor.execute( "select * from luffy_banner" ) result = cursor.fetchall() print(result) cursor.close() conn.close() for i in range(20): t = threading.Thread(target=task, args=(i,)) t.start()
2 创建操作数据表
import datetime from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() class Users(Base): id = Column(Integer, primary_key=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空 email = Column(String(32), unique=True) #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 ctime = Column(DateTime, default=datetime.datetime.now) extra = Column(Text, nullable=True) __tablename__ = 'users' # 数据库表名称 __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一 Index('ix_id_name', 'name', 'email'), #索引 ) # orm不能创建数据库,手动创建 #sqlalchemy只能创建表和删除表,不能新增,删除字段 # 创建表,同步到数据库 def init_db(): """ 根据类创建数据库表 """ engine = create_engine( "mysql+pymysql://root:[email protected]:3306/aaa", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 创建被Base管理的所有表 Base.metadata.create_all(engine) def drop_db(): """ 根据类删除数据库表 """ engine = create_engine( "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Base.metadata.drop_all(engine) if __name__ == '__main__': # init_db() drop_db() # sqlalchemy 只能创建表,删除表,不能增加删除字段 (在flask中,可以使用第三方支持)
3 scoped_session线程安全
# 1 新增数据 from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users from sqlalchemy.orm import scoped_session from threading import Thread # 第一步:创建engine engine = create_engine( "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 第二步:生成session,每次执行数据库操作时,都需要创建一个Session Session = sessionmaker(bind=engine) # 原来的时候 # session = Session() # 变成了它,它就是线程安全,session对象在flask框架中就可以是全局的,任意线程使用的是就是这个全局session,但是不会出现数据错乱问题 # 本质内部使用了local对象,保证了,每个线程实际上使用的是当前线程自己的session session = scoped_session(Session) # scoped_session类的对象,正常来讲是没有 add, close,commit...方法和属性的,但是实际上有,是通过create_proxy_methods装饰器,设置进去的(通过反射setattr写进去的) def task(i): # 第三步:插入数据 user = Users(name='lqz%s'%i, email='33%[email protected]'%i, extra='lqz is handsome') session.add(user) # 第四步:提交 session.commit() # 第五步:关闭链接 session.close() for i in range(10): t = Thread(target=task,args=[i,]) t.start()
4 基本增删查改
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Users from sqlalchemy.sql import text engine = create_engine( "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Session = sessionmaker(bind=engine) session = Session() # 1 增加数据一条 # session.add(对象) # 2 同时增加多条 # session.add_all([Users(name='lqz123', email='[email protected]', extra='asfasdf'),Users(name='pyy', email='[email protected]', extra='asdfeedd')]) # 3 删除 ---》查出来删 # res = session.query(Users).filter(Users.id > 17).delete() # res = session.query(Users).filter(Users.name == 'lqz').delete() # print(res) # 4 修改---->查出来改 # res=session.query(Users).filter(Users.id > 10).update({"name" : "lqz"}) # 类似于django的F查询 # session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False) # session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate") # 5 查询 ----》查询特别多 # 5.1 查所有 # res = session.query(Users).all() # 5.2 指定查询的字段select name as xx,age from users; # res = session.query(Users.name.label('xx'), Users.email) # 5.3 通过filter 过滤 # filter写条件 > < == # res=session.query(Users).filter(Users.id > 0).all() # res=session.query(Users).filter(Users.name == 'lqz2099').all() # 5.4 filter_by 过滤,filter_by表达式 # res = session.query(Users).filter_by(name='lqz',email='[email protected]').all() # res = session.query(Users).filter_by(name='lqz').first() # 5.5 自定制where部分查询sql # res = session.query(Users).filter(text("id>:value or name=:name")).params(value=16, name='lqz099').all() # res = session.query(Users).filter(Users.id<13 , Users.name=='lqz099').all() # 5.6 纯自定义sql---》django中支持这个 # res = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='lqz').all() # 5.7 filter和filter_by的其他使用 # 条件 # res = session.query(Users).filter_by(name='lqz').all() # 表达式,and条件连接 # ret = session.query(Users).filter(Users.id > 1, Users.name == 'lqz').all() # between # res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz').all() # res = session.query(Users).filter(Users.id.between(7, 9)).all() # res = session.query(Users).filter(Users.id.between(7, 9)) # 原生sql # res = session.query(Users).filter(Users.id.between(7, 9), Users.name == 'lqz') # 原生sql # in # res = session.query(Users).filter(Users.id.in_([7,8,9])).all() # ~非,除。。外 # res = session.query(Users).filter(~Users.id.in_([1, 3, 4])).all() # 二次筛选 # res = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='lqz099'))).all() # 5.8 与 或 非 # or_包裹的都是or条件,and_包裹的都是and条件 from sqlalchemy import and_, or_ # res = session.query(Users).filter(and_(Users.id > 3, Users.name == 'lqz099')).all() # res = session.query(Users).filter(or_(Users.id < 2, Users.name == 'lqz099')).all() # res = session.query(Users).filter( # or_( # Users.id < 2, # and_(Users.name == 'lqz099', Users.id > 3), # Users.extra != "" # )) # 5.9 通配符,以e开头,不以e开头 # res = session.query(Users).filter(Users.name.like('lqz%')).all() # res = session.query(Users).filter(~Users.name.like('%lqz%')).all() # 5.10 限制,用于分页,区间 # res = session.query(Users)[1:2] # 每页显示5条,第3页的数据 # res = session.query(Users)[3*5:3*5+5] # 5.11 排序,根据name降序排列(从大到小) # res = session.query(Users).order_by(Users.name.desc()).all() # 第一个条件重复后,再按第二个条件升序排 # ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 5.12 分组 # 分组 from sqlalchemy.sql import func # select * from user group by name; # res = session.query(Users).group_by(Users.name).all() # 分组之后取最大id,id之和,最小id # select max(id),sum(id),min(id) from user group by name; # res = session.query( # func.max(Users.id), # func.sum(Users.id), # func.min(Users.id)).group_by(Users.name).all() # haviing筛选 # select max(id),sum(id),min(id) from user where email like 33% group by name having min(id) > 2; res = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).filter(Users.email.like('33%')).group_by(Users.name).having(func.min(Users.id) > 2).all() ### 5.13 链表操作 # select * from users,favor where user.id= favor.nid; # 先笛卡尔积再过滤 # ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() #join表,默认是inner join # select * from person inner join favor on person.favor_id=favor.id; # ret = session.query(Person).join(Favor).all() #isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可 # select * from person left join favor on person.favor_id=favor.id; # ret = session.query(Person).join(Favor, isouter=True).all() # 右链接 # ret = session.query(Favor).join(Person, isouter=True).all() # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上 # select * from person left join favor on person.id=favor.id; # ret = session.query(Person).join(Favor,Person.id==Favor.id, isouter=True).all() ## 5.14 union和union all的区别? # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集 #union和union all的区别? # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union(q2).all() # # q1 = session.query(Users.name).filter(Users.id > 2) # q2 = session.query(Favor.caption).filter(Favor.nid < 2) # ret = q1.union_all(q2).all() print(res) # 第四步:提交 session.commit() # 第五步:关闭链接 session.close()
5 一对多
###一对多关系 class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) # hobby指的是tablename而不是类名,uselist=False hobby_id = Column(Integer, ForeignKey("hobby.id")) # 跟数据库无关,不会新增字段,只用于快速链表操作 # 基于对象的跨表查询: # 类名,backref用于反向查询 hobby = relationship('Hobby', backref='pers') def __repr__(self): return self.name
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Person, Hobby from sqlalchemy.sql import text engine = create_engine( "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Session = sessionmaker(bind=engine) session = Session() # 1 一对多增加 # 方式一: # session.add(Hobby(caption='足球')) # session.add(Person(name='lqz',hobby_id=1)) # session.add(Person(name='pyy',hobby_id=2)) # 有约束,增加失败 # 方式二: # session.add(Person(name='刘亦菲', hobby=Hobby(caption='橄榄球'))) # 2 一对多查询 # 2.1 基于对象的跨表查询 ### 正向查询 # res = session.query(Person).filter(Person.name == '刘亦菲').first() # print(res) # print(res.hobby.caption) ## 反向查询 # res=session.query(Hobby).filter_by(caption='足球').first() # print(res) # # 取出喜欢足球的所有人---》反向查 # print(res.pers) # 2.2 基于连表的跨表查询 # select * from person,hobby where person.hobby_id=hobby.id and person.name=彭于晏; # res=session.query(Person,Hobby).filter(Person.hobby_id==Hobby.id,Person.name=='彭于晏').all() #select * from person inner join hobby on person.hobby_id=hobby.id where person.name=彭于晏; res = session.query(Person).join(Hobby).filter(Person.name=='彭于晏').all() print(res) session.commit() session.close()
6 多对多
#多对多 class Boy2Girl(Base): __tablename__ = 'boy2girl' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False) # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以 #方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询 girls = relationship('Girl', secondary='boy2girl', backref='boys')
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine from models import Boy,Girl,Boy2Girl from sqlalchemy.sql import text engine = create_engine( "mysql+pymysql://root:[email protected]:3306/aaa?charset=utf8", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Session = sessionmaker(bind=engine) session = Session() # 1 多对多增加 # 方式一:所有表都有,一个个增加 # session.add(Boy(name='dxl')) # session.add(Girl(name='zqh')) # session.add_all([Boy(name='dxl'),Girl(name='zqh')]) # session.add_all([Boy2Girl(boy_id=1,girl_id=1),Boy2Girl(boy_id=1,girl_id=2)]) # 方式二: # session.add(Boy(name='张涛',girls=[Girl(name='芙蓉姐姐'),Girl(name='凤姐')])) # 2 一对多查询 # 2.1 基于对象的跨表查询 ### 正向查询 # res=session.query(Boy).filter_by(name='张涛').first() # print(res.girls) ## 反向查询 # res=session.query(Girl).filter_by(name='凤姐').first() # print(res.boys) # 2.2 基于连表的跨表查询 # print(res) session.commit() session.close()
7 flask-sqlalchemy使用和flask-migrate使用
# flask-sqlalchemy:帮助我们快速把sqlalchemy集成到flask中 # 表字段增加删除不支持:flask-migrate,以后增加了字段,删除字段只需要两条迁移命令就完成了 ### flask—sqlalchemy # 第一步:导入SQLAlchemy,实例化得到对象db # 第二步:注册 db.init_app(app) # 第三步:创建表模型,继承db.Model # 第四步:使用session db.session 处理了线程安全 ### 把表同步到数据库中---》flask-migrate ## 基于flask_script,定制几个命令 python manage.py db init/ migrate/upgrade # 第一个命令: python38 manage.py db init # 初始化,项目使用的时候,只敲一次,生成migrations文件夹 # 第二个命令:只要改了表结构,增加表,删除表,增加字段,删除字段 python38 manage.py db migrate # 不会在数据库中生成记录,只是记录变化等同于makemigrations # 第三个命令 python38 manage.py db upgrade # 等同于migrate
8 请求上下文分析
def wsgi_app(self, environ, start_response): # 读完了:ctx:是RequestContext的对象,内部有当次请求的request对象,session对象,app对象,flash对象 ctx = self.request_context(environ) error = None try: try: ctx.push() response = self.full_dispatch_request() except Exception as e: error = e response = self.handle_exception(e) except: # noqa: B001 error = sys.exc_info()[1] raise return response(environ, start_response) finally: if self.should_ignore_error(error): error = None ctx.auto_pop(error)
请求上下文执行流程(ctx): -0 flask项目一启动,有6个全局变量 -_request_ctx_stack:LocalStack对象 -_app_ctx_stack :LocalStack对象 -request : LocalProxy对象 -session : LocalProxy对象 -1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response) -2 wsgi_app() -2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session,flash,当前app对象 -2.2 执行: ctx.push():RequestContext对象的push方法 -2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象 -2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法) -2.2.3 push方法源码: def push(self, obj): #通过反射找self._local,在init实例化的时候生成的:self._local = Local() #Local(),flask封装的支持线程和协程的local对象 # 一开始取不到stack,返回None rv = getattr(self._local, "stack", None) if rv is None: #走到这,self._local.stack=[],rv=self._local.stack self._local.stack = rv = [] # 把ctx放到了列表中 #self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}} rv.append(obj) return rv -3 如果在视图函数中使用request对象,比如:print(request) -3.1 会调用request对象的__str__方法,request类是:LocalProxy -3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object()) -3.2.1 内部执行self._get_current_object() -3.2.2 _get_current_object()方法的源码如下: def _get_current_object(self): if not hasattr(self.__local, "__release_local__"): #self.__local() 在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local) # 用了隐藏属性 #self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request")) #加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了 #这个地方的返回值就是request对象(当此请求的request,没有乱) return self.__local() try: return getattr(self.__local, self.__name__) except AttributeError: raise RuntimeError("no object bound to %s" % self.__name__) -3.2.3 _lookup_req_object函数源码如下: def _lookup_req_object(name): #name是'request'字符串 #top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象 top = _request_ctx_stack.top if top is None: raise RuntimeError(_request_ctx_err_msg) #通过反射,去ctx中把request对象返回 return getattr(top, name) -3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__ -4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性 -5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉 其他的东西: -session: -请求来了opensession -ctx.push()---->也就是RequestContext类的push方法的最后的地方: if self.session is None: #self是ctx,ctx中有个app就是flask对象, self.app.session_interface也就是它:SecureCookieSessionInterface() session_interface = self.app.session_interface self.session = session_interface.open_session(self.app, self.request) if self.session is None: #经过上面还是None的话,生成了个空session self.session = session_interface.make_null_session(self.app) -请求走了savesession -response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession -self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response) -请求扩展相关 before_first_request,before_request,after_request依次执行 -flask有一个请求上下文,一个应用上下文 -ctx: -是:RequestContext对象:封装了request和session -调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置 -app_ctx: -是:AppContext(self) 对象:封装了当前的app和g -调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置 -g是个什么鬼? 专门用来存储用户信息的g对象,g的全称的为global g对象在一次请求中的所有的代码的地方,都是可以使用的 -代理模式 -request和session就是代理对象,用的就是代理模式
这篇关于sqlalchemy介绍和快速使用,创建操作数据表,scoped_session线程安全,基本增删查改,一对多,多对多,flask-sqlalchemy使用和flask-migrate使用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享