22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class ProbLogCompiler(Compiler):
    """
    Compiler that renders a Theory as ProbLog source text.

    Besides producing the program text, :meth:`compile` records a mapping from
    the Prolog-side functor names to the original predicate names, which
    :meth:`decompile_term` uses to translate ProbLog terms back into Terms.
    """

    default_suffix: ClassVar[str] = "problog"
    # Prolog functor name -> original predicate name; filled in by compile()
    # and consulted by decompile_term().
    _predicate_mappings: Optional[Dict[str, str]] = None

    def compile(self, theory: Theory, syntax: Optional[Union[str, ModelSyntax]] = None, **kwargs) -> str:
        """
        Compile a Theory object into ProbLog code.

        The output contains, in order, one clause block per sentence and ground
        term of the theory, followed by one ``query(...)`` clause per predicate
        definition (the Probability/That/Evidence predicates are not queried).
        Sentences that produce no clause are omitted.

        Example:

            >>> from typedlogic import SentenceGroup, PredicateDefinition, Forall, Variable, Theory
            >>> x = Variable('x')
            >>> y = Variable('y')
            >>> z = Variable('z')
            >>> theory = Theory(
            ... predicate_definitions=[PredicateDefinition(predicate="AncestorOf", arguments={'ancestor': 'str', 'descendant': 'str'})],
            ... ground_terms=[Term('AncestorOf', 'p1', 'p1a'), Term('AncestorOf', 'p1a', 'p1aa')],
            ... )
            >>> theory.add(Forall([x, y, z], (Term('AncestorOf', x, y) & Term('AncestorOf', y, z)) >> Term('AncestorOf', x, z)))
            >>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1', 'p1a'))))
            >>> theory.add(Probability(0.5, That(Term('AncestorOf', 'p1a', 'p1aa'))))
            >>> compiler = ProbLogCompiler()
            >>> print(compiler.compile(theory))
            ancestorof(X, Z) :- ancestorof(X, Y), ancestorof(Y, Z).
            0.5::ancestorof("p1", "p1a").
            0.5::ancestorof("p1a", "p1aa").
            ancestorof("p1", "p1a").
            ancestorof("p1a", "p1aa").
            query(ancestorof(Ancestor, Descendant)).

        :param theory: the theory to compile
        :param syntax: accepted for interface compatibility; not used here
        :param kwargs: accepted for interface compatibility; not used here
        :return: the ProbLog program, clauses joined by newlines
        """
        prolog_config = PrologConfig(disjunctive_datalog=True, double_quote_strings=True, allow_nesting=False)
        if not self._predicate_mappings:
            self._predicate_mappings = {}
        # Register the predicates of *this* theory on every call, so that a
        # compiler reused for several theories can decompile terms of each.
        for pd in theory.predicate_definitions:
            rendered = as_prolog(Term(pd.predicate), config=prolog_config)
            # Keep only the functor: drop the argument list, if one was rendered.
            functor = rendered.partition("(")[0]
            self._predicate_mappings[functor] = pd.predicate

        clauses: List[str] = []
        for sentence in theory.sentences + theory.ground_terms:
            clause = self._sentence_to_problog(sentence, prolog_config)
            if clause:
                clauses.append(clause)

        # These predicates are handled specially by _sentence_to_problog and
        # are never queried themselves.
        unqueried = (Probability.__name__, That.__name__, Evidence.__name__)
        for pd in theory.predicate_definitions:
            if pd.predicate in unqueried:
                continue
            query_vars = [Variable(arg_name) for arg_name in pd.arguments]
            query = Term("query", Term(pd.predicate, *query_vars))
            clause = self._sentence_to_problog(query, prolog_config)
            # Same guard as for sentences: a skipped query must not leave a
            # blank line in the program.
            if clause:
                clauses.append(clause)
        return "\n".join(clauses)

    def _sentence_to_problog(self, sentence: Sentence, prolog_config: PrologConfig) -> str:
        """
        Render a single sentence as ProbLog text.

        Handled forms:

        * ``Forall`` over ``eq(probability(S), p)``: rewritten to a
          Probability term and rendered as below.
        * A Probability sentence (see :meth:`_sentence_probability`): each
          rule derived from the inner sentence is prefixed with ``p::``.
        * An Evidence term: rendered as ``evidence(<inner>, true|false).``
        * Anything else: the rules derived from the sentence.

        :param sentence: the sentence to render
        :param prolog_config: configuration passed through to ``as_prolog``
        :return: newline-joined clauses; empty if the sentence yields no rules
        :raises ValueError: on a malformed equality, probability or evidence sentence
        """
        if (
            isinstance(sentence, Forall)
            and isinstance(sentence.sentence, Term)
            and sentence.sentence.predicate == "eq"
        ):
            vals = sentence.sentence.values
            if len(vals) != 2:
                raise ValueError(f"Invalid equality sentence: {sentence}")
            first = vals[0]
            if isinstance(first, Term) and first.predicate == "probability":
                inner_expr = first.values[0]
                pr = vals[1]
                return self._sentence_to_problog(
                    Term(Probability.__name__, pr, That(inner_expr).to_model_object()), prolog_config
                )

        def _to_rules(s: Sentence) -> List[Sentence]:
            # Collect the horn rules for s; a sentence outside the supported
            # profile is logged and contributes only the rules gathered so far.
            rules = []
            try:
                for rule in to_horn_rules(s, allow_disjunctions_in_head=True, allow_goal_clauses=True):
                    rules.append(rule)
            except NotInProfileError as e:
                logger.info("Skipping sentence %s due to %s", s, e)
            return rules

        pr_sent = self._sentence_probability(sentence)
        if pr_sent is not None:
            pr, inner = pr_sent
            return "\n".join(f"{pr}::{as_prolog(r, config=prolog_config)}" for r in _to_rules(inner))

        if isinstance(sentence, Term) and sentence.predicate == Evidence.__name__:
            if len(sentence.values) != 2:
                raise ValueError(f"Invalid evidence sentence: {sentence}")
            inner = sentence.values[0]
            inner_prolog = as_prolog(inner, config=prolog_config, strict=True)
            if not inner_prolog:
                raise ValueError(f"Invalid evidence inner sentence: {inner}")
            truth_value = sentence.values[1]
            return f"evidence({inner_prolog}, {'true' if truth_value else 'false'})."

        return "\n".join(as_prolog(r, config=prolog_config, strict=False) for r in _to_rules(sentence))

    def _sentence_probability(self, sentence: Sentence) -> Optional[Tuple[Union[float, int], Sentence]]:
        """
        Extract ``(probability, inner sentence)`` from a Probability sentence.

        ``Forall`` wrappers are unwrapped recursively. The sentence must be a
        two-valued Probability term whose second value is (or converts to) a
        That term wrapping a sentence.

        :param sentence: the sentence to inspect
        :return: the probability and the referenced sentence, or None if the
            sentence is not a Probability term
        :raises ValueError: if a Probability term is malformed
        """
        if isinstance(sentence, Forall):
            return self._sentence_probability(sentence.sentence)
        if not isinstance(sentence, Term) or sentence.predicate != PROBABILITY_PREDICATE:
            return None
        if len(sentence.values) != 2:
            raise ValueError(f"Invalid probability sentence: {sentence}")
        pr = sentence.values[0]
        inner = sentence.values[1]
        if not isinstance(pr, (float, int)):
            raise ValueError(f"Invalid probability: {pr}")
        if not isinstance(inner, Sentence):
            raise ValueError(f"Invalid inner sentence: {inner}")
        if isinstance(inner, Extension):
            # Extensions are converted to their Term form before inspection.
            inner = inner.to_model_object()
        if not isinstance(inner, Term):
            raise ValueError(f"Invalid inner term: {inner}")
        if inner.predicate != THAT_PREDICATE:
            raise ValueError(f"Invalid inner predicate: {inner.predicate}")
        inner_ref = inner.values[0]
        if not isinstance(inner_ref, Sentence):
            raise ValueError(f"Invalid inner reference: {inner_ref}")
        return pr, inner_ref

    def decompile_term(self, compiled_term: Any) -> Term:
        """
        Convert a ProbLog term back into a Term.

        The functor is translated through the mappings recorded by
        :meth:`compile`, falling back to the functor itself when unmapped.
        Constant arguments are unwrapped to their value, with one pair of
        surrounding double quotes removed from strings; any other argument is
        converted with ``str()``.

        :param compiled_term: a ``pl.Term``
        :return: the corresponding Term
        :raises ValueError: if ``compiled_term`` is not a ``pl.Term``
        """
        if not isinstance(compiled_term, pl.Term):
            raise ValueError(f"Expected a Prolog term, got {compiled_term}")

        pms = self._predicate_mappings or {}
        functor = pms.get(compiled_term.functor, compiled_term.functor)

        def _decompile_const(a: Any) -> Any:
            if not isinstance(a, pl.Constant):
                return str(a)
            v = a.value
            # Require two characters so that a lone '"' is not mistaken for an
            # (empty) quoted string.
            if isinstance(v, str) and len(v) >= 2 and v.startswith('"') and v.endswith('"'):
                v = v[1:-1]
            return v

        vals = [_decompile_const(a) for a in compiled_term.args]
        return Term(functor, *vals)
|