kazoo: Using zookeeper api with python

```
pip3 install kazoo
```
https://kazoo.readthedocs.io/en/latest/basic_usage.html
https://kazoo.readthedocs.io/en/latest/api/client.html

### 접속

```
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
```
호스트에 접속할 때 default port는 2181이다.

### 상태 Listener
zookeeper의 상태가 변하는 것을 알 수 있다.
```
from kazoo.client import KazooState

def my_listener(state):
    if state == KazooState.LOST:
        # Register somewhere that the session was lost
    elif state == KazooState.SUSPENDED:
        # Handle being disconnected from Zookeeper
    else:
        # Handle being connected/reconnected to Zookeeper

zk.add_listener(my_listener)
```

LOST
CONNECTED
SUSPENDED
의 state를 가질 수 있다.

가능한 상태 변화
- LOST -> CONNECTED
- CONNECTED -> SUSPENDED
- CONNECTED -> LOST
- SUSPENDED -> LOST
- SUSPENDED -> CONNECTED

### CRUD

- `zk.create("/my/favorite/node", b"a value")`
`create(path, value='', acl=None, ephemeral=False, sequence=False, makepath=False)`
makepath 옵션이 True가 아니라면 경로가 존재해야만 가능하다.
data는 byte string을 받는다.
Returns: 새로 생성된 노드의 path. string type.
이미 있는 노드면 NodeExistsError
1MB 이상의 크기의 노드를 생성하려 하면 ZookeeperError
- `if zk.exists("/my/favorite"):`
- `data, stat = zk.get("/my/favorite")`
stat에 노드 정보가 있고 data에 데이터가 있다.
- `children = zk.get_children("/my/favorite")`
children의 리스트를 받는다.
- `zk.set("/my/favorite", b"some data")`
노드의 데이터를 업데이트한다. 
- `zk.delete("/my/favorite/node", recursive=True)`
모든 children을 지우려면 recursive 옵션을 넣는다.
- `result = zk.retry(zk.get, "/path/to/node")`
kazoo는 기본적으로 retry을 하지 않기 때문에 retry()를 이용해서 연결 문제에 대해 처리할 수 있다.


### Watchers

watch function을 설정하면 노드나 노드의 children에 변화가 있으면 trigger된다.

아래와 같이 get(), get_children(), exists() 함수에 watch function을 넘길 수 있다.
```
def my_func(event):
    # check to see what the children are now

# Call my_func when the children change
children = zk.get_children("/my/favorite/node", watch=my_func)
```
해당 노드 데이터가 변하거나 노드 자체가 삭제될 때 WatchedEvenet instance로 전달된다.

ChildrenWatch, DataWatch 메소드는 변화가 있을 때마다 불리거나 function이 False를 return할 때까지 불린다.
allow_session_lost가 True가 되면 함수는 세션이 끊겼을 때 더 이상 불리지 않는다.
```
@zk.ChildrenWatch("/my/favorite/node")
def watch_children(children):
    print("Children are now: %s" % children)
# Above function called immediately, and from then on

@zk.DataWatch("/my/favorite")
def watch_node(data, stat):
    print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))
```



---

 ## 예제

### example

```
(env) [irteam@css1381 kazoo]$ cat zk.py
from kazoo.client import KazooClient
import sys
import time
import datetime

zk = KazooClient(hosts="zk-etcd1.bdp.bdata.ai,zk-etcd2.bdp.bdata.ai,zk-etcd3.bdp.bdata.ai")
zk.start()

print("zk state is")
print(zk.state)


lt = time.localtime(time.time())
current_time = str(lt.tm_year) + str(lt.tm_mon) +str(lt.tm_mday) +str(lt.tm_hour) +str(lt.tm_min) +str(lt.tm_sec)

new_path = zk.create("/c3-assigner/1381", current_time.encode())
print(new_path)

if zk.exists('/c3-assigner'):
    children = zk.get_children('/c3-assigner')
    print(children)

zk.stop()
```

```
(env) [irteam@css1381 kazoo]$ python3 zk.py
zk state is
CONNECTED
/c3-assigner/1381
['1381']
```

```
[zk: localhost:2181(CONNECTED) 2] ls /c3-assigner
[1381]
[zk: localhost:2181(CONNECTED) 3] get /c3-assigner/1381
202093202824
cZxid = 0x1001e057ca
ctime = Thu Sep 03 20:28:24 KST 2020
mZxid = 0x1001e057ca
mtime = Thu Sep 03 20:28:24 KST 2020
pZxid = 0x1001e057ca
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 12
numChildren = 0
```

만든 순서대로가 지켜질까 리스트에? 그러면 list[0]으로 락 확인할 수 있다.




## 분산 lock 구현하기

## Apache document: Locks

https://zookeeper.apache.org/doc/r3.4.6/recipes.html#sc_recipes_Locks

### 구현 방법

1. "_locknode_/lock-" 의 위치에 create() 를 실행한다.
sequence와 ephemeral 플래그가 설정된다.
2. watch flag 없이 getChildren()을 lock node에 실행한다.
3. 1번 단계에서 만들어진 pathname이 가장 낮은 인덱스에 있다면 그 클라이언트는 락을 갖게 되고 프로토콜을 빠져나온다.(작업 수행 후 락을 버리라는 뜻인듯)
4. 클라이언트는 그 lock 디렉토리에서 그 다음으로 낮은 인덱스에 exists() 를 watch flag와 함께 호출한다.
5. exists()가 false를 return하면 2번 단계로 간다. 아니면 알림이 오고나서 2번으로 간다.

lock을 갖고 있던 클라이언트가 lock을 버리는 건 단순히 1번에서 만들어진 노드를 delete하면 된다.

특징이 몇 가지 있다.
- 노드 삭제가 일어나면 오직 하나의 클라이언트에게만 신호가 간다. 왜냐하면 각 노드는 하나의 클라이언트에게만 watch 가 되고 있기 때문이다. 이렇게 함으로써 herd effect를 피할 수 있다.
- polling이나 timeout이 발생하지 않는다.
- lock을 구성함으로써 lock contention, break locks 등을 확인하기가 쉽다.

### 참고 - znode 종류에 대해
https://blog.seulgi.kim/2014/05/zookeeper-1-znode-zookeeper-data.html

- persistent vs ephemeral
life cycle에 관한 설정이다. 
persistent는 직접 삭제하기 전까지는 없어지지 않는다. 
ephemeral은 클라이언트와 서버 사이의 connection이 종료되면 자동으로 사라진다.
- sequence
uniqueness를 보장하기 위한 옵션이다.
sequence 모드로 만들어진 znode는 주어진 읾 뒤에 int 범위의 숫자 10개가 postfix로 붙는다.
이 숫자는 atomic하게 증가하여, 같은 이름으로 만든 znode라도 다른 이름의 znode로 만들어진다.

ephemeral + sequence로 노드를 생성하는 것은 문제가 생길 수 있다. 
생성을 요청하고 connection이 끊어진다면 생성이 성공했는지 실패했는지 응답을 받지 못하고 timeout이 발생한다.
znode가 생성되었는지를 확인하면 성공인지 실패인지 알 수 있는데 노드 이름을 서버에서 정하기 때문에 클라이언트는 성공 여부를 알 수가 없다.
curator 라이브러리에서 이를 위해 protect mode라는 것을 도입했다고 한다.


### shared lock

lock protocol에서 몇 가지만 바꾸면 shared lock을 구현할 수 있다.


Reference

https://kazoo.readthedocs.io/en/latest/basic_usage.html

https://kazoo.readthedocs.io/en/latest/api/client.html


댓글

이 블로그의 인기 게시물

논문 정리 - MapReduce: Simplified Data Processing on Large Clusters

논문 정리 - The Google File System