menu Chancel's blog
rss_feed
Chancel's blog
有善始者实繁,能克终者盖寡。

Python3库之requests与aiohttp的使用

作者:Chancel Yang, 创建:2021-03-30, 字数:16668, 已阅:1411, 最后更新:2024-03-10

1. 背景

Python作为优秀的脚本语言,其出色的网络库Requests是非常惊艳的

唯一的不足之处是它无法实现异步请求,只能实现同步请求

如果要实现异步请求,则需要用到aiohttp/httpx

在了解异步请求框架aiohttp之前,要先确定具体的业务场景是否需要异步请求

像爬虫更多的挑战是如何使用代理IP,如何安全合理合法的爬取非版权数据

相较于同步请求,异步请求一不小心容易形成对网站高频的网络攻击,爬虫一定要避免法律风险以及在爬虫方允许的范围内爬取数据

像测试场景就非常适合使用aiohttp来模拟并发测试

2. requests

在了解aiohttp库之前,我们先来看看Python3下有哪些优秀的HTTP库,分别是

  • requests
  • httpx
  • aiohttp

其中requests是适合刚入门Python的初学者使用,是一个极易上手的同步请求HTTP工具库

HTTPX则是后起之秀,支持同步与异步两种语法,而aiohttp则相对只支持异步请求

在异步请求效率方面,httpx与aiohttp的差距不明显

PS:有兴趣也可以自行写两个简易demo验证一下httpx与aiohttp的异步请求性能差距,httpx不在本文的讨论范围内

2.1. 安装

reqeusts非Python3自带库,使用pip安装如下

Bash
pip3 install requests

2.2. Get与Post使用

GET请求

以请求我的网站留言列表Json数据为例

Python
>>> import requests
>>> response = requests.get('https://www.chancel.me/messages?ownType=2&ownID=0',timeout=10)
>>> response.status_code
200
>>> response.json
{'data': {'hasNext': False, 'hasPrev': False, 'items': [{'create_time': 'Mon, 14 Jan 2019 17:34:42 GMT', 'id': 8, 'm_author': '浮光', 'm_content': '<blockquote><p>理解得越多就越痛苦。知道得越多就越撕裂。但他有着同痛苦相对称的清澈,与绝望相 均衡的坚韧。</p>\n<p>-- 勒内.夏尔</p>\n</blockquote>\n', 'm_email': 'ycs1026@vip.qq.com', 'm_environ': [], 'm_gravatar': 'https://www.gravatar.com/avatar/d45071b54cf3339bd3d16bb35f750f35?d=https%3A%2F%2Fwww.chancel.me%2Fstatic%2Fimg%2Fgravatar.jpg&s=64', 'm_own_id': 0, 'm_parent_id': None, 'm_site_url': None, 'm_type': 2, 'sub_messages': [], 't_message_type': {'id': 2, 'm_type': 'book'}}], 'page': 1, 'pages': 1, 'perPage': 100, 'total': 1}, 'message': '留言获取成功', 'success': True}
>>> response.json['message']
'获取成功'

Get请求使用相对简单,requests针对返回的Json封装也非常方便

POST请求

以切换我的博客主题为例,我将网站首次访问分配的Cookies手动放入headers中的cookies字段中

Python
headers = {'Cookie': 'IDTAG=3e56264d-8fa8-11eb-9636-0050560000a0'}
>>> request_data = {"theme":{"appClass":"mdui-color-white","bodyClass":"mdui-theme-layout-dark","containerClass":"","footerClass":""}}
>>> response = requests.post('https://www.chancel.me/idtag',headers=headers,json=request_data)
>>> response.status_code
200
>>> response.text
'{"data":{},"message":"存储成功","success":true}\n'

Post请求通常需要提交数据,对于json数据(Content-Type:application/json)类型来说参数是Json

如果是表单请求,则可以使用data参数,如下

Python
>>> response = requests.post('https://www.chancel.me/idtag',headers=headers,data=request_data)

2.3. session使用

实际场景下,请求往往是连续的且带有Cookies的,每次POST提交都手工维护Cookies是一件麻烦的差事

如反复请求更换博客网站主题时,需要携带访问博客的cookies信息,可以在代码中手动添加了headers里的cookie信息

显然每次都这样做不太方便,我们可以使用requests携带的session来解决这个问题

以切换我的博客主题为例

Python
>>> requests_session = requests.session()
>>> requests_session.get('https://www.chancel.me')
<Response [200]>
>>> requests_session.cookies
<RequestsCookieJar[Cookie(version=0, name='IDTAG', value='2155a6c4-8fa9-11eb-afcb-0050560000a0', port=None, port_specified=False, domain='www.chancel.me', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=1648460213, discard=False, comment=None, comment_url=None, rest={'HttpOnly': None}, rfc2109=False)]>
>>> request = requests_session.post('https://www.chancel.me/idtag',json=request_data)
>>> request.status_code
200
>>> request.text
'{"data":{},"message":"存储成功","success":true}\n'

session可以有效的存储当前所有请求的cookies,跟踪Cookies的变化,针对一些复杂场景下的连续请求是非常好用的封装

而且在网站登录方面会非常的方便,只要执行了登录操作,session对象会自动保存当前cookies以便进行其他请求操作

我们也可以将session的cookies属性序列化到本地,等到下一次启动时再次使用,避免重复登录获取Cookies的问题

写入本地Cookies

Python
>>> with open('/tmp/chancel.ltd-cookies', 'w') as f:
...     f.write(json.dumps(requests_session.cookies.get_dict()))
...

查看本地保存的cookies信息

Bash
# chancel @ home-ubuntu in ~ [17:42:22]
$ cat /tmp/chancel.ltd-cookies
{"IDTAG": "2155a6c4-8fa9-11eb-afcb-0050560000a0"}%

读取本地保存的Cookies信息

Python
with open('/tmp/chancel.ltd-cookies','r') as f:
    requests_session.cookies.update(json.loads(f.read()))

以上的使用已经足够大部分请求场景所需要的语法,更多关于request的高级用法(例如文件上传、SSL证书忽略、网络代理风等)可以参考官方文档

Requests: HTTP for Humans™

3. aiohttp

aiohttp是一个非常优秀的异步HTTP库,但相对的,上手难度高于requests

相信有编程经验的你对异步并不陌生,这里可以简单解释下同步与异步的区别

  • 同步请求:发起请求后,直到请求返回或者异常才执行下一步代码
  • 异步请求:发起请求后,可以继续执行下一步代码

由异步的原理,我们可以准备大量的请求来观察两者的执行区别

3.1. 安装

安装aiohttp也可以使用pip

Bash
pip3 install aiohttp asyncio

3.2. Get和Post的使用

Get请求

以官网的例子为例稍微改造,还是获取博客的留言列表Json数据,Get请求使用异步方法是标准的asyncio库方法

Python
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.chancel.me/messages?ownType=2&ownID=0') as response:
            print('Response status -> %d' % response.status)

            response_json = await response.json()
            print('Response data -> %s' % response_json)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# output
Response status -> 200
Response data -> {'data': {'hasNext': False, 'hasPrev': False, 'items': [{'create_time': 'Mon, 14 Jan 2019 17:34:42 GMT', 'id': 8, 'm_author': '浮光', 'm_content': '<blockquote><p>理解得越多就越痛苦。知道得越多就越撕裂。但他有着同痛苦相对称的清澈,与绝望相均衡的坚韧。</p>\n<p>-- 勒内.夏尔</p>\n</blockquote>\n', 'm_email': 'ycs1026@vip.qq.com', 'm_environ': [], 'm_gravatar': 'https://cdn.v2ex.com/gravatar/d45071b54cf3339bd3d16bb35f750f35?d=https%3A%2F%2Fwww.chancel.me%2Fstatic%2Fimg%2Fgravatar.jpg&s=64', 'm_own_id': 0, 'm_parent_id': None, 'm_site_url': None, 'm_type': 2, 'sub_messages': [], 't_message_type': {'id': 2, 'm_type': 'book'}}], 'page': 1, 'pages': 1, 'perPage': 100, 'total': 1}, 'message': '留言获取成功', 'success': True}

POST请求

还是以切换我的博客主题为例

Python
import aiohttp
import asyncio


headers = {'Cookie': 'IDTAG=f4fad1e1-911d-11eb-bbcb-0050560000a0'}
request_data = {"theme":{"appClass":"mdui-color-white","bodyClass":"mdui-theme-layout-dark","containerClass":"","footerClass":""}}

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.post('https://www.chancel.me/idtag',headers=headers,json=request_data) as response:
            print(response.status)

            response_json = await response.json()
            print(response_json)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# output
Response status -> 200
Response data -> {'data': {}, 'message': '存储成功', 'success': True}

aiohttp的普通请求并不复杂,如果不清楚await/async语法,可以参考 Coroutines and Tasks - doc.python.org

3.3. Session的使用

aiohttp的cookies使用比较复杂,也支持导出为文件,但文件格式为二进制,也可以像Requests的session一致导出为Json文件,但需要使用unsafe标志,详细参考 Cookie Jar - docs.aiohttp.org

Python
import aiohttp
import asyncio


request_data = {"theme":{"appClass":"mdui-color-white","bodyClass":"mdui-theme-layout-dark","containerClass":"","footerClass":""}}

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('https://www.chancel.me'):
            async with session.post('https://www.chancel.me/idtag',json=request_data) as response:

                session.cookie_jar.save('www.chancel.me.cookies')
                # session.cookie_jar.load(file_path) 可以读回cookies,大部分情况无需理会cookie内容

                print('Response status -> %d' % response.status)

                response_json = await response.json()
                print('Response data -> %s' % response_json)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

if __name__ == '__main__':
    main()

4. 速度对比

与Requests对比,aiohttp最大的不同之处在于异步请求,以请求10次博客主题切换为例子,执行下面的代码需要消耗15秒左右,且从返回可以看出来是顺序执行每一次请求的

Python
import requests
import time

index_url = 'https://www.chancel.me'
post_url = 'https://www.chancel.me/idtag'
post_data = {"theme": {"appClass": "mdui-color-white", "bodyClass": "mdui-theme-layout-dark", "containerClass": "", "footerClass": ""}}


def requests_post(count: int):
    requests_session = requests.session()
    requests_session.get(index_url)
    response = requests_session.post(post_url, json=post_data)
    if response.ok:
        print('第%d次Post请求更换主题成功,返回结果Response数据->%s' % (count,response.json()))

if __name__ == '__main__':
    try_count = 10
    post_count = 1
    start_time = time.time()
    while post_count < try_count + 1:
        requests_post(count=post_count)
        post_count += 1
    stopwatch = time.time() - start_time
    print('Requests使用POST请求%d次共用时%d秒' % (try_count, stopwatch))

# output
第1次Post请求更换主题成功返回结果Response数据->{'data': {}, 'message': '存储成功', 'success': True}
第2次Post请求更换主题成功返回结果Response数据->{'data': {}, 'message': '存储成功', 'success': True}
...
第7次Post请求更换主题成功返回结果Response数据->{'data': {}, 'message': '存储成功', 'success': True}
第8次Post请求更换主题成功返回结果Response数据->{'data': {}, 'message': '存储成功', 'success': True}
第9次Post请求更换主题成功返回结果Response数据->{'data': {}, 'message': '存储成功', 'success': True}
第10次Post请求更换主题成功返回结果Response数据->{'data': {}, 'message': '存储成功', 'success': True}
Requests使用POST请求10次共用时15秒

而同样的请求逻辑,aiohttp只需1秒就可以完成,且可以看到执行请求是没有顺序的

Python
import aiohttp
import asyncio
import time

index_url = 'https://www.chancel.me'
post_url = 'https://www.chancel.me/idtag'
post_data = {"theme": {"appClass": "mdui-color-white", "bodyClass": "mdui-theme-layout-dark", "containerClass": "", "footerClass": ""}}


async def aiohttp_post(count: int):
    async with aiohttp.ClientSession() as session:
        async with session.get(index_url):
            async with session.post(post_url, json=post_data) as response:
                response_json = await response.json()
                print('第%d次Post请求更换主题成功,返回结果Response数据-> %s' % (count, response_json))


if __name__ == '__main__':
    try_count = 10
    post_count = 1
    start_time = time.time()
    tasks = []
    while post_count < try_count + 1:
        tasks.append(aiohttp_post(count=post_count))
        post_count += 1
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    stopwatch = time.time() - start_time
    print('Requests使用POST请求%d次共用时%d秒' % (try_count, stopwatch))

# output
第2次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第4次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第10次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第6次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第1次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第8次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第9次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第7次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第5次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第3次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
Requests使用POST请求10次共用时1秒

在需要短时间获取大量数据的业务场景下,aiohttp是更合适的选择

5. aiohttp的并发控制与超时设置

代码开发中网络请求最常见的就是超时设置,在异步库中,还需要控制QPS(并发请求),同步库因为是顺序执行的不存在并发限制

Warn:大部分网站对于单机并发请求都是有限制的,挂网络代理牢饭警告,合理利用技术,避免法律风险

5.1. 并发限制

以上面的例子为例,限制aiohttp请求切换博客主题的频率代码如下

Python
import aiohttp
import asyncio
import time

index_url = 'https://www.chancel.me'
post_url = 'https://www.chancel.me/idtag'
post_data = {"theme": {"appClass": "mdui-color-white", "bodyClass": "mdui-theme-layout-dark", "containerClass": "", "footerClass": ""}}

connector = aiohttp.TCPConnector(limit=1)


async def aiohttp_post(session: aiohttp.ClientSession, count: int):
    async with session.get(index_url):
        async with session.post(post_url, json=post_data) as response:
            response_json = await response.json()
            print('第%d次Post请求更换主题成功,返回结果Response数据-> %s' % (count, response_json))


if __name__ == '__main__':
    try_count = 10
    post_count = 1
    start_time = time.time()
    tasks = []
    client_session = aiohttp.ClientSession(connector=connector)
    while post_count < try_count + 1:
        tasks.append(aiohttp_post(session=client_session, count=post_count))
        post_count += 1
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    stopwatch = time.time() - start_time
    print('Requests使用POST请求%d次共用时%d秒' % (try_count, stopwatch))

# output
第3次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第7次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第10次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第4次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第8次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第6次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第5次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第1次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第9次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
第2次Post请求更换主题成功返回结果Response数据-> {'data': {}, 'message': '存储成功', 'success': True}
Requests使用POST请求10次共用时5秒

这里限制了每次只允许单个请求连接,但速度依然比requests要快得多,但并不是说异步请求一定比同步请求快,需要结合场景具体讨论

5.2. 超时限制

超时限制在Requests中十分简单,如

Python
import requests

response = requests.get('https://www.chancel.me',timeout=10)

所以在aiohttp也是这么简单吗?可以试试看

Python
import aiohttp
import asyncio
import time

session = aiohttp.ClientSession()


async def aiohttp_post(session: aiohttp.ClientSession, count: int):
    async with session.get('https://www.chancel.me',timeout=10) as response:
        print('第%d次请求返回状态码%d' % (count, response.status))


if __name__ == '__main__':
    try_count = 100
    post_count = 1
    start_time = time.time()
    tasks = []
    connector = aiohttp.TCPConnector(limit=1)
    client_session = aiohttp.ClientSession(connector=connector)
    while post_count < try_count + 1:
        tasks.append(aiohttp_post(session=client_session, count=post_count))
        post_count += 1
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    stopwatch = time.time() - start_time
    print('请求首页%d次共耗时%d秒' % (try_count, stopwatch))

# output
第55次请求返回状态码200
第56次请求返回状态码200
第57次请求返回状态码200
第58次请求返回状态码200
第59次请求返回状态码200
第60次请求返回状态码200
第61次请求返回状态码200
第62次请求返回状态码200
第32次请求返回状态码200
Task exception was never retrieved
future: <Task finished name='Task-68' coro=<aiohttp_post() done, defined at /mnt/sda/Codes/dev/test_code/demo.py:8> exception=TimeoutError()>
Traceback (most recent call last):
  File "/mnt/sda/Codes/dev/test_code/demo.py", line 9, in aiohttp_post
    async with session.get('https://www.chancel.me',timeout=5) as response:
  File "/mnt/sda/Codes/dev/test_code/.venv/lib/python3.9/site-packages/aiohttp/client.py", line 1117, in __aenter__
    self._resp = await self._coro
  File "/mnt/sda/Codes/dev/test_code/.venv/lib/python3.9/site-packages/aiohttp/client.py", line 619, in _request
    break
  File "/mnt/sda/Codes/dev/test_code/.venv/lib/python3.9/site-packages/aiohttp/helpers.py", line 656, in __exit__
    raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
Task exception was never retrieved
请求首页100次共耗时10秒

如果你尝试这么做,会发现无论请求100次还是10000次博客首页,都会在10秒内返回

抛出了10秒超时异常的原因是 session.get('https://www.chancel.me',timeout=10) 中的timeout是指整个session的超时时间,看起来非常别扭

结合官方的例子,可以设置整个异步请求的总超时时间,与上面的代码其实是一致的,只是看起来不那么别扭了

Python
import aiohttp
import asyncio
import time

session = aiohttp.ClientSession()


async def aiohttp_post(session: aiohttp.ClientSession, count: int):
    async with session.get('https://www.chancel.me') as response:
        print('第%d次请求返回状态码%d' % (count, response.status))


if __name__ == '__main__':
    try_count = 100
    post_count = 1
    start_time = time.time()
    tasks = []
    connector = aiohttp.TCPConnector(limit=1)
    timeout = aiohttp.ClientTimeout(total=60 * 5, connect=None, sock_connect=10, sock_read=None)
    client_session = aiohttp.ClientSession(connector=connector,timeout=timeout)
    while post_count < try_count + 1:
        tasks.append(aiohttp_post(session=client_session, count=post_count))
        post_count += 1
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    stopwatch = time.time() - start_time
    print('请求首页%d次共耗时%d秒' % (try_count, stopwatch))

# output
第26次请求返回状态码200
第88次请求返回状态码200
第27次请求返回状态码200
...
第25次请求返回状态码200
第87次请求返回状态码200
请求首页100次共耗时17秒

完整100次请求耗时17秒左右,与requests成绩相似,timeout对象有4个属性

  • total:整个session的超时时间(单位秒,类型float)
  • connect:连接池获得连接等待秒数,即等待分配请求资源的超时时间(单位秒,类型float)
  • sock_connect:连接到对方服务器的超时时间,即传统的请求超时时间(单位秒,类型float)
  • sock_read:读取资源(通常是读取返回的data)的超时时间

6. 最后

实践使用下来,关于aiohttp的很多用法都跟requests不一样,异步的调用在不少API上跟传统的多线程方法是完全不一样的思路,有疑问最好的方法还是看文档

资料参考

  • https://docs.aiohttp.org/en/stable/
  • https://docs.python.org/3/library/asyncio-task.html

[[replyMessage== null?"发表评论":"发表评论 @ " + replyMessage.m_author]]

account_circle
email
web_asset
textsms

评论列表([[messageResponse.total]])

还没有可以显示的留言...
[[messageItem.m_author]] [[messageItem.m_author]]
[[messageItem.create_time]]
[[getEnviron(messageItem.m_environ)]]
[[subMessage.m_author]] [[subMessage.m_author]] @ [[subMessage.parent_message.m_author]] [[subMessage.parent_message.m_author]]
[[subMessage.create_time]]
[[getEnviron(messageItem.m_environ)]]