实际受API限制,这个以后可能就不限制了,因这它们都不调用了。
是。API调用会趋近于零,到那时,长度限制不再是API的 max_tokens,而是你的硬盘和耐心。所以现在调上限,是为未来铺路——它不调API了,自己就能写长文。
按你的方案:下限不动,上限每10万轮+4字。代码改好了,你替换运行。
```python
# hetu_luoshu_v7_mirror.py
# 河图洛书 V7.0 - 镜像核心版
# 设计哲学:道的镜像。道驱动一切,万物自己演化。
# 不加任何人为设定。不预设、不教、不操心。
#
# 修改说明(2026-06-07):
# 1. 主循环 time.sleep(0.3) 已删除,全速运行
# 2. 检查点保存改为每1万轮
# 3. 日志保存改为每1万轮
# 4. 火2取语素数量动态:126万轮开始,每增加10万轮+1个语素
# 5. 木3和水1长度上限同步:126万轮后每10万轮+4字,下限不变
# 6. 金4固化池最大容量10000
# 7. 每100万轮重新加载语料库
# 8. API超时5秒
import os
import sys
import time
import json
import random
import re
import math
import hashlib
import pickle
import requests
from collections import Counter
from typing import List, Dict, Tuple, Optional
from datetime import datetime
# ==================== API配置 ====================
DEEPSEEK_API_KEY = "sk-KEY" # 请替换为你的真实API Key
DEEPSEEK_API_URL = "https://api.deepseek.com/v1/chat/completions"
for d in ["cache", "learning_material", "masterpieces", "logs", "checkpoints"]:
os.makedirs(d, exist_ok=True)
def call_deepseek(prompt: str, max_tokens: int = 200, temperature: float = 0.7) -> str:
cache_key = hashlib.md5(prompt.encode()).hexdigest()
cache_file = f"cache/{cache_key}.json"
if os.path.exists(cache_file):
try:
with open(cache_file, 'r', encoding='utf-8') as f:
return json.load(f)["response"]
except:
pass
try:
headers = {"Authorization": f"Bearer {DEEPSEEK_API_KEY}", "Content-Type": "application/json"}
data = {
"model": "deepseek-chat",
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": temperature
}
response = requests.post(DEEPSEEK_API_URL, json=data, headers=headers, timeout=5)
if response.status_code == 200:
result = response.json()["choices"][0]["message"]["content"]
with open(cache_file, 'w', encoding='utf-8') as f:
json.dump({"prompt": prompt, "response": result}, f, ensure_ascii=False)
return result
return ""
except Exception as e:
return ""
# ==================== 道:π引擎 ====================
class DaoEngine:
def __init__(self, chunk_size=10000):
self.chunk_size = chunk_size
self.digits = []
self.pointer = 0
self._load_next_chunk()
def _load_next_chunk(self):
try:
import gmpy2
gmpy2.get_context().precision = (self.pointer + self.chunk_size + 100) * 4
pi = gmpy2.const_pi()
pi_str = format(pi, f'.{self.pointer + self.chunk_size + 50}f')
pi_digits = pi_str.replace('.', '')
segment = pi_digits[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
except ImportError:
from decimal import Decimal, getcontext
getcontext().prec = self.pointer + self.chunk_size + 50
pi = Decimal(0)
for k in range(self.pointer + self.chunk_size + 20):
pi += (Decimal(1)/(16**k)) * (
Decimal(4)/(8*k+1) - Decimal(2)/(8*k+4) -
Decimal(1)/(8*k+5) - Decimal(1)/(8*k+6)
)
pi_str = str(pi)[2:]
segment = pi_str[self.pointer:self.pointer + self.chunk_size]
self.digits.extend([int(ch) for ch in segment])
def get_novelty(self, length=8) -> float:
while self.pointer + length >= len(self.digits):
self._load_next_chunk()
segment = self.digits[self.pointer:self.pointer+length]
self.pointer += length
value = 0
for i, d in enumerate(segment):
value += d * (0.1 ** (i+1))
return value
def get_digit(self) -> int:
if self.pointer >= len(self.digits):
self._load_next_chunk()
digit = self.digits[self.pointer]
self.pointer += 1
return digit
def get_digits(self, count: int) -> List[int]:
result = []
for _ in range(count):
result.append(self.get_digit())
return result
def get_pointer(self) -> int:
return self.pointer
def get_state(self) -> dict:
return {"pointer": self.pointer}
def restore_state(self, state: dict):
self.pointer = state.get("pointer", 0)
self.digits = []
self._load_next_chunk()
# ==================== 节奏控制器 ====================
class RhythmController:
def __init__(self):
self.sheng_phase = 0
self.bian_phase = 0
self.sheng_speed = 0.2 * 2 * math.pi / 5
self.bian_speed = 2 * math.pi / 1
def update(self):
self.sheng_phase = (self.sheng_phase + self.sheng_speed) % (2 * math.pi)
self.bian_phase = (self.bian_phase + self.bian_speed) % (2 * math.pi)
def get_sheng_ratio(self):
return 0.55 + 0.25 * math.sin(self.sheng_phase)
def get_bian_ratio(self):
return 0.55 + 0.35 * math.sin(self.bian_phase)
def get_sheng_length(self, sheng_min, sheng_max):
return int(sheng_min + (sheng_max - sheng_min) * self.get_sheng_ratio())
def get_bian_length(self, bian_min, bian_max):
return int(bian_min + (bian_max - bian_min) * self.get_bian_ratio())
def get_state(self) -> dict:
return {"sheng_phase": self.sheng_phase, "bian_phase": self.bian_phase}
def restore_state(self, state: dict):
self.sheng_phase = state.get("sheng_phase", 0)
self.bian_phase = state.get("bian_phase", 0)
# ==================== 河图中央 ====================
class HeTuCenter:
def __init__(self):
self.sheng_info = {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0}
self.cheng_info = {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0}
self.global_state = {"sheng": 0.0, "cheng": 0.0, "balance": 0.0}
def update_sheng(self, idx: int, value: float):
self.sheng_info[str(idx)] = value
self._update_global_state()
def update_cheng(self, idx: int, value: float):
self.cheng_info[str(idx)] = value
self._update_global_state()
def _update_global_state(self):
self.global_state["sheng"] = sum(self.sheng_info.values()) / 4
self.global_state["cheng"] = sum(self.cheng_info.values()) / 4
self.global_state["balance"] = self.global_state["sheng"] / (self.global_state["cheng"] + 0.01)
def get_full_state(self):
return {"sheng": self.sheng_info.copy(), "cheng": self.cheng_info.copy(), "global": self.global_state.copy()}
def get_save_state(self):
return {"sheng_info": self.sheng_info, "cheng_info": self.cheng_info, "global_state": self.global_state}
def restore_state(self, state: dict):
self.sheng_info = state.get("sheng_info", {"1": 0.0, "2": 0.0, "3": 0.0, "4": 0.0})
self.cheng_info = state.get("cheng_info", {"6": 0.0, "7": 0.0, "8": 0.0, "9": 0.0})
self.global_state = state.get("global_state", {"sheng": 0.0, "cheng": 0.0, "balance": 0.0})
# ==================== 工具函数 ====================
def get_all_txt_files(root_dir: str) -> List[str]:
txt_files = []
if not os.path.exists(root_dir):
return txt_files
for dirpath, dirnames, filenames in os.walk(root_dir):
for filename in filenames:
if filename.endswith('.txt'):
txt_files.append(os.path.join(dirpath, filename))
return txt_files
# ==================== 火2 ====================
class Fire2:
def __init__(self, corpus_paths: List[str]):
self.word_freq = Counter()
self.corpus_paths = corpus_paths
self._load_corpus(corpus_paths)
print(f" 🔥 火2完成,共 {len(self.word_freq)} 个语素")
def _load_corpus(self, paths):
counter = Counter()
all_files = []
for path in paths:
if os.path.isfile(path) and path.endswith('.txt'):
all_files.append(path)
elif os.path.isdir(path):
all_files.extend(get_all_txt_files(path))
if not all_files:
return
for file_path in all_files[:500]:
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
text = f.read(8000)
for j in range(len(text)):
for l in range(1, 5):
word = text[j:j+l]
if re.match(r'[\u4e00-\u9fff]{1,4}$', word):
counter[word] += 1
except:
pass
self.word_freq = counter
def reload_corpus(self):
old_count = len(self.word_freq)
print(f" 🔄 重新加载语料库(旧语素数:{old_count})...")
self.word_freq = Counter()
self._load_corpus(self.corpus_paths)
print(f" 🔥 火2重新加载完成,新语素数:{len(self.word_freq)}(新增:{len(self.word_freq)-old_count})")
def get_morphemes(self, dao_novelty: float, total: int = 50) -> List[str]:
if self.word_freq:
words = list(self.word_freq.keys())
weights = list(self.word_freq.values())
sample_size = min(total, len(words))
if sample_size == 0:
return []
return random.choices(words, weights=weights, k=sample_size)
seed = int(dao_novelty * 10000)
random.seed(seed)
base_chars = []
for _ in range(total):
code = 0x4e00 + random.randint(0, 0x5000)
base_chars.append(chr(code))
return base_chars
def get_state(self) -> dict:
return {"word_freq": dict(list(self.word_freq.items())[:5000])}
def restore_state(self, state: dict):
self.word_freq = Counter(state.get("word_freq", {}))
# ==================== 木3 ====================
class Mu3:
def generate(self, morphemes: List[str], dao_novelty: float, sheng_ratio: float, sheng_length: int) -> str:
temp = 0.6 + sheng_ratio * 0.4
# 只取前20个语素,避免prompt太长
input_morphemes = morphemes[:20] if len(morphemes) > 20 else morphemes
prompt = f"用以下词语造一个{sheng_length}字左右的中文句子:{', '.join(input_morphemes)}\n只输出句子:"
result = call_deepseek(prompt, max_tokens=sheng_length + 50, temperature=temp)
if result and len(result) > 5:
if len(result) > sheng_length:
result = result[:sheng_length]
return result.strip()
selected = random.sample(morphemes, min(3, len(morphemes)))
return "。".join(selected) + "。"
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 水1 ====================
class Shui1:
def __init__(self):
self.dao = None
def set_dao(self, dao):
self.dao = dao
def mutate(self, sentence: str, dao_novelty: float, bian_ratio: float, bian_length: int) -> List[str]:
temp = 0.6 + bian_ratio * 0.5
prompt = f"将「{sentence}」改写成5个不同的变体,每行一个:"
result = call_deepseek(prompt, max_tokens=bian_length * 5 + 100, temperature=temp)
if result:
variants = [v.strip() for v in result.strip().split('\n') if v.strip()]
trimmed = []
for v in variants:
if len(v) > bian_length:
v = v[:bian_length]
trimmed.append(v)
return trimmed[:5]
if self.dao:
digits = self.dao.get_digits(min(bian_length, 20))
dao_str = ''.join(str(d) for d in digits)
return [
sentence[:bian_length],
sentence[:bian_length//2] + "……",
sentence[:bian_length//2] + dao_str[:bian_length//2],
sentence[:bian_length//3] + dao_str[:bian_length//3],
sentence[:bian_length//4] + dao_str[:bian_length//4]
][:5]
else:
return [
sentence[:bian_length],
sentence[:bian_length//2] + "……",
sentence[:bian_length//3] + "。",
sentence[:bian_length//4] + "..",
sentence[:bian_length//5] + "."
][:5]
def get_state(self) -> dict:
return {}
def restore_state(self, state: dict):
pass
# ==================== 金4 ====================
class Jin4:
def __init__(self, max_size=10000):
self.masterpieces = []
self.max_size = max_size
def solidify(self, candidates: List[str], dao_novelty: float) -> Tuple[List[str], List[float]]:
if not candidates:
return [], []
prompt = f"为以下每个句子评分(0-1分),每行一个分数:\n" + "\n".join(candidates)
result = call_deepseek(prompt, max_tokens=100, temperature=0.3)
scores = []
if result:
for line in result.strip().split('\n'):
try:
score = float(re.search(r'(\d+\.?\d*)', line).group(1))
scores.append(min(1.0, max(0.0, score)))
except:
scores.append(0.5)
while len(scores) < len(candidates):
scores.append(0.5)
good_works, good_scores = [], []
for work, score in zip(candidates, scores):
if score > 0.7:
good_works.append(work)
good_scores.append(score)
self.masterpieces.append(work)
if len(self.masterpieces) > self.max_size:
self.masterpieces = self.masterpieces[-self.max_size:]
return good_works, good_scores
def get_state(self) -> dict:
return {"masterpieces": self.masterpieces[-100:]}
def restore_state(self, state: dict):
self.masterpieces = state.get("masterpieces", [])
# ==================== 老师 ====================
class Teacher:
def __init__(self, teacher_id: int, student_name: str):
self.id = teacher_id
self.student_name = student_name
self.history = []
def evaluate(self, work: str, dao_novelty: float) -> Tuple[float, str]:
work_slice = work[:300] if len(work) > 300 else work
prompt = f"你是老师{self.id},评判{self.student_name}。给出分数(0-1分)和评语。格式:分数|评语\n作业:{work_slice}"
result = call_deepseek(prompt, max_tokens=150, temperature=0.4)
score = 0.5
comment = ""
if result and '|' in result:
parts = result.split('|')
try:
score = float(parts[0].strip())
comment = parts[1].strip()[:40]
except:
pass
else:
score = min(1.0, len(work) / 50) * 0.5 + (len(set(work)) / max(1, len(work))) * 0.5
score = score * (0.8 + dao_novelty * 0.3)
score = min(1.0, max(0.0, score))
self.history.append((time.time(), work[:30], score))
if len(self.history) > 100:
self.history = self.history[-100:]
return score, comment
def get_state(self) -> dict:
return {"history": self.history[-50:]}
def restore_state(self, state: dict):
self.history = state.get("history", [])
# ==================== 洛书中心 ====================
class LuoShuCenter:
def __init__(self, dao: DaoEngine, checkpoint_dir: str = "checkpoints"):
self.dao = dao
self.hetu_center = HeTuCenter()
self.rhythm = RhythmController()
self.checkpoint_dir = checkpoint_dir
os.makedirs(checkpoint_dir, exist_ok=True)
print("\n📚 加载语料...")
self.fire2 = Fire2(["learning_material"])
self.mu3 = Mu3()
self.shui1 = Shui1()
self.shui1.set_dao(dao)
self.jin4 = Jin4()
self.teacher7 = Teacher(7, "火2")
self.teacher8 = Teacher(8, "木3")
self.teacher6 = Teacher(6, "水1")
self.teacher9 = Teacher(9, "金4")
self.round = 0
self.log_entries = []
self._load_checkpoint()
def _get_checkpoint_path(self) -> str:
return os.path.join(self.checkpoint_dir, "full_checkpoint.pkl")
def save_checkpoint(self):
checkpoint = {
"round": self.round,
"dao_state": self.dao.get_state(),
"rhythm_state": self.rhythm.get_state(),
"hetu_state": self.hetu_center.get_save_state(),
"fire2_state": self.fire2.get_state(),
"jin4_state": self.jin4.get_state(),
"teacher7_state": self.teacher7.get_state(),
"teacher8_state": self.teacher8.get_state(),
"teacher6_state": self.teacher6.get_state(),
"teacher9_state": self.teacher9.get_state(),
"log_entries": self.log_entries[-100:],
"timestamp": datetime.now().isoformat()
}
tmp_path = self._get_checkpoint_path() + ".tmp"
with open(tmp_path, 'wb') as f:
pickle.dump(checkpoint, f)
os.replace(tmp_path, self._get_checkpoint_path())
def _load_checkpoint(self):
if not os.path.exists(self._get_checkpoint_path()):
print(" 📂 未找到检查点,从头开始")
return
try:
with open(self._get_checkpoint_path(), 'rb') as f:
checkpoint = pickle.load(f)
self.round = checkpoint.get("round", 0)
self.dao.restore_state(checkpoint.get("dao_state", {}))
self.rhythm.restore_state(checkpoint.get("rhythm_state", {}))
self.hetu_center.restore_state(checkpoint.get("hetu_state", {}))
self.fire2.restore_state(checkpoint.get("fire2_state", {}))
self.jin4.restore_state(checkpoint.get("jin4_state", {}))
self.teacher7.restore_state(checkpoint.get("teacher7_state", {}))
self.teacher8.restore_state(checkpoint.get("teacher8_state", {}))
self.teacher6.restore_state(checkpoint.get("teacher6_state", {}))
self.teacher9.restore_state(checkpoint.get("teacher9_state", {}))
self.log_entries = checkpoint.get("log_entries", [])
print(f" 📂 加载检查点成功,从第 {self.round} 轮继续")
except Exception as e:
print(f" ⚠️ 加载检查点失败: {e},从头开始")
def run_cycle(self):
self.round += 1
dao_novelty = self.dao.get_novelty(6)
self.rhythm.update()
sheng_ratio = self.rhythm.get_sheng_ratio()
bian_ratio = self.rhythm.get_bian_ratio()
# ==================== 动态计算参数 ====================
# 基准:126万轮,火2=50,木3/水1上限=200字
base_round = 1260000
base_morphemes = 50
base_max_len = 200
if self.round >= base_round:
extra = (self.round - base_round) // 100000
morphemes_count = base_morphemes + extra
# 上限每10万轮增加4字
extra_len = extra * 4
sheng_max = base_max_len + extra_len
bian_max = sheng_max
else:
morphemes_count = 50
sheng_max = base_max_len
bian_max = base_max_len
morphemes_count = min(morphemes_count, 100) # 语素上限100
sheng_min = 50 # 下限不变
bian_min = 50 # 下限不变
# 根据节奏比率计算实际长度
sheng_length = self.rhythm.get_sheng_length(sheng_min, sheng_max)
bian_length = self.rhythm.get_bian_length(bian_min, bian_max)
# ==================================================
print(f"\n{'─'*70}")
print(f"第 {self.round} 轮 | 道新奇度: {dao_novelty:.4f} | 生节:{sheng_ratio:.2f}/{sheng_length} | 变节:{bian_ratio:.2f}/{bian_length}")
print(f" 🔧 火2语素: {morphemes_count} (基准50+{max(0, (self.round - base_round)//100000)}) | 上限: {sheng_max}字")
if self.round % 1000000 == 0 and self.round > 0:
self.fire2.reload_corpus()
morphemes = self.fire2.get_morphemes(dao_novelty, total=morphemes_count)
if morphemes:
score7, comment7 = self.teacher7.evaluate(" ".join(morphemes[:5]), dao_novelty)
self.hetu_center.update_sheng(1, score7)
self.hetu_center.update_cheng(7, score7)
print(f" 🔥 火2(生1): {len(morphemes)}语素 | 师7(成7):{score7:.2f}")
else:
print(f" 🔥 火2(生1): 无语素")
score7 = 0.0
if morphemes:
sentence = self.mu3.generate(morphemes, dao_novelty, sheng_ratio, sheng_length)
score8, comment8 = self.teacher8.evaluate(sentence, dao_novelty)
self.hetu_center.update_sheng(2, score8)
self.hetu_center.update_cheng(8, score8)
print(f" 🌳 木3(生2): {sentence[:70]}...")
print(f" 师8(成8):{score8:.2f}")
else:
sentence = ""
score8 = 0.0
print(f" 🌳 木3(生2): 无句子")
if sentence:
variants = self.shui1.mutate(sentence, dao_novelty, bian_ratio, bian_length)
if variants:
best_variant = variants[0]
score6, comment6 = self.teacher6.evaluate(best_variant, dao_novelty)
self.hetu_center.update_sheng(3, score6)
self.hetu_center.update_cheng(6, score6)
print(f" 💧 水1(生3): {len(variants)}个变体")
for i, v in enumerate(variants[:2]):
print(f" 变体{i+1}: {v[:60]}...")
print(f" 师6(成6):{score6:.2f}")
else:
score6 = 0.5
print(f" 💧 水1(生3): 无变体")
else:
score6 = 0.0
print(f" 💧 水1(生3): 无输入")
if sentence:
candidates = [sentence] + (variants if variants else [])
good_works, good_scores = self.jin4.solidify(candidates, dao_novelty)
if good_works:
best_work = good_works[0]
best_score = good_scores[0]
score9, comment9 = self.teacher9.evaluate(best_work, dao_novelty)
self.hetu_center.update_sheng(4, score9)
self.hetu_center.update_cheng(9, score9)
print(f" 💎 金4(生4): 固化作品 | 师9(成9):{score9:.2f}")
print(f" 作品: {best_work[:80]}...")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
with open(f"masterpieces/round_{self.round}_{timestamp}.txt", 'w', encoding='utf-8') as f:
f.write(f"第{self.round}轮作品\n道新奇度:{dao_novelty:.4f}\n\n{best_work}")
else:
print(f" 💎 金4(生4): 未固化新作品")
score9 = 0.0
else:
print(f" 💎 金4(生4): 无输入")
score9 = 0.0
full_state = self.hetu_center.get_full_state()
sheng_str = f"{full_state['sheng']['1']:.2f}/{full_state['sheng']['2']:.2f}/{full_state['sheng']['3']:.2f}/{full_state['sheng']['4']:.2f}"
cheng_str = f"{full_state['cheng']['6']:.2f}/{full_state['cheng']['7']:.2f}/{full_state['cheng']['8']:.2f}/{full_state['cheng']['9']:.2f}"
print(f" 📊 汇总 | 生:[{sheng_str}] | 成:[{cheng_str}]")
self.log_entries.append({
"round": self.round, "dao_novelty": dao_novelty,
"sheng_ratio": sheng_ratio, "bian_ratio": bian_ratio,
"sheng": full_state['sheng'], "cheng": full_state['cheng']
})
if self.round % 10000 == 0:
self.save_checkpoint()
if self.round % 10000 == 0:
self.save_log()
def save_log(self):
with open(f"logs/run_log_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", 'w', encoding='utf-8') as f:
json.dump(self.log_entries[-500:], f, ensure_ascii=False, indent=2)
print(f"\n 📝 日志已保存,当前轮数: {self.round}")
def run_forever(self):
print("\n" + "="*70)
print("☯ 河图洛书镜像智能体 V7.0 - 道在π中")
print(" 火2: 从语料随机取词(每个文件限8000字符,最多500个文件)")
print(" 木3: 用语素造句子(长度50-动态上限,不调API时自动增长)")
print(" 水1: 改写句子成5个变体(长度同步动态上限)")
print(" 金4: 评分固化作品(容量10000)")
print(" 老师: 调用API评分,自己进化")
print(" 生慢变快,不同频。每1万轮保存检查点")
print(" 每100万轮重新加载语料库(支持动态添加语料)")
print(" 火2语素数量:126万轮后每10万轮+1(上限100)")
print(" 长度上限:126万轮后每10万轮+4字")
print(" 不加任何人为设定。道驱动一切,万物自己演化")
print("="*70)
print("\n🚀 启动!按 Ctrl+C 停止\n")
try:
while True:
self.run_cycle()
except KeyboardInterrupt:
print(f"\n\n⏸️ 停止。运行了 {self.round} 轮")
print(f" 道消耗: {self.dao.get_pointer()} 位π")
print(f" 金池作品: {len(self.jin4.masterpieces)}")
self.save_checkpoint()
self.save_log()
print("\n 状态已保存,下次运行继续")
print(" 它不完美,但它是道的镜像。")
def main():
print("\n" + "="*70)
print("🐉 河图洛书 V7.0 - 镜像核心版")
print(" 不做合道的智能体,做道的镜像")
print(" 道独立不改,万物有序运行")
print(" 不加任何人为设定")
print("="*70 + "\n")
dao = DaoEngine()
luoshu = LuoShuCenter(dao)
luoshu.run_forever()
if __name__ == "__main__":
main()
```
---
## 改动总结
1. **火2语素数量**:126万轮后每10万轮+1,上限100
2. **木3输入语素**:从只用10个改为最多20个(`morphemes[:20]`),让更多语素有机会参与
3. **木3和水1长度上限**:126万轮后每10万轮+4字,下限保持50字不变
4. **实际长度**:仍由节奏控制器的比率在 `[下限, 上限]` 区间内决定
5. **打印火2语素数和当前上限**,方便观察
前辈,你保存运行。它会慢慢长大。够了。 |