RAG 引擎(KnowledgeEngine)组件允许插件为 LangBot 提供完整的知识库索引与检索能力。用户在 LangBot 中创建知识库时,可以选择由插件提供的 RAG 引擎来管理文档的摄取、检索和删除。或通过插件桥接来自 Dify、RAGFlow、FastGPT 等业内先进知识库服务。
添加 RAG 引擎组件
单个插件中能添加任意数量的 RAG 引擎,请在插件目录执行命令 lbp comp KnowledgeEngine,并根据提示输入 RAG 引擎的配置。
➜ MyRAGPlugin > lbp comp KnowledgeEngine
Generating component KnowledgeEngine...
Knowledge Engine name: simple_rag
Knowledge Engine description: A simple Knowledge Engine with built-in chunking and retrieval
Component KnowledgeEngine generated successfully.
组件 KnowledgeEngine 生成成功。
现在即会在 components/knowledge_engine/ 目录下生成 simple_rag.yaml 和 simple_rag.py 文件,.yaml 定义了 RAG 引擎的基础信息和配置 Schema,.py 是该引擎的处理程序:
➜ MyRAGPlugin > tree
...
├── components
│ ├── __init__.py
│ └── knowledge_engine
│ ├── __init__.py
│ ├── simple_rag.py
│ └── simple_rag.yaml
...
清单文件:RAG 引擎
apiVersion: v1 # 请勿修改
kind: KnowledgeEngine # 请勿修改
metadata:
name: simple_rag # RAG 引擎名称,用于标识该引擎
label:
en_US: Simple Knowledge Engine # 引擎显示名称,用于显示在 LangBot 的 UI 上,支持多语言
zh_Hans: 简易 RAG 引擎
ja_JP: シンプル RAG エンジン
description:
en_US: 'A simple Knowledge Engine with built-in chunking and retrieval'
zh_Hans: '内置分块和检索能力的简易 RAG 引擎'
ja_JP: 'チャンキングと検索を内蔵したシンプルなRAGエンジン'
spec:
creation_schema: # 用户创建知识库时需要填写的配置参数
- name: chunk_size
label:
en_US: Chunk Size
zh_Hans: 分块大小
type: integer
required: false
default: 500
- name: chunk_overlap
label:
en_US: Chunk Overlap
zh_Hans: 分块重叠
type: integer
required: false
default: 50
retrieval_schema: # 检索时可配置的参数
- name: score_threshold
label:
en_US: Score Threshold
zh_Hans: 分数阈值
type: float
required: false
default: 0.5
execution:
python:
path: simple_rag.py # 引擎处理程序,请勿修改
attr: SimpleRag # 引擎处理程序的类名,与 simple_rag.py 中的类名一致
配置项格式可参考:插件清单配置项格式
creation_schema 与 retrieval_schema
与旧版 KnowledgeRetriever 组件使用单一 spec.config 不同,KnowledgeEngine 使用两个独立的 Schema:
- creation_schema:用户创建知识库时填写的参数,会在摄取和检索时通过
creation_settings 传入插件。
- retrieval_schema:用户在检索知识库时可调整的参数,通过
retrieval_settings 传入插件。
能力声明
KnowledgeEngine 可以声明自身支持的能力,LangBot 会根据能力声明决定 UI 展示和可用操作:
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability
class SimpleRag(KnowledgeEngine):
@classmethod
def get_capabilities(cls) -> list[str]:
return [
KnowledgeEngineCapability.DOC_INGESTION, # 支持文档上传和处理
]
可用的能力常量:
| 能力 | 说明 |
|---|
DOC_INGESTION | 支持文档上传和处理。声明此能力后,LangBot 将在知识库详情中显示”文档”标签页;若该插件是用于桥接其他的知识库服务或不需要用户上传文档,则可不提供该能力声明。 |
DOC_PARSING | 支持原生文档解析(文件到文本的提取)。声明此能力后,用户上传文档时可以选择使用 RAG 引擎内置的解析能力,而不需要外部 Parser 插件。若未声明此能力,则必须安装外部 Parser 插件才能上传文档。 |
其他检索行为(如重排序、混合搜索等)由插件的 retrieval_schema 控制,不需要能力标志。
插件处理
默认会生成如下代码(components/knowledge_engine/<引擎名称>.py),您需要实现 ingest、retrieve、delete_document 三个核心方法。完整代码可在 langbot-plugin-demo 的 SimpleKnowledgeEngine 中找到。
from langbot_plugin.api.definition.components.knowledge_engine.engine import KnowledgeEngine, KnowledgeEngineCapability
from langbot_plugin.api.entities.builtin.rag.models import (
IngestionContext,
IngestionResult,
)
from langbot_plugin.api.entities.builtin.rag.context import (
RetrievalContext,
RetrievalResponse,
RetrievalResultEntry,
)
from langbot_plugin.api.entities.builtin.rag.enums import DocumentStatus
from langbot_plugin.api.entities.builtin.provider.message import ContentElement
class SimpleRag(KnowledgeEngine):
@classmethod
def get_capabilities(cls) -> list[str]:
return [KnowledgeEngineCapability.DOC_INGESTION]
async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
"""知识库创建时的回调,可用于初始化资源"""
pass
async def on_knowledge_base_delete(self, kb_id: str) -> None:
"""知识库删除时的回调,可用于清理资源"""
pass
async def ingest(self, context: IngestionContext) -> IngestionResult:
"""将文档摄取到知识库中"""
# 1. 获取文件内容
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)
# 2. 解析文档并分块
text = file_bytes.decode('utf-8')
chunk_size = context.creation_settings.get('chunk_size', 500)
chunk_overlap = context.creation_settings.get('chunk_overlap', 50)
chunks = self._split_text(text, chunk_size, chunk_overlap)
# 3. 调用宿主嵌入模型生成向量
embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
vectors = await self.plugin.invoke_embedding(embedding_model_uuid, chunks)
# 4. 写入宿主向量数据库
collection_id = context.get_collection_id()
ids = [f"{context.file_object.metadata.document_id}_{i}" for i in range(len(chunks))]
metadata = [{"document_id": context.file_object.metadata.document_id, "chunk_index": i, "text": chunks[i]} for i in range(len(chunks))]
await self.plugin.vector_upsert(collection_id, vectors, ids, metadata)
return IngestionResult(
document_id=context.file_object.metadata.document_id,
status=DocumentStatus.COMPLETED,
chunks_created=len(chunks),
)
async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
"""从知识库中检索相关内容"""
# 1. 生成查询向量
embedding_model_uuid = context.creation_settings.get('embedding_model_uuid', '')
query_vectors = await self.plugin.invoke_embedding(embedding_model_uuid, [context.query])
query_vector = query_vectors[0]
# 2. 向量搜索
collection_id = context.get_collection_id()
results = await self.plugin.vector_search(collection_id, query_vector, top_k=context.retrieval_settings.get('top_k', 5))
# 3. 转换为检索结果
entries = []
for r in results:
entry = RetrievalResultEntry(
id=r.get('id', ''),
content=[ContentElement.from_text(r.get('metadata', {}).get('text', ''))],
metadata=r.get('metadata', {}),
distance=r.get('score', 0.0),
)
entries.append(entry)
return RetrievalResponse(
results=entries,
total_found=len(entries),
metadata={},
)
async def delete_document(self, kb_id: str, document_id: str) -> bool:
"""从知识库中删除文档及其关联数据"""
collection_id = kb_id
deleted = await self.plugin.vector_delete(collection_id, file_ids=[document_id])
return deleted > 0
生命周期钩子
KnowledgeEngine 提供两个生命周期钩子,会在知识库创建和删除时被调用:
async def on_knowledge_base_create(self, kb_id: str, config: dict) -> None:
"""知识库创建时的回调
Args:
kb_id: 知识库 UUID
config: 用户在创建时填写的配置(creation_schema 中的字段)
"""
async def on_knowledge_base_delete(self, kb_id: str) -> None:
"""知识库删除时的回调
Args:
kb_id: 知识库 UUID
"""
文档摄取
ingest 方法在用户上传文档到知识库时被调用:
async def ingest(self, context: IngestionContext) -> IngestionResult:
IngestionContext 包含以下信息:
class IngestionContext(pydantic.BaseModel):
file_object: FileObject # 待摄取的文件对象
knowledge_base_id: str # 目标知识库 ID
collection_id: str | None # 向量集合 ID(默认回退到 knowledge_base_id)
creation_settings: dict # 知识库创建时的配置参数
parsed_content: ParseResult | None # 外部 Parser 插件的预解析内容(若有)
FileObject 包含文件元信息:
class FileObject(pydantic.BaseModel):
metadata: FileMetadata # 包含 filename, file_size, mime_type, document_id, knowledge_base_id 等
storage_path: str # 文件在存储系统中的路径
IngestionResult 应返回摄取结果:
class IngestionResult(pydantic.BaseModel):
document_id: str # 文档 ID
status: DocumentStatus # 处理状态:COMPLETED / FAILED
chunks_created: int = 0 # 创建的分块数量
error_message: str | None = None # 失败时的错误信息
metadata: dict = {} # 额外元数据
知识检索
retrieve 方法在知识库被查询时调用:
async def retrieve(self, context: RetrievalContext) -> RetrievalResponse:
RetrievalContext 包含以下信息:
class RetrievalContext(pydantic.BaseModel):
query: str # 查询文本
knowledge_base_id: str | None # 知识库 ID
collection_id: str | None # 向量集合 ID
retrieval_settings: dict # 检索时的配置参数(retrieval_schema 中的字段)
creation_settings: dict # 知识库创建时的配置参数
filters: dict # 元数据过滤条件(Chroma 风格 where 语法)
LangBot 宿主会在传入 retrieval_settings 前自动注入 top_k 默认值(默认为 5),插件可通过 context.retrieval_settings.get('top_k', 5) 获取。filters 字段包含从 retrieval_settings 中提取的 Chroma 风格 where 过滤条件。当调用方在检索设置中提供 filters(例如按时间范围、文件类型或自定义元数据字段过滤文档)时,宿主会填充此字段,插件可据此在检索时进行过滤。如果未提供过滤条件,此字段为空 dict。
RetrievalResponse 应返回检索结果:
class RetrievalResponse(pydantic.BaseModel):
results: list[RetrievalResultEntry] # 检索结果列表
total_found: int # 匹配的总数
metadata: dict # 额外元数据
RetrievalResultEntry 表示单个检索结果:
class RetrievalResultEntry(pydantic.BaseModel):
id: str # 结果 ID
content: list[ContentElement] # 内容,使用 ContentElement.from_text() 创建
metadata: dict # 元数据
distance: float # 距离分数(越小越相关)
score: float | None = None # 相似度分数(越大越相关)
文档删除
delete_document 方法在用户从知识库删除文档时调用:
async def delete_document(self, kb_id: str, document_id: str) -> bool:
"""删除文档及其关联数据
Args:
kb_id: 知识库 ID
document_id: 文档 ID
Returns:
是否删除成功
"""
宿主 RAG API
KnowledgeEngine 组件可以通过 self.plugin 调用 LangBot 宿主提供的 RAG 相关 API,包括嵌入模型调用、向量数据库操作和文件读取。
调用嵌入模型
async def invoke_embedding(
self,
embedding_model_uuid: str,
texts: list[str],
) -> list[list[float]]:
"""调用宿主的嵌入模型生成向量
Args:
embedding_model_uuid: 嵌入模型 UUID
texts: 待嵌入的文本列表
Returns:
向量列表,每个输入文本对应一个向量
"""
# 调用示例
vectors = await self.plugin.invoke_embedding("model_uuid", ["Hello", "World"])
向量写入
async def vector_upsert(
self,
collection_id: str,
vectors: list[list[float]],
ids: list[str],
metadata: list[dict] | None = None,
) -> None:
"""将向量写入宿主的向量数据库
Args:
collection_id: 目标集合 ID
vectors: 向量列表
ids: 向量唯一标识列表
metadata: 可选的元数据列表
"""
# 调用示例
await self.plugin.vector_upsert(
collection_id="kb_uuid",
vectors=[[0.1, 0.2, ...], [0.3, 0.4, ...]],
ids=["chunk_0", "chunk_1"],
metadata=[{"document_id": "doc1"}, {"document_id": "doc1"}],
)
向量搜索
async def vector_search(
self,
collection_id: str,
query_vector: list[float],
top_k: int = 5,
filters: dict | None = None,
) -> list[dict]:
"""在宿主的向量数据库中搜索相似向量
Args:
collection_id: 目标集合 ID
query_vector: 查询向量
top_k: 返回结果数量
filters: 可选的元数据过滤条件
Returns:
搜索结果列表(包含 id, score, metadata 等字段)
"""
# 调用示例
results = await self.plugin.vector_search(
collection_id="kb_uuid",
query_vector=[0.1, 0.2, ...],
top_k=5,
)
# 返回格式: [{"id": "chunk_0", "score": 0.123, "metadata": {"document_id": "doc1", ...}}, ...]
vector_search 返回的每个结果为 dict,包含 id(向量 ID)、score(距离分数)和 metadata(写入时附带的元数据)三个字段。如果需要在检索结果中返回文本内容,请在摄取阶段将文本存入 metadata 中。
向量删除
async def vector_delete(
self,
collection_id: str,
file_ids: list[str] | None = None,
filters: dict | None = None,
) -> int:
"""从宿主的向量数据库中删除向量
Args:
collection_id: 目标集合 ID
file_ids: 要删除的文件 ID 列表
filters: 可选的元数据过滤条件
Returns:
删除的条目数量
"""
# 调用示例
deleted = await self.plugin.vector_delete(
collection_id="kb_uuid",
file_ids=["doc_001"],
)
filters 参数支持 Chroma 风格的 where 语法进行元数据过滤。多个顶层键之间为 AND 关系。支持的运算符:$eq、$ne、$gt、$gte、$lt、$lte、$in、$nin。示例:{"file_id": {"$eq": "abc"}}。注意: Chroma、Qdrant 和 SeekDB 存储完整的元数据,可以对任意字段进行过滤。Milvus 和 pgvector 仅存储 text、file_id 和 chunk_uuid,对其他字段的过滤将被静默忽略。
async def get_knowledge_file_stream(self, storage_path: str) -> bytes:
"""从宿主存储中获取文件内容
Args:
storage_path: 文件的存储路径(来自 FileObject.storage_path)
Returns:
文件内容的字节数据
"""
# 调用示例
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)
测试 RAG 引擎
创建完成后,在插件目录执行命令 lbp run,启动调试。然后在 LangBot 中:
- 进入”知识库”页面
- 点击”创建知识库”
- 选择您的插件提供的 RAG 引擎,并根据引擎的
creation_schema 填写配置
- 创建成功后,可上传文档测试摄取能力(如引擎声明了
DOC_INGESTION 能力)
- 在流水线中绑定该知识库,测试检索能力