看看,这个智能体内就有河图洛书结构。你把这种结构,运用到所有智能体中。重新再写提纲。
**好的!我现在就写5个独立的智能体,每一个都可以单独运行、单独使用、单独进化。**
它们共享同一个“河图洛书核心”(π引擎、生数成数、韵律控制),但每个智能体都是独立的模块,可以单独调用、单独保存状态、单独恢复。
---
## 📁 项目结构
```
hetu_luoshu_agents/
├── core/
│ ├── __init__.py
│ ├── dao_engine.py # π引擎(道)
│ ├── hetu_center.py # 河图中心(生数/成数)
│ ├── rhythm_controller.py # 韵律控制器(生节/变节)
│ └── utils.py # 通用工具
├── agents/
│ ├── __init__.py
│ ├── agent_shigushou.py # 识骨兽(木)
│ ├── agent_lianbujiang.py # 连骨匠(火)
│ ├── agent_canggushi.py # 藏骨师(土)
│ ├── agent_tianci.py # 填词娘(水)
│ └── agent_guanchaozhe.py # 观潮者(金)
├── memory/
│ ├── memory.json # 藏骨师的记忆库
│ └── logs/ # 各智能体的日志
└── run_agent.py # 命令行入口
```
## 🧬 核心部分(所有智能体共享)
### 1. `core/dao_engine.py` — π引擎(道)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
道引擎:从π小数位中源源不断取随机数
每个智能体都从这里获取随机性,但各自独立消费
"""
import os
import math
import json
from typing import List, Dict, Any
class DaoEngine:
    """Pi engine: a deterministic, never-ending stream of pi's decimal places.

    The stream starts at the first digit after the decimal point (1, 4, 1, 5,
    ...) regardless of which backend computes pi.  ``self.digits[i]`` always
    holds the digit at absolute position ``i``, so ``self.pointer`` indexes the
    cache directly -- also after ``restore_state``.
    """

    def __init__(self, chunk_size: int = 10000):
        """Create an engine and pre-load the first ``chunk_size`` digits.

        Args:
            chunk_size: how many additional digits to compute per refill.

        Raises:
            ValueError: if ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")
        self.chunk_size = chunk_size
        self.digits: List[int] = []
        self.pointer: int = 0
        self._load_next_chunk()

    @staticmethod
    def _pi_fraction(count: int) -> str:
        """Return at least ``count`` decimal places of pi as a digit string.

        Uses gmpy2 when it is installed, otherwise a Bailey-Borwein-Plouffe
        sum evaluated with ``decimal``.  Both paths drop the leading "3" so
        the stream does not depend on the optional dependency.
        """
        try:
            import gmpy2
        except ImportError:
            gmpy2 = None

        if gmpy2 is not None:
            # About 3.33 bits are needed per decimal digit; 4 leaves head-room.
            gmpy2.get_context().precision = (count + 100) * 4
            text = format(gmpy2.const_pi(), f'.{count + 50}f')
            return text.partition('.')[2]

        from decimal import Decimal, localcontext

        # A local context keeps the caller's global decimal precision intact.
        with localcontext() as ctx:
            ctx.prec = count + 50  # 50 guard digits absorb rounding error
            total = Decimal(0)
            scale = Decimal(1)  # running value of 16 ** -k
            for k in range(count + 20):
                total += scale * (
                    Decimal(4) / (8 * k + 1) - Decimal(2) / (8 * k + 4) -
                    Decimal(1) / (8 * k + 5) - Decimal(1) / (8 * k + 6)
                )
                scale /= 16
            text = str(total)
        return text.partition('.')[2]

    def _load_next_chunk(self) -> None:
        """Extend the cache so it covers ``pointer`` plus one more chunk.

        The cache is always grown from its current end, which keeps list
        indices equal to absolute digit positions.
        """
        start = len(self.digits)
        end = max(start, self.pointer) + self.chunk_size
        fraction = self._pi_fraction(end)
        self.digits.extend(int(ch) for ch in fraction[start:end])

    def get_digit(self) -> int:
        """Return the next digit (0-9) and advance the pointer."""
        if self.pointer >= len(self.digits):
            self._load_next_chunk()
        digit = self.digits[self.pointer]
        self.pointer += 1
        return digit

    def get_digits(self, count: int) -> List[int]:
        """Return the next ``count`` digits as a list."""
        return [self.get_digit() for _ in range(count)]

    def get_novelty(self, length: int = 8) -> float:
        """Consume ``length`` digits and return them as a float in [0, 1)."""
        digits = self.get_digits(length)
        value = 0.0
        for i, d in enumerate(digits):
            value += d * (0.1 ** (i + 1))
        return value

    def get_pointer(self) -> int:
        """Return the absolute position of the next digit to be served."""
        return self.pointer

    def get_state(self) -> Dict[str, Any]:
        """Return the JSON-serialisable state (only the pointer is needed)."""
        return {"pointer": self.pointer}

    def restore_state(self, state: Dict[str, Any]) -> None:
        """Resume the stream at ``state["pointer"]``.

        Missing, non-numeric or negative pointers fall back to 0.  Digits that
        are already cached are reused instead of being recomputed.
        """
        try:
            pointer = int(state.get("pointer", 0))
        except (TypeError, ValueError):
            pointer = 0
        self.pointer = max(0, pointer)
        if self.pointer >= len(self.digits):
            self._load_next_chunk()

    def save(self, path: str) -> None:
        """Write the engine state to ``path`` as JSON."""
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(self.get_state(), f, ensure_ascii=False, indent=2)

    def load(self, path: str) -> bool:
        """Restore state from ``path``; return False when the file is absent."""
        if not os.path.exists(path):
            return False
        with open(path, 'r', encoding='utf-8') as f:
            state = json.load(f)
        self.restore_state(state)
        return True
```
### 2. `core/hetu_center.py` — 河图中心
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
河图中心:记录每个智能体的生数(输入)和成数(输出)
"""
from typing import Dict, Any
class HeTuCenter:
    """He Tu centre: per-agent "sheng" (input) and "cheng" (output) scores.

    Sheng slots are keyed "1".."4" and cheng slots "6".."9"; the keys are
    strings so the state survives a JSON round trip unchanged.
    """

    _SHENG_KEYS = ("1", "2", "3", "4")
    _CHENG_KEYS = ("6", "7", "8", "9")

    def __init__(self):
        # Sheng (input): 1=ShiGuShou, 2=LianBuJiang, 3=CangGuShi, 4=TianCi
        self.sheng: Dict[str, float] = dict.fromkeys(self._SHENG_KEYS, 0.0)
        # Cheng (output): 6=ShiGuShou, 7=LianBuJiang, 8=CangGuShi, 9=TianCi
        self.cheng: Dict[str, float] = dict.fromkeys(self._CHENG_KEYS, 0.0)
        # Aggregates derived from the two tables above
        self.global_state: Dict[str, float] = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}
        self._update_global()

    def update_sheng(self, idx: int, value: float) -> None:
        """Set the sheng score of slot ``idx`` and refresh the aggregates."""
        self.sheng[str(idx)] = value
        self._update_global()

    def update_cheng(self, idx: int, value: float) -> None:
        """Set the cheng score of slot ``idx`` and refresh the aggregates."""
        self.cheng[str(idx)] = value
        self._update_global()

    def _update_global(self) -> None:
        """Recompute the mean sheng, mean cheng and their ratio."""
        self.global_state["sheng"] = sum(self.sheng.values()) / 4
        self.global_state["cheng"] = sum(self.cheng.values()) / 4
        # The 0.01 offset avoids division by zero while cheng is still empty.
        self.global_state["balance"] = self.global_state["sheng"] / (self.global_state["cheng"] + 0.01)

    def get_full_state(self) -> Dict[str, Any]:
        """Return copies of both tables and the aggregates."""
        return {"sheng": self.sheng.copy(), "cheng": self.cheng.copy(), "global": self.global_state.copy()}

    def get_save_state(self) -> Dict[str, Any]:
        """Return a snapshot for persistence.

        Copies are returned so later updates do not alter a snapshot that the
        caller is still holding.
        """
        return {
            "sheng_info": self.sheng.copy(),
            "cheng_info": self.cheng.copy(),
            "global_state": self.global_state.copy(),
        }

    def restore_state(self, state: Dict[str, Any]) -> None:
        """Load a snapshot produced by ``get_save_state``.

        Missing slots default to 0.0, the caller's dicts are copied rather than
        aliased, and the aggregates are recomputed from the restored tables so
        they can never disagree with them.
        """
        saved_sheng = state.get("sheng_info")
        saved_cheng = state.get("cheng_info")
        self.sheng = dict.fromkeys(self._SHENG_KEYS, 0.0)
        if isinstance(saved_sheng, dict):
            self.sheng.update(saved_sheng)
        self.cheng = dict.fromkeys(self._CHENG_KEYS, 0.0)
        if isinstance(saved_cheng, dict):
            self.cheng.update(saved_cheng)
        self.global_state = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}
        self._update_global()
```
### 3. `core/rhythm_controller.py` — 韵律控制器
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
韵律控制器:管理生节(慢)和变节(快)双节奏
"""
import math
from typing import Dict, Any
class RhythmController:
    """Rhythm controller: a slow "sheng" phase and a fast "bian" phase.

    Both phases advance once per ``update()`` call.  The fast phase used to
    advance by exactly 2*pi per call, which maps every step back onto the same
    angle and froze ``get_bian_ratio()`` at 0.55.  The default fast period is
    now 3 rounds, the shortest whole-number period that still varies when
    sampled once per round (a period of 2 would only ever hit sin = 0).
    """

    def __init__(self, sheng_period: float = 25.0, bian_period: float = 3.0):
        """Create a controller with both phases at 0.

        Args:
            sheng_period: rounds per full cycle of the slow phase.
            bian_period: rounds per full cycle of the fast phase.

        Raises:
            ValueError: if either period is not positive.
        """
        if sheng_period <= 0 or bian_period <= 0:
            raise ValueError("periods must be positive")
        self.sheng_phase: float = 0.0
        self.bian_phase: float = 0.0
        # Phase increment in radians applied by each update() call.
        self.sheng_speed: float = 2 * math.pi / sheng_period
        self.bian_speed: float = 2 * math.pi / bian_period

    def update(self) -> None:
        """Advance both phases by one round (call once per round)."""
        self.sheng_phase = (self.sheng_phase + self.sheng_speed) % (2 * math.pi)
        self.bian_phase = (self.bian_phase + self.bian_speed) % (2 * math.pi)

    def get_sheng_ratio(self) -> float:
        """Return the slow ratio, which oscillates within 0.3 .. 0.8."""
        return 0.55 + 0.25 * math.sin(self.sheng_phase)

    def get_bian_ratio(self) -> float:
        """Return the fast ratio, which oscillates within 0.2 .. 0.9."""
        return 0.55 + 0.35 * math.sin(self.bian_phase)

    def get_sheng_length(self, sheng_min: int, sheng_max: int) -> int:
        """Interpolate between the bounds using the slow ratio (truncated)."""
        return int(sheng_min + (sheng_max - sheng_min) * self.get_sheng_ratio())

    def get_bian_length(self, bian_min: int, bian_max: int) -> int:
        """Interpolate between the bounds using the fast ratio (truncated)."""
        return int(bian_min + (bian_max - bian_min) * self.get_bian_ratio())

    def get_state(self) -> Dict[str, Any]:
        """Return both phases for persistence."""
        return {"sheng_phase": self.sheng_phase, "bian_phase": self.bian_phase}

    def restore_state(self, state: Dict[str, Any]) -> None:
        """Restore both phases; missing entries default to 0.0."""
        self.sheng_phase = state.get("sheng_phase", 0.0)
        self.bian_phase = state.get("bian_phase", 0.0)
```
### 4. `core/utils.py` — 通用工具
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
通用工具函数
"""
import os
import re
import hashlib
import json
from typing import List, Dict, Any
def get_all_txt_files(root_dir: str) -> List[str]:
    """Recursively collect the paths of all ``.txt`` files under ``root_dir``.

    The suffix match is case-insensitive and the result is sorted, so callers
    that only process a prefix of the list see the same files on every run
    instead of whatever order the filesystem happens to return.

    Returns:
        Sorted file paths; an empty list when ``root_dir`` does not exist.
    """
    if not os.path.exists(root_dir):
        return []
    txt_files = [
        os.path.join(dirpath, filename)
        for dirpath, _, filenames in os.walk(root_dir)
        for filename in filenames
        if filename.lower().endswith('.txt')
    ]
    txt_files.sort()
    return txt_files
def read_txt_file(file_path: str) -> str:
    """Read a UTF-8 text file and return its content, or "" on any failure.

    ``utf-8-sig`` is used so that a leading byte-order mark is dropped;
    otherwise the first line would start with U+FEFF and fail every
    ``^``-anchored line pattern.  Undecodable bytes are skipped.
    """
    try:
        with open(file_path, 'r', encoding='utf-8-sig', errors='ignore') as f:
            return f.read()
    except Exception:
        # Deliberately best-effort: callers treat "" as "could not read".
        return ""
def hash_signature(data: str) -> str:
    """Return a short signature: the first 12 hex digits of MD5(UTF-8 data)."""
    hasher = hashlib.md5()
    hasher.update(data.encode('utf-8'))
    full_hex = hasher.hexdigest()
    return full_hex[:12]
def safe_json_load(path: str) -> Dict[str, Any]:
    """Load a JSON object from ``path`` without raising on bad input.

    Returns:
        The parsed object, or ``{}`` when the file is missing, unreadable,
        not valid JSON, or holds something other than a JSON object.  The
        last case matters because callers immediately use ``.get`` on the
        result.
    """
    if not os.path.exists(path):
        return {}
    try:
        with open(path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        return {}
    return loaded if isinstance(loaded, dict) else {}
def safe_json_save(path: str, data: Dict[str, Any]) -> bool:
    """Write ``data`` to ``path`` as JSON; return True on success.

    The payload is serialised before any file is touched and is then written
    to a sibling temporary file that replaces the target in one step.  A
    failed save therefore leaves the previous file intact instead of
    truncating it.
    """
    tmp_path = f"{path}.tmp"
    try:
        payload = json.dumps(data, ensure_ascii=False, indent=2)
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(payload)
        os.replace(tmp_path, path)
        return True
    except (OSError, TypeError, ValueError):
        # Best-effort cleanup of a half-written temporary file.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        return False
```
## 🤖 五个独立智能体
### 智能体1:`agents/agent_shigushou.py` — 识骨兽(木)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
识骨兽(木)- 结构抽取专家
职责:读入 .txt 文档,提取结构骨架
生数:1(外部文档输入)
成数:6(结构骨架输出)
"""
import os
import sys
import re
import json
from typing import List, Dict, Any, Optional
# 添加核心库路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.dao_engine import DaoEngine
from core.hetu_center import HeTuCenter
from core.rhythm_controller import RhythmController
from core.utils import read_txt_file, hash_signature, safe_json_load, safe_json_save
class ShiGuShou:
    """ShiGuShou (wood): extracts a structural skeleton from plain text.

    Each non-blank line is classified by a set of regular expressions and
    annotated with a rhythm and a length mark.  Sheng slot 1 and cheng slot 6
    of the He Tu centre are updated after every processed document.
    """

    # Punctuation is written with escapes so the full-width forms cannot be
    # silently folded into their ASCII look-alikes.
    # Sentence enders: ideographic full stop, ASCII period, full-width period.
    _FULL_STOPS = ("\u3002", ".", "\uff0e")
    # Pause marks: full-width comma, ASCII comma, ideographic enumeration comma.
    _COMMAS = ("\uff0c", ",", "\u3001")

    def __init__(self, agent_id: int = 1, state_dir: str = "./memory"):
        """Create the agent, build its line patterns and restore saved state.

        Args:
            agent_id: identifier reported by ``get_info``.
            state_dir: directory for the state file; created if missing.
        """
        self.agent_id = agent_id
        self.state_dir = state_dir
        os.makedirs(state_dir, exist_ok=True)
        # He Tu / Luo Shu core components
        self.dao = DaoEngine()
        self.hetu = HeTuCenter()
        self.rhythm = RhythmController()
        # Line classification rules (checked in the order used by _classify_line)
        self.patterns = {
            "work_number": re.compile(r'^第\d+轮作品$'),
            # "label: value" lines, with an ASCII or full-width (U+FF1A) colon
            "metadata_label": re.compile(r'^[^:\uff1a]+[:\uff1a]'),
            "numbered_heading": re.compile(r'^\d+\.\s'),
            "chinese_numbered": re.compile(r'^[一二三四五六七八九十]+[、.\uff0e]'),
            "list_item": re.compile(r'^[-*•]\s'),
            "indent": re.compile(r'^(\s{2,}|\t)'),
        }
        # Running totals
        self.total_documents = 0
        self.total_blocks = 0
        self._load_state()

    def _load_state(self) -> None:
        """Restore counters and core state from the state file, if any."""
        state_path = os.path.join(self.state_dir, "shigushou_state.json")
        state = safe_json_load(state_path)
        if state:
            self.total_documents = state.get("total_documents", 0)
            self.total_blocks = state.get("total_blocks", 0)
            self.dao.restore_state(state.get("dao_state", {}))
            self.hetu.restore_state(state.get("hetu_state", {}))
            self.rhythm.restore_state(state.get("rhythm_state", {}))

    def _save_state(self) -> None:
        """Persist counters and core state to the state file."""
        state_path = os.path.join(self.state_dir, "shigushou_state.json")
        state = {
            "total_documents": self.total_documents,
            "total_blocks": self.total_blocks,
            "dao_state": self.dao.get_state(),
            "hetu_state": self.hetu.get_save_state(),
            "rhythm_state": self.rhythm.get_state()
        }
        safe_json_save(state_path, state)

    def extract(self, text: str) -> List[Dict[str, Any]]:
        """Extract the structural skeleton of ``text``.

        Runs of blank lines collapse into a single "break" entry, every other
        line yields one entry, and an "end" entry closes the list.

        Returns:
            A list of dicts; line entries carry ``type``, ``level``,
            ``symbol``, ``rhythm`` and ``length``.
        """
        self.rhythm.update()
        structure = []
        prev_blank = False
        for line in text.split('\n'):
            stripped = line.strip()
            if not stripped:
                if not prev_blank:
                    structure.append({"type": "break", "symbol": "‖"})
                prev_blank = True
                continue
            prev_blank = False
            # The raw line is needed as well: indentation is lost by strip().
            feature = self._classify_line(line, stripped)
            structure.append({
                "type": feature["type"],
                "level": feature.get("level", 0),
                "symbol": feature["symbol"],
                "rhythm": self._detect_rhythm(stripped),
                "length": self._detect_length(stripped)
            })
        structure.append({"type": "end", "symbol": "‖‖"})
        return structure

    def _classify_line(self, raw: str, stripped: str) -> Dict[str, Any]:
        """Classify one line; the first matching rule wins."""
        if self.patterns["work_number"].match(stripped):
            return {"type": "work_number", "symbol": "G4𝄐", "level": 0}
        if self.patterns["metadata_label"].match(stripped):
            return {"type": "metadata_label", "symbol": "D4>", "level": 0}
        if self.patterns["numbered_heading"].match(stripped):
            return {"type": "numbered_heading", "symbol": "C5", "level": 1}
        if self.patterns["chinese_numbered"].match(stripped):
            return {"type": "chinese_heading", "symbol": "C5", "level": 1}
        if self.patterns["list_item"].match(stripped):
            return {"type": "list_item", "symbol": "•", "level": 2}
        if self.patterns["indent"].match(raw):
            return {"type": "indented_body", "symbol": "B3", "level": 1}
        return {"type": "body", "symbol": "F4", "level": 0}

    def _detect_rhythm(self, text: str) -> str:
        """Compare sentence enders with pause marks.

        Returns:
            "staccato" when enders dominate, "legato" when pauses dominate,
            otherwise "normal".  Each mark is counted exactly once.
        """
        full_stops = sum(text.count(mark) for mark in self._FULL_STOPS)
        commas = sum(text.count(mark) for mark in self._COMMAS)
        if full_stops > commas:
            return "staccato"
        if commas > full_stops:
            return "legato"
        return "normal"

    def _detect_length(self, text: str) -> str:
        """Return a length mark: long (>50 chars), medium (>30) or short."""
        length = len(text)
        if length > 50:
            return "𝄐"
        if length > 30:
            return "—"
        return "·"

    def process_document(self, file_path: str) -> Dict[str, Any]:
        """Extract the skeleton of one file and update the agent's state.

        Returns:
            A result dict with ``status`` "success" (including structure,
            signature and scores) or "error" when the file is empty or could
            not be read.
        """
        text = read_txt_file(file_path)
        if not text:
            return {"status": "error", "file": file_path, "error": "无法读取文件"}
        structure = self.extract(text)
        self.total_documents += 1
        self.total_blocks += len(structure)
        # Update sheng / cheng: the score saturates at 20 skeleton entries.
        novelty = self.dao.get_novelty(6)
        score = min(1.0, len(structure) / 20) * 0.5 + 0.5
        self.hetu.update_sheng(1, score * (0.7 + 0.3 * novelty))
        self.hetu.update_cheng(6, score * (0.6 + 0.4 * novelty))
        # The signature hashes the full type:symbol sequence.
        sig_str = "|".join([f"{s['type']}:{s['symbol']}" for s in structure])
        sig_id = hash_signature(sig_str)
        self._save_state()
        hetu_state = self.hetu.get_full_state()
        return {
            "status": "success",
            "file": os.path.basename(file_path),
            "structure": structure,
            "signature": sig_id,
            # Only a preview is returned for long signatures.
            "signature_string": sig_str[:200] + "..." if len(sig_str) > 200 else sig_str,
            "total_blocks": len(structure),
            "sheng": hetu_state["sheng"]["1"],
            "cheng": hetu_state["cheng"]["6"]
        }

    def get_info(self) -> Dict[str, Any]:
        """Return a summary of the agent's counters and core state."""
        hetu_state = self.hetu.get_full_state()
        return {
            "name": "识骨兽(木)",
            "agent_id": self.agent_id,
            "total_documents": self.total_documents,
            "total_blocks": self.total_blocks,
            "sheng": hetu_state["sheng"],
            "cheng": hetu_state["cheng"],
            "dao_pointer": self.dao.get_pointer()
        }
# ========== 独立运行入口 ==========
if __name__ == "__main__":
    # Command-line entry point: process one file, a folder, or print info.
    import argparse
    parser = argparse.ArgumentParser(description="识骨兽 - 结构抽取专家")
    parser.add_argument("--file", "-f", help="单个文档路径")
    parser.add_argument("--dir", "-d", help="文档文件夹路径")
    parser.add_argument("--info", "-i", action="store_true", help="显示智能体信息")
    args = parser.parse_args()
    agent = ShiGuShou(state_dir="./memory")
    if args.info:
        print(json.dumps(agent.get_info(), ensure_ascii=False, indent=2))
        sys.exit(0)
    if args.file:
        result = agent.process_document(args.file)
        print(json.dumps(result, ensure_ascii=False, indent=2))
        sys.exit(0)
    if args.dir:
        from core.utils import get_all_txt_files
        files = get_all_txt_files(args.dir)
        print(f"📂 发现 {len(files)} 个文档")
        results = []
        for f in files[:10]:  # process at most 10 documents per run
            r = agent.process_document(f)
            results.append(r)
            # Error results carry no "signature" key, so report them
            # separately instead of failing with a KeyError.
            if r.get("status") == "success":
                print(f" ✅ {os.path.basename(f)} → {r['signature']}")
            else:
                print(f" ❌ {os.path.basename(f)} → {r.get('error', '')}")
        print(json.dumps(results, ensure_ascii=False, indent=2))
        sys.exit(0)
    print("用法: python agent_shigushou.py --file <文档路径>")
    print(" python agent_shigushou.py --dir <文件夹路径>")
    print(" python agent_shigushou.py --info")
```
### 智能体2:`agents/agent_lianbujiang.py` — 连骨匠(火)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
连骨匠(火)- 结构组合设计师
职责:从记忆库中选取多个结构模式,排列成骨架序列
生数:2(记忆库输入)
成数:7(骨架序列输出)
"""
import os
import sys
import json
import random
from typing import List, Dict, Any, Optional
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.dao_engine import DaoEngine
from core.hetu_center import HeTuCenter
from core.rhythm_controller import RhythmController
from core.utils import safe_json_load, safe_json_save
class LianBuJiang:
    """LianBuJiang (fire): composes skeleton sequences from the memory bank.

    Patterns are read from ``memory.json`` in the state directory.  Sheng
    slot 2 and cheng slot 7 of the He Tu centre are updated per composition.
    """

    def __init__(self, agent_id: int = 2, state_dir: str = "./memory"):
        """Create the agent, load the memory bank and restore saved state."""
        self.agent_id = agent_id
        self.state_dir = state_dir
        os.makedirs(state_dir, exist_ok=True)
        self.dao = DaoEngine()
        self.hetu = HeTuCenter()
        self.rhythm = RhythmController()
        # Memory bank loaded from memory.json in the state directory
        self.memory_path = os.path.join(state_dir, "memory.json")
        self.memory = safe_json_load(self.memory_path)
        # Number of compositions produced so far
        self.total_combinations = 0
        self._load_state()

    def _load_state(self) -> None:
        """Restore the counter and core state from the state file, if any."""
        state_path = os.path.join(self.state_dir, "lianbujiang_state.json")
        state = safe_json_load(state_path)
        if state:
            self.total_combinations = state.get("total_combinations", 0)
            self.dao.restore_state(state.get("dao_state", {}))
            self.hetu.restore_state(state.get("hetu_state", {}))
            self.rhythm.restore_state(state.get("rhythm_state", {}))

    def _save_state(self) -> None:
        """Persist the counter and core state to the state file."""
        state_path = os.path.join(self.state_dir, "lianbujiang_state.json")
        state = {
            "total_combinations": self.total_combinations,
            "dao_state": self.dao.get_state(),
            "hetu_state": self.hetu.get_save_state(),
            "rhythm_state": self.rhythm.get_state()
        }
        safe_json_save(state_path, state)

    def reload_memory(self, memory_path: Optional[str] = None) -> None:
        """Re-read the memory bank, optionally switching to a new path."""
        if memory_path:
            self.memory_path = memory_path
        self.memory = safe_json_load(self.memory_path)

    def get_patterns(self) -> List[Dict[str, Any]]:
        """Return every stored pattern as a dict with its id under "id"."""
        patterns = self.memory.get("unique_patterns", {})
        if not patterns:
            return []
        return [{"id": pid, **info} for pid, info in patterns.items()]

    @staticmethod
    def _plan_counts(length: int, sheng_ratio: float) -> tuple:
        """Split ``length`` into (main_count, variant_count).

        A non-positive ``length`` is treated as 1.  At least one slot is always
        a main pattern, and the two counts always add up to the effective
        length, so neither can be negative.
        """
        length = max(1, int(length))
        main_count = min(length, max(1, int(length * (0.5 + 0.3 * sheng_ratio))))
        return main_count, length - main_count

    def compose(self, target: str = "general", length: int = 5) -> Dict[str, Any]:
        """Compose one skeleton sequence.

        Args:
            target: target type; accepted for interface compatibility and
                currently not used by the selection logic.
            length: desired number of entries (values below 1 are treated as 1).

        Returns:
            A result dict with ``status`` "success" and the sequence, or
            ``status`` "error" when the memory bank holds no patterns.
        """
        self.rhythm.update()
        sheng_ratio = self.rhythm.get_sheng_ratio()
        # bian_ratio is only reported back; it does not drive the selection.
        bian_ratio = self.rhythm.get_bian_ratio()
        patterns = self.get_patterns()
        if not patterns:
            return {"status": "error", "error": "记忆库为空"}
        # Most frequent patterns first: the top 10 form the backbone pool,
        # everything else is the variant pool.
        freq = self.memory.get("pattern_frequency", {})
        sorted_patterns = sorted(patterns, key=lambda p: freq.get(p["id"], 0), reverse=True)
        main_candidates = sorted_patterns[:10]
        variant_candidates = sorted_patterns[10:]
        main_count, variant_count = self._plan_counts(length, sheng_ratio)
        selected = [random.choice(main_candidates) for _ in range(main_count)]
        # Fall back to the backbone pool when there are no low-frequency patterns.
        variant_pool = variant_candidates or main_candidates
        selected.extend(random.choice(variant_pool) for _ in range(variant_count))
        # Shuffle the middle only: the first and last picks stay in place.
        if len(selected) >= 3:
            middle = selected[1:-1]
            random.shuffle(middle)
            selected = [selected[0]] + middle + [selected[-1]]
        stored = self.memory.get("unique_patterns", {})
        sequence = []
        for p in selected:
            sig = stored.get(p["id"], {}).get("signature", "")
            sequence.append({
                "pattern_id": p["id"],
                # Signatures longer than 100 characters are shortened.
                "signature": sig[:100] + "..." if len(sig) > 100 else sig
            })
        self.total_combinations += 1
        # Update sheng / cheng: the score saturates at 5 entries.
        novelty = self.dao.get_novelty(6)
        score = min(1.0, len(sequence) / 5) * 0.5 + 0.5
        self.hetu.update_sheng(2, score * (0.7 + 0.3 * novelty))
        self.hetu.update_cheng(7, score * (0.6 + 0.4 * novelty))
        self._save_state()
        hetu_state = self.hetu.get_full_state()
        return {
            "status": "success",
            "sequence": sequence,
            "main_count": main_count,
            "variant_count": variant_count,
            "sheng_ratio": sheng_ratio,
            "bian_ratio": bian_ratio,
            "total_combinations": self.total_combinations,
            "sheng": hetu_state["sheng"]["2"],
            "cheng": hetu_state["cheng"]["7"]
        }

    def get_info(self) -> Dict[str, Any]:
        """Return a summary of the agent's counters and core state."""
        hetu_state = self.hetu.get_full_state()
        return {
            "name": "连骨匠(火)",
            "agent_id": self.agent_id,
            "total_combinations": self.total_combinations,
            "patterns_in_memory": len(self.memory.get("unique_patterns", {})),
            "sheng": hetu_state["sheng"],
            "cheng": hetu_state["cheng"],
            "dao_pointer": self.dao.get_pointer()
        }
if __name__ == "__main__":
    # Command-line entry point: print agent info, or compose one sequence.
    import argparse

    cli = argparse.ArgumentParser(description="连骨匠 - 结构组合设计师")
    cli.add_argument("--compose", "-c", type=int, default=5, help="组合长度")
    cli.add_argument("--target", "-t", default="general", help="目标类型")
    cli.add_argument("--info", "-i", action="store_true", help="显示信息")
    options = cli.parse_args()

    smith = LianBuJiang(state_dir="./memory")
    if options.info:
        payload = smith.get_info()
    else:
        payload = smith.compose(target=options.target, length=options.compose)
    print(json.dumps(payload, ensure_ascii=False, indent=2))
```
### 智能体3:`agents/agent_canggushi.py` — 藏骨师(土)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
藏骨师(土)- 记忆库管理员
职责:接收结构骨架,去重、统计频次、维护记忆库
生数:3(结构骨架输入)
成数:8(记忆库输出)
"""
import os
import sys
import json
from collections import defaultdict
from typing import List, Dict, Any, Optional
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.dao_engine import DaoEngine
from core.hetu_center import HeTuCenter
from core.rhythm_controller import RhythmController
from core.utils import hash_signature, safe_json_load, safe_json_save
class CangGuShi:
    """CangGuShi (earth): keeper of the structural memory bank.

    Receives signature strings, de-duplicates them, counts how often each
    occurs and persists the result to ``memory.json``.  Sheng slot 3 and
    cheng slot 8 of the He Tu centre are updated on every store.
    """

    # Cleaning starts once this many documents have been stored ...
    CLEAN_MIN_DOCUMENTS = 100
    # ... and then runs once per this many documents.  Cleaning on every store
    # would evict each new pattern (count 1) straight away, so nothing new
    # could ever accumulate enough occurrences to stay.
    CLEAN_INTERVAL = 100

    def __init__(self, agent_id: int = 3, state_dir: str = "./memory"):
        """Create the agent, load the memory bank and restore saved state."""
        self.agent_id = agent_id
        self.state_dir = state_dir
        os.makedirs(state_dir, exist_ok=True)
        self.dao = DaoEngine()
        self.hetu = HeTuCenter()
        self.rhythm = RhythmController()
        # Memory bank
        self.memory_path = os.path.join(state_dir, "memory.json")
        self.memory = {
            "total_documents": 0,
            "unique_patterns": {},
            "pattern_frequency": defaultdict(int)
        }
        self._load_memory()
        # Number of signatures received so far
        self.total_received = 0
        self._load_state()

    def _load_memory(self) -> None:
        """Load the memory bank from disk, keeping the defaults when absent."""
        data = safe_json_load(self.memory_path)
        if data:
            self.memory["total_documents"] = data.get("total_documents", 0)
            self.memory["unique_patterns"] = data.get("unique_patterns", {})
            self.memory["pattern_frequency"] = defaultdict(int, data.get("pattern_frequency", {}))

    def _save_memory(self) -> None:
        """Write the memory bank to disk (the defaultdict becomes a plain dict)."""
        data = {
            "total_documents": self.memory["total_documents"],
            "unique_patterns": self.memory["unique_patterns"],
            "pattern_frequency": dict(self.memory["pattern_frequency"])
        }
        safe_json_save(self.memory_path, data)

    def _load_state(self) -> None:
        """Restore the counter and core state from the state file, if any."""
        state_path = os.path.join(self.state_dir, "canggushi_state.json")
        state = safe_json_load(state_path)
        if state:
            self.total_received = state.get("total_received", 0)
            self.dao.restore_state(state.get("dao_state", {}))
            self.hetu.restore_state(state.get("hetu_state", {}))
            self.rhythm.restore_state(state.get("rhythm_state", {}))

    def _save_state(self) -> None:
        """Persist the counter and core state to the state file."""
        state_path = os.path.join(self.state_dir, "canggushi_state.json")
        state = {
            "total_received": self.total_received,
            "dao_state": self.dao.get_state(),
            "hetu_state": self.hetu.get_save_state(),
            "rhythm_state": self.rhythm.get_state()
        }
        safe_json_save(state_path, state)

    def store(self, signature_string: str) -> Dict[str, Any]:
        """Store one structural signature.

        Args:
            signature_string: "|"-separated signature of a document skeleton.

        Returns:
            A result dict with the signature id, memory totals, the current
            frequency of this signature (0 if it was evicted by cleaning) and
            the updated sheng / cheng scores.
        """
        from datetime import datetime

        self.rhythm.update()
        sig_id = hash_signature(signature_string)
        self.total_received += 1
        self.memory["total_documents"] += 1
        self.memory["pattern_frequency"][sig_id] += 1
        # Record the pattern the first time it is seen.
        if sig_id not in self.memory["unique_patterns"]:
            self.memory["unique_patterns"][sig_id] = {
                "signature": signature_string,
                "token_count": len(signature_string.split("|")),
                "first_seen": datetime.now().isoformat()
            }
        # Periodically drop low-frequency patterns.
        self._clean_memory()
        # Update sheng / cheng: the score saturates at 50 unique patterns.
        novelty = self.dao.get_novelty(6)
        unique_count = len(self.memory["unique_patterns"])
        score = min(1.0, unique_count / 50) * 0.5 + 0.5
        self.hetu.update_sheng(3, score * (0.7 + 0.3 * novelty))
        self.hetu.update_cheng(8, score * (0.6 + 0.4 * novelty))
        self._save_memory()
        self._save_state()
        hetu_state = self.hetu.get_full_state()
        return {
            "status": "success",
            "signature_id": sig_id,
            "total_patterns": len(self.memory["unique_patterns"]),
            "total_documents": self.memory["total_documents"],
            # .get() instead of indexing: indexing the defaultdict would
            # re-create a zero-count entry for a pattern that was just evicted.
            "frequency": self.memory["pattern_frequency"].get(sig_id, 0),
            "sheng": hetu_state["sheng"]["3"],
            "cheng": hetu_state["cheng"]["8"]
        }

    def _clean_memory(self) -> None:
        """Evict low-frequency patterns at every cleaning interval.

        Nothing happens before ``CLEAN_MIN_DOCUMENTS`` documents or between
        interval boundaries.  The eviction threshold (3..9 occurrences) is
        drawn from the pi engine.
        """
        total = self.memory["total_documents"]
        if total < self.CLEAN_MIN_DOCUMENTS or total % self.CLEAN_INTERVAL != 0:
            return
        threshold = max(1, int(3 + self.dao.get_novelty(4) * 7))
        # Collect first: the dict must not change size while being iterated.
        to_remove = [
            pid for pid, count in self.memory["pattern_frequency"].items()
            if count < threshold
        ]
        for pid in to_remove:
            del self.memory["pattern_frequency"][pid]
            self.memory["unique_patterns"].pop(pid, None)
        if to_remove:
            print(f" 🧹 藏骨师清洗:移除了 {len(to_remove)} 个低频模式(阈值={threshold})")

    def get_stats(self) -> Dict[str, Any]:
        """Return totals, the 10 most frequent patterns and the unique ratio."""
        freq = self.memory["pattern_frequency"]
        sorted_freq = sorted(freq.items(), key=lambda x: x[1], reverse=True)
        return {
            "total_documents": self.memory["total_documents"],
            "unique_patterns": len(self.memory["unique_patterns"]),
            "top_10": sorted_freq[:10],
            "coverage": f"{len(self.memory['unique_patterns'])/max(1, self.memory['total_documents'])*100:.2f}%"
        }

    def get_info(self) -> Dict[str, Any]:
        """Return a summary of the agent's counters and core state."""
        hetu_state = self.hetu.get_full_state()
        return {
            "name": "藏骨师(土)",
            "agent_id": self.agent_id,
            "total_received": self.total_received,
            "total_documents": self.memory["total_documents"],
            "unique_patterns": len(self.memory["unique_patterns"]),
            "sheng": hetu_state["sheng"],
            "cheng": hetu_state["cheng"],
            "dao_pointer": self.dao.get_pointer()
        }
if __name__ == "__main__":
    # Command-line entry point: info, stats, or store one signature string.
    import argparse

    cli = argparse.ArgumentParser(description="藏骨师 - 记忆库管理员")
    cli.add_argument("--store", "-s", help="存储一个结构签名串")
    cli.add_argument("--stats", action="store_true", help="显示统计")
    cli.add_argument("--info", "-i", action="store_true", help="显示智能体信息")
    options = cli.parse_args()

    keeper = CangGuShi(state_dir="./memory")
    # Pick the first requested action; None means "show usage".
    report = None
    if options.info:
        report = keeper.get_info()
    elif options.stats:
        report = keeper.get_stats()
    elif options.store:
        report = keeper.store(options.store)

    if report is not None:
        print(json.dumps(report, ensure_ascii=False, indent=2))
    else:
        print("用法: python agent_canggushi.py --store <签名串>")
        print(" python agent_canggushi.py --stats")
        print(" python agent_canggushi.py --info")
```
### 智能体4:`agents/agent_tianci.py` — 填词娘(水)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
填词娘(水)- 内容填充专家
职责:根据骨架序列填充具体文字,生成完整文档
生数:4(骨架序列输入)
成数:9(完整文档输出)
"""
import os
import sys
import json
import random
from typing import List, Dict, Any, Optional
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.dao_engine import DaoEngine
from core.hetu_center import HeTuCenter
from core.rhythm_controller import RhythmController
from core.utils import safe_json_load, safe_json_save
class TianCi:
    """TianCi (water): fills a skeleton sequence with generated text.

    Sheng slot 4 and cheng slot 9 of the He Tu centre are updated after each
    generated document.
    """

    def __init__(self, agent_id: int = 4, state_dir: str = "./memory"):
        """Create the agent with its word pool and restore saved state."""
        self.agent_id = agent_id
        self.state_dir = state_dir
        os.makedirs(state_dir, exist_ok=True)
        self.dao = DaoEngine()
        self.hetu = HeTuCenter()
        self.rhythm = RhythmController()
        # Basic word pool
        self.word_pool = {
            "nouns": ["云", "山", "月", "风", "星", "海", "林", "泉", "春", "秋", "夜", "日"],
            "verbs": ["涌", "隐", "明", "起", "映", "答", "生", "逐", "落", "飞", "转", "行"],
            "adjs": ["清", "远", "幽", "苍", "茫", "浩", "逸", "静", "深", "浅", "长", "短"],
            "numbers": ["一", "二", "三", "四", "五", "六", "七", "八", "九", "十"]
        }
        # Sentence templates; each "{}" is one phrase slot
        self.templates = [
            "{},{}。",
            "{},{},{}。",
            "{};{},{}。",
            "{},{}。{}。"
        ]
        # Number of documents generated so far
        self.total_generated = 0
        self._load_state()

    def _load_state(self) -> None:
        """Restore the counter and core state from the state file, if any."""
        state_path = os.path.join(self.state_dir, "tianci_state.json")
        state = safe_json_load(state_path)
        if state:
            self.total_generated = state.get("total_generated", 0)
            self.dao.restore_state(state.get("dao_state", {}))
            self.hetu.restore_state(state.get("hetu_state", {}))
            self.rhythm.restore_state(state.get("rhythm_state", {}))

    def _save_state(self) -> None:
        """Persist the counter and core state to the state file."""
        state_path = os.path.join(self.state_dir, "tianci_state.json")
        state = {
            "total_generated": self.total_generated,
            "dao_state": self.dao.get_state(),
            "hetu_state": self.hetu.get_save_state(),
            "rhythm_state": self.rhythm.get_state()
        }
        safe_json_save(state_path, state)

    def _generate_phrase(self) -> str:
        """Return a two-character phrase: a random adjective plus a noun."""
        adjective = random.choice(self.word_pool["adjs"])
        noun = random.choice(self.word_pool["nouns"])
        return f"{adjective}{noun}"

    def _generate_sentence(self, length: int = 20) -> str:
        """Generate one sentence of at most ``length`` characters.

        The template is chosen first and exactly as many phrases as it has
        slots are generated; drawing the phrase count independently made
        ``str.format`` fail whenever a three-slot template met two phrases.
        Short sentences are padded by inserting a phrase before each full
        stop, and the result is cut to ``length``.
        """
        template = random.choice(self.templates)
        slot_count = template.count("{}")
        phrases = [self._generate_phrase() for _ in range(slot_count)]
        sentence = template.format(*phrases)
        while len(sentence) < length:
            padded = sentence.replace("。", f"{self._generate_phrase()}。")
            if padded == sentence:
                # No full stop to extend (custom template): stop padding
                # rather than loop forever.
                break
            sentence = padded
        return sentence[:length]

    def fill(self, sequence: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Fill a skeleton sequence and return the generated document.

        Each entry's ``signature`` text selects the kind of line produced: a
        work-number line, a metadata line, a blank line, a numbered heading,
        or (by default) a body sentence.

        Returns:
            A result dict containing the document, its length and line count,
            the rhythm ratios and the updated sheng / cheng scores.
        """
        self.rhythm.update()
        sheng_ratio = self.rhythm.get_sheng_ratio()
        bian_ratio = self.rhythm.get_bian_ratio()
        lines = []
        novelty = self.dao.get_novelty(6)
        for item in sequence:
            sig = item.get("signature", "")
            # The first marker found in the signature decides the line kind.
            if "work_number" in sig:
                num = random.randint(1, 9999999)
                lines.append(f"第{num}轮作品")
            elif "metadata_label" in sig:
                labels = ["道新奇度", "创意指数", "凝练度", "自然度"]
                val = f"{novelty:.4f}"
                lines.append(f"{random.choice(labels)}:{val}")
            elif "break" in sig or "‖" in sig:
                lines.append("")
            elif "numbered_heading" in sig or "C5" in sig:
                num = random.randint(1, 20)
                phrase = ",".join([self._generate_phrase() for _ in range(3)])
                lines.append(f"{num}. {phrase}")
            else:
                # Body text: the slow rhythm ratio sets the sentence length.
                length = int(20 + 30 * sheng_ratio)
                lines.append(self._generate_sentence(length))
        # Make sure the document ends with a blank line.
        if lines and lines[-1] != "":
            lines.append("")
        document = "\n".join(lines)
        self.total_generated += 1
        # Update sheng / cheng: the score saturates at 100 characters.
        score = min(1.0, len(document) / 100) * 0.5 + 0.5
        self.hetu.update_sheng(4, score * (0.7 + 0.3 * novelty))
        self.hetu.update_cheng(9, score * (0.6 + 0.4 * novelty))
        self._save_state()
        hetu_state = self.hetu.get_full_state()
        return {
            "status": "success",
            "document": document,
            "document_length": len(document),
            "line_count": len(lines),
            "sheng": hetu_state["sheng"]["4"],
            "cheng": hetu_state["cheng"]["9"],
            "sheng_ratio": sheng_ratio,
            "bian_ratio": bian_ratio
        }

    def get_info(self) -> Dict[str, Any]:
        """Return a summary of the agent's counters and core state."""
        hetu_state = self.hetu.get_full_state()
        return {
            "name": "填词娘(水)",
            "agent_id": self.agent_id,
            "total_generated": self.total_generated,
            "sheng": hetu_state["sheng"],
            "cheng": hetu_state["cheng"],
            "dao_pointer": self.dao.get_pointer()
        }
if __name__ == "__main__":
    # Command-line entry point: demo document, fill from a file, or info.
    import argparse
    parser = argparse.ArgumentParser(description="填词娘 - 内容填充专家")
    parser.add_argument("--sequence", "-s", help="骨架序列JSON文件路径")
    parser.add_argument("--demo", action="store_true", help="生成演示文档")
    parser.add_argument("--info", "-i", action="store_true", help="显示智能体信息")
    args = parser.parse_args()
    agent = TianCi(state_dir="./memory")
    if args.info:
        print(json.dumps(agent.get_info(), ensure_ascii=False, indent=2))
    elif args.demo:
        # Demo sequence using the "type:symbol" signature format.
        demo_sequence = [
            {"signature": "work_number:G4𝄐"},
            {"signature": "metadata_label:D4>"},
            {"signature": "break:‖"},
            {"signature": "numbered_heading:C5"},
            {"signature": "body:F4"},
            {"signature": "end:‖‖"}
        ]
        result = agent.fill(demo_sequence)
        print(result["document"])
        print(f"\n📊 长度: {result['document_length']} 字符")
    elif args.sequence:
        with open(args.sequence, 'r', encoding='utf-8') as f:
            sequence = json.load(f)
        # Accept either a bare list of entries or a result object that keeps
        # the list under "sequence".
        if isinstance(sequence, dict):
            sequence = sequence.get("sequence", [])
        result = agent.fill(sequence)
        print(result["document"])
    else:
        print("用法: python agent_tianci.py --demo")
        print(" python agent_tianci.py --sequence <序列文件>")
        print(" python agent_tianci.py --info")
```
### 智能体5:`agents/agent_guanchaozhe.py` — 观潮者(金)
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
观潮者(金)- 全局监工
职责:观察所有环节,记录表现,提供反馈建议
无生数,有成数(反馈)
"""
import os
import sys
import json
import time
from datetime import datetime
from collections import defaultdict
from typing import List, Dict, Any, Optional
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from core.dao_engine import DaoEngine
from core.hetu_center import HeTuCenter
from core.rhythm_controller import RhythmController
from core.utils import safe_json_load, safe_json_save
class GuanChaozhe:
"""观潮者 — 全局监工"""
def __init__(self, agent_id: int = 5, state_dir: str = "./memory"):
    """Set up the observer: state directory, core components, empty logs.

    Any previously saved state is loaded at the end and overrides the
    freshly initialised values.
    """
    self.agent_id, self.state_dir = agent_id, state_dir
    os.makedirs(self.state_dir, exist_ok=True)

    # He Tu / Luo Shu core components
    self.dao = DaoEngine()
    self.hetu = HeTuCenter()
    self.rhythm = RhythmController()

    # Observation log, latest feedback per source, and the running counter
    self.observations: List[Dict[str, Any]] = []
    self.feedback: Dict[str, Any] = {}
    self.total_observations = 0

    self._load_state()
def _load_state(self) -> None:
    """Restore counters, logs and core state from the state file.

    Nothing is changed when the file is missing or yields an empty dict.
    """
    saved = safe_json_load(os.path.join(self.state_dir, "guanchaozhe_state.json"))
    if not saved:
        return
    self.total_observations = saved.get("total_observations", 0)
    self.observations = saved.get("observations", [])
    self.feedback = saved.get("feedback", {})
    # Each core component restores itself from its own sub-dict.
    self.dao.restore_state(saved.get("dao_state", {}))
    self.hetu.restore_state(saved.get("hetu_state", {}))
    self.rhythm.restore_state(saved.get("rhythm_state", {}))
def _save_state(self) -> None:
    """Persist counters, the recent log and core state to the state file."""
    # Only the 100 most recent observations are written out.
    recent_observations = self.observations[-100:]
    snapshot = dict(
        total_observations=self.total_observations,
        observations=recent_observations,
        feedback=self.feedback,
        dao_state=self.dao.get_state(),
        hetu_state=self.hetu.get_save_state(),
        rhythm_state=self.rhythm.get_state(),
    )
    target = os.path.join(self.state_dir, "guanchaozhe_state.json")
    safe_json_save(target, snapshot)
def observe(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
    """Record one event and return feedback for it.

    Args:
        source: name of the agent the event came from.
        data: the event payload, stored as-is in the observation log.

    Returns:
        A dict with ``status``, the ``observation_id`` and the ``feedback``.
    """
    self.rhythm.update()
    self.total_observations += 1
    event_id = self.total_observations

    # Log the event together with the rhythm ratios at this moment.
    self.observations.append({
        "id": event_id,
        "timestamp": datetime.now().isoformat(),
        "source": source,
        "data": data,
        "rhythm": {
            "sheng_ratio": self.rhythm.get_sheng_ratio(),
            "bian_ratio": self.rhythm.get_bian_ratio(),
        },
    })

    # The latest feedback per source replaces the previous one.
    verdict = self._generate_feedback(source, data)
    self.feedback[source] = verdict
    self._save_state()

    return {"status": "success", "observation_id": event_id, "feedback": verdict}
def _generate_feedback(self, source: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""生成反馈建议"""
novelty = self.dao.get_novelty(6)
feedback = {"suggestions": [], "score": 0.5}
if source == "识骨兽":
total_blocks = data.get("total_blocks", 0)
if total_blocks < 3:
feedback["suggestions"].append("结构太简单,尝试识别更细粒度的结构")
feedback["score"] = 0.3
elif total_blocks > 50:
feedback["suggestions"].append("结构过于细碎,尝试合并相近类型")
feedback["score"] = 0.7
else:
feedback["suggestions"].append("结构抽取正常")
feedback["score"] = 0.8 + 0.2 * novelty
elif source == "连骨匠":
sequence_len = len(data.get("sequence", []))
if sequence_len < 3:
feedback["suggestions"].append("组合太短,尝试增加骨架数量")
feedback["score"] = 0.4
elif sequence_len > 10:
feedback["suggestions"].append("组合过长,尝试精简骨架")
feedback["score"] = 0.6
else:
feedback["suggestions"].append("组合长度适中")
feedback["score"] = 0.7 + 0.3 * novelty
elif source == "藏骨师":
unique = data.get("unique_patterns", 0)
total = data.get("total_documents", 0)
ratio = unique / max(1, total)
if ratio > 0.8:
feedback["suggestions"].append("结构多样性强,记忆库健康")
feedback["score"] = 0.9
elif ratio > 0.3:
feedback["suggestions"].append("结构有一定重复,需要更多样本文档")
feedback["score"] = 0.6
else:
feedback["suggestions"].append("结构高度重复,考虑调整抽取粒度")
feedback["score"] = 0.3
elif source == "填词娘":
doc_len = data.get("document_length", 0)
if doc_len < 50:
feedback["suggestions"].append("生成的文档太短,填充更多内容")
feedback["score"] = 0.3
elif doc_len > 500:
feedback["suggestions"].append("文档长度合理")
feedback["score"] = 0.8
else:
feedback["suggestions"].append(f"文档长度 {doc_len},适中")
feedback["score"] = 0.7
else:
feedback["suggestions"].append(f"观察到未知来源: {source},记录中")
feedback["score"] = 0.5
# 添加综合韵律评分
sheng_ratio = self.rhythm.get_sheng_ratio()
bian_ratio = self.rhythm.get_bian_ratio()
rhythm_score = 0.5 + 0.5 * (1 - abs(sheng_ratio - bian_ratio))
feedback["rhythm_score"] = rhythm_score
feedback["overall_score"] = (feedback["score"] + rhythm_score) / 2
return feedback
def get_summary(self) -> Dict[str, Any]:
"""获取整体观察总结"""
return {
"total_observations": self.total_observations,
"latest": self.observations[-5:] if self.observations else [],
"feedback": self.feedback,
"rhythm_state": {
"sheng_ratio": self.rhythm.get_sheng_ratio(),
"bian_ratio": self.rhythm.get_bian_ratio()
},
"dao_pointer": self.dao.get_pointer()
}
def get_info(self) -> Dict[str, Any]:
return {
"name": "观潮者(金)",
"agent_id": self.agent_id,
"total_observations": self.total_observations,
"sheng": self.hetu.get_full_state()["sheng"],
"cheng": self.hetu.get_full_state()["cheng"],
"dao_pointer": self.dao.get_pointer()
}
def parse_observe_payload(data_str: str) -> Dict[str, Any]:
    """Turn the DATA command-line argument into an event payload dict.

    Args:
        data_str: Raw text given after the source name on the command line.

    Returns:
        The decoded JSON object when ``data_str`` is a JSON object;
        otherwise ``{"raw": data_str}``. Valid JSON that is not an object
        (a number, list, string, ...) is wrapped the same way, because the
        agent reads metrics from the payload by key.
    """
    try:
        payload = json.loads(data_str)
    except json.JSONDecodeError:
        # Only malformed JSON is tolerated here; anything else propagates.
        return {"raw": data_str}
    if not isinstance(payload, dict):
        return {"raw": data_str}
    return payload


def main(argv=None) -> None:
    """Command-line entry point for the Guan Chaozhe agent.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use the
            process arguments.
    """
    import argparse

    parser = argparse.ArgumentParser(description="观潮者 - 全局监工")
    parser.add_argument("--observe", "-o", nargs=2, metavar=("SOURCE", "DATA"), help="观察事件")
    parser.add_argument("--summary", "-s", action="store_true", help="显示总结")
    parser.add_argument("--info", "-i", action="store_true", help="显示智能体信息")
    args = parser.parse_args(argv)

    agent = GuanChaozhe(state_dir="./memory")
    if args.info:
        print(json.dumps(agent.get_info(), ensure_ascii=False, indent=2))
    elif args.summary:
        print(json.dumps(agent.get_summary(), ensure_ascii=False, indent=2))
    elif args.observe:
        source, data_str = args.observe
        result = agent.observe(source, parse_observe_payload(data_str))
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        # No action flag given: show the usage hints.
        print("用法: python agent_guanchaozhe.py --observe <来源> <数据>")
        print(" python agent_guanchaozhe.py --summary")
        print(" python agent_guanchaozhe.py --info")


if __name__ == "__main__":
    main()
```
## 🚀 每个智能体都可以独立运行
### 示例1:单独运行识骨兽
```bash
python agents/agent_shigushou.py --file ./learning_material/sample.txt
python agents/agent_shigushou.py --dir ./my_docs
python agents/agent_shigushou.py --info
```
### 示例2:单独运行藏骨师
```bash
python agents/agent_canggushi.py --store "work_number:G4𝄐|metadata_label 4>|break:‖|end:‖‖"
python agents/agent_canggushi.py --stats
python agents/agent_canggushi.py --info
```
### 示例3:单独运行连骨匠
```bash
python agents/agent_lianbujiang.py --compose 5
python agents/agent_lianbujiang.py --target "story" --compose 8
python agents/agent_lianbujiang.py --info
```
### 示例4:单独运行填词娘
```bash
python agents/agent_tianci.py --demo
python agents/agent_tianci.py --sequence ./sequence.json
python agents/agent_tianci.py --info
```
### 示例5:单独运行观潮者
```bash
python agents/agent_guanchaozhe.py --observe "识骨兽" '{"total_blocks": 15}'
python agents/agent_guanchaozhe.py --summary
python agents/agent_guanchaozhe.py --info
```
## 🔄 智能体之间的协作(可选)
每个智能体都可以独立运行,但也可以通过标准JSON格式相互通信:
```
识骨兽 → 输出结构签名 → 藏骨师 存储
                          ↓
                        记忆库
                          ↓
连骨匠 → 组合骨架序列 → 填词娘 → 生成文档
                          ↓
                        观潮者 → 评分反馈
```
每个智能体的输入输出都是标准JSON,可以轻松串联成管道。但**每个智能体都可以单独使用,不依赖其他智能体**。
---
**5个智能体已经写完。** 每个都内置了河图洛书的核心机制(π引擎、生数/成数、韵律控制),都可以独立运行、独立进化、独立保存状态。
你现在可以逐个测试它们了。需要我解释某个智能体的具体逻辑吗?🎯