2020.02.17 Python의 multiprocessing 중 Pool.map(), chunksize에 관한 내용

먼저 동시성에 대한 개념, 쓰레드, 프로세스, 비동기에 대한 내용은 알고 있다고 가정하고 실제 코드 사용법에 대해 적어두었습니다.


* Pool.map() 

def _map():
    pool = multiprocessing.Pool()
    pool.map(summing, [10_000_000 + i for i in range(5)])
    pool.close()
    pool.join()   # optional (하단에서 설명)

위에서 볼 수 있듯이 map() 함수는

- Python의 빌트인 함수 map처럼 인자로 실행할 함수, 인자(iterable)를 취한다.
- 프로세스에게 함수와 iterable의 인자를 분배하고 각 프로세스가 그 함수를 처리하도록 한다.
사람 언어로 표현하면 일감을 처리하는 방법과 일감에 필요한 재료를 각 일꾼에게 분배하는 것이라고 생각하면 된다.
- pool을 생성하면 최초에 프로세스의 개수를 지정하거나 위 예시처럼 비어있도록 둘 수 있는데 그냥 비어있도록 두는 경우에는 현재 코드가 돌아가는 machine의 CPU core 갯수만큼의 process가 생성된다.

* 예시) 3곳의 레스토랑에 전화해서 예약을 하려고 하는데 직원 3명(프로세스 3개)에게 이를 시키는 상황을 코드로 표현하면 아래와 같다.

def call_restaurant(phone_num):
    print(f'Calling {phone_num} to make a reservation!')


def make_reservations():
    pool = multiprocessing.Pool()
    pool.map(call_restaurant, [1, 2, 3])      # 전화번호는 귀찮으니 1,2,3이라고 가정했다.
    pool.close()

1) A에게 (전화해서 식당 예약하기, 전화번호1)
2) B에게 (전화해서 식당 예약하기, 전화번호2)
3) C에게 (전화해서 식당 예약하기, 전화번호3)

- 만약 CPU core가 하나이거나, pool을 선언할 때 multiprocessing.Pool(1) 로 선언하면 일반적인 for문과 다를 바 없다.

* pool.join()이 갖는 의미는?

- pool.map()은 join()을 호출하지 않아도 상관없다. join()은 timeout을 인자로 취한다. 이 함수는 '지금부터 Block 하고 현재 pool에 있는 프로세스들의 job을 모두 실행하기' 이다.
- 하지만 map의 경우엔 원래부터 block을 하고 프로세스를 돌리기 때문에 join()이 필요없다. map_async일 때 block한 채로 처리를 원한다면 그 때 join()을 사용하면 된다.


* chunksize: map 함수는 chunksize라는 kwarg를 인자로 취할 수 있다.

- chunksize 인자를 주면, 인자로 제공한 iterable을 더 작은 element를 가진 iterable로 쪼개는데 이 때 쪼개진 각 iterable이 가지는 element 갯수가 바로 chunksize이다.

설명이 쉽도록 위 코드를 예로 표현하자면,

def _map():
    pool = multiprocessing.Pool(1)
    pool.map(summing, [10_000_000 + i for i in range(20)], chunksize=5)
    pool.close()
    pool.join()

- 위 Pool.map() 함수의 본래 실행할 함수는 summing이고 실행할 함수의 인자는[10_000_000 + i for i in range(20)] 이다.
- List comprehension을 unpack하면 총 20개의 element를 지닌 iterable이 된다.
- 이 때 chunksize=5 이므로, 총 20개의 element가 든 하나의 iterable을 잘게 쪼갠다. 이 때 잘게 쪼갠 iterable은 element를 각각 5개씩 가지고 있다.

아래처럼 되는 것이다.

(1번 iterable) [1_000_000 ~1_000_004]
(2번 iterable) [1_000_005 ~1_000_009]
(3번 iterable) [1_000_010 ~1_000_014]
(4번 iterable) [1_000_015 ~1_000_019]

자, iterable은 총 4개가 되었고 이제 프로세스들은 각각 한 iterable씩 챙겨서 일을 수행할 수 있게 되었다.

이 때 Process의 갯수는 쪼개진 iterable의 총 갯수보다 많을 수 없다. (일감이 없는거니까)

만약 위 경우에서 chunksize=10이었다면?

총 20개의 element를 가진 iterable을 10개씩 지닌 iterable로 분리하니까 2개의 iterable이 될테고 따라서 CPU Core가 4든 8이든 16이든, 프로세스는 최대 2개 돌면서 자신의 작업을 수행한다.



댓글

이 블로그의 인기 게시물

로컬 Tomcat 서버 실행속도 개선

2019.05.23 - SQLAlchemy 의 객체 상태 관리 (expire, refresh, flush, commit) 에 관한 이해