若何在 Python 中异步操做数据库?aiomysql、asyncpg、aioredis 利用介绍

3个月前 (11-21 18:48)阅读3回复0
wsygfsj
wsygfsj
  • 管理员
  • 注册排名5
  • 经验值132695
  • 级别管理员
  • 主题26539
  • 回复0
楼主

菜学Python”,抉择“星标”公家号

超等无敌干货,第一时间送达!!!

来源:古明地觉的编程教室

Python 目前已经进化到了 3.8 版本,对操做数据库也供给了响应的异步撑持。当我们做一个 Web 办事时,性能的瓶颈绝大部门都在数据库上,假设一个恳求从数据库中读数据的时候可以主动切换、往处置其它恳求的话,是不是就能进步并发量了呢。

(编者注:原文写于 2020 年 2 月,其时最新为Python 3.8,文章内容如今仍未过时)

下面我们来看看若何利用 Python 异步操做 MySQL、PostgreSQL 以及 Redis,以上几个能够说是最常用的数据库了。至于 SQLServer、Oracle,本人没有找到响应的异步驱动,有兴致能够本身往摸索一下。

而操做数据库无非就是增删改查,下面我们来看看若何异步实现它们。

异步操做 MySQL

异步操做 MySQL 的话,需要利用一个 aiomysql,间接 pip install aiomysql 即可。

aiomysql 底层依靠于 pymysql,所以 aiomysql 并没有零丁实现响应的毗连驱动,而是在 pymysql 之长进行了封拆。

查询笔录

下面先来看看若何查询笔录。

importasyncio

importaiomysql.sa asaio_sa

asyncdefmain:

# 创建一个异步引擎

engine = awaitaio_sa.create_engine(host= "xx.xxx.xx.xxx",

port= 3306,

user= "root",

password= "root",

db= "_hanser",

connect_timeout= 10)

# 通过 engine.acquire 获取一个毗连

asyncwithengine.acquire asconn:

# 异步施行, 返回一个 class 'aiomysql.sa.result.ResultProxy' 对象

result = awaitconn.execute( "SELECT * FROM girl")

# 通过 await result.fetchone 能够获取称心前提的第一条笔录, 一个 class 'aiomysql.sa.result.RowProxy' 对象

data = awaitresult.fetchone

# 能够将 class 'aiomysql.sa.result.RowProxy' 对象想象成一个字典

print(data.keys) # KeysView((1, '古明地觉', 16, '地灵殿'))

print(list(data.keys)) # ['id', 'name', 'age', 'place']

print(data.values) # ValuesView((1, '古明地觉', 16, '地灵殿'))

print(list(data.values)) # [1, '古明地觉', 16, '地灵殿']

print(data.items) # ItemsView((1, '古明地觉', 16, '地灵殿'))

print(list(data.items)) # [('id', 1), ('name', '古明地觉'), ('age', 16), ('place', '地灵殿')]

# 间接转成字典也是能够的

print(dict(data)) # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}

# 最初别忘记封闭引擎, 当然你在创建引擎的时候也能够通过 async with aio_sa.create_engine 的体例创建

# async with 语句完毕后会主动施行下面两行代码

engine.close

awaitengine.wait_closed

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

怎么样,是不是很简单呢,和同步库的操做体例其实是类似的。但是很明显,我们在获取笔录的时候不会只获取一条,而是会获取多条,获取多条的话利用 await result.fetchall 即可。

importasyncio

frompprint importpprint

importaiomysql.sa asaio_sa

asyncdefmain:

# 通过异步上下文治理器的体例创建, 会主动帮我们封闭引擎

asyncwithaio_sa.create_engine(host= "xx.xxx.xx.xxx",

port= 3306,

user= "root",

password= "root",

db= "_hanser",

connect_timeout= 10) asengine:

asyncwithengine.acquire asconn:

result = awaitconn.execute( "SELECT * FROM girl")

# 此时的 data 是一个列表, 列内外面是 class 'aiomysql.sa.result.RowProxy' 对象

data = awaitresult.fetchall

# 将里面的元素转成字典

pprint(list(map(dict, data)))

[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},

{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法丛林'},

{'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

除了 fetchone、fetchall 之外,还有一个 fetchmany,能够获取指定笔录的条数。

importasyncio

frompprint importpprint

importaiomysql.sa asaio_sa

asyncdefmain:

# 通过异步上下文治理器的体例创建, 会主动帮我们封闭引擎

asyncwithaio_sa.create_engine(host= "xx.xxx.xx.xxx",

port= 3306,

user= "root",

password= "root",

db= "_hanser",

connect_timeout= 10) asengine:

asyncwithengine.acquire asconn:

result = awaitconn.execute( "SELECT * FROM girl")

# 默认是获取一条, 得到的仍然是一个列表

data = awaitresult.fetchmany( 2)

pprint(list(map(dict, data)))

[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},

{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法丛林'}]

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

以上就是通过 aiomysql 查询数据库中的笔录,没什么难度。但是值得一提的是,await conn.execute 里面除了能够传递一个原生的 SQL 语句之外,我们还能够借助 SQLAlchemy。

importasyncio

frompprint importpprint

importaiomysql.sa asaio_sa

fromsqlalchemy.sql.selectable importSelect

fromsqlalchemy importtext

asyncdefmain:

asyncwithaio_sa.create_engine(host= "xx.xxx.xx.xxx",

port= 3306,

user= "root",

password= "root",

db= "_hanser",

connect_timeout= 10) asengine:

asyncwithengine.acquire asconn:

sql = Select([text( "id, name, place")], whereclause=text( "id != 1"), from_obj=text( "girl"))

result = awaitconn.execute(sql)

data = awaitresult.fetchall

pprint(list(map(dict, data)))

[{'id': 2, 'name': '雾雨魔理沙', 'place': '魔法丛林'},

{'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'}]

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

添加笔录

然后是添加笔录,我们同样能够借助 SQLAlchemy 搀扶帮助我们拼接 SQL 语句。

importasyncio

frompprint importpprint

importaiomysql.sa asaio_sa

fromsqlalchemy importTable, MetaData, create_engine

asyncdefmain:

asyncwithaio_sa.create_engine(host= "xx.xx.xx.xxx",

port= 3306,

user= "root",

password= "root",

db= "_hanser",

connect_timeout= 10) asengine:

asyncwithengine.acquire asconn:

# 我们还需要创建一个 SQLAlchemy 中的引擎, 然后将表反射出来

s_engine = create_engine( "mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")

tbl = Table( "girl", MetaData(bind=s_engine), autoload= True)

insert_sql = tbl.insert.values(

[{ "name": "十六夜咲夜", "age": 17, "place": "红魔馆"},

{ "name": "琪露诺", "age": 60, "place": "雾之湖"}])

# 重视: 施行的施行必需开启一个事务, 不然数据是不会进进到数据库中的

asyncwithconn.begin:

# 同样会返回一个 class 'aiomysql.sa.result.ResultProxy' 对象

# 虽然我们插进了多条, 但只会返回最初一条的插进信息

result = awaitconn.execute(insert_sql)

# 返回最初一条笔录的自增 id

print(result.lastrowid)

# 影响的行数

print(result.rowcount)

# 从头查询, 看看笔录能否进进到数据库中

asyncwithengine.acquire asconn:

data = await( awaitconn.execute( "select * from girl")).fetchall

data = list(map(dict, data))

pprint(data)

[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '地灵殿'},

{'age': 16, 'id': 2, 'name': '雾雨魔理沙', 'place': '魔法丛林'},

{'age': 400, 'id': 3, 'name': '芙兰朵露', 'place': '红魔馆'},

{'age': 17, 'id': 16, 'name': '十六夜咲夜', 'place': '红魔馆'},

{'age': 60, 'id': 17, 'name': '琪露诺', 'place': '雾之湖'}]

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

仍是很便利的,但是插进多条笔录的话只会返回插进的最初一条笔录的信息,所以假设你期看获取每一条的信息,那么就一条一条插进。

修改笔录

修改笔录和添加笔录是类似的,我们来看一下。

importasyncio

frompprint importpprint

importaiomysql.sa asaio_sa

fromsqlalchemy importTable, MetaData, create_engine, text

asyncdefmain:

asyncwithaio_sa.create_engine(host= "xx.xx.xx.xxx",

port= 3306,

user= "root",

password= "root",

db= "_hanser",

connect_timeout= 10) asengine:

asyncwithengine.acquire asconn:

s_engine = create_engine( "mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")

tbl = Table( "girl", MetaData(bind=s_engine), autoload= True)

update_sql = tbl.update.where(text( "name = '古明地觉'")).values({ "place": "东方地灵殿"})

# 同样需要开启一个事务

asyncwithconn.begin:

result = awaitconn.execute(update_sql)

print(result.lastrowid) # 0

print(result.rowcount) # 1

# 查询成果

asyncwithengine.acquire asconn:

data = await( awaitconn.execute( "select * from girl where name = '古明地觉'")).fetchall

data = list(map(dict, data))

pprint(data)

[{'age': 16, 'id': 1, 'name': '古明地觉', 'place': '东方地灵殿'}]

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

能够看到,笔录被胜利的修改了。

删除笔录

删除笔录就更简单了,间接看代码。

importasyncio

importaiomysql.sa asaio_sa

fromsqlalchemy importTable, MetaData, create_engine, text

asyncdefmain:

asyncwithaio_sa.create_engine(host= "xx.xx.xx.xxx",

port= 3306,

user= "root",

password= "root",

db= "_hanser",

connect_timeout= 10) asengine:

asyncwithengine.acquire asconn:

s_engine = create_engine( "mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")

tbl = Table( "girl", MetaData(bind=s_engine), autoload= True)

update_sql = tbl.delete # 全数删除

# 同样需要开启一个事务

asyncwithconn.begin:

result = awaitconn.execute(update_sql)

# 返回最初一条笔录的自增 id, 我们之前修改了 id = 0 笔录, 所以它跑到最初了

print(result.lastrowid) # 0

# 受影响的行数

print(result.rowcount) # 6

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

此时数据库中的笔录已经全数被删除了。

整体来看仍是比力简单的,而且撑持的功用也比力全面。

异步操做 PostgreSQL

异步操做 PostgreSQL 的话,我们有两个抉择,一个是 asyncpg 库,另一个是 aiopg 库。

asyncpg 是本身实现了一套毗连驱动,而 aiopg 则是对 psycopg2 停止了封拆,小我更选举 asyncpg,性能和活泼度都比 aiopg 要好。

下面来看看若何利用 asyncpg,起首是安拆,间接 pip install asyncpg 即可。

查询笔录

起首是查询笔录。

importasyncio

frompprint importpprint

importasyncpg

asyncdefmain:

# 创建毗连数据库的驱动

conn = awaitasyncpg.connect(host= "localhost",

port= 5432,

user= "postgres",

password= "zgghyys123",

database= "postgres",

timeout= 10)

# 除了上面的体例,还能够利用类似于 SQLAlchemy 的体例创建

# await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")

# 挪用 await conn.fetchrow 施行 select 语句,获取称心前提的单条笔录

# 挪用 await conn.fetch 施行 select 语句,获取称心前提的全数笔录

row1 = awaitconn.fetchrow( "select * from girl")

row2 = awaitconn.fetch( "select * from girl")

# 返回的是一个 Record 对象,那个 Record 对象等于将返回的笔录停止了一个封拆

# 至于怎么用后面会说

print(row1) # Record id=1 name='古明地觉' age=16 place='地灵殿'

pprint(row2)

[Record id=1 name='古明地觉' age=16 place='地灵殿',

Record id=2 name='椎名实白' age=16 place='樱花庄',

Record id=3 name='古明地恋' age=15 place='地灵殿']

# 封闭毗连

awaitconn.close

loop = asyncio.get_event_loop

loop.run_until_complete(main)

loop.close

以上我们演示了若何利用 asyncpg 来获取数据库中的笔录,我们看到施行 select 语句的话,我们能够利用 conn.fetchrow(query) 来获取称心前提的单条笔录,conn.fetch(query) 来获取称心前提的所有笔录。

Record 对象

我们说利用 conn.fetchone 查询得到的是一个 Record 对象,利用 conn.fetch 查询得到的是多个 Record 对象构成的列表,那么那个 Rcord 对象怎么用呢?

importasyncio

importasyncpg

asyncdefmain:

conn = awaitasyncpg.connect( "postgres://postgres:zgghyys123@localhost:5432/postgres")

row = awaitconn.fetchrow( "select * from girl")

print(type(row)) # class 'asyncpg.Record'

print(row) # Record id=1 name='古明地觉' age=16 place='地灵殿'

# 那个 Record 对象能够想象成一个字典

# 我们能够将返回的字段名做为 key, 通过字典的体例停止获取

print(row[ "id"], row[ "name"]) # 1 古明地觉

# 除此之外,还能够通过 get 获取,获取不到的时候会返回默认值

print(row.get( "id"), row.get( "name")) # 1 古明地觉

print(row.get( "xxx"), row.get( "xxx", "不存在的字段")) # None 不存在的字段

# 除此之外还能够挪用 keys、values、items,那个不消我说,都应该晓得意味着什么

# 只不外返回的是一个迭代器

print(row.keys) # tuple_iterator object at 0x000001D6FFDAE610

print(row.values) # tuple_iterator object at 0x000001D6FFDAE610

print(row.items) # RecordItemsIterator object at 0x000001D6FFDF20C0

# 我们需要转成列表或者元组

print(list(row.keys)) # ['id', 'name', 'age', 'place']

print(list(row.values)) # [1, '古明地觉', 16, '地灵殿']

print(dict(row.items)) # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}

print(dict(row)) # {'id': 1, 'name': '古明地觉', 'age': 16, 'place': '地灵殿'}

# 封闭毗连

awaitconn.close

if__name__ == '__main__':

asyncio.run(main)

当然我们也能够借助 SQLAlchemy 帮我们拼接 SQL 语句。

importasyncio

frompprint importpprint

importasyncpg

fromsqlalchemy.sql.selectable importSelect

fromsqlalchemy importtext

asyncdefmain:

conn = awaitasyncpg.connect( "postgres://postgres:zgghyys123@localhost:5432/postgres")

sql = Select([text( "id, name, place")], whereclause=text( "id != 1"), from_obj=text( "girl"))

# 我们不克不及间接传递一个 Select 对象, 而是需要将其转成原生的字符串才能够

rows = awaitconn.fetch(str(sql))

pprint(list(map(dict, rows)))

[{'id': 2, 'name': '椎名实白', 'place': '樱花庄'},

{'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]

# 封闭毗连

awaitconn.close

if__name__ == '__main__':

asyncio.run(main)

此外,conn.fetch 里面还撑持占位符,利用百分号加数字的体例,举个例子:

importasyncio

frompprint importpprint

importasyncpg

asyncdefmain:

conn = awaitasyncpg.connect( "postgres://postgres:zgghyys123@localhost:5432/postgres")

rows = awaitconn.fetch( "select * from girl where id != $1", 1)

pprint(list(map(dict, rows)))

[{'age': 16, 'id': 2, 'name': '椎名实白', 'place': '樱花庄'},

{'age': 15, 'id': 3, 'name': '古明地恋', 'place': '地灵殿'}]

# 封闭毗连

awaitconn.close

if__name__ == '__main__':

asyncio.run(main)

添加笔录

然后是添加笔录,我们看看若何往库里面添加数据。

importasyncio

frompprint importpprint

importasyncpg

fromsqlalchemy.sql.selectable importSelect

fromsqlalchemy importtext

asyncdefmain:

conn = awaitasyncpg.connect( "postgres://postgres:zgghyys123@localhost:5432/postgres")

# 施行 insert 语句我们能够利用 execute

row = awaitconn.execute( "insert into girl(name, age, place) values ($1, $2, $3)",

'十六夜咲夜', 17, '红魔馆')

pprint(row) # INSERT 0 1

pprint(type(row)) # class 'str'

awaitconn.close

if__name__ == '__main__':

asyncio.run(main)

通过 execute 能够插进单条笔录,同时返回相关信息,但是说实话那个信息没什么太大用。除了 execute 之外,还有 executemany,用来施行多条插进语句。

importasyncio

importasyncpg

asyncdefmain:

conn = awaitasyncpg.connect( "postgres://postgres:zgghyys123@localhost:5432/postgres")

# executemany:第一条参数是一个模板,第二条号令是包罗多个元组的列表

# 施行多条笔录的话,返回的成果为 None

rows = awaitconn.executemany( "insert into girl(name, age, place) values ($1, $2, $3)",

[( '十六夜咲夜', 17, '红魔馆'), ( '琪露诺', 60, '雾之湖')])

print(rows) # None

# 封闭毗连

awaitconn.close

if__name__ == '__main__':

asyncio.run(main)

重视:假设是施行大量 insert 语句的 话,那么 executemany 要比 execute 快良多,但是 executemany 不具备事务功。

进门: 最全的零根底学Python的问题 | 零根底学了8个月的Python |实战项目 | 学Python就是那条捷径

干货:爬取豆瓣短评,片子《后来的我们》 | 38年NBA更佳球员阐发 |从万寡等待到口碑扑街!唐探3令人失看 | 笑看新倚天屠龙记 | 灯谜答题王 | 用Python做个海量蜜斯姐素描图 | 碟中谍那么火,我用机器进修做个迷你选举系统片子

兴趣:弹球游戏 | 九宫格 | 标致的花 | 两百行Python《天天酷跑》游戏!

AI:会做诗的机器人 | 给图片上色 | 揣测收进 | 碟中谍那么火,我用机器进修做个迷你选举系统片子

小东西: Pdf转Word,轻松搞定表格和水印! | 一键把html网页保留为pdf! |再见PDF提取收费! | 用90行代码打造最强PDF转换器,word、PPT、excel、markdown、html一键转换 | 造造一款钉钉低价机票提醒器! |60行代码做了一个语音壁纸切换器天天看蜜斯姐! |

年度爆款案牍

1). 卧槽!Pdf转Word用Python轻松搞定 !

2).学Python实香!我用100行代码做了个网站,帮人PS游览图片,赚个鸡腿食

3).首播过亿,火爆全网,我阐发了《披荆斩棘的姐姐》,发现了那些奥秘

4). 80行代码!用Python做一个哆来A梦分身

5).你必需掌握的20个python代码,短小精悍,用途无限

6). 30个Python奇淫身手集

7). 我总结的80页《菜鸟学Python精选干货.pdf》,都是干货

8). 再见Python!我要学Go了!2500字深度阐发 !

9).发现一个舔狗福利!那个Python爬虫神器太爽了,主动下载妹子图片

0
回帖

若何在 Python 中异步操做数据库?aiomysql、asyncpg、aioredis 利用介绍 期待您的回复!

取消