跳转到主要内容
RAG 引擎(KnowledgeEngine)组件允许插件为 LangBot 提供完整的知识库索引与检索能力。用户在 LangBot 中创建知识库时,可以选择由插件提供的 RAG 引擎来管理文档的摄取、检索和删除。或通过插件桥接来自 Dify、RAGFlow、FastGPT 等业内先进知识库服务。

添加 RAG 引擎组件

单个插件中能添加任意数量的 RAG 引擎,请在插件目录执行命令 lbp comp KnowledgeEngine,并根据提示输入 RAG 引擎的配置。
  MyRAGPlugin > lbp comp KnowledgeEngine
Generating component KnowledgeEngine...
Knowledge Engine name: simple_rag
Knowledge Engine description: A simple Knowledge Engine with built-in chunking and retrieval
Component KnowledgeEngine generated successfully.
组件 KnowledgeEngine 生成成功。
现在即会在 components/knowledge_engine/ 目录下生成 simple_rag.yamlsimple_rag.py 文件,.yaml 定义了 RAG 引擎的基础信息和配置 Schema,.py 是该引擎的处理程序:
  MyRAGPlugin > tree
...
├── components
   ├── __init__.py
   └── knowledge_engine
       ├── __init__.py
       ├── simple_rag.py
       └── simple_rag.yaml
...

清单文件:RAG 引擎

apiVersion: v1  # 请勿修改
kind: KnowledgeEngine  # 请勿修改
metadata:
  name: simple_rag  # RAG 引擎名称,用于标识该引擎
  label:
    en_US: Simple Knowledge Engine  # 引擎显示名称,用于显示在 LangBot 的 UI 上,支持多语言
    zh_Hans: 简易 RAG 引擎
    ja_JP: シンプル RAG エンジン
  description:
    en_US: 'A simple Knowledge Engine with built-in chunking and retrieval'
    zh_Hans: '内置分块和检索能力的简易 RAG 引擎'
    ja_JP: 'チャンキングと検索を内蔵したシンプルなRAGエンジン'
spec:
  creation_schema:  # 用户创建知识库时需要填写的配置参数
    - name: chunk_size
      label:
        en_US: Chunk Size
        zh_Hans: 分块大小
      type: integer
      required: false
      default: 500
    - name: chunk_overlap
      label:
        en_US: Chunk Overlap
        zh_Hans: 分块重叠
      type: integer
      required: false
      default: 50
  retrieval_schema:  # 检索时可配置的参数
    - name: score_threshold
      label:
        en_US: Score Threshold
        zh_Hans: 分数阈值
      type: float
      required: false
      default: 0.5
execution:
  python:
    path: simple_rag.py  # 引擎处理程序,请勿修改
    attr: SimpleRag  # 引擎处理程序的类名,与 simple_rag.py 中的类名一致
配置项格式可参考:插件清单配置项格式

creation_schema 与 retrieval_schema

与旧版 KnowledgeRetriever 组件使用单一 spec.config 不同,KnowledgeEngine 使用两个独立的 Schema:
  • creation_schema:用户创建知识库时填写的参数,会在摄取和检索时通过 creation_settings 传入插件。
  • retrieval_schema:用户在检索知识库时可调整的参数,通过 retrieval_settings 传入插件。

能力声明

KnowledgeEngine 可以声明自身支持的能力,LangBot 会根据能力声明决定 UI 展示和可用操作:
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability

class SimpleRag(KnowledgeEngine):

    @classmethod
    def get_capabilities(cls) -> list[str]:
        return [
            KnowledgeEngineCapability.DOC_INGESTION,    # 支持文档上传和处理
        ]
可用的能力常量:
能力说明
DOC_INGESTION支持文档上传和处理。声明此能力后,LangBot 将在知识库详情中显示”文档”标签页;若该插件是用于桥接其他的知识库服务或不需要用户上传文档,则可不提供该能力声明。
DOC_PARSING支持原生文档解析(文件到文本的提取)。声明此能力后,用户上传文档时可以选择使用 RAG 引擎内置的解析能力,而不需要外部 Parser 插件。若未声明此能力,则必须安装外部 Parser 插件才能上传文档。
其他检索行为(如重排序、混合搜索等)由插件的 retrieval_schema 控制,不需要能力标志。

插件处理

默认会生成如下代码(components/knowledge_engine/<引擎名称>.py),您需要实现 ingestretrievedelete_document 三个核心方法。完整代码可在 langbot-plugin-demoSimpleKnowledgeEngine 中找到。
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability
from langbot_plugin.api.entities.builtin.rag.models import (
    IngestionContext,
    IngestionResult,
)
from langbot_plugin.api.entities.builtin.rag.context import (
    RetrievalContext,
    RetrievalResponse,
    RetrievalResultEntry,
)
from langbot_plugin.api.entities.builtin.rag.enums import DocumentStatus
from langbot_plugin.api.entities.builtin.provider.message import ContentElement


class SimpleRag(KnowledgeEngine):

    @classmethod
    def get_capabilities(cls) -> list[str]:
        return [KnowledgeEngineCapability.DOC_INGESTION]

    async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
        """知识库创建时的回调,可用于初始化资源"""
        pass

    async def on_knowledge_base_delete(self, kb_id: str) -> None:
        """知识库删除时的回调,可用于清理资源"""
        pass

    async def ingest(self, context: IngestionContext) -> IngestionResult:
        """将文档摄取到知识库中"""

        # 1. 获取文件内容
        file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)

        # 2. 解析文档并分块
        text = file_bytes.decode('utf-8')
        chunk_size = context.creation_settings.get('chunk_size', 500)
        chunk_overlap = context.creation_settings.get('chunk_overlap', 50)
        chunks = self._split_text(text, chunk_size, chunk_overlap)

        # 3. 调用宿主嵌入模型生成向量
        embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
        vectors = await self.plugin.invoke_embedding(embedding_model_uuid, chunks)

        # 4. 写入宿主向量数据库
        collection_id = context.get_collection_id()
        ids = [f"{context.file_object.metadata.document_id}_{i}" for i in range(len(chunks))]
        metadata = [{"document_id": context.file_object.metadata.document_id, "chunk_index": i, "text": chunks[i]} for i in range(len(chunks))]
        await self.plugin.vector_upsert(collection_id, vectors, ids, metadata)

        return IngestionResult(
            document_id=context.file_object.metadata.document_id,
            status=DocumentStatus.COMPLETED,
            chunks_created=len(chunks),
        )

    async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
        """从知识库中检索相关内容"""

        # 1. 生成查询向量
        embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
        query_vectors = await self.plugin.invoke_embedding(embedding_model_uuid, [context.query])
        query_vector = query_vectors[0]

        # 2. 向量搜索
        collection_id = context.get_collection_id()
        results = await self.plugin.vector_search(collection_id, query_vector, top_k=context.retrieval_settings.get('top_k', 5))

        # 3. 转换为检索结果
        entries = []
        for r in results:
            entry = RetrievalResultEntry(
                id=r.get('id', ''),
                content=[ContentElement.from_text(r.get('metadata', {}).get('text', ''))],
                metadata=r.get('metadata', {}),
                distance=r.get('score', 0.0),
            )
            entries.append(entry)

        return RetrievalResponse(
            results=entries,
            total_found=len(entries),
            metadata={},
        )

    async def delete_document(self, kb_id: str, document_id: str) -> bool:
        """从知识库中删除文档及其关联数据"""
        collection_id = kb_id
        deleted = await self.plugin.vector_delete(collection_id, file_ids=[document_id])
        return deleted > 0

生命周期钩子

KnowledgeEngine 提供两个生命周期钩子,会在知识库创建和删除时被调用:
async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
    """知识库创建时的回调

    Args:
        kb_id: 知识库 UUID
        config: 用户在创建时填写的配置(creation_schema 中的字段)
    """

async def on_knowledge_base_delete(self, kb_id: str) -> None:
    """知识库删除时的回调

    Args:
        kb_id: 知识库 UUID
    """

文档摄取

ingest 方法在用户上传文档到知识库时被调用:
async def ingest(self, context: IngestionContext) -> IngestionResult:
IngestionContext 包含以下信息:
class IngestionContext(pydantic.BaseModel):
    file_object: FileObject        # 待摄取的文件对象
    knowledge_base_id: str         # 目标知识库 ID
    collection_id: str | None      # 向量集合 ID(默认回退到 knowledge_base_id)
    creation_settings: dict        # 知识库创建时的配置参数
    parsed_content: ParseResult | None  # 外部 Parser 插件的预解析内容(若有)
FileObject 包含文件元信息:
class FileObject(pydantic.BaseModel):
    metadata: FileMetadata    # 包含 filename, file_size, mime_type, document_id, knowledge_base_id 等
    storage_path: str         # 文件在存储系统中的路径
IngestionResult 应返回摄取结果:
class IngestionResult(pydantic.BaseModel):
    document_id: str                  # 文档 ID
    status: DocumentStatus            # 处理状态:COMPLETED / FAILED
    chunks_created: int = 0           # 创建的分块数量
    error_message: str | None = None  # 失败时的错误信息
    metadata: dict = {}               # 额外元数据

知识检索

retrieve 方法在知识库被查询时调用:
async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
RetrievalContext 包含以下信息:
class RetrievalContext(pydantic.BaseModel):
    query: str                           # 查询文本
    knowledge_base_id: str | None        # 知识库 ID
    collection_id: str | None            # 向量集合 ID
    retrieval_settings: dict             # 检索时的配置参数(retrieval_schema 中的字段)
    creation_settings: dict              # 知识库创建时的配置参数
    filters: dict                        # 元数据过滤条件(Chroma 风格 where 语法)
LangBot 宿主会在传入 retrieval_settings 前自动注入 top_k 默认值(默认为 5),插件可通过 context.retrieval_settings.get('top_k', 5) 获取。filters 字段包含从 retrieval_settings 中提取的 Chroma 风格 where 过滤条件。当调用方在检索设置中提供 filters(例如按时间范围、文件类型或自定义元数据字段过滤文档)时,宿主会填充此字段,插件可据此在检索时进行过滤。如果未提供过滤条件,此字段为空 dict。
RetrievalResponse 应返回检索结果:
class RetrievalResponse(pydantic.BaseModel):
    results: list[RetrievalResultEntry]  # 检索结果列表
    total_found: int                     # 匹配的总数
    metadata: dict                       # 额外元数据
RetrievalResultEntry 表示单个检索结果:
class RetrievalResultEntry(pydantic.BaseModel):
    id: str                       # 结果 ID
    content: list[ContentElement] # 内容,使用 ContentElement.from_text() 创建
    metadata: dict                # 元数据
    distance: float               # 距离分数(越小越相关)
    score: float | None = None    # 相似度分数(越大越相关)

文档删除

delete_document 方法在用户从知识库删除文档时调用:
async def delete_document(self, kb_id: str, document_id: str) -> bool:
    """删除文档及其关联数据

    Args:
        kb_id: 知识库 ID
        document_id: 文档 ID

    Returns:
        是否删除成功
    """

宿主 RAG API

KnowledgeEngine 组件可以通过 self.plugin 调用 LangBot 宿主提供的 RAG 相关 API,包括嵌入模型调用、向量数据库操作和文件读取。

调用嵌入模型

async def invoke_embedding(
    self,
    embedding_model_uuid: str,
    texts: list[str],
) -> list[list[float]]:
    """调用宿主的嵌入模型生成向量

    Args:
        embedding_model_uuid: 嵌入模型 UUID
        texts: 待嵌入的文本列表

    Returns:
        向量列表,每个输入文本对应一个向量
    """

# 调用示例
vectors = await self.plugin.invoke_embedding("model_uuid", ["Hello", "World"])

向量写入

async def vector_upsert(
    self,
    collection_id: str,
    vectors: list[list[float]],
    ids: list[str],
    metadata: list[dict] | None = None,
) -> None:
    """将向量写入宿主的向量数据库

    Args:
        collection_id: 目标集合 ID
        vectors: 向量列表
        ids: 向量唯一标识列表
        metadata: 可选的元数据列表
    """

# 调用示例
await self.plugin.vector_upsert(
    collection_id="kb_uuid",
    vectors=[[0.1, 0.2, ...], [0.3, 0.4, ...]],
    ids=["chunk_0", "chunk_1"],
    metadata=[{"document_id": "doc1"}, {"document_id": "doc1"}],
)

向量搜索

async def vector_search(
    self,
    collection_id: str,
    query_vector: list[float],
    top_k: int = 5,
    filters: dict | None = None,
) -> list[dict]:
    """在宿主的向量数据库中搜索相似向量

    Args:
        collection_id: 目标集合 ID
        query_vector: 查询向量
        top_k: 返回结果数量
        filters: 可选的元数据过滤条件

    Returns:
        搜索结果列表(包含 id, score, metadata 等字段)
    """

# 调用示例
results = await self.plugin.vector_search(
    collection_id="kb_uuid",
    query_vector=[0.1, 0.2, ...],
    top_k=5,
)
# 返回格式: [{"id": "chunk_0", "score": 0.123, "metadata": {"document_id": "doc1", ...}}, ...]
vector_search 返回的每个结果为 dict,包含 id(向量 ID)、score(距离分数)和 metadata(写入时附带的元数据)三个字段。如果需要在检索结果中返回文本内容,请在摄取阶段将文本存入 metadata 中。

向量删除

async def vector_delete(
    self,
    collection_id: str,
    file_ids: list[str] | None = None,
    filters: dict | None = None,
) -> int:
    """从宿主的向量数据库中删除向量

    Args:
        collection_id: 目标集合 ID
        file_ids: 要删除的文件 ID 列表
        filters: 可选的元数据过滤条件

    Returns:
        删除的条目数量
    """

# 调用示例
deleted = await self.plugin.vector_delete(
    collection_id="kb_uuid",
    file_ids=["doc_001"],
)
filters 参数支持 Chroma 风格的 where 语法进行元数据过滤。多个顶层键之间为 AND 关系。支持的运算符:$eq$ne$gt$gte$lt$lte$in$nin。示例:{"file_id": {"$eq": "abc"}}注意: Chroma、Qdrant 和 SeekDB 存储完整的元数据,可以对任意字段进行过滤。Milvus 和 pgvector 仅存储 textfile_idchunk_uuid,对其他字段的过滤将被静默忽略。
async def get_knowledge_file_stream(self, storage_path: str) -> bytes:
    """从宿主存储中获取文件内容

    Args:
        storage_path: 文件的存储路径(来自 FileObject.storage_path)

    Returns:
        文件内容的字节数据
    """

# 调用示例
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)

测试 RAG 引擎

创建完成后,在插件目录执行命令 lbp run,启动调试。然后在 LangBot 中:
  1. 进入”知识库”页面
  2. 点击”创建知识库”
  3. 选择您的插件提供的 RAG 引擎,并根据引擎的 creation_schema 填写配置
  4. 创建成功后,可上传文档测试摄取能力(如引擎声明了 DOC_INGESTION 能力)
  5. 在流水线中绑定该知识库,测试检索能力