Source code for querypool.pools.threading
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError
from contextlib import contextmanager
from threading import RLock
from time import time
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from ..utils.maxsizedict import MaxSizeDict
from . import GEVENT_PATCHED
from .interface import QueryPool
[docs]
class ThreadQueryPool(QueryPool):
    """
    This query pool is to be used in an environment that is not gevent patched.
    """

    # NOTE: the method docstrings are copied from ``QueryPool`` right after
    # each definition, so implementation notes below are written as comments
    # rather than docstrings (a docstring would simply be overwritten).

    def __init__(
        self, timeout: Optional[float] = None, maxqueries: Optional[int] = None
    ):
        # timeout: default number of seconds `execute` waits for a query
        #   before falling back to a previous result (0.1 when None).
        # maxqueries: when truthy, results are kept in a `MaxSizeDict` of that
        #   size; otherwise results are kept in a plain, unbounded dict.
        # Raises RuntimeError when the environment is gevent patched.
        if GEVENT_PATCHED:
            raise RuntimeError("requires an environment not patched by gevent")
        # Large worker cap; presumably so that distinct concurrent queries do
        # not queue behind each other -- TODO confirm the intended bound.
        self.__pool = ThreadPoolExecutor(max_workers=10000)
        if timeout is None:
            timeout = 0.1
        self.timeout = timeout
        # Maps a call identifier to the future of the run currently in flight.
        self.__futures: Dict[tuple, Future] = dict()
        # The lock is always created: besides protecting the result store it
        # makes "register future" and "worker removes future" mutually
        # exclusive. Without it a fast query could finish (and try to remove
        # itself) before its future was registered, leaving a finished future
        # behind forever so that the query would never be submitted again.
        self.__lock = RLock()
        if maxqueries:
            self.__results = MaxSizeDict(maxsize=maxqueries)
        else:
            self.__results = dict()

    __init__.__doc__ = QueryPool.__init__.__doc__

    @contextmanager
    def _lock(self):
        # Context manager guarding the shared future and result mappings.
        with self.__lock:
            yield

    def execute(
        self,
        query: Callable,
        args: Optional[tuple] = tuple(),
        kwargs: Optional[dict] = None,
        timeout: Optional[float] = None,
        default=None,
    ) -> Any:
        # Runs `query(*args, **kwargs)` in the thread pool unless an identical
        # call is already in flight, waits up to `timeout` seconds (the pool
        # default when None) and returns the most recent stored result for
        # this call. Returns `default` when no result was ever stored.
        # Raises the exception raised by the query when the most recent run
        # failed.

        # Normalise the arguments so the call identifier is hashable and
        # `args=None` (allowed by the annotation) does not break the call.
        args = tuple() if args is None else tuple(args)
        # Copy so later mutations by the caller cannot affect the running query.
        kwargs = dict() if kwargs is None else dict(kwargs)
        # Keyword order must not matter for identifying a call; keys are
        # unique, so sorting on the key alone never compares the values.
        sorted_kwargs = tuple(sorted(kwargs.items(), key=lambda item: item[0]))
        call_id = query, args, sorted_kwargs

        def wrapper():
            try:
                result = query(*args, **kwargs)
                with self._lock():
                    self.__results[call_id] = False, result
            except BaseException as e:
                # Store the failure so later `execute` calls re-raise it.
                with self._lock():
                    self.__results[call_id] = True, e
                raise
            finally:
                # The run is over: allow the next `execute` to submit again.
                with self._lock():
                    self.__futures.pop(call_id, None)

        # Check-and-submit is one critical section so that concurrent callers
        # cannot submit the same query twice, and so that the worker cannot
        # unregister the future before it has been registered.
        with self._lock():
            future = self.__futures.get(call_id)
            if future is None:
                future = self.__pool.submit(wrapper)
                self.__futures[call_id] = future

        if timeout is None:
            timeout = self.timeout
        try:
            future.result(timeout=timeout)
        except TimeoutError:
            # Not finished in time: fall back to a previously stored result.
            pass

        with self._lock():
            stored = self.__results.get(call_id, None)
        if stored is None:
            return default
        is_error, value = stored
        if is_error:
            raise value
        return value

    execute.__doc__ = QueryPool.execute.__doc__

    def wait(self, timeout=None) -> bool:
        # Waits for all queries that are in flight at the time of the call.
        # `timeout` is the total budget in seconds for all of them together;
        # None waits indefinitely. Returns False when the budget ran out and
        # True otherwise. An exception raised by a query propagates to the
        # caller, because `Future.result` re-raises it.
        with self._lock():
            futures = list(self.__futures.values())
        # A single deadline shares the budget across all futures and keeps
        # `timeout=None` valid (subtracting from None raised TypeError before).
        deadline = None if timeout is None else time() + timeout
        try:
            for future in futures:
                if deadline is None:
                    remaining = None
                else:
                    remaining = max(deadline - time(), 0)
                future.result(timeout=remaining)
        except TimeoutError:
            return False
        return True

    wait.__doc__ = QueryPool.wait.__doc__

    def cancel(self, timeout=None, block=True) -> Optional[bool]:
        # Cancelling is not supported by this pool: always raises
        # NotImplementedError.
        raise NotImplementedError("not supported")

    cancel.__doc__ = QueryPool.cancel.__doc__
# Append the generic pool documentation to the class docstring. Either
# docstring is None when docstrings are stripped (``python -OO``), in which
# case plain concatenation would raise TypeError at import time.
ThreadQueryPool.__doc__ = (ThreadQueryPool.__doc__ or "") + (QueryPool.__doc__ or "")