명량한 표정과는 다르게 현실 세계에서 두 마리 토끼를 잡기 위해서는 트레이드 오프가 필요한 순간이 온다.
우리는 가난하다. 그래서 서버도 가난한 서버를 쓴다. c6a.large, vcpu는 2개 메모리는 4g인 사양이다. 물론 이보다 더 저사양 서버에서 고군분투하는 분들에게는 이 또한 높은 사양일수도 있겠다. 문제는 어쨌든 이 열악한 서버에서 연산을 진행해야 한다는 의미다. 연산할 것이 별로 많지 않다면 문제가 되진 않는다. 우리가 처리할 대상은 대략 2g 정도의 텍스트 파일이고, 해당 텍스트 파일에서 맵을 추출해서 csv로 생성하는 기능을 한다. 정상 생성된다면 맵은 대략 pickle로 전환했을 때 대략 10g가 조금 넘는 크기로 추정된다. 속도도 문제다. 적어도 5분 내에는 이 작업이 완료되어야 한다. 게다가 우리는 파이썬으로 작업 중이다.
물론 파이썬을 쓰면서도 별로 느리다는 생각은 해보질 않았다. 대체로 그렇게 큰 작업을 수행하는 일은 없기 때문이다. 하지만 일이 커지면 느려지는 건 당연하다. 이 작업은 메모리가 많은 괜찮은 성능 pc에서 수행하더라도 20분이 넘게 걸렸다. 그리고 4g에서는 당연한 얘기겠지만 메모리 부족 오류로 종료됐다. 애쩠든 이런 상황을 우리는 어떻게 해결할 수 있을까?
rust 포팅
rust로 작업을 포팅하면 당연한 소리겠지만 엄청 빨라질 것이다. 일전에 우리는 간단한 python 로그 서버를 rust로 포팅했는데 속도가 무려 77배 빨라졌다. 최악의 경우에는 이 방법 또한 고려 사항 중엔 하나였다. 하지만 어쨌든 최대한 일을 적게하는 방법을 찾아야 했기에 기존 코드를 고치지 않고 수행할 방법을 잦아야 했다.
nuitka
nuitka는 파이썬 코드를 네이티브 코드로 빌드해주는 툴이다. 네이티브 코드로 변환해주니까 뭔가 더 빨라지지 않을까, 라는 생각이 들기 쉽다. 그래서 테스트를 해보았지만 속도 향상은 별로 없었다. 스크립트 코드를 제공하고 싶지 않을 때 선택할 수 있는 선택 사항 정도가 아닌가 싶다.
pypy
pypy는 파이썬의 파이썬 구현체에 jit을 탑재한 인터프리터다. 순수 파이썬 코드가 많으면 jit이 탑재된 pypy가 빠를 수 있다. 테스트 해보니 생각보다 효과는 드라마틱했다. 대략 14.5배 빨라졌다. 여기에 약간 함정이 있는데 pypy에서 일부 파이썬 코드는 cpython 구현보다 느린 경우도 있다는 사실이다. 그래서 많은 반복 작업 속에 그런 코드가 있다면 파이썬 구현을 찾아서 더 빠르게 실행되도록 변경하는 작업을 일부 해줘야 한다. 어쨌든 이런 작업을 진행하고 나면 대체로 아주 빨라진다.
메모리는?
그렇게 빨라진 줄만 알았지 메모리 문제를 놓고 있었던 스크립트는 서버에서 동작하면 메모리 부족 오류가 발생했다. 서버 메모리는 4g, 우리가 실제 동작에 필요한 메모리는 대략 넉넉하게 12g 정도 된다. 물론 이는 우리가 무식하게 맵을 그냥 메모리에 다 올렸기 때문이다. 그런데 또 그렇게 무식하게 쓰자고 파이썬 쓰는 게 아닌가 싶기도 하다. 고상하게 설계하고 고민하고 할 거였음 애초에 더 복잡한 언어로 더 정교하게 만들었을테니 말이다. 어쨌든 이 말도 안되게 부족한 메모리를 극복하는 방법은 없을까, 를 최신 신상 gemini 2.5 pro와 고민해 보았다.
나의 고민에 나온 첫 반응은 shelve였다. 나는 이런게 있는 줄도 몰랐지만 표준 기능으로 있는 도구였다. 맵을 메모리가 아닌 디스크에 기록하게 해준다. 여윽시 인공지능인가를 생각하며 맵을 shelve로 교체했다. 당연하 소리겠지만 메모리는 너무나 작게 썼고 파일 용량은 커져만 갔다. 안타깝게도 속도가 너무 느렸다. shelve는 맵이 업데이트될때마다 파일 io를 동반하기 때문에 어쩔 수가 없는 구조였다. 뭔가 shelve가 메모리를 좀 더 사용하도록 하면 좋을 것 같아서 그런 기능에 대해 문의했으나 그런 기능은 존재하질 않았다. 하지만 녀석은 최신 신상 인공지능 답게, 직접 구현한 구현체를 주었다. CachedShelve. 캐시맵을 추가해서 캐시에서 발견되는 경우는 캐시를 사용하도록 변경한 구조였다. 캐시를 추가하니 눈에 띄게 빨라졌지만 우리의 요구 사항을 충족하긴 힘들었다. 역시나 두마리 토끼를 손쉽게 잡는 은탄환 따위는 존재하지 않았다. 결국 불필요한 요소를 제거해서 메모리를 덜 사용하는 식으로 코드를 변경했다.
import shelve
from collections import OrderedDict
import time # 캐시 효과 시연용
class CachedShelve:
"""
shelve 객체를 위한 LRU 읽기 캐시를 제공하는 래퍼 클래스.
컨텍스트 관리자 프로토콜을 지원합니다 ('with' 문 사용 가능).
"""
def __init__(self, filename, cache_size=100, flag='c', writeback=False):
"""
CachedShelve를 초기화합니다.
Args:
filename (str): shelve 파일 경로.
cache_size (int): 캐시에 저장할 최대 항목 수 (LRU). 0보다 커야 합니다.
flag (str): shelve.open()에 전달할 플래그 ('c', 'n', 'w', 'r').
writeback (bool): shelve.open()에 전달할 writeback 플래그.
True로 설정 시 이 클래스의 캐싱 이점이 줄어들 수 있으며,
메모리 사용 및 데이터 유실 위험이 증가합니다. 비권장.
"""
if cache_size <= 0:
raise ValueError("Cache size must be a positive integer.")
self.filename = filename
self.cache_size = cache_size
self.flag = flag
self.writeback = writeback # shelve 자체의 writeback 사용 여부
self._db = None
# LRU 캐시를 위해 OrderedDict 사용
self._cache = OrderedDict()
print(f"CachedShelve initialized for '{filename}' with cache size {cache_size}")
def __enter__(self):
"""컨텍스트 관리자 진입: shelve 파일을 열고 캐시를 비웁니다."""
print(f"Opening shelve '{self.filename}' (writeback={self.writeback})...")
# writeback 옵션도 shelve.open에 전달
self._db = shelve.open(self.filename, flag=self.flag, writeback=self.writeback)
self._cache.clear()
print("Shelve opened and cache cleared.")
return self # 'with ... as cs:' 에서 cs 변수에 할당될 객체
def __exit__(self, exc_type, exc_val, exc_tb):
"""컨텍스트 관리자 종료: shelve 파일을 닫고 캐시를 비웁니다."""
print(f"Closing shelve '{self.filename}'...")
if self._db is not None:
try:
start_close = time.time()
self._db.close() # writeback=True인 경우 시간이 걸릴 수 있음
end_close = time.time()
print(f"Shelve closed (took {end_close - start_close:.4f}s).")
except Exception as e:
print(f"Error closing shelve: {e}")
finally:
self._db = None
self._cache.clear()
print("Cache cleared.")
if exc_type:
print(f"Exception occurred in 'with' block: {exc_type.__name__}")
# 예외를 전파시키려면 None 또는 False 반환 (기본값)
return None
def _check_open(self):
"""shelve가 열려 있는지 확인합니다."""
if self._db is None:
raise RuntimeError("Shelve is not open. Use within a 'with' block.")
def __getitem__(self, key):
"""obj[key] 구문 지원 (읽기). 캐시 확인 후 shelve 접근."""
self._check_open()
# 1. 캐시 확인
if key in self._cache:
# 캐시 히트: 값 반환 (LRU 위해 순서 변경)
# print(f"[Cache Hit] Key: '{key}'") # 디버깅용
self._cache.move_to_end(key)
return self._cache[key]
else:
# 2. 캐시 미스: shelve에서 읽기 (느림)
# print(f"[Cache Miss] Key: '{key}', reading from shelve...") # 디버깅용
# shelve에서 값을 읽음 (KeyError 발생 가능)
value = self._db[key]
# 3. 캐시에 추가
self._cache[key] = value
self._cache.move_to_end(key) # 방금 사용했으므로 맨 뒤로
# 4. 캐시 크기 관리 (오래된 항목 제거)
if len(self._cache) > self.cache_size:
removed_key, removed_value = self._cache.popitem(last=False) # 가장 오래된 항목 제거
# print(f"[Cache Evict] Removed '{removed_key}' from cache.") # 디버깅용
return value
def __setitem__(self, key, value):
"""obj[key] = value 구문 지원 (쓰기). shelve에 먼저 쓰고 캐시 업데이트."""
self._check_open()
# 1. shelve에 먼저 쓰기 (영속성 보장)
self._db[key] = value
# print(f"[Shelve Write] Key: '{key}'") # 디버깅용
# 2. 캐시 업데이트 (또는 무효화)
# 간단하게는 캐시에서도 값을 업데이트하고 LRU 순서 조정
self._cache[key] = value
self._cache.move_to_end(key) # 방금 썼으므로 맨 뒤로
# 3. 캐시 크기 관리 (새 항목 추가로 인한 제거)
if len(self._cache) > self.cache_size:
removed_key, removed_value = self._cache.popitem(last=False)
# print(f"[Cache Evict] Removed '{removed_key}' after set.") # 디버깅용
# 대안: 쓰기 시 캐시 항목을 그냥 삭제하여 다음 읽기 시 다시 로드하게 할 수도 있음
# if key in self._cache:
# del self._cache[key]
def __delitem__(self, key):
"""del obj[key] 구문 지원. shelve에서 먼저 삭제 후 캐시에서도 삭제."""
self._check_open()
# 1. shelve에서 먼저 삭제 (KeyError 발생 가능)
del self._db[key]
# print(f"[Shelve Delete] Key: '{key}'") # 디버깅용
# 2. 캐시에서도 삭제 (존재하는 경우)
if key in self._cache:
del self._cache[key]
# print(f"[Cache Delete] Key: '{key}'") # 디버깅용
def __contains__(self, key):
"""'key in obj' 구문 지원. 캐시 먼저 확인 후 shelve 확인."""
self._check_open()
# 캐시에 있으면 True 반환 (빠름)
if key in self._cache:
return True
# 캐시에 없으면 shelve 확인 (느림)
# 이 확인만으로는 캐시에 추가하지 않음
return key in self._db
def get(self, key, default=None):
"""딕셔너리의 get과 유사하게 작동. 키 없으면 default 반환."""
try:
# __getitem__을 사용하여 캐싱 로직 활용
return self[key]
except KeyError:
# __getitem__에서 KeyError가 발생하면 (shelve에 키 없음) default 반환
# print(f"Key '{key}' not found, returning default.") # get 호출 시에만 출력 (필요시 활성화)
return default
except RuntimeError as e: # shelve가 닫혀있을 때
raise e
except Exception as e: # 기타 예상치 못한 오류
print(f"Error in get method for key '{key}': {e}")
return default # 예상치 못한 오류 시에도 default 반환
# --- 추가 유틸리티 메소드 ---
def sync(self):
"""내부 shelve 객체의 sync()를 호출합니다."""
self._check_open()
print("Syncing underlying shelve...")
self._db.sync()
def clear_cache(self):
"""수동으로 캐시를 비웁니다."""
print("Clearing cache manually.")
self._cache.clear()
def __len__(self):
"""len(obj) 지원. 실제 shelve의 크기를 반환합니다 (캐시 크기 아님)."""
self._check_open()
return len(self._db)
def keys(self):
"""
shelve의 키 목록 (뷰)을 반환합니다 (캐시 아님).
이는 실제 shelve 파일의 모든 키를 나타냅니다.
"""
self._check_open()
return self._db.keys()
# --- 추가된 메소드 ---
def values(self):
"""
shelve의 값 목록 (뷰)을 반환합니다 (캐시 아님).
주의: 큰 shelve의 경우 모든 값을 로드해야 하므로 느릴 수 있습니다.
캐시의 이점을 활용하지 않습니다.
"""
self._check_open()
# 실제 shelve 객체의 values() 메소드를 호출하여 반환
return self._db.values()
def items(self):
"""
shelve의 키-값 쌍 목록 (뷰)을 반환합니다 (캐시 아님).
주의: 큰 shelve의 경우 모든 항목을 로드해야 하므로 느릴 수 있습니다.
캐시의 이점을 활용하지 않습니다.
"""
self._check_open()
# 실제 shelve 객체의 items() 메소드를 호출하여 반환
return self._db.items()
# --- 클래스 사용 예제 ---
if __name__ == "__main__":
filename = 'my_cached_shelve.db'
cache_size = 3
# 1. 데이터 쓰기 (with 사용)
print("\n--- Writing data ---")
try:
with CachedShelve(filename, cache_size=cache_size, flag='c') as cs:
cs['a'] = 1
cs['b'] = {'x': 10, 'y': 20}
cs['c'] = [1, 2, 3]
cs['d'] = "hello"
cs['e'] = "world"
print(f"Length of shelve: {len(cs)}")
print(f"Keys in shelve: {list(cs.keys())}") # list()로 변환하여 내용 확인
print(f"Values in shelve: {list(cs.values())}")
print(f"Items in shelve: {list(cs.items())}")
except Exception as e:
print(f"An error occurred during writing: {e}")
# 2. 데이터 읽기 및 캐시 확인 (새로운 with 블록)
print("\n--- Reading data and testing cache ---")
try:
with CachedShelve(filename, cache_size=cache_size, flag='r') as cs: # 읽기 모드
print(f"Accessing 'e': {cs['e']}") # Cache Miss, Load 'e' -> Cache: {'e'}
print(f"Accessing 'd': {cs['d']}") # Cache Miss, Load 'd' -> Cache: {'e', 'd'}
print(f"Accessing 'c': {cs['c']}") # Cache Miss, Load 'c' -> Cache: {'e', 'd', 'c'}
print(f"Accessing 'b': {cs['b']}") # Cache Miss, Load 'b', Evict 'e' -> Cache: {'d', 'c', 'b'}
print(f"Accessing 'e': {cs['e']}") # Cache Miss (evicted), Load 'e', Evict 'd' -> Cache: {'c', 'b', 'e'}
print(f"Accessing 'c': {cs['c']}") # Cache Hit, Move 'c' -> Cache: {'b', 'e', 'c'}
# get() 메소드 테스트
print(f"Getting 'a': {cs.get('a')}") # Cache Miss, Load 'a', Evict 'b' -> Cache: {'e', 'c', 'a'}
print(f"Getting 'z' (non-existent): {cs.get('z', 'Not Found')}")
# __contains__ 테스트
print(f"Is 'a' in cs? {'a' in cs}") # Cache Hit
print(f"Is 'b' in cs? {'b' in cs}") # Cache Miss (evicted), Check in shelve -> True
print(f"Is 'z' in cs? {'z' in cs}") # Check in shelve -> False
# values() 와 items() 사용
print("\n--- Using values() and items() ---")
# 이 메소드들은 캐시를 직접 활용하지 않고 shelve 전체를 대상으로 함
print("All values:", list(cs.values()))
print("All items:", list(cs.items()))
except Exception as e:
print(f"An error occurred during reading: {e}")
# 3. 데이터 삭제 (새로운 with 블록)
print("\n--- Deleting data ---")
try:
# 삭제를 위해 flag='c' 또는 'w' 사용
with CachedShelve(filename, cache_size=cache_size, flag='c') as cs:
print("Current keys:", list(cs.keys()))
# 'a'가 캐시에 있을 수 있음 (마지막 get 호출)
print(f"Cache before del 'a': {list(cs._cache.keys())}")
del cs['a']
print(f"Deleted 'a'. Cache after del: {list(cs._cache.keys())}") # 캐시에서도 삭제됨
print("Keys after deleting 'a':", list(cs.keys()))
# 캐시에 없는 항목 삭제 시도
if 'b' not in cs._cache: print("'b' is not in cache.")
del cs['b']
print("Keys after deleting 'b':", list(cs.keys()))
print(f"Cache after del 'b': {list(cs._cache.keys())}") # 캐시에 없었으므로 변화 없음
except KeyError as e:
print(f"Tried to delete non-existent key: {e}")
except Exception as e:
print(f"An error occurred during deletion: {e}")
# shelve 파일 자체 삭제 (정리)
import os
for ext in ['', '.bak', '.dat', '.dir']: # shelve가 만드는 파일 확장자들
try:
os.remove(filename + ext)
print(f"Removed {filename + ext}")
except FileNotFoundError:
pass
쓰기는?
gemini가 작성해준 코드가 아까워서 글을 올리려고 보니 속도가 느린 이유가 있기도 한 것 같다. set부분은 그냥 생각 없이 모두 shelve에 기록하고 있었다. 그러니 get 부분에서만 캐시 히트하는 정도의 속도 향상 밖에는 없었다고 할 수 있겠다. 물론 이 부분도 gemini에 넣으면 알아서 고쳐주겠지? 라는 생각으로 넣어봤다. 그랬더니 쓰기 캐시를 추가한 버전을 만들어 주었다.
import shelve
from collections import OrderedDict
import time
import warnings # 경고 메시지용
# 쓰기 버퍼에서 삭제된 항목을 표시하기 위한 고유 객체
_DELETED_SENTINEL = object()
class CachedShelve:
"""
shelve 객체를 위한 LRU 읽기 캐시 및 쓰기 지연 버퍼를 제공하는 래퍼 클래스.
쓰기 작업은 버퍼에 먼저 기록되고, 컨텍스트 종료, sync() 호출 시,
또는 쓰기 버퍼가 설정된 한계치(write_buffer_limit)에 도달했을 때 shelve 파일에 반영됩니다.
**경고:** 쓰기 지연 방식으로 인해 프로그램이 비정상 종료될 경우,
마지막 동기화 이후의 변경 사항이 유실될 수 있습니다.
shelve.open의 writeback=True 플래그와 함께 사용하는 것은 권장되지 않습니다.
컨텍스트 관리자 프로토콜을 지원합니다 ('with' 문 사용 가능).
"""
def __init__(self, filename, cache_size=100, write_buffer_limit=None, flag='c', writeback=False):
"""
CachedShelve를 초기화합니다.
Args:
filename (str): shelve 파일 경로.
cache_size (int): 읽기 캐시에 저장할 최대 항목 수 (LRU). 0보다 커야 합니다.
write_buffer_limit (int, optional): 쓰기 버퍼에 저장할 최대 항목 수.
None이면 무제한. 이 한계에 도달하면 버퍼 전체가
자동으로 플러시(동기화)됩니다. 0보다 커야 합니다.
flag (str): shelve.open()에 전달할 플래그 ('c', 'n', 'w', 'r').
writeback (bool): shelve.open()에 전달할 writeback 플래그.
**True 설정은 이 클래스의 쓰기 지연 방식과 충돌할 수 있어 강력히 비권장됩니다.**
"""
if cache_size <= 0:
raise ValueError("Cache size must be a positive integer.")
if writeback:
raise ValueError("Cannot use shelve's writeback=True with CachedShelve's write buffering. Set writeback=False.")
if write_buffer_limit is not None and write_buffer_limit <= 0:
raise ValueError("Write buffer limit must be a positive integer or None.")
self.filename = filename
self.cache_size = cache_size
# --- New ---
self.write_buffer_limit = write_buffer_limit
# -----------
self.flag = flag
self.shelve_writeback = False # Always False for internal shelve
self._db = None
self._cache = OrderedDict()
self._write_buffer = {}
limit_msg = f"write buffer limit={self.write_buffer_limit}" if self.write_buffer_limit else "unlimited write buffer"
print(f"CachedShelve initialized for '{filename}' with read cache size {cache_size}, {limit_msg} (Write Buffering Enabled)")
# __enter__, __exit__, _check_open 은 이전과 동일
def __enter__(self):
"""컨텍스트 관리자 진입: shelve 파일을 열고 캐시와 버퍼를 비웁니다."""
print(f"Opening shelve '{self.filename}' (internal writeback=False)...")
self._db = shelve.open(self.filename, flag=self.flag, writeback=self.shelve_writeback)
self._cache.clear()
self._write_buffer.clear()
print("Shelve opened, read cache and write buffer cleared.")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""컨텍스트 관리자 종료: 버퍼를 플러시하고 shelve 파일을 닫습니다."""
print("Exiting context...")
if self._db is not None:
try:
if self._write_buffer: # 버퍼에 내용이 있을 때만 플러시 시도
print("Flushing write buffer on exit...")
start_flush = time.time()
self._flush_write_buffer()
end_flush = time.time()
print(f"Write buffer flushed on exit (took {end_flush - start_flush:.4f}s).")
else:
print("Write buffer empty on exit, no flush needed.")
except Exception as e:
print(f"Error during buffer flush on exit: {e}")
# Depending on policy, you might want to re-raise here
finally:
print(f"Closing shelve '{self.filename}'...")
try:
start_close = time.time()
self._db.close()
end_close = time.time()
print(f"Shelve closed (took {end_close - start_close:.4f}s).")
except Exception as e:
print(f"Error closing shelve: {e}")
finally:
self._db = None
self._cache.clear()
# Ensure buffer is clear even if flush failed (or wasn't needed but close failed)
self._write_buffer.clear()
print("Read cache and write buffer cleared.")
if exc_type:
print(f"Exception occurred in 'with' block: {exc_type.__name__}")
print(f"WARNING: Changes made since the last sync might be lost due to the exception.")
return None # Propagate exception
def _check_open(self):
"""shelve가 열려 있는지 확인합니다."""
if self._db is None:
raise RuntimeError("Shelve is not open. Use within a 'with' block or ensure it's manually opened.")
def _flush_write_buffer(self):
"""쓰기 버퍼의 내용을 실제 shelve 객체에 반영하고 버퍼를 비웁니다."""
self._check_open()
if not self._write_buffer:
# print("Write buffer is empty, nothing to flush.") # Avoid noise if called often
return
print(f"Flushing {len(self._write_buffer)} items from write buffer to shelve...")
items_processed = 0
errors_occurred = 0
# Use list(self._write_buffer.items()) to avoid issues if flushing modifies the buffer indirectly
# Though in this simple flush, it shouldn't be an issue.
for key, value in list(self._write_buffer.items()):
try:
if value is _DELETED_SENTINEL:
if key in self._db:
del self._db[key]
else:
self._db[key] = value
items_processed += 1
except Exception as e:
errors_occurred += 1
print(f"Error flushing key '{key}': {e}")
print(f"Flush complete: Processed {items_processed} items with {errors_occurred} errors.")
self._write_buffer.clear()
# Optional: force disk write immediately (can impact performance)
# self._db.sync()
# print("Underlying shelve synced after flush.")
# --- New Helper Method ---
def _check_and_flush_buffer_if_full(self):
"""쓰기 버퍼가 설정된 한계에 도달했는지 확인하고, 도달했다면 플러시합니다."""
if self.write_buffer_limit is not None and len(self._write_buffer) >= self.write_buffer_limit:
print(f"Write buffer limit ({self.write_buffer_limit}) reached. Auto-flushing...")
self._flush_write_buffer()
# print("Auto-flush complete.") # Included in _flush_write_buffer now
# __getitem__ 은 이전과 동일
def __getitem__(self, key):
"""obj[key] 구문 지원 (읽기). 버퍼 -> 캐시 -> shelve 순으로 확인."""
self._check_open()
# 1. 쓰기 버퍼 확인 (최신 상태 반영)
if key in self._write_buffer:
value = self._write_buffer[key]
if value is _DELETED_SENTINEL:
raise KeyError(key)
else:
# Update read cache as well
self._cache[key] = value
self._cache.move_to_end(key)
if len(self._cache) > self.cache_size:
self._cache.popitem(last=False)
return value
# 2. 읽기 캐시 확인
if key in self._cache:
self._cache.move_to_end(key)
return self._cache[key]
# 3. shelve에서 읽기
value = self._db[key] # May raise KeyError
# 4. 읽기 캐시에 추가
self._cache[key] = value
self._cache.move_to_end(key)
# 5. 캐시 크기 관리
if len(self._cache) > self.cache_size:
self._cache.popitem(last=False)
return value
def __setitem__(self, key, value):
"""obj[key] = value 구문 지원 (쓰기). 쓰기 버퍼에 우선 기록. 버퍼가 차면 자동 플러시."""
self._check_open()
# --- Modified ---
# 버퍼에 추가하기 *전에* 가득 찼는지 확인하고 필요시 플러시
self._check_and_flush_buffer_if_full()
# --- End Modified ---
# 1. 쓰기 버퍼에 값 저장
self._write_buffer[key] = value
# print(f"[Buffer Write] Key: '{key}', Buffer size: {len(self._write_buffer)}") # Debugging
# 2. 읽기 캐시에도 반영
self._cache[key] = value
self._cache.move_to_end(key)
# 3. 읽기 캐시 크기 관리
if len(self._cache) > self.cache_size:
self._cache.popitem(last=False)
def __delitem__(self, key):
"""del obj[key] 구문 지원. 쓰기 버퍼에 삭제 표시. 버퍼가 차면 자동 플러시."""
self._check_open()
# 삭제 전 키 존재 여부 확인
if not self.__contains__(key):
raise KeyError(key)
# --- Modified ---
# 버퍼에 추가하기 *전에* 가득 찼는지 확인하고 필요시 플러시
self._check_and_flush_buffer_if_full()
# --- End Modified ---
# 1. 쓰기 버퍼에 삭제 표시
self._write_buffer[key] = _DELETED_SENTINEL
# print(f"[Buffer Delete] Key: '{key}', Buffer size: {len(self._write_buffer)}") # Debugging
# 2. 읽기 캐시에서도 제거
if key in self._cache:
del self._cache[key]
# __contains__, get, sync, clear_cache, __len__, keys, values, items 은 이전과 동일
def __contains__(self, key):
"""'key in obj' 구문 지원. 버퍼 -> 캐시 -> shelve 순 확인."""
self._check_open()
if key in self._write_buffer:
return self._write_buffer[key] is not _DELETED_SENTINEL
if key in self._cache:
return True
return key in self._db
def get(self, key, default=None):
"""딕셔너리의 get과 유사하게 작동. 키 없으면 default 반환."""
try:
return self[key]
except KeyError:
return default
except RuntimeError as e:
raise e
except Exception as e:
print(f"Error in get method for key '{key}': {e}")
return default
def sync(self):
"""
쓰기 버퍼의 변경 사항을 즉시 shelve 파일에 반영합니다.
"""
self._check_open()
print("Manual Sync: Flushing write buffer to shelve...")
self._flush_write_buffer()
# print("Manual Sync complete.") # Included in _flush_write_buffer
def clear_cache(self):
"""
수동으로 읽기 캐시만 비웁니다. 쓰기 버퍼는 유지됩니다.
"""
print("Clearing read cache manually (write buffer remains).")
self._cache.clear()
def __len__(self):
"""
len(obj) 지원. 실제 shelve와 쓰기 버퍼의 변경 사항을 조합하여 크기 계산.
"""
self._check_open()
return len(list(self.keys())) # Based on keys() result for accuracy
def keys(self):
"""
shelve와 쓰기 버퍼를 조합한 현재 키 집합을 반환합니다. (set으로 반환)
"""
self._check_open()
db_keys = set(self._db.keys())
buffer_added_keys = {k for k, v in self._write_buffer.items() if v is not _DELETED_SENTINEL}
buffer_deleted_keys = {k for k, v in self._write_buffer.items() if v is _DELETED_SENTINEL}
final_keys = (db_keys | buffer_added_keys) - buffer_deleted_keys
return final_keys
def values(self):
"""
shelve와 쓰기 버퍼를 조합한 현재 값들의 제너레이터를 반환합니다.
"""
self._check_open()
for key in self.keys():
yield self[key] # __getitem__ handles buffer/cache/db lookup
def items(self):
"""
shelve와 쓰기 버퍼를 조합한 현재 (키, 값) 쌍들의 제너레이터를 반환합니다.
"""
self._check_open()
for key in self.keys():
yield (key, self[key]) # __getitem__ handles buffer/cache/db lookup
# --- 클래스 사용 예제 (쓰기 버퍼 제한 및 자동 플러시 확인) ---
if __name__ == "__main__":
filename = 'my_autoflush_shelve.db'
read_cache_size = 2
# --- New: Set write buffer limit ---
write_limit = 3
print(f"\n--- Test with Write Buffer Limit = {write_limit} ---")
try:
# flag='n' : 항상 새 파일 생성
with CachedShelve(filename, cache_size=read_cache_size, write_buffer_limit=write_limit, flag='n') as cs:
print("\nAdding items (limit should trigger flush)...")
cs['a'] = 1 # Buffer: {'a': 1} (Size 1)
print(f"Buffer size: {len(cs._write_buffer)}")
cs['b'] = 2 # Buffer: {'a': 1, 'b': 2} (Size 2)
print(f"Buffer size: {len(cs._write_buffer)}")
cs['c'] = 3 # Buffer: {'a': 1, 'b': 2, 'c': 3} (Size 3) - Limit reached
print(f"Buffer size: {len(cs._write_buffer)}")
print("\nAdding item 'd' (should trigger auto-flush now)")
# Below line should trigger _check_and_flush_buffer_if_full()
cs['d'] = 4
# Flush happens, buffer cleared, then 'd' is added
# Buffer: {'d': 4} (Size 1)
print(f"Buffer size after adding 'd': {len(cs._write_buffer)}")
print(f"Current keys (after potential flush): {list(cs.keys())}")
print("\nAdding more items...")
cs['e'] = 5 # Buffer: {'d': 4, 'e': 5} (Size 2)
print(f"Buffer size: {len(cs._write_buffer)}")
cs['f'] = 6 # Buffer: {'d': 4, 'e': 5, 'f': 6} (Size 3) - Limit reached again
print(f"Buffer size: {len(cs._write_buffer)}")
print("\nDeleting 'a' (should trigger auto-flush)")
# 'a' was flushed earlier, now we delete it. Buffer limit reached.
del cs['a']
# Flush happens (d, e, f written), buffer cleared, then 'a' deletion added
# Buffer: {'a': SENTINEL} (Size 1)
print(f"Buffer size after deleting 'a': {len(cs._write_buffer)}")
cs['g'] = 7 # Buffer: {'a': SENTINEL, 'g': 7} (Size 2)
print(f"Buffer size: {len(cs._write_buffer)}")
print(f"Final keys before exit: {list(cs.keys())}")
print(f"Final buffer content before exit: {cs._write_buffer}")
# Exit flushes the remaining buffer {'a': SENTINEL, 'g': 7}
print("\n--- After 'with' block (final flush on exit) ---")
except Exception as e:
print(f"An error occurred: {e}")
# Clean up in case of error during test
import os
for ext in ['', '.bak', '.dat', '.dir']:
try: os.remove(filename + ext)
except FileNotFoundError: pass
raise # Re-raise exception
# Verify final contents
print("\n--- Verifying final contents in shelve ---")
try:
with CachedShelve(filename, flag='r') as cs_read: # Read-only
print(f"Final keys in shelve: {sorted(list(cs_read.keys()))}") # sorted for consistent output
expected_keys = {'b', 'd', 'e', 'f', 'g'} # a deleted, c flushed then overwritten by flush of a's deletion? No, c was flushed. b was flushed. d,e,f flushed. a deleted in final flush, g added in final flush.
actual_keys = cs_read.keys()
if actual_keys == expected_keys:
print("Verification successful!")
print("Values:")
for k in sorted(actual_keys):
print(f" {k}: {cs_read[k]}")
else:
print(f"Verification FAILED!")
print(f" Expected: {sorted(list(expected_keys))}")
print(f" Actual: {sorted(list(actual_keys))}")
except Exception as e:
print(f"An error occurred during verification: {e}")
finally:
# Clean up shelve files
import os
print("\nCleaning up shelve files...")
for ext in ['', '.bak', '.dat', '.dir']:
try:
os.remove(filename + ext)
# print(f"Removed {filename + ext}")
except FileNotFoundError:
pass
print("Cleanup complete.")
sync i/o 최적화는?
그럼에도 이 또한 sync 과정에서 i/o가 개별적으로 발생하니 조삼모사 같기도 하다. 거기에 대한 gemini의 생각은 아래와 같다고 한다. 내심 처음 생각했던게 sqlite였는데 코드 고치기가 싫어서 다른 방법 찾다가 돌고돌아 결국 다시 원점으로 왔나 싶기도 하다. 뭔가 수미상관인가?
네, 아주 정확한 지적입니다! 현재
_flush_write_buffer
메소드는_write_buffer
의 각 항목에 대해 루프를 돌면서self._db[key] = value
또는del self._db[key]
를 호출합니다.shelve
객체는 딕셔너리처럼 동작하지만, 내부적으로는 각 할당/삭제 작업이 (대부분의dbm
구현에서) 개별적인 파일 I/O 작업으로 이어질 수 있습니다. 따라서 버퍼에 100개의 변경 사항이 있었다면, 플러시 과정에서 100번의 작은 디스크 접근이 발생할 수 있습니다.이 플러시 과정 자체를 최적화하여 I/O를 모아서 한 번에 처리할 수 있을까요?
안타깝게도 표준
shelve
모듈과 그 기반의dbm
라이브러리 수준에서는 이 작업을 직접적으로 제어하거나 보장하는 간단하고 표준적인 방법이 제한적입니다.몇 가지 관련된 측면과 가능한 접근법(및 한계)을 살펴보겠습니다.
shelve
의 내부 동작 및dbm
:
shelve
는 키-값 저장을 위해dbm
(예:dbm.gnu
,dbm.ndbm
,dbm.dumb
) 같은 라이브러리를 사용합니다. 이러한dbm
라이브러리가 내부적으로 어느 정도의 쓰기 버퍼링을 수행할 수는 있지만,shelve
모듈 자체가 "여러 항목을 한 번의 I/O 작업으로 쓰기" 위한 명시적인 공개 API(예:update_batch(items)
)를 제공하지는 않습니다.- 따라서
self._db[key] = value
호출이 즉시 디스크에 쓰일지, 내부dbm
버퍼에 들어갈지는 사용 중인dbm
구현과 운영체제에 따라 달라질 수 있으며, 파이썬 코드 수준에서 직접 제어하기 어렵습니다.shelve.sync()
의 역할:
shelve
객체의sync()
메소드는 아직 디스크에 기록되지 않은 데이터를 강제로 쓰도록 요청합니다.- 만약
_flush_write_buffer
루프가 끝난 후self._db.sync()
를 호출하면, 루프 동안 요청된 모든 개별 쓰기 작업들이 (만약dbm
이나 OS 수준에서 버퍼링되고 있었다면) 디스크에 확실히 반영되도록 보장하는 데 도움이 됩니다.- 하지만 이것이 루프 동안 발생한 여러 번의 I/O 요청 자체를 한 번의 큰 I/O 요청으로 바꿔주지는 못할 가능성이 높습니다. 이미 여러 번의 쓰기/삭제 작업이
dbm
계층에 전달되었을 수 있습니다.shelve(writeback=True)
재고:
- 이전에 우리는
writeback=True
사용을 금지했습니다. 하지만 이론적으로, 만약shelve.open(..., writeback=True)
를 사용한다면,_flush_write_buffer
루프에서self._db[key] = value
등을 호출하는 것이shelve
의 내부 메모리 캐시만 업데이트할 수 있습니다. 그리고 루프가 끝난 후self._db.sync()
를 한 번 호출하면,shelve
가 내부 캐시의 모든 변경 사항을 (어쩌면 더 효율적으로) 디스크에 쓸 수도 있습니다.- 단점: 이전에 논의했듯이 메모리 사용량 증가 (우리 캐시 +
shelve
캐시), 복잡성 증가,shelve
내부 동작 의존성, 우리 캐싱 전략과의 충돌 가능성 등 문제가 많아 여전히 권장하기 어렵습니다.shelve.update()
사용:
shelve
객체도 딕셔너리처럼update()
메소드를 가집니다. 쓰기 버퍼의 내용을 딕셔너리로 만들어self._db.update(write_dict)
를 호출하면 어떨까요?- 한계: 파이썬의 일반적인
update()
구현은 단순히 입력 딕셔너리를 반복하며 각 항목에 대해__setitem__
을 호출하는 것과 같습니다. 따라서_db.update(d)
는for k, v in d.items(): _db[k] = v
와 동일하게 동작할 가능성이 높으며, I/O 최적화를 보장하지 않습니다. (삭제 작업은 별도로 처리해야 함).- 대안: 다른 백엔드 사용:
- 이 문제의 근본적인 해결책은
shelve
대신 명시적인 배치(batch) 쓰기 API를 제공하는 다른 저장소 백엔드를 사용하는 것입니다.
- SQLite:
sqlite3
모듈을 사용하여 여러 SQLINSERT
,UPDATE
,DELETE
문을 하나의 트랜잭션으로 묶어 실행하면 훨씬 효율적입니다.- LMDB (Lightning Memory-Mapped Database):
lmdb
파이썬 라이브러리는 트랜잭션 내에서 여러 쓰기 작업을 효율적으로 처리하는 기능을 제공합니다.- LevelDB/RocksDB: 이 라이브러리들도 배치 쓰기 기능을 제공하는 경우가 많습니다.
- 단점: 이는
CachedShelve
클래스의 기반 저장소를shelve
가 아닌 다른 것으로 완전히 바꾸는 큰 변경이며, 기존shelve
파일과의 호환성이 사라집니다.결론:
현재
CachedShelve
클래스가 표준shelve
모듈을 사용하는 한,_flush_write_buffer
내의 여러 쓰기/삭제 요청을 하나의 최적화된 디스크 I/O 작업으로 확실하게 묶는 것은 어렵습니다.shelve
API의 한계와dbm
구현의 다양성 때문입니다.우리가 할 수 있는 최선은 다음과 같습니다.
- 쓰기 지연: 현재 구현처럼
_write_buffer
를 사용하여shelve
에 대한 접근 자체를 최대한 지연시키는 것 (이것만으로도 상당한 성능 향상이 있을 수 있습니다).sync()
호출:_flush_write_buffer
가 끝난 후self._db.sync()
를 호출하여 데이터의 디스크 기록을 보장하는 것 (선택 사항, 내구성은 높이지만 약간의 추가 오버헤드 발생 가능).만약 플러시 과정 자체의 I/O 효율을 극대화하는 것이 매우 중요하다면,
shelve
대신 SQLite나 LMDB 같은 다른 백엔드를 사용하도록 클래스를 재설계하는 것을 고려해야 합니다.
결론
결국 데이터 구조를 변경해서 해결했으니 이 모든 논의가 쓸모 없는 시간 낭비는 아닐까라는 생각이 들수도 있다. 하지만 과정이 재밌었고, 뭔가 배운게 있었다면 된 거 아닌가 싶다. 공자 선생님께서 말씀하셨듯이 배우고 때때로 익히면 또한 즐겁지 아니한가?
어쨌든, 오늘의 교훈. 구글도 이제 감 잡은 거 같기도 하다는 거. gemini 2.5 pro 몹시 훌륭하다. 벤치마크에서는 양민 학살중~
덧) 인공지능은 실수할 수 있다. 여기 올려진 코드에 버그가 있다고 너무 나무라지는 말자. 직접 작성해도 그 정도 버그는 나온다. 참고로 gemini는 저 모든 코드를 타이핑 하는 속도보다 빠르게 만들었다.