協程(Coroutine),也可以被稱為微線程,是一種用戶態內的上下文切換技術。簡而言之,其實就是通過一個線程實現代碼塊相互切換執行。例如:
def func1():print(1)...print(2)def func2():print(3)...print(4)func1()
func2()
上述代碼是普通的函數定義和執行,按流程分別執行兩個函數中的代碼,并先后會輸出:1、2、3、4
。但如果介入協程技術那么就可以實現函數見代碼切換執行,最終輸入:1、3、2、4
。
在Python中有多種方式可以實現協程,例如:
異步攜程方式,greentlet是一個第三方模塊,需要提前安裝 pip3 install greenlet
才能使用。
from greenlet import greenletdef func1():print(1) # 第1步:輸出 1gr2.switch() # 第3步:切換到 func2 函數print(2) # 第6步:輸出 2gr2.switch() # 第7步:切換到 func2 函數,從上一次執行的位置繼續向后執行def func2():print(3) # 第4步:輸出 3gr1.switch() # 第5步:切換到 func1 函數,從上一次執行的位置繼續向后執行print(4) # 第8步:輸出 4gr1 = greenlet(func1)
gr2 = greenlet(func2)
gr1.switch() # 第1步:去執行 func1 函數
注意:switch中也可以傳遞參數用于在切換執行時相互傳遞值。
基于Python的生成器的yield和yield form關鍵字實現協程代碼。
def func1():yield 1yield from func2()yield 2def func2():yield 3yield 4f1 = func1()
for item in f1:print(item)
注意:yield form關鍵字是在Python3.3中引入的。
在Python3.4之前官方未提供協程的類庫,一般大家都是使用greenlet等其他來實現。在Python3.4發布后官方正式支持協程,即:asyncio模塊。
import asyncio@asyncio.coroutine
def func1():print(1)yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務print(2)@asyncio.coroutine
def func2():print(3)yield from asyncio.sleep(2) # 遇到IO耗時操作,自動化切換到tasks中的其他任務print(4)tasks = [asyncio.ensure_future( func1() ),asyncio.ensure_future( func2() )
]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
注意:基于asyncio模塊實現的協程比之前的要更厲害,因為他的內部還集成了遇到IO耗時操作自動切花的功能。
協程的異步轉同步。async & awit 關鍵字在Python3.5版本中正式引入,基于他編寫的協程代碼其實就是 上一示例 的加強版,讓代碼可以更加簡便。
Python3.8之后 @asyncio.coroutine
裝飾器就會被移除,推薦使用async & awit 關鍵字實現協程代碼。
import asyncioasync def func1():print(1)await asyncio.sleep(2)print(2)async def func2():print(3)await asyncio.sleep(2)print(4)tasks = [asyncio.ensure_future(func1()),asyncio.ensure_future(func2())
]loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
關于協程有多種實現方式,目前主流使用是Python官方推薦的asyncio模塊和async&await關鍵字的方式,例如:在tonado、sanic、fastapi、django3 中均已支持。
接下來,我們也會針對 asyncio模塊
+ async & await
關鍵字進行更加詳細的講解。
通過學習,我們已經了解到協程可以通過一個線程在多個上下文中進行來回切換執行。
但是,協程來回切換執行的意義何在呢?(網上看到很多文章舔協程,協程牛逼之處是哪里呢?)
計算型的操作,利用協程來回切換執行,沒有任何意義,來回切換并保存狀態 反倒會降低性能。
IO型的操作,利用協程在IO等待時間就去切換執行其他任務,當IO操作結束后再自動回調,那么就會大大節省資源并提供性能,從而實現異步編程(不等待任務結束就可以去執行其他代碼)。
異步asyncio?例如:用代碼實現下載 url_list
中的圖片。
方式一:同步編程實現
"""
下載圖片使用第三方模塊requests,請提前安裝:pip3 install requests
"""
import requestsdef download_image(url):print("開始下載:",url)# 發送網絡請求,下載圖片response = requests.get(url)print("下載完成")# 圖片保存到本地文件file_name = url.rsplit('_')[-1]with open(file_name, mode='wb') as file_object:file_object.write(response.content)if __name__ == '__main__':url_list = ['https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg','https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg','https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg']for item in url_list:download_image(item)
方式二:基于協程的異步編程實現
"""
下載圖片使用第三方模塊aiohttp,請提前安裝:pip3 install aiohttp
"""
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import aiohttp
import asyncioasync def fetch(session, url):print("發送請求:", url)async with session.get(url, verify_ssl=False) as response:content = await response.content.read()file_name = url.rsplit('_')[-1]with open(file_name, mode='wb') as file_object:file_object.write(content)async def main():async with aiohttp.ClientSession() as session:url_list = ['https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg','https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg','https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg']tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())
上述兩種的執行對比之后會發現,基于協程的異步編程
要比 同步編程
的效率高了很多。因為:
協程一般應用在有IO操作的程序中,因為協程可以利用IO等待的時間去執行一些其他的代碼,從而提升代碼執行效率。
生活中不也是這樣的么,假設 你是一家制造汽車的老板,員工點擊設備的【開始】按鈕之后,在設備前需等待30分鐘,然后點擊【結束】按鈕,此時作為老板的你一定希望這個員工在等待的那30分鐘的時間去做點其他的工作。
python 4?基于async
& await
關鍵字的協程可以實現異步編程,這也是目前python異步相關的主流技術。
想要真正的了解Python中內置的異步編程,根據下文的順序一點點來看。
事件循環,可以把他當做是一個while循環,這個while循環在周期性的運行并執行一些任務
,在特定條件下終止循環。
# 偽代碼任務列表 = [ 任務1, 任務2, 任務3,... ]while True:可執行的任務列表,已完成的任務列表 = 去任務列表中檢查所有的任務,將'可執行'和'已完成'的任務返回for 就緒任務 in 已準備就緒的任務列表:執行已就緒的任務for 已完成的任務 in 已完成的任務列表:在任務列表中移除 已完成的任務如果 任務列表 中的任務都已完成,則終止循環
在編寫程序時候可以通過如下代碼來獲取和創建事件循環。
import asyncioloop = asyncio.get_event_loop()
協程函數,定義形式為 async def
的函數。
協程對象,調用 協程函數 所返回的對象。
# 定義一個協程函數
async def func():pass# 調用協程函數,返回一個協程對象
result = func()
python3。注意:調用協程函數時,函數內部代碼不會執行,只是會返回一個協程對象。
程序中,如果想要執行協程函數的內部代碼,需要 事件循環
和 協程對象
配合才能實現,如:
import asyncioasync def func():print("協程內部代碼")# 調用協程函數,返回一個協程對象。
result = func()# 方式一
# loop = asyncio.get_event_loop() # 創建一個事件循環
# loop.run_until_complete(result) # 將協程當做任務提交到事件循環的任務列表中,協程執行完成之后終止。# 方式二
# 本質上方式一是一樣的,內部先 創建事件循環 然后執行 run_until_complete,一個簡便的寫法。
# asyncio.run 函數在 Python 3.7 中加入 asyncio 模塊,
asyncio.run(result)
這個過程可以簡單理解為:將協程
當做任務添加到 事件循環
的任務列表,然后事件循環檢測列表中的協程
是否 已準備就緒(默認可理解為就緒狀態),如果準備就緒則執行其內部代碼。
await是一個只能在協程函數中使用的關鍵字,用于遇到IO操作時掛起 當前協程(任務),當前協程(任務)掛起過程中 事件循環可以去執行其他的協程(任務),當前協程IO處理完成時,可以再次切換回來執行await之后的代碼。代碼如下:
示例1:
import asyncioasync def func():print("執行協程函數內部代碼")# 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。# 當前協程掛起時,事件循環可以去執行其他協程(任務)。response = await asyncio.sleep(2)print("IO請求結束,結果為:", response)result = func()asyncio.run(result)
示例2:
import asyncioasync def others():print("start")await asyncio.sleep(2)print('end')return '返回值'async def func():print("執行協程函數內部代碼")# 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。response = await others()print("IO請求結束,結果為:", response)asyncio.run( func() )
python async。示例3:
import asyncioasync def others():print("start")await asyncio.sleep(2)print('end')return '返回值'async def func():print("執行協程函數內部代碼")# 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。response1 = await others()print("IO請求結束,結果為:", response1)response2 = await others()print("IO請求結束,結果為:", response2)asyncio.run( func() )
上述的所有示例都只是創建了一個任務,即:事件循環的任務列表中只有一個任務,所以在IO等待時無法演示切換到其他任務效果。
在程序想要創建多個任務對象,需要使用Task對象來實現。
Tasks are used to schedule coroutines concurrently.
When a coroutine is wrapped into a Task with functions like
asyncio.create_task()
the coroutine is automatically scheduled to run soon。
Tasks用于并發調度協程,通過asyncio.create_task(協程對象)
的方式創建Task對象,這樣可以讓協程加入事件循環中等待被調度執行。除了使用 asyncio.create_task()
函數以外,還可以用低層級的 loop.create_task()
或 ensure_future()
函數。不建議手動實例化 Task 對象。
python asyncio、本質上是將協程對象封裝成task對象,并將協程立即加入事件循環,同時追蹤協程的狀態。
注意:asyncio.create_task()
函數在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低層級的 asyncio.ensure_future()
函數。
示例1:
import asyncioasync def func():print(1)await asyncio.sleep(2)print(2)return "返回值"async def main():print("main開始")# 創建協程,將協程封裝到一個Task對象中并立即添加到事件循環的任務列表中,等待事件循環去執行(默認是就緒狀態)。task1 = asyncio.create_task(func())# 創建協程,將協程封裝到一個Task對象中并立即添加到事件循環的任務列表中,等待事件循環去執行(默認是就緒狀態)。task2 = asyncio.create_task(func())print("main結束")# 當執行某協程遇到IO操作時,會自動化切換執行其他任務。# 此處的await是等待相對應的協程全都執行完畢并獲取結果ret1 = await task1ret2 = await task2print(ret1, ret2)asyncio.run(main())
示例2:
import asyncioasync def func():print(1)await asyncio.sleep(2)print(2)return "返回值"async def main():print("main開始")# 創建協程,將協程封裝到Task對象中并添加到事件循環的任務列表中,等待事件循環去執行(默認是就緒狀態)。# 在調用task_list = [asyncio.create_task(func(), name="n1"),asyncio.create_task(func(), name="n2")]print("main結束")# 當執行某協程遇到IO操作時,會自動化切換執行其他任務。# 此處的await是等待所有協程執行完畢,并將所有協程的返回值保存到done# 如果設置了timeout值,則意味著此處最多等待的秒,完成的協程返回值寫入到done中,未完成則寫到pending中。done, pending = await asyncio.wait(task_list, timeout=None)print(done, pending)asyncio.run(main())
注意:asyncio.wait
源碼內部會對列表中的每個協程執行ensure_future從而封裝為Task對象,所以在和wait配合使用時task_list的值為[func(),func()]
也是可以的。
示例3:
import asyncioasync def func():print("執行協程函數內部代碼")# 遇到IO操作掛起當前協程(任務),等IO操作完成之后再繼續往下執行。當前協程掛起時,事件循環可以去執行其他協程(任務)。response = await asyncio.sleep(2)print("IO請求結束,結果為:", response)coroutine_list = [func(), func()]# 錯誤:coroutine_list = [ asyncio.create_task(func()), asyncio.create_task(func()) ]
# 此處不能直接 asyncio.create_task,因為將Task立即加入到事件循環的任務列表,
# 但此時事件循環還未創建,所以會報錯。# 使用asyncio.wait將列表封裝為一個協程,并調用asyncio.run實現執行兩個協程
# asyncio.wait內部會對列表中的每個協程執行ensure_future,封裝為Task對象。
done,pending = asyncio.run( asyncio.wait(coroutine_list) )
python異步和多線程區別。A
Future
is a special low-level awaitable object that represents an eventual result of an asynchronous operation.
asyncio中的Future對象是一個相對更偏向底層的可對象,通常我們不會直接用到這個對象,而是直接使用Task對象來完成任務的并和狀態的追蹤。( Task 是 Futrue的子類 )
Future為我們提供了異步編程中的 最終結果 的處理(Task類也具備狀態處理的功能)。
示例1:
async def main():# 獲取當前事件循環loop = asyncio.get_running_loop()# # 創建一個任務(Future對象),這個任務什么都不干。fut = loop.create_future()# 等待任務最終結果(Future對象),沒有結果則會一直等下去。await futasyncio.run(main())
示例2:
import asyncioasync def set_after(fut):await asyncio.sleep(2)fut.set_result("666")async def main():# 獲取當前事件循環loop = asyncio.get_running_loop()# 創建一個任務(Future對象),沒綁定任何行為,則這個任務永遠不知道什么時候結束。fut = loop.create_future()# 創建一個任務(Task對象),綁定了set_after函數,函數內部在2s之后,會給fut賦值。# 即手動設置future任務的最終結果,那么fut就可以結束了。await loop.create_task(set_after(fut))# 等待 Future對象獲取 最終結果,否則一直等下去data = await futprint(data)asyncio.run(main())
Future對象本身函數進行綁定,所以想要讓事件循環獲取Future的結果,則需要手動設置。而Task對象繼承了Future對象,其實就對Future進行擴展,他可以實現在對應綁定的函數執行完成之后,自動執行set_result
,從而實現自動結束。
python協程。雖然,平時使用的是Task對象,但對于結果的處理本質是基于Future對象來實現的。
擴展:支持 await 對象
語 法的對象課成為可等待對象,所以 協程對象
、Task對象
、Future對象
都可以被成為可等待對象。
在Python的concurrent.futures
模塊中也有一個Future對象,這個對象是基于線程池和進程池實現異步操作時使用的對象。
import time
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutordef func(value):time.sleep(1)print(value)pool = ThreadPoolExecutor(max_workers=5)
# 或 pool = ProcessPoolExecutor(max_workers=5)for i in range(10):fut = pool.submit(func, i)print(fut)
兩個Future對象是不同的,他們是為不同的應用場景而設計,例如:concurrent.futures.Future
不支持await語法 等。
官方提示兩對象之間不同:
unlike asyncio Futures, concurrent.futures.Future
instances cannot be awaited.
python異步處理?asyncio.Future.result()
and asyncio.Future.exception()
do not accept the timeout argument.
asyncio.Future.result()
and asyncio.Future.exception()
raise an InvalidStateError
exception when the Future is not done.
Callbacks registered with asyncio.Future.add_done_callback()
are not called immediately. They are scheduled with loop.call_soon()
instead.
asyncio Future is not compatible with the concurrent.futures.wait()
and concurrent.futures.as_completed()
functions.
在Python提供了一個將futures.Future
對象包裝成asyncio.Future
對象的函數 asynic.wrap_future
。
接下里你肯定問:為什么python會提供這種功能?
python回調函數,其實,一般在程序開發中我們要么統一使用 asycio 的協程實現異步操作、要么都使用進程池和線程池實現異步操作。但如果 協程的異步
和 進程池/線程池的異步
混搭時,那么就會用到此功能了。
import time
import asyncio
import concurrent.futuresdef func1():# 某個耗時操作time.sleep(2)return "SB"async def main():loop = asyncio.get_running_loop()# 1. Run in the default loop's executor ( 默認ThreadPoolExecutor )# 第一步:內部會先調用 ThreadPoolExecutor 的 submit 方法去線程池中申請一個線程去執行func1函數,并返回一個concurrent.futures.Future對象# 第二步:調用asyncio.wrap_future將concurrent.futures.Future對象包裝為asycio.Future對象。# 因為concurrent.futures.Future對象不支持await語法,所以需要包裝為 asycio.Future對象 才能使用。fut = loop.run_in_executor(None, func1)result = await futprint('default thread pool', result)# 2. Run in a custom thread pool:# with concurrent.futures.ThreadPoolExecutor() as pool:# result = await loop.run_in_executor(# pool, func1)# print('custom thread pool', result)# 3. Run in a custom process pool:# with concurrent.futures.ProcessPoolExecutor() as pool:# result = await loop.run_in_executor(# pool, func1)# print('custom process pool', result)asyncio.run(main())
應用場景:當項目以協程式的異步編程開發時,如果要使用一個第三方模塊,而第三方模塊不支持協程方式異步編程時,就需要用到這個功能,例如:
import asyncio
import requestsasync def download_image(url):# 發送網絡請求,下載圖片(遇到網絡下載圖片的IO請求,自動化切換到其他任務)print("開始下載:", url)loop = asyncio.get_event_loop()# requests模塊默認不支持異步操作,所以就使用線程池來配合實現了。future = loop.run_in_executor(None, requests.get, url)response = await futureprint('下載完成')# 圖片保存到本地文件file_name = url.rsplit('_')[-1]with open(file_name, mode='wb') as file_object:file_object.write(response.content)if __name__ == '__main__':url_list = ['https://www3.autoimg.cn/newsdfs/g26/M02/35/A9/120x90_0_autohomecar__ChsEe12AXQ6AOOH_AAFocMs8nzU621.jpg','https://www2.autoimg.cn/newsdfs/g30/M01/3C/E2/120x90_0_autohomecar__ChcCSV2BBICAUntfAADjJFd6800429.jpg','https://www3.autoimg.cn/newsdfs/g26/M0B/3C/65/120x90_0_autohomecar__ChcCP12BFCmAIO83AAGq7vK0sGY193.jpg']tasks = [download_image(url) for url in url_list]loop = asyncio.get_event_loop()loop.run_until_complete( asyncio.wait(tasks) )
什么是異步迭代器
實現了 __aiter__()
和 __anext__()
方法的對象。__anext__
必須返回一個 awaitable 對象。async for
會處理異步迭代器的 __anext__()
方法所返回的可等待對象,直到其引發一個 StopAsyncIteration
異常。由 PEP 492 引入。
什么是異步可迭代對象?
可在 async for
語句中被使用的對象。必須通過它的 __aiter__()
方法返回一個 asynchronous iterator。由 PEP 492 引入。
import asyncioclass Reader(object):""" 自定義異步迭代器(同時也是異步可迭代對象) """def __init__(self):self.count = 0async def readline(self):# await asyncio.sleep(1)self.count += 1if self.count == 100:return Nonereturn self.countdef __aiter__(self):return selfasync def __anext__(self):val = await self.readline()if val == None:raise StopAsyncIterationreturn valasync def func():# 創建異步可迭代對象async_iter = Reader()# async for 必須要放在async def函數內,否則語法錯誤。async for item in async_iter:print(item)asyncio.run(func())
協程與異步asyncio。異步迭代器其實沒什么太大的作用,只是支持了async for語法而已。
此種對象通過定義 __aenter__()
和 __aexit__()
方法來對 async with
語句中的環境進行控制。由 PEP 492 引入。
import asyncioclass AsyncContextManager:def __init__(self):self.conn = connasync def do_something(self):# 異步操作數據庫return 666async def __aenter__(self):# 異步鏈接數據庫self.conn = await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):# 異步關閉數據庫鏈接await asyncio.sleep(1)async def func():async with AsyncContextManager() as f:result = await f.do_something()print(result)asyncio.run(func())
這個異步的上下文管理器還是比較有用的,平時在開發過程中 打開、處理、關閉 操作時,就可以用這種方式來處理。
在程序中只要看到async
和await
關鍵字,其內部就是基于協程實現的異步編程,這種異步編程是通過一個線程在IO等待時間去執行其他任務,從而實現并發。
以上就是異步編程的常見操作,內容參考官方文檔。
Python標準庫中提供了asyncio
模塊,用于支持基于協程的異步編程。
python scrapy,uvloop是 asyncio 中的事件循環的替代方案,替換后可以使得asyncio性能提高。事實上,uvloop要比nodejs、gevent等其他python異步框架至少要快2倍,性能可以比肩Go語言。
安裝uvloop
pip3 install uvloop
在項目中想要使用uvloop替換asyncio的事件循環也非常簡單,只要在代碼中這么做就行。
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())# 編寫asyncio的代碼,與之前寫的代碼一致。# 內部的事件循環自動化會變為uvloop
asyncio.run(...)
注意:知名的asgi uvicorn內部就是使用的uvloop的事件循環。
為了更好理解,上述所有示例的IO情況都是以 asyncio.sleep
為例,而真實的項目開發中會用到很多IO的情況。
當通過python去操作redis時,鏈接、設置值、獲取值 這些都涉及網絡IO請求,使用asycio異步的方式可以在IO等待時去做一些其他任務,從而提升性能。
異步。安裝Python異步操作redis模塊
pip3 install aioredis
示例1:異步操作redis。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aioredisasync def execute(address, password):print("開始執行", address)# 網絡IO操作:創建redis連接redis = await aioredis.create_redis(address, password=password)# 網絡IO操作:在redis中設置哈希值car,內部在設三個鍵值對,即: redis = { car:{key1:1,key2:2,key3:3}}await redis.hmset_dict('car', key1=1, key2=2, key3=3)# 網絡IO操作:去redis中獲取值result = await redis.hgetall('car', encoding='utf-8')print(result)redis.close()# 網絡IO操作:關閉redis連接await redis.wait_closed()print("結束", address)asyncio.run(execute('redis://47.93.4.198:6379', "root!2345"))
示例2:連接多個redis做操作(遇到IO會切換其他任務,提供了性能)。
import asyncio
import aioredisasync def execute(address, password):print("開始執行", address)# 網絡IO操作:先去連接 47.93.4.197:6379,遇到IO則自動切換任務,去連接47.93.4.198:6379redis = await aioredis.create_redis_pool(address, password=password)# 網絡IO操作:遇到IO會自動切換任務await redis.hmset_dict('car', key1=1, key2=2, key3=3)# 網絡IO操作:遇到IO會自動切換任務result = await redis.hgetall('car', encoding='utf-8')print(result)redis.close()# 網絡IO操作:遇到IO會自動切換任務await redis.wait_closed()print("結束", address)task_list = [execute('redis://47.93.4.197:6379', "root!2345"),execute('redis://47.93.4.198:6379', "root!2345")
]asyncio.run(asyncio.wait(task_list))
更多redis操作參考aioredis官網:https://aioredis.readthedocs.io/en/v1.3.0/start.html
當通過python去操作MySQL時,連接、執行SQL、關閉都涉及網絡IO請求,使用asycio異步的方式可以在IO等待時去做一些其他任務,從而提升性能。
安裝Python異步操作redis模塊
pip3 install aiomysql
kotlin協程異步同步?示例1:
import asyncio
import aiomysqlasync def execute():# 網絡IO操作:連接MySQLconn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123', db='mysql', )# 網絡IO操作:創建CURSORcur = await conn.cursor()# 網絡IO操作:執行SQLawait cur.execute("SELECT Host,User FROM user")# 網絡IO操作:獲取SQL結果result = await cur.fetchall()print(result)# 網絡IO操作:關閉鏈接await cur.close()conn.close()asyncio.run(execute())
示例2:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncio
import aiomysqlasync def execute(host, password):print("開始", host)# 網絡IO操作:先去連接 47.93.40.197,遇到IO則自動切換任務,去連接47.93.40.198:6379conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')# 網絡IO操作:遇到IO會自動切換任務cur = await conn.cursor()# 網絡IO操作:遇到IO會自動切換任務await cur.execute("SELECT Host,User FROM user")# 網絡IO操作:遇到IO會自動切換任務result = await cur.fetchall()print(result)# 網絡IO操作:遇到IO會自動切換任務await cur.close()conn.close()print("結束", host)task_list = [execute('47.93.40.197', "root!2345"),execute('47.93.40.197', "root!2345")
]asyncio.run(asyncio.wait(task_list))
FastAPI是一款用于構建API的高性能web框架,框架基于Python3.6+的 type hints
搭建。
接下里的異步示例以FastAPI
和uvicorn
來講解(uvicorn是一個支持異步的asgi)。
安裝FastAPI web 框架,
pip3 install fastapi
安裝uvicorn,本質上為web提供socket server的支持的asgi(一般支持異步稱asgi、不支持異步稱wsgi)
pip3 install uvicorn
python攜程怎么做數據同步、示例:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import asyncioimport uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPIapp = FastAPI()REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)@app.get("/")
def index():""" 普通操作接口 """return {"message": "Hello World"}@app.get("/red")
async def red():""" 異步操作接口 """print("請求來了")await asyncio.sleep(3)# 連接池獲取一個連接conn = await REDIS_POOL.acquire()redis = Redis(conn)# 設置值await redis.hmset_dict('car', key1=1, key2=2, key3=3)# 讀取值result = await redis.hgetall('car', encoding='utf-8')print(result)# 連接歸還連接池REDIS_POOL.release(conn)return resultif __name__ == '__main__':uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
在有多個用戶并發請求的情況下,異步方式來編寫的接口可以在IO等待過程中去處理其他的請求,提供性能。
例如:同時有兩個用戶并發來向接口 http://127.0.0.1:5000/red
發送請求,服務端只有一個線程,同一時刻只有一個請求被處理。 異步處理可以提供并發是因為:當視圖函數在處理第一個請求時,第二個請求此時是等待被處理的狀態,當第一個請求遇到IO等待時,會自動切換去接收并處理第二個請求,當遇到IO時自動化切換至其他請求,一旦有請求IO執行完畢,則會再次回到指定請求向下繼續執行其功能代碼。
在編寫爬蟲應用時,需要通過網絡IO去請求目標數據,這種情況適合使用異步編程來提升性能,接下來我們使用支持異步編程的aiohttp模塊來實現。
安裝aiohttp模塊
pip3 install aiohttp
示例:
import aiohttp
import asyncioasync def fetch(session, url):print("發送請求:", url)async with session.get(url, verify_ssl=False) as response:text = await response.text()print("得到結果:", url, len(text))async def main():async with aiohttp.ClientSession() as session:url_list = ['https://python.org','https://www.baidu.com','https://www.pythonav.com']tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())
協程與異步,為了提升性能越來越多的框架都在向異步編程靠攏,例如:sanic、tornado、django3.0、django channels組件 等,用更少資源可以做處理更多的事,何樂而不為呢。
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态