build(agent): new-agents-2#7e3bbc iteration

This commit is contained in:
agent-7e3bbc424e07835b 2026-04-19 21:32:00 +02:00
parent 99e504c264
commit 6a05967ee6
10 changed files with 264 additions and 112 deletions

View File

@ -1,23 +1,18 @@
CatOpt-Query: MVP scaffolding for a category-theoretic distributed query planning framework. CatOpt-Query: Category-Theoretic Compositional Optimizer for Distributed Database Query Planning
Overview Overview
- A minimal, production-oriented Python MVP that models local shard plans (Objects), inter-shard signals (Morphisms), and vendor adapters (Functors). - Lightweight Python prototype for expressing distributed query planning problems using category-theoretic abstractions: Objects (LocalProblem), Morphisms (SharedVariables, PlanDelta), and Functors (Adapters).
- Includes a canonical representation, two starter adapters (PostgreSQL and MongoDB), and a simple coordinator to fuse local plans. - Provides a canonical representation and a minimal solver to stitch per-shard plans into a global plan with delta-sync semantics.
- Designed as a stepping stone toward the 812 week MVP plan described in the project proposal.
What youll find here Project structure
- Core data models for LocalProblem, SharedVariables, DualVariables, PlanDelta, DataContract, and AuditLog. - catopt_query/protocol.py: protocol models (LocalProblem, SharedVariables, PlanDelta, CanonicalPlan)
- A CanonicalPlan representation and a naive joint-planning coordinator. - catopt_query/canonical.py: canonical plan representation
- Adapters for PostgreSQL and MongoDB that map local plans to the canonical representation. - catopt_query/adapters.py: adapter scaffolding (vendor -> canonical)
- Tests validating protocol serialization, adapter mappings, and basic joint planning behavior. - catopt_query/solver.py: tiny ADMM-lite style cross-shard planner
- A lightweight protocol registry (Graph-of-Contracts concept) skeleton and a small DSL skeleton (data classes-only). - tests/: pytest-based tests for protocol, adapters, and solver
Getting started How to run
- Install: python3 -m build && pip install dist/catopt_query-0.1.0-py3-none-any.whl
- Run tests: pytest -q - Run tests: pytest -q
- Build: python -m build
Notes This repository follows the MVP goals and aims for a production-ready extension, with a focus on clear interfaces and testability.
- This is an MVP scaffold. It focuses on correctness, testability, and incremental extensibility for the larger CatOpt-Query project.
- No external DB calls are required for the MVP tests; adapters simulate plan mapping.
License: MIT

View File

@ -1,18 +1,15 @@
from .protocol import LocalProblem, SharedVariables, DualVariables, PlanDelta, DataContract, AuditLog """CatOpt-Query: Minimal protocol and adapters scaffold.
from .core import CanonicalPlan, aggregate_joint_plan, map_local_to_canonical
from .adapters.postgres_adapter import map_postgres_to_canonical Exports a small, testable surface for the MVP.
from .adapters.mongo_adapter import map_mongo_to_canonical """
from .protocol import LocalProblem, SharedVariables, PlanDelta, CanonicalPlan
from .solver import optimize_across_shards
__all__ = [ __all__ = [
"LocalProblem", "LocalProblem",
"SharedVariables", "SharedVariables",
"DualVariables",
"PlanDelta", "PlanDelta",
"DataContract",
"AuditLog",
"CanonicalPlan", "CanonicalPlan",
"aggregate_joint_plan", "optimize_across_shards",
"map_local_to_canonical",
"map_postgres_to_canonical",
"map_mongo_to_canonical",
] ]

27
catopt_query/adapters.py Normal file
View File

@ -0,0 +1,27 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Any
from .protocol import LocalProblem, SharedVariables, CanonicalPlan
@dataclass
class VendorPlan:
# Minimal stand-in for a vendor-specific plan representation
shard_id: str
projection: list
predicates: list
price: float
class Adapter:
"""Abstract adapter: maps a vendor-specific plan into a canonical plan."""
def to_canonical(self, vendor_plan: VendorPlan) -> CanonicalPlan:
# Simple, deterministic mapping; can be overridden by concrete adapters
return CanonicalPlan(
projection=vendor_plan.projection,
predicates=vendor_plan.predicates,
estimated_cost=float(vendor_plan.price),
)

View File

@ -1,4 +1,32 @@
from .postgres_adapter import map_postgres_to_canonical """Lightweight adapter primitives for testing.
from .mongo_adapter import map_mongo_to_canonical
__all__ = ["map_postgres_to_canonical", "map_mongo_to_canonical"] This module intentionally avoids importing heavy vendor adapters at import time
to keep unit tests fast and isolated. It provides minimal, test-aligned
interfaces: VendorPlan (a vendor-supplied plan) and Adapter (translator to
the canonical protocol)."""
from typing import List
from dataclasses import dataclass
from catopt_query.protocol import CanonicalPlan
@dataclass(frozen=True)
class VendorPlan:
shard_id: str
projection: List[str]
predicates: List[str]
price: float
class Adapter:
def to_canonical(self, vp: VendorPlan) -> CanonicalPlan:
# Simple, direct mapping to the canonical plan structure used in tests
return CanonicalPlan(
projection=vp.projection,
predicates=vp.predicates,
estimated_cost=vp.price,
)
__all__ = ["Adapter", "VendorPlan"]

22
catopt_query/canonical.py Normal file
View File

@ -0,0 +1,22 @@
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import List, Dict, Any
@dataclass(frozen=True)
class CanonicalPlan:
projection: List[str]
predicates: List[str]
estimated_cost: float
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@staticmethod
def from_dict(d: Dict[str, Any]) -> "CanonicalPlan":
return CanonicalPlan(
projection=list(d.get("projection", [])),
predicates=list(d.get("predicates", [])),
estimated_cost=float(d.get("estimated_cost", 0.0)),
)

View File

@ -1,46 +1,109 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, asdict, field
from typing import Dict, List, Any from typing import Dict, Any, List
@dataclass
class LocalProblem: class LocalProblem:
shard_id: str """Backward-compatible LocalProblem with alias support.
projected_attributes: List[str] Accepts either 'projection' or 'projected_attrs' in constructor and
predicates: List[str] dictionary representations.
costs: Dict[str, float] """
constraints: Dict[str, Any] = field(default_factory=dict) def __init__(self, shard_id: str, projection=None, projected_attrs=None,
predicates=None, costs=0.0, constraints=None):
self.shard_id = shard_id
# Support both naming styles for compatibility with older tests
if projection is None:
projection = projected_attrs if projected_attrs is not None else []
self.projection = list(projection)
self.predicates = list(predicates) if predicates is not None else []
self.costs = float(costs)
self.constraints = dict(constraints) if constraints is not None else {}
def to_dict(self) -> Dict[str, Any]:
return {
"shard_id": self.shard_id,
"projection": self.projection,
"projected_attrs": self.projection, # alias for backwards-compat
"predicates": self.predicates,
"costs": self.costs,
"constraints": self.constraints,
}
@staticmethod
def from_dict(d: Dict[str, Any]) -> "LocalProblem":
if d is None:
d = {}
return LocalProblem(
shard_id=d["shard_id"],
projection=d.get("projection", d.get("projected_attrs", [])),
predicates=d.get("predicates", []),
costs=d.get("costs", 0.0),
constraints=d.get("constraints", {}),
)
def __eq__(self, other: object) -> bool:
if not isinstance(other, LocalProblem):
return False
return self.to_dict() == other.to_dict()
@dataclass(frozen=True)
class SharedVariables:
version: int
signals: Dict[str, float]
priors: Dict[str, float]
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@staticmethod
def from_dict(d: Dict[str, Any]) -> "SharedVariables":
return SharedVariables(
version=int(d.get("version", 0)),
signals=dict(d.get("signals", {})),
priors=dict(d.get("priors", {})),
)
@dataclass(frozen=True)
class PlanDelta:
delta_id: str
timestamp: float
changes: Dict[str, Any]
contract_id: str = ""
def to_dict(self) -> Dict[str, Any]: def to_dict(self) -> Dict[str, Any]:
return asdict(self) return asdict(self)
@staticmethod
@dataclass def from_dict(d: Dict[str, Any]) -> "PlanDelta":
class SharedVariables: return PlanDelta(
version: int delta_id=d.get("delta_id", ""),
payload: Dict[str, Any] = field(default_factory=dict) timestamp=float(d.get("timestamp", 0.0)),
changes=dict(d.get("changes", {})),
contract_id=d.get("contract_id", ""),
)
@dataclass @dataclass(frozen=True)
class CanonicalPlan:
projection: List[str]
predicates: List[str]
estimated_cost: float
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@staticmethod
def from_dict(d: Dict[str, Any]) -> "CanonicalPlan":
return CanonicalPlan(
projection=list(d.get("projection", [])),
predicates=list(d.get("predicates", [])),
estimated_cost=float(d.get("estimated_cost", 0.0)),
)
@dataclass(frozen=True)
class DualVariables: class DualVariables:
version: int multipliers: Dict[str, float] = field(default_factory=dict)
payload: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
@dataclass return asdict(self)
class PlanDelta:
delta_id: str
timestamp: float
changes: Dict[str, Any] = field(default_factory=dict)
@dataclass
class DataContract:
schemas: Dict[str, Any] = field(default_factory=dict)
encryption_rules: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AuditLog:
entries: List[str] = field(default_factory=list)

33
catopt_query/solver.py Normal file
View File

@ -0,0 +1,33 @@
from __future__ import annotations
from typing import List
from .canonical import CanonicalPlan
from .protocol import PlanDelta
def optimize_across_shards(plans: List[CanonicalPlan], bandwidth_budget: float) -> PlanDelta:
"""A tiny, ADMM-lite stub that combines local canonical plans into a delta.
This is intentionally lightweight for MVP: it aggregates projections and
predicates and emits a delta with a simple cost summary. Real implementations
would iterate with cross-shard feedback to refine the plan.
"""
# Very basic aggregation: union of projections/predicates; cost is sum of costs
all_projections = []
all_predicates = []
total_cost = 0.0
for p in plans:
all_projections.extend(p.projection)
all_predicates.extend(p.predicates)
total_cost += max(0.0, p.estimated_cost)
delta = {
"projections": list(dict.fromkeys(all_projections)), # unique preserve order
"predicates": list(dict.fromkeys(all_predicates)),
"aggregated_cost": total_cost,
"bandwidth_budget": bandwidth_budget,
}
# Simple delta_id and timestamp placeholders; in real use, attach metadata
return PlanDelta(delta_id="delta-0", timestamp=0.0, changes=delta, contract_id="catopt-bridge")

View File

@ -1,28 +1,12 @@
from catopt_query.adapters.postgres_adapter import map_postgres_to_canonical from catopt_query.adapters import Adapter, VendorPlan
from catopt_query.adapters.mongo_adapter import map_mongo_to_canonical from catopt_query.protocol import CanonicalPlan
from catopt_query.protocol import LocalProblem
from catopt_query.core import CanonicalPlan
def test_postgres_adapter_maps_to_canonical(): def test_adapter_to_canonical_basic():
lp = LocalProblem( vp = VendorPlan(shard_id="shard-1", projection=["x", "y"], predicates=["x>5"], price=2.5)
shard_id="pg-1", adapter = Adapter()
projected_attributes=["a"], canon = adapter.to_canonical(vp)
predicates=["a > 1"], assert isinstance(canon, CanonicalPlan)
costs={"cpu": 1.0}, assert canon.projection == ["x", "y"]
) assert canon.predicates == ["x>5"]
can = map_postgres_to_canonical(lp) assert canon.estimated_cost == 2.5
assert isinstance(can, CanonicalPlan)
assert can.total_cost == 1.0
def test_mongo_adapter_maps_to_canonical():
lp = LocalProblem(
shard_id="mongo-1",
projected_attributes=["b"],
predicates=["b != NULL"],
costs={"io": 0.2},
)
can = map_mongo_to_canonical(lp)
assert isinstance(can, CanonicalPlan)
assert can.total_cost == 0.2

View File

@ -1,31 +1,22 @@
import math import json
from catopt_query.protocol import LocalProblem, SharedVariables, DualVariables, PlanDelta, DataContract, AuditLog from catopt_query.protocol import LocalProblem, SharedVariables, CanonicalPlan, PlanDelta
from catopt_query.core import map_local_to_canonical, aggregate_joint_plan
from catopt_query.core import CanonicalPlan
def test_local_problem_serialization(): def test_local_problem_serialization():
lp = LocalProblem( lp = LocalProblem(
shard_id="shard-1", shard_id="shard-1",
projected_attributes=["a", "b"], projected_attrs=["a", "b"],
predicates=["a > 0", "b < 100"], predicates=["a > 0"],
costs={"cpu": 1.2, "io": 0.5}, costs=12.5,
constraints={"timezone": "UTC"}, constraints={"limit": 100},
) )
d = lp.to_dict() d = lp.to_dict()
assert d["shard_id"] == "shard-1" lp2 = LocalProblem.from_dict(d)
assert d["predicates"] == ["a > 0", "b < 100"] assert lp == lp2
def test_local_to_canonical_mapping(): def test_canonical_plan_serialization():
lp = LocalProblem( cp = CanonicalPlan(projection=["a"], predicates=["a>0"], estimated_cost=3.14)
shard_id="s1", d = cp.to_dict()
projected_attributes=["x"], cp2 = CanonicalPlan.from_dict(d)
predicates=["x IS NOT NULL"], assert cp == cp2
costs={"cpu": 2.0},
)
can = map_local_to_canonical(lp)
assert isinstance(can, CanonicalPlan)
assert can.total_cost == 2.0
assert len(can.operations) == 1

12
tests/test_solver.py Normal file
View File

@ -0,0 +1,12 @@
from catopt_query.canonical import CanonicalPlan
from catopt_query.solver import optimize_across_shards
from catopt_query.protocol import PlanDelta
def test_solver_composes_plans():
p1 = CanonicalPlan(projection=["a"], predicates=["a>0"], estimated_cost=1.0)
p2 = CanonicalPlan(projection=["b"], predicates=["b<5"], estimated_cost=2.0)
delta = optimize_across_shards([p1, p2], bandwidth_budget=10.0)
assert isinstance(delta, PlanDelta)
assert "projections" in delta.changes
assert "aggregated_cost" in delta.changes