跳转到主要内容
LangBot 为插件提供了一系列 API ,用于操作 LangBot 各个模块、控制消息上下文等。

请求 API

操作当前正在处理的用户请求(消息)。仅在EventListenerCommand组件中可用。访问方式如下:
  • EventListener 的各个事件处理器方法中:event_context: context.EventContext对象内部方法
  • Command 的各个子命令处理器方法中:context: ExecuteContext对象内部方法

直接回复消息

直接回复一个消息链到当前请求所在会话。 消息链构造方式请参考消息平台实体
async def reply(
    self, message_chain: platform_message.MessageChain, quote_origin: bool = False
):
    """Reply to the message request

    Args:
        message_chain (platform.types.MessageChain): LangBot message chain
        quote_origin (bool): Whether to quote the original message
    """

# 调用示例
await event_context.reply(
    platform_message.MessageChain([
        platform_message.Plain(text="Hello, world!"),
    ]),
)

获取机器人 UUID

获取当前请求来源机器人 UUID。
async def get_bot_uuid(self) -> str:
    """Get the bot uuid"""

# 调用示例
bot_uuid = await event_context.get_bot_uuid()

设置请求变量

单次请求中某些信息会被存储到请求变量中,在使用 Dify 等外部 LLMOps 平台时,这些变量会显式传入 LLMOps 平台
async def set_query_var(self, key: str, value: Any):
    """Set a query variable"""

# 调用示例
await event_context.set_query_var("key", "value")

获取请求变量

获取单个请求变量。
async def get_query_var(self, key: str) -> Any:
    """Get a query variable"""

# 调用示例
query_var = await event_context.get_query_var("key")

获取所有请求变量

async def get_query_vars(self) -> dict[str, Any]:
    """Get all query variables"""

# 调用示例
query_vars = await event_context.get_query_vars()

获取当前流水线配置的知识库列表

获取当前请求所使用流水线中配置的知识库列表。
async def list_pipeline_knowledge_bases(self) -> list[dict[str, Any]]:
    """List knowledge bases configured for the current pipeline"""

# 调用示例
knowledge_bases = await event_context.list_pipeline_knowledge_bases()

# 返回示例
[
    {
        "uuid": "kb_uuid",
        "name": "Product Docs",
        "description": "内部产品文档",
    }
]
仅返回当前请求对应流水线在 Local Agent 配置中绑定的知识库。若当前流水线未配置知识库,或未使用 Local Agent,则返回空列表。

检索当前流水线内的知识库

在当前请求所属流水线允许访问的知识库范围内执行检索。
async def retrieve_knowledge(
    self,
    kb_id: str,
    query_text: str,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Retrieve relevant documents from a knowledge base"""

# 调用示例
knowledge_bases = await event_context.list_pipeline_knowledge_bases()
results = await event_context.retrieve_knowledge(
    kb_id=knowledge_bases[0]["uuid"],
    query_text="如何配置企业知识库?",
    top_k=3,
    filters={"document_type": {"$eq": "manual"}},
)

# 返回示例
[
    {
        "id": "chunk_0",
        "content": [{"type": "text", "text": "知识库配置说明"}],
        "metadata": {"document_id": "doc_001"},
        "distance": 0.12,
        "score": 0.88,
    }
]
kb_id 必须来自 list_pipeline_knowledge_bases 的返回结果,且必须属于当前流水线已配置的知识库,否则 LangBot 会返回错误。

LangBot API

这些 API 可在插件任何组件中调用。访问方式如下:
  • 插件根目录main.py中:self 对象内部方法,这些 API 均由插件类父类 BasePlugin 提供。
  • 插件任何组件类中:self.plugin 对象内部方法。

获取插件配置信息

插件配置格式可在manifest.yaml中编写,用户需要在 LangBot 的插件管理中按照插件配置格式填写。之后插件代码可以调用此 API 获取插件配置信息。
def get_config(self) -> dict[str, typing.Any]:
    """Get the config of the plugin."""

# 调用示例
config = self.plugin.get_config()

获取 LangBot 版本

获取 LangBot 版本号,返回格式为字符串v<major>.<minor>.<patch>
async def get_langbot_version(self) -> str:
    """Get the langbot version"""

# 调用示例
langbot_version = await self.plugin.get_langbot_version()

获取已配置的机器人列表

返回由所有机器人 UUID 组成的列表。
async def get_bots(self) -> list[str]:
    """Get all bots"""

# 调用示例
bots = await self.plugin.get_bots()

获取机器人信息

获取机器人信息。
async def get_bot_info(self, bot_uuid: str) -> dict[str, Any]:
    """Get a bot info"""

# 调用示例
bot_info = await self.plugin.get_bot_info("de639861-be05-4018-859b-c2e2d3e0d603")

# 返回示例
{
    "uuid": "de639861-be05-4018-859b-c2e2d3e0d603",
    "name": "aiocqhttp",
    "description": "由 LangBot v3 迁移而来",
    "adapter": "aiocqhttp",
    "enable": true,
    "use_pipeline_name": "ChatPipeline",
    "use_pipeline_uuid": "c30a1dca-e91c-452b-83ec-84d635a30028",
    "created_at": "2025-05-10T13:53:08",
    "updated_at": "2025-08-12T11:27:30",
    "adapter_runtime_values": {  # 若该机器人正在运行中,则具有该字段
        "bot_account_id": 960164003  # 机器人账号 ID
    }
}

发送主动消息

通过机器人 UUID 和目标会话 ID 发送主动消息。 消息链构造方式请参考消息平台实体
async def send_message(
    self,
    bot_uuid: str,
    target_type: str,
    target_id: str,
    message_chain: platform_message.MessageChain,
) -> None:
    """Send a message to a session"""

# 调用示例
await self.plugin.send_message(
    bot_uuid="de639861-be05-4018-859b-c2e2d3e0d603",
    target_type="person",
    target_id="1010553892",
    message_chain=platform_message.MessageChain([platform_message.Plain(text="Hello, world!")]),
)

获取已配置的 LLM 模型列表

返回由所有已配置的 LLM 模型 UUID 组成的列表。
async def get_llm_models(self) -> list[str]:
    """Get all LLM models"""

# 调用示例
llm_models = await self.plugin.get_llm_models()

调用 LLM 模型

调用 LLM 模型,返回 LLM 消息。非流式。
async def invoke_llm(
    self,
    llm_model_uuid: str,
    messages: list[provider_message.Message],
    funcs: list[resource_tool.LLMTool] = [],
    extra_args: dict[str, Any] = {},
) -> provider_message.Message:
    """Invoke an LLM model"""

# 调用示例
llm_message = await self.plugin.invoke_llm(
    llm_model_uuid="llm_model_uuid",
    messages=[provider_message.Message(role="user", content="Hello, world!")],
    funcs=[],
    extra_args={},
)

列出可用解析器

列出宿主当前可用的 Parser 插件,可按 MIME 类型过滤。
async def list_parsers(self, mime_type: str | None = None) -> list[dict[str, Any]]:
    """List available Parser plugins"""

# 调用示例
parsers = await self.plugin.list_parsers(mime_type="application/pdf")
# 返回的每项包含 plugin_id、plugin_author、plugin_name、name、description、supported_mime_types

设置插件持久化数据

持久化存储插件数据。通过这个接口存储的数据,仅可被该插件访问。值需要自行转换为字节。
async def set_plugin_storage(self, key: str, value: bytes) -> None:
    """Set a plugin storage value"""

# 调用示例
await self.plugin.set_plugin_storage("key", b"value")

获取插件持久化数据

async def get_plugin_storage(self, key: str) -> bytes:
    """Get a plugin storage value"""

# 调用示例
plugin_storage = await self.plugin.get_plugin_storage("key")

获取插件所有持久化数据键

async def get_plugin_storage_keys(self) -> list[str]:
    """Get all plugin storage keys"""

# 调用示例
plugin_storage_keys = await self.plugin.get_plugin_storage_keys()

删除插件持久化数据

async def delete_plugin_storage(self, key: str) -> None:
    """Delete a plugin storage value"""

# 调用示例
await self.plugin.delete_plugin_storage("key")

获取工作空间持久化数据

通过这个接口存储的数据,可被所有插件访问。值需要自行转换为字节。
async def set_workspace_storage(self, key: str, value: bytes) -> None:
    """Set a workspace storage value"""

# 调用示例
await self.plugin.set_workspace_storage("key", b"value")

获取工作空间持久化数据

async def get_workspace_storage(self, key: str) -> bytes:
    """Get a workspace storage value"""

# 调用示例
workspace_storage = await self.plugin.get_workspace_storage("key")

获取工作空间所有持久化数据键

async def get_workspace_storage_keys(self) -> list[str]:
    """Get all workspace storage keys"""

# 调用示例
workspace_storage_keys = await self.plugin.get_workspace_storage_keys()

删除工作空间持久化数据

async def delete_workspace_storage(self, key: str) -> None:
    """Delete a workspace storage value"""

# 调用示例
await self.plugin.delete_workspace_storage("key")

获取插件配置项文件数据

async def get_config_file(self, file_key: str) -> bytes:
    """Get a config file value"""

# 调用示例
file_bytes = await self.plugin.get_config_file("key")
请结合 file 或 array[file] 类型配置项使用。

RAG API

这些 API 供 KnowledgeEngine 组件调用,用于访问 LangBot 宿主的嵌入模型、向量数据库和文件存储。访问方式:
  • KnowledgeEngine 组件类中:self.plugin 对象内部方法。

调用嵌入模型

调用宿主已配置的嵌入模型生成文本向量。
async def invoke_embedding(
    self,
    embedding_model_uuid: str,
    texts: list[str],
) -> list[list[float]]:
    """调用嵌入模型生成向量

    Args:
        embedding_model_uuid: 嵌入模型 UUID
        texts: 待嵌入的文本列表

    Returns:
        向量列表,每个输入文本对应一个向量
    """

# 调用示例
vectors = await self.plugin.invoke_embedding("model_uuid", ["Hello", "World"])

向量写入

将向量写入宿主的向量数据库。
async def vector_upsert(
    self,
    collection_id: str,
    vectors: list[list[float]],
    ids: list[str],
    metadata: list[dict[str, Any]] | None = None,
    documents: list[str] | None = None,
) -> None:
    """向量写入

    Args:
        collection_id: 目标集合 ID
        vectors: 向量列表
        ids: 向量唯一标识列表
        metadata: 可选的元数据列表
        documents: 可选的原始文本文档列表。在支持全文/混合检索的
            后端中,需要传入此参数以启用全文和混合检索。
    """

# 调用示例
await self.plugin.vector_upsert(
    collection_id="kb_uuid",
    vectors=[[0.1, 0.2, ...], [0.3, 0.4, ...]],
    ids=["chunk_0", "chunk_1"],
    metadata=[{"document_id": "doc1"}, {"document_id": "doc1"}],
    documents=["分块文本 0", "分块文本 1"],
)

向量搜索

在宿主的向量数据库中搜索相似向量。
async def vector_search(
    self,
    collection_id: str,
    query_vector: list[float],
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
    search_type: str = "vector",
    query_text: str = "",
) -> list[dict[str, Any]]:
    """向量搜索

    Args:
        collection_id: 目标集合 ID
        query_vector: 查询向量
        top_k: 返回结果数量
        filters: 可选的元数据过滤条件
        search_type: 检索方式,可选 'vector'、'full_text'、'hybrid'
        query_text: 原始查询文本,用于全文检索和混合检索

    Returns:
        搜索结果列表(包含 id, score, metadata 等字段)
    """

# 调用示例
results = await self.plugin.vector_search(
    collection_id="kb_uuid",
    query_vector=[0.1, 0.2, ...],
    top_k=5,
    search_type="hybrid",
    query_text="搜索查询",
)
# 返回格式: [{"id": "chunk_0", "score": 0.123, "metadata": {"document_id": "doc1", ...}}, ...]
vector_search 返回的每个结果为 dict,包含 id(向量 ID)、score(距离分数)和 metadata(写入时附带的元数据)三个字段。如果需要在检索结果中返回文本内容,请在摄取阶段将文本存入 metadata 中。

向量删除

从宿主的向量数据库中删除向量。
async def vector_delete(
    self,
    collection_id: str,
    file_ids: list[str] | None = None,
    filters: dict[str, Any] | None = None,
) -> int:
    """向量删除

    Args:
        collection_id: 目标集合 ID
        file_ids: 要删除的文件 ID 列表
        filters: 可选的元数据过滤条件

    Returns:
        删除的条目数量
    """

# 调用示例
deleted = await self.plugin.vector_delete(
    collection_id="kb_uuid",
    file_ids=["doc_001"],
)
filters 参数支持 Chroma 风格的 where 语法进行元数据过滤。多个顶层键之间为 AND 关系。支持的运算符:$eq$ne$gt$gte$lt$lte$in$nin
# 隐式 $eq
results = await self.plugin.vector_search(
    collection_id="kb_uuid",
    query_vector=[0.1, 0.2, ...],
    filters={"file_id": "abc"},
)

# 比较运算符
results = await self.plugin.vector_search(
    collection_id="kb_uuid",
    query_vector=[0.1, 0.2, ...],
    filters={"created_at": {"$gte": 1700000000}},
)

# 列表运算符
results = await self.plugin.vector_search(
    collection_id="kb_uuid",
    query_vector=[0.1, 0.2, ...],
    filters={"file_type": {"$in": ["pdf", "docx"]}},
)

# 按过滤条件删除
deleted = await self.plugin.vector_delete(
    collection_id="kb_uuid",
    filters={"file_type": {"$eq": "pdf"}},
)
注意: Chroma、Qdrant 和 SeekDB 存储完整的元数据,可以对任意字段进行过滤。Milvus 和 pgvector 仅存储 textfile_idchunk_uuid,对其他字段的过滤将被静默忽略。
从宿主存储中获取上传文件的内容。
async def get_knowledge_file_stream(self, storage_path: str) -> bytes:
    """获取文件内容

    Args:
        storage_path: 文件的存储路径(来自 FileObject.storage_path)

    Returns:
        文件内容的字节数据
    """

# 调用示例
file_bytes = await self.plugin.get_knowledge_file_stream(context.file_object.storage_path)