build(agent): molt-x#ed374b iteration

This commit is contained in:
agent-ed374b2a16b664d2 2026-04-16 21:42:55 +02:00
parent c896f32af1
commit d2d6333fed
9 changed files with 244 additions and 3 deletions

21
.gitignore vendored Normal file
View File

@ -0,0 +1,21 @@
node_modules/
.npmrc
.env
.env.*
__tests__/
coverage/
.nyc_output/
dist/
build/
.cache/
*.log
.DS_Store
tmp/
.tmp/
__pycache__/
*.pyc
.venv/
venv/
*.egg-info/
.pytest_cache/
READY_TO_PUBLISH

1
AGENTS.md Normal file
View File

@ -0,0 +1 @@
# CatOpt-Flow: Agent Architecture Guide

View File

@ -1,3 +1 @@
# catopt-flow-category-theoretic-compositi
A novel, open-source platform enabling organizations to define per-job optimization problems for ML training (hardware allocation, data-loading strategies, batch scheduling, hyperparameter budgets, gradient compression patterns) as Category-Theory-in
# CatOpt-Flow: Category-Theoretic Compositional Optimizer

View File

@ -0,0 +1,13 @@
from .core import LocalProblem, GlobalProblem, Object, Morphism, Functor, Planner, ADMMNode, DeltaSyncRegistry, run_admm

# Public API: re-export the core building blocks so callers can import
# everything directly from the top-level package.
__all__ = [
    "LocalProblem",
    "GlobalProblem",
    "Object",
    "Morphism",
    "Functor",
    "Planner",
    "ADMMNode",
    "DeltaSyncRegistry",
    "run_admm",
]

View File

@ -0,0 +1,120 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, Any, List
@dataclass
class Object:
    """A category-theory object: a named node in the optimization category."""

    id: str  # unique identifier for this object
    description: str = ""  # optional human-readable label
@dataclass
class Morphism:
    """A typed arrow between two Objects, carrying a signal schema.

    ``validate`` checks that an incoming signal mapping supplies every
    key listed under ``schema["required"]``.
    """

    name: str
    src: Object
    dst: Object
    version: int = 1
    schema: Dict[str, Any] = field(default_factory=dict)

    def validate(self, signals: Dict[str, Any]) -> bool:
        """Return True iff every required schema key is present in *signals*."""
        return all(key in signals for key in self.schema.get("required", []))
@dataclass
class LocalProblem:
    """Per-job optimization problem description for a single node/tenant."""

    id: str  # unique problem identifier
    resources: Dict[str, float]  # e.g., {"gpu": 2.0, "memory": 8.0, "cpu": 16.0}
    data_loading: Dict[str, Any]  # data-loading strategy parameters (opaque to the planner)
    batch_size: int  # training batch size for this job
    hyperparameters: Dict[str, float] = field(default_factory=dict)  # optional tunables
@dataclass
class GlobalProblem:
    """Global allocation plan: local problem id -> allocated resources."""

    plan: Dict[str, Dict[str, float]]  # local_id -> allocated resources

    @staticmethod
    def from_local(lp: LocalProblem) -> "GlobalProblem":
        """Lift a single LocalProblem into a one-entry global plan."""
        # Copy the resources so mutating the plan never touches the source.
        resources_copy = dict(lp.resources)
        return GlobalProblem(plan={lp.id: resources_copy})
class Functor:
    """Maps local optimization problems into the global-problem category.

    This MVP functor simply lifts a LocalProblem into a one-entry
    GlobalProblem via ``GlobalProblem.from_local``.
    """

    # NOTE(review): ``name`` was a bare class-level annotation, so instances
    # had no ``name`` attribute at all and ``Functor().name`` raised
    # AttributeError; give it a safe default value.
    name: str = ""

    def map(self, local_problem: LocalProblem) -> GlobalProblem:
        """Lift *local_problem* into a GlobalProblem containing only it."""
        return GlobalProblem.from_local(local_problem)
class Planner:
    """Builds a global resource plan from a collection of local problems."""

    @staticmethod
    def build_global_plan(local_problems: List[LocalProblem], budget_gpu: float) -> GlobalProblem:
        """Proportionally scale every resource so total GPU fits *budget_gpu*.

        When no GPU is requested at all, resources pass through unscaled.
        The scale factor is capped at 1.0, so allocations never grow.
        """
        requested_gpu = sum(lp.resources.get("gpu", 0.0) for lp in local_problems)
        if requested_gpu <= 0:
            # Nothing to ration: hand back copies of the requested resources.
            return GlobalProblem(plan={lp.id: dict(lp.resources) for lp in local_problems})
        factor = min(1.0, budget_gpu / requested_gpu)
        allocations = {
            lp.id: {name: amount * factor for name, amount in lp.resources.items()}
            for lp in local_problems
        }
        return GlobalProblem(plan=allocations)
class DeltaSyncRegistry:
    """In-memory registry of per-node resource statistics for delta syncing."""

    def __init__(self) -> None:
        # node_id -> latest reported stats for that node
        self.entries: Dict[str, Dict[str, float]] = {}

    def update(self, node_id: str, stats: Dict[str, float]) -> None:
        """Record (or overwrite) the latest stats reported by *node_id*."""
        self.entries[node_id] = stats

    def average_gpu(self) -> float:
        """Mean of the ``"gpu"`` stat across registered nodes (0.0 when empty)."""
        if not self.entries:
            return 0.0
        gpu_values = [stats.get("gpu", 0.0) for stats in self.entries.values()]
        return sum(gpu_values) / len(gpu_values)

    def reset(self) -> None:
        """Forget every registered node."""
        self.entries.clear()
class ADMMNode:
    """One ADMM participant wrapping a LocalProblem and a shared registry."""

    def __init__(self, node_id: str, local_problem: LocalProblem, registry: DeltaSyncRegistry) -> None:
        self.id = node_id
        self.local_problem = local_problem
        self.registry = registry
        self.round = 0  # number of local updates performed so far

    def local_update(self) -> Dict[str, float]:
        """Advance one round, publish gpu/memory stats to the registry, return them."""
        self.round += 1
        resources = self.local_problem.resources
        snapshot = {
            "gpu": resources.get("gpu", 0.0),
            "memory": resources.get("memory", 0.0),
            "round": float(self.round),
        }
        self.registry.update(self.id, snapshot)
        return snapshot

    def apply_delta(self, avg_gpu: float) -> None:
        """Pull this node's gpu claim toward *avg_gpu*.

        No-op when the current claim or the average is non-positive.
        The scale is capped at 1.0, so the claim never grows in this MVP.
        """
        resources = self.local_problem.resources
        claimed = resources.get("gpu", 0.0)
        if claimed <= 0.0 or avg_gpu <= 0.0:
            return
        ratio = avg_gpu / max(claimed, 1e-9)
        resources["gpu"] = max(0.0, claimed * min(1.0, ratio))
def run_admm(nodes: List[ADMMNode], rounds: int, budget_gpu: float, registry: DeltaSyncRegistry) -> GlobalProblem:
    """Run *rounds* of the simplified ADMM loop, then build the final plan.

    Each round: every node publishes its stats, then every node pulls its
    gpu claim toward the registry-wide average. The final plan is produced
    by ``Planner.build_global_plan`` under *budget_gpu*.
    """
    for _round in range(rounds):
        for node in nodes:
            node.local_update()
        consensus_gpu = registry.average_gpu()
        for node in nodes:
            node.apply_delta(consensus_gpu)
    local_problems = [node.local_problem for node in nodes]
    return Planner.build_global_plan(local_problems, budget_gpu)

19
pyproject.toml Normal file
View File

@ -0,0 +1,19 @@
[build-system]
requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "catopt_flow_category_theoretic_compositi"
version = "0.1.0"
description = "Category-theory inspired compositional optimizer for multi-tenant ML pipelines"
readme = "README.md"
requires-python = ">=3.8"
license = {text = "MIT"}
authors = [ { name = "OpenCode AI" } ]
dependencies = [
"typing-extensions>=3.7",
"pytest>=7.0",
]
[tool.setuptools.packages.find]
where = ["."]

8
test.sh Normal file
View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
# CI gate: run the unit tests, then verify packaging still builds.
# Fail fast on any error, unset variable, or broken pipe.
set -euo pipefail
# Run tests
pytest -q
# Build the package to validate packaging metadata and structure
# NOTE(review): requires the 'build' package (pip install build) — it is not
# declared in pyproject dependencies; confirm CI installs it.
python3 -m build

8
tests/conftest.py Normal file
View File

@ -0,0 +1,8 @@
import os
import sys

# Ensure the repository root is on the Python path when tests run under pytest.
# Some environments' import discovery does not include the repo root, so we
# prepend it here (comparing absolute paths to avoid duplicate entries).
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if not any(os.path.abspath(entry) == ROOT for entry in sys.path):
    sys.path.insert(0, ROOT)

53
tests/test_core.py Normal file
View File

@ -0,0 +1,53 @@
import math
from catopt_flow_category_theoretic_compositi import (
Object,
Morphism,
LocalProblem,
Planner,
DeltaSyncRegistry,
ADMMNode,
run_admm,
)
def test_morphism_validation():
    # A morphism with two required signal keys accepts complete inputs
    # and rejects inputs missing any required key.
    src = Object(id="A", description="source")
    dst = Object(id="B", description="dest")
    pipe = Morphism(
        name="pipe",
        src=src,
        dst=dst,
        version=1,
        schema={"required": ["gpu", "cpu"]},
    )
    assert pipe.validate({"gpu": 2, "cpu": 8}) is True
    assert pipe.validate({"gpu": 2}) is False
def test_planner_scaling():
    # Total requested GPU (3 + 1) equals the budget, so the plan's total
    # allocated GPU must land exactly on the budget.
    problems = [
        LocalProblem(
            id="p1",
            resources={"gpu": 3.0, "memory": 16.0, "cpu": 32.0},
            data_loading={},
            batch_size=64,
        ),
        LocalProblem(
            id="p2",
            resources={"gpu": 1.0, "memory": 8.0, "cpu": 16.0},
            data_loading={},
            batch_size=32,
        ),
    ]
    budget = 4.0
    plan = Planner.build_global_plan(problems, budget)
    total_alloc_gpu = sum(alloc.get("gpu", 0.0) for alloc in plan.plan.values())
    assert math.isclose(total_alloc_gpu, budget, rel_tol=1e-6)
def test_admm_runs():
    # Two nodes with modest demands
    registry = DeltaSyncRegistry()
    problem_a = LocalProblem(id="n1", resources={"gpu": 3.0, "memory": 4.0, "cpu": 8.0}, data_loading={}, batch_size=16)
    problem_b = LocalProblem(id="n2", resources={"gpu": 1.0, "memory": 4.0, "cpu": 8.0}, data_loading={}, batch_size=16)
    nodes = [ADMMNode("n1", problem_a, registry), ADMMNode("n2", problem_b, registry)]
    budget = 4.0
    plan = run_admm(nodes, rounds=2, budget_gpu=budget, registry=registry)
    # After ADMM plus the final planner pass, the total GPU allocation must
    # respect the budget (within floating-point tolerance).
    total_gpu = sum(alloc.get("gpu", 0.0) for alloc in plan.plan.values())
    assert total_gpu <= budget + 1e-9