English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

python 동기화2asyncio를 사용하여 동기화 처리

asyncio

Python 2의 시대에, 높은 성능의 네트워크 프로그래밍은 Twisted, Tornado, Gevent 이 세 가지 라이브러리를 사용했습니다. 그러나 이들의 비동기 코드는 서로 불호응적이며 이전으로 이동할 수 없습니다. 이전 장에서 설명한 것처럼, Gvanrossum은 Python 3 데이터 소스에 기반한 내장된 동기 IO 지원을 포함한 원시적인 코루틴 라이브러리를 구현했습니다. 이것이 asyncio입니다. Python 3.4표준 라이브러리에 도입되었습니다.

asyncio 이 패키지는 이벤트 루프를 기반으로 한 코루틴을 사용하여 동기화를 구현합니다.

asyncio 패키지는 표준 라이브러리에 포함되기 전에는 'Tulip'(장미)로 알려져 있었기 때문에, 자료를 검색할 때 이 꽃의 이름을 자주 볼 수 있습니다.

이벤트 루프는 무엇인가요?63;

wiki에 따르면, 이벤트 루프는 '프로그램이 이벤트나 메시지를 할당을 기다리는 프로그래밍 아키텍처'입니다. 기본적으로 이벤트 루프는 'A가 발생할 때, B를 실행합니다'라고 말할 수 있습니다. 또는 가장 간단한 예로, 모든 브라우저에서 존재하는 JavaScript 이벤트 루프를 사용할 수 있습니다. 어떤 것을 클릭하면('A가 발생할 때'), 이 클릭 동작이 JavaScript 이벤트 루프에 전달되고, 등록된 onclick 콜백이 클릭 동작을 처리하는지 확인됩니다(또는 B를 실행합니다). 등록된 콜백 함수가 있으면, 클릭 동작의 상세 정보와 함께 실행됩니다. 이벤트 루프는 이벤트를 지속적으로 수신하고 루프를 통해 이러한 이벤트에 대한 대응을 발생시키기 때문에, 이벤트 루프는 가상의 것으로 간주됩니다.

Python에서는 이벤트 루프를 제공하는 asyncio 패키지가 표준 라이브러리에 포함되었습니다. asyncio는 네트워크 서비스에서의 문제를 해결하기 위해 중점을 두고 있으며, 이벤트 루프는 소켓에서의 I/O가 이미 읽고/또는 selectors 모듈을 통해 'A가 발생할 때'로 쓸 수 있습니다./O, 이벤트 루프는 다른 스레드나 서브 프로세스에서 코드를 실행하고 이벤트 루프를 조정 기제로 사용하기도 합니다(예: 협력적 멀티태스킹). Python의 GIL을 이해하는 경우, 이벤트 루프는 GIL을 해제해야 하는 곳에서 매우 유용합니다.

스레드와 코루틴

우리는 두 가지 코드를 보겠습니다. 하나는 threading 모듈과 asyncio 패키지를 사용하여 작성된 코드입니다.

# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # 이 클래스는 외부에서 스레드를 제어하기 위한 변하지 않는 객체를 정의합니다
 go = True
def spin(msg, signal): # 이 함수는 독립적인 스레드에서 실행되며, signal 파라미터는 위에서 정의한 Signal 클래스의 인스턴스입니다
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|)/-\"): # itertools.cycle 함수는 지정된 시리얼에서 반복적으로 요소를 생성
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # 스크롤백 문자로 커서를 행의 시작에 이동
  time.sleep(.)1) # 0.1 초에 한 번 새로 고칩니다
  if not signal.go: # go 속성이 True가 아니면, 루프를 탈출합니다
   break
 write(' ' * len(status) + '\x08' * len(status)) # 공백을 사용하여 상태 메시지를 지우고, 커서를 처음으로 이동합니다
def slow_function(): # 시간이 소요되는 작업을 모의합니다
 # I를 기다리는 것을 가장하려면/O 일정 시간
 time.sleep(3) # sleep 호출은 메인 스레드를 블록하며, 이렇게 하면 GIL을 해제하고 서브스레드를 생성합니다
 return 42
def supervisor(): # 이 함수는 서브스레드를 설정하고, 스레드 객체를 표시하고, 실행 시간 계산을 수행하고, 마지막으로 프로세스를 종료합니다
 signal = Signal()
 spinner = threading.Thread(target=spin,
        args=('thinking!', signal))
 print('spinner object:', spinner) # 스레드 객체를 출력 spinner object: <Thread(Thread-1, initial)>
 spinner.start() # 부속 프로세스를 시작
 result = slow_function() # slow_function 행을 실행합니다. 동시에 주요 스레드를 애니메이션으로 회전 지시자로 블록
 signal.go = False
 spinner.join() # spinner 스레드가 종료될 때까지 기다립니다
 return result
def main():
 result = supervisor() 
 print('Answer', result)
if __name__ == '__main__':
 main()

실행해보면, 결과는 이렇게 나타납니다:

이는 애니메이션입니다. 'thinking' 전의 선은 움직입니다. (녹화를 위해 sleep 시간을 늘렸습니다)

python은 스레드를 종료하는 API를 제공하지 않기 때문에, 스레드를 닫으려면 스레드에 메시지를 보내는 것이 필요합니다. 여기서는 signal.go 속성을 사용합니다: 메인 스레드에서 False로 설정하면, spinner 스레드는 이를 수신하고 종료됩니다

이제 asyncio 패키지를 사용하는 버전을 다시 보겠습니다:

# spinner_asyncio.py
# 코루틴을 통해 텍스트로 회전 지시자를 애니메이션으로 표시
import asyncio
import itertools
import sys
@asyncio.coroutine # asyncio를 처리하는 코루틴을 위한 @asyncio.coroutine 장식
def spin(msg):
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|)/-\"): # itertools.cycle 함수는 지정된 시리얼에서 반복적으로 요소를 생성
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # 스크롤백 문자로 커서를 행의 시작에 이동
  try:
   yield from asyncio.sleep(0.)1) yield from asyncio.sleep(0.) 사용1) time.sleep(.) 대신1이러한 휴면은 이벤트 루프를 블록하지 않습니다.
  except asyncio.CancelledError: # spin 함수가 깨어나 asyncio.CancelledError 예외를 투척하면, 이는 취소 요청이 발생한 원인입니다
   break
 write(' ' * len(status) + '\x08' * len(status)) # 공백을 사용하여 상태 메시지를 지우고, 커서를 처음으로 이동합니다
@asyncio.coroutine
def slow_function(): # 5 현재 이 함수는 코루틴이며, 잠시를 가장하기 위해 잠시를 사용합니다/# O 작업 시, yield from을 사용하여 이벤트 루프를 계속 실행합니다
 # I를 기다리는 것을 가장하려면/O 일정 시간
 yield from asyncio.sleep(3) # 이 표현식은 제어권을 메인 루프에 넘겨주며, 잠시 후 이 코루틴에 돌아옵니다
 return 42
@asyncio.coroutine
def supervisor(): # 이 함수도 코루틴이기 때문에 yield from로 slow_function을 드라이브할 수 있습니다
 spinner = asyncio.async(spin('thinking!')) # asyncio.async() 함수는 코루틴 실행 시간을 계획하고, spin 코루틴을 Task 객체로 포장하여 즉시 반환합니다
 print('spinner object:', spinner) # Task 객체, 출력은 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
 # slow_function() 함수를 드라이브하면, 실행이 끝나면 반환 값을 가져오며, 이벤트 루프는 계속 실행됩니다
 # slow_function 함수가 마지막으로 yield from asyncio.sleep()을 사용하기 때문에3) 표현식은 제어권을 메인 루프에 넘겨줍니다
 result = yield from slow_function()
 # Task 객체는 취소할 수 있으며, 취소 후 코루틴이 현재 일시정지된 yield에서 asyncio.CancelledError 예외를 투척합니다
 # 코루틴은 이 예외를 잡을 수 있으며, 지연으로 취소하거나 취소를 거부할 수 있습니다
 spinner.cancel()
 return result
def main():
 loop = asyncio.get_event_loop() # 이벤트 루프 참조를 가져옵니다
 # 코루틴 supervisor 드라이브, 실행을 완료하도록; 이 코루틴의 반환 값은 이 호출의 반환 값입니다
 result = loop.run_until_complete(supervisor())
 loop.close()
 print('Answer', result)
if __name__ == '__main__':
 main()

메인 스레드를 블록하고 이벤트 루프나 전체 애플리케이션을 동결하고 싶지 않다면, asyncio 코루틴에서 time.sleep()를 사용하지 마세요.

코루틴이 일정 기간 동안 아무런 행동도 하지 않아야 한다면, yield from asyncio.sleep(DELAY)를 사용해야 합니다.

@asyncio.coroutine 데코레이터는 필수적이지 않지만, 이렇게 하면 코드에서 코루틴을 강조할 수 있어 좋습니다. 코루틴이 값을 내리지 않았다면(즉, 작업이 완료되지 않았다면), 가벼운 경고를 발생시킬 수 있습니다. 이 데코레이터는 코루틴을 미리 활성화하지 않습니다.

이 두 부분의 코드 실행 결과는 거의 동일합니다. 지금 두 부분의 코드의 핵심 코드 supervisor 주요 차이점을 보겠습니다:

  1. asyncio.Task 객체는 threading.Thread 객체와 거의 동일합니다(Task 객체는 작성 시간 동안 멀티태스킹을 구현하는 라이브러리의 녹색 스레드와 같습니다).
  2. Task 객체는 코루틴을 운영하며, Thread 객체는 호출할 수 있는 객체를 호출합니다.
  3. Task 객체는 직접 인스턴스화되지 않으며, asyncio.async(...) 함수에 코루틴을 전달하거나 loop.create_task(...) 메서드를 통해 얻습니다.
  4. 얻은 Task 객체는 이미 실행 시간이 설정되어 있습니다; Thread 인스턴스는 start 메서드를 호출하여 명확히 실행을 알려야 합니다.
  5. 스레드 버전의 supervisor 함수에서 slow_function은 일반 함수로 스레드가 직접 호출하며, 비동기 버전의 slow_function 함수는 코루틴으로 yield from이 동작을 유도합니다.
  6. API는 외부에서 스레드를 종료할 수 없습니다. 왜냐하면 스레드는 언제든지 중단될 수 있기 때문입니다. 그리고 작업을 종료하려면 Task.cancel() 인스턴스 메서드를 사용하여 코루틴 내에서 CancelledError 예외를 발생시킬 수 있습니다. 코루틴은 대기 중인 yield 점에서 이 예외를 캡처할 수 있어 종료 요청을 처리할 수 있습니다.
  7. supervisor 코루틴은 main 함수에서 loop.run_until_complete 메서드를 통해 실행되어야 합니다.

코루틴과 스레드를 비교했을 때 중요한 장점 중 하나는, 스레드는 중요한 프로그램 부분을 보호하기 위해 락을 유지해야 하며, 여러 단계操作이 실행되는 동안 중단되지 않도록 하고, 산수가 유지되는 상태에서 코루틴은 기본적으로 보호를 해줍니다. 그러나 우리는 프로그램의 나머지 부분이 실행되도록 제어권을 내려주려면 명시적으로(예: yield 또는 yield from 사용) 나가야 합니다.

asyncio.Future:故意的에 블록되지 않음

asynci.Future 클래스와 concurrent.futures.Future 클래스의 인터페이스는 기본적으로 일치하지만, 구현 방식이 다르기 때문에 교환할 수 없습니다.

이전 [python 병행 1:futures를 사용하여 병행 처리]() 우리는 concurrent.futures.Future의 future를 소개했으며, concurrent.futures.Future에서 future는 특정 작업을 실행하는 결과를 스케줄링하는 것입니다. asyncio 패키지에서 BaseEventLoop.create_task(...) 메서드는 코루틴을 받아서, 그 실행 시간을 정렬하고, asyncio.Task 인스턴스(Asyncio.Future 클래스의 인스턴스, Task는 Future의 서브클래스로, 코루틴을 포장하는 데 사용됩니다.(concurrent.futures.Future에서는 Executor.submit(...)과 유사한 작업을 수행합니다.).

concurrent.futures.Future 클래스와 유사하게, asyncio.Future 클래스도 다음과 같이 제공합니다:

  1. .done()은 부울 값을 반환하여 Future가 이미 실행되었는지 여부를 나타냅니다.
  2. .add_done_callback() 메서드는 하나의 매개변수를 가지며, 이 매개변수는 호출 가능 객체입니다. Future가 실행이 끝나면 이 객체를 콜백합니다.
  3. .result() 메서드는 매개변수가 없기 때문에, 지정된 시간을 설정할 수 없습니다. .result() 메서드를 호출할 때에도 아직 실행이 완료되지 않았다면 asyncio.InvalidStateError 예외가 발생합니다.

concurrent.futures.Future 클래스의 Future를 실행 마치고 result()를 호출하면, 호출 가능 객체의 결과를 반환하거나, 호출 가능 객체를 실행하는 동안 발생한 예외를 던집니다. Future가 실행되지 않은 상태에서 f.result() 메서드를 호출하면, 호출자의 스레드가 블록되고, 결과가 반환될 때까지 기다립니다. 이 경우 result 메서드는 timeout 파라미터를 받을 수 있으며, 지정된 시간 내에 Future가 실행되지 않으면 TimeoutError 예외를 던집니다.

asyncio.Future를 사용할 때, 일반적으로 yield from을 사용하여 결과를 얻고, result() 메서드를 사용하지 않습니다. yield from 표현식은 일시정지된 코루틴에서 반환 값을 생성하고, 다시 실행 프로세스를 복귀합니다.

asyncio.Future 클래스는 yield from과 함께 사용되기 위해 설계되었기 때문에, 일반적으로 다음 메서드를 사용하지 않습니다:

  1. my_future.add_down_callback(...), 호출은 필요 없습니다. 왜냐하면 future가 실행되고 끝나는 후에 수행하고자 하는 작업을 코루틴에 yield from my_future 표현식 뒤에 직접 넣을 수 있습니다. (코루틴은 함수를 일시 중지하고 재개할 수 있기 때문입니다.)
  2. my_future.result() 호출은 필요 없습니다. 왜냐하면 yield from이 생성하는 결과는 (result = yield from my_future)입니다.

asyncio 패키지에서는 asyncio.Future 객체에서 결과를 생산할 수 있는 yield from을 사용할 수 있습니다. 이는 다음과 같이 쓸 수 있음을 의미합니다:

res = yield from foo() # foo는 코루틴 함수나 Future나 task 인스턴스를 반환하는 일반 함수일 수 있습니다.

asyncio.async(...)* 함수

asyncio.async(coro_or_future, *, loop=None)

이 함수는 코루틴과 Future를 일관성 있게 합니다: 첫 번째 매개변수는 둘 중 하나일 수 있습니다. Future나 Task 객체일 경우 그대로 반환되며, 코루틴이면 async 함수가 자동으로 loop.create_task(...) 메서드를 호출하여 Task 객체를 생성합니다. loop 매개변수는 선택사항이며, 이벤트 루프를 전달합니다; 전달하지 않으면 async 함수는 asyncio.get_event_loop() 함수를 호출하여 루프 객체를 가져옵니다.

BaseEventLoop.create_task(coro)

이 메서드는 코루틴의 실행 시간을 정해 asyncio.Task 객체를 반환합니다.自定义의 BaseEventLoop 서브 클래스에서 호출된 경우, 반환된 객체는 Task 클래스와 호환되는 외부 라이브러리의 어떤 클래스의 인스턴스일 수 있습니다.

BaseEventLoop.create_task() 메서드는 Python에서만 사용할 수 있습니다.3.4.2 이상 버전에서 사용할 수 있습니다. Python3.3 아래 asyncio.async(...) 함수만 사용할 수 있습니다.
Python 컨솔이나 소형 테스트 스크립트에서 future와 코루틴을 실험하고 싶다면 다음 부분을 사용할 수 있습니다:

import asyncio
def run_sync(coro_or_future):
 loop = asyncio.get_event_loop()
 return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

asyncio와 aiohttp 패키지를 사용하여 다운로드

지금, asyncio의 기본 지식을 이해했으므로, 이전에 작성한 [python concurrency]을 asyncio로 다시 작성할 때가 되었습니다 1(): 사용자 정의 futures를 처리하여 국기를 다운로드하는 스크립트를 작성했습니다

먼저 코드를 보겠습니다:

import asyncio
import aiohttp # aiohttp를 설치해야 합니다. pip install aiohttp
from flags import save_flag, show, main, BASE_URL
@asyncio.coroutine # 우리는 코루틴이 asyncio.coroutine 데코레이터를 사용해야 한다고 알고 있습니다
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
  # 블록된 작업은 코루틴을 통해 구현되며, 고객 코드는 yield from을 통해 책임을 위임하여 비동기 작업을 수행합니다
 resp = yield from aiohttp.request('GET', url) 
 # 읽기도 비동기 작업입니다
 image = yield from resp.read()
 return image
@asyncio.coroutine
def download_one(cc): # 이 함수도 코루틴이어야 합니다. 왜냐하면 yield from을 사용하기 때문입니다
 image = yield from get_flag(cc) 
 show(cc)
 save_flag(image, cc.lower()) + '.gif')}}
 return cc
def download_many(cc_list):
 loop = asyncio.get_event_loop() # 이벤트 루프의 하부 구현을 참조하는 이벤트 순서를 가져옵니다
 to_do = [download_one(cc) for cc in sorted(cc_list)] # 각 국기를 가져오기 위해 download_one을 호출하고, 생성자 객체 목록을 생성합니다
 # 함수 이름은 wait라도 블록형 함수가 아닙니다. wait는 코루틴이며, 전달받은 모든 코루틴이 실행되면 종료됩니다
 wait_coro = asyncio.wait(to_do)
 res, _ = loop.run_until_complete(wait_coro) # 이벤트 루프를 실행하여 wait_coro가 완료될 때까지 기다림; 이벤트 루프가 실행되는 동안, 이 스크립트는 여기서 블록됩니다.
 loop.close() # 이벤트 루프를 닫습니다
 return len(res)
if __name__ == '__main__':
 main(download_many)

이 코드의 실행 요약은 다음과 같습니다:

  1. download_many 함수에서 이벤트 루프를 가져오고, download_one 함수 호출로 생성된 여러 코루틴 객체를 처리합니다
  2. asyncio 이벤트 루프는 각 코루틴을 한 번에 활성화합니다
  3. 고객 코드의 코루틴(get_flag)이 라이브러리의 코루틴(aiohttp.request)에 책임을 위임하려고 yield from을 사용할 때, 제어권은 이벤트 루프에게 돌려져 있으며, 실행 전에 예약된 코루틴을 실행합니다
  4. 事件循环通过基于回调的底层API,在阻塞的操作执行完毕后获得通知。
  5. 이벤트 루프는 블록형 작업이 완료되면 알림을 받는 기본적인回调 API를 사용하여 알림을 받습니다.
  6. 알림을 받으면, 메인 루프는 일시정지된 코루틴에 결과를 보냅니다4코루틴은 다음 yield from 표현식까지 앞으로 진행합니다. 예를 들어 get_flag 함수의 yield from resp.read()과 같습니다. 이벤트 루프가 다시 제어권을 얻고, 다시 1을 반복합니다.6~

루프가 종료될 때까지 단계별로 진행합니다.

download_many 함수에서는 asyncio.wait(...) 함수를 사용했습니다. 이 함수는 코루틴으로, 코루틴의 파라미터는 future나 코루틴으로 구성된 이터러블 객체입니다; wait는 각 코루틴을 Task 객체에 포장합니다. 최종 결과는 wait가 처리한 모든 객체가 Future 클래스의 인스턴스로 변환됩니다.

wait는 코루틴 함수이므로, 반환되는 것은 코루틴이나 제너레이터 객체입니다; waite_coro 변수에 저장된 것은 이런 객체입니다.

<section class="caption">wait</section>에는 timeout과 return_when이라는 두 개의 이름이 있는 파라미터가 있습니다. 이 두 파라미터가 설정되면 미완료된 future를 반환할 수 있습니다.

이전에 사용한 requests 라이브러리가 블록형 I/O 연산. asyncio 패키지를 사용하려면, 함수를 비동기 버전으로 변경해야 합니다.

간단한 팁

요청을 사용하면 코드가 이해하기 어려워 보이면, 파이썬의 아버지 (Guido van Rossum)의 조언을 따라 yield from을 존재하지 않는 것처럼 생각할 수 있습니다.

위의 코드를 예로 들어보겠습니다:

@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url) 
 image = yield from resp.read()
 return image
# yield form을 제거하세요
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = aiohttp.request('GET', url) 
 image = resp.read()
 return image
# 더 명확해졌나요

지식 포인트

asyncio 패키지의 API에서 yield from을 사용할 때, 주의해야 할 한 가지 세세한 점이 있습니다:

asyncio 패키지를 사용할 때, 우리는 asyncio 자체가 코루틴을 운영하는 비동기 코드를 작성하며, 생성자는 최종적으로 asyncio 패키지 또는 제3자 라이브러리의 코루틴에 대한 책임을 위임합니다. 이 처리 방식은 asyncio 이벤트 루프가 기본 비동기 I/O를 운영하는 파이프라인을 건설하는 것과 동일합니다./O 라이브러리 함수.

블록형 호출을 피합니다

우리는 이 그래프를 통해 컴퓨터가 다른 저장 매체에서 데이터를 읽는 지연 상황을 보여줍니다:

이 그래프를 통해, 블록형 호출이 CPU에 대한 큰 낭비임을 볼 수 있습니다. 블록형 호출이 전체 애플리케이션을 중지하지 않도록 어떻게 피할 수 있을까요?

두 가지 방법이 있습니다:

  1. 단일 스레드에서 각 블록형操作을 실행합니다
  2. 모든 블록형操作을 비블록형 비동기 호출로 변환합니다

물론 두 번째 방법을 추천합니다. 첫 번째 방법에서는 각 연결마다 스레드를 사용하는 경우 비용이 너무 많기 때문입니다.

두 번째로, 생성자를 코루틴으로 사용하여 비동기 프로그래밍을 구현할 수 있습니다. 이벤트 루프는 콜백 호출과 일시정지된 코루틴에 대해 .send() 메서드를 호출하는 것과 비슷합니다. 일시정지된 코루틴이 소비하는 메모리는 스레드보다 훨씬 적습니다.

지금 이제 flags_asyncio.py 스크립트가 flags.py보다 훨씬 빠르다는 것을 이해할 수 있을 것입니다.

flags.py는 차례대로 동기로 다운로드되며, 각 다운로드마다 수십억 개의 CPU 주기가 결과를 기다리는 시간이 걸립니다. flags_asyncio.py에서는 download_many 함수에서 loop.run_until_complete 메서드를 호출할 때, 이벤트 루프가 각 download_one 커리틴을 운영하며 yield from 표현식에서 멈추고, 그 표현식이 각 get_flag 커리틴을 운영하며 첫 번째 yield from 표현식까지 운영합니다. 이 호출은 블록이 되지 않기 때문에 수십 분之一의 초 내에 모든 요청이 시작할 수 있습니다.

asyncio 다운로드 스크립트를 개선합니다

지금 우리는 상단의 flags_asyncio.py를 개선해 보겠습니다. 그 안에 예외 처리와 카운터를 추가합니다.

import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
# 다른 HTTP 또는 네트워크 예외를 감싸고 country_code를 가져와서 오류를 보고하기 위한 사용자 정의 예외
class FetchError(Exception):
 def __init__(self, country_code):
  self.country_code = country_code
@asyncio.coroutine
def get_flag(cc):
 # 이 코루틴은 세 가지 결과를 반환합니다:
 # 1. 다운로드된 이미지를 반환
 # 2. HTTP 응답은404 . 예를 들어, web.HTTPNotFound 예외를 표출
 # 3. 다른 HTTP 상태 코드를 반환할 때, aiohttp.HttpProcessingError를 표출
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  image = yield from resp.read()
  return image
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers
  )
@asyncio.coroutine
def download_one(cc, semaphore):
 # semaphore 파라미터는 asyncio.Semaphore 클래스의 인스턴스입니다
 # Semaphore 클래스는 동기 장치로, 동시 요청을 제한하기 위해 사용됩니다
 try:
  with (yield from semaphore):
    # yield from 표현式中 semaphore를 컨텍스트 관리자로 사용하여 전체 시스템을 차단하지 않도록 합니다
    # semaphore 계산자의 값이 허용 가능한 최대 값이면, 그 코루틴만이 차단됨
    image = yield from get_flag(cc)
    # with 문을 벗어났을 때 semaphore 계산자의 값이 감소
    # 다른 semaphore 객체를 기다리는 다른 코루틴 인스턴스를 차단 해제
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  save_flag(image, cc.lower()) + '.gif')}}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list):
 counter = collections.Counter()
 # asyncio.Semaphore 인스턴스 생성, 최대로 활성화할 수 있는 MAX_CONCUR_REQ 개의 코루틴
 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
 # 다중 호출 download_one 코루틴, 코루틴 객체 목록 생성
 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
 # Get an iterator that returns the future after the future runs
 to_do_iter = asyncio.as_completed(to_do)
 for future in to_do_iter:
  # Iterate over the finished future 
  try:
   res = yield from future # Get the result of the asyncio.Future object (can also call future.result)
  except FetchError as exc:
   # All exceptions thrown are wrapped in a FetchError object
   country_code = exc.country_code
   try:
    # Try to get the error message from the original exception (__cause__)
    error_msg = exc.__cause__.args[0]
   except IndexError:
    # If the error message is not found in the original exception, use the class name of the connected exception as the error message
    error_msg = exc.__cause__.__class__.__name__
   if error_msg:
    msg = '*** Error for {}: {}'
    print(msg.format(country_code, error_msg))
   status = HTTPStatus.error
  else:
   status = res.status
  counter[status] += 1
 return counter
def download_many(cc_list):
 loop = asyncio.get_event_loop()
 coro = downloader_coro(cc_list)
 counts = loop.run_until_complete(coro)
 loop.close()
 return counts
if __name__ == '__main__':
 main(download_many)

코루틴에서 시작하는 요청이 빠르기 때문에, 서버에 많은 동시 요청을 보내서 서버 과부하를 방지하기 위해, download_coro 함수에서 asyncio.Semaphore 인스턴스를 생성한 후 download_one 함수에 전달합니다.

<secion class="caption">Semaphore</section> 객체는 내부 카운터를 유지하며, 객체에서 .acquire() 코루틴 메서드를 호출하면 카운터가 감소합니다. 객체에서 .release() 코루틴 메서드를 호출하면 카운터가 증가합니다. 카운터의 값은 초기화 시 설정됩니다.

카운터가 0보다 크면 .acquire() 메서드 호출은 블록킹되지 않으며, 카운터가 0이면 .acquire() 메서드는 이 메서드를 호출하는 코루틴을 블록킹합니다. 다른 코루틴이 동일한 Semaphore 객체에서 .release() 메서드를 호출하여 카운터를 증가시키기까지 블록됩니다.

위의 코드에서 우리는 직접 .acquire() 또는 .release() 메서드를 호출하지 않았습니다. 오히려 download_one 함수에서 semaphore를 컨텍스트 관리자로 사용했습니다:

with (yield from semaphore):
 image = yield from get_flag(cc)

이 코드는 언제든지 MAX_CONCUR_REQ 개 이상의 get_flag 코루틴이 시작되지 않도록 보장합니다.

asyncio.as_completed 함수 사용

yield from을 사용하여 asyncio.as_completed 함수에서 future 결과를 가져오기 때문에, as_completed 함수는 코루틴에서 호출되어야 합니다. download_many 함수가 비코루틴의 main 함수에 전달될 수 있도록 하기 위해 새로운 downloader_coro 코루틴을 추가했습니다. 이렇게 하면 download_many 함수는 이벤트 루프를 설정하는 데 사용됩니다.

Executor 객체를 사용하여 이벤트 루프를 블록킹 방지

이제 위의 컴퓨터가 다른 저장 매체에서 데이터를 읽는 지연 상황 그래프를 다시 보겠습니다. 실시간으로 주의해야 할 것은 로컬 파일 시스템에 접근하는 것도 블록킹된다는 것입니다.

위의 코드에서 save_flag 함수는 클라이언트 코드와 asyncio 이벤트 루프가 공유하는 유일한 스레드를 블록킹하여 파일을 저장할 때 전체 애플리케이션이 중지됩니다. 이 문제를 피하기 위해 이벤트 루프 객체의 run_in_executor 메서드를 사용할 수 있습니다.

asyncio 이벤트 루프는 백그라운드에서 ThreadPoolExecutor 객체를 유지하고 있으며, run_in_executor 메서드를 호출하여 호출 가능한 객체를 전달하여 실행할 수 있습니다.

아래는 변경된 코드입니다:

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  # 여기는 변경된 부분입니다
  loop = asyncio.get_event_loop() # 이벤트 루프 참조를 가져옵니다
  loop.run_in_executor(None, save_flag, image, cc.lower()) + '.gif')}}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

run_in_executor 메서드의 첫 번째 매개변수는 Executor 인스턴스입니다. None으로 설정되면 이벤트 루프의 기본 ThreadPoolExecutor 인스턴스를 사용합니다.

콜백에서 future로부터 코루틴으로

코루틴에 대해 알기 전에는 콜백에 대해 몇 가지 이해가 있었을 수 있습니다. 그렇다면 콜백과 비교해보면 코루틴은 어떤 개선을 가져왔을까요?

Python에서의 콜백 코드 스타일:

def stage1(response1)
 request2 = step1(response1)
 api_call2(request2, stage2)
def stage2(response2)
 request3 = step3(response3)
 api_call3(request3, stage3) 
 def stage3(response3)
  step3(response3) 
api_call1(request1, stage1)

위 코드의 문제점:

  1. 콜백 지옥이 발생할 가능성이 높습니다
  2. 이 코드는 읽기 어렵습니다

이 문제에서 코루틴은 매우 중요한 역할을 합니다. 코루틴과 yield from을 사용하여 비동기 코드를 작성한 예제는 다음과 같습니다:

@asyncio.coroutine
def three_stages(request1)
 response1 = yield from api_call1(request1)
 request2 = step1(response1)
 response2 = yield from api_call2(requests)
 request3 = step2(response2)
 response3 = yield from api_call3(requests)
 step3(response3) 
loop.create_task(three_stages(request1)

이전 코드와 비교해보면 이 코드는 훨씬 이해하기 쉽습니다. asyncio를 사용하여 api_call에 대한 비동기 호출을 하면:1,api_call2,api_call3 예외가 발생하면 해당 yield from 표현식을 try 블록에 넣을 수 있습니다./except 블록에서 예외를 처리합니다.

코루틴을 사용하려면 yield from 표현식에 익숙해야 하며, 코루틴은 직접 호출할 수 없습니다. 명시적으로 코루틴 실행 시간을 설정하거나 다른 실행 시간이 설정된 코루틴에서 yield from 표현식을 사용하여 활성화하도록 하세요. 예를 들어 loop.create_task(three_stages(request1))이면 어떤 일도 일어나지 않습니다.

아래에서 실제 예제를 통해 보여드리겠습니다:

각 번호로 다운로드할 때마다 여러 번 요청을发起합니다

위의 국기 다운로드 코드를 수정하여 국기 다운로드와 함께 국가 이름을 가져와 이미지 저장 시 사용하도록 합니다.
이 문제를 해결하기 위해 코루틴과 yield from을 사용합니다:

@asyncio.coroutine
def http_get(url):
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  ctype = resp.headers.get('Content-type', '').lower()
  ctype에 'json'이 포함되거나 url이 'json'으로 끝나는 경우:
   data = yield from resp.json()
  else:
   data = yield from resp.read()
  return data
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers)
@asyncio.coroutine
def get_country(cc):
 url = ""/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
 metadata = yield from http_get(url)
 return metadata['country']
@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 return (yield from http_get(url))
@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
  with (yield from semaphore):
   country = yield from get_country(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  country = country.replace(' ', '_')
  filename = '{}--{}.gif'.format(country, cc)
  print(filename)
  loop = asyncio.get_event_loop()
  loop.run_in_executor(None, save_flag, image, filename)
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

이 구간 코드에서, download_one 함수에서 semaphore가 제어하는 두 개의 with 블록에서 get_flag과 get_country를 호출하여 시간을 절약하는 것입니다

get_flag의 return 문은 외부에 괄호를 추가하여, ()의 연산자 우선순위가 높아 괄호 내의 yield from 문이 먼저 실행되고, return된 결과를 먼저 처리하게 되며, 이렇게 하지 않으면 문법 오류가 발생합니다

()를 추가하면, 이는 다음과 동일합니다

image = yield from http_get(url)
return image

()가 없으면 프로그램은 yield from에서 중단되고, 제어권을 내려주며, 이 때 return을 사용하면 문법 오류가 발생합니다.

요약

이 편에서 우리는 다음을 논의했습니다:

  1. 다중 스레드 프로그램과 asyncio 버전을 대조하여, 다중 스레드와 비동기 작업 간의 관계를 설명했습니다
  2. asyncio.Future 클래스와 concurrent.futures.Future 클래스의 차이점을 비교했습니다
  3. 비동기 프로그래밍을 사용하여 네트워크 애플리케이션에서 고성능을 관리하는 방법
  4. 비동기 프로그래밍에서, 콜백 대비하여, 커루틴이 성능을 극대화하는 방법

이것이 본 글의 전부입니다. 많은 도움이 되길 바랍니다. 또한, 나르스 튜토리얼을 많이 지지해 주시기 바랍니다.

고지: 본 내용은 인터넷에서 수집된 것이며, 원작자의 소유물입니다. 인터넷 사용자가 자발적으로 기여하고 업로드한 내용이며, 이 사이트는 소유권을 가지지 않으며, 인공적으로 편집된 것이 아니며, 관련 법적 책임을 부담하지 않습니다. 저작권 위반이 의심되는 내용이 있으면 notice#w로 이메일을 보내 주세요.3codebox.com에 이메일을 보내면 (#을 @으로 변경해 주세요) 신고를 하고, 관련 증거를 제공하면, 해당 내용이 위반되면 즉시 내용을 삭제할 것입니다.

추천해드립니다