LangBot 为插件提供了一系列 API ,用于操作 LangBot 各个模块、控制消息上下文等。
请求 API
操作当前正在处理的用户请求(消息)。仅在EventListener和Command组件中可用。访问方式如下:
EventListener 的各个事件处理器方法中:event_context: context.EventContext对象内部方法
Command 的各个子命令处理器方法中:context: ExecuteContext对象内部方法
直接回复消息
直接回复一个消息链到当前请求所在会话。
消息链构造方式请参考消息平台实体。
async def reply(
self, message_chain: platform_message.MessageChain, quote_origin: bool = False
):
"""Reply to the message request
Args:
message_chain (platform.types.MessageChain): LangBot message chain
quote_origin (bool): Whether to quote the original message
"""
# 调用示例
await event_context.reply(
platform_message.MessageChain([
platform_message.Plain(text="Hello, world!"),
]),
)
获取机器人 UUID
获取当前请求来源机器人 UUID。
async def get_bot_uuid(self) -> str:
"""Get the bot uuid"""
# 调用示例
bot_uuid = await event_context.get_bot_uuid()
设置请求变量
单次请求中某些信息会被存储到请求变量中,在使用 Dify 等外部 LLMOps 平台时,这些变量会显式传入 LLMOps 平台。
async def set_query_var(self, key: str, value: Any):
"""Set a query variable"""
# 调用示例
await event_context.set_query_var("key", "value")
获取请求变量
获取单个请求变量。
async def get_query_var(self, key: str) -> Any:
"""Get a query variable"""
# 调用示例
query_var = await event_context.get_query_var("key")
获取所有请求变量
async def get_query_vars(self) -> dict[str, Any]:
"""Get all query variables"""
# 调用示例
query_vars = await event_context.get_query_vars()
获取当前流水线配置的知识库列表
获取当前请求所使用流水线中配置的知识库列表。
async def list_pipeline_knowledge_bases(self) -> list[dict[str, Any]]:
"""List knowledge bases configured for the current pipeline"""
# 调用示例
knowledge_bases = await event_context.list_pipeline_knowledge_bases()
# 返回示例
[
{
"uuid": "kb_uuid",
"name": "Product Docs",
"description": "内部产品文档",
}
]
仅返回当前请求对应流水线在 Local Agent 配置中绑定的知识库。若当前流水线未配置知识库,或未使用 Local Agent,则返回空列表。
检索当前流水线内的知识库
在当前请求所属流水线允许访问的知识库范围内执行检索。
async def retrieve_knowledge(
self,
kb_id: str,
query_text: str,
top_k: int = 5,
filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""Retrieve relevant documents from a knowledge base"""
# 调用示例
knowledge_bases = await event_context.list_pipeline_knowledge_bases()
results = await event_context.retrieve_knowledge(
kb_id=knowledge_bases[0]["uuid"],
query_text="如何配置企业知识库?",
top_k=3,
filters={"document_type": {"$eq": "manual"}},
)
# 返回示例
[
{
"id": "chunk_0",
"content": [{"type": "text", "text": "知识库配置说明"}],
"metadata": {"document_id": "doc_001"},
"distance": 0.12,
"score": 0.88,
}
]
kb_id 必须来自 list_pipeline_knowledge_bases 的返回结果,且必须属于当前流水线已配置的知识库,否则 LangBot 会返回错误。
LangBot API
这些 API 可在插件任何组件中调用。访问方式如下:
- 插件根目录
main.py中:self 对象内部方法,这些 API 均由插件类父类 BasePlugin 提供。
- 插件任何组件类中:
self.plugin 对象内部方法。
获取插件配置信息
插件配置格式可在manifest.yaml中编写,用户需要在 LangBot 的插件管理中按照插件配置格式填写。之后插件代码可以调用此 API 获取插件配置信息。
def get_config(self) -> dict[str, typing.Any]:
"""Get the config of the plugin."""
# 调用示例
config = self.plugin.get_config()
获取 LangBot 版本
获取 LangBot 版本号,返回格式为字符串v<major>.<minor>.<patch>。
async def get_langbot_version(self) -> str:
"""Get the langbot version"""
# 调用示例
langbot_version = await self.plugin.get_langbot_version()
获取已配置的机器人列表
返回由所有机器人 UUID 组成的列表。
async def get_bots(self) -> list[str]:
"""Get all bots"""
# 调用示例
bots = await self.plugin.get_bots()
获取机器人信息
获取机器人信息。
async def get_bot_info(self, bot_uuid: str) -> dict[str, Any]:
"""Get a bot info"""
# 调用示例
bot_info = await self.plugin.get_bot_info("de639861-be05-4018-859b-c2e2d3e0d603")
# 返回示例
{
"uuid": "de639861-be05-4018-859b-c2e2d3e0d603",
"name": "aiocqhttp",
"description": "由 LangBot v3 迁移而来",
"adapter": "aiocqhttp",
"enable": true,
"use_pipeline_name": "ChatPipeline",
"use_pipeline_uuid": "c30a1dca-e91c-452b-83ec-84d635a30028",
"created_at": "2025-05-10T13:53:08",
"updated_at": "2025-08-12T11:27:30",
"adapter_runtime_values": { # 若该机器人正在运行中,则具有该字段
"bot_account_id": 960164003 # 机器人账号 ID
}
}
发送主动消息
通过机器人 UUID 和目标会话 ID 发送主动消息。
消息链构造方式请参考消息平台实体。
async def send_message(
self,
bot_uuid: str,
target_type: str,
target_id: str,
message_chain: platform_message.MessageChain,
) -> None:
"""Send a message to a session"""
# 调用示例
await self.plugin.send_message(
bot_uuid="de639861-be05-4018-859b-c2e2d3e0d603",
target_type="person",
target_id="1010553892",
message_chain=platform_message.MessageChain([platform_message.Plain(text="Hello, world!")]),
)
获取已配置的 LLM 模型列表
返回由所有已配置的 LLM 模型 UUID 组成的列表。
async def get_llm_models(self) -> list[str]:
"""Get all LLM models"""
# 调用示例
llm_models = await self.plugin.get_llm_models()
调用 LLM 模型
调用 LLM 模型,返回 LLM 消息。非流式。
async def invoke_llm(
self,
llm_model_uuid: str,
messages: list[provider_message.Message],
funcs: list[resource_tool.LLMTool] = [],
extra_args: dict[str, Any] = {},
) -> provider_message.Message:
"""Invoke an LLM model"""
# 调用示例
llm_message = await self.plugin.invoke_llm(
llm_model_uuid="llm_model_uuid",
messages=[provider_message.Message(role="user", content="Hello, world!")],
funcs=[],
extra_args={},
)
列出可用解析器
列出宿主当前可用的 Parser 插件,可按 MIME 类型过滤。
async def list_parsers(self, mime_type: str | None = None) -> list[dict[str, Any]]:
"""List available Parser plugins"""
# 调用示例
parsers = await self.plugin.list_parsers(mime_type="application/pdf")
# 返回的每项包含 plugin_id、plugin_author、plugin_name、name、description、supported_mime_types
设置插件持久化数据
持久化存储插件数据。通过这个接口存储的数据,仅可被该插件访问。值需要自行转换为字节。
async def set_plugin_storage(self, key: str, value: bytes) -> None:
"""Set a plugin storage value"""
# 调用示例
await self.plugin.set_plugin_storage("key", b"value")
获取插件持久化数据
async def get_plugin_storage(self, key: str) -> bytes:
"""Get a plugin storage value"""
# 调用示例
plugin_storage = await self.plugin.get_plugin_storage("key")
获取插件所有持久化数据键
async def get_plugin_storage_keys(self) -> list[str]:
"""Get all plugin storage keys"""
# 调用示例
plugin_storage_keys = await self.plugin.get_plugin_storage_keys()
删除插件持久化数据
async def delete_plugin_storage(self, key: str) -> None:
"""Delete a plugin storage value"""
# 调用示例
await self.plugin.delete_plugin_storage("key")
获取工作空间持久化数据
通过这个接口存储的数据,可被所有插件访问。值需要自行转换为字节。
async def set_workspace_storage(self, key: str, value: bytes) -> None:
"""Set a workspace storage value"""
# 调用示例
await self.plugin.set_workspace_storage("key", b"value")
获取工作空间持久化数据
async def get_workspace_storage(self, key: str) -> bytes:
"""Get a workspace storage value"""
# 调用示例
workspace_storage = await self.plugin.get_workspace_storage("key")
获取工作空间所有持久化数据键
async def get_workspace_storage_keys(self) -> list[str]:
"""Get all workspace storage keys"""
# 调用示例
workspace_storage_keys = await self.plugin.get_workspace_storage_keys()
删除工作空间持久化数据
async def delete_workspace_storage(self, key: str) -> None:
"""Delete a workspace storage value"""
# 调用示例
await self.plugin.delete_workspace_storage("key")
获取插件配置项文件数据
async def get_config_file(self, file_key: str) -> bytes:
"""Get a config file value"""
# 调用示例
file_bytes = await self.plugin.get_config_file("key")
请结合 file 或 array[file] 类型配置项使用。
RAG API
这些 API 供 KnowledgeEngine 组件调用,用于访问 LangBot 宿主的嵌入模型、向量数据库和文件存储。访问方式:
- 在
KnowledgeEngine 组件类中:self.plugin 对象内部方法。
调用嵌入模型
调用宿主已配置的嵌入模型生成文本向量。
async def invoke_embedding(
self,
embedding_model_uuid: str,
texts: list[str],
) -> list[list[float]]:
"""调用嵌入模型生成向量
Args:
embedding_model_uuid: 嵌入模型 UUID
texts: 待嵌入的文本列表
Returns:
向量列表,每个输入文本对应一个向量
"""
# 调用示例
vectors = await self.plugin.invoke_embedding("model_uuid", ["Hello", "World"])
向量写入
将向量写入宿主的向量数据库。
async def vector_upsert(
self,
collection_id: str,
vectors: list[list[float]],
ids: list[str],
metadata: list[dict[str, Any]] | None = None,
documents: list[str] | None = None,
) -> None:
"""向量写入
Args:
collection_id: 目标集合 ID
vectors: 向量列表
ids: 向量唯一标识列表
metadata: 可选的元数据列表
documents: 可选的原始文本文档列表。在支持全文/混合检索的
后端中,需要传入此参数以启用全文和混合检索。
"""
# 调用示例
await self.plugin.vector_upsert(
collection_id="kb_uuid",
vectors=[[0.1, 0.2, ...], [0.3, 0.4, ...]],
ids=["chunk_0", "chunk_1"],
metadata=[{"document_id": "doc1"}, {"document_id": "doc1"}],
documents=["分块文本 0", "分块文本 1"],
)
向量搜索
在宿主的向量数据库中搜索相似向量。
async def vector_search(
self,
collection_id: str,
query_vector: list[float],
top_k: int = 5,
filters: dict[str, Any] | None = None,
search_type: str = "vector",
query_text: str = "",
) -> list[dict[str, Any]]:
"""向量搜索
Args:
collection_id: 目标集合 ID
query_vector: 查询向量
top_k: 返回结果数量
filters: 可选的元数据过滤条件
search_type: 检索方式,可选 'vector'、'full_text'、'hybrid'
query_text: 原始查询文本,用于全文检索和混合检索
Returns:
搜索结果列表(包含 id, score, metadata 等字段)
"""
# 调用示例
results = await self.plugin.vector_search(
collection_id="kb_uuid",
query_vector=[0.1, 0.2, ...],
top_k=5,
search_type="hybrid",
query_text="搜索查询",
)
# 返回格式: [{"id": "chunk_0", "score": 0.123, "metadata": {"document_id": "doc1", ...}}, ...]
vector_search 返回的每个结果为 dict,包含 id(向量 ID)、score(距离分数)和 metadata(写入时附带的元数据)三个字段。如果需要在检索结果中返回文本内容,请在摄取阶段将文本存入 metadata 中。
向量删除
从宿主的向量数据库中删除向量。
async def vector_delete(
self,
collection_id: str,
file_ids: list[str] | None = None,
filters: dict[str, Any] | None = None,
) -> int:
"""向量删除
Args:
collection_id: 目标集合 ID
file_ids: 要删除的文件 ID 列表
filters: 可选的元数据过滤条件
Returns:
删除的条目数量
"""
# 调用示例
deleted = await self.plugin.vector_delete(
collection_id="kb_uuid",
file_ids=["doc_001"],
)
filters 参数支持 Chroma 风格的 where 语法进行元数据过滤。多个顶层键之间为 AND 关系。支持的运算符:$eq、$ne、$gt、$gte、$lt、$lte、$in、$nin。# 隐式 $eq
results = await self.plugin.vector_search(
collection_id="kb_uuid",
query_vector=[0.1, 0.2, ...],
filters={"file_id": "abc"},
)
# 比较运算符
results = await self.plugin.vector_search(
collection_id="kb_uuid",
query_vector=[0.1, 0.2, ...],
filters={"created_at": {"$gte": 1700000000}},
)
# 列表运算符
results = await self.plugin.vector_search(
collection_id="kb_uuid",
query_vector=[0.1, 0.2, ...],
filters={"file_type": {"$in": ["pdf", "docx"]}},
)
# 按过滤条件删除
deleted = await self.plugin.vector_delete(
collection_id="kb_uuid",
filters={"file_type": {"$eq": "pdf"}},
)
注意: Chroma、Qdrant 和 SeekDB 存储完整的元数据,可以对任意字段进行过滤。Milvus 和 pgvector 仅存储 text、file_id 和 chunk_uuid,对其他字段的过滤将被静默忽略。
从宿主存储中获取上传文件的内容。
async def get_knowledge_file_stream(self, storage_path: str) -> bytes:
"""获取文件内容
Args:
storage_path: 文件的存储路径(来自 FileObject.storage_path)
Returns:
文件内容的字节数据
"""
# 调用示例
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)