프로그래밍/Python

[Python] zstandard로 dictionary list 압축하기

Lou Park 2022. 6. 26. 15:25

# ZstdHelper

아래 내용을 토대로만든 Zstd 헬퍼 클래스입니다.

# 압축
ZstdHelper().compress(dictlist)
# 압축해제
dictlist = ZstdHelper().decompress(filename)

 

# pickle?

파이썬에서 dictionary list를 파일로 저장하고, 다시 읽으려고 할때 일반적으로는 간편한 pickle을 이용한다. 10만개의 딕셔너리를 담고있는 파이썬 리스트를 파일로 저장하고, 읽는 예시 코드를 보자.

if __name__ == '__main__':
    result = list()
    
    for i in range(1, 100000):
        result.append({"indexofitem": i})
        
    pickle_test(result)
def pickle_test(dictlist: list):
    parent_dir = os.path.join(os.getcwd(), "my_game", "backup")
    result_filename = os.path.join(parent_dir, "pickle")
    
    # pickle write
    with open(result_filename, "wb") as f:
        pickle.dump(dictlist, f)
        
    
    # pickle read    
    with open(result_filename, "rb") as f:
        result = pickle.load(f)
    
    print(result[191]["indexofitem"])
    # prints: 192

무사히 저장되고, 값을 제대로 읽을 수 있다. 하지만 저장된 파일인 "pickle"의 파일 사이즈는 어떠할까? 무려 849KB이다. 키값이 하나인 딕셔너리 10만개를 저장했을 뿐인데, 용량은 아주 크다.

 

# zstandard

facebook/zstandard

zstd라고 불리는 zstandard는 페이스북에서 만든 빠른 실시간 압축 알고리즘이다. 이 알고리즘을 이용해 파일을 압축시킬 수 있는 zstandard 파이썬 모듈이 존재하는데, 이를 이용해 딕셔너리 리스트를 바로 파일로 압축하고, 읽어보려고 한다. 먼저 모듈을 설치하자!

pip install zstandard

 

zstd에서 제공하는 여러가지 클래스가 있지만, 가장 중요한건 압축하는 ZstdCompressor와 압축을 푸는ZstdDecompressor이다. 하나씩 코드를 통해 살펴보겠다.

 

ZstdCompressor

ZstdCompressor.compress(data)를 통하면 binary 데이터를 바로 압축할 수 있지만, 큰 데이터를 다룰때는 메모리 사용량이 엄청나서 권장하지 않는 방법이다. 내가 유용하다고 생각한 메소드는 바로 copy_stream인데, 입력과 출력 파일 스트림을 각각 받아서 입력을 받고 압축후에 바로 출력해버린다.  

 

공식문서에서의 예제코드는 다음과 같다.

>>> cctx = zstandard.ZstdCompressor()
>>> with open(input_path, "rb") as ifh, open(output_path, "wb") as ofh:
...     cctx.copy_stream(ifh, ofh)

 

이미 만들어진 파일이 있다면 이 메소드를 바로 이용하면된다. 하지만 나는 딕셔너리 리스트를 스트림으로 변환하기 위해서 추가적인 조치가 필요했다. 

 

IterStreamer

이것은 StackOverFlow에 있는 소스를 약간 변형시킨건데, next()와 self.leftover부분만 binary를 사용하도록 변경했다. next()가 호출되면, 딕셔너리를 string으로 변환하고, 뒤에 개행문자(\r\n)를 더해서 binary로 인코드하여 리턴한다.

class IterStreamer(object):
    """
    File-like streaming iterator.
    """
    def __init__(self, generator):
        self.generator = generator
        self.iterator = iter(generator)
        self.leftover = b''

    def __len__(self):
        return self.generator.__len__()

    def __iter__(self):
        return self.iterator

    def next(self):
        return (json.dumps(self.iterator.__next__(), ensure_ascii=False) + "\r\n").encode('utf-8')

    def read(self, size):
        data = self.leftover
        count = len(self.leftover)

        if count < size:
            try:
                while count < size:
                    chunk = self.next()
                    data += chunk
                    count += len(chunk)
            except StopIteration:
                pass

        self.leftover = data[size:]

        return data[:size]

IterStreamer를 이용해 딕셔너리 리스트를 스트림으로 변환할 수 있게 되었으니 똑같은 데이터로 ZstdCompressor.copy_stream을 이용해 압축해보자. 

def zstd_compress(dictlist: list):
    parent_dir = os.path.join(os.getcwd(), "my_game", "backup")
    result_filename = os.path.join(parent_dir, "zstdcomp")
    
    stream = IterStreamer(dictlist)
        
    cctx = zstd.ZstdCompressor()
    with open(result_filename, "wb") as ofh:
        cctx.copy_stream(stream, ofh)

if __name__ == '__main__':
    result = list()
    
    for i in range(1, 100000):
        result.append({"indexofitem": i})
    
    zstd_compress(result)

압축한 zstdcomp 파일의 용량은 49KB이다. 무려 94%의 용량이 절감되었다. 

 

ZstdDecompressor

이제 읽어야하는데, 읽기는 좀.. 코드가 더럽긴하다. 나중에 정리가되면 다시 github에 다듬어서 올리겠다. read 바이트 청크단위는 몇 번 돌려보고 적절한 크기로 잘랐으나 원하는 숫자로 적어주면된다.

def zstd_decompress():
    parent_dir = os.path.join(os.getcwd(), "my_game", "backup")
    result_filename = os.path.join(parent_dir, "zstdcomp")
    
    result = list()
    dctx = zstd.ZstdDecompressor()
    with open(result_filename, "rb") as f:    
    	# 버퍼
        current = ''
        with dctx.stream_reader(f) as reader:                
            for chunk in iter(lambda: reader.read(512), ''):
                if len(chunk) == 0:
                    break
                current += chunk.decode('utf-8')
                while True:
                	# 개행문자 찾기
                    lbps = current.find('\r\n')
                    if lbps == -1:
                        break
                    # 개행문자가 있다면 해당 부분까지 잘라서 list에 더한다.
                    for obj in current[:lbps].split('\r\n'):
                        result.append(json.loads(json.loads(obj.replace("'", "\""))))
                    # 버퍼에는 이후의 데이터만 남도록한다.
                    current = current[(lbps + len('\r\n')):]
                    
    print(result[590]["indexofitem"])
    # prints: 591

stream_reader를 사용해서 read하는 방법대신 for chunk in dctx.read_to_iter(f, read_size=512) 같은방식으로 좀더 깔끔하게 코드를 짤 수 있지만, 속도문제로 저렇게 쓰게 되었다. (실행시간 대략 0.1초 더 증가함)

 

# 실행시간 비교

zstd는 0.6초, pickle은 0.06초로 피클이 10배 빨랐다. 10배빠르지만 10배 많은 용량, 10배 느리지만 10배 적은 용량의 TradeOff를 고려해서 사용해야할 것 같다.

591
# zstd: 0.6090011596679688 s
192
# pickle: 0.06699967384338379 s

 

*zstandard 문서 https://python-zstandard.readthedocs.io/en/latest/