"""
MetaCA Studio - Federated Evolutionary Design Toolkit for Cellular Automata

Core module: Rule representation, simulation engine, and tournament
selection for evolving cellular automata rules across federated agents.
"""

import hashlib
import json
import random
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class CARule:
    """A cellular automaton rule with metadata for federated evolution."""

    rule_id: str
    neighborhood: str  # "moore" or "vonneumann"
    states: int
    # Maps "<current state>:<alive neighbor count>" keys to the next state
    # (this is the key format CAGrid.step looks up).
    transition_table: dict
    fitness: float = 0.0
    generation: int = 0
    origin_agent: str = ""

    def to_dict(self) -> dict:
        """Return the rule's fields as a plain dict.

        The transition table is included by reference, not copied, so
        mutating it through the returned dict also mutates this rule.
        """
        return {
            "rule_id": self.rule_id,
            "neighborhood": self.neighborhood,
            "states": self.states,
            "transition_table": self.transition_table,
            "fitness": self.fitness,
            "generation": self.generation,
            "origin_agent": self.origin_agent,
        }

    @classmethod
    def from_dict(cls, d: dict) -> "CARule":
        """Build a rule from a dict shaped like the output of ``to_dict``.

        Raises:
            TypeError: if ``d`` has unknown keys or lacks a required field.
        """
        return cls(**d)

    def fingerprint(self) -> str:
        """Deterministic hash for deduplication across the swarm.

        Returns the first 16 hex characters of the SHA-256 digest of the
        transition table serialized as JSON with sorted keys.

        NOTE(review): only the transition table is hashed; rules that
        differ solely in ``neighborhood`` or ``states`` share a
        fingerprint. Confirm that is intended before relying on it.
        """
        canonical = json.dumps(self.transition_table, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]


class CAGrid:
    """2D cellular automaton grid with configurable rule application."""

    def __init__(self, width: int, height: int, states: int = 2):
        """Create a ``height`` x ``width`` grid with every cell in state 0."""
        self.width = width
        self.height = height
        self.states = states
        # Row-major storage: cells[y][x]. Each row is a distinct list.
        self.cells = [[0] * width for _ in range(height)]

    def set_cell(self, x: int, y: int, state: int) -> None:
        """Store ``state`` (reduced modulo ``states``) at ``(x, y)``.

        Coordinates outside the grid are silently ignored; unlike
        ``get_cell``, writes do not wrap around.
        """
        if 0 <= x < self.width and 0 <= y < self.height:
            self.cells[y][x] = state % self.states

    def get_cell(self, x: int, y: int) -> int:
        """Return the state at ``(x, y)``, wrapping both axes (torus)."""
        return self.cells[y % self.height][x % self.width]

    def get_moore_neighbors(self, x: int, y: int) -> List[int]:
        """Get 8-connected Moore neighborhood.

        States are listed row by row (dy = -1, 0, 1; dx = -1, 0, 1) with
        the center cell skipped.
        """
        return [
            self.get_cell(x + dx, y + dy)
            for dy in (-1, 0, 1)
            for dx in (-1, 0, 1)
            if dx != 0 or dy != 0
        ]

    def get_vonneumann_neighbors(self, x: int, y: int) -> List[int]:
        """Get 4-connected Von Neumann neighborhood.

        Order: (x, y-1), (x+1, y), (x, y+1), (x-1, y).
        """
        return [
            self.get_cell(x, y - 1),
            self.get_cell(x + 1, y),
            self.get_cell(x, y + 1),
            self.get_cell(x - 1, y),
        ]

    def step(self, rule: CARule) -> "CAGrid":
        """Apply rule to produce next generation.

        For each cell the lookup key is ``"<current>:<alive_count>"``,
        where ``alive_count`` is the number of neighbors with state > 0.
        Keys missing from the rule's transition table map to state 0.

        Args:
            rule: Rule to apply. Any ``neighborhood`` value other than
                ``"moore"`` is treated as Von Neumann.

        Returns:
            A new grid of the same size and state count; this grid is
            left unmodified.
        """
        new_grid = CAGrid(self.width, self.height, self.states)

        # Loop-invariant: pick the neighborhood function and table once
        # instead of re-deciding for every cell.
        if rule.neighborhood == "moore":
            neighbors_of = self.get_moore_neighbors
        else:
            neighbors_of = self.get_vonneumann_neighbors
        table = rule.transition_table

        for y in range(self.height):
            for x in range(self.width):
                current = self.get_cell(x, y)
                alive_count = sum(1 for n in neighbors_of(x, y) if n > 0)
                key = f"{current}:{alive_count}"
                new_grid.set_cell(x, y, table.get(key, 0))
        return new_grid

    def population(self) -> int:
        """Count non-zero cells."""
        return sum(1 for row in self.cells for c in row if c > 0)

    def density(self) -> float:
        """Fraction of alive cells (0.0 for a grid with no cells)."""
        total = self.width * self.height
        return self.population() / total if total > 0 else 0.0


class TournamentSelector:
    """
    Tournament selection for federated CA rule evolution.
    Agents share top-k rule candidates rather than raw parameters.
    """

    def __init__(self, tournament_size: int = 3):
        """Create an empty selector drawing ``tournament_size`` contenders."""
        self.tournament_size = tournament_size
        self.population: List[CARule] = []

    def add_rule(self, rule: CARule) -> None:
        """Append ``rule`` to the population (no deduplication here)."""
        self.population.append(rule)

    def select(self) -> Optional[CARule]:
        """Select best rule from random tournament.

        Returns:
            ``None`` when the population is empty. When the population is
            smaller than the tournament, the fittest rule overall.
            Otherwise the fittest of ``tournament_size`` rules sampled
            without replacement.
        """
        if not self.population:
            return None

        # A tournament needs at least one contender; sizes below 1 would
        # otherwise make random.sample/max raise ValueError.
        size = max(1, self.tournament_size)
        if len(self.population) < size:
            return max(self.population, key=lambda r: r.fitness)

        contenders = random.sample(self.population, size)
        return max(contenders, key=lambda r: r.fitness)

    def top_k(self, k: int = 5) -> List[CARule]:
        """Return top-k rules for gossip protocol sharing.

        Rules are ordered by descending fitness. A non-positive ``k``
        yields an empty list (a negative slice bound would otherwise
        return all but the last ``|k|`` rules).
        """
        if k <= 0:
            return []
        ranked = sorted(self.population, key=lambda r: r.fitness, reverse=True)
        return ranked[:k]

    def merge_remote(self, remote_rules: List[CARule]) -> None:
        """Merge rules received from another agent via gossip.

        A remote rule is appended only if its fingerprint matches neither
        a rule already in the population nor an earlier rule in
        ``remote_rules``.
        """
        seen = {r.fingerprint() for r in self.population}
        for rule in remote_rules:
            fp = rule.fingerprint()
            if fp not in seen:
                self.population.append(rule)
                seen.add(fp)