# hetu_luoshu_v7_mirror.py
# 河图洛书 V7.0 - 镜像核心版
# 设计哲学:道的镜像。道驱动一切,万物自己演化。
# 不加任何人为设定。不预设、不教、不操心。
#
# 修改说明(2026-06-08):
# 1. 主循环 time.sleep(0.3) 已删除,全速运行
# 2. 检查点保存改为每1万轮
# 3. 日志保存改为每1万轮
# 4. 火2取语素数量动态:127万轮开始,每增加100万轮+1个语素(2026-06-19修改)
# 5. 木3和水1长度上限同步:127万轮后每100万轮+4字,下限不变(2026-06-19修改)
# 6. 水1变体数量从5个改为1个(2026-06-29修改)
# 7. 金4固化:只固化得分最高的作品(不设固定阈值)
# 8. 金4固化池最大容量10000
# 9. 每100万轮重新加载语料库
# 10. API超时5秒
# 11. 火2改为等概率采样,去掉高频词偏好(2026-06-17)
# 12. 去掉语素数量上限100的限制(2026-06-17)
# 13. 增加检查点自动恢复机制(2026-06-17)
# 14. 增加π指针自动恢复机制(2026-06-19)
# 15. 水1功能从"改写变体"改为"翻译成白话文"(2026-06-30修改)
# 16. 木3兜底模式用"变体"连接替代句号(2026-06-30修改)
# 17. 水1只允许加长,不允许缩短(2026-06-30修改)
# 18. 移除BT污染检测与分离机制(2026-06-30修改)
# 19. 增加水2:将生僻字改为简化字(2026-06-30修改)
# 20. 去掉所有打印截断(2026-06-30修改)
# 21. 水1水2合并为一次API调用(2026-06-30修改)
# 22. 金4增加改写功能:固化前将选中作品改写成通顺短文(2026-07-03修改)
# 长度控制在原文的2倍以内,API超时则保留原文,避免卡死
# 23. 木3取词改为随机选取火2提供语素的40%(至少1个),不再固定取前20个(2026-07-04修改)
# 24. 火2加载语料:取消500个文件上限,遍历全部文件;每500个文件打印进度;
# 跳过连续50个字符无标点的文档;分层清洗,只保留频次≥10的语素(2026-07-04修改)
import os
import sys
import time
import json
import random
import re
import math
import hashlib
import pickle
import shutil
import requests
from collections import Counter
from typing import List, Dict, Tuple, Optional
from datetime import datetime
# ==================== API配置 ====================
# API credentials and endpoint.
# The key is taken from the DEEPSEEK_API_KEY environment variable when it is
# set (and non-empty), so a real secret never has to be written into this
# file; the original placeholder is kept as the fallback value.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY") or "sk-你的KEY"
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"

# Directory that receives the periodic backup copies of the checkpoint.
RECOVERY_DIR = "recovery_mirror"

# Create every working directory up front so later code can open files in
# them without checking for existence first.
for d in ["cache", "learning_material", "masterpieces", "logs", "checkpoints", RECOVERY_DIR]:
    os.makedirs(d, exist_ok=True)
def call_deepseek(prompt: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
    """Send *prompt* to the DeepSeek chat endpoint and return the reply text.

    Replies are cached on disk under ``cache/<md5 of prompt>.json``; a cached
    reply is returned without contacting the API. Note that the cache key is
    derived from the prompt only, so ``max_tokens`` and ``temperature`` do not
    influence which cached entry is used.

    Args:
        prompt: The user message to send.
        max_tokens: Upper bound on the reply length passed to the API.
        temperature: Sampling temperature passed to the API.

    Returns:
        The reply text, or ``""`` when the request fails, times out, returns a
        non-200 status, or yields a malformed payload. Callers are expected to
        provide their own fallback for the empty string.
    """
    cache_key = hashlib.md5(prompt.encode()).hexdigest()
    cache_file = f"cache/{cache_key}.json"
    if os.path.exists(cache_file):
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                return json.load(f)["response"]
        except (OSError, ValueError, KeyError, TypeError):
            # Unreadable or corrupt cache entry: fall through to a live request.
            pass

    try:
        headers = {"Authorization": f"Bearer {DEEPSEEK_API_KEY}", "Content-Type": "application/json"}
        data = {
            "model": "deepseek-chat",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": max_tokens,
            "temperature": temperature
        }
        # 5 second timeout so a stalled connection cannot block the main loop.
        response = requests.post(DEEPSEEK_API_URL, json=data, headers=headers, timeout=5)
        if response.status_code != 200:
            return ""
        result = response.json()["choices"][0]["message"]["content"]
    except Exception:
        # Deliberate best-effort boundary: any failure (timeout included)
        # becomes an empty string and the caller falls back.
        return ""

    if not isinstance(result, str):
        # Keep the declared ``str`` contract even if the payload carries null.
        return ""

    # Caching is an optimisation only: a failed write must not throw away a
    # reply that was fetched successfully.
    try:
        with open(cache_file, 'w', encoding='utf-8') as f:
            json.dump({"prompt": prompt, "response": result}, f, ensure_ascii=False)
    except OSError:
        pass
    return result
# ==================== 道:π引擎 ====================
class DaoEngine:
    """Sequential reader over the digits of pi, written as "31415926535...".

    ``pointer`` is the absolute index (0 = the leading 3) of the next digit to
    hand out. ``digits`` is a sliding window of already-computed digits whose
    first element corresponds to absolute index ``_base``. The window is
    refilled in pieces of ``chunk_size`` digits and digits that were already
    consumed are dropped on each refill, so memory use stays bounded.

    Digits are computed with ``gmpy2`` when it is importable and with a
    ``decimal`` based BBP series otherwise. Both paths produce the same
    sequence (including the leading 3).
    """

    def __init__(self, chunk_size=10000):
        self.chunk_size = chunk_size
        self.digits = []    # window of digits; digits[0] is absolute index _base
        self.pointer = 0    # absolute index of the next digit to return
        self._base = 0      # absolute index of digits[0]
        self._load_next_chunk()

    @staticmethod
    def _compute_digits(start, count):
        """Return pi digits with absolute indices ``start`` .. ``start+count-1``."""
        end = start + count
        try:
            import gmpy2
        except ImportError:
            gmpy2 = None
        if gmpy2 is not None:
            # Roughly 3.33 bits per decimal digit are needed; 4 leaves headroom.
            gmpy2.get_context().precision = (end + 100) * 4
            pi_str = format(gmpy2.const_pi(), f'.{end + 50}f')
        else:
            from decimal import Decimal, localcontext
            # A local context avoids changing the process-wide decimal precision.
            with localcontext() as ctx:
                ctx.prec = end + 50
                pi = Decimal(0)
                # Bailey-Borwein-Plouffe series; each term adds about 1.2 digits.
                for k in range(end + 20):
                    pi += (Decimal(1) / (16 ** k)) * (
                        Decimal(4) / (8 * k + 1) - Decimal(2) / (8 * k + 4) -
                        Decimal(1) / (8 * k + 5) - Decimal(1) / (8 * k + 6)
                    )
            pi_str = str(pi)
        # Dropping only the decimal point keeps the leading 3 at index 0.
        all_digits = pi_str.replace('.', '')
        return [int(ch) for ch in all_digits[start:end]]

    def _load_next_chunk(self):
        """Append the next ``chunk_size`` digits to the window.

        The new digits always continue where the window ends, so the window
        stays aligned with ``pointer`` regardless of where the pointer sits
        inside it. If the window is empty (for example after a caller cleared
        ``digits`` and moved ``pointer``) or the pointer lies outside it, the
        window restarts at ``pointer``.
        """
        consumed = self.pointer - self._base
        if not self.digits or consumed < 0 or consumed >= len(self.digits):
            self.digits = []
            self._base = self.pointer
        elif consumed > 0:
            # Forget digits that were already handed out.
            del self.digits[:consumed]
            self._base = self.pointer
        start = self._base + len(self.digits)
        self.digits.extend(self._compute_digits(start, self.chunk_size))

    def _ensure_available(self, count):
        """Make sure ``count`` digits starting at ``pointer`` are in the window."""
        if not self.digits or self.pointer < self._base:
            self.digits = []
            self._load_next_chunk()
        while self.pointer - self._base + count > len(self.digits):
            self._load_next_chunk()

    def get_novelty(self, length=8) -> float:
        """Consume ``length`` digits d1 d2 ... and return the value 0.d1d2..."""
        self._ensure_available(length)
        offset = self.pointer - self._base
        segment = self.digits[offset:offset + length]
        self.pointer += length
        value = 0
        for i, d in enumerate(segment):
            value += d * (0.1 ** (i + 1))
        return value

    def get_digit(self) -> int:
        """Consume and return the next single digit."""
        self._ensure_available(1)
        digit = self.digits[self.pointer - self._base]
        self.pointer += 1
        return digit

    def get_digits(self, count: int) -> List[int]:
        """Consume and return the next ``count`` digits as a list."""
        return [self.get_digit() for _ in range(count)]

    def get_pointer(self) -> int:
        """Return the absolute index of the next digit to be consumed."""
        return self.pointer

    def get_state(self) -> dict:
        """Return the persistent state (only the pointer is needed)."""
        return {"pointer": self.pointer}

    def restore_state(self, state: dict):
        """Reposition the reader at ``state["pointer"]`` (0 when missing)."""
        self.pointer = state.get("pointer", 0)
        self.digits = []
        self._load_next_chunk()
# ==================== 节奏控制器 ====================
class RhythmController:
    """Two sine oscillators that modulate the "sheng" and "bian" stages.

    Each oscillator has a phase in [0, 2*pi) that ``update`` advances by a
    fixed speed. The ratio getters map the phase to a value around 0.55 and
    the length getters interpolate between a minimum and a maximum with it.

    NOTE(review): ``bian_speed`` equals one full turn (2*pi), so advancing the
    bian phase and reducing it modulo 2*pi leaves it where it was (up to
    rounding). Confirm whether that is the intended "fast" rhythm.
    """

    def __init__(self):
        self.sheng_phase = 0
        self.bian_phase = 0
        self.sheng_speed = 0.2 * 2 * math.pi / 5
        self.bian_speed = 2 * math.pi / 1

    @staticmethod
    def _interpolate(low, high, ratio):
        """Return the integer part of ``low + (high - low) * ratio``."""
        return int(low + (high - low) * ratio)

    def update(self):
        """Advance both phases by their speed, wrapping at 2*pi."""
        full_turn = 2 * math.pi
        self.sheng_phase = (self.sheng_phase + self.sheng_speed) % full_turn
        self.bian_phase = (self.bian_phase + self.bian_speed) % full_turn

    def get_sheng_ratio(self):
        """Return 0.55 +/- 0.25 depending on the sheng phase."""
        swing = math.sin(self.sheng_phase)
        return 0.55 + 0.25 * swing

    def get_bian_ratio(self):
        """Return 0.55 +/- 0.35 depending on the bian phase."""
        swing = math.sin(self.bian_phase)
        return 0.55 + 0.35 * swing

    def get_sheng_length(self, sheng_min, sheng_max):
        """Interpolate between the two bounds using the sheng ratio."""
        return self._interpolate(sheng_min, sheng_max, self.get_sheng_ratio())

    def get_bian_length(self, bian_min, bian_max):
        """Interpolate between the two bounds using the bian ratio."""
        return self._interpolate(bian_min, bian_max, self.get_bian_ratio())

    def get_state(self) -> dict:
        """Return both phases for checkpointing."""
        snapshot = {}
        snapshot["sheng_phase"] = self.sheng_phase
        snapshot["bian_phase"] = self.bian_phase
        return snapshot

    def restore_state(self, state: dict):
        """Load both phases from a checkpoint dict; missing keys become 0."""
        self.sheng_phase, self.bian_phase = (
            state.get("sheng_phase", 0),
            state.get("bian_phase", 0),
        )
# ==================== 河图中央 ====================
class HeTuCenter:
    """Scoreboard holding the latest "sheng" (1-4) and "cheng" (6-9) values.

    Every update recomputes a small summary: the mean of the four sheng
    values, the mean of the four cheng values, and their ratio ("balance").
    """

    @staticmethod
    def _zeroed(keys):
        """Return a fresh dict mapping every key in *keys* to 0.0."""
        return {key: 0.0 for key in keys}

    def __init__(self):
        self.sheng_info = self._zeroed(("1", "2", "3", "4"))
        self.cheng_info = self._zeroed(("6", "7", "8", "9"))
        self.global_state = self._zeroed(("sheng", "cheng", "balance"))

    def update_sheng(self, idx: int, value: float):
        """Store *value* under sheng slot *idx* and refresh the summary."""
        slot = str(idx)
        self.sheng_info[slot] = value
        self._update_global_state()

    def update_cheng(self, idx: int, value: float):
        """Store *value* under cheng slot *idx* and refresh the summary."""
        slot = str(idx)
        self.cheng_info[slot] = value
        self._update_global_state()

    def _update_global_state(self):
        """Recompute the two means and the balance ratio in place."""
        summary = self.global_state
        summary["sheng"] = sum(self.sheng_info.values()) / 4
        summary["cheng"] = sum(self.cheng_info.values()) / 4
        # The 0.01 offset keeps the division defined when cheng is zero.
        summary["balance"] = summary["sheng"] / (summary["cheng"] + 0.01)

    def get_full_state(self):
        """Return independent copies of the three internal dicts."""
        return {
            "sheng": dict(self.sheng_info),
            "cheng": dict(self.cheng_info),
            "global": dict(self.global_state),
        }

    def get_save_state(self):
        """Return the internal dicts themselves (not copies) for checkpointing."""
        return {
            "sheng_info": self.sheng_info,
            "cheng_info": self.cheng_info,
            "global_state": self.global_state,
        }

    def restore_state(self, state: dict):
        """Adopt the dicts found in *state*; missing entries reset to zeros."""
        self.sheng_info = state.get("sheng_info", self._zeroed(("1", "2", "3", "4")))
        self.cheng_info = state.get("cheng_info", self._zeroed(("6", "7", "8", "9")))
        self.global_state = state.get("global_state", self._zeroed(("sheng", "cheng", "balance")))
# ==================== 工具函数 ====================
def get_all_txt_files(root_dir: str) -> List[str]:
    """Return the paths of all ``.txt`` files below *root_dir*, recursively.

    Paths are produced in ``os.walk`` order and are prefixed with *root_dir*.
    A missing *root_dir* yields an empty list.
    """
    if not os.path.exists(root_dir):
        return []
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(root_dir)
        for name in names
        if name.endswith('.txt')
    ]
# ==================== 火2(修改:遍历全部文件,跳过无标点文档,分层清洗) ====================
class Fire2:
    """Morpheme source built from the text corpus.

    The corpus is scanned for Chinese n-grams of 1 to 4 characters
    (code points U+4E00..U+9FFF). Only n-grams seen at least 10 times are kept
    in ``word_freq``. ``get_morphemes`` then samples uniformly from the kept
    n-grams, or generates random characters when nothing was loaded.
    """

    # Maximal runs of CJK ideographs. N-grams are taken inside a run only, so
    # they can never contain punctuation, whitespace or a line break.
    _CJK_RUN = re.compile(r'[\u4e00-\u9fff]+')

    def __init__(self, corpus_paths: List[str]):
        self.word_freq = Counter()
        self.corpus_paths = corpus_paths
        self._load_corpus(corpus_paths)
        print(f" 🔥 火2完成,共 {len(self.word_freq)} 个语素")

    @staticmethod
    def _has_unpunctuated_run(text, punctuation, limit):
        """Return True if *text* has *limit* consecutive characters that are
        neither in *punctuation* nor whitespace."""
        streak = 0
        for ch in text:
            if ch in punctuation or ch.isspace():
                streak = 0
            else:
                streak += 1
                if streak >= limit:
                    return True
        return False

    def _load_corpus(self, paths):
        """Count n-grams over every ``.txt`` file reachable from *paths*.

        Each entry of *paths* may be a ``.txt`` file or a directory (searched
        recursively). Only the first 8000 characters of each file are read.
        Files shorter than 10 characters, unreadable files, and files that
        contain 50 consecutive characters without punctuation or whitespace
        are skipped. ``word_freq`` is left untouched when no file is found.
        """
        counter = Counter()
        all_files = []
        for path in paths:
            if os.path.isfile(path) and path.endswith('.txt'):
                all_files.append(path)
            elif os.path.isdir(path):
                all_files.extend(get_all_txt_files(path))
        if not all_files:
            return
        # Punctuation set (Chinese and ASCII).
        punctuation = set(',。!?:;、”“‘’…—·,.!?;:""\'')
        total_files = len(all_files)
        print(f" 📂 发现 {total_files} 个文本文件,开始分组处理...")
        processed = 0
        skipped = 0
        # N-grams below this count are dropped at every cleaning pass.
        OBSERVE_THRESHOLD = 10
        # A cleaning pass is forced whenever the counter exceeds this size.
        FORCE_CLEAN_SIZE = 500000
        for idx, file_path in enumerate(all_files, 1):
            if idx <= 100 or idx % 10 == 0:
                print(f" 📄 处理中 ({idx}/{total_files}): {os.path.basename(file_path)}")
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read(8000)
            except OSError:
                print(f" ⚠️ 读取失败,跳过: {os.path.basename(file_path)}")
                continue
            if len(text) < 10:
                continue
            if self._has_unpunctuated_run(text, punctuation, 50):
                skipped += 1
                print(f" ⛔ 跳过 (连续50字无标点): {os.path.basename(file_path)}")
                continue
            # Count every 1-4 character window inside each CJK run. Working
            # per run counts each position exactly once per n-gram size, even
            # at the very end of the text.
            for run in self._CJK_RUN.findall(text):
                run_len = len(run)
                for size in range(1, min(4, run_len) + 1):
                    counter.update(
                        run[start:start + size]
                        for start in range(run_len - size + 1)
                    )
            processed += 1
            # Periodic cleaning every 500 files.
            if idx % 500 == 0:
                before = len(counter)
                counter = Counter({k: v for k, v in counter.items() if v >= OBSERVE_THRESHOLD})
                after = len(counter)
                print(f" 🧹 清洗完成: {before} → {after} (保留 ≥10 次)")
            # Forced cleaning when the counter is still too large.
            if len(counter) > FORCE_CLEAN_SIZE:
                counter = Counter({k: v for k, v in counter.items() if v >= OBSERVE_THRESHOLD})
                print(f" ⚠️ 强制压缩: Counter 大小 {len(counter)}")
        # Final pass so that no low-frequency n-gram survives.
        counter = Counter({k: v for k, v in counter.items() if v >= OBSERVE_THRESHOLD})
        self.word_freq = counter
        print(f" ✅ 处理完成: 共处理 {processed} 个文件,跳过 {skipped} 个文件")
        print(f" 最终语素总数 {len(self.word_freq)} (仅保留 ≥10 次)")

    def reload_corpus(self):
        """Discard the current counts and rescan ``corpus_paths``."""
        old_count = len(self.word_freq)
        print(f" 🔄 重新加载语料库(旧语素数:{old_count})...")
        self.word_freq = Counter()
        self._load_corpus(self.corpus_paths)
        print(f" 🔥 火2重新加载完成,新语素数:{len(self.word_freq)}(新增:{len(self.word_freq)-old_count})")

    def get_morphemes(self, dao_novelty: float, total: int = 50) -> List[str]:
        """Return up to *total* morphemes.

        With a loaded corpus the morphemes are sampled uniformly (without
        replacement) from the known n-grams. Without one, *total* random
        characters are generated from a generator seeded with *dao_novelty*;
        a private generator is used so the module-level ``random`` state is
        not reseeded as a side effect.
        """
        if self.word_freq:
            words = list(self.word_freq.keys())
            sample_size = min(total, len(words))
            if sample_size == 0:
                return []
            return random.sample(words, sample_size)
        rng = random.Random(int(dao_novelty * 10000))
        return [chr(0x4e00 + rng.randint(0, 0x5000)) for _ in range(total)]

    def get_state(self) -> dict:
        """Return a snapshot holding at most the first 5000 counted n-grams."""
        return {"word_freq": dict(list(self.word_freq.items())[:5000])}

    def restore_state(self, state: dict):
        """Restore counts from a checkpoint, but only as a fallback.

        The snapshot written by ``get_state`` is truncated, so it must not
        replace counts that were just loaded from the corpus. It is used only
        when no corpus counts are present and the snapshot is non-empty.
        """
        saved = state.get("word_freq", {})
        if saved and not self.word_freq:
            self.word_freq = Counter(saved)
# ==================== 木3 ====================
class Mu3:
    """Sentence builder: asks the API to compose a sentence from morphemes."""

    def generate(self, morphemes: List[str], dao_novelty: float, sheng_ratio: float, sheng_length: int) -> str:
        """Compose a sentence of about *sheng_length* characters.

        A random 40% of *morphemes* (rounded down, at least one) is offered to
        the API. A reply longer than five characters is cut to *sheng_length*
        and stripped. Otherwise up to three random morphemes are joined with
        the literal marker "变体". An empty *morphemes* list yields "".
        """
        vocab_size = len(morphemes)
        if vocab_size == 0:
            return ""
        temperature = 0.6 + sheng_ratio * 0.4
        pick_count = min(vocab_size, max(1, int(vocab_size * 0.4)))
        input_morphemes = random.sample(morphemes, pick_count)
        prompt = f"用以下词语造一个{sheng_length}字左右的中文句子:{', '.join(input_morphemes)}\n只输出句子:"
        reply = call_deepseek(prompt, max_tokens=sheng_length + 50, temperature=temperature)
        if reply and len(reply) > 5:
            # Slicing is a no-op when the reply is already short enough.
            return reply[:sheng_length].strip()
        # Fallback: chain a few morphemes with the "变体" marker.
        fallback_words = random.sample(morphemes, min(3, vocab_size))
        return "变体".join(fallback_words) + "变体"

    def get_state(self) -> dict:
        """This stage keeps no state; an empty dict is returned."""
        return dict()

    def restore_state(self, state: dict):
        """Nothing to restore."""
        return None
# ==================== 水1(合并版:翻译+简化) ====================
class Shui1:
    """Translation stage: classical Chinese to vernacular, with rare
    characters replaced by common simplified ones, in a single API call."""

    # Label prefixes the model sometimes puts in front of its answer. They are
    # removed one after another, in this order.
    _PREFIX_PATTERNS = (
        r'^译文[::]\s*',
        r'^白话文[::]\s*',
        r'^翻译[::]\s*',
        r'^简化后[::]\s*',
    )

    def __init__(self):
        self.dao = None

    def set_dao(self, dao):
        """Remember the pi engine (stored only; not read by this class)."""
        self.dao = dao

    def translate_and_simplify(self, sentence: str, dao_novelty: float, bian_ratio: float, bian_length: int) -> str:
        """Translate *sentence* to vernacular Chinese and simplify rare characters.

        The original *sentence* is returned unchanged when it is empty or
        shorter than five characters, when the API gives no reply, or when the
        cleaned reply is shorter than 30% of the original.
        """
        too_short = (not sentence) or len(sentence) < 5
        if too_short:
            return sentence
        temperature = 0.6 + bian_ratio * 0.4
        prompt = f"将以下古文翻译成白话文,同时把文中的生僻字、异体字改成常用简化字:\n{sentence}\n只输出翻译后的白话文:"
        reply = call_deepseek(prompt, max_tokens=bian_length * 2 + 50, temperature=temperature)
        if not reply:
            # Fallback: keep the input sentence.
            return sentence
        cleaned = reply.strip()
        for pattern in self._PREFIX_PATTERNS:
            cleaned = re.sub(pattern, '', cleaned)
        # A very short answer suggests something went wrong; keep the input.
        if len(cleaned) < len(sentence) * 0.3:
            return sentence
        return cleaned

    def get_state(self) -> dict:
        """This stage keeps no state; an empty dict is returned."""
        return dict()

    def restore_state(self, state: dict):
        """Nothing to restore."""
        return None
# ==================== 金4(增加改写功能,带超时兜底) ====================
class Jin4:
    """Solidification stage: scores candidates, keeps the best, polishes them.

    ``masterpieces`` is the pool of kept works, capped at ``max_size`` entries
    (oldest entries are dropped first).
    """

    def __init__(self, max_size=10000):
        self.masterpieces = []
        self.max_size = max_size

    @staticmethod
    def _parse_scores(result: str, expected: int) -> List[float]:
        """Turn the API reply into exactly *expected* scores in [0, 1].

        One score is read per non-blank line (the first number on the line).
        Lines without a number count as 0.5, lines beyond *expected* are
        ignored, and missing scores are padded with 0.5. Keeping the list the
        same length as the candidate list guarantees that the best score
        always belongs to a candidate.
        """
        scores = []
        if result:
            for line in result.strip().split('\n'):
                if len(scores) >= expected:
                    break
                if not line.strip():
                    continue
                match = re.search(r'(\d+\.?\d*)', line)
                if match:
                    scores.append(min(1.0, max(0.0, float(match.group(1)))))
                else:
                    scores.append(0.5)
        scores.extend([0.5] * (expected - len(scores)))
        return scores

    def solidify(self, candidates: List[str], dao_novelty: float, round_num: int) -> Tuple[List[str], List[float]]:
        """Score *candidates* and store the top-scoring one(s) in the pool.

        Every candidate that reaches the highest score is rewritten with
        ``rewrite_and_polish`` and the rewritten text is appended to
        ``masterpieces`` (the original text when rewriting yields nothing).

        Returns:
            ``(works, scores)`` for the top-scoring candidates, holding the
            original (not rewritten) texts. Both lists are empty when
            *candidates* is empty. ``round_num`` is accepted but not used.
        """
        if not candidates:
            return [], []
        prompt = f"为以下每个句子评分(0-1分),每行一个分数:\n" + "\n".join(candidates)
        result = call_deepseek(prompt, max_tokens=100, temperature=0.3)
        scores = self._parse_scores(result, len(candidates))
        max_score = max(scores)
        good_works, good_scores = [], []
        for work, score in zip(candidates, scores):
            if score != max_score:
                continue
            good_works.append(work)
            good_scores.append(score)
            # Store the polished version; keep the original if polishing fails.
            polished = self.rewrite_and_polish(work, dao_novelty)
            self.masterpieces.append(polished or work)
        if len(self.masterpieces) > self.max_size:
            self.masterpieces = self.masterpieces[-self.max_size:]
        return good_works, good_scores

    def rewrite_and_polish(self, work: str, dao_novelty: float) -> str:
        """Rewrite *work* into a fluent short passage of at most twice its length.

        *work* is returned unchanged when it is empty or shorter than ten
        characters, or when the API reply is empty or not longer than ten
        characters (which includes API timeouts and failures). A longer reply
        is cut to twice the original length and stripped.
        """
        if not work or len(work) < 10:
            return work
        original_len = len(work)
        max_allowed = original_len * 2
        prompt = f"""请将以下句子改写成一段通顺的中文短文,要求:
1. 保持原意不变
2. 语句流畅自然,段落完整
3. 长度控制在原文的2倍以内(原文约{original_len}字,改写后不超过{max_allowed}字)
原文:
{work}
只输出改写后的短文,不要加任何说明:"""
        # Timeouts are handled inside call_deepseek, which then returns "".
        result = call_deepseek(prompt, max_tokens=max_allowed + 50, temperature=0.5 + dao_novelty * 0.3)
        if result and len(result) > 10:
            # Enforce the 2x limit on the reply.
            if len(result) > max_allowed:
                result = result[:max_allowed]
            return result.strip()
        # Rewrite failed: keep the original text.
        return work

    def get_state(self) -> dict:
        """Return the 100 most recent pool entries for checkpointing."""
        return {"masterpieces": self.masterpieces[-100:]}

    def restore_state(self, state: dict):
        """Replace the pool with the entries stored in *state* (empty if absent)."""
        self.masterpieces = state.get("masterpieces", [])
# ==================== 老师 ====================
class Teacher:
    """Evaluator that asks the API to grade one stage's output.

    ``history`` keeps the most recent 100 evaluations as
    ``(timestamp, first 30 characters of the work, final score)`` tuples.
    """

    # A standalone number between 0 and 1, used when the score field contains
    # extra text such as a label or a unit.
    _UNIT_SCORE = re.compile(r'(?<![\d.])(?:0(?:\.\d+)?|1(?:\.0+)?)(?![\d.])')

    def __init__(self, teacher_id: int, student_name: str):
        self.id = teacher_id
        self.student_name = student_name
        self.history = []

    @classmethod
    def _parse_reply(cls, result: str) -> Tuple[Optional[float], str]:
        """Split a "score|comment" reply into ``(score or None, comment)``.

        The comment is everything after the first '|', stripped and cut to 40
        characters. The score is the text before it parsed as a float; when
        that fails, the first standalone number between 0 and 1 in that text
        is used, and ``None`` is returned if there is none.
        """
        raw_score, raw_comment = result.split('|', 1)
        comment = raw_comment.strip()[:40]
        try:
            return float(raw_score.strip()), comment
        except ValueError:
            match = cls._UNIT_SCORE.search(raw_score)
            return (float(match.group(0)) if match else None), comment

    def evaluate(self, work: str, dao_novelty: float) -> Tuple[float, str]:
        """Grade *work* and return ``(score, comment)``.

        Only the first 300 characters of *work* are sent to the API. When the
        reply has the "score|comment" shape its score is used (0.5 if no score
        can be read). Otherwise a heuristic based on length and character
        variety is used and the comment is empty. The score is then scaled by
        ``0.8 + dao_novelty * 0.3`` and clamped to [0, 1].
        """
        work_slice = work[:300]
        prompt = f"你是老师{self.id},评判{self.student_name}。给出分数(0-1分)和评语。格式:分数|评语\n作业:{work_slice}"
        result = call_deepseek(prompt, max_tokens=150, temperature=0.4)
        score = 0.5
        comment = ""
        if result and '|' in result:
            parsed_score, comment = self._parse_reply(result)
            if parsed_score is not None:
                score = parsed_score
        else:
            # Heuristic: half for length (saturating at 50 characters), half
            # for the share of distinct characters.
            score = min(1.0, len(work) / 50) * 0.5 + (len(set(work)) / max(1, len(work))) * 0.5
        score = score * (0.8 + dao_novelty * 0.3)
        score = min(1.0, max(0.0, score))
        self.history.append((time.time(), work[:30], score))
        if len(self.history) > 100:
            self.history = self.history[-100:]
        return score, comment

    def get_state(self) -> dict:
        """Return the 50 most recent history entries for checkpointing."""
        return {"history": self.history[-50:]}

    def restore_state(self, state: dict):
        """Replace the history with the one stored in *state* (empty if absent)."""
        self.history = state.get("history", [])
# ==================== 洛书中心 ====================
class LuoShuCenter:
    """Main loop: wires the pi engine, the four stages and the four teachers.

    One cycle (``run_cycle``) draws a novelty value from the pi engine, lets
    Fire2 supply morphemes, Mu3 build a sentence, Shui1 translate it and Jin4
    keep the best result, with a teacher scoring each stage. State is
    checkpointed with ``pickle`` every 10000 rounds and restored on start-up.
    """

    # In-memory log entries kept; save_log() writes at most the last 500 and
    # the checkpoint the last 100, so older entries are never needed.
    _MAX_LOG_ENTRIES = 500

    def __init__(self, dao: "DaoEngine", checkpoint_dir: str = "checkpoints"):
        self.dao = dao
        self.hetu_center = HeTuCenter()
        self.rhythm = RhythmController()
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)
        print("\n📚 加载语料...")
        self.fire2 = Fire2(["learning_material"])
        self.mu3 = Mu3()
        self.shui1 = Shui1()
        self.shui1.set_dao(dao)
        self.jin4 = Jin4()
        self.teacher6 = Teacher(6, "水1")
        self.teacher7 = Teacher(7, "火2")
        self.teacher8 = Teacher(8, "木3")
        self.teacher9 = Teacher(9, "金4")
        self.round = 0
        self.log_entries = []
        self._load_checkpoint()

    def _get_checkpoint_path(self) -> str:
        """Return the path of the main checkpoint file."""
        return os.path.join(self.checkpoint_dir, "full_checkpoint.pkl")

    def _get_tmp_path(self) -> str:
        """Return the path of the temporary file used while saving."""
        return self._get_checkpoint_path() + ".tmp"

    def _get_backup_path(self, round_num: int) -> str:
        """Return the path of the backup checkpoint for *round_num*."""
        return os.path.join(RECOVERY_DIR, f"checkpoint_{round_num}.pkl")

    def _restore_pi_pointer(self, checkpoint: dict) -> bool:
        """Reposition the pi engine using the pointers found in *checkpoint*.

        Candidate pointers are tried in this order: ``dao_state.pointer``,
        ``last_valid_pointer``, then the pointer stored in the main checkpoint
        file. The first one that lets the engine produce a digit wins. If all
        fail, the engine is reset to 0. Always returns True.
        """
        pointer_sources = []
        dao_state = checkpoint.get("dao_state", {})
        if "pointer" in dao_state:
            pointer_sources.append(("检查点", dao_state["pointer"]))
        if "last_valid_pointer" in checkpoint:
            pointer_sources.append(("last_valid_pointer", checkpoint["last_valid_pointer"]))
        main_path = self._get_checkpoint_path()
        if os.path.exists(main_path):
            try:
                # SECURITY NOTE: pickle executes code on load; checkpoint
                # files must only ever come from this program.
                with open(main_path, 'rb') as f:
                    main_cp = pickle.load(f)
                main_dao = main_cp.get("dao_state", {})
                if "pointer" in main_dao:
                    pointer_sources.append(("主检查点", main_dao["pointer"]))
            except Exception:
                # Best effort: an unreadable main checkpoint simply provides
                # no additional candidate.
                pass
        seen = set()
        unique_sources = []
        for name, ptr in pointer_sources:
            if ptr not in seen:
                seen.add(ptr)
                unique_sources.append((name, ptr))
        for name, ptr in unique_sources:
            try:
                print(f" 🔄 尝试从 {name} 恢复π指针: {ptr}")
                self.dao.restore_state({"pointer": ptr})
                # Probe one digit to prove the position is usable, then step
                # back so the probe does not consume it.
                self.dao.get_digit()
                self.dao.pointer -= 1
                print(f" ✅ π指针恢复成功(来源: {name})")
                return True
            except Exception as e:
                print(f" ⚠️ 从 {name} 恢复失败: {e}")
                continue
        print(f" ⚠️ 所有π指针来源均失败,重置为0")
        self.dao.restore_state({"pointer": 0})
        return True

    def save_checkpoint(self):
        """Write the full state to disk.

        The state is written to a temporary file which then replaces the main
        checkpoint, so the main file is never left half-written. Every 100000
        rounds an additional copy is stored in the recovery directory.
        """
        checkpoint = {
            "round": self.round,
            "dao_state": self.dao.get_state(),
            "last_valid_pointer": self.dao.get_pointer(),
            "rhythm_state": self.rhythm.get_state(),
            "hetu_state": self.hetu_center.get_save_state(),
            "fire2_state": self.fire2.get_state(),
            "jin4_state": self.jin4.get_state(),
            "teacher6_state": self.teacher6.get_state(),
            "teacher7_state": self.teacher7.get_state(),
            "teacher8_state": self.teacher8.get_state(),
            "teacher9_state": self.teacher9.get_state(),
            "log_entries": self.log_entries[-100:],
            "timestamp": datetime.now().isoformat()
        }
        tmp_path = self._get_tmp_path()
        with open(tmp_path, 'wb') as f:
            pickle.dump(checkpoint, f)
        main_path = self._get_checkpoint_path()
        os.replace(tmp_path, main_path)
        if self.round % 100000 == 0 and self.round > 0:
            backup_path = self._get_backup_path(self.round)
            try:
                shutil.copy2(main_path, backup_path)
                print(f" 💾 备份检查点已保存: {backup_path}")
            except Exception as e:
                print(f" ⚠️ 备份保存失败: {e}")

    def _load_checkpoint(self):
        """Restore state from the first loadable checkpoint.

        Files are tried in this order: main checkpoint, temporary file, then
        the backup with the highest round number. When none can be loaded the
        run starts from round 0.
        """
        paths_to_try = [
            self._get_checkpoint_path(),
            self._get_tmp_path(),
        ]
        backup_files = []
        if os.path.exists(RECOVERY_DIR):
            for f in os.listdir(RECOVERY_DIR):
                if f.startswith("checkpoint_") and f.endswith(".pkl"):
                    try:
                        round_num = int(f.split("_")[1].split(".")[0])
                    except (ValueError, IndexError):
                        # Not named checkpoint_<round>.pkl: ignore the file.
                        continue
                    backup_files.append((round_num, os.path.join(RECOVERY_DIR, f)))
        if backup_files:
            backup_files.sort(key=lambda x: x[0], reverse=True)
            paths_to_try.append(backup_files[0][1])
        for path in paths_to_try:
            if not os.path.exists(path):
                continue
            try:
                # SECURITY NOTE: pickle executes code on load; checkpoint
                # files must only ever come from this program.
                with open(path, 'rb') as f:
                    checkpoint = pickle.load(f)
                print(f" 📂 加载检查点文件成功,正在恢复状态...")
                self.round = checkpoint.get("round", 0)
                self._restore_pi_pointer(checkpoint)
                self.rhythm.restore_state(checkpoint.get("rhythm_state", {}))
                self.hetu_center.restore_state(checkpoint.get("hetu_state", {}))
                self.fire2.restore_state(checkpoint.get("fire2_state", {}))
                self.jin4.restore_state(checkpoint.get("jin4_state", {}))
                self.teacher6.restore_state(checkpoint.get("teacher6_state", {}))
                self.teacher7.restore_state(checkpoint.get("teacher7_state", {}))
                self.teacher8.restore_state(checkpoint.get("teacher8_state", {}))
                self.teacher9.restore_state(checkpoint.get("teacher9_state", {}))
                self.log_entries = checkpoint.get("log_entries", [])
                print(f" 📂 加载检查点成功,从第 {self.round} 轮继续 (来源: {path})")
                print(f" 🔄 π指针: {self.dao.pointer}")
                return
            except Exception as e:
                print(f" ⚠️ 加载 {path} 失败: {e}")
                continue
        print(" 📂 未找到有效检查点,从头开始")

    def run_cycle(self):
        """Run one full round through all four stages and log the result."""
        self.round += 1
        dao_novelty = self.dao.get_novelty(6)
        self.rhythm.update()
        sheng_ratio = self.rhythm.get_sheng_ratio()
        bian_ratio = self.rhythm.get_bian_ratio()

        # From round 1,270,000 on, every further 1,000,000 rounds adds one
        # morpheme and four characters to the upper length limit.
        base_round = 1270000
        base_morphemes = 50
        base_max_len = 200
        extra = max(0, (self.round - base_round) // 1000000)
        morphemes_count = base_morphemes + extra
        sheng_max = base_max_len + extra * 4
        bian_max = sheng_max
        sheng_min = 50
        bian_min = 50
        sheng_length = self.rhythm.get_sheng_length(sheng_min, sheng_max)
        bian_length = self.rhythm.get_bian_length(bian_min, bian_max)
        print(f"\n{'─'*70}")
        print(f"第 {self.round} 轮 | 道新奇度: {dao_novelty:.4f} | 生节:{sheng_ratio:.2f}/{sheng_length} | 变节:{bian_ratio:.2f}/{bian_length}")
        print(f" 🔧 火2语素: {morphemes_count} (基准50+{extra}) | 上限: {sheng_max}字")
        if self.round % 1000000 == 0 and self.round > 0:
            self.fire2.reload_corpus()

        # ---- Fire2 (morphemes) and Mu3 (sentence) ----
        morphemes = self.fire2.get_morphemes(dao_novelty, total=morphemes_count)
        if morphemes:
            score7, comment7 = self.teacher7.evaluate(" ".join(morphemes[:5]), dao_novelty)
            self.hetu_center.update_sheng(1, score7)
            self.hetu_center.update_cheng(7, score7)
            print(f" 🔥 火2(生1): {len(morphemes)}语素 | 师7(成7):{score7:.2f} | {comment7}")
            sentence = self.mu3.generate(morphemes, dao_novelty, sheng_ratio, sheng_length)
            score8, comment8 = self.teacher8.evaluate(sentence, dao_novelty)
            self.hetu_center.update_sheng(2, score8)
            self.hetu_center.update_cheng(8, score8)
            print(f" 🌳 木3(生2): {sentence}")
            print(f" 师8(成8):{score8:.2f} | {comment8}")
        else:
            print(f" 🔥 火2(生1): 无语素")
            sentence = ""
            print(f" 🌳 木3(生2): 无句子")

        # ---- Shui1: translate + simplify in one call ----
        simplified = ""
        if sentence:
            simplified = self.shui1.translate_and_simplify(sentence, dao_novelty, bian_ratio, bian_length)
            if simplified:
                score6, comment6 = self.teacher6.evaluate(simplified, dao_novelty)
                self.hetu_center.update_sheng(3, score6)
                self.hetu_center.update_cheng(6, score6)
                print(f" 💧 水1(生3): 白话文翻译+简化")
                print(f" {simplified}")
                print(f" 师6(成6):{score6:.2f} | {comment6}")
            else:
                print(f" 💧 水1(生3): 无输出")
        else:
            print(f" 💧 水1(生3): 无输入")

        # ---- Jin4: solidify (including rewrite with fallback) ----
        if sentence:
            candidates = [sentence]
            if simplified:
                candidates.append(simplified)
            good_works, good_scores = self.jin4.solidify(candidates, dao_novelty, self.round)
            if good_works:
                # The newest pool entry is the rewritten version of the work
                # that was just solidified.
                if self.jin4.masterpieces:
                    saved_work = self.jin4.masterpieces[-1]
                else:
                    saved_work = good_works[0]
                score9, comment9 = self.teacher9.evaluate(saved_work, dao_novelty)
                self.hetu_center.update_sheng(4, score9)
                self.hetu_center.update_cheng(9, score9)
                print(f" 💎 金4(生4): 固化作品 | 师9(成9):{score9:.2f} | {comment9}")
                print(f" 作品: {saved_work}")
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                with open(f"masterpieces/round_{self.round}_{timestamp}.txt", 'w', encoding='utf-8') as f:
                    f.write(f"第{self.round}轮作品\n道新奇度:{dao_novelty:.4f}\n\n{saved_work}")
            else:
                print(f" 💎 金4(生4): 未固化新作品")
        else:
            print(f" 💎 金4(生4): 无输入")

        # ---- Summary and logging ----
        full_state = self.hetu_center.get_full_state()
        sheng_str = f"{full_state['sheng']['1']:.2f}/{full_state['sheng']['2']:.2f}/{full_state['sheng']['3']:.2f}/{full_state['sheng']['4']:.2f}"
        cheng_str = f"{full_state['cheng']['6']:.2f}/{full_state['cheng']['7']:.2f}/{full_state['cheng']['8']:.2f}/{full_state['cheng']['9']:.2f}"
        print(f" 📊 汇总 | 生:[{sheng_str}] | 成:[{cheng_str}]")
        self.log_entries.append({
            "round": self.round, "dao_novelty": dao_novelty,
            "sheng_ratio": sheng_ratio, "bian_ratio": bian_ratio,
            "sheng": full_state['sheng'], "cheng": full_state['cheng']
        })
        # Keep the in-memory log bounded; the loop runs for millions of rounds.
        if len(self.log_entries) > self._MAX_LOG_ENTRIES:
            del self.log_entries[:-self._MAX_LOG_ENTRIES]
        if self.round % 10000 == 0:
            self.save_checkpoint()
            self.save_log()

    def save_log(self):
        """Write the most recent 500 log entries to a timestamped JSON file."""
        with open(f"logs/run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w', encoding='utf-8') as f:
            json.dump(self.log_entries[-500:], f, ensure_ascii=False, indent=2)
        print(f"\n 📝 日志已保存,当前轮数: {self.round}")

    def run_forever(self):
        """Print the banner and run cycles until interrupted with Ctrl+C.

        On ``KeyboardInterrupt`` a checkpoint and a log file are written
        before returning.
        """
        print("\n" + "="*70)
        print("☯ 河图洛书镜像智能体 V7.0 - 道在π中")
        print(" 火2: 从语料随机取词(每个文件限8000字符,处理全部文件,跳过连续50字无标点文档)")
        print(" 木3: 用语素造句子(长度50-动态上限,随机选取火2语素的40%)")
        print(" 水1: 将古文翻译成白话文并简化生僻字(合并版)")
        print(" 金4: 评分固化作品(只固化最高分,改写后再固化,超时则保留原文)")
        print(" 老师: 调用API评分,自己进化")
        print(" 生慢变快,不同频。每1万轮保存检查点")
        print(" 每100万轮重新加载语料库(支持动态添加语料)")
        print(" 火2语素数量:127万轮后每100万轮+1(无上限限制)")
        print(" 长度上限:127万轮后每100万轮+4字")
        print(" 火2采用等概率采样,无高频词偏好")
        print(" 检查点支持自动恢复(优先使用.tmp文件)")
        print(" π指针自动恢复: 检查点 → last_valid_pointer → 主检查点 → 重置为0")
        print(" 不加任何人为设定。道驱动一切,万物自己演化")
        print("="*70)
        print("\n🚀 启动!按 Ctrl+C 停止\n")
        try:
            while True:
                self.run_cycle()
        except KeyboardInterrupt:
            print(f"\n\n⏸️ 停止。运行了 {self.round} 轮")
            print(f" 道消耗: {self.dao.get_pointer()} 位π")
            print(f" 金池作品: {len(self.jin4.masterpieces)}")
            self.save_checkpoint()
            self.save_log()
            print("\n 状态已保存,下次运行继续")
            print(" 它不完美,但它是道的镜像。")
def main():
    """Print the start-up banner, build the engine and run the main loop."""
    rule = "=" * 70
    banner = (
        "\n" + rule,
        "🐉 河图洛书 V7.0 - 镜像核心版",
        " 不做合道的智能体,做道的镜像",
        " 道独立不改,万物有序运行",
        " 不加任何人为设定",
        " 水1: 古文翻译成白话文并简化生僻字(合并版)",
        " 金4: 改写后再固化(长度控制在原文2倍以内,超时则保留原文)",
        " 木3: 随机选取火2提供语素的40%",
        " 火2: 处理全部文件,跳过连续50字无标点文档,分层清洗保留≥10次语素",
        rule + "\n",
    )
    for line in banner:
        print(line)
    # The pi engine is created first because the centre needs it at construction.
    engine = DaoEngine()
    LuoShuCenter(engine).run_forever()
# Run the agent only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()