import pandas as pd
import numpy as np
import itertools
from typing import Union, Iterator, List, Callable, Sequence, Optional
from txgraffiti.logic import *
from txgraffiti.playground.registry import list_playgrounds
from txgraffiti.processing.registry import get_post
from txgraffiti.export_utils.lean4 import conjecture_to_lean4
from txgraffiti.generators import list_gens
__all__ = [
'ForAll',
'Exists',
'ConjecturePlayground',
]
from collections import defaultdict
def find_strengthened_equalities(conjs):
"""
Given a list of Conjecture, return new Conjecture of the form
(H1 ∧ H2) → (LHS == RHS)
whenever you have both
H1 → (LHS <= RHS)
and
H2 → (LHS >= RHS)
with identical LHS and identical RHS.name.
"""
# bucket: (lhs_prop, rhs_name) → { op_symbol: hypothesis_predicate }
buckets = defaultdict(dict)
for c in conjs:
ineq = c.conclusion
if not isinstance(ineq, Inequality):
continue
key = (ineq.lhs, ineq.rhs.name)
buckets[key][ineq.op] = c.hypothesis
equalities = []
for (lhs_prop, rhs_name), op2hyp in buckets.items():
if "<=" in op2hyp and ">=" in op2hyp:
H1 = op2hyp["<="]
H2 = op2hyp[">="]
# build the joint hypothesis
H = H1 & H2
# build equality Predicate: lhs == rhs
eq_pred = lhs_prop == Constant(0) # dummy, we'll override
# better: directly use Inequality
eq_pred = Inequality(lhs_prop, "==",
Property(rhs_name,
lambda df, p=rhs_name: pd.Series(
df.eval(p), index=df.index)))
# but easier: reuse one of the originals:
# pick the <= version to supply the rhs Property
any_le_conj = next(c for c in conjs
if isinstance(c.conclusion, Inequality)
and c.hypothesis is H1
and c.conclusion.op in ("<=", "<"))
rhs_prop = any_le_conj.conclusion.rhs
eq_pred = Inequality(lhs_prop, "==", rhs_prop)
# and finally the new Conjecture
equalities.append(Conjecture(H, eq_pred))
return equalities
[docs]
class ForAll:
def __init__(self, conj: Predicate, df: pd.DataFrame, object_symbol="G"):
self.pred = conj; self.df = df
self.name = f"∀ {object_symbol}: {conj.name}"
[docs]
def is_true(self) -> bool:
return bool(self.pred(self.df).all())
[docs]
def counterexamples(self) -> pd.DataFrame:
return self.conj.counterexamples(self.df)
def __repr__(self):
return self.name
[docs]
class Exists:
def __init__(self, pred: Predicate, df: pd.DataFrame, object_symbol="G"):
self.pred = pred; self.df = df
self.name = f"∃ {object_symbol}: {pred.name}"
[docs]
def is_true(self) -> bool:
return bool(self.pred(self.df).any())
[docs]
def witness(self) -> pd.DataFrame:
return self.df[self.pred(self.df)]
def __repr__(self):
return self.name
[docs]
class ConjecturePlayground:
def __init__(self, df: pd.DataFrame, object_symbol="G", base: Optional[Union[str, Predicate]] = None):
self.df = df
self.conjectures: list[Conjecture] = [] # cache slot
self.object_symbol = object_symbol
# Set up the base predicate
if base is None:
# default to “always true”
self.base: Predicate = TRUE
elif isinstance(base, Predicate):
self.base = base
elif isinstance(base, str):
# lift via getattr; assumes you have a boolean column or existing predicate
self.base = getattr(self, base)
else:
raise TypeError("`base` must be None, a column-name (str), or a Predicate")
self.conjectures: List[Conjecture] = []
self.custom_hypotheses: dict[str, Predicate] = {}
[docs]
def prop(self, name: str) -> Property:
return Property(name, lambda df, c=name: df[c])
def __getattr__(self, attr):
if attr in self.df.columns:
col = self.df[attr]
if pd.api.types.is_bool_dtype(col):
return Predicate(attr, lambda df, c=attr: df[c])
return self.prop(attr)
return super().__getattribute__(attr)
[docs]
def forall(self, pred: Predicate) -> bool:
# return bool(pred(self.df).all())
return ForAll(pred, self.df, object_symbol=self.object_symbol)
[docs]
def exists(self, pred: Predicate) -> bool:
# return bool(pred(self.df).any())
return Exists(pred, self.df, object_symbol=self.object_symbol)
[docs]
def generate(
self,
*,
methods: List[Callable[..., Iterator[Conjecture]]] = None,
features: Optional[List[Union[str, Property]]] = None,
target: Optional[Union[str, Property]] = None,
hypothesis: Optional[
Union[str, Predicate, Sequence[Union[str, Predicate]]]
] = None,
heuristics: Optional[List[Callable[[Conjecture, List[Conjecture], pd.DataFrame], bool]]] = None,
post_processors: Optional[List[Union[str, Callable[[List[Conjecture], pd.DataFrame], List[Conjecture]]]]] = None,
**kwargs
) -> Iterator[Conjecture]:
gens = methods or list_playgrounds()
# Build hyp_list by conjoining each user hypothesis with self.base
if hypothesis is None:
hyp_list = [self.base]
elif isinstance(hypothesis, (list, tuple)):
lifted = [
(getattr(self, h) if isinstance(h, str) else h)
for h in hypothesis
]
hyp_list = [self.base & h for h in lifted]
else:
h = (getattr(self, hypothesis) if isinstance(hypothesis, str)
else hypothesis)
hyp_list = [self.base & h]
# Lift features & target
feat_list = [] if features is None else features
targ_arg = None if target is None else target
feat_props = [
(self.prop(f) if isinstance(f, str) else f)
for f in feat_list
]
targ_prop = (self.prop(targ_arg) if isinstance(targ_arg, str) else targ_arg)
# 4) raw iterator
def raw_all():
for gen_fn in gens:
for hyp in hyp_list:
yield from gen_fn(
self.df,
features = feat_props,
target = targ_prop,
hypothesis = hyp,
**kwargs
)
# … + heuristics & post‐processing as before …
stream = raw_all()
# ———————————————————————————————
# 2) apply heuristics into a new iterator
# ———————————————————————————————
if heuristics:
kept: List[Conjecture] = []
def filtered() -> Iterator[Conjecture]:
for conj in raw_all():
if all(h(conj, kept, self.df) for h in heuristics):
kept.append(conj)
yield conj
stream = filtered()
else:
stream = raw_all()
# ———————————————————————————————
# 3) post-processing if requested
# ———————————————————————————————
if post_processors:
# collect into list
all_kept = list(stream)
# apply each post-processor in order
for proc in post_processors:
fn = get_post(proc) if isinstance(proc, str) else proc
all_kept = fn(all_kept, self.df)
# yield final
for c in all_kept:
yield c
else:
# no post-processing → just stream
yield from stream
[docs]
def discover(self, *args, **kwargs) -> list[Conjecture]:
"""
Run generate(...) with the given parameters, cache, and return the list.
"""
conjs = list(self.generate(*args, **kwargs))
self.conjectures = find_strengthened_equalities(conjs)
self.conjectures.extend(conjs)
return self.conjectures
[docs]
def discover_equalities(
self,
*,
generators: list[Callable[..., Iterator[Conjecture]]] = None,
min_fraction: float = 0.15
) -> list[Predicate]:
"""
Find all single‐feature inequalities of the form L≤R or L≥R
(with hypothesis TRUE) that are *tight* (slack==0) on at least
`min_fraction` of the rows. For each, register & return
a Predicate `L_eq_R`.
"""
df = self.df
gens = generators or list_gens()
N = len(df)
found: list[tuple[Predicate, float]] = []
# 1) numeric columns as Properties
num_cols = df.select_dtypes(include=[np.number]).columns
props = {c: self.prop(c) for c in num_cols}
# 2) for each generator × each (P,Q) pair
for gen_fn in gens:
for P_name, Q_name in itertools.permutations(num_cols, 2):
P, Q = props[P_name], props[Q_name]
# run under TRUE, but safely
try:
conj_iter = gen_fn(
df,
features = [P],
target = Q,
hypothesis = TRUE
)
except Exception:
# skip any generator that errors out on this (P,Q)
continue
for conj in conj_iter:
if not isinstance(conj.conclusion, Inequality):
continue
ineq = conj.conclusion
support = ineq.touch_count(df) / N
if support < min_fraction:
continue
L, R = ineq.lhs, ineq.rhs
pred = L == R
if pred.name not in self.custom_hypotheses:
self.custom_hypotheses[pred.name] = pred
found.append((pred, support))
# 3) return just the Predicates, sorted by descending support
found.sort(key=lambda pr: pr[1], reverse=True)
return [pr[0] for pr in found]
[docs]
def append_row(self, row: dict):
"""
Append a new row (dict of col:value) to self.df,
then clear any cached conjectures.
"""
# Append in-place
self.df.loc[len(self.df)] = row
# Clear cached results
self.conjectures = []
[docs]
def reset(self, new_df: pd.DataFrame = None):
"""
If new_df is given, replace self.df; otherwise rebind the same DataFrame
(e.g. after in-place edits). In either case, clear cached conjectures.
"""
if new_df is not None:
self.df = new_df
# Invalidate cache
self.conjectures = []
[docs]
def export_conjectures(self, path: str, format: str = "json"):
"""
Write self.conjectures to disk in JSON or CSV form.
"""
if not hasattr(self, "conjectures"):
raise RuntimeError("No conjectures to export; run discover() first.")
rows = []
for c in self.conjectures:
rows.append({
"hypothesis": c.hypothesis.name,
"conclusion": c.conclusion.name,
"accuracy": c.accuracy(self.df),
# you could even serialize masks or touch_counts
})
dfc = pd.DataFrame(rows)
if format == "json":
dfc.to_json(path, orient="records", lines=True)
else:
dfc.to_csv(path, index=False)
[docs]
def convert_columns(self, convert_dict : dict):
for key, func in convert_dict.items():
self.df[key] = self.df[key].map(func)
[docs]
def export_to_lean(
self,
path: str,
name_prefix: str = "conjecture",
object_symbol: Optional[str] = None
):
"""
Write all cached conjectures to `path` in Lean theorem‐stub format.
Each theorem will be named `{name_prefix}_{i}`.
"""
symbol = object_symbol or self.object_symbol
lines = []
for i, conj in enumerate(self.conjectures, start=1):
thm_name = f"{name_prefix}_{i}"
# render using your helper, passing through the object symbol
src = conjecture_to_lean4(conj, thm_name, object_symbol=symbol)
lines.append(src)
# Write out to file
with open(path, "w") as f:
f.write("\n".join(lines))
print(f"Wrote {len(lines)} Lean theorems to {path}")