Source code for logic1.abc.qe

"""This module :mod:`logic1.abc.qe` provides generic classes for effective
quantifier elimination, which can used by various theories via subclassing.
"""

from __future__ import annotations

from abc import abstractmethod
from collections import Counter
from dataclasses import dataclass, field
import logging
import multiprocessing as mp
import multiprocessing.managers
import multiprocessing.queues
import queue
import os
import threading
import time
from typing import (Any, Collection, Generic, Iterable, Iterator, Optional,
                    Self, TypeVar)
from typing import reveal_type  # noqa

from logic1.support.excepthook import NoTraceException
from logic1.firstorder import (All, And, AtomicFormula, _F, Formula, Not, Or,
                               Prefix, Term, Variable)
from logic1.support.logging import DeltaTimeFormatter, Timer

# Create logger
delta_time_formatter = DeltaTimeFormatter(
    f'%(asctime)s - %(name)s - %(levelname)-5s - %(delta)s: %(message)s')

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(delta_time_formatter)

logger = logging.getLogger(__name__)
logger.propagate = False
logger.addHandler(stream_handler)
logger.addFilter(lambda record: record.msg.strip() != '')
logger.setLevel(logging.WARNING)

# Create multiprocessing logger
multiprocessing_formatter = DeltaTimeFormatter(
    f'%(asctime)s - %(name)s/%(process)-6d - %(levelname)-5s - %(delta)s: %(message)s')

multiprocessing_handler = logging.StreamHandler()
multiprocessing_handler.setFormatter(multiprocessing_formatter)
multiprocessing_logger = logging.getLogger('multiprocessing')
multiprocessing_logger.propagate = False
multiprocessing_logger.addHandler(multiprocessing_handler)

α = TypeVar('α', bound=AtomicFormula)
τ = TypeVar('τ', bound='Term')
χ = TypeVar('χ', bound=Variable)
σ = TypeVar('σ')

φ = TypeVar('φ', bound=Formula)
"""A type variable denoting a formula with upper bound
:class:`logic1.firstorder.formula.Formula`.
"""

ν = TypeVar('ν', bound='Node')
"""A type variable denoting a node with upper bound
:class:`logic1.abc.qe.Node`.
"""

ι = TypeVar('ι')
"""A type variable denoting the type of the principal argument of the
abstract method :meth:`.QuantifierElimination.init_env`."""

λ = TypeVar('λ', bound='Assumptions')
"""A type variable denoting a assumptions with upper bound :class:`.Assumptions`.
"""

ω = TypeVar('ω', bound='Options')
"""A type variable denoting a options for
:meth:`.QuantifierElimination.__call__` with upper bound :class:`.Options`.
"""


class FoundT(Exception):
    pass


class NodeProcessFailure(Exception):
    pass


[docs] @dataclass class Node(Generic[φ, χ, λ]): """Holds a subproblem for existential quantifier elimination. Theories implementing the interface can put restrictions on the existing fields and add further fields. """ # This is used in both the sequential and the parallel code. variables: list[χ] """A list of variables. """ formula: φ """A quantifier-free formula. """
[docs] @abstractmethod def copy(self) -> Self: """Create a copy of this node. """ ...
[docs] @abstractmethod def process(self, assumptions: λ) -> list[Self]: """This `node` describes a formula ``Ex(node.variables, node.formula)``. Select a `variable` from ``node.variables`` and compute a list `S` of successor nodes such that: 1. `variable` is not in ``successor.variables`` for `successor` in `S`; 2. `variable` does not occur in ``successor.formula`` for `successor` in `S`; 3. ``Or(*(Ex(successor.variables, successor.formula) for s in S))`` is logically equivalent to ``Ex(node.variables, node.formula)``. """ ...
@dataclass class NodeList(Collection[ν], Generic[φ, ν]): # Sequential only nodes: list[ν] = field(default_factory=list) memory: set[φ] = field(default_factory=set) hits: int = 0 candidates: int = 0 def __contains__(self, obj: object) -> bool: return obj in self.nodes def __iter__(self) -> Iterator[ν]: yield from self.nodes def __len__(self) -> int: return len(self.nodes) def append(self, node: ν) -> bool: is_new = node.formula not in self.memory if is_new: self.nodes.append(node) self.memory.add(node.formula) else: self.hits += 1 self.candidates += 1 return is_new def extend(self, nodes: Iterable[ν]) -> None: for node in nodes: self.append(node) def final_statistics(self, key: str) -> str: hits = self.hits candidates = self.candidates num_nodes = candidates - hits if num_nodes == 0: return '' ratio = self.hit_ratio() return (f'produced {num_nodes} {key} nodes, ' f'dropped {hits}/{candidates} = {ratio:.0%}') def hit_ratio(self) -> float: try: return float(self.hits) / self.candidates except ZeroDivisionError: return float('nan') def periodic_statistics(self, key: str) -> str: num_nodes = self.candidates - self.hits if num_nodes == 0: return '' ratio = self.hit_ratio() return f'{key}={num_nodes}, H={ratio:.0%}' @dataclass class WorkingNodeList(NodeList[φ, ν]): # Sequential only node_counter: Counter[int] = field(default_factory=Counter) def append(self, node: ν) -> bool: is_new = super().append(node) if is_new: n = len(node.variables) self.node_counter[n] += 1 return is_new def final_statistics(self, key: Optional[str] = None) -> str: if key: return super().final_statistics(key) hits = self.hits candidates = self.candidates num_nodes = candidates - hits ratio = self.hit_ratio() return (f'performed {num_nodes} elimination steps, ' f'skipped {hits}/{candidates} = {ratio:.0%}') def periodic_statistics(self, key: str = 'W') -> str: node_counter = self.node_counter ratio = self.hit_ratio() try: num_variables = max(k for k, v in node_counter.items() if v != 0) v = f'V={num_variables}' nc = '.'.join(f'{node_counter[n]}' for n in reversed(range(1, num_variables + 1))) w = f'{key}={nc}' vw = f'{v}, {w}' except ValueError: vw = 'V=0' return f'{vw}, H={ratio:.0%}' def pop(self) -> ν: node = self.nodes.pop() n = len(node.variables) self.node_counter[n] -= 1 return node def extend(self, nodes: Iterable[ν]) -> None: for node in nodes: match node.formula: case _F(): continue case Or(args=args): for arg in args: sub_node = node.copy() sub_node.variables = sub_node.variables.copy() sub_node.formula = arg self.append(sub_node) case And() | AtomicFormula(): self.append(node) case _: assert False, node @dataclass class NodeListManager(Generic[φ, ν]): nodes: list[ν] = field(default_factory=list) nodes_lock: threading.Lock = field(default_factory=threading.Lock) memory: set[φ] = field(default_factory=set) memory_lock: threading.Lock = field(default_factory=threading.Lock) hits: int = 0 hits_candidates_lock: threading.Lock = field(default_factory=threading.Lock) candidates: int = 0 def get_nodes(self) -> list[ν]: with self.nodes_lock: return self.nodes.copy() def get_memory(self) -> set[φ]: with self.memory_lock: return self.memory.copy() def get_candidates(self) -> int: return self.candidates def get_hits(self) -> int: return self.hits def __len__(self) -> int: with self.nodes_lock: return len(self.nodes) def append(self, node: ν) -> bool: with self.memory_lock: is_new = node.formula not in self.memory if is_new: self.memory.add(node.formula) if is_new: with self.nodes_lock: self.nodes.append(node) with self.hits_candidates_lock: if not is_new: self.hits += 1 self.candidates += 1 return is_new def extend(self, nodes: Iterable[ν]) -> None: for node in nodes: self.append(node) def statistics(self) -> tuple[Any, ...]: with self.hits_candidates_lock: return (self.hits, self.candidates) class NodeListProxy(Collection[ν], Generic[φ, ν]): # parallel def __init__(self, proxy: _NodeListProxy) -> None: self._proxy = proxy @property def nodes(self) -> list[ν]: return self._proxy.get_nodes() @property def memory(self) -> set[φ]: return self._proxy.get_memory() @property def candidates(self) -> int: return self._proxy.get_candidates() @property def hits(self) -> int: return self._proxy.get_hits() def __contains__(self, obj: object) -> bool: match obj: case Node(): return obj in self._proxy.get_nodes() case _: return False def __iter__(self) -> Iterator[ν]: yield from self._proxy.get_nodes() def __len__(self) -> int: return len(self._proxy) def append(self, node: ν) -> bool: return self._proxy.append(node) def extend(self, nodes: list[ν]) -> None: self._proxy.extend(nodes) def final_statistics(self, key: str) -> str: hits, candidates = self._proxy.statistics() num_nodes = candidates - hits if num_nodes == 0: return '' ratio = self.hit_ratio(hits, candidates) return (f'produced {num_nodes} {key} nodes, ' f'dropped {hits}/{candidates} = {ratio:.0%}') def hit_ratio(self, hits: int, candidates: int) -> float: try: return float(hits) / candidates except ZeroDivisionError: return float('nan') def periodic_statistics(self, key: str) -> str: hits, candidates = self._proxy.statistics() num_nodes = candidates - hits if num_nodes == 0: return '' ratio = self.hit_ratio(hits, candidates) return f'{key}={num_nodes}, H={ratio:.0%}' class _NodeListProxy(mp.managers.BaseProxy, Generic[φ, ν]): def get_nodes(self) -> list[ν]: return self._callmethod('get_nodes') # type: ignore[func-returns-value] def get_memory(self) -> set[φ]: return self._callmethod('get_memory') # type: ignore[func-returns-value] def get_candidates(self) -> int: return self._callmethod('get_candidates') # type: ignore[func-returns-value] def get_hits(self) -> int: return self._callmethod('get_hits') # type: ignore[func-returns-value] def __len__(self) -> int: return self._callmethod('__len__') # type: ignore[func-returns-value] def append(self, node: ν) -> bool: return self._callmethod('append', (node,)) # type: ignore[func-returns-value] def extend(self, nodes: Iterable[ν]) -> None: return self._callmethod('extend', (nodes,)) # type: ignore[func-returns-value] def statistics(self) -> tuple[Any, ...]: return self._callmethod('statistics') # type: ignore[func-returns-value] @dataclass class WorkingNodeListManager(NodeListManager[φ, ν]): busy: int = 0 busy_lock: threading.Lock = field(default_factory=threading.Lock) node_counter: Counter[int] = field(default_factory=Counter) node_counter_lock: threading.Lock = field(default_factory=threading.Lock) def get_node_counter(self) -> Counter[int]: with self.node_counter_lock: return self.node_counter.copy() def append(self, node: ν) -> bool: is_new = super().append(node) if is_new: n = len(node.variables) with self.node_counter_lock: self.node_counter[n] += 1 return is_new def is_finished(self) -> bool: with self.nodes_lock, self.busy_lock: return len(self.nodes) == 0 and self.busy == 0 def statistics(self) -> tuple[Any, ...]: # hits and candidates are always consistent. hits/candidates, busy, # node_counter are three snapshots at different times, each of which is # consistent. with self.hits_candidates_lock, self.busy_lock, self.node_counter_lock: return (self.hits, self.candidates, self.busy, self.node_counter) def pop(self) -> ν: with self.nodes_lock: node = self.nodes.pop() n = len(node.variables) with self.node_counter_lock: self.node_counter[n] -= 1 with self.busy_lock: self.busy += 1 return node def task_done(self) -> None: with self.busy_lock: self.busy -= 1 class WorkingNodeListProxy(NodeListProxy[φ, ν]): def __init__(self, proxy: _WorkingNodeListProxy) -> None: self._proxy: _WorkingNodeListProxy = proxy @property def node_counter(self) -> Counter[int]: return self._proxy.get_node_counter() def extend(self, nodes: Iterable[ν]) -> None: new_nodes = [] for node in nodes: match node.formula: case _F(): continue case Or(args=args): for arg in args: sub_node = node.copy() sub_node.variables = node.variables.copy() sub_node.formula = arg if sub_node not in new_nodes: new_nodes.append(sub_node) case And() | AtomicFormula(): if node not in new_nodes: new_nodes.append(node) case _: assert False, node self._proxy.extend(new_nodes) def final_statistics(self, key: Optional[str] = None) -> str: if key: return super().final_statistics(key) hits, candidates, _, _ = self._proxy.statistics() num_nodes = candidates - hits ratio = self.hit_ratio(hits, candidates) return (f'performed {num_nodes} elimination steps, ' f'skipped {hits}/{candidates} = {ratio:.0%}') def is_finished(self) -> bool: return self._proxy.is_finished() def periodic_statistics(self, key: str = 'W') -> str: hits, candidates, busy, node_counter = self._proxy.statistics() ratio = self.hit_ratio(hits, candidates) try: num_variables = max(k for k, v in node_counter.items() if v != 0) v = f'V={num_variables}' nc = '.'.join(f'{node_counter[n]}' for n in reversed(range(1, num_variables + 1))) w = f'{key}={nc}' vw = f'{v}, {w}' except ValueError: vw = 'V=0' return f'{vw}, B={busy}, H={ratio:.0%}' def pop(self) -> ν: return self._proxy.pop() def task_done(self) -> None: self._proxy.task_done() class _WorkingNodeListProxy(_NodeListProxy[φ, ν]): def get_node_counter(self) -> Counter[int]: return self._callmethod('get_node_counter') # type: ignore[func-returns-value] def is_finished(self) -> bool: return self._callmethod('is_finished') # type: ignore[func-returns-value] def pop(self) -> ν: return self._callmethod('pop') # type: ignore[func-returns-value] def task_done(self) -> None: return self._callmethod('task_done') # type: ignore[func-returns-value] class SyncManager(mp.managers.SyncManager, Generic[φ, ν]): def NodeList(self) -> NodeListProxy[φ, ν]: proxy = self._NodeListProxy() # type: ignore[attr-defined] return NodeListProxy(proxy) def WorkingNodeList(self) -> WorkingNodeListProxy[φ, ν]: proxy = self._WorkingNodeListProxy() # type: ignore[attr-defined] return WorkingNodeListProxy(proxy) SyncManager.register('_NodeListProxy', NodeListManager, _NodeListProxy, ['get_nodes', 'get_memory', 'get_candidates', 'get_hits', '__len__', 'append', 'extend', 'statistics']) SyncManager.register('_WorkingNodeListProxy', WorkingNodeListManager, _WorkingNodeListProxy, ['get_nodes', 'get_memory', 'get_candidates', 'get_hits', 'get_node_counter', '__len__', 'append', 'extend', 'is_finished', 'pop', 'statistics', 'task_done'])
[docs] @dataclass class Assumptions(Generic[α, τ, χ, σ]): """Holds the currently valid assumptions. This starts with user assumptions explicitly provided by the user. Certain variants of quantified elimination may add further assumptions in the course of the elimination. .. seealso:: * The argument `assume` of :meth:`.QuantifierElimination.__call__`. * Generic quantifier elimination in :mod:`.RCF.qe`. This is an upper bound for the type variable :data:`.λ`. """ class Inconsistent(Exception): """Raised when the assumptions made become inconsistent. """ pass atoms: list[α] """A list of atoms holding the current set of assumptions. """ def __init__(self, atoms: Iterable[α]) -> None: self.atoms = list(atoms)
[docs] def append(self, new_atom: α) -> None: """Add `new_atom` as another assumption and simplify. """ self.extend([new_atom])
[docs] def extend(self, new_atoms: Iterable[α]) -> None: """Add `new_atoms` as further assumptions and simplify. """ self.atoms.extend(new_atoms) # NF nörgelt theta = self.simplify(And(*self.atoms)) if Formula.is_atomic(theta): self.atoms = [theta] elif Formula.is_and(theta): self.atoms = [*theta.args] elif Formula.is_true(theta): self.atoms = [] elif Formula.is_false(theta): raise self.Inconsistent(f'{self=}, {new_atoms=}, {theta=}') else: assert False, (self, new_atoms, theta)
[docs] @abstractmethod def simplify(self, f: Formula[α, τ, χ, σ]) -> Formula[α, τ, χ, σ]: """`f` is a (possibly unary or trivial) conjunction of atoms. Simplifes `f` in such a way that the result is again a (possibly unary or trivial) conjunction of atoms. Raises :class:`.Inconsistent` if `f` is simplified to :data:`.F`. """ ...
[docs] @dataclass class Options: """This class holds options that can be provided to :meth:`.QuantifierElimination.__call__`. Theories subclassing :class:`.QuantifierElimination` can add further options by subclassing :class:`.Options`. This is an upper bound for the type variable :data:`.ω`. """ log_level: int = logging.NOTSET """The `log_level` of the logger used by :class:`.QuantifierElimination`. """ log_rate: float = 0.5 """The minimal timespan (in s) between to log outputs in certain loops. """
[docs] @dataclass class QuantifierElimination(Generic[ν, λ, ι, ω, α, τ, χ, σ]): """A generic callable class that implements quantifier elimination. """ # Attribute group 1 - arguments of :meth:`.__call__`: workers: int = 0 """The number of worker processes used. For more information see the documentation of the parameter `workers` of :meth:`.__call__`. """ options: Optional[ω] = None """The options that have been passed to :meth:`.__call__`. """ # Attribute group 2 - arguments state of the computation: _assumptions: Optional[λ] = None """Wraps a list of atoms, which serve as external assumptions. This includes the assumptions passed via the `assume` parameter of :meth:`__call__`. Some theories have an option for *generic quantifier elimination*, which adds additional assumptions on parameters in the course of the elimination. """ blocks: Optional[Prefix[χ]] = None """Remaining quantifier blocks, to be processed after the current block. """ matrix: Optional[Formula[α, τ, χ, σ]] = None """The quantifier-free formula associated with :attr:`.blocks`. This is :obj:`None` while there is a block being processed. """ negated: Optional[bool] = None """Indicates whether or not the block currently processed has been logically negated in order to equivalently transform universal quantifiers into existential quanitifers. """ root_nodes: Optional[list[ν]] = None """The root nodes of the next block to be processed. Logically, the list describes a disjunction, and each `node` in `root_nodes` describes a quantifier elimination subproblem ``Ex(node.variables, node.formula)``. This is an intermediate object for moving the innermost block with and the matrix into the :attr:`.working_nodes`. """ working_nodes: Optional[WorkingNodeList[Formula[α, τ, χ, σ], ν]] = None """Subproblems left for the current block. Element nodes of :attr:`.working_nodes` have the same shape as element nodes of :attr:`.root_nodes` """ success_nodes: Optional[NodeList[Formula[α, τ, χ, σ], ν]] = None """Finished subproblems of the current block. For each `node` in `success_nodes` we have ``node.variables == []``. """ failure_nodes: Optional[NodeList[Formula[α, τ, χ, σ], ν]] = None """Failed subproblems of the current block, which can occur with incomplete quantifier elimination procedures. An element nodes of :attr:`.failure_nodes` have the same shape as an element node of :attr:`.working_nodes`, but quantifier elimination procedure could not eliminate any variable from the node. """ result: Optional[Formula[α, τ, χ, σ]] = None """The final result as returned by :meth:`.__call__`. """ # Attribute group 3 - timings; all times are wall times in seconds: time_final_simplification: Optional[float] = None """The time spent for finally simplifying the disjunction over all :attr:`.success_nodes` imported from the workers. This yields the final :attr:`.result`, which is also the return value of :meth:`.__call__`. """ time_import_failure_nodes: Optional[float] = None """The time spent for importing all :attr:`.failure_nodes` from the :class:`SyncManager <multiprocessing.managers.SyncManager>` into the master process after all workers have terminated. """ time_import_success_nodes: Optional[float] = None """The time spent for importing all :attr:`.success_nodes` from the :class:`SyncManager <multiprocessing.managers.SyncManager>` into the master process after all workers have terminated. """ time_import_working_nodes: Optional[float] = None """The time spent for importing all :attr:`.working_nodes` from the :class:`SyncManager <multiprocessing.managers.SyncManager>` into the master process after all workers have terminated. """ time_multiprocessing: Optional[float] = None """The time spent in :mod:`multiprocessing` after the first worker process has been started and until the last worker process has terminated. """ time_start_first_worker: Optional[float] = None """The time spent for starting the first worker process in :mod:`multiprocessing`. """ time_start_all_workers: Optional[float] = None """The time spent for starting all worker processes in :mod:`multiprocessing`. """ time_syncmanager_enter: Optional[float] = None """The time spent for starting the :class:`SyncManager <multiprocessing.managers.SyncManager>`, which is a proxy process that manages shared data in :mod:`multiprocessing`. """ time_syncmanager_exit: Optional[float] = None """The time spent for exiting the :class:`SyncManager <multiprocessing.managers.SyncManager>`. """ time_total: Optional[float] = None """The total time spent in :meth:`.__call__`. """
[docs] def __call__(self, f: Formula[α, τ, χ, σ], assume: Iterable[α] = [], workers: int = 0, **options) -> Optional[Formula[α, τ, χ, σ]]: """The entry point of the callable class :class:`.QuantifierElimination`. :param f: The input formula to which quantifier elimination will be applied. :param assume: A list of atomic formulas that are assumed to hold. The return value is equivalent modulo those assumptions. :param workers: Specifies the number of processes to be used in parallel: * The default value `workers=0` uses a sequential implementation, which avoids overhead when input problems are small. For all other values, there are additional processes started. * A positive value `workers=n > 0` uses `n + 2` processes: the master process, `n` worker processes, and a proxy processes that manages shared data. .. note:: `workers=1` uses the parallel implementation with only one worker. Algorithmically this is similar to the sequential version with `workers=0` but comes at the cost of 2 additional processes. * A negative value `workers=-n < 0` specifies ``os.num_cpu() - n`` many workers. It follows that `workers=-2` exactly allocates all of CPUs of the machine, and workers=-3 is an interesting choice, which leaves one CPU free for smooth interaction with the machine. :param `**options`: Keyword arguments with keywords corresponding to attributes of the generic type :data:`.ω`, which extends :class:`.Options`. :returns: A quantifier-free equivalent of `f` modulo certain assumptions. A simplified equivalent of all relevant assumptions are available as :attr:`.assumptions`. * Regularly, the assumptions are exactly those passed as the `assume` parameter. * Some theories have an option for *generic quantifier elimination*, which adds additional assumptions in the course of the elimination. """ timer = Timer() delta_time_formatter.set_reference_time(time.time()) # We call __init__ in order to reset all attributes of the data class # also within __call__. This is not really nice, but it does the job # and saves some code. QuantifierElimination.__init__(self) # dicuss: NF is :-( self._assumptions = self.create_assumptions(assume) if workers >= 0: self.workers = workers else: cpu_count = os.cpu_count() if cpu_count is None: raise ValueError(f'{os.cpu_count()=}, i.e. undetermined') if cpu_count + workers < 1: raise ValueError(f'negative number of workers') self.workers = cpu_count + workers self.options = self.create_options(**options) save_level = logger.getEffectiveLevel() try: logger.setLevel(self.options.log_level) logger.info(f'{self.options}') result = self.quantifier_eliminiation(f, workers) logger.info('finished') except KeyboardInterrupt: logger.info('keyboard interrupt') raise NoTraceException('KeyboardInterrupt') finally: logger.setLevel(save_level) self.time_total = timer.get() return result
@property def assumptions(self) -> list[α]: """A list of atoms, which serve as external assumptions. This includes the assumptions passed via the `assume` parameter of :meth:`__call__`. Some theories have an option for *generic quantifier elimination*, which adds additional assumptions on parameters in the course of the elimination. """ assert self._assumptions is not None return self._assumptions.atoms.copy() def collect_success_nodes(self) -> None: assert self.success_nodes is not None logger.debug(f'entering {self.collect_success_nodes.__name__}') self.matrix = Or(*(node.formula for node in self.success_nodes)) if self.negated: self.matrix = Not(self.matrix) self.negated = None self.working_nodes = None self.success_nodes = None
[docs] @abstractmethod def create_options(self, **kwargs) -> ω: """Create an instance of :data:`.ω` that holds `**kwargs`. The `**kwargs` arriving here are the `**options` that have been passed to :meth:`__call__`. """ ...
[docs] @abstractmethod def create_root_nodes(self, variables: Iterable[χ], matrix: Formula[α, τ, χ, σ]) -> list[ν]: """If `matrix` is not a disjunction, create a list containing one instance `node` of :data:`.ν` with ``node.variables == variables`` and ``node.formula == matrix``. If `matrix` is a disjunction ``Or(*args)``, create a list containing one such node for each `arg` in `args`. """ ...
[docs] @abstractmethod def create_assumptions(self, assume: Iterable[α]) -> λ: """Create in instance of :data:`.λ` that holds `assume`. Those assumptions `assume` are the corresponding parameter of :meth:`.__call__`. """ ...
[docs] @abstractmethod def create_true_node(self) -> ν: """Create an instance `node` of :data:`.ν` with ``node.variables == []`` and ``node.formula == _T()``. """ ...
def pop_block(self) -> None: assert self.blocks is not None logger.debug(f'entering {self.pop_block.__name__}') assert self.matrix is not None, self.matrix logger.info(str(self.blocks)) if logger.isEnabledFor(logging.DEBUG): s = str(self.matrix) logger.debug(s[:50] + '...' if len(s) > 53 else s) quantifier, vars_ = self.blocks.pop() matrix = self.matrix self.matrix = None if quantifier is All: self.negated = True matrix = Not(matrix) else: self.negated = False self.root_nodes = self.create_root_nodes(vars_, matrix) def final_simplification(self): logger.debug(f'entering {self.final_simplification.__name__}') if logger.isEnabledFor(logging.DEBUG): num_atoms = sum(1 for _ in self.matrix.atoms()) logger.debug(f'found {num_atoms} atoms') logger.info('final simplification') timer = Timer() self.result = self.final_simplify(self.matrix, assume=self._assumptions.atoms) self.time_final_simplification = timer.get() if logger.isEnabledFor(logging.DEBUG): logger.debug(f'{self.time_final_simplification=:.3f}') num_atoms = sum(1 for _ in self.result.atoms()) logger.debug(f'produced {num_atoms} atoms')
[docs] @abstractmethod def final_simplify(self, formula: Formula[α, τ, χ, σ], assume: Iterable[α] = []) \ -> Formula[α, τ, χ, σ]: """Used for simplifying the disjunction of all :attr:`.success_nodes`. The return value yields :attr:`result`, which is then used as the return value of :meth:`.__call__`. """ ...
[docs] @classmethod @abstractmethod def init_env(cls, arg: ι) -> None: """A hook for initialization of worker process. This is used, e.g., in :ref:`Real Closed Fields <api-RCF-qe>` for reconstructing within the worker the Sage polynomial ring of the master. """ ...
[docs] @abstractmethod def init_env_arg(self) -> ι: """Create an instance of :data:`.ι` to be used as an argument for a subsequent call of :meth:`.init_env()`. """ ...
def parallel_process_block(self) -> None: def wait_for_processes_to_finish(): still_running = sentinels.copy() while still_running: for sentinel in mp.connection.wait(still_running): still_running.remove(sentinel) num_finished = self.workers - len(still_running) pl = 'es' if num_finished > 1 else '' logger.debug(f'{num_finished} worker process{pl} finished, ' f'{len(still_running)} running') # The following call joins all finished processes as a side effect. # Otherwise, they would remain in the process table as zombies. mp.active_children() assert self.options is not None assert self.root_nodes is not None assert self._assumptions is not None logger.debug('entering sync manager context') timer = Timer() manager: SyncManager[Formula[α, τ, χ, σ], ν] with SyncManager() as manager: self.time_syncmanager_enter = timer.get() logger.debug(f'{self.time_syncmanager_enter=:.3f}') m_lock = manager.Lock() working_nodes: WorkingNodeListProxy[Formula[α, τ, χ, σ], ν] = manager.WorkingNodeList() working_nodes.extend(self.root_nodes) self.root_nodes = None success_nodes: multiprocessing.Queue[Optional[list[ν]]] = multiprocessing.Queue() self.success_nodes = NodeList() failure_nodes = manager.NodeList() # type: ignore final_assumptions: multiprocessing.Queue[λ] = multiprocessing.Queue() found_t = manager.Value('i', 0) processes: list[mp.Process] = [] sentinels: list[int] = [] log_level = logger.getEffectiveLevel() reference_time = delta_time_formatter.get_reference_time() logger.debug(f'starting worker processes in {range(self.workers)}') born_processes = manager.Value('i', 0) timer.reset() for i in range(self.workers): process = mp.Process( target=self.parallel_process_block_worker, args=(working_nodes, success_nodes, failure_nodes, self._assumptions, final_assumptions, m_lock, found_t, i, log_level, reference_time, born_processes, self.init_env_arg())) process.start() processes.append(process) sentinels.append(process.sentinel) try: born = 0 while born < 1: time.sleep(0.0001) with m_lock: born = born_processes.value self.time_start_first_worker = timer.get() logger.debug(f'{self.time_start_first_worker=:.3f}') if self.workers > 1: while born < self.workers: time.sleep(0.0001) with m_lock: born = born_processes.value self.time_start_all_workers = timer.get() else: self.time_start_all_workers = self.time_start_first_worker logger.debug(f'{self.time_start_all_workers=:.3f}') workers_running = self.workers log_timer = Timer() while workers_running > 0: if logger.isEnabledFor(logging.INFO): t = log_timer.get() if t >= self.options.log_rate: logger.info(working_nodes.periodic_statistics()) logger.info(self.success_nodes.periodic_statistics('S')) logger.info(failure_nodes.periodic_statistics('F')) log_timer.reset() try: nodes = success_nodes.get(timeout=0.001) except queue.Empty: continue if nodes is not None: self.success_nodes.extend(nodes) else: workers_running -= 1 except KeyboardInterrupt: logger.debug('KeyboardInterrupt, waiting for processes to finish') wait_for_processes_to_finish() raise wait_for_processes_to_finish() self.time_multiprocessing = timer.get() - self.time_start_first_worker logger.debug(f'{self.time_multiprocessing=:.3f}') new_assumptions = [] for i in range(self.workers): new_assumptions.extend(final_assumptions.get().atoms) self._assumptions.extend(new_assumptions) if found_t.value > 0: pl = 's' if found_t.value > 1 else '' logger.debug(f'{found_t.value} worker{pl} found T') # The exception handler for FoundT in virtual_substitution will # log final statistics. We do not retrieve nodes and memory, # which would cost significant time and space. We neither # retrieve the node_counter, which would be not consistent with # our empty nodes. self.working_nodes = WorkingNodeList( hits=working_nodes.hits, candidates=working_nodes.candidates) # TODO: wipe self.success_nodes and self.failure_nodes raise FoundT() logger.info(working_nodes.final_statistics()) logger.info(self.success_nodes.final_statistics('success')) logger.info(failure_nodes.final_statistics('failure')) logger.info('importing results from manager') logger.debug('importing working nodes from manager') # We do not retrieve the memory, which would cost significant time # and space. Same for success nodes and failure nodes below. timer.reset() self.working_nodes = WorkingNodeList( nodes=working_nodes.nodes, hits=working_nodes.hits, candidates=working_nodes.candidates, node_counter=working_nodes.node_counter) self.time_import_working_nodes = timer.get() logger.debug(f'{self.time_import_working_nodes=:.3f}') assert self.working_nodes.nodes == [] assert self.working_nodes.node_counter.total() == 0 logger.debug('importing failure nodes from manager') timer.reset() self.failure_nodes = NodeList(nodes=failure_nodes.nodes, hits=failure_nodes.hits, candidates=failure_nodes.candidates) self.time_import_failure_nodes = timer.get() logger.debug(f'{self.time_import_failure_nodes=:.3f}') logger.debug('leaving sync manager context') timer.reset() self.time_syncmanager_exit = timer.get() logger.debug(f'{self.time_syncmanager_exit=:.3f}') @classmethod def parallel_process_block_worker(cls, working_nodes: WorkingNodeListProxy, success_nodes: multiprocessing.Queue[Optional[list[Node]]], failure_nodes: NodeListProxy, assumptions: Assumptions, final_assumptions: multiprocessing.Queue[Assumptions], m_lock: threading.Lock, found_t: mp.sharedctypes.Synchronized, i: int, log_level: int, reference_time: float, born_processes: mp.sharedctypes.Synchronized, init_env_arg: ι) -> None: try: with m_lock: born_processes.value += 1 multiprocessing_logger.setLevel(log_level) multiprocessing_formatter.set_reference_time(reference_time) multiprocessing_logger.debug(f'worker process {i} is running') cls.init_env(init_env_arg) while found_t.value == 0 and not working_nodes.is_finished(): try: node = working_nodes.pop() except IndexError: time.sleep(0.001) continue try: nodes = node.process(assumptions) except NodeProcessFailure: failure_nodes.append(node) working_nodes.task_done() continue except FoundT: with m_lock: found_t.value += 1 break if nodes: if nodes[0].variables: working_nodes.extend(nodes) else: success_nodes.put(nodes) working_nodes.task_done() except KeyboardInterrupt: multiprocessing_logger.debug(f'worker process {i} caught KeyboardInterrupt') success_nodes.put(None) multiprocessing_logger.debug(f'sending {assumptions=}') final_assumptions.put(assumptions) multiprocessing_logger.debug(f'worker process {i} exiting') def process_block(self) -> None: logger.debug(f'entering {self.process_block.__name__}') if self.workers > 0: return self.parallel_process_block() return self.sequential_process_block() def sequential_process_block(self) -> None: assert self.options is not None assert self.root_nodes is not None self.working_nodes = WorkingNodeList() self.working_nodes.extend(self.root_nodes) self.root_nodes = None self.success_nodes = NodeList() self.failure_nodes = NodeList() if logger.isEnabledFor(logging.INFO): last_log = time.time() while self.working_nodes.nodes: if logger.isEnabledFor(logging.INFO): t = time.time() if t - last_log >= self.options.log_rate: logger.info(self.working_nodes.periodic_statistics()) logger.info(self.success_nodes.periodic_statistics('S')) logger.info(self.failure_nodes.periodic_statistics('F')) last_log = t node = self.working_nodes.pop() try: nodes = node.process(self._assumptions) except NodeProcessFailure: self.failure_nodes.append(node) continue if nodes: if nodes[0].variables: self.working_nodes.extend(nodes) else: self.success_nodes.extend(nodes) logger.info(self.working_nodes.final_statistics()) logger.info(self.success_nodes.final_statistics('success')) logger.info(self.failure_nodes.final_statistics('failure')) def status(self, dump_nodes: bool = False) -> str: def negated_as_str() -> str: match self.negated: case None: read_as = '' case False: read_as = ' # read as Ex' case True: read_as = ' # read as Not All' case _: assert False, self.negated return f'{self.negated},{read_as}' def nodes_as_str(nodes: Optional[Collection[Node]]) -> Optional[str]: if nodes is None: return None match dump_nodes: case True: h = ',\n '.join(f'{node}' for node in nodes) return f'[{h}]' case False: return f'{len(nodes)}' return (f'{self.__class__.__qualname__}(\n' f' blocks = {self.blocks},\n' f' matrix = {self.matrix},\n' f' negated = {negated_as_str()}\n' f' root_nodes = {self.root_nodes}\n' f' working_nodes = {nodes_as_str(self.working_nodes)},\n' f' success_nodes = {nodes_as_str(self.success_nodes)},\n' f' failure_nodes = {nodes_as_str(self.failure_nodes)},\n' f' result = {self.result}' f')') # discuss: We probably do not want to print def timings(self, precision: int = 7) -> None: match self.workers: case 0: print(f'{self.workers=}') print(f'{self.time_syncmanager_enter=}') print(f'{self.time_start_first_worker=}') print(f'{self.time_start_all_workers=}') print(f'{self.time_multiprocessing=}') print(f'{self.time_import_working_nodes=}') print(f'{self.time_import_success_nodes=}') print(f'{self.time_import_failure_nodes=}') print(f'{self.time_final_simplification=:.{precision}f}') print(f'{self.time_syncmanager_exit=}') print(f'{self.time_total=:.{precision}}') case _: print(f'{self.workers=}') print(f'{self.time_syncmanager_enter=:.{precision}f}') print(f'{self.time_start_first_worker=:.{precision}f}') print(f'{self.time_start_all_workers=:.{precision}f}') print(f'{self.time_multiprocessing=:.{precision}f}') print(f'{self.time_import_working_nodes=:.{precision}f}') print(f'{self.time_import_failure_nodes=:.{precision}f}') print(f'{self.time_final_simplification=:.{precision}f}') print(f'{self.time_syncmanager_exit=:.{precision}f}') print(f'{self.time_total=:.{precision}f}') def quantifier_eliminiation(self, f: Formula[α, τ, χ, σ], workers: int): """Quantifier elimination main loop. """ logger.debug(f'entering {self.quantifier_eliminiation.__name__}') f = f.to_pnf() self.matrix, self.blocks = f.matrix() while self.blocks: try: self.pop_block() self.process_block() except FoundT: logger.info('found T') if self.working_nodes is None: logger.info(WorkingNodeList().final_statistics()) else: logger.info(self.working_nodes.final_statistics()) self.working_nodes = None self.success_nodes = NodeList(nodes=[self.create_true_node()]) else: if self.failure_nodes: n = len(self.failure_nodes.nodes) raise NoTraceException(f'Failed - {n} failure nodes') self.collect_success_nodes() self.final_simplification() logger.debug(f'leaving {self.quantifier_eliminiation.__name__}') return self.result