MRUキャッシュとmemoize

適当なモジュールが見つからなかったので自分で書いた。

  • memoize.py:
# -*- coding: utf-8 -*-

from UserDict import UserDict
import cPickle
import logging


class MRUCache(UserDict):
    mru = []
    default_capacity = 1000

    def __init__(self, capacity=0):
        UserDict.__init__(self)
        self.capacity = capacity or self.default_capacity

    def _touch(self, key):
        try:
            i = self.mru.index(key)
            if i > 0:
                self.mru.insert(0, self.mru.pop(i))
        except ValueError:
            self.mru.insert(0, key)
            if len(self.mru) > self.capacity:
                key = self.mru.pop()
                UserDict.__delitem__(self, key)

    def _untouch(self, key):
        try:
            i = self.mru.index(key)
            if i > 0:
                key = self.mru.pop(i)
                UserDict.__delitem__(self, key)
        except ValueError:
            pass

    def __getitem__(self, key):
        self._touch(key)
        return UserDict.__getitem__(self, key)

    def __setitem__(self, key, value):
        self._touch(key)
        UserDict.__setitem__(self, key, value)

    def __delitem__(self, key):
        self._untouch(key)
        UserDict.__delitem__(self, key)

    def clear(self):
        UserDict.clear(self)
        self.mru = []

    def copy(self):
        d = UserDict.copy(self)
        d.mru = self.mru[:]


# ASPN : Python Cookbook : Memoize decorator with O(1) length-limited
#        LRU cache, supports mutable types
#
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/498110

class DecoratorHelper(object):
    def __new__(typ, *attr_args, **attr_kwargs):
        def decorator(orig_func):
            self = object.__new__(typ)
            self.__init__(orig_func, *attr_args, **attr_kwargs)
            return self
		
        return decorator

class memoize(DecoratorHelper):
    def _default_keyfunc(*args, **kwargs):
        return cPickle.dumps((args, kwargs))

    def __init__(self, func, capacity,
                 keyfunc=_default_keyfunc,
                 filterfunc=None):
        self.func       = func
        self.capacity   = capacity
        self.keyfunc    = keyfunc
        self.filterfunc = filterfunc
        self.reset()
    
    def reset(self):
        self.mru = MRUCache(self.capacity)
        self.hits = 0
        self.misses = 0
    
    def __call__(self, *args, **kwargs):
        key = self.keyfunc(*args, **kwargs)
        try:
            value = self.mru[key]
        except KeyError:
            # We have an entry not in the cache
            self.misses += 1
            
            value = self.func(*args, **kwargs)
            if self.filterfunc is None or self.filterfunc(value):
                self.mru[key] = value
        else:
            # We have an entry in the cache
            self.hits += 1
            logging.debug('key %s is found in the cache' % key)
        return value


__all__ = ['MRUCache', 'DecoratorHelper', 'memoize']
  • test_memoize.py:
# -*- coding: utf-8 -*-

import sys
import time
import logging
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s')
logging.getLogger().setLevel(logging.INFO)

from memoize import memoize


def periodic_call(callable, n):
    width = 40
    t = time.time()
    for i in xrange(0, n):
        callable()
        c = int((i + 1) / n * width) - int(i / n * width)
        if c > 0:
            sys.stderr.write("*" * c)
    print >> sys.stderr, ' %g sec elapsed' % (time.time() - t)


def fib(n):
    return (n > 1) and (fib(n - 1) + fib(n - 2)) or 1

def test_fib():
    seq = [fib(n) for n in xrange(0,10)]
    assert seq == [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
    print >> sys.stderr, ""

    print >> sys.stderr, "fib(20) ",
    periodic_call(lambda: fib(20), 100)


@memoize(3)
def fib1(n):
    return (n > 1) and (fib1(n - 1) + fib1(n - 2)) or 1

def test_fib1():
    assert fib1(100) == 573147844013817084101L
    print >> sys.stderr, ""

    print >> sys.stderr, "fib1(20) ",
    periodic_call(lambda: [fib1.reset()] and fib1(20), 100)
    print >> sys.stderr, "fib1(50) ",
    periodic_call(lambda: [fib1.reset()] and fib1(50), 100)


# This is faster because it doesn't use the default key function -
# it doesn't need to call cPickle.dumps((*args, **kwargs))
@memoize(100, lambda n: n)
def fib2(n):
    return (n > 1) and (fib2(n - 1) + fib2(n - 2)) or 1

def test_fib2():
    assert fib2(100) == 573147844013817084101L
    print >> sys.stderr, ""

    print >> sys.stderr, "fib2(20) ",
    periodic_call(lambda: [fib2.reset()] and fib2(20), 100)
    print >> sys.stderr, "fib2(50) ",
    periodic_call(lambda: [fib2.reset()] and fib2(50), 100)

結果:

$ nosetests -v test_memoize.py 
test_memoize.test_fib ... 
fib(20) **************************************** 2.73702 sec elapsed
ok
test_memoize.test_fib1 ... 
fib1(20) **************************************** 0.555827 sec elapsed
fib1(50) **************************************** 1.62828 sec elapsed
ok
test_memoize.test_fib2 ... 
fib2(20) **************************************** 0.34834 sec elapsed
fib2(50) **************************************** 0.919944 sec elapsed
ok

----------------------------------------------------------------------
Ran 3 tests in 7.687s

OK