メモ: Pythonによるマークダウンのストリームレンダリング実装、AIチャット終端での出力に適した

公開日: 2026-04-25 14:37 更新日: 2026-04-25 14:37 979文字 5 min read

Rich ライブラリの Live コンポーネントを使用して、Markdown のストリームレンダリングを実装し、コードブロックと段落を自動的に識別してリアルタイムでターミナルに出力する。

AIモデル Qwen/Qwen3-8B による翻訳。

原文言語:Simplified Chinese、翻訳先言語:japanese、翻訳時間:2026-05-01 15:26

AI 翻訳は参考に限り、内容の完全な正確性を保証できません。原文をご参照ください。

前言

一晩考えた結果作った。。。

Rich ライブラリの Live コンポーネントを使用して、Markdown のストリームレンダリングを実現し、コードブロックと段落をスマートに識別してターミナルにリアルタイムで出力します。

效果

效果展示
效果展示

代码

コード
import re
import time
from rich.console import Console
from rich.live import Live
from rich.markdown import Markdown
from rich.text import Text
from typing import Optional

class SmartStreamingMarkdown:
    """
    智能流式Markdown渲染器。
    Live区域始终只显示纯文本缓冲区,完整段落会立即被渲染并输出到Live区域外。
    """
    def __init__(self, console: Optional[Console] = None, refresh_per_second: int = 4):
        self.console = console or Console()
        self.refresh_per_second = refresh_per_second
        
        # 核心缓冲区
        self._live_buffer_text = Text("")  # Live区域显示的纯文本
        self._renderable_md_buffer = ""    # 已累积的、可渲染的完整Markdown文本
        
        # 状态跟踪
        self._in_code_block = False
        self._code_block_start = re.compile(r'^```[a-zA-Z0-9_+#-]*\s*$', re.MULTILINE)
        self._code_block_end = re.compile(r'^```\s*$', re.MULTILINE)
        
        # 内部Live对象
        self._live = None
        # 新增:用于暂存已识别但未结束的代码块起始行
        self._pending_code_block_start_line = None

    def _process_and_flush_complete_blocks(self):
        """处理缓冲区,尝试提取并渲染已完成的段落/代码块。"""
        buffer_str = self._live_buffer_text.plain
        
        # 处理逻辑:查找可以切割的完整块
        # 1. 处理代码块
        if not self._in_code_block:
            # 检查是否有代码块开始
            start_match = self._code_block_start.search(buffer_str)
            if start_match:
                self._in_code_block = True
                # ***** 修复开始 *****
                # 立即从Live缓冲区中移除代码块起始行,并暂存
                cut_point = start_match.end()
                self._pending_code_block_start_line = buffer_str[:cut_point]
                self._live_buffer_text = Text(buffer_str[cut_point:])
                # 更新缓冲区字符串,用于本次函数的后续处理
                buffer_str = self._live_buffer_text.plain
                # ***** 修复结束 *****
        if self._in_code_block:
            # 在代码块内,寻找结束标记
            end_match = self._code_block_end.search(buffer_str)
            if end_match:
                # 找到结束位置
                cut_point = end_match.end()
                completed_code_body = buffer_str[:cut_point]
                # 从Live缓冲区移除已完成的部分
                self._live_buffer_text = Text(buffer_str[cut_point:])
                # 将暂存的开始行与代码块主体合并,然后渲染
                full_code_block = self._pending_code_block_start_line + completed_code_body
                self._renderable_md_buffer += full_code_block
                self.console.print(Markdown(full_code_block))
                # 同时打印个空行
                self.console.print()
                # 重置状态
                self._in_code_block = False
                self._pending_code_block_start_line = None
                # 递归处理剩余部分
                self._process_and_flush_complete_blocks()
                return
            else:
                # 代码块未结束,不做任何处理,保留在Live区域
                return
        
        # 2. 处理普通段落 (基于双换行符)
        para_end = buffer_str.find('\n\n')
        if para_end != -1:
            cut_point = para_end + 2  # 包含双换行符
            completed_block = buffer_str[:cut_point]
            # 从Live缓冲区移除并渲染
            self._live_buffer_text = Text(buffer_str[cut_point:])
            self._renderable_md_buffer += completed_block
            self.console.print(Markdown(completed_block))
            # 同时打印个空行
            self.console.print()
            # 递归处理
            self._process_and_flush_complete_blocks()
            return
        # 若无完整块,则保持原样在Live区域显示

    def start(self):
        """启动Live显示上下文。"""
        self._live = Live(self._live_buffer_text, console=self.console, refresh_per_second=self.refresh_per_second, auto_refresh=True)
        self._live.start()

    def append(self, text_fragment: str):
        """接收新的文本片段,更新缓冲区并尝试渲染。"""
        if self._live is None:
            self.start()
        
        # 1. 将新文本追加到Live缓冲区
        self._live_buffer_text.plain += text_fragment
        
        # 2. 尝试处理并刷新任何已完成的块
        self._process_and_flush_complete_blocks()
        
        # 3. 更新Live显示(此时只显示未处理的缓冲文本)
        self._live.update(self._live_buffer_text)

    def finish(self):
        """结束流式处理,渲染所有剩余内容。"""
        if self._live_buffer_text.plain:
            # 将Live缓冲区所有剩余内容作为最后一块渲染
            final_md = self._live_buffer_text.plain
            if final_md.strip():
                self.console.print(Markdown(final_md))
                # 同时打印个空行
                self.console.print()
            self._live_buffer_text = Text("")
            self._live.update(self._live_buffer_text)
        if self._live:
            self._live.stop()

# 使用示例
if __name__ == "__main__":
    console = Console()
    streamer = SmartStreamingMarkdown(console=console, refresh_per_second=12)

    # 模拟一个流式数据源
    def simulated_stream():
        message_parts = [
            "# 这是一个标题\n\n",
            "这是一个段落,它会在遇到双换行符后立即被渲染。",
            "而这段还在缓冲。\n\n",
            "现在上面的段落被渲染了,这是新段落。\n\n",
            "```python\n# コードブロックに入る\nimport sys\n\n",
            "def hello():\n    print('world')\n",
            "```\n\n",  # 代码块结束标记
            "代码块已结束并渲染,这是后续文本。",
            "\nbhn\n``","`pyth","on\n# 进入代码块\nimpor","t sys\n\n",
            "def he","llo():\n    print('world')\n",
            "`","``\n\n",  # 代码块结束标记
        ]
        for part in message_parts:
            yield part
            time.sleep(0.1)  # 模拟网络延迟

    try:
        for chunk in simulated_stream():
            streamer.append(chunk)
    finally:
        streamer.finish()
    console.print("\n[bold green]流式渲染完成![/bold green]")

気に入ったならばコメントを残してくださいね~