SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据库API执行SQL并获取执行结果。
成都创新互联公司一直通过网站建设和网站营销帮助企业获得更多客户资源。 以"深度挖掘,量身打造,注重实效"的一站式服务,以成都网站设计、网站建设、移动互联产品、全网营销推广服务为核心业务。10年网站制作的经验,使用新网站建设技术,全新开发出的标准网站,不但价格便宜而且实用、灵活,特别适合中小公司网站制作。网站管理系统简单易用,维护方便,您可以完全操作网站资料,是中小公司快速网站建设的选择。
安装:
pip3 install SQLAlchemy
版本检查:
import sqlalchemy sqlalchemy.__version__
不同数据库配置:
根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作:
格式:'数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'
MySQL-Python mysql+mysqldb://: @ [: ]/ pymysql mysql+pymysql:// : @ [: ]/ [? ] MySQL-Connector mysql+mysqlconnector:// : @ [: ]/ cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
连接数据库:
from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:a13896321z@127.0.0.1/school",encoding='utf-8',echo=True)
声明映射:
declaractive用来表示类与表的关系。
声明基类,将类映射到数据表,自定义数据表类必须继承这个基类。
from sqlalchemy.ext.declaractive import declaractive_base Base = declaractive_base()
创建表结构类:
一个表结构类必须包含一个__tablename__和primary_key的字段
字段的类型:
SmallInteger Integer BigInteger Float Numeric(precision=None, scale=None, decimal_return_scale=None, asdecimal=True) Boolean Enum Date DateTime Time Interval(native=True, second_precision=None, day_precision=None) LargeBinary(length=None) MatchType(create_constraint=True, name=None, _create_events=True) PickleType(protocol=4, pickler=None, comparator=None) SchemaType(name=None, schema=None, metadata=None, inherit_schema=False, quote=None, _create_events=True) String(length=None, collation=None, convert_unicode=False, unicode_error=None, _warn_on_bytestring=False) Text(length=None, collation=None, convert_unicode=False, unicode_error=None, _warn_on_bytestring=False) Unicode(length=None, **kwargs) # 常用的类型: 类型名 python类型 说明 Integer int 普通整数,32位 Float float 浮点数 String str 变长字符串 Text str 变长字符串,对较长字符串做了优化 Boolean bool 布尔值 PickleType 任何python对象 自动使用Pickle序列化
字段的选项:
Column( nullable=True, # 可以为空 autoincrement=True, # 数值自增 default='red' # 指定默认值 primary_key = True # 设为主键 ForeignKey = 'table_name.id' # 指定外键,ForeignKey需要导入 unique = True # 设定此字段键值唯一,不允许出现重复值。 )
建立sqlalchemy连接:
from sqlalchemy import create_engine engine = create_engine()
声明表结构:
class School(Base): __tablename__ = "school" # 这个才是表名 id = Column(Integer, primary_key = True) sch_name = Column(String(32)) sch_addr = Column(String(255)) sch_tel = Column(Integer) def __repr__(self): # 查询的时候显示的是值,需不是一个内存地址。 return "school name:{},tel:{}".format(self.sch_name,self.sch_tel)
动态添加表字段。
def add_filed(table_name,): for i in range(3): setattr(table_class,'Col'+str(i),(Column('Col'+str(i), String(50),comment='Col'+str(i)))) Base.metadata.create_all(engine)
创建表:
Base.metadata.create_all(engine) # 删除表 Base.metadata.drop_all(engine)
增加一条记录:
from sqlalchemy.orm import sessionmaker Session = sessionmaker(bind=engine) # 创建一个与数据库的会话,生成的对象是类 session = Session() # 实例化这个会话类 class AddRecord(object): # 交互添加记录类。 def __init__(self,*args): # self.table_name = table_name self.school_feild = {'sch_name':'学校名:','sch_addr':'学校地址:','sch_tel':'学校电话:'} def school(self): f = {} for k,v in self.school.items(): f[k] = input('{}'.format(v)).strip() return f # 返回一个字典 add = AddRecord() school_attr = add.school() # 获取输入的值 # 下面a1是创建一条记录 a1 = School(sch_name = school_attr[sch_name],sch_addr= school_attr[sch_addr],sch_tel = school_attr[sch_tel]) session.add(a1) # 把记录添加到数据库 session.commit() # 提交到数据库,最终对数据库更改。
表的参数__table_args__:
__table_args__:参数需要导入。
from sqlalchemy import UniqueConstraint, PrimaryKeyConstraint, Index
参数名 | 参数 | 作用 | 示例 |
UniqueConstraint('field1','feild2',..., name='') | field1~n:要联合唯一的字段名 name:设置联合的字段名 | 把多个字段建立一个联合唯一的限制 | __table_args__=( UniqueConstraint('ip','port',name='ip_port'), ) |
Index('field1','feild2',...,) | field1~n:字段名 | 把指定字段建立索引 | __table_args__=( Index('ip','port'), ) |
PrimeryKeyConstraint('field1','feild2',...,) | field1~n:字段名 | 设置多字段主键 | __table_args__=( PrimeryKeyConstraint('ip','port'), ) |
...更多参数待补充 |
增、查、改、删:
这四个操作都需要导入sessionmaker ,用sessionmaker创建一个会话,除了建库外,所有操作都在会话中完成。
from sqlalchemy.orm import sessionmaker
Session = sessionmaker()
session =Session()
增:
add(记录) | 记录:由表类生成的实例 | 增加一条记录 | d1=School(name='DL',addr='GJZ') session.add(d1) |
add_all(记录列表) | 记录列表:由表类生成的实例列表 | 同时增加多条记录 | d1=School(name='DL',addr='GJZ') d2=School(name='BJ',addr='HLG') session.add([d1,d2]) |
查:
这三个操作都需要用到query
query | 参数 | 作用 | |
query(table) | table:表名。
| 查询表中所有数据 以表结构__repr__中定义的格式显示 | session.query(table).all() |
query(table.field1,table.field2,……) | table:表名。 field:字段名,可多个。 | 显示指定字段,不以表结构__repr__格式输出 | session.query(table.id,table.name).all() |
结果显示的方法: | |||
all() | 显示所有结果,结果是一个列表 | session.query(table).all() | |
first() | 只显示第一个结果 | session.query(table).first() | |
one() | 如果没有获得结果或者返回了多个结果,则会产生一个 error 结果是一个实例类。 | a=session.query(table).filter(table.id==6).one() a.id#就可以查看id的值。 | |
scalar() | 干啥用的? 感觉和one一样。。。 | ||
count() | 计数 | session.query(table).count() | |
[m:n] | 切片,读取指定的结果和列表切片一样 | session.query(table)[:3] | |
offset(2) | 从第3条数据开始读 | session.query(table).offset(2).all() | |
limit(3) | 只显示前三条 | session.query(table).limit(3).all() | |
order_by | |||
desc | 降序 查询结果降序显示 | session.query(t).order_by(t.id.desc()).filter(t.id>3).all() | |
asc | 升序 查询结果升序显示 | session.query(t).order_by(t.id.asc()).filter(t.id>3).all() | |
query给表和字段重命名 | |||
lable() | 给字段重命名 调用时可以调用lable内的名字。 | a=session.query(table.filed.lable('other')).all() for i in a: print (i.other) | |
aliased | 给表重命名,需要导入 | from sqlalchemy.orm import aliased table_a = aliased(table) | |
query使用text执行SQL语句 text需要导入 |
|
from sqlalchemy import text | |
text('SQL') | SQL:SQL语句或SQL表达式或表的字段 | ||
在filter()方法中使用 | 指定过滤条件 | session.query(t).filter(text('id >2')).all() | |
在order_by()方法中使用 | 指定排序的字段 | session.query(t).order_by(text('id desc')).all() | |
在from_statement()方法中使用 | 运行完整的SQL语句 | session.query(t).from_statement(text( 'select * from talbe where id>2' )) | |
params(变量1=值1,变量2=值2...) | TEXT中的变量前面必须加冒号。 例:text('id = id') | 给text中的变量指定值。 | session.query(t).filter(text('id>:id')).params(id=2).all() 相当于text('id>2') |
query.filter 条件查询 支持所有SQL条件表达式(where部分) | |||
filter(表名.字段 == 值) | 表名.字段:固定格式,不能省略表名。 ==:条件表达式,==,>=,<=,>,<,!= | 根据条件查询数据 | session.query(table).filter(table.id <2).all() |
filter(表名.字段.关键字(值) ) | 表名.字段:固定格式,不能省略表名。 关键字:in_,like,notin_,notlike,between,contains,is_,notis 值:可以是列表,元组,字符串,数字 字符串:可以用通配符%,例:"%a%",单字符通配 _ | 1在列表里 2没在列表里 3匹配字符串 4不匹配字符串 5匹配字符串,不区分大小写 6不匹配字符串,不区分大小写 7在2和6之间,含2和6 8包含字符a 9field是真 10field是不是真 11匹配索引,匹配表报错 | 1-filter(table.field.in_([1,2,'1'])) 2-filter(table.field.notin_([1,2,'1'])) 3-filter(table.field.like('%a%')) 4-filter(table.field.notlike('%a%')) 5-filter(table.field.ilike('%a%')) 6-filter(table.field.notilike('%a%')) 7-filter(table.field.between(2,6)) 8-filter(table.field.contains('a')) 9-filter(table.field.is_(True)) 10-filter(table.field.notis(True)) 11-filter(table.field.match('dage')) |
fileter多条件查询,or_,and_ | |||
filter(逻辑运算符(条件表达式1,条件表达式2,......)) | 逻辑运算符:and_,or_ | 需要从sqlalchemy导入 from sqlalchmy import and_,or_ | filter(and_(table.id >1,table.id<6)) filter(or_(table.id>1,table.name=='dage')) |
func写在query方法里,可以和字段一起显示。 | |||
func.参数 | avg:求平均值 count:统计数量 sum:求和 max:最大 min:最小 | 对分组和字段进行简单的数学运算。 如果不符合逻辑会提示错误“Inaggregated query without GROUP BY” | session.query(table_b.name ,func.count(table_b.age), func.min(table_b.age)).group_by(table_b.name).all() 这一句是对name分组,显示name,显示每个name的个数,和每组最小的age |
改:
有两种方法: | |||
query.fileter(条件).update(值) | 值:字典格式。{表名.字段:值} | 更新所有filter筛选的记录的指定字段 | session.query(table_b).filter(table_b.name == 'erge').update({table_b.score : 60}) |
实例.字段=值 | 先创建一个查询结果的实例 再通过实例改字段的值 提交更改 | sql=seesion.query(table_b).filter(table_b.id ==3).first() sql.name = 'new name' session.commit() | |
实例.字段=值 批量更新 | sql=seesion.query(table_b).filter(table_b.age ==20).all() for i in range(len(sql)): sql(i).score= 100 session.commit() | ||
update(值,synchronize_session=False) | synchronize_session=False立即提交,更新的时候速度更快,同时批量更新时不加此参数报错 | session.query(table_b).filter(table_b.name == 'erge').update({table_b.score : 60},synchronize_session =False) | |
user = User(id=1, name='通过主键改内容') session.merge(user) | merge的作用是合并,查找primary key是否一致,一致则合并,不一致则新建 |
删:
delete() | qurey后面加limit.all,first,等方法时, 不能直接加delete() 需要使用for删除。 | sql1 = session.query(table_b).filter(and_(table_b.name == 'dage', table_b.age == 19)).delete() 或者: sql1 = session.query(table_b).filter(and_(table_b.name == 'dage', table_b.age == 19).first() sql1.delete() | |
in_查询的结果,删除会报错 | 错误提示: sqlalchemy.exc.InvalidRequestError: Could not evaluate current criteria in Python. Specify 'fetch' or False for the synchronize_session parameter | 解决方法: 在delete中,添加synchronize_session=False,含义是同步删除。 delete(synchronize_session=False) | sql1 = session.query(table_b).filter(table_b.name.in_('dage', 'erge')).delete(synchronize_session=False) |
删除指定条数 | 删除d开头的前三条记录 | sql1 = session.query(table_b).filter(table_b.name.like('d%')).limit(3).all() for i in sql1: session.delete(i) | |
一个基本表的创建与操作:
from sqlalchemy import create_engine, ForeignKey, Table, Column, String, Integer, Boolean, Enum
from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.ext.declarative import declarative_base mysql_name = 'david' mysql_pw = 'Yaotiao&shunv666' mysql_server = '192.168.2.120' database_name = 'test' # 1、连接数据库 engine = create_engine("mysql+pymysql://{name}:{pw}@{server}/{database}". format(mysql_name,mysql_pw,mysql_server,database_name), encoding ='utf-8' ) # 2、生成ORM基类 Base = declarative_base() # 3、继承基类,定义表结构 class Product(Base): # 创建orm对象 __tablename__ = 'products' # 数据表名 id = Column(Integer, primary_key = True) # 字段,设为主键,默认不用赋值,此字段会自增 name = Column(String(32)) price = Column(Integer) def __repr__(self): # 打印查询结果实例显示结果,不加此项显示的是类的内存地址 return "name:{},price:{}".format(self.name,self.price) # 4、创建数据表 Base.metadata.create_all(engine) # 在数据库中创建表,已存在则不创建 # 5、添加多条数据 product1 = Product(name = '华为100', price = 999) # 创建两个实例 product2 = Product(name = '华为400', price = 1999) session.add_all([product1,product2 ]) # add_all把实例列表添加到数据库 # 6、添加单条数据 product = Product(name = '一条', price = 9999) session.add(product) # 7、查询数据 query_data = session.query(Product).all() # 查所有,如果不加__repr__,结果是内存地址列表,使用for查看 for i in query_data: # 查看结果 print (i.id,i.name,i.price) # 8、修改一项数据 query_data = session.query(Product).filter(Product.name=='一条').first() query_data.name = 'yitiao' # 只改name一个数据 # 9、批量更新:把所有华为开头的产品价格改成10000 session.query(Product).filter(Product.name.like('华为%')).update({Product.price:10000},synchronize_session =False) # 10、通过主键改内容merge: update_name = Product(id=3,price=3651) session.merge(update_name) # id 和 name不变,只修改了价格price = 3651 # 11、删除一条数据:把华为开头的价格最高的一条数据删掉 del_product = session.query(Product).fileter(Product.price.like('华为%'))).order_by(Product.price.desc()).first() session.delete(del_product) # 12、删除所有符合条件的数据: session.query(Product).filter(Product.name.like('华为%')).delete(synchronize_session = False)
问题集:https://blog.51cto.com/yishi/2335554