Skip to content

ProbLog Compiler

Bases: Compiler

Source code in src/typedlogic/integrations/solvers/problog/problog_compiler.py
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class ProbLogCompiler(Compiler):
    default_suffix: ClassVar[str] = "problog"
    _predicate_mappings: Optional[Dict[str, str]] = None

    def compile(self, theory: Theory, syntax: Optional[Union[str, ModelSyntax]] = None, **kwargs) -> str:
        """
        Compile a Theory object into ProbLog code.

        Example:

            >>> from typedlogic import SentenceGroup, PredicateDefinition, Forall, Variable, Theory
            >>> x = Variable('x')
            >>> y = Variable('y')
            >>> z = Variable('z')
            >>> theory = Theory(
            ...     predicate_definitions=[PredicateDefinition(predicate="AncestorOf", arguments={'ancestor': 'str', 'descendant': 'str'})],
            ...     ground_terms=[Term('AncestorOf', 'p1', 'p1a'), Term('AncestorOf', 'p1a', 'p1aa')],
            ... )
            >>> theory.add(Forall([x, y, z], (Term('AncestorOf', x, y) & Term('AncestorOf', y, z)) >> Term('AncestorOf', x, z)))
            >>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1', 'p1a'))))
            >>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1a', 'p1aa'))))
            >>> compiler = ProbLogCompiler()
            >>> print(compiler.compile(theory))
            ancestorof(X, Z) :- ancestorof(X, Y), ancestorof(Y, Z).
            0.5::ancestorof("p1", "p1a").
            0.5::ancestorof("p1a", "p1aa").
            ancestorof("p1", "p1a").
            ancestorof("p1a", "p1aa").
            query(ancestorof(Ancestor, Descendant)).

        Note like most compilers, you don't need to use this directly. It is more common to use ProblogSolver, which takes care of
        compiling the problog program, feeding it to problog, and parsing results.

        :param theory:
        :param syntax:
        :param kwargs:
        :return:
        """
        include_queries = kwargs.get("include_queries", True)
        prolog_config = self.prolog_config()
        if not self._predicate_mappings:
            self._predicate_mappings = {}
        for pd in theory.predicate_definitions:
            prolog_pd = as_prolog(Term(pd.predicate), config=prolog_config)
            if "(" in prolog_pd:
                prolog_pd = prolog_pd[: prolog_pd.index("(")]
            self._predicate_mappings[prolog_pd] = pd.predicate
        clauses = []
        clauses.extend(self.false_predicate_declarations(theory, prolog_config))
        for sentence in theory.sentences + theory.ground_terms:
            clause = self._sentence_to_problog(sentence, prolog_config)
            if clause:
                clauses.append(clause)
        if include_queries:
            for pd in theory.predicate_definitions:
                if pd.predicate in [Probability.__name__, That.__name__, Evidence.__name__]:
                    continue
                query_vars = [Variable(a) for a in pd.arguments.keys()]
                term = Term("query", Term(pd.predicate, *query_vars))
                clause = self._sentence_to_problog(term, prolog_config)
                clauses.append(clause)
        return "\n".join(clauses)

    @staticmethod
    def prolog_config() -> PrologConfig:
        """Return Prolog rendering configuration for ProbLog syntax."""
        return PrologConfig(
            disjunctive_datalog=True,
            double_quote_strings=True,
            operator_map={"eq": "=", "ne": r"\="},
            allow_nesting=False,
            allow_ungrounded_vars_in_head=True,
        )

    @staticmethod
    def false_predicate_declarations(theory: Theory, prolog_config: PrologConfig) -> List[str]:
        """Return false clauses for declared predicates that may otherwise be empty."""
        declarations = []
        seen: set[tuple[str, int]] = set()
        for predicate_definition in theory.predicate_definitions:
            if predicate_definition.predicate in [Probability.__name__, That.__name__, Evidence.__name__]:
                continue
            predicate = as_prolog(Term(predicate_definition.predicate), config=prolog_config)
            if "(" in predicate:
                predicate = predicate[: predicate.index("(")]
            arity = len(predicate_definition.arguments)
            key = (predicate, arity)
            if key in seen:
                continue
            seen.add(key)
            if arity == 0:
                declarations.append(f"{predicate} :- fail.")
                continue
            args = ", ".join("_" for _ in range(arity))
            declarations.append(f"{predicate}({args}) :- fail.")
        return declarations

    def _sentence_to_problog(self, sentence: Sentence, prolog_config: PrologConfig) -> str:
        if isinstance(sentence, Forall):
            if isinstance(sentence.sentence, Term):
                if sentence.sentence.predicate == "eq":
                    vals = sentence.sentence.values
                    if len(vals) != 2:
                        raise ValueError(f"Invalid equality sentence: {sentence}")
                    first = vals[0]
                    if isinstance(first, Term) and first.predicate == "probability":
                        inner_expr = first.values[0]
                        pr = vals[1]
                        return self._sentence_to_problog(
                            Term(Probability.__name__, pr, That(inner_expr).to_model_object()), prolog_config
                        )

        def _to_rules(s: Sentence) -> List[Sentence]:
            rules = []
            try:
                for rule in to_horn_rules(s, allow_disjunctions_in_head=False, allow_goal_clauses=True):
                    rewritten_rule = self._comparison_head_to_constraint(rule)
                    if rewritten_rule is None:
                        continue
                    rules.append(rewritten_rule)
            except NotInProfileError as e:
                logger.info(f"Skipping sentence {s} due to {e}")
            return rules

        pr_sent = self._sentence_probability(sentence)
        if pr_sent:
            pr, inner = pr_sent
            inner_rules = _to_rules(inner)
            strs = []
            for r in inner_rules:
                rendered = self._rule_to_problog(r, prolog_config)
                if rendered:
                    strs.append(f"{pr}::{rendered}")
            return "\n".join(strs)
        elif isinstance(sentence, Term) and sentence.predicate == Evidence.__name__:
            # special treatment for evidence sentences
            if len(sentence.values) != 2:
                raise ValueError(f"Invalid evidence sentence: {sentence}")
            inner = sentence.values[0]
            inner_prolog = as_prolog(inner, config=prolog_config, strict=True)
            if not inner_prolog:
                raise ValueError(f"Invalid evidence inner sentence: {inner}")
            truth_value = sentence.values[1]
            return f"evidence({inner_prolog}, {'true' if truth_value else 'false'})."
        else:
            rules = _to_rules(sentence)
            return "\n".join([rendered for r in rules if (rendered := self._rule_to_problog(r, prolog_config))])

    def _sentence_probability(self, sentence: Sentence) -> Optional[Tuple[Union[float, int], Sentence]]:
        if isinstance(sentence, Forall):
            return self._sentence_probability(sentence.sentence)
        if isinstance(sentence, Term):
            if sentence.predicate == PROBABILITY_PREDICATE:
                if len(sentence.values) != 2:
                    raise ValueError(f"Invalid probability sentence: {sentence}")
                pr = sentence.values[0]
                inner = sentence.values[1]
                if not isinstance(pr, (float, int)):
                    raise ValueError(f"Invalid probability: {pr}")
                if not isinstance(inner, Sentence):
                    raise ValueError(f"Invalid inner sentence: {inner}")
                if isinstance(inner, Extension):
                    inner = inner.to_model_object()
                if not isinstance(inner, Term):
                    raise ValueError(f"Invalid inner term: {inner}")
                if inner.predicate != THAT_PREDICATE:
                    raise ValueError(f"Invalid inner predicate: {inner.predicate}")
                inner_ref = inner.values[0]
                if not isinstance(inner_ref, Sentence):
                    raise ValueError(f"Invalid inner reference: {inner_ref}")
                return pr, inner_ref
        return None

    @staticmethod
    def _comparison_head_to_constraint(rule: Sentence) -> Optional[Sentence]:
        """Convert implication heads like ``X != Y`` into ProbLog constraints."""
        if not isinstance(rule, Implies):
            return rule
        head = rule.consequent
        if not isinstance(head, Term) or head.predicate not in NAME_TO_INFIX_OP:
            return rule
        negated_predicate = NEGATED_COMPARISON_PREDICATES.get(head.predicate)
        if not negated_predicate:
            logger.info(f"Skipping rule with built-in predicate in head: {rule}")
            return None
        negated_comparison = Term(negated_predicate, *head.values)
        return Implies(And(rule.antecedent, negated_comparison), Or())

    @staticmethod
    def _rule_to_problog(rule: Sentence, prolog_config: PrologConfig) -> str:
        """Render one rule, encoding goal clauses as ProbLog evidence."""
        if isinstance(rule, Implies) and isinstance(rule.consequent, Or) and not rule.consequent.operands:
            violation = Implies(rule.antecedent, Term(CONSTRAINT_VIOLATION_PREDICATE))
            return (
                f"{as_prolog(violation, config=prolog_config, strict=False)}\n"
                f"evidence({CONSTRAINT_VIOLATION_PREDICATE}, false)."
            )
        return as_prolog(rule, config=prolog_config, strict=False)

    def decompile_term(self, compiled_term: Any) -> Term:
        if isinstance(compiled_term, pl.Term):
            pms = self._predicate_mappings or {}
            functor = pms.get(compiled_term.functor, compiled_term.functor)

            def _decompile_const(a: Any) -> Any:
                if isinstance(a, pl.Constant):
                    v = a.value
                    if isinstance(v, str):
                        if v.startswith('"') and v.endswith('"'):
                            v = v[1:-1]
                    return v
                return str(a)

            vals = [_decompile_const(a) for a in compiled_term.args]
            plt_term = Term(functor, *vals)
            return plt_term
        raise ValueError(f"Expected a Prolog term, got {compiled_term}")

compile(theory, syntax=None, **kwargs)

Compile a Theory object into ProbLog code.

Example:

>>> from typedlogic import SentenceGroup, PredicateDefinition, Forall, Variable, Theory
>>> x = Variable('x')
>>> y = Variable('y')
>>> z = Variable('z')
>>> theory = Theory(
...     predicate_definitions=[PredicateDefinition(predicate="AncestorOf", arguments={'ancestor': 'str', 'descendant': 'str'})],
...     ground_terms=[Term('AncestorOf', 'p1', 'p1a'), Term('AncestorOf', 'p1a', 'p1aa')],
... )
>>> theory.add(Forall([x, y, z], (Term('AncestorOf', x, y) & Term('AncestorOf', y, z)) >> Term('AncestorOf', x, z)))
>>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1', 'p1a'))))
>>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1a', 'p1aa'))))
>>> compiler = ProbLogCompiler()
>>> print(compiler.compile(theory))
ancestorof(X, Z) :- ancestorof(X, Y), ancestorof(Y, Z).
0.5::ancestorof("p1", "p1a").
0.5::ancestorof("p1a", "p1aa").
ancestorof("p1", "p1a").
ancestorof("p1a", "p1aa").
query(ancestorof(Ancestor, Descendant)).

Note like most compilers, you don't need to use this directly. It is more common to use ProblogSolver, which takes care of compiling the problog program, feeding it to problog, and parsing results.

Parameters:

Name Type Description Default
theory Theory
required
syntax Optional[Union[str, ModelSyntax]]
None
kwargs
{}

Returns:

Type Description
str
Source code in src/typedlogic/integrations/solvers/problog/problog_compiler.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def compile(self, theory: Theory, syntax: Optional[Union[str, ModelSyntax]] = None, **kwargs) -> str:
    """
    Compile a Theory object into ProbLog code.

    Example:

        >>> from typedlogic import SentenceGroup, PredicateDefinition, Forall, Variable, Theory
        >>> x = Variable('x')
        >>> y = Variable('y')
        >>> z = Variable('z')
        >>> theory = Theory(
        ...     predicate_definitions=[PredicateDefinition(predicate="AncestorOf", arguments={'ancestor': 'str', 'descendant': 'str'})],
        ...     ground_terms=[Term('AncestorOf', 'p1', 'p1a'), Term('AncestorOf', 'p1a', 'p1aa')],
        ... )
        >>> theory.add(Forall([x, y, z], (Term('AncestorOf', x, y) & Term('AncestorOf', y, z)) >> Term('AncestorOf', x, z)))
        >>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1', 'p1a'))))
        >>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1a', 'p1aa'))))
        >>> compiler = ProbLogCompiler()
        >>> print(compiler.compile(theory))
        ancestorof(X, Z) :- ancestorof(X, Y), ancestorof(Y, Z).
        0.5::ancestorof("p1", "p1a").
        0.5::ancestorof("p1a", "p1aa").
        ancestorof("p1", "p1a").
        ancestorof("p1a", "p1aa").
        query(ancestorof(Ancestor, Descendant)).

    Note like most compilers, you don't need to use this directly. It is more common to use ProblogSolver, which takes care of
    compiling the problog program, feeding it to problog, and parsing results.

    :param theory:
    :param syntax:
    :param kwargs:
    :return:
    """
    include_queries = kwargs.get("include_queries", True)
    prolog_config = self.prolog_config()
    if not self._predicate_mappings:
        self._predicate_mappings = {}
    for pd in theory.predicate_definitions:
        prolog_pd = as_prolog(Term(pd.predicate), config=prolog_config)
        if "(" in prolog_pd:
            prolog_pd = prolog_pd[: prolog_pd.index("(")]
        self._predicate_mappings[prolog_pd] = pd.predicate
    clauses = []
    clauses.extend(self.false_predicate_declarations(theory, prolog_config))
    for sentence in theory.sentences + theory.ground_terms:
        clause = self._sentence_to_problog(sentence, prolog_config)
        if clause:
            clauses.append(clause)
    if include_queries:
        for pd in theory.predicate_definitions:
            if pd.predicate in [Probability.__name__, That.__name__, Evidence.__name__]:
                continue
            query_vars = [Variable(a) for a in pd.arguments.keys()]
            term = Term("query", Term(pd.predicate, *query_vars))
            clause = self._sentence_to_problog(term, prolog_config)
            clauses.append(clause)
    return "\n".join(clauses)

prolog_config() staticmethod

Return Prolog rendering configuration for ProbLog syntax.

Source code in src/typedlogic/integrations/solvers/problog/problog_compiler.py
 95
 96
 97
 98
 99
100
101
102
103
104
@staticmethod
def prolog_config() -> PrologConfig:
    """Return Prolog rendering configuration for ProbLog syntax."""
    return PrologConfig(
        disjunctive_datalog=True,
        double_quote_strings=True,
        operator_map={"eq": "=", "ne": r"\="},
        allow_nesting=False,
        allow_ungrounded_vars_in_head=True,
    )

false_predicate_declarations(theory, prolog_config) staticmethod

Return false clauses for declared predicates that may otherwise be empty.

Source code in src/typedlogic/integrations/solvers/problog/problog_compiler.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@staticmethod
def false_predicate_declarations(theory: Theory, prolog_config: PrologConfig) -> List[str]:
    """Return false clauses for declared predicates that may otherwise be empty."""
    declarations = []
    seen: set[tuple[str, int]] = set()
    for predicate_definition in theory.predicate_definitions:
        if predicate_definition.predicate in [Probability.__name__, That.__name__, Evidence.__name__]:
            continue
        predicate = as_prolog(Term(predicate_definition.predicate), config=prolog_config)
        if "(" in predicate:
            predicate = predicate[: predicate.index("(")]
        arity = len(predicate_definition.arguments)
        key = (predicate, arity)
        if key in seen:
            continue
        seen.add(key)
        if arity == 0:
            declarations.append(f"{predicate} :- fail.")
            continue
        args = ", ".join("_" for _ in range(arity))
        declarations.append(f"{predicate}({args}) :- fail.")
    return declarations