added python 2 dependency functools32

This commit is contained in:
Jakub Trllo 2022-03-07 18:08:25 +01:00
parent abe340130f
commit f021eaf389
4 changed files with 739 additions and 0 deletions

View file

@ -0,0 +1 @@
from .functools32 import *

View file

@ -0,0 +1,158 @@
"""Drop-in replacement for the thread module.
Meant to be used as a brain-dead substitute so that threaded code does
not need to be rewritten for when the thread module is not present.
Suggested usage is::
try:
try:
import _thread # Python >= 3
except:
import thread as _thread # Python < 3
except ImportError:
import _dummy_thread as _thread
"""
# Exports only things specified by thread documentation;
# skipping obsolete synonyms allocate(), start_new(), exit_thread().
__all__ = ['error', 'start_new_thread', 'exit', 'get_ident', 'allocate_lock',
'interrupt_main', 'LockType']
# A dummy value
TIMEOUT_MAX = 2**31
# NOTE: this module can be imported early in the extension building process,
# and so top level imports of other modules should be avoided. Instead, all
# imports are done when needed on a function-by-function basis. Since threads
# are disabled, the import lock should not be an issue anyway (??).
class error(Exception):
"""Dummy implementation of _thread.error."""
def __init__(self, *args):
self.args = args
def start_new_thread(function, args, kwargs={}):
"""Dummy implementation of _thread.start_new_thread().
Compatibility is maintained by making sure that ``args`` is a
tuple and ``kwargs`` is a dictionary. If an exception is raised
and it is SystemExit (which can be done by _thread.exit()) it is
caught and nothing is done; all other exceptions are printed out
by using traceback.print_exc().
If the executed function calls interrupt_main the KeyboardInterrupt will be
raised when the function returns.
"""
if type(args) != type(tuple()):
raise TypeError("2nd arg must be a tuple")
if type(kwargs) != type(dict()):
raise TypeError("3rd arg must be a dict")
global _main
_main = False
try:
function(*args, **kwargs)
except SystemExit:
pass
except:
import traceback
traceback.print_exc()
_main = True
global _interrupt
if _interrupt:
_interrupt = False
raise KeyboardInterrupt
def exit():
"""Dummy implementation of _thread.exit()."""
raise SystemExit
def get_ident():
"""Dummy implementation of _thread.get_ident().
Since this module should only be used when _threadmodule is not
available, it is safe to assume that the current process is the
only thread. Thus a constant can be safely returned.
"""
return -1
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def stack_size(size=None):
"""Dummy implementation of _thread.stack_size()."""
if size is not None:
raise error("setting thread stack size not supported")
return 0
class LockType(object):
"""Class implementing dummy implementation of _thread.LockType.
Compatibility is maintained by maintaining self.locked_status
which is a boolean that stores the state of the lock. Pickling of
the lock, though, should not be done since if the _thread module is
then used with an unpickled ``lock()`` from here problems could
occur from this class not having atomic methods.
"""
def __init__(self):
self.locked_status = False
def acquire(self, waitflag=None, timeout=-1):
"""Dummy implementation of acquire().
For blocking calls, self.locked_status is automatically set to
True and returned appropriately based on value of
``waitflag``. If it is non-blocking, then the value is
actually checked and not set if it is already acquired. This
is all done so that threading.Condition's assert statements
aren't triggered and throw a little fit.
"""
if waitflag is None or waitflag:
self.locked_status = True
return True
else:
if not self.locked_status:
self.locked_status = True
return True
else:
if timeout > 0:
import time
time.sleep(timeout)
return False
__enter__ = acquire
def __exit__(self, typ, val, tb):
self.release()
def release(self):
"""Release the dummy lock."""
# XXX Perhaps shouldn't actually bother to test? Could lead
# to problems for complex, threaded code.
if not self.locked_status:
raise error
self.locked_status = False
return True
def locked(self):
return self.locked_status
# Used to signal that interrupt_main was called in a "thread"
_interrupt = False
# True when not executing in a "thread"
_main = True
def interrupt_main():
"""Set _interrupt flag to True to have start_new_thread raise
KeyboardInterrupt upon exiting."""
if _main:
raise KeyboardInterrupt
else:
global _interrupt
_interrupt = True

View file

@ -0,0 +1,423 @@
"""functools.py - Tools for working with functions and callable objects
"""
# Python module wrapper for _functools C module
# to allow utilities written in Python to be added
# to the functools module.
# Written by Nick Coghlan <ncoghlan at gmail.com>
# and Raymond Hettinger <python at rcn.com>
# Copyright (C) 2006-2010 Python Software Foundation.
# See C source code for _functools credits/copyright
__all__ = ['update_wrapper', 'wraps', 'WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES',
'total_ordering', 'cmp_to_key', 'lru_cache', 'reduce', 'partial']
from _functools import partial, reduce
from collections import MutableMapping, namedtuple
from .reprlib32 import recursive_repr as _recursive_repr
from weakref import proxy as _proxy
import sys as _sys
try:
from thread import allocate_lock as Lock
except ImportError:
from ._dummy_thread32 import allocate_lock as Lock
################################################################################
### OrderedDict
################################################################################
class _Link(object):
__slots__ = 'prev', 'next', 'key', '__weakref__'
class OrderedDict(dict):
'Dictionary that remembers insertion order'
# An inherited dict maps keys to values.
# The inherited dict provides __getitem__, __len__, __contains__, and get.
# The remaining methods are order-aware.
# Big-O running times for all methods are the same as regular dictionaries.
# The internal self.__map dict maps keys to links in a doubly linked list.
# The circular doubly linked list starts and ends with a sentinel element.
# The sentinel element never gets deleted (this simplifies the algorithm).
# The sentinel is in self.__hardroot with a weakref proxy in self.__root.
# The prev links are weakref proxies (to prevent circular references).
# Individual links are kept alive by the hard reference in self.__map.
# Those hard references disappear when a key is deleted from an OrderedDict.
def __init__(self, *args, **kwds):
'''Initialize an ordered dictionary. The signature is the same as
regular dictionaries, but keyword arguments are not recommended because
their insertion order is arbitrary.
'''
if len(args) > 1:
raise TypeError('expected at most 1 arguments, got %d' % len(args))
try:
self.__root
except AttributeError:
self.__hardroot = _Link()
self.__root = root = _proxy(self.__hardroot)
root.prev = root.next = root
self.__map = {}
self.__update(*args, **kwds)
def __setitem__(self, key, value,
dict_setitem=dict.__setitem__, proxy=_proxy, Link=_Link):
'od.__setitem__(i, y) <==> od[i]=y'
# Setting a new item creates a new link at the end of the linked list,
# and the inherited dictionary is updated with the new key/value pair.
if key not in self:
self.__map[key] = link = Link()
root = self.__root
last = root.prev
link.prev, link.next, link.key = last, root, key
last.next = link
root.prev = proxy(link)
dict_setitem(self, key, value)
def __delitem__(self, key, dict_delitem=dict.__delitem__):
'od.__delitem__(y) <==> del od[y]'
# Deleting an existing item uses self.__map to find the link which gets
# removed by updating the links in the predecessor and successor nodes.
dict_delitem(self, key)
link = self.__map.pop(key)
link_prev = link.prev
link_next = link.next
link_prev.next = link_next
link_next.prev = link_prev
def __iter__(self):
'od.__iter__() <==> iter(od)'
# Traverse the linked list in order.
root = self.__root
curr = root.next
while curr is not root:
yield curr.key
curr = curr.next
def __reversed__(self):
'od.__reversed__() <==> reversed(od)'
# Traverse the linked list in reverse order.
root = self.__root
curr = root.prev
while curr is not root:
yield curr.key
curr = curr.prev
def clear(self):
'od.clear() -> None. Remove all items from od.'
root = self.__root
root.prev = root.next = root
self.__map.clear()
dict.clear(self)
def popitem(self, last=True):
'''od.popitem() -> (k, v), return and remove a (key, value) pair.
Pairs are returned in LIFO order if last is true or FIFO order if false.
'''
if not self:
raise KeyError('dictionary is empty')
root = self.__root
if last:
link = root.prev
link_prev = link.prev
link_prev.next = root
root.prev = link_prev
else:
link = root.next
link_next = link.next
root.next = link_next
link_next.prev = root
key = link.key
del self.__map[key]
value = dict.pop(self, key)
return key, value
def move_to_end(self, key, last=True):
'''Move an existing element to the end (or beginning if last==False).
Raises KeyError if the element does not exist.
When last=True, acts like a fast version of self[key]=self.pop(key).
'''
link = self.__map[key]
link_prev = link.prev
link_next = link.next
link_prev.next = link_next
link_next.prev = link_prev
root = self.__root
if last:
last = root.prev
link.prev = last
link.next = root
last.next = root.prev = link
else:
first = root.next
link.prev = root
link.next = first
root.next = first.prev = link
def __sizeof__(self):
sizeof = _sys.getsizeof
n = len(self) + 1 # number of links including root
size = sizeof(self.__dict__) # instance dictionary
size += sizeof(self.__map) * 2 # internal dict and inherited dict
size += sizeof(self.__hardroot) * n # link objects
size += sizeof(self.__root) * n # proxy objects
return size
update = __update = MutableMapping.update
keys = MutableMapping.keys
values = MutableMapping.values
items = MutableMapping.items
__ne__ = MutableMapping.__ne__
__marker = object()
def pop(self, key, default=__marker):
'''od.pop(k[,d]) -> v, remove specified key and return the corresponding
value. If key is not found, d is returned if given, otherwise KeyError
is raised.
'''
if key in self:
result = self[key]
del self[key]
return result
if default is self.__marker:
raise KeyError(key)
return default
def setdefault(self, key, default=None):
'od.setdefault(k[,d]) -> od.get(k,d), also set od[k]=d if k not in od'
if key in self:
return self[key]
self[key] = default
return default
@_recursive_repr()
def __repr__(self):
'od.__repr__() <==> repr(od)'
if not self:
return '%s()' % (self.__class__.__name__,)
return '%s(%r)' % (self.__class__.__name__, list(self.items()))
def __reduce__(self):
'Return state information for pickling'
items = [[k, self[k]] for k in self]
inst_dict = vars(self).copy()
for k in vars(OrderedDict()):
inst_dict.pop(k, None)
if inst_dict:
return (self.__class__, (items,), inst_dict)
return self.__class__, (items,)
def copy(self):
'od.copy() -> a shallow copy of od'
return self.__class__(self)
@classmethod
def fromkeys(cls, iterable, value=None):
'''OD.fromkeys(S[, v]) -> New ordered dictionary with keys from S.
If not specified, the value defaults to None.
'''
self = cls()
for key in iterable:
self[key] = value
return self
def __eq__(self, other):
'''od.__eq__(y) <==> od==y. Comparison to another OD is order-sensitive
while comparison to a regular mapping is order-insensitive.
'''
if isinstance(other, OrderedDict):
return len(self)==len(other) and \
all(p==q for p, q in zip(self.items(), other.items()))
return dict.__eq__(self, other)
# update_wrapper() and wraps() are tools to help write
# wrapper functions that can handle naive introspection
WRAPPER_ASSIGNMENTS = ('__module__', '__name__', '__doc__')
WRAPPER_UPDATES = ('__dict__',)
def update_wrapper(wrapper,
wrapped,
assigned = WRAPPER_ASSIGNMENTS,
updated = WRAPPER_UPDATES):
"""Update a wrapper function to look like the wrapped function
wrapper is the function to be updated
wrapped is the original function
assigned is a tuple naming the attributes assigned directly
from the wrapped function to the wrapper function (defaults to
functools.WRAPPER_ASSIGNMENTS)
updated is a tuple naming the attributes of the wrapper that
are updated with the corresponding attribute from the wrapped
function (defaults to functools.WRAPPER_UPDATES)
"""
wrapper.__wrapped__ = wrapped
for attr in assigned:
try:
value = getattr(wrapped, attr)
except AttributeError:
pass
else:
setattr(wrapper, attr, value)
for attr in updated:
getattr(wrapper, attr).update(getattr(wrapped, attr, {}))
# Return the wrapper so this can be used as a decorator via partial()
return wrapper
def wraps(wrapped,
assigned = WRAPPER_ASSIGNMENTS,
updated = WRAPPER_UPDATES):
"""Decorator factory to apply update_wrapper() to a wrapper function
Returns a decorator that invokes update_wrapper() with the decorated
function as the wrapper argument and the arguments to wraps() as the
remaining arguments. Default arguments are as for update_wrapper().
This is a convenience function to simplify applying partial() to
update_wrapper().
"""
return partial(update_wrapper, wrapped=wrapped,
assigned=assigned, updated=updated)
def total_ordering(cls):
"""Class decorator that fills in missing ordering methods"""
convert = {
'__lt__': [('__gt__', lambda self, other: not (self < other or self == other)),
('__le__', lambda self, other: self < other or self == other),
('__ge__', lambda self, other: not self < other)],
'__le__': [('__ge__', lambda self, other: not self <= other or self == other),
('__lt__', lambda self, other: self <= other and not self == other),
('__gt__', lambda self, other: not self <= other)],
'__gt__': [('__lt__', lambda self, other: not (self > other or self == other)),
('__ge__', lambda self, other: self > other or self == other),
('__le__', lambda self, other: not self > other)],
'__ge__': [('__le__', lambda self, other: (not self >= other) or self == other),
('__gt__', lambda self, other: self >= other and not self == other),
('__lt__', lambda self, other: not self >= other)]
}
roots = set(dir(cls)) & set(convert)
if not roots:
raise ValueError('must define at least one ordering operation: < > <= >=')
root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__
for opname, opfunc in convert[root]:
if opname not in roots:
opfunc.__name__ = opname
opfunc.__doc__ = getattr(int, opname).__doc__
setattr(cls, opname, opfunc)
return cls
def cmp_to_key(mycmp):
"""Convert a cmp= function into a key= function"""
class K(object):
__slots__ = ['obj']
def __init__(self, obj):
self.obj = obj
def __lt__(self, other):
return mycmp(self.obj, other.obj) < 0
def __gt__(self, other):
return mycmp(self.obj, other.obj) > 0
def __eq__(self, other):
return mycmp(self.obj, other.obj) == 0
def __le__(self, other):
return mycmp(self.obj, other.obj) <= 0
def __ge__(self, other):
return mycmp(self.obj, other.obj) >= 0
def __ne__(self, other):
return mycmp(self.obj, other.obj) != 0
__hash__ = None
return K
_CacheInfo = namedtuple("CacheInfo", "hits misses maxsize currsize")
def lru_cache(maxsize=100):
"""Least-recently-used cache decorator.
If *maxsize* is set to None, the LRU features are disabled and the cache
can grow without bound.
Arguments to the cached function must be hashable.
View the cache statistics named tuple (hits, misses, maxsize, currsize) with
f.cache_info(). Clear the cache and statistics with f.cache_clear().
Access the underlying function with f.__wrapped__.
See: http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
"""
# Users should only access the lru_cache through its public API:
# cache_info, cache_clear, and f.__wrapped__
# The internals of the lru_cache are encapsulated for thread safety and
# to allow the implementation to change (including a possible C version).
def decorating_function(user_function,
tuple=tuple, sorted=sorted, len=len, KeyError=KeyError):
hits, misses = [0], [0]
kwd_mark = (object(),) # separates positional and keyword args
lock = Lock() # needed because OrderedDict isn't threadsafe
if maxsize is None:
cache = dict() # simple cache without ordering or size limit
@wraps(user_function)
def wrapper(*args, **kwds):
key = args
if kwds:
key += kwd_mark + tuple(sorted(kwds.items()))
try:
result = cache[key]
hits[0] += 1
return result
except KeyError:
pass
result = user_function(*args, **kwds)
cache[key] = result
misses[0] += 1
return result
else:
cache = OrderedDict() # ordered least recent to most recent
cache_popitem = cache.popitem
cache_renew = cache.move_to_end
@wraps(user_function)
def wrapper(*args, **kwds):
key = args
if kwds:
key += kwd_mark + tuple(sorted(kwds.items()))
with lock:
try:
result = cache[key]
cache_renew(key) # record recent use of this key
hits[0] += 1
return result
except KeyError:
pass
result = user_function(*args, **kwds)
with lock:
cache[key] = result # record recent use of this key
misses[0] += 1
if len(cache) > maxsize:
cache_popitem(0) # purge least recently used cache entry
return result
def cache_info():
"""Report cache statistics"""
with lock:
return _CacheInfo(hits[0], misses[0], maxsize, len(cache))
def cache_clear():
"""Clear the cache and cache statistics"""
with lock:
cache.clear()
hits[0] = misses[0] = 0
wrapper.cache_info = cache_info
wrapper.cache_clear = cache_clear
return wrapper
return decorating_function

View file

@ -0,0 +1,157 @@
"""Redo the builtin repr() (representation) but with limits on most sizes."""
__all__ = ["Repr", "repr", "recursive_repr"]
import __builtin__ as builtins
from itertools import islice
try:
from thread import get_ident
except ImportError:
from _dummy_thread32 import get_ident
def recursive_repr(fillvalue='...'):
'Decorator to make a repr function return fillvalue for a recursive call'
def decorating_function(user_function):
repr_running = set()
def wrapper(self):
key = id(self), get_ident()
if key in repr_running:
return fillvalue
repr_running.add(key)
try:
result = user_function(self)
finally:
repr_running.discard(key)
return result
# Can't use functools.wraps() here because of bootstrap issues
wrapper.__module__ = getattr(user_function, '__module__')
wrapper.__doc__ = getattr(user_function, '__doc__')
wrapper.__name__ = getattr(user_function, '__name__')
wrapper.__annotations__ = getattr(user_function, '__annotations__', {})
return wrapper
return decorating_function
class Repr:
def __init__(self):
self.maxlevel = 6
self.maxtuple = 6
self.maxlist = 6
self.maxarray = 5
self.maxdict = 4
self.maxset = 6
self.maxfrozenset = 6
self.maxdeque = 6
self.maxstring = 30
self.maxlong = 40
self.maxother = 30
def repr(self, x):
return self.repr1(x, self.maxlevel)
def repr1(self, x, level):
typename = type(x).__name__
if ' ' in typename:
parts = typename.split()
typename = '_'.join(parts)
if hasattr(self, 'repr_' + typename):
return getattr(self, 'repr_' + typename)(x, level)
else:
return self.repr_instance(x, level)
def _repr_iterable(self, x, level, left, right, maxiter, trail=''):
n = len(x)
if level <= 0 and n:
s = '...'
else:
newlevel = level - 1
repr1 = self.repr1
pieces = [repr1(elem, newlevel) for elem in islice(x, maxiter)]
if n > maxiter: pieces.append('...')
s = ', '.join(pieces)
if n == 1 and trail: right = trail + right
return '%s%s%s' % (left, s, right)
def repr_tuple(self, x, level):
return self._repr_iterable(x, level, '(', ')', self.maxtuple, ',')
def repr_list(self, x, level):
return self._repr_iterable(x, level, '[', ']', self.maxlist)
def repr_array(self, x, level):
header = "array('%s', [" % x.typecode
return self._repr_iterable(x, level, header, '])', self.maxarray)
def repr_set(self, x, level):
x = _possibly_sorted(x)
return self._repr_iterable(x, level, 'set([', '])', self.maxset)
def repr_frozenset(self, x, level):
x = _possibly_sorted(x)
return self._repr_iterable(x, level, 'frozenset([', '])',
self.maxfrozenset)
def repr_deque(self, x, level):
return self._repr_iterable(x, level, 'deque([', '])', self.maxdeque)
def repr_dict(self, x, level):
n = len(x)
if n == 0: return '{}'
if level <= 0: return '{...}'
newlevel = level - 1
repr1 = self.repr1
pieces = []
for key in islice(_possibly_sorted(x), self.maxdict):
keyrepr = repr1(key, newlevel)
valrepr = repr1(x[key], newlevel)
pieces.append('%s: %s' % (keyrepr, valrepr))
if n > self.maxdict: pieces.append('...')
s = ', '.join(pieces)
return '{%s}' % (s,)
def repr_str(self, x, level):
s = builtins.repr(x[:self.maxstring])
if len(s) > self.maxstring:
i = max(0, (self.maxstring-3)//2)
j = max(0, self.maxstring-3-i)
s = builtins.repr(x[:i] + x[len(x)-j:])
s = s[:i] + '...' + s[len(s)-j:]
return s
def repr_int(self, x, level):
s = builtins.repr(x) # XXX Hope this isn't too slow...
if len(s) > self.maxlong:
i = max(0, (self.maxlong-3)//2)
j = max(0, self.maxlong-3-i)
s = s[:i] + '...' + s[len(s)-j:]
return s
def repr_instance(self, x, level):
try:
s = builtins.repr(x)
# Bugs in x.__repr__() can cause arbitrary
# exceptions -- then make up something
except Exception:
return '<%s instance at %x>' % (x.__class__.__name__, id(x))
if len(s) > self.maxother:
i = max(0, (self.maxother-3)//2)
j = max(0, self.maxother-3-i)
s = s[:i] + '...' + s[len(s)-j:]
return s
def _possibly_sorted(x):
# Since not all sequences of items can be sorted and comparison
# functions may raise arbitrary exceptions, return an unsorted
# sequence in that case.
try:
return sorted(x)
except Exception:
return list(x)
aRepr = Repr()
repr = aRepr.repr