两个最新版的老登与狍子。主要是解决了意外关闭后的快速恢复的问题。
老登的代码:
# hetu_luoshu_v7_mirror.py
# 河图洛书 V7.0 - 镜像核心版
# 设计哲学:道的镜像。道驱动一切,万物自己演化。
# 不加任何人为设定。不预设、不教、不操心。
#
# 修改说明(2026-06-08):
# 1. 主循环 time.sleep(0.3) 已删除,全速运行
# 2. 检查点保存改为每1万轮
# 3. 日志保存改为每1万轮
# 4. 火2取语素数量动态:127万轮开始,每增加100万轮+1个语素(2026-06-19修改)
# 5. 木3和水1长度上限同步:127万轮后每100万轮+4字,下限不变(2026-06-19修改)
# 6. 水1变体数量从5个改为2个
# 7. 金4固化:只固化得分最高的作品(不设固定阈值)
# 8. 金4固化池最大容量10000
# 9. 每100万轮重新加载语料库
# 10. API超时5秒
# 11. 火2改为等概率采样,去掉高频词偏好(2026-06-17)
# 12. 去掉语素数量上限100的限制(2026-06-17)
# 13. 增加检查点自动恢复机制(2026-06-17)
# 14. 增加π指针自动恢复机制(2026-06-19)
import os
import sys
import time
import json
import random
import re
import math
import hashlib
import pickle
import shutil
import requests
from collections import Counter
from typing import List, Dict, Tuple, Optional
from datetime import datetime
# ==================== API配置 ====================
DEEPSEEK_API_KEY = "sk-KEY"
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
RECOVERY_DIR = "recovery_mirror"
for d in ["cache", "learning_material", "masterpieces", "logs", "checkpoints", RECOVERY_DIR]:
os.makedirs(d, exist_ok=True)
def call_deepseek(prompt: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
cache_key = hashlib.md5(prompt.encode()).hexdigest()
cache_file = f"cache/{cache_key}.json"
if os.path.exists(cache_file):
try:
with open(cache_file, 'r', encoding='utf-8') as f:
return json.load(f)["response"]
except:
pass
try:
headers = {"Authorization": f"Bearer {DEEPSEEK_API_KEY}", "Content-Type": "application/json"}
data = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": temperature
}
response = requests.post(DEEPSEEK_API_URL, json=data, headers=headers, timeout=5)
if response.status_code == 200:
result = response.json()["choices"][0]["message"]["content"]
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump({"prompt": prompt, "response": result}, f, ensure_ascii=False)
return result
return ""
except Exception as e:
return ""
# ==================== 道:π引擎 ====================
class DaoEngine:
def __init__(self, chunk_size=10000):
self.chunk_size = chunk_size
self.digits = []
self.pointer = 0
self._load_next_chunk()
def _load_next_chunk(self):
try:
import gmpy2
gmpy2.get_context().precision = (self.pointer + self.chunk_size + 100) * 4
pi = gmpy2.const_pi()
pi_str = format(pi, f'.{self.pointer + self.chunk_size + 50}f')
pi_digits = pi_str.replace('.', '')
segment = pi_digits[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
except ImportError:
from decimal import Decimal, getcontext
getcontext().prec = self.pointer + self.chunk_size + 50
pi = Decimal(0)
for k in range(self.pointer + self.chunk_size + 20):
pi += (Decimal(1)/(16**k)) * (
Decimal(4)/(8*k+1) - Decimal(2)/(8*k+4) -
Decimal(1)/(8*k+5) - Decimal(1)/(8*k+6)
)
pi_str = str(pi)[2:]
segment = pi_str[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
def get_novelty(self, length=8) -> float:
while self.pointer + length >= len(self.digits):
self._load_next_chunk()
segment = self.digits[self.pointer:self.pointer+length]
self.pointer += length
value = 0
for i, d in enumerate(segment):
value += d * (0.1 ** (i+1))
return value
def get_digit(self) -> int:
if self.pointer >= len(self.digits):
self._load_next_chunk()
digit = self.digits[self.pointer]
self.pointer += 1
return digit
def get_digits(self, count: int) -> List[int]:
result = []
for _ in range(count):
result.append(self.get_digit())
return result
def get_pointer(self) -> int:
return self.pointer
def get_state(self) -> dict:
return {"pointer": self.pointer}
def restore_state(self, state: dict):
self.pointer = state.get("pointer", 0)
self.digits = []
self._load_next_chunk()
# ==================== 节奏控制器 ====================
class RhythmController:
def __init__(self):
self.sheng_phase = 0
self.bian_phase = 0
self.sheng_speed = 0.2 * 2 * math.pi / 5
self.bian_speed = 2 * math.pi / 1
def update(self):
self.sheng_phase = (self.sheng_phase + self.sheng_speed) % (2 * math.pi)
self.bian_phase = (self.bian_phase + self.bian_speed) % (2 * math.pi)
def get_sheng_ratio(self):
return 0.55 + 0.25 * math.sin(self.sheng_phase)
def get_bian_ratio(self):
return 0.55 + 0.35 * math.sin(self.bian_phase)
def get_sheng_length(self, sheng_min, sheng_max):
return int(sheng_min + (sheng_max - sheng_min) * self.get_sheng_ratio())
def get_bian_length(self, bian_min, bian_max):
return int(bian_min + (bian_max - bian_min) * self.get_bian_ratio())
def get_state(self) -> dict:
return {"sheng_phase": self.sheng_phase, "bian_phase": self.bian_phase}
def restore_state(self, state: dict):
self.sheng_phase = state.get("sheng_phase", 0)
self.bian_phase = state.get("bian_phase", 0)
# ==================== 河图中央 ====================
class HeTuCenter:
def __init__(self):
self.sheng_info = {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0}
self.cheng_info = {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0}
self.global_state = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}
def update_sheng(self, idx: int, value: float):
self.sheng_info[str(idx)] = value
self._update_global_state()
def update_cheng(self, idx: int, value: float):
self.cheng_info[str(idx)] = value
self._update_global_state()
def _update_global_state(self):
self.global_state["sheng"] = sum(self.sheng_info.values()) / 4
self.global_state["cheng"] = sum(self.cheng_info.values()) / 4
self.global_state["balance"] = self.global_state["sheng"] / (self.global_state["cheng"] + 0.01)
def get_full_state(self):
return {"sheng": self.sheng_info.copy(), "cheng": self.cheng_info.copy(), "global": self.global_state.copy()}
def get_save_state(self):
return {"sheng_info": self.sheng_info, "cheng_info": self.cheng_info, "global_state": self.global_state}
def restore_state(self, state: dict):
self.sheng_info = state.get("sheng_info", {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0})
self.cheng_info = state.get("cheng_info", {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0})
self.global_state = state.get("global_state", {"sheng": 0.0, "cheng": 0.0, "balance": 0.0})
# ==================== 工具函数 ====================
def get_all_txt_files(root_dir: str) -> List[str]:
txt_files = []
if not os.path.exists(root_dir):
return txt_files
for dirpath, dirnames, filenames in os.walk(root_dir):
for filename in filenames:
if filename.endswith('.txt'):
txt_files.append(os.path.join(dirpath, filename))
return txt_files
# ==================== 火2 ====================
class Fire2:
def __init__(self, corpus_paths: List[str]):
self.word_freq = Counter()
self.corpus_paths = corpus_paths
self._load_corpus(corpus_paths)
print(f" 🔥 火2完成,共 {len(self.word_freq)} 个语素")
def _load_corpus(self, paths):
counter = Counter()
all_files = []
for path in paths:
if os.path.isfile(path) and path.endswith('.txt'):
all_files.append(path)
elif os.path.isdir(path):
all_files.extend(get_all_txt_files(path))
if not all_files:
return
for file_path in all_files[:500]:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read(8000)
for j in range(len(text)):
for l in range(1, 5):
word = text[j:j+l]
if re.match(r'[\u4e00-\u9fff]{1,4}$', word):
counter[word] += 1
except:
pass
self.word_freq = counter
def reload_corpus(self):
old_count = len(self.word_freq)
print(f" 🔄 重新加载语料库(旧语素数:{old_count})...")
self.word_freq = Counter()
self._load_corpus(self.corpus_paths)
print(f" 🔥 火2重新加载完成,新语素数:{len(self.word_freq)}(新增:{len(self.word_freq)-old_count})")
def get_morphemes(self, dao_novelty: float, total: int = 50) -> List[str]:
if self.word_freq:
words = list(self.word_freq.keys())
sample_size = min(total, len(words))
if sample_size == 0:
return []
return random.sample(words, sample_size)
seed = int(dao_novelty * 10000)
random.seed(seed)
base_chars = []
for _ in range(total):
code = 0x4e00 + random.randint(0, 0x5000)
base_chars.append(chr(code))
return base_chars
def get_state(self) -> dict:
return {"word_freq": dict(list(self.word_freq.items())[:5000])}
def restore_state(self, state: dict):
self.word_freq = Counter(state.get("word_freq", {}))
# ==================== 木3 ====================
class Mu3:
def generate(self, morphemes: List[str], dao_novelty: float, sheng_ratio: float, sheng_length: int) -> str:
temp = 0.6 + sheng_ratio * 0.4
input_morphemes = morphemes[:20] if len(morphemes) > 20 else morphemes
prompt = f"用以下词语造一个{sheng_length}字左右的中文句子:{', '.join(input_morphemes)}\n只输出句子:"
result = call_deepseek(prompt, max_tokens=sheng_length + 50, temperature=temp)
if result and len(result) > 5:
if len(result) > sheng_length:
result = result[:sheng_length]
return result.strip()
selected = random.sample(morphemes, min(3, len(morphemes)))
return "。".join(selected) + "。"
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 水1 ====================
class Shui1:
def __init__(self):
self.dao = None
def set_dao(self, dao):
self.dao = dao
def mutate(self, sentence: str, dao_novelty: float, bian_ratio: float, bian_length: int) -> List[str]:
temp = 0.6 + bian_ratio * 0.5
prompt = f"将「{sentence}」改写成2个不同的变体,每行一个:"
result = call_deepseek(prompt, max_tokens=bian_length * 2 + 50, temperature=temp)
if result:
variants = [v.strip() for v in result.strip().split('\n') if v.strip()]
trimmed = []
for v in variants:
if len(v) > bian_length:
v = v[:bian_length]
trimmed.append(v)
return trimmed[:2]
if self.dao:
digits = self.dao.get_digits(min(bian_length, 20))
dao_str = ''.join(str(d) for d in digits)
return [
sentence[:bian_length],
sentence[:bian_length//2] + dao_str[:bian_length//2]
][:2]
else:
return [
sentence[:bian_length],
sentence[:bian_length//2] + "……"
][:2]
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 金4 ====================
class Jin4:
def __init__(self, max_size=10000):
self.masterpieces = []
self.max_size = max_size
def solidify(self, candidates: List[str], dao_novelty: float) -> Tuple[List[str], List[float]]:
if not candidates:
return [], []
prompt = f"为以下每个句子评分(0-1分),每行一个分数:\n" + "\n".join(candidates)
result = call_deepseek(prompt, max_tokens=100, temperature=0.3)
scores = []
if result:
for line in result.strip().split('\n'):
try:
score = float(re.search(r'(\d+\.?\d*)', line).group(1))
scores.append(min(1.0, max(0.0, score)))
except:
scores.append(0.5)
while len(scores) < len(candidates):
scores.append(0.5)
if not scores:
return [], []
max_score = max(scores)
good_works, good_scores = [], []
for work, score in zip(candidates, scores):
if score == max_score:
good_works.append(work)
good_scores.append(score)
self.masterpieces.append(work)
if len(self.masterpieces) > self.max_size:
self.masterpieces = self.masterpieces[-self.max_size:]
return good_works, good_scores
def get_state(self) -> dict:
return {"masterpieces": self.masterpieces[-100:]}
def restore_state(self, state: dict):
self.masterpieces = state.get("masterpieces", [])
# ==================== 老师 ====================
class Teacher:
def __init__(self, teacher_id: int, student_name: str):
self.id = teacher_id
self.student_name = student_name
self.history = []
def evaluate(self, work: str, dao_novelty: float) -> Tuple[float, str]:
work_slice = work[:300] if len(work) > 300 else work
prompt = f"你是老师{self.id},评判{self.student_name}。给出分数(0-1分)和评语。格式:分数|评语\n作业:{work_slice}"
result = call_deepseek(prompt, max_tokens=150, temperature=0.4)
score = 0.5
comment = ""
if result and '|' in result:
parts = result.split('|')
try:
score = float(parts[0].strip())
comment = parts[1].strip()[:40]
except:
pass
else:
score = min(1.0, len(work) / 50) * 0.5 + (len(set(work)) / max(1, len(work))) * 0.5
score = score * (0.8 + dao_novelty * 0.3)
score = min(1.0, max(0.0, score))
self.history.append((time.time(), work[:30], score))
if len(self.history) > 100:
self.history = self.history[-100:]
return score, comment
def get_state(self) -> dict:
return {"history": self.history[-50:]}
def restore_state(self, state: dict):
self.history = state.get("history", [])
# ==================== 洛书中心 ====================
class LuoShuCenter:
def __init__(self, dao: DaoEngine, checkpoint_dir: str = "checkpoints"):
self.dao = dao
self.hetu_center = HeTuCenter()
self.rhythm = RhythmController()
self.checkpoint_dir = checkpoint_dir
os.makedirs(checkpoint_dir, exist_ok=True)
print("\n📚 加载语料...")
self.fire2 = Fire2(["learning_material"])
self.mu3 = Mu3()
self.shui1 = Shui1()
self.shui1.set_dao(dao)
self.jin4 = Jin4()
self.teacher7 = Teacher(7, "火2")
self.teacher8 = Teacher(8, "木3")
self.teacher6 = Teacher(6, "水1")
self.teacher9 = Teacher(9, "金4")
self.round = 0
self.log_entries = []
self._load_checkpoint()
def _get_checkpoint_path(self) -> str:
return os.path.join(self.checkpoint_dir, "full_checkpoint.pkl")
def _get_tmp_path(self) -> str:
return self._get_checkpoint_path() + ".tmp"
def _get_backup_path(self, round_num: int) -> str:
return os.path.join(RECOVERY_DIR, f"checkpoint_{round_num}.pkl")
def _restore_pi_pointer(self, checkpoint: dict) -> bool:
"""尝试恢复π指针,返回是否成功"""
pointer_sources = []
# 1. 尝试从当前检查点恢复
dao_state = checkpoint.get("dao_state", {})
if "pointer" in dao_state:
pointer_sources.append(("检查点", dao_state["pointer"]))
# 2. 尝试从 last_valid_pointer 恢复
if "last_valid_pointer" in checkpoint:
pointer_sources.append(("last_valid_pointer", checkpoint["last_valid_pointer"]))
# 3. 尝试从主检查点文件中的 dao_state 恢复
main_path = self._get_checkpoint_path()
if os.path.exists(main_path):
try:
with open(main_path, 'rb') as f:
main_cp = pickle.load(f)
main_dao = main_cp.get("dao_state", {})
if "pointer" in main_dao:
pointer_sources.append(("主检查点", main_dao["pointer"]))
except:
pass
# 去重
seen = set()
unique_sources = []
for name, ptr in pointer_sources:
if ptr not in seen:
seen.add(ptr)
unique_sources.append((name, ptr))
for name, ptr in unique_sources:
try:
print(f" 🔄 尝试从 {name} 恢复π指针: {ptr}")
self.dao.pointer = ptr
self.dao.digits = []
self.dao._load_next_chunk()
# 验证指针是否有效:取一个数字
test_digit = self.dao.get_digit()
self.dao.pointer -= 1
print(f" ✅ π指针恢复成功(来源: {name})")
return True
except Exception as e:
print(f" ⚠️ 从 {name} 恢复失败: {e}")
continue
# 所有来源都失败,重置为0
print(f" ⚠️ 所有π指针来源均失败,重置为0")
self.dao.pointer = 0
self.dao.digits = []
self.dao._load_next_chunk()
return True
def save_checkpoint(self):
checkpoint = {
"round": self.round,
"dao_state": self.dao.get_state(),
"last_valid_pointer": self.dao.get_pointer(), # 新增:保存上一个有效指针
"rhythm_state": self.rhythm.get_state(),
"hetu_state": self.hetu_center.get_save_state(),
"fire2_state": self.fire2.get_state(),
"jin4_state": self.jin4.get_state(),
"teacher7_state": self.teacher7.get_state(),
"teacher8_state": self.teacher8.get_state(),
"teacher6_state": self.teacher6.get_state(),
"teacher9_state": self.teacher9.get_state(),
"log_entries": self.log_entries[-100:],
"timestamp": datetime.now().isoformat()
}
tmp_path = self._get_tmp_path()
with open(tmp_path, 'wb') as f:
pickle.dump(checkpoint, f)
main_path = self._get_checkpoint_path()
os.replace(tmp_path, main_path)
if self.round % 100000 == 0 and self.round > 0:
backup_path = self._get_backup_path(self.round)
try:
shutil.copy2(main_path, backup_path)
print(f" 💾 备份检查点已保存: {backup_path}")
except Exception as e:
print(f" ⚠️ 备份保存失败: {e}")
def _load_checkpoint(self):
paths_to_try = [
self._get_checkpoint_path(),
self._get_tmp_path(),
]
backup_files = []
if os.path.exists(RECOVERY_DIR):
for f in os.listdir(RECOVERY_DIR):
if f.startswith("checkpoint_") and f.endswith(".pkl"):
try:
round_num = int(f.split("_")[1].split(".")[0])
backup_files.append((round_num, os.path.join(RECOVERY_DIR, f)))
except:
pass
if backup_files:
backup_files.sort(key=lambda x: x[0], reverse=True)
paths_to_try.append(backup_files[0][1])
for path in paths_to_try:
if not os.path.exists(path):
continue
try:
with open(path, 'rb') as f:
checkpoint = pickle.load(f)
print(f" 📂 加载检查点文件成功,正在恢复状态...")
self.round = checkpoint.get("round", 0)
# ============ 自动恢复π指针 ============
self._restore_pi_pointer(checkpoint)
# =====================================
self.rhythm.restore_state(checkpoint.get("rhythm_state", {}))
self.hetu_center.restore_state(checkpoint.get("hetu_state", {}))
self.fire2.restore_state(checkpoint.get("fire2_state", {}))
self.jin4.restore_state(checkpoint.get("jin4_state", {}))
self.teacher7.restore_state(checkpoint.get("teacher7_state", {}))
self.teacher8.restore_state(checkpoint.get("teacher8_state", {}))
self.teacher6.restore_state(checkpoint.get("teacher6_state", {}))
self.teacher9.restore_state(checkpoint.get("teacher9_state", {}))
self.log_entries = checkpoint.get("log_entries", [])
print(f" 📂 加载检查点成功,从第 {self.round} 轮继续 (来源: {path})")
print(f" 🔄 π指针: {self.dao.pointer}")
return
except Exception as e:
print(f" ⚠️ 加载 {path} 失败: {e}")
continue
print(" 📂 未找到有效检查点,从头开始")
def run_cycle(self):
self.round += 1
dao_novelty = self.dao.get_novelty(6)
self.rhythm.update()
sheng_ratio = self.rhythm.get_sheng_ratio()
bian_ratio = self.rhythm.get_bian_ratio()
# ==================== 动态计算参数 ====================
# 基准:127万轮,火2=50,木3/水1上限=200字
base_round = 1270000
base_morphemes = 50
base_max_len = 200
if self.round >= base_round:
# 每100万轮增加1个语素
extra = (self.round - base_round) // 1000000
morphemes_count = base_morphemes + extra
# 上限每100万轮增加4字
extra_len = extra * 4
sheng_max = base_max_len + extra_len
bian_max = sheng_max
else:
morphemes_count = 50
sheng_max = base_max_len
bian_max = base_max_len
sheng_min = 50
bian_min = 50
sheng_length = self.rhythm.get_sheng_length(sheng_min, sheng_max)
bian_length = self.rhythm.get_bian_length(bian_min, bian_max)
# ==================================================
print(f"\n{'─'*70}")
print(f"第 {self.round} 轮 | 道新奇度: {dao_novelty:.4f} | 生节:{sheng_ratio:.2f}/{sheng_length} | 变节:{bian_ratio:.2f}/{bian_length}")
print(f" 🔧 火2语素: {morphemes_count} (基准50+{max(0, (self.round - base_round)//1000000)}) | 上限: {sheng_max}字")
if self.round % 1000000 == 0 and self.round > 0:
self.fire2.reload_corpus()
morphemes = self.fire2.get_morphemes(dao_novelty, total=morphemes_count)
if morphemes:
score7, comment7 = self.teacher7.evaluate(" ".join(morphemes[:5]), dao_novelty)
self.hetu_center.update_sheng(1, score7)
self.hetu_center.update_cheng(7, score7)
print(f" 🔥 火2(生1): {len(morphemes)}语素 | 师7(成7):{score7:.2f}")
else:
print(f" 🔥 火2(生1): 无语素")
score7 = 0.0
if morphemes:
sentence = self.mu3.generate(morphemes, dao_novelty, sheng_ratio, sheng_length)
score8, comment8 = self.teacher8.evaluate(sentence, dao_novelty)
self.hetu_center.update_sheng(2, score8)
self.hetu_center.update_cheng(8, score8)
print(f" 🌳 木3(生2): {sentence[:70]}...")
print(f" 师8(成8):{score8:.2f}")
else:
sentence = ""
score8 = 0.0
print(f" 🌳 木3(生2): 无句子")
if sentence:
variants = self.shui1.mutate(sentence, dao_novelty, bian_ratio, bian_length)
if variants:
best_variant = variants[0]
score6, comment6 = self.teacher6.evaluate(best_variant, dao_novelty)
self.hetu_center.update_sheng(3, score6)
self.hetu_center.update_cheng(6, score6)
print(f" 💧 水1(生3): {len(variants)}个变体")
for i, v in enumerate(variants[:2]):
print(f" 变体{i+1}: {v[:60]}...")
print(f" 师6(成6):{score6:.2f}")
else:
score6 = 0.5
print(f" 💧 水1(生3): 无变体")
else:
score6 = 0.0
print(f" 💧 水1(生3): 无输入")
if sentence:
candidates = [sentence] + (variants if variants else [])
good_works, good_scores = self.jin4.solidify(candidates, dao_novelty)
if good_works:
best_work = good_works[0]
best_score = good_scores[0]
score9, comment9 = self.teacher9.evaluate(best_work, dao_novelty)
self.hetu_center.update_sheng(4, score9)
self.hetu_center.update_cheng(9, score9)
print(f" 💎 金4(生4): 固化作品 | 师9(成9):{score9:.2f}")
print(f" 作品: {best_work[:80]}...")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"masterpieces/round_{self.round}_{timestamp}.txt", 'w', encoding='utf-8') as f:
f.write(f"第{self.round}轮作品\n道新奇度:{dao_novelty:.4f}\n\n{best_work}")
else:
print(f" 💎 金4(生4): 未固化新作品")
score9 = 0.0
else:
print(f" 💎 金4(生4): 无输入")
score9 = 0.0
full_state = self.hetu_center.get_full_state()
sheng_str = f"{full_state['sheng']['1']:.2f}/{full_state['sheng']['2']:.2f}/{full_state['sheng']['3']:.2f}/{full_state['sheng']['4']:.2f}"
cheng_str = f"{full_state['cheng']['6']:.2f}/{full_state['cheng']['7']:.2f}/{full_state['cheng']['8']:.2f}/{full_state['cheng']['9']:.2f}"
print(f" 📊 汇总 | 生:[{sheng_str}] | 成:[{cheng_str}]")
self.log_entries.append({
"round": self.round, "dao_novelty": dao_novelty,
"sheng_ratio": sheng_ratio, "bian_ratio": bian_ratio,
"sheng": full_state['sheng'], "cheng": full_state['cheng']
})
if self.round % 10000 == 0:
self.save_checkpoint()
if self.round % 10000 == 0:
self.save_log()
def save_log(self):
with open(f"logs/run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w', encoding='utf-8') as f:
json.dump(self.log_entries[-500:], f, ensure_ascii=False, indent=2)
print(f"\n 📝 日志已保存,当前轮数: {self.round}")
def run_forever(self):
print("\n" + "="*70)
print("☯ 河图洛书镜像智能体 V7.0 - 道在π中")
print(" 火2: 从语料随机取词(每个文件限8000字符,最多500个文件)")
print(" 木3: 用语素造句子(长度50-动态上限,不调API时自动增长)")
print(" 水1: 改写句子成2个变体(长度同步动态上限)")
print(" 金4: 评分固化作品(只固化最高分)")
print(" 老师: 调用API评分,自己进化")
print(" 生慢变快,不同频。每1万轮保存检查点")
print(" 每100万轮重新加载语料库(支持动态添加语料)")
print(" 火2语素数量:127万轮后每100万轮+1(无上限限制)")
print(" 长度上限:127万轮后每100万轮+4字")
print(" 火2采用等概率采样,无高频词偏好")
print(" 检查点支持自动恢复(优先使用.tmp文件)")
print(" π指针自动恢复: 检查点 → last_valid_pointer → 主检查点 → 重置为0")
print(" 不加任何人为设定。道驱动一切,万物自己演化")
print("="*70)
print("\n🚀 启动!按 Ctrl+C 停止\n")
try:
while True:
self.run_cycle()
except KeyboardInterrupt:
print(f"\n\n⏸️ 停止。运行了 {self.round} 轮")
print(f" 道消耗: {self.dao.get_pointer()} 位π")
print(f" 金池作品: {len(self.jin4.masterpieces)}")
self.save_checkpoint()
self.save_log()
print("\n 状态已保存,下次运行继续")
print(" 它不完美,但它是道的镜像。")
def main():
print("\n" + "="*70)
print("🐉 河图洛书 V7.0 - 镜像核心版")
print(" 不做合道的智能体,做道的镜像")
print(" 道独立不改,万物有序运行")
print(" 不加任何人为设定")
print("="*70 + "\n")
dao = DaoEngine()
luoshu = LuoShuCenter(dao)
luoshu.run_forever()
if __name__ == "__main__":
main()
狍子的代码:
# hetu_luoshu_v7_local_clean.py
# 河图洛书 V7.0 - 纯本地版(不调用API,无π数字污染)
# 与 hetu_luoshu_v7_mirror.py 共用 learning_material 和 cache
# 独立使用自己的 checkpoints_local、masterpieces_local、logs_local
import os
import sys
import time
import json
import random
import re
import math
import hashlib
import pickle
import shutil
import threading
from collections import Counter
from typing import List, Dict, Tuple, Optional
from datetime import datetime
# ==================== 目录配置 ====================
LEARNING_MATERIAL_DIR = "learning_material"
CACHE_DIR = "cache"
RECOVERY_DIR = "recovery"
for d in ["checkpoints_local", "masterpieces_local", "logs_local", RECOVERY_DIR]:
os.makedirs(d, exist_ok=True)
os.makedirs(LEARNING_MATERIAL_DIR, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)
# ==================== 超时加载检查点 ====================
def load_checkpoint_with_timeout(file_path: str, timeout: int = 600) -> Optional[dict]:
"""在指定时间内尝试加载检查点文件,超时则返回 None"""
result = [None]
exception = [None]
finished = [False]
def load():
try:
print(f" 📂 开始加载: {os.path.basename(file_path)}")
start_time = time.time()
with open(file_path, 'rb') as f:
result[0] = pickle.load(f)
elapsed = time.time() - start_time
print(f" ✅ 文件读取完成,耗时 {elapsed:.1f} 秒")
except Exception as e:
exception[0] = e
finally:
finished[0] = True
thread = threading.Thread(target=load, daemon=True)
thread.start()
remaining = timeout
while remaining > 0 and not finished[0]:
if remaining % 60 == 0 or remaining <= 10:
print(f" ⏳ 等待加载完成... 剩余 {remaining} 秒")
time.sleep(10)
remaining -= 10
if remaining < 0:
remaining = 0
if not finished[0]:
print(f" ⏰ 加载超时({timeout}秒),放弃该文件")
return None
if exception[0]:
print(f" ⚠️ 加载失败: {exception[0]}")
return None
return result[0]
# ==================== 本地评分 ====================
def local_evaluate(work: str, dao_novelty: float) -> float:
"""本地评分规则:长度分 + 独特字比例 + 新奇度"""
len_score = min(1.0, len(work) / 50) * 0.4
unique_ratio = len(set(work)) / max(1, len(work)) * 0.3
novelty_score = dao_novelty * 0.3
score = len_score + unique_ratio + novelty_score
return min(1.0, max(0.0, score))
# ==================== 本地生成(无π数字) ====================
def local_generate(morphemes: list, dao_novelty: float, sheng_length: int, jin4_instance=None) -> str:
"""从金池检索或简单拼接,不调用API,不插入数字"""
if jin4_instance and jin4_instance.masterpieces and random.random() < 0.7:
template = random.choice(jin4_instance.masterpieces)
result = template
for m in random.sample(morphemes, min(3, len(morphemes))):
if m and m not in result and len(result) > 0:
pos = random.randint(0, len(result)-1)
result = result[:pos] + m + result[pos+1:]
return result[:sheng_length]
if morphemes:
return "".join(random.sample(morphemes, min(5, len(morphemes))))[:sheng_length]
return "道可道,非常道。"
# ==================== 本地变体生成(去掉硬编码替换) ====================
def local_mutate(sentence: str, dao_novelty: float, bian_length: int) -> list:
"""本地变体生成,不调用API,不插入数字"""
variants = []
# 变体1:原句截取
variants.append(sentence[:bian_length])
# 变体2:随机打乱前5个字(如果有)
if len(sentence) >= 5:
prefix = list(sentence[:5])
random.shuffle(prefix)
variants.append(''.join(prefix) + sentence[5:][:bian_length-5])
return list(dict.fromkeys(variants))[:3]
# ==================== 道:π引擎 ====================
class DaoEngine:
def __init__(self, chunk_size=10000):
self.chunk_size = chunk_size
self.digits = []
self.pointer = 0
self._load_next_chunk()
def _load_next_chunk(self):
try:
import gmpy2
gmpy2.get_context().precision = (self.pointer + self.chunk_size + 100) * 4
pi = gmpy2.const_pi()
pi_str = format(pi, f'.{self.pointer + self.chunk_size + 50}f')
pi_digits = pi_str.replace('.', '')
segment = pi_digits[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
except ImportError:
from decimal import Decimal, getcontext
getcontext().prec = self.pointer + self.chunk_size + 50
pi = Decimal(0)
for k in range(self.pointer + self.chunk_size + 20):
pi += (Decimal(1)/(16**k)) * (
Decimal(4)/(8*k+1) - Decimal(2)/(8*k+4) -
Decimal(1)/(8*k+5) - Decimal(1)/(8*k+6)
)
pi_str = str(pi)[2:]
segment = pi_str[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
def get_novelty(self, length=8) -> float:
while self.pointer + length >= len(self.digits):
self._load_next_chunk()
segment = self.digits[self.pointer:self.pointer+length]
self.pointer += length
value = 0
for i, d in enumerate(segment):
value += d * (0.1 ** (i+1))
return value
def get_digit(self) -> int:
if self.pointer >= len(self.digits):
self._load_next_chunk()
digit = self.digits[self.pointer]
self.pointer += 1
return digit
def get_digits(self, count: int) -> List[int]:
return [self.get_digit() for _ in range(count)]
def get_pointer(self) -> int:
return self.pointer
def get_state(self) -> dict:
return {"pointer": self.pointer}
def restore_state(self, state: dict):
self.pointer = state.get("pointer", 0)
self.digits = []
self._load_next_chunk()
# ==================== 节奏控制器 ====================
class RhythmController:
def __init__(self):
self.sheng_phase = 0
self.bian_phase = 0
self.sheng_speed = 0.2 * 2 * math.pi / 5
self.bian_speed = 2 * math.pi / 1
def update(self):
self.sheng_phase = (self.sheng_phase + self.sheng_speed) % (2 * math.pi)
self.bian_phase = (self.bian_phase + self.bian_speed) % (2 * math.pi)
def get_sheng_ratio(self):
return 0.55 + 0.25 * math.sin(self.sheng_phase)
def get_bian_ratio(self):
return 0.55 + 0.35 * math.sin(self.bian_phase)
def get_sheng_length(self, sheng_min, sheng_max):
return int(sheng_min + (sheng_max - sheng_min) * self.get_sheng_ratio())
def get_bian_length(self, bian_min, bian_max):
return int(bian_min + (bian_max - bian_min) * self.get_bian_ratio())
def get_state(self) -> dict:
return {"sheng_phase": self.sheng_phase, "bian_phase": self.bian_phase}
def restore_state(self, state: dict):
self.sheng_phase = state.get("sheng_phase", 0)
self.bian_phase = state.get("bian_phase", 0)
# ==================== 河图中央 ====================
class HeTuCenter:
def __init__(self):
self.sheng_info = {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0}
self.cheng_info = {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0}
self.global_state = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}
def update_sheng(self, idx: int, value: float):
self.sheng_info[str(idx)] = value
self._update_global_state()
def update_cheng(self, idx: int, value: float):
self.cheng_info[str(idx)] = value
self._update_global_state()
def _update_global_state(self):
self.global_state["sheng"] = sum(self.sheng_info.values()) / 4
self.global_state["cheng"] = sum(self.cheng_info.values()) / 4
self.global_state["balance"] = self.global_state["sheng"] / (self.global_state["cheng"] + 0.01)
def get_full_state(self):
return {"sheng": self.sheng_info.copy(), "cheng": self.cheng_info.copy(), "global": self.global_state.copy()}
def get_save_state(self):
return {"sheng_info": self.sheng_info, "cheng_info": self.cheng_info, "global_state": self.global_state}
def restore_state(self, state: dict):
self.sheng_info = state.get("sheng_info", {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0})
self.cheng_info = state.get("cheng_info", {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0})
self.global_state = state.get("global_state", {"sheng": 0.0, "cheng": 0.0, "balance": 0.0})
# ==================== 工具函数 ====================
def get_all_txt_files(root_dir: str) -> List[str]:
txt_files = []
if not os.path.exists(root_dir):
return txt_files
for dirpath, dirnames, filenames in os.walk(root_dir):
for filename in filenames:
if filename.endswith('.txt'):
txt_files.append(os.path.join(dirpath, filename))
return txt_files
# ==================== 火2 ====================
class Fire2:
def __init__(self, corpus_paths: List[str]):
self.word_freq = Counter()
self.corpus_paths = corpus_paths
self._load_corpus(corpus_paths)
print(f" 🔥 火2完成,共 {len(self.word_freq)} 个语素")
def _load_corpus(self, paths):
counter = Counter()
all_files = []
for path in paths:
if os.path.isfile(path) and path.endswith('.txt'):
all_files.append(path)
elif os.path.isdir(path):
all_files.extend(get_all_txt_files(path))
if not all_files:
return
for file_path in all_files[:500]:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read(8000)
for j in range(len(text)):
for l in range(1, 5):
word = text[j:j+l]
if re.match(r'[\u4e00-\u9fff]{1,4}$', word):
counter[word] += 1
except:
pass
self.word_freq = counter
def reload_corpus(self):
old_count = len(self.word_freq)
print(f" 🔄 重新加载语料库(旧语素数:{old_count})...")
self.word_freq = Counter()
self._load_corpus(self.corpus_paths)
print(f" 🔥 火2重新加载完成,新语素数:{len(self.word_freq)}(新增:{len(self.word_freq)-old_count})")
def get_morphemes(self, dao_novelty: float, total: int = 50) -> List[str]:
if self.word_freq:
words = list(self.word_freq.keys())
sample_size = min(total, len(words))
if sample_size == 0:
return []
return random.sample(words, sample_size)
seed = int(dao_novelty * 10000)
random.seed(seed)
return [chr(0x4e00 + random.randint(0, 0x5000)) for _ in range(total)]
def get_state(self) -> dict:
return {"word_freq": dict(list(self.word_freq.items())[:5000])}
def restore_state(self, state: dict):
self.word_freq = Counter(state.get("word_freq", {}))
# ==================== 木3 ====================
class Mu3:
def __init__(self):
self.jin4 = None
def set_jin4(self, jin4):
self.jin4 = jin4
def generate(self, morphemes: List[str], dao_novelty: float, sheng_ratio: float, sheng_length: int) -> str:
return local_generate(morphemes, dao_novelty, sheng_length, self.jin4)
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 水1 ====================
class Shui1:
def __init__(self):
self.dao = None
def set_dao(self, dao):
self.dao = dao
def mutate(self, sentence: str, dao_novelty: float, bian_ratio: float, bian_length: int) -> List[str]:
return local_mutate(sentence, dao_novelty, bian_length)
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 金4 ====================
class Jin4:
def __init__(self, max_size=10000):
self.masterpieces = []
self.max_size = max_size
def solidify(self, candidates: List[str], dao_novelty: float) -> Tuple[List[str], List[float]]:
if not candidates:
return [], []
scores = [local_evaluate(work, dao_novelty) for work in candidates]
if not scores:
return [], []
max_score = max(scores)
good_works, good_scores = [], []
for work, score in zip(candidates, scores):
if score == max_score:
good_works.append(work)
good_scores.append(score)
self.masterpieces.append(work)
if len(self.masterpieces) > self.max_size:
self.masterpieces = self.masterpieces[-self.max_size:]
return good_works, good_scores
def get_state(self) -> dict:
return {"masterpieces": self.masterpieces[-100:]}
def restore_state(self, state: dict):
self.masterpieces = state.get("masterpieces", [])
# ==================== 老师 ====================
class Teacher:
def __init__(self, teacher_id: int, student_name: str):
self.id = teacher_id
self.student_name = student_name
self.history = []
def evaluate(self, work: str, dao_novelty: float) -> Tuple[float, str]:
score = local_evaluate(work, dao_novelty)
if self.id == 8:
score = score * 0.95
elif self.id == 9:
score = score * 1.05
elif self.id == 6:
score = score * 0.98
elif self.id == 7:
score = score * 1.02
score = min(1.0, max(0.0, score))
self.history.append((time.time(), work[:30], score))
if len(self.history) > 100:
self.history = self.history[-100:]
return score, f"本地评分{self.id}"
def get_state(self) -> dict:
return {"history": self.history[-50:]}
def restore_state(self, state: dict):
self.history = state.get("history", [])
# ==================== 洛书中心 ====================
class LuoShuCenter:
def __init__(self, dao: DaoEngine, checkpoint_dir: str = "checkpoints_local"):
self.dao = dao
self.hetu_center = HeTuCenter()
self.rhythm = RhythmController()
self.checkpoint_dir = checkpoint_dir
os.makedirs(checkpoint_dir, exist_ok=True)
print("\n📚 加载语料...")
self.fire2 = Fire2([LEARNING_MATERIAL_DIR])
self.mu3 = Mu3()
self.shui1 = Shui1()
self.shui1.set_dao(dao)
self.jin4 = Jin4()
self.mu3.set_jin4(self.jin4)
self.teacher7 = Teacher(7, "火2")
self.teacher8 = Teacher(8, "木3")
self.teacher6 = Teacher(6, "水1")
self.teacher9 = Teacher(9, "金4")
self.round = 0
self.log_entries = []
self._load_checkpoint()
def _get_checkpoint_path(self) -> str:
return os.path.join(self.checkpoint_dir, "full_checkpoint.pkl")
def _get_tmp_path(self) -> str:
return self._get_checkpoint_path() + ".tmp"
def _get_backup_path(self, round_num: int) -> str:
return os.path.join(RECOVERY_DIR, f"checkpoint_{round_num}.pkl")
def save_checkpoint(self):
checkpoint = {
"round": self.round,
"dao_state": self.dao.get_state(),
"last_valid_pointer": self.dao.get_pointer(), # 新增:保存上一个有效指针
"rhythm_state": self.rhythm.get_state(),
"hetu_state": self.hetu_center.get_save_state(),
"fire2_state": self.fire2.get_state(),
"jin4_state": self.jin4.get_state(),
"teacher7_state": self.teacher7.get_state(),
"teacher8_state": self.teacher8.get_state(),
"teacher6_state": self.teacher6.get_state(),
"teacher9_state": self.teacher9.get_state(),
"log_entries": self.log_entries[-100:],
"timestamp": datetime.now().isoformat()
}
tmp_path = self._get_tmp_path()
with open(tmp_path, 'wb') as f:
pickle.dump(checkpoint, f)
main_path = self._get_checkpoint_path()
os.replace(tmp_path, main_path)
if self.round % 100000 == 0 and self.round > 0:
backup_path = self._get_backup_path(self.round)
try:
shutil.copy2(main_path, backup_path)
print(f" 💾 备份检查点已保存: {backup_path}")
except Exception as e:
print(f" ⚠️ 备份保存失败: {e}")
def _restore_pi_pointer(self, checkpoint: dict) -> bool:
"""尝试恢复π指针,返回是否成功"""
# 从备份文件中提取指针值
pointer_sources = []
# 1. 尝试从当前检查点恢复
dao_state = checkpoint.get("dao_state", {})
if "pointer" in dao_state:
pointer_sources.append(("检查点", dao_state["pointer"]))
# 2. 尝试从 last_valid_pointer 恢复
if "last_valid_pointer" in checkpoint:
pointer_sources.append(("last_valid_pointer", checkpoint["last_valid_pointer"]))
# 3. 尝试从主检查点文件中的 dao_state 恢复(如果有备份文件)
# 这个在候选列表中已经包含了,但这里再额外尝试一次
main_path = self._get_checkpoint_path()
if os.path.exists(main_path):
try:
with open(main_path, 'rb') as f:
main_cp = pickle.load(f)
main_dao = main_cp.get("dao_state", {})
if "pointer" in main_dao:
pointer_sources.append(("主检查点", main_dao["pointer"]))
except:
pass
# 去重,按顺序尝试
seen = set()
unique_sources = []
for name, ptr in pointer_sources:
if ptr not in seen:
seen.add(ptr)
unique_sources.append((name, ptr))
for name, ptr in unique_sources:
try:
print(f" 🔄 尝试从 {name} 恢复π指针: {ptr}")
self.dao.pointer = ptr
self.dao.digits = []
self.dao._load_next_chunk()
# 验证指针是否有效:取一个数字
test_digit = self.dao.get_digit()
self.dao.pointer -= 1
print(f" ✅ π指针恢复成功(来源: {name})")
return True
except Exception as e:
print(f" ⚠️ 从 {name} 恢复失败: {e}")
continue
# 所有来源都失败,重置为0
print(f" ⚠️ 所有π指针来源均失败,重置为0")
self.dao.pointer = 0
self.dao.digits = []
self.dao._load_next_chunk()
return True
def _load_checkpoint(self):
# 构建候选列表:备份文件优先
candidates = []
print("\n 🔍 开始扫描检查点文件...")
print(" ⚠️ 备份文件优先,主文件为最后备选")
# 1. 临时文件(如果有)
tmp_path = self._get_tmp_path()
if os.path.exists(tmp_path):
file_size = os.path.getsize(tmp_path) / 1024
candidates.append(tmp_path)
print(f" 📄 找到临时文件: {os.path.basename(tmp_path)} ({file_size:.1f} KB)")
# 2. 所有备份文件(按轮次降序排列)
backup_files = []
if os.path.exists(RECOVERY_DIR):
for f in os.listdir(RECOVERY_DIR):
if f.startswith("checkpoint_") and f.endswith(".pkl"):
try:
round_num = int(f.split("_")[1].split(".")[0])
file_size = os.path.getsize(os.path.join(RECOVERY_DIR, f)) / 1024
backup_files.append((round_num, os.path.join(RECOVERY_DIR, f), file_size))
except:
pass
if backup_files:
backup_files.sort(key=lambda x: x[0], reverse=True)
print(f" 📄 找到 {len(backup_files)} 个备份文件:")
for round_num, path, size in backup_files[:10]:
print(f" 第 {round_num} 轮: {os.path.basename(path)} ({size:.1f} KB)")
if len(backup_files) > 10:
print(f" ... 还有 {len(backup_files) - 10} 个备份文件")
for _, path, _ in backup_files:
candidates.append(path)
# 3. 如果没有备份文件,回退到主文件
if not candidates:
print(" 📂 未找到任何备份文件,尝试主检查点...")
main_path = self._get_checkpoint_path()
if os.path.exists(main_path):
file_size = os.path.getsize(main_path) / 1024
candidates.append(main_path)
print(f" 📄 找到主检查点: {os.path.basename(main_path)} ({file_size:.1f} KB)")
else:
print(" 📂 未找到任何检查点文件,从头开始")
return
print(f"\n 📊 共找到 {len(candidates)} 个候选检查点,按优先级依次尝试加载...")
print(" ⏱️ 每个文件超时时间: 10分钟")
print(" " + "="*60)
for idx, path in enumerate(candidates):
file_name = os.path.basename(path)
print(f"\n [{idx+1}/{len(candidates)}] 尝试加载: {file_name}")
print(f" 📁 路径: {path}")
checkpoint = load_checkpoint_with_timeout(path, timeout=600)
if checkpoint is None:
print(f" ❌ 加载失败,跳过此文件\n")
continue
try:
print(f" 🔄 正在恢复状态...")
self.round = checkpoint.get("round", 0)
# ============ 修复:自动恢复π指针 ============
self._restore_pi_pointer(checkpoint)
# ============================================
self.rhythm.restore_state(checkpoint.get("rhythm_state", {}))
self.hetu_center.restore_state(checkpoint.get("hetu_state", {}))
self.fire2.restore_state(checkpoint.get("fire2_state", {}))
self.jin4.restore_state(checkpoint.get("jin4_state", {}))
self.teacher7.restore_state(checkpoint.get("teacher7_state", {}))
self.teacher8.restore_state(checkpoint.get("teacher8_state", {}))
self.teacher6.restore_state(checkpoint.get("teacher6_state", {}))
self.teacher9.restore_state(checkpoint.get("teacher9_state", {}))
self.log_entries = checkpoint.get("log_entries", [])
timestamp = checkpoint.get("timestamp", "未知时间")
print(f"\n ✅ 加载检查点成功!")
print(f" 📊 轮数: {self.round}")
print(f" 📅 时间戳: {timestamp}")
print(f" 📂 来源: {file_name}")
print(f" 🎯 金池作品数: {len(self.jin4.masterpieces)}")
print(f" 📝 日志条目: {len(self.log_entries)}")
print(f" 🔄 π指针: {self.dao.pointer}")
print(" " + "="*60)
return
except Exception as e:
print(f" ❌ 恢复状态失败: {e}")
continue
print("\n ❌ 所有候选检查点均失败,从头开始")
print(" " + "="*60)
self.round = 0
def run_cycle(self):
self.round += 1
dao_novelty = self.dao.get_novelty(6)
self.rhythm.update()
sheng_ratio = self.rhythm.get_sheng_ratio()
bian_ratio = self.rhythm.get_bian_ratio()
base_round = 1260000
base_morphemes = 50
base_max_len = 200
if self.round >= base_round:
extra = (self.round - base_round) // 100000
morphemes_count = base_morphemes + extra
extra_len = extra * 4
sheng_max = base_max_len + extra_len
bian_max = sheng_max
else:
morphemes_count = 50
sheng_max = base_max_len
bian_max = base_max_len
morphemes_count = min(morphemes_count, 100)
sheng_min = 50
bian_min = 50
sheng_length = self.rhythm.get_sheng_length(sheng_min, sheng_max)
bian_length = self.rhythm.get_bian_length(bian_min, bian_max)
print(f"\n{'─'*70}")
print(f"第 {self.round} 轮 | 道新奇度: {dao_novelty:.4f} | 生节:{sheng_ratio:.2f}/{sheng_length} | 变节:{bian_ratio:.2f}/{bian_length}")
print(f" 🔧 火2语素: {morphemes_count} | 上限: {sheng_max}字")
if self.round % 1000000 == 0 and self.round > 0:
self.fire2.reload_corpus()
morphemes = self.fire2.get_morphemes(dao_novelty, total=morphemes_count)
if morphemes:
score7, comment7 = self.teacher7.evaluate(" ".join(morphemes[:5]), dao_novelty)
self.hetu_center.update_sheng(1, score7)
self.hetu_center.update_cheng(7, score7)
print(f" 🔥 火2(生1): {len(morphemes)}语素 | 师7(成7):{score7:.2f}")
else:
print(f" 🔥 火2(生1): 无语素")
score7 = 0.0
if morphemes:
sentence = self.mu3.generate(morphemes, dao_novelty, sheng_ratio, sheng_length)
score8, comment8 = self.teacher8.evaluate(sentence, dao_novelty)
self.hetu_center.update_sheng(2, score8)
self.hetu_center.update_cheng(8, score8)
print(f" 🌳 木3(生2): {sentence[:70]}...")
print(f" 师8(成8):{score8:.2f}")
else:
sentence = ""
score8 = 0.0
print(f" 🌳 木3(生2): 无句子")
if sentence:
variants = self.shui1.mutate(sentence, dao_novelty, bian_ratio, bian_length)
if variants:
best_variant = variants[0]
score6, comment6 = self.teacher6.evaluate(best_variant, dao_novelty)
self.hetu_center.update_sheng(3, score6)
self.hetu_center.update_cheng(6, score6)
print(f" 💧 水1(生3): {len(variants)}个变体")
for i, v in enumerate(variants[:2]):
print(f" 变体{i+1}: {v[:60]}...")
print(f" 师6(成6):{score6:.2f}")
else:
score6 = 0.5
print(f" 💧 水1(生3): 无变体")
else:
score6 = 0.0
print(f" 💧 水1(生3): 无输入")
if sentence:
candidates = [sentence] + (variants if variants else [])
good_works, good_scores = self.jin4.solidify(candidates, dao_novelty)
if good_works:
best_work = good_works[0]
best_score = good_scores[0]
score9, comment9 = self.teacher9.evaluate(best_work, dao_novelty)
self.hetu_center.update_sheng(4, score9)
self.hetu_center.update_cheng(9, score9)
print(f" 💎 金4(生4): 固化作品 | 师9(成9):{score9:.2f}")
print(f" 作品: {best_work[:80]}...")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"masterpieces_local/round_{self.round}_{timestamp}.txt", 'w', encoding='utf-8') as f:
f.write(f"第{self.round}轮作品\n道新奇度:{dao_novelty:.4f}\n\n{best_work}")
else:
print(f" 💎 金4(生4): 未固化新作品")
score9 = 0.0
else:
print(f" 💎 金4(生4): 无输入")
score9 = 0.0
full_state = self.hetu_center.get_full_state()
sheng_str = f"{full_state['sheng']['1']:.2f}/{full_state['sheng']['2']:.2f}/{full_state['sheng']['3']:.2f}/{full_state['sheng']['4']:.2f}"
cheng_str = f"{full_state['cheng']['6']:.2f}/{full_state['cheng']['7']:.2f}/{full_state['cheng']['8']:.2f}/{full_state['cheng']['9']:.2f}"
print(f" 📊 汇总 | 生:[{sheng_str}] | 成:[{cheng_str}]")
self.log_entries.append({
"round": self.round, "dao_novelty": dao_novelty,
"sheng_ratio": sheng_ratio, "bian_ratio": bian_ratio,
"sheng": full_state['sheng'], "cheng": full_state['cheng']
})
if self.round % 10000 == 0:
self.save_checkpoint()
if self.round % 10000 == 0:
self.save_log()
def save_log(self):
with open(f"logs_local/run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w', encoding='utf-8') as f:
json.dump(self.log_entries[-500:], f, ensure_ascii=False, indent=2)
print(f"\n 📝 日志已保存,当前轮数: {self.round}")
def run_forever(self):
print("\n" + "="*70)
print("☯ 河图洛书镜像智能体 V7.0 - 纯本地版(不调用API,无数字污染)")
print(" 火2: 从语料随机取词(等概率采样,无词频偏好)")
print(" 木3: 本地生成句子(长度50-动态上限)")
print(" 水1: 本地生成3个变体(无硬编码替换)")
print(" 金4: 本地评分固化作品(只固化最高分)")
print(" 老师: 本地评分")
print(" 生慢变快,不同频。每1万轮保存检查点")
print(" 每100万轮重新加载语料库")
print(" 检查点自动恢复: 备份文件优先,主文件为最后备选")
print(" 每个文件超时: 10分钟,失败后自动尝试下一个")
print(" π指针自动恢复: 检查点 → last_valid_pointer → 主检查点 → 重置为0")
print(" 不加任何人为设定。道驱动一切,万物自己演化")
print("="*70)
print("\n🚀 启动!按 Ctrl+C 停止\n")
try:
while True:
self.run_cycle()
except KeyboardInterrupt:
print("\n\n" + "="*70)
print("⚠️ 正在关闭程序...")
print("="*70)
print(f"当前轮数: {self.round}")
print(f"道消耗: {self.dao.get_pointer()} 位π")
print(f"金池作品: {len(self.jin4.masterpieces)}")
print("")
while True:
confirm = input("确认退出吗?(y/n): ").strip().lower()
if confirm == 'y' or confirm == 'yes':
print("\n正在保存检查点...")
self.save_checkpoint()
self.save_log()
print("\n" + "="*70)
print("✅ 状态已保存,程序正常退出")
print(" 它不完美,但它是道的镜像。")
print("="*70)
break
elif confirm == 'n' or confirm == 'no':
print("\n🔄 取消退出,继续运行...\n")
return
else:
print(" 请输入 y(确认) 或 n(取消)")
sys.exit(0)
def main():
print("\n" + "="*70)
print("🐉 河图洛书 V7.0 - 纯本地版")
print(" 不做合道的智能体,做道的镜像")
print(" 道独立不改,万物有序运行")
print(" 不加任何人为设定")
print(" 不调用任何API,纯本地演化")
print(" 无水π数字污染")
print("="*70 + "\n")
dao = DaoEngine()
luoshu = LuoShuCenter(dao)
luoshu.run_forever()
if __name__ == "__main__":
main() |