# hetu_luoshu_v7_local_clean.py
# 河图洛书 V7.0 - 纯本地版(不调用API,无π数字污染)
# 与 hetu_luoshu_v7_mirror.py 共用 learning_material 和 cache
# 独立使用自己的 checkpoints_local、masterpieces_local、logs_local
import os
import sys
import time
import json
import random
import re
import math
import hashlib
import pickle
import shutil
from collections import Counter
from typing import List, Dict, Tuple, Optional
from datetime import datetime
# ==================== 目录配置 ====================
# learning_material and cache are shared with hetu_luoshu_v7_mirror.py.
LEARNING_MATERIAL_DIR = "learning_material"
CACHE_DIR = "cache"
# Directory that receives the periodic checkpoint backups.
RECOVERY_DIR = "recovery"

# Create every working directory at import time; existing ones are left alone.
for d in (
    "checkpoints_local",
    "masterpieces_local",
    "logs_local",
    RECOVERY_DIR,
    LEARNING_MATERIAL_DIR,
    CACHE_DIR,
):
    os.makedirs(d, exist_ok=True)
# ==================== 本地评分 ====================
def local_evaluate(work: str, dao_novelty: float) -> float:
    """Score a text locally and return a value clamped to [0.0, 1.0].

    The score is the sum of three parts:
      * length: ``len(work) / 50`` capped at 1.0, weighted 0.4
      * variety: distinct characters / total characters, weighted 0.3
      * novelty: ``dao_novelty``, weighted 0.3
    """
    char_count = len(work)
    length_part = min(1.0, char_count / 50) * 0.4
    # max(1, ...) keeps the empty string from dividing by zero.
    variety_part = len(set(work)) / max(1, char_count) * 0.3
    novelty_part = dao_novelty * 0.3
    total = length_part + variety_part + novelty_part
    floored = max(0.0, total)
    return min(1.0, floored)
# ==================== 本地生成(无π数字) ====================
def local_generate(morphemes: list, dao_novelty: float, sheng_length: int, jin4_instance=None) -> str:
    """Produce a sentence locally: no API call and no digits inserted.

    With probability 0.7 (and only when ``jin4_instance`` has masterpieces) a
    stored masterpiece is used as a template: up to three sampled morphemes
    each overwrite one randomly chosen character of it.  Otherwise up to five
    sampled morphemes are joined.  Both results are cut to ``sheng_length``.
    With no morphemes and no template, a fixed sentence is returned uncut.

    ``dao_novelty`` is accepted but not used here.
    """
    use_template = (
        bool(jin4_instance)
        and bool(jin4_instance.masterpieces)
        and random.random() < 0.7
    )
    if use_template:
        text = random.choice(jin4_instance.masterpieces)
        picked = random.sample(morphemes, min(3, len(morphemes)))
        for piece in picked:
            # Skip empty morphemes, ones already present, and empty templates.
            if not piece or piece in text or not text:
                continue
            cut = random.randint(0, len(text) - 1)
            # The morpheme replaces the single character at position `cut`.
            text = text[:cut] + piece + text[cut + 1:]
        return text[:sheng_length]

    if not morphemes:
        return "道可道,非常道。"

    chosen = random.sample(morphemes, min(5, len(morphemes)))
    return "".join(chosen)[:sheng_length]
# ==================== 本地变体生成(去掉硬编码替换) ====================
def local_mutate(sentence: str, dao_novelty: float, bian_length: int) -> list:
    """Build local variants of ``sentence``: no API call, no digits inserted.

    Variants, in order:
      1. ``sentence`` cut to ``bian_length`` characters;
      2. when ``sentence`` has at least five characters, the same text with
         its first five characters shuffled, cut to ``bian_length``.

    Duplicates are dropped while keeping order, and at most three variants
    are returned.  ``dao_novelty`` is accepted but not used here.

    Every variant is at most ``bian_length`` characters long.  The previous
    slice ``sentence[5:][:bian_length - 5]`` used a negative bound whenever
    ``bian_length < 5`` and therefore returned text longer than the limit.
    """
    # A negative limit would strip from the end instead of cutting; treat it as 0.
    limit = max(0, bian_length)

    variants = [sentence[:limit]]

    if len(sentence) >= 5:
        head = list(sentence[:5])
        random.shuffle(head)
        # Cut after joining so the limit also holds when it is below 5.
        variants.append(("".join(head) + sentence[5:])[:limit])

    # dict.fromkeys removes duplicates and keeps first-seen order.
    return list(dict.fromkeys(variants))[:3]
# ==================== 道:π引擎 ====================
class DaoEngine:
    """Sequential reader over the decimal digits of π.

    Digits are generated ``chunk_size`` at a time and handed out through an
    absolute cursor, ``pointer``.  Only the cursor is persisted by
    ``get_state``; ``restore_state`` regenerates digits from that position.

    The stream starts with the leading 3 (3, 1, 4, 1, 5, 9, ...) whichever
    backend computes π.  The Decimal fallback used to drop the leading 3
    while the gmpy2 path kept it, so the two backends disagreed.

    ``digits`` is a window, not the full history: ``digits[0]`` is the digit
    at absolute index ``_offset``, and consumed digits are released whenever
    a new chunk is loaded.
    """

    def __init__(self, chunk_size=10000):
        """Create the engine and load the first chunk.

        Raises:
            ValueError: if ``chunk_size`` is smaller than 1 (loading could
                never make progress).
        """
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")
        self.chunk_size = chunk_size
        self.digits = []
        # Absolute index in the π stream of digits[0].
        self._offset = 0
        # Absolute index of the next digit to hand out.
        self.pointer = 0
        self._load_next_chunk()

    @staticmethod
    def _pi_digit_string(count: int) -> str:
        """Return at least ``count`` digits of π as text, leading 3 included.

        Uses gmpy2 when it can be imported, otherwise a Decimal evaluation of
        the BBP series.
        """
        try:
            import gmpy2
        except ImportError:
            from decimal import Decimal, localcontext

            # localcontext keeps the precision change from leaking into the
            # process-wide Decimal context.
            with localcontext() as ctx:
                ctx.prec = count + 50
                total = Decimal(0)
                for k in range(count + 20):
                    total += (Decimal(1) / (16 ** k)) * (
                        Decimal(4) / (8 * k + 1) - Decimal(2) / (8 * k + 4)
                        - Decimal(1) / (8 * k + 5) - Decimal(1) / (8 * k + 6)
                    )
                return str(total).replace('.', '')

        # 4 bits per decimal digit is more than the ~3.33 required.
        gmpy2.get_context().precision = (count + 100) * 4
        return format(gmpy2.const_pi(), f'.{count + 50}f').replace('.', '')

    def _load_next_chunk(self):
        """Append the next ``chunk_size`` digits to the buffer.

        The new chunk always starts right after the last buffered digit.
        Slicing from ``pointer`` instead duplicated digits whenever a load
        happened before the buffer was fully consumed.
        """
        # Release digits that were already handed out.
        consumed = min(max(self.pointer - self._offset, 0), len(self.digits))
        del self.digits[:consumed]
        self._offset += consumed

        start = self._offset + len(self.digits)
        stop = start + self.chunk_size
        text = self._pi_digit_string(stop)
        self.digits.extend(int(ch) for ch in text[start:stop])

    def get_novelty(self, length=8) -> float:
        """Consume ``length`` digits and return them as the fraction 0.d1d2d3..."""
        while self.pointer + length > self._offset + len(self.digits):
            self._load_next_chunk()
        begin = self.pointer - self._offset
        segment = self.digits[begin:begin + length]
        self.pointer += length
        value = 0
        for place, digit in enumerate(segment, start=1):
            value += digit * (0.1 ** place)
        return value

    def get_digit(self) -> int:
        """Consume and return the next single digit."""
        while self.pointer >= self._offset + len(self.digits):
            self._load_next_chunk()
        digit = self.digits[self.pointer - self._offset]
        self.pointer += 1
        return digit

    def get_digits(self, count: int) -> List[int]:
        """Consume and return the next ``count`` digits."""
        return [self.get_digit() for _ in range(count)]

    def get_pointer(self) -> int:
        """Return the absolute index of the next digit to be consumed."""
        return self.pointer

    def get_state(self) -> dict:
        """Return the state to persist: the cursor only."""
        return {"pointer": self.pointer}

    def restore_state(self, state: dict):
        """Move the cursor to ``state["pointer"]`` (default 0) and refill the buffer there."""
        self.pointer = state.get("pointer", 0)
        # The buffer restarts at the restored cursor so absolute and buffered
        # positions stay aligned.
        self._offset = self.pointer
        self.digits = []
        self._load_next_chunk()
# ==================== 节奏控制器 ====================
class RhythmController:
    """Two sine oscillators, "sheng" and "bian", that drive ratios and lengths.

    Each ``update`` adds a fixed step to both phases, modulo 2π.  The ratios
    are ``0.55 + amplitude * sin(phase)`` with amplitude 0.25 for sheng and
    0.35 for bian.

    NOTE(review): ``bian_speed`` is exactly one full turn per update, so after
    the modulo the bian phase stays where it was (up to rounding) and the bian
    ratio does not vary between updates.  Confirm this is intended.
    """

    def __init__(self):
        full_turn = 2 * math.pi
        self.sheng_phase, self.bian_phase = 0, 0
        self.sheng_speed = 0.2 * 2 * math.pi / 5
        self.bian_speed = full_turn / 1

    def update(self):
        """Advance both phases by their speed, wrapping at 2π."""
        period = 2 * math.pi
        self.sheng_phase = (self.sheng_phase + self.sheng_speed) % period
        self.bian_phase = (self.bian_phase + self.bian_speed) % period

    def get_sheng_ratio(self):
        """Return the sheng ratio, within [0.30, 0.80]."""
        swing = 0.25 * math.sin(self.sheng_phase)
        return 0.55 + swing

    def get_bian_ratio(self):
        """Return the bian ratio, within [0.20, 0.90]."""
        swing = 0.35 * math.sin(self.bian_phase)
        return 0.55 + swing

    def get_sheng_length(self, sheng_min, sheng_max):
        """Interpolate between the bounds by the sheng ratio, truncated to int."""
        span = sheng_max - sheng_min
        return int(sheng_min + span * self.get_sheng_ratio())

    def get_bian_length(self, bian_min, bian_max):
        """Interpolate between the bounds by the bian ratio, truncated to int."""
        span = bian_max - bian_min
        return int(bian_min + span * self.get_bian_ratio())

    def get_state(self) -> dict:
        """Return both phases for checkpointing."""
        return {
            "sheng_phase": self.sheng_phase,
            "bian_phase": self.bian_phase,
        }

    def restore_state(self, state: dict):
        """Load both phases from ``state``; a missing phase becomes 0."""
        self.sheng_phase = state.get("sheng_phase", 0)
        self.bian_phase = state.get("bian_phase", 0)
# ==================== 河图中央 ====================
class HeTuCenter:
    """Scoreboard for four "sheng" slots (1-4) and four "cheng" slots (6-9).

    Every update recomputes ``global_state``: the mean of each group (sum
    divided by 4) and ``balance = sheng / (cheng + 0.01)``.
    """

    def __init__(self):
        self.sheng_info = dict.fromkeys(("1", "2", "3", "4"), 0.0)
        self.cheng_info = dict.fromkeys(("6", "7", "8", "9"), 0.0)
        self.global_state = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}

    def update_sheng(self, idx: int, value: float):
        """Store ``value`` in sheng slot ``idx`` and refresh the aggregates."""
        slot = str(idx)
        self.sheng_info[slot] = value
        self._update_global_state()

    def update_cheng(self, idx: int, value: float):
        """Store ``value`` in cheng slot ``idx`` and refresh the aggregates."""
        slot = str(idx)
        self.cheng_info[slot] = value
        self._update_global_state()

    def _update_global_state(self):
        """Recompute the group means and their ratio."""
        summary = self.global_state
        summary["sheng"] = sum(self.sheng_info.values()) / 4
        summary["cheng"] = sum(self.cheng_info.values()) / 4
        # The 0.01 offset avoids dividing by zero while cheng is still 0.
        summary["balance"] = summary["sheng"] / (summary["cheng"] + 0.01)

    def get_full_state(self):
        """Return shallow copies of all three dictionaries."""
        return {
            "sheng": dict(self.sheng_info),
            "cheng": dict(self.cheng_info),
            "global": dict(self.global_state),
        }

    def get_save_state(self):
        """Return the live dictionaries (not copies) for checkpointing."""
        return {
            "sheng_info": self.sheng_info,
            "cheng_info": self.cheng_info,
            "global_state": self.global_state,
        }

    def restore_state(self, state: dict):
        """Adopt the dictionaries from ``state``; missing ones reset to zeros."""
        zero_sheng = {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0}
        zero_cheng = {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0}
        zero_global = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}
        self.sheng_info = state.get("sheng_info", zero_sheng)
        self.cheng_info = state.get("cheng_info", zero_cheng)
        self.global_state = state.get("global_state", zero_global)
# ==================== 工具函数 ====================
def get_all_txt_files(root_dir: str) -> List[str]:
    """Return the paths of all ``.txt`` files under ``root_dir``, recursively.

    Paths come back in ``os.walk`` order.  A missing ``root_dir`` yields an
    empty list.
    """
    if not os.path.exists(root_dir):
        return []
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(root_dir)
        for name in names
        if name.endswith('.txt')
    ]
# ==================== 火2 ====================
class Fire2:
    """Morpheme source: counts CJK substrings of 1-4 characters in a corpus.

    ``word_freq`` maps each substring to its occurrence count.
    ``get_morphemes`` samples uniformly from the distinct substrings, so the
    counts do not weight the sampling.
    """

    # At most this many corpus files are read per load.
    MAX_FILES = 500
    # Only this many leading characters of each file are read.
    MAX_CHARS_PER_FILE = 8000
    # Longest substring that is counted.
    MAX_WORD_LEN = 4
    # Maximal runs of characters in U+4E00..U+9FFF.
    _CJK_RUN = re.compile(r'[\u4e00-\u9fff]+')

    def __init__(self, corpus_paths: List[str]):
        """Remember ``corpus_paths`` and load the corpus immediately."""
        self.word_freq = Counter()
        self.corpus_paths = corpus_paths
        self._load_corpus(corpus_paths)
        print(f" 🔥 火2完成,共 {len(self.word_freq)} 个语素")

    def _load_corpus(self, paths):
        """Count substrings from the ``.txt`` files named by ``paths``.

        Each path may be a ``.txt`` file or a directory searched recursively.
        When no file is found, ``word_freq`` is left untouched.
        """
        all_files = []
        for path in paths:
            if os.path.isfile(path) and path.endswith('.txt'):
                all_files.append(path)
            elif os.path.isdir(path):
                all_files.extend(get_all_txt_files(path))
        if not all_files:
            return

        counter = Counter()
        for file_path in all_files[:self.MAX_FILES]:
            try:
                with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                    text = f.read(self.MAX_CHARS_PER_FILE)
            except OSError:
                # Best effort: an unreadable file is skipped, not fatal.
                continue
            # Working on whole CJK runs keeps separators such as a trailing
            # newline out of the words, and bounding the size by the
            # remaining run length counts each occurrence exactly once.
            for run in self._CJK_RUN.findall(text):
                run_len = len(run)
                for start in range(run_len):
                    longest = min(self.MAX_WORD_LEN, run_len - start)
                    for size in range(1, longest + 1):
                        counter[run[start:start + size]] += 1
        self.word_freq = counter

    def reload_corpus(self):
        """Re-read the corpus from ``corpus_paths`` and report the change in size."""
        old_count = len(self.word_freq)
        print(f" 🔄 重新加载语料库(旧语素数:{old_count})...")
        self.word_freq = Counter()
        self._load_corpus(self.corpus_paths)
        print(f" 🔥 火2重新加载完成,新语素数:{len(self.word_freq)}(新增:{len(self.word_freq)-old_count})")

    def get_morphemes(self, dao_novelty: float, total: int = 50) -> List[str]:
        """Return up to ``total`` morphemes.

        With a loaded corpus the morphemes are drawn uniformly, without
        replacement, from the distinct substrings.  Without one, ``total``
        random characters from U+4E00..U+9E00 are generated from a generator
        seeded with ``int(dao_novelty * 10000)``.
        """
        if self.word_freq:
            words = list(self.word_freq.keys())
            # Clamp so a negative `total` yields [] instead of raising.
            sample_size = max(0, min(total, len(words)))
            return random.sample(words, sample_size)

        # A private generator yields the same characters as seeding the
        # module-level one, without resetting the global random state.
        rng = random.Random(int(dao_novelty * 10000))
        return [chr(0x4e00 + rng.randint(0, 0x5000)) for _ in range(total)]

    def get_state(self) -> dict:
        """Return the first 5000 ``word_freq`` entries for checkpointing."""
        return {"word_freq": dict(list(self.word_freq.items())[:5000])}

    def restore_state(self, state: dict):
        """Replace ``word_freq`` with the checkpointed entries.

        NOTE(review): this overwrites whatever the constructor loaded with a
        snapshot of at most 5000 entries; confirm that shrinking the
        vocabulary on resume is intended.
        """
        self.word_freq = Counter(state.get("word_freq", {}))
# ==================== 木3 ====================
class Mu3:
    """Sentence generator: a thin wrapper around ``local_generate``.

    Holds an optional reference to the masterpiece pool (``jin4``) that is
    passed through as the template source.  It has no state to checkpoint.
    """

    def __init__(self):
        # Set later through set_jin4; None means no template pool.
        self.jin4 = None

    def set_jin4(self, jin4):
        """Attach the masterpiece pool used for templates."""
        self.jin4 = jin4

    def generate(self, morphemes: List[str], dao_novelty: float, sheng_ratio: float, sheng_length: int) -> str:
        """Generate one sentence; ``sheng_ratio`` is accepted but not used."""
        return local_generate(
            morphemes,
            dao_novelty,
            sheng_length,
            jin4_instance=self.jin4,
        )

    def get_state(self) -> dict:
        """Return an empty dict: nothing is persisted."""
        return dict()

    def restore_state(self, state: dict):
        """Ignore ``state``: nothing is persisted."""
        return None
# ==================== 水1 ====================
class Shui1:
    """Variant generator: a thin wrapper around ``local_mutate``.

    Keeps a reference to the π engine (``dao``) that ``mutate`` does not
    currently read.  It has no state to checkpoint.
    """

    def __init__(self):
        # Set later through set_dao.
        self.dao = None

    def set_dao(self, dao):
        """Attach the π engine."""
        self.dao = dao

    def mutate(self, sentence: str, dao_novelty: float, bian_ratio: float, bian_length: int) -> List[str]:
        """Return variants of ``sentence``; ``bian_ratio`` is accepted but not used."""
        variants = local_mutate(sentence, dao_novelty, bian_length)
        return variants

    def get_state(self) -> dict:
        """Return an empty dict: nothing is persisted."""
        return dict()

    def restore_state(self, state: dict):
        """Ignore ``state``: nothing is persisted."""
        return None
# ==================== 金4 ====================
class Jin4:
    """Pool of "masterpieces": the best-scoring candidates of each round."""

    def __init__(self, max_size=10000):
        self.masterpieces = []
        self.max_size = max_size

    def solidify(self, candidates: List[str], dao_novelty: float) -> Tuple[List[str], List[float]]:
        """Score ``candidates`` and keep every one that ties for the top score.

        The winners are appended to ``masterpieces``, which is then cut back
        to its newest ``max_size`` entries.

        Returns:
            ``(works, scores)`` for the winners, in candidate order; two
            empty lists when ``candidates`` is empty.
        """
        if not candidates:
            return [], []

        scored = [(work, local_evaluate(work, dao_novelty)) for work in candidates]
        top = max(score for _, score in scored)

        good_works, good_scores = [], []
        for work, score in scored:
            if score != top:
                continue
            good_works.append(work)
            good_scores.append(score)
            self.masterpieces.append(work)

        if len(self.masterpieces) > self.max_size:
            self.masterpieces = self.masterpieces[-self.max_size:]
        return good_works, good_scores

    def get_state(self) -> dict:
        """Return the newest 100 masterpieces for checkpointing."""
        newest = self.masterpieces[-100:]
        return {"masterpieces": newest}

    def restore_state(self, state: dict):
        """Replace the pool with the checkpointed list (empty if missing)."""
        self.masterpieces = state.get("masterpieces", [])
# ==================== 老师 ====================
class Teacher:
    """Scorer that applies a per-id multiplier to the local score.

    Multipliers: id 8 -> 0.95, id 9 -> 1.05, id 6 -> 0.98, id 7 -> 1.02;
    any other id leaves the score unchanged.
    """

    def __init__(self, teacher_id: int, student_name: str):
        self.id = teacher_id
        self.student_name = student_name
        # Entries are (timestamp, first 30 characters of the work, score).
        self.history = []

    def evaluate(self, work: str, dao_novelty: float) -> Tuple[float, str]:
        """Score ``work`` and record the result in ``history``.

        Returns:
            ``(score, comment)`` with the score clamped to [0.0, 1.0].
        """
        score = local_evaluate(work, dao_novelty)
        for teacher_id, factor in ((8, 0.95), (9, 1.05), (6, 0.98), (7, 1.02)):
            if self.id == teacher_id:
                score = score * factor
                break
        score = min(1.0, max(0.0, score))

        record = (time.time(), work[:30], score)
        self.history.append(record)
        # Only the newest 100 evaluations are kept in memory.
        if len(self.history) > 100:
            self.history = self.history[-100:]
        comment = f"本地评分{self.id}"
        return score, comment

    def get_state(self) -> dict:
        """Return the newest 50 history entries for checkpointing."""
        newest = self.history[-50:]
        return {"history": newest}

    def restore_state(self, state: dict):
        """Replace the history with the checkpointed list (empty if missing)."""
        self.history = state.get("history", [])
# ==================== 洛书中心 ====================
class LuoShuCenter:
    """Main loop: wires the components together and runs one round at a time.

    A round draws a novelty value from the π engine, advances the rhythm,
    then runs four stages in order (morphemes from ``fire2``, a sentence from
    ``mu3``, variants from ``shui1``, selection in ``jin4``).  Each stage is
    scored by its own ``Teacher`` and the score is recorded in the HeTu
    scoreboard.  Checkpoints and logs are written every 10000 rounds.
    """

    # Number of log entries kept in memory; save_log never writes more.
    MAX_LOG_ENTRIES = 500

    def __init__(self, dao: "DaoEngine", checkpoint_dir: str = "checkpoints_local"):
        """Build all components, load the corpus, then try to resume from a checkpoint."""
        self.dao = dao
        self.hetu_center = HeTuCenter()
        self.rhythm = RhythmController()
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)
        print("\n📚 加载语料...")
        self.fire2 = Fire2([LEARNING_MATERIAL_DIR])
        self.mu3 = Mu3()
        self.shui1 = Shui1()
        self.shui1.set_dao(dao)
        self.jin4 = Jin4()
        self.mu3.set_jin4(self.jin4)
        self.teacher7 = Teacher(7, "火2")
        self.teacher8 = Teacher(8, "木3")
        self.teacher6 = Teacher(6, "水1")
        self.teacher9 = Teacher(9, "金4")
        self.round = 0
        self.log_entries = []
        self._load_checkpoint()

    def _get_checkpoint_path(self) -> str:
        """Return the path of the main checkpoint file."""
        return os.path.join(self.checkpoint_dir, "full_checkpoint.pkl")

    def _get_tmp_path(self) -> str:
        """Return the path of the temporary file a checkpoint is written to first."""
        return self._get_checkpoint_path() + ".tmp"

    def _get_backup_path(self, round_num: int) -> str:
        """Return the backup path for the checkpoint of ``round_num``."""
        return os.path.join(RECOVERY_DIR, f"checkpoint_{round_num}.pkl")

    def save_checkpoint(self):
        """Write the full state to disk, plus a backup copy every 100000 rounds.

        The state is pickled to a temporary file that then replaces the main
        file, so the main file is never left half written.
        """
        checkpoint = {
            "round": self.round,
            "dao_state": self.dao.get_state(),
            "rhythm_state": self.rhythm.get_state(),
            "hetu_state": self.hetu_center.get_save_state(),
            "fire2_state": self.fire2.get_state(),
            "jin4_state": self.jin4.get_state(),
            "teacher7_state": self.teacher7.get_state(),
            "teacher8_state": self.teacher8.get_state(),
            "teacher6_state": self.teacher6.get_state(),
            "teacher9_state": self.teacher9.get_state(),
            "log_entries": self.log_entries[-100:],
            "timestamp": datetime.now().isoformat(),
        }
        tmp_path = self._get_tmp_path()
        with open(tmp_path, 'wb') as f:
            pickle.dump(checkpoint, f)
        main_path = self._get_checkpoint_path()
        os.replace(tmp_path, main_path)

        if self.round % 100000 == 0 and self.round > 0:
            backup_path = self._get_backup_path(self.round)
            try:
                shutil.copy2(main_path, backup_path)
                print(f" 💾 备份检查点已保存: {backup_path}")
            except OSError as e:
                # A failed backup must not stop the run; the main file is fine.
                print(f" ⚠️ 备份保存失败: {e}")

    def _candidate_checkpoint_paths(self) -> List[str]:
        """List checkpoint files to try: main file, temp file, then backups newest first."""
        paths = [self._get_checkpoint_path(), self._get_tmp_path()]
        backups = []
        if os.path.exists(RECOVERY_DIR):
            for name in os.listdir(RECOVERY_DIR):
                if not (name.startswith("checkpoint_") and name.endswith(".pkl")):
                    continue
                try:
                    round_num = int(name.split("_")[1].split(".")[0])
                except ValueError:
                    # Not named checkpoint_<round>.pkl; ignore it.
                    continue
                backups.append((round_num, os.path.join(RECOVERY_DIR, name)))
        backups.sort(key=lambda item: item[0], reverse=True)
        # Every backup is a candidate, so an unreadable newest backup falls
        # back to the next older one.
        paths.extend(path for _, path in backups)
        return paths

    def _apply_checkpoint(self, checkpoint: dict) -> None:
        """Copy a loaded checkpoint into the live components."""
        self.round = checkpoint.get("round", 0)
        self.dao.restore_state(checkpoint.get("dao_state", {}))
        self.rhythm.restore_state(checkpoint.get("rhythm_state", {}))
        self.hetu_center.restore_state(checkpoint.get("hetu_state", {}))
        self.fire2.restore_state(checkpoint.get("fire2_state", {}))
        self.jin4.restore_state(checkpoint.get("jin4_state", {}))
        self.teacher7.restore_state(checkpoint.get("teacher7_state", {}))
        self.teacher8.restore_state(checkpoint.get("teacher8_state", {}))
        self.teacher6.restore_state(checkpoint.get("teacher6_state", {}))
        self.teacher9.restore_state(checkpoint.get("teacher9_state", {}))
        self.log_entries = checkpoint.get("log_entries", [])

    def _load_checkpoint(self):
        """Resume from the first candidate file that loads; otherwise start fresh."""
        for path in self._candidate_checkpoint_paths():
            if not os.path.exists(path):
                continue
            try:
                # NOTE: pickle.load can execute code embedded in the file;
                # only load checkpoints written by this program.
                with open(path, 'rb') as f:
                    checkpoint = pickle.load(f)
                if not isinstance(checkpoint, dict):
                    raise ValueError("checkpoint payload is not a dict")
                self._apply_checkpoint(checkpoint)
            except Exception as e:
                # Any failure means this file is unusable; report it and try the next.
                print(f" ⚠️ 加载 {path} 失败: {e}")
                continue
            print(f" 📂 加载检查点成功,从第 {self.round} 轮继续 (来源: {path})")
            return
        print(" 📂 未找到有效检查点,从头开始")

    def _growth_limits(self) -> Tuple[int, int, int]:
        """Return ``(morphemes_count, sheng_max, bian_max)`` for the current round.

        Up to round 1260000 the values are 50 / 200 / 200.  From then on,
        every further 100000 rounds adds one morpheme (capped at 100) and
        four characters to both length limits.
        """
        base_round = 1260000
        base_morphemes = 50
        base_max_len = 200
        if self.round >= base_round:
            extra = (self.round - base_round) // 100000
            morphemes_count = base_morphemes + extra
            max_len = base_max_len + extra * 4
        else:
            morphemes_count = base_morphemes
            max_len = base_max_len
        return min(morphemes_count, 100), max_len, max_len

    def _record_log(self, entry: dict) -> None:
        """Append ``entry`` to the log, keeping only the newest MAX_LOG_ENTRIES.

        The log is never written beyond its newest 500 entries, so older
        ones are dropped here instead of accumulating for the whole run.
        """
        self.log_entries.append(entry)
        overflow = len(self.log_entries) - self.MAX_LOG_ENTRIES
        if overflow > 0:
            del self.log_entries[:overflow]

    def _write_masterpiece(self, best_work: str, dao_novelty: float) -> None:
        """Write the round's selected work to its own file in masterpieces_local."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        with open(f"masterpieces_local/round_{self.round}_{timestamp}.txt", 'w', encoding='utf-8') as f:
            f.write(f"第{self.round}轮作品\n道新奇度:{dao_novelty:.4f}\n\n{best_work}")

    def run_cycle(self):
        """Run one round: draw novelty, run the four stages, log, and checkpoint."""
        self.round += 1
        dao_novelty = self.dao.get_novelty(6)
        self.rhythm.update()
        sheng_ratio = self.rhythm.get_sheng_ratio()
        bian_ratio = self.rhythm.get_bian_ratio()
        morphemes_count, sheng_max, bian_max = self._growth_limits()
        sheng_min = 50
        bian_min = 50
        sheng_length = self.rhythm.get_sheng_length(sheng_min, sheng_max)
        bian_length = self.rhythm.get_bian_length(bian_min, bian_max)
        print(f"\n{'─'*70}")
        print(f"第 {self.round} 轮 | 道新奇度: {dao_novelty:.4f} | 生节:{sheng_ratio:.2f}/{sheng_length} | 变节:{bian_ratio:.2f}/{bian_length}")
        print(f" 🔧 火2语素: {morphemes_count} | 上限: {sheng_max}字")
        if self.round % 1000000 == 0 and self.round > 0:
            self.fire2.reload_corpus()

        # Stage 1: morphemes, scored by teacher 7.
        morphemes = self.fire2.get_morphemes(dao_novelty, total=morphemes_count)
        if morphemes:
            score7, _ = self.teacher7.evaluate(" ".join(morphemes[:5]), dao_novelty)
            self.hetu_center.update_sheng(1, score7)
            self.hetu_center.update_cheng(7, score7)
            print(f" 🔥 火2(生1): {len(morphemes)}语素 | 师7(成7):{score7:.2f}")
        else:
            print(f" 🔥 火2(生1): 无语素")

        # Stage 2: sentence, scored by teacher 8.
        sentence = ""
        if morphemes:
            sentence = self.mu3.generate(morphemes, dao_novelty, sheng_ratio, sheng_length)
            score8, _ = self.teacher8.evaluate(sentence, dao_novelty)
            self.hetu_center.update_sheng(2, score8)
            self.hetu_center.update_cheng(8, score8)
            print(f" 🌳 木3(生2): {sentence[:70]}...")
            print(f" 师8(成8):{score8:.2f}")
        else:
            print(f" 🌳 木3(生2): 无句子")

        # Stage 3: variants; teacher 6 scores the first one.
        variants = []
        if sentence:
            variants = self.shui1.mutate(sentence, dao_novelty, bian_ratio, bian_length)
            if variants:
                score6, _ = self.teacher6.evaluate(variants[0], dao_novelty)
                self.hetu_center.update_sheng(3, score6)
                self.hetu_center.update_cheng(6, score6)
                print(f" 💧 水1(生3): {len(variants)}个变体")
                for i, v in enumerate(variants[:2]):
                    print(f" 变体{i+1}: {v[:60]}...")
                print(f" 师6(成6):{score6:.2f}")
            else:
                print(f" 💧 水1(生3): 无变体")
        else:
            print(f" 💧 水1(生3): 无输入")

        # Stage 4: keep the best candidate, scored by teacher 9.
        if sentence:
            candidates = [sentence] + (variants if variants else [])
            good_works, _ = self.jin4.solidify(candidates, dao_novelty)
            if good_works:
                best_work = good_works[0]
                score9, _ = self.teacher9.evaluate(best_work, dao_novelty)
                self.hetu_center.update_sheng(4, score9)
                self.hetu_center.update_cheng(9, score9)
                print(f" 💎 金4(生4): 固化作品 | 师9(成9):{score9:.2f}")
                print(f" 作品: {best_work[:80]}...")
                self._write_masterpiece(best_work, dao_novelty)
            else:
                print(f" 💎 金4(生4): 未固化新作品")
        else:
            print(f" 💎 金4(生4): 无输入")

        full_state = self.hetu_center.get_full_state()
        sheng_str = "/".join(f"{full_state['sheng'][k]:.2f}" for k in ("1", "2", "3", "4"))
        cheng_str = "/".join(f"{full_state['cheng'][k]:.2f}" for k in ("6", "7", "8", "9"))
        print(f" 📊 汇总 | 生:[{sheng_str}] | 成:[{cheng_str}]")
        self._record_log({
            "round": self.round, "dao_novelty": dao_novelty,
            "sheng_ratio": sheng_ratio, "bian_ratio": bian_ratio,
            "sheng": full_state['sheng'], "cheng": full_state['cheng'],
        })
        if self.round % 10000 == 0:
            self.save_checkpoint()
            self.save_log()

    def save_log(self):
        """Write the newest 500 log entries to a new timestamped JSON file."""
        with open(f"logs_local/run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w', encoding='utf-8') as f:
            json.dump(self.log_entries[-500:], f, ensure_ascii=False, indent=2)
        print(f"\n 📝 日志已保存,当前轮数: {self.round}")

    def run_forever(self):
        """Print the banner and run rounds until Ctrl+C, then save state and log."""
        print("\n" + "="*70)
        print("☯ 河图洛书镜像智能体 V7.0 - 纯本地版(不调用API,无数字污染)")
        print(" 火2: 从语料随机取词(等概率采样,无词频偏好)")
        print(" 木3: 本地生成句子(长度50-动态上限)")
        print(" 水1: 本地生成3个变体(无硬编码替换)")
        print(" 金4: 本地评分固化作品(只固化最高分)")
        print(" 老师: 本地评分")
        print(" 生慢变快,不同频。每1万轮保存检查点")
        print(" 每100万轮重新加载语料库")
        # NOTE(review): the next banner line says the .tmp file is preferred,
        # but _load_checkpoint tries the main file first.
        print(" 检查点支持自动恢复(优先使用.tmp文件)")
        print(" 不加任何人为设定。道驱动一切,万物自己演化")
        print("="*70)
        print("\n🚀 启动!按 Ctrl+C 停止\n")
        try:
            while True:
                self.run_cycle()
        except KeyboardInterrupt:
            print(f"\n\n⏸️ 停止。运行了 {self.round} 轮")
            print(f" 道消耗: {self.dao.get_pointer()} 位π")
            print(f" 金池作品: {len(self.jin4.masterpieces)}")
            self.save_checkpoint()
            self.save_log()
            print("\n 状态已保存,下次运行继续")
            print(" 它不完美,但它是道的镜像。")
def main():
    """Print the startup banner, then build the π engine and start the loop."""
    rule = "=" * 70
    banner = (
        "🐉 河图洛书 V7.0 - 纯本地版",
        " 不做合道的智能体,做道的镜像",
        " 道独立不改,万物有序运行",
        " 不加任何人为设定",
        " 不调用任何API,纯本地演化",
        " 无水π数字污染",
    )
    print("\n" + rule)
    for line in banner:
        print(line)
    print(rule + "\n")

    engine = DaoEngine()
    center = LuoShuCenter(engine)
    center.run_forever()
# Start the simulation only when this file is executed as a script.
# (A stray "|" after this guard made the file unparseable; it is removed.)
if __name__ == "__main__":
    main()