Source code for querypool.pools.threading

from time import time
from threading import RLock
from contextlib import contextmanager
from typing import Any, Callable, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, Future, TimeoutError

from . import GEVENT_PATCHED
from .interface import QueryPool
from ..utils.maxsizedict import MaxSizeDict


class ThreadQueryPool(QueryPool):
    """
    This query pool is to be used in an environment that is not gevent patched.
    """

    # NOTE: method docstrings are copied from ``QueryPool`` through the
    # ``__doc__`` assignments that follow each method, so the methods are
    # documented with comments instead of docstrings.

    def __init__(
        self, timeout: Optional[float] = None, maxqueries: Optional[int] = None
    ):
        # timeout: default number of seconds `execute` waits for a query
        #          (0.1 when not provided).
        # maxqueries: when truthy, results are kept in a `MaxSizeDict` of that
        #             size, otherwise in a plain dict.
        if GEVENT_PATCHED:
            # Message kept byte-identical (including its typo) because callers
            # may match on it.
            raise RuntimeError("requires an environment non pactehd by gevent")
        self.__pool = ThreadPoolExecutor(max_workers=10000)
        if timeout is None:
            timeout = 0.1
        self.timeout = timeout
        # Queries that are currently running, keyed by call id.
        self.__futures: Dict[tuple, Future] = dict()
        # The lock is always created: without it a fast worker can remove its
        # call id from `__futures` *before* the submitting thread registers
        # the future, which leaves a finished future registered forever and
        # prevents the query from ever being executed again.
        self.__lock = RLock()
        # Latest outcome per call id, stored as `(is_error, value)`.
        if maxqueries:
            self.__results = MaxSizeDict(maxsize=maxqueries)
        else:
            self.__results = dict()

    __init__.__doc__ = QueryPool.__init__.__doc__

    @contextmanager
    def _lock(self):
        # Guard access to the shared `__futures` / `__results` containers.
        with self.__lock:
            yield

    def execute(
        self,
        query: Callable,
        args: Optional[tuple] = tuple(),
        kwargs: Optional[dict] = None,
        timeout: Optional[float] = None,
        default=None,
    ) -> Any:
        # Run `query(*args, **kwargs)` in the thread pool and wait at most
        # `timeout` seconds (instance default when None). Returns the latest
        # known result for this call, `default` when there is none yet, and
        # re-raises the exception when the latest outcome was an error.

        # `args` is declared Optional: accept None and any sequence, and make
        # sure it is hashable as part of the call id.
        args = tuple() if args is None else tuple(args)
        if kwargs is None:
            kwargs = dict()
        # Keyword names are unique, so sorting never compares the values; it
        # makes the call id independent of the keyword order.
        call_id = query, args, tuple(sorted(kwargs.items()))

        def wrapper():
            try:
                result = query(*args, **kwargs)
                with self._lock():
                    self.__results[call_id] = False, result
            except BaseException as e:
                with self._lock():
                    self.__results[call_id] = True, e
                raise
            finally:
                # The query is no longer running.
                with self._lock():
                    self.__futures.pop(call_id, None)

        # Look-up and submission happen under the same lock so that the same
        # call is never submitted twice concurrently, and so that the worker
        # cannot unregister itself before the future is registered.
        with self._lock():
            future = self.__futures.get(call_id)
            if future is None:
                self.__futures[call_id] = future = self.__pool.submit(wrapper)

        if timeout is None:
            timeout = self.timeout
        try:
            future.result(timeout=timeout)
        except TimeoutError:
            # Not finished in time: fall back to a previous result, if any.
            pass

        with self._lock():
            result = self.__results.get(call_id, None)
        if result is None:
            return default
        is_error, result = result
        if is_error:
            raise result
        return result

    execute.__doc__ = QueryPool.execute.__doc__

    def wait(self, timeout: Optional[float] = None) -> bool:
        # Wait for all queries that are running at the time of the call.
        # Returns False when `timeout` seconds elapsed before they all
        # finished, True otherwise. `timeout=None` waits indefinitely.
        # An exception raised by a query propagates through `Future.result`.
        with self._lock():
            futures = list(self.__futures.values())
        # A single deadline shared by all futures; None means no limit
        # (the previous arithmetic on `timeout` failed for None).
        deadline = None if timeout is None else time() + timeout
        try:
            for future in futures:
                if deadline is None:
                    remaining = None
                else:
                    remaining = max(deadline - time(), 0)
                future.result(timeout=remaining)
        except TimeoutError:
            return False
        return True

    wait.__doc__ = QueryPool.wait.__doc__

    def cancel(self, timeout=None, block=True) -> Optional[bool]:
        # Cancelling running queries is not supported by this pool.
        raise NotImplementedError("not supported")

    cancel.__doc__ = QueryPool.cancel.__doc__


# Docstrings are None when stripped (e.g. `python -OO`); do not fail on import.
ThreadQueryPool.__doc__ = (ThreadQueryPool.__doc__ or "") + (QueryPool.__doc__ or "")