논문 정리 - MapReduce: Simplified Data Processing on Large Clusters


Abstract
맵리듀스는 큰 데이터 셋을 다루는 프로그래밍 모델이다. 사용자는 map 함수를 통해서 키/밸류 쌍을 이용하여 intermediate key/value pair를 만들고 reduce 함수를 통해서 그 intermediate pair들을 같은 key 별로 합쳐준다. 많은 일들이 이런 모델을 통해 표현될 수 있다.
이 모델은 분산 시스템이나 병렬 시스템에 대한 경험이 부족하더라도 분산 시스템의 자원을 쉽게 사용할 수 있게 해준다.

1. introduction
데이터가 많아지면서 reasonable time 안에 작업을 끝내기 위해 수백 혹은 수천대의 기계에 분산되어야 했다. 이 때 어떻게 연산을 병렬화하고, 데이터를 분산하고, 실패 작업을 어떻게 처리할지가 이슈가 되었다.
이런 복잡한 문제들을 해결하기 위해 병렬화의 세세한 부분들은 숨기고 fault tolerance하며 데이터를 분산시켜주고 로드 밸런싱을 해주는 라이브러리를 통해서 연산을 맵과 리듀스로 간단히 표현하도록 했다.

2. Programming Model
사용자가 직접 만드는 맵 함수는 입력으로 key/value pairs를 받아서 intermediate key/value pairs를 만든다. 그러면 intermediate key/value pairs에 대해서 맵리듀스 라이브러리가 같은 key 별로 value들을 묶어서 리듀스 함수로 보내준다. 마찬가지로 사용자가 직접 만드는 리듀스 함수는 이 데이터를 받아서 key 별로 묶인 value들을 합쳐서 표현한다. 리듀스 함수에 전달되는 intermediate 데이터는 iterator의 형태로 전달되기 때문에 메모리에 다 안 들어갈 정도로 큰 데이터도 처리할 수 있다.
예를 들어서, 어떤 문서에 있는 각 단어들이 몇 번 나왔는지 세는 프로그램을 생각해보자. 그렇다면 맵 함수는 입력으로 <문서 이름, 문서 내용><key, value> pair를 받을 수 있을 것이다. 이 입력에 대해서 맵 함수가 value의 각 단어에 대해서 <word, 1>intermediate key/value pair를 생성한다. 그러면 이 intermediate 데이터가 key 별로 정리돼서 <word, list of counts>의 형태로 리듀스 함수에게 입력돼고 리듀스 함수는 value에 있는 count들을 다 합친 뒤 그 결과를 출력한다.

3. Implementation
맵 동작은 다수의 기계에 분산되어 있다. 자동으로 입력 데이터를 M개의 split으로 쪼개서. 이 쪼개진 split은 다른 머신들에서 병렬도 처리될 수 있다. 리듀스 작업은 R개로 intermediate key space를 쪼개서 분산된다.

3.1 Execution Overview
Figure 1 참고해서 아래 flow 설명.
1) 라이브러리가 입력 파일을 보통 16~64 MB정도의 사용자가 정한 크기의 조각으로 쪼개서 M개의 split을 만든다. 그 다음 클러스터에 프로그램을 복사하기 시작한다.
2) 복제들 중 하나가 master이고 나머지는 그 master에게 일을 받는 worker가 된다. M개의 맵 작업이 있고 R개의 리듀스 작업이 있을텐데 마스터는 쉬고 있는 worker에게 맵이나 리듀스 작업을 준다.
3) worker가 맵 작업을 맡게 되면 해당하는 split을 읽어서 key/value pair로 맵 함수에 넘겨준다. Intermediate data는 메모리에 저장된다.
4) 주기적으로, 메모리에 있는 데이터는 partitioning 함수에 의해 R개의 영역에 나눠서 로컬 디스크에 저장된다(맵은 R개의 파일을 만드는 것). 저장된 위치는 마스터에게 전달된다.
5) 리듀스 작업을 하는 worker는 마스터에 의해 필요한 데이터가 있는 위치를 전달받게 되고 remote procedure call를 통해서 읽어온다. 모든 intermediate 데이터를 읽어왔으면 key에 따라 sorting을 해서 같은 key들끼리 있도록 한다. 만약 메모리에 담기기 너무 큰 사이즈의 데이터라면 external sort를 사용한다.
6) 리듀스 worker는 정렬된 데이터를 interate하면서 새로운 key를 만날 때마다 그 키와 그에 해당하는 value들의 set을 리듀스 함수에 넘겨준다. 이 리듀스 함수의 결과는 해당하는 리듀스 partitionoutput file에 추가된다.
7) 모든 맵과 리듀스 작업이 끝나면 마스터는 유저 프로그램에게 알린다.
맵리듀스의 결과는 R개의 파일로 나온다. 대부분의 경우는 이 R개의 파일을 하나의 파일로 합칠 필요 없이 다른 맵리듀스나 다른 분산 프로그램의 입력으로 전달한다.


3.2 Master Data Structures
각 맵이나 리듀스 작업들은 idle, in-progress, completed의 상태를 갖고 idle 상태가 아니라면 해당하는 worker machine의 정보까지 갖고 있다. 맵 작업이 끝나게 되면 그 결과를 마스터가 받아서 리듀스 태스크를 하는 worker에게 push해준다.

3.3 Fault Tolerance
Worker Failure
마스터는 모든 worker를 주기적으로 확인(ping)한다. 어떤 worker에게서 일정 시간 이상 응답이 없으면 마스터는 fail으로 인식한다. worker에 의해 처리된 맵 작업들은 다시 처음의 idle 상태로 돌아오고 다시 스케줄링된다. 또한 그 worker에서 처리 중이었던 맵이나 리듀스 작업 또한 idle 상태로 돌아와서 다시 스케줄링된다. 완료된 맵 작업들까지도 다시 실행돼야하는 이유는 그 맵의 결과들이 failed worker의 로컬 디스크에 있기 때문이다. 반면에 완료된 리듀스 작업은 global file system에 저장되기 때문에 다시 실행할 필요가 없다. 하나의 workerfail돼서 다른 worker가 맵 작업을 대신 한다면 리듀스 작업을 하는 worker들도 바뀐 worker에 대해 전달 받고 그 worker에게서 데이터를 읽게 된다.
Master Failure
마스터가 주기적으로 checkpoint를 만들어서 마스터가 죽었을 때 마지막의 checkpoint에서 다시 시작할 수 있다. 하지만 마스터가 하나라면 master fail의 경우 맵리듀스 연산을 취소하고 다시 시작해야한다.
Semantics in the Presence of Failures
만약 작업이 deterministic하다면 이 분산 작업은 non-faulting sequential execution of the entire program과 같은 결과를 가져올 것이다. 맵 작업이 끝나면 workerR개의 임시 파일의 정보와 함께 메세지를 전달한다. 그러면 마스터는 그 정보를 마스터의 data structure에 저장한다. 리듀스 작업이 끝나면 temporary output filefinal output filerename한다. 같은 리듀스 작업이 여러 머신에서 실행되어도 같은 output 파일에 대해 rename call이 일어난다. final file system state는 하나의 리듀스 작업에서 만들어진 데이터만 갖고 있도록 보장한다. 대부분의 작업들은 deterministic하다. 만약 맵이나 리듀스 작업이 non-deterministic하다면 weaker but reasonable semantics를 제공한다. 만약 맵 작업 M과 리듀스 작업 R1, R2가 있다면 R1은 어떤 M의 결과를 읽고 R2는 다른 M의 결과를 읽을 수도 있기 때문에 weaker semantics가 발생한다.

3.4 Locality
맵리듀스 마스터가 입력 파일의 위치 정보를 받은 뒤 해당하는 입력 데이터의 복제본이 있는 머신에서 맵 작업을 스케줄링 한다. 이렇게 함으로써 대부분의 입력 데이터가 locally 읽히고 네트워크 밴드위스를 쓰지 않는다.

3.5 Task Granularity
MRworker의 수보다 훨씬 많을수록 이상적이다. 그럴수록 동적인 로드 밸런싱이 가능해지고 worker fail이 일어나도 fail된 작업들이 다른 worker들에게 나눠질 수 있기에 빠른 회복이 가능하기 때문이다. 하지만 마스터가 O(M+R)의 스케줄링 결정을 해야하고 O(M*R)의 상태를 저장해야하기 때문에 MR이 커지는 데에는 한계가 있다. 상태 저장에는 하나의 task/reduce task pair 1 바이트 정도의 작은 메모리가 필요하긴 하다. 또한 리듀스 작업은 각각 별개의 결과 파일로 생성되기 때문에 R은 사용자가 종종 제한을 둔다. 통상적으로 M은 각각이 대략 16~64MB가 되도록 골라서 locality가 효율적으로 활용될 수 있도록 한다. 그리고 Rworker 수의 2~3배 정도로 설정한다.

3.6 Backup Tasks
전체 시간을 늦추는 흔한 원인 중 하나는 어떤 태스크를 할 때 마지막 몇 step들을 비정상적으로 길게 처리하는 머신(straggler)이다. 디스크 문제일 수도, 캐시 문제일 수도 있다. 이런 straggler의 방해를 줄이기 위해서 마스터가 어떤 작업이 끝나갈 때 남은 in-progress 작업에 대해 backup execution을 스케줄링한다. 그래서 그 작업은 원래의 처리나 백업 처리 둘 중 하나가 완료가 되면 completed상태가 된다. 이 메커니즘은 자원을 단지 몇 퍼센트도 소모하지 않지만 backup task 메커니즘을 사용하지 않을 경우 44퍼센트까지 느려지는 경우도 있었다.


4. Refinements
유용한 extensions.

4.1 Partitioning Function
intermediate data에 대해 partitioning하는 함수는 defaultmod를 이용한 해싱이 설정돼있다.. 하지만 단순히 mod말고 다른 함수가 유용할 때가 있다. 만약 입력이 url 주소이고 리듀스의 결과 파일이 같은 hostname 별로 생성되길 바란다면 partitioning 함수를 그게 맞게 수정해야한다.

4.2 Ordering Guarantees
partition에 대해서 intermediate key/value pair가 오름차순으로 정렬이 돼 있다. 이로써 결과 파일에서 key 기반 lookup도 효율적이다.

4.3 Combiner Function
어떤 경우에는 리듀스 작업이 commutative and associative하고 맵 작업에서 intermediate key가 매우 반복적으로 나올 때가 있다. 만약 word count 예시를 본다면, <the, 1>pair는 매우 많이 나올 것이고 이 데이터들이 네트워크를 통해서 리듀스 함수로 전달될 것이다. 그래서 Combiner 함수를 만들어서 네트워크로 보내기 전에 일부분을 합치는 기능을 제공한다. 컴바이너 함수는 맵 작업을 수행하는 머신에서 실행된다. 보통은 컴바이너와 리듀스 둘 다 같은 코드를 사용한다. 차이는 단순히 그 함수의 결과를 맵리듀스 라이브러리가 어떻게 처리하느냐이다. 리듀스의 결과는 final output file에 쓰여지고, combiner 함수의 결과는 intermediate file에 쓰여진다.

4.4 Input and Output Types
맵리듀스 라이브러리는 입력 데이터를 다양한 포맷으로 읽는 것을 지원한다. 텍스트 파일을 읽을 때 key는 파일의 offset이고 value는 각 줄의 내용이 될 수 있다.

4.5 Side-effects
한 작업에 대해 하나의 결과 파일 말고 또 다른 보조의 결과 파일을 만들고 싶어할 수 있다. (부족)

4.6 Skipping Bad Records
작성한 코드에 버그가 있어서 맵이나 리듀스 함수에 deterministic하게 crash할 수가 있다. 이런 버그가 맵리듀스 작업을 끝내는 데에 방해를 한다. 버그를 고치는 게 기본이긴 하지만 힘들 때가 있다. 맵리듀스 라이브러리가 어떤 record에서 deterministic crash를 유발하는지를 감지하고 이 record들을 건너 뛰는 모드를 제공한다. (원리)

4.7 Local Execution
연산들이 마스터에 의해 동적으로 분산되어 처리되므로 디버깅이 까다롭다. 이런 점을 보완하기 위해 로컬 머신에서 맵리듀스 작업을 sequentially 할 수 있는 라이브러리를 만들었다. 사용자가 특정 작업들에 한정시켜서 실행시킬 수 있고 사용자가 gdb 등의 툴을 이용해서 디버깅한다.

4.8 Status Information
마스터는 내부의 HTTP 서버를 실행시켜서 사람이 볼 수 있는 상태 페이지를 띄운다. 어떤 작업들이 완료됐고 어떤 작업이 진행 중이고 데이터들의 크기, 진행률, worker fail, task fail 등이 나온다.

4.9 Counters
처리되는 입력 key/value pair의 수나 출력 수 등은 자동으로 counting이 되고, 자동으로 counting 되지 않는 경우에도 프로그램에 counting을 하는 코드를 추가할 수 있다. 인풋과 아웃풋의 수가 같은지 등을 체크하는 등, sanity check에 쓰일 수 있다.


8. Conclusion
맵리듀스의 성공에는 다음과 같은 이유가 있다. 첫번째로, 디테일들은 라이브러리에 숨겨 놓고 모델이 사용하기 쉬워서 분산이나 병렬 프로그래밍에 경험이 없는 프로그래머도 사용할 수 있다. 두번째로, 소팅, 데이터 마이닝, 머신 러닝 등 많은 문제들이 맵리듀스 연산으로 표현될 수 있다. 세번째로, 수 천대의 머신들로 이루어진 클러스터에 적용될 수 있다.
이 연구로 배운 점들도 있다. 첫째, 프로그래밍 모델을 제한시키는 것이 병렬화나 분산화 하기 쉽고 fault tolerant한 연산을 만들 수 있다. 둘째, 네트워크 bandwidth는 부족한 자원이어서 이 부분을 최적화하는 것이 중요하다. 셋째로는 중복된 작업은 느린 머신의 영향을 줄이거나 machine failure에 사용될 수 있다.

댓글

이 블로그의 인기 게시물

논문 정리 - The Google File System

kazoo: Using zookeeper api with python