22.RunnableLambda #
RunnableLambda 是一个用于将普通的 Python 函数(即 lambda 或普通函数)包装为可复用、可链接的“可运行对象”(Runnable)的类。它的主要作用是使得任意函数都能以统一的方式进行调用(如 invoke 处理单个输入,batch 处理批量输入),并能与其它 Runnable 实例灵活组合,实现流式、链式的数据处理。
- 本例演示了如何用
RunnableLambda包装简单的函数(如加1、乘2、平方等),并通过统一接口进行调用。 - 你可以像操作普通对象一样去复用、组合这些被包装的函数,实现模块化和流程化的复杂处理。
RunnableLambda极大提升了处理自定义函数与流程搭建时的代码复用性和拓展性。
22.1. 22.RunnableLambda.py #
22.RunnableLambda.py
#from langchain_core.runnables import RunnableLambda
# 导入RunnableLambda类
from smartchain.runnables import RunnableLambda
# 将一个递增函数包装成Runnable对象
add_one_runnable = RunnableLambda(lambda x: x + 1)
# 使用invoke方法处理单个输入,输入为5
result = add_one_runnable.invoke(5)
# 输出单个输入的处理结果
print(f"输出: {result}")
# 使用batch方法批量处理多个输入,输入为[1, 2, 3, 4, 5]
results = add_one_runnable.batch([1, 2, 3, 4, 5])
# 输出批量处理的结果,[2, 3, 4, 5, 6]
print(f"输出: {results}") # 输出: [2, 3, 4, 5, 6]
# 将递增函数包装成Runnable对象
add_runnable = RunnableLambda(lambda x: x + 1)
# 将乘2函数包装成Runnable对象
mul_runnable = RunnableLambda(lambda x: x * 2)
# 将平方函数包装成Runnable对象
square_runnable = RunnableLambda(lambda x: x ** 2)
# 定义输入值为3
input_value = 3
# 先对输入进行加1操作
step1 = add_runnable.invoke(input_value)
# 再对上一步结果乘2
step2 = mul_runnable.invoke(step1)
# 最后对上一步结果取平方
step3 = square_runnable.invoke(step2)
# 输出链式操作的最终结果
print(f"输出: {step3}")
# 创建一个处理函数(模拟流式处理)
def process_text(text):
"""处理文本,模拟流式输出"""
words = text.split()
for word in words:
yield word # 逐步返回每个单词
# 将函数包装成 Runnable
# 注意:stream 方法需要返回一个生成器
stream_runnable = RunnableLambda(process_text)
# 使用 stream 方法流式处理
# stream 方法返回一个生成器,可以逐步获取结果
input_text = "你好 世界 Python"
print("流式处理结果:")
for chunk in stream_runnable.stream(input_text):
print(f" 收到: {chunk}")22.2. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
# 导入 inspect 模块,用于获取函数签名和参数类型
import inspect
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用 Runnable
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name: str | None = None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke,同步调用底层函数
def invoke(self, input, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
return self.func(input, **kwargs)
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用包装的函数
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self) -> str:
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"22.3 类 #
22.3.1 核心类 #
| 类/模块 | 作用 | 关键方法 | 备注/使用要点 |
|---|---|---|---|
Runnable |
可运行单元的抽象基类,统一同步调用与批量调用接口 | invoke(input, **kwargs) 抽象;batch(inputs, **kwargs) 默认逐个调用 invoke |
任何可运行组件都需实现 invoke;batch 提供通用批处理 |
RunnableLambda |
将任意可调用(函数/lambda 等)包装成 Runnable,便于链式组合与统一接口调用 |
invoke(input, **kwargs) 根据被包装函数的签名智能传参;batch(inputs, **kwargs) 逐项调用 invoke;__repr__ 便于调试 |
传入的 func 必须可调用;名称默认取函数名或 "lambda" |
22.3.2 类图 #

22.3.3 调用关系 #


22.3.4 调用流程 #
单次调用 invoke 流程
- 入口:
RunnableLambda.invoke(input, **kwargs)。 - 读取函数签名:
inspect.signature(self.func),拿到参数列表。 - 分支逻辑(智能传参):
- 仅 1 个参数:调用
func(input)。 - 多于 1 个参数:
- 若 2 个参数且第二个是
**kwargs或有默认值:尝试func(input, **kwargs);若类型不符再退回func(input)。 - 其他多参数场景:直接
func(input)(不传 kwargs)。
- 若 2 个参数且第二个是
- 0 参数:直接
func()。
- 仅 1 个参数:调用
- 若签名获取或调用抛出
TypeError/ValueError:兜底先试func(input, **kwargs),再退回func(input)。 - 返回值直接透传。
批量调用 batch 流程
- 入口:
RunnableLambda.batch(inputs, **kwargs)。 - 对
inputs列表逐项调用invoke,收集结果并返回。 - 因为逐项走
invoke,所以批量时仍继承了上述智能传参逻辑。
调用链
- 单函数批量:
add_one_runnable.batch([1,2,3,4,5])- 对每个元素执行
invoke(x)→func(x)→ 结果收集为[2,3,4,5,6]。
- 对每个元素执行
- 链式调用(加1→乘2→平方):
add_runnable.invoke(3)→ 4mul_runnable.invoke(4)→ 8square_runnable.invoke(8)→ 64
- 链式本质:上一步的返回值作为下一步的输入,全部都通过各自的
invoke触发底层func。
关键点
Runnable只定义协议,RunnableLambda负责实际包装和智能传参。batch不做并行,只是循环调用invoke,因此行为与单次调用完全一致且可预测。- 调用失败时的兜底策略确保了对“奇特签名”函数的容错。
23.RunnableSequence #
RunnableSequence 是对一系列 Runnable 对象的有序组合,支持通过管道符 | 实现形如 A | B | C 的链式调用。它让多个数据处理单元首尾衔接形成流水线,极大地提升了可组合性和复用度。
核心设计理念与结构
- 面向接口编程:链中每个元素都必须实现
Runnable协议(即具备invoke,batch,stream等方法),保证类型安全与一致性。 - 链式拼接:
- 使用
A | B | C语法,底层映射为A.__or__(B)或chain.__or__(C)。 - 链实际上就是把多个
Runnable实例保存为列表,依次调用。
- 使用
- 调用递进:
- 每一步的输出作为下一步的输入。
- 支持单步调用(
invoke)、批量调用(batch)和流式输出(stream)。
关键方法
__init__(self, runnables: list[Runnable])构造方法,传入多个Runnable实例,自动做类型和非空检查。__or__(self, other)支持|操作符扩展序列,返回包含新增可运行单元的新链。invoke(self, input, **kwargs)顺序调用各子组件,将每个环节的输出作为下个环节输入:value = input for runnable in self.runnables: value = runnable.invoke(value, **kwargs) return valuebatch(self, inputs: list, **kwargs)对输入列表每一项独立执行整条链,常用于小规模数据批处理。stream(self, input, **kwargs)默认沿用基类逻辑。对最终的链路输出进行流式分发(如若链路末端返回生成器,则逐项 yield)。__repr__(self)输出形如RunnableSequence(Add | Mul | Prefix)的链路结构,便于调试与日志。
用法与优势
- 强组合性:任意顺序、个数的
Runnable组件均可组合;组件本身可以是基础单元,也可以是其它链条。 - 高度可测试:每一层
Runnable独立测试;链式组合后整体也可以简单测试。 示例用法:
python 假设已定义 add_one, multiply_two, add_prefix 函数 chain = RunnableLambda(add_one) | RunnableLambda(multiply_two) | RunnableLambda(add_prefix) result = chain.invoke(3) # 结果: "结果: 8" results = chain.batch([1,2,3]) # 结果: ["结果: 4", "结果: 6", "结果: 8"]可递归嵌套:链中元素可以是其它
RunnableSequence,实现树状/网状复杂流程。
典型应用场景
- 多阶段数据预处理流水线。
- 多步 LLM Prompt 处理与包装。
- 复杂推理任务的分步拆解执行。
摘要对照
invoke是链条主力;batch是批量;stream对可迭代输出封装支持。- 只做串行处理,不做自动并发;如需并发需在外层封装。
- 适合函数式风格处理、文本/数据流清洗、模型 API 前后处理等。
总结:
RunnableSequence将“数据流管道式处理”的思想以最高的 Pythonic 简洁度落地,扩大了通用数据/模型组件的复用能力,是构建可维护智能应用(如多步问答、复杂 ETL)的利器。
23.1. 23.RunnableSequence.py #
23.RunnableSequence.py
#from langchain_core.runnables import RunnableLambda
# 导入RunnableLambda类
from smartchain.runnables import RunnableLambda
# 定义多个处理函数
def add_one(x):
"""加1"""
return x + 1
def multiply_two(x):
"""乘以2"""
return x * 2
def add_prefix(x):
"""添加前缀"""
return f"结果: {x}"
# 将函数包装成 Runnable
add_one_runnable = RunnableLambda(add_one)
multiply_two_runnable = RunnableLambda(multiply_two)
add_prefix_runnable = RunnableLambda(add_prefix)
# 使用管道操作符 | 组合多个组件
# 管道操作符将前一个组件的输出作为后一个组件的输入
chain = add_one_runnable | multiply_two_runnable | add_prefix_runnable
# 调用组合后的链(单次调用)
# 执行流程:5 -> add_one(5) = 6 -> multiply_two(6) = 12 -> add_prefix(12) = "结果: 12"
result = chain.invoke(5)
# 打印结果
print(f"输入: 5")
print(f"输出: {result}") # 输出:结果: 12
# 批量调用演示:对多个输入执行同一条链
inputs = [1, 2, 3, 4]
batch_results = chain.batch(inputs)
print(f"批量输入: {inputs}")
print(f"批量输出: {batch_results}") # 结果: ["结果: 4", "结果: 6", "结果: 8", "结果: 10"]
# 流式调用演示:对于本链条,最终结果是字符串,stream 会直接逐项 yield(此处单值)
print("流式输出:")
for chunk in chain.stream(7):
print(f" 收到: {chunk}") # 7 -> 8 -> 16 -> "结果: 16"23.2. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
# 导入 inspect 模块,用于获取函数签名和参数类型
import inspect
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 管道操作符,便于链式拼接
+ def __or__(self, other):
+ if not isinstance(other, Runnable):
+ raise TypeError("管道右侧必须是 Runnable 实例")
+ return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用 Runnable
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name: str | None = None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke,同步调用底层函数
def invoke(self, input, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
return self.func(input, **kwargs)
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用包装的函数
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self) -> str:
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
+class RunnableSequence(Runnable):
+ """
+ Runnable 组合序列,用于支持 A | B | C 的链式拼接。
+ """
# 初始化方法,接收一个 Runnable 对象的列表
+ def __init__(self, runnables: list[Runnable]):
# 检查传入的 runnables 列表不能为空
+ if not runnables:
+ raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
+ for r in runnables:
+ if not isinstance(r, Runnable):
+ raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
+ self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
+ def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
+ if not isinstance(other, Runnable):
+ raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
+ return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
+ def invoke(self, input, **kwargs):
+ """
+ 逐个执行链条:上一步输出作为下一步输入。
+ """
# 初始 value 为输入 input
+ value = input
# 依次调用每个 runnable 的 invoke,并传递最新的 value
+ for runnable in self.runnables:
+ value = runnable.invoke(value, **kwargs)
# 返回最后一步的输出值
+ return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
+ def batch(self, inputs: list, **kwargs) -> list:
+ """
+ 对输入列表逐项执行同一条链。
+ """
# 逐项调用 invoke,收集所有输出
+ return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
+ def stream(self, input, **kwargs):
+ """
+ 流式执行:沿用基类逻辑,对最终结果做流式分发。
+ """
# 使用基类 stream
+ yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
+ def __repr__(self) -> str:
# 获取每个 runnable 的名字,用"|"拼接成描述
+ names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
+ return f"RunnableSequence({names})"
23.3 类 #
23.3.1 关键类 #
| 类 | 职责 | 主要方法/要点 | 示例用途 |
|---|---|---|---|
Runnable |
定义可运行单元接口,统一同步/批量/流式调用协议 | invoke 抽象;batch 逐项调用 invoke;stream 默认对结果做流式分发 |
所有 runnable 的基类 |
RunnableLambda |
将任意可调用对象封装成 Runnable |
invoke 直接调用底层函数;batch 循环 invoke;stream 复用基类 |
把普通函数(如加1、乘2、加前缀)包装成 runnable |
RunnableSequence |
实现 A B C 的链式组合,顺序执行 |
__or__ 追加链;invoke 依序把上一步输出传给下一步;batch 对列表逐项跑整条链;stream 流式分发最终结果 |
在示例中用 add_one_runnable multiply_two_runnable add_prefix_runnable 形成处理流水线 |
23.3.2 类图 #

23.3.3 时序图 #
23.3.3.1 单次调用链(invoke) #
输入 5,经链 add_one -> multiply_two -> add_prefix:
23.3.3.2 批量调用(batch) #
输入列表 [1,2,3,4],对每个元素跑完整条链:
(在代码中输出为 ["结果: 4", "结果: 6", "结果: 8", "结果: 10"])
23.3.3.3 流式调用(stream) #
本链最终返回字符串,stream 会直接逐项 yield 最终值:
23.3.4 调用过程 #
chain = add_one_runnable | multiply_two_runnable | add_prefix_runnable:__or__返回新的RunnableSequence,内部维护[add_one_runnable, multiply_two_runnable, add_prefix_runnable]。invoke(x):顺序遍历runnables,每次用上一步输出作为下一步输入,最终返回末节点结果。batch(list_x):对输入列表逐项调用同一条链,结果列表与输入一一对应。stream(x):调用基类stream,对最终结果流式分发(若结果是生成器/可迭代则逐项 yield;字符串/字节/字典视为单值直接 yield)。
24.RunnablePassthrough #
RunnablePassthrough是一个“直通式”的Runnable实现——它接收输入,原样返回,不做任何处理。- 这在调试、保留原始输入、不需要加工等场景非常有用,可以作为链的“占位符”,或在处理流程中临时跳过某些步骤。
使用场景举例
- 保留原始输入内容,链式分支结构中某些支路无需变换时;
- 快速测试流水线接口通畅;
- 复用链式拼接语法但步骤需要“直通”场景。
典型示例
简单输入、直接输出:
from smartchain.runnables import RunnablePassthrough
p = RunnablePassthrough()
print(p.invoke("abc")) # 输出: abc
print(p.batch([1,2,3])) # 输出: [1, 2, 3]
for v in p.stream("xyz"):
print(v) # 输出: xyz与其它 Runnable“拼接”,实现“先原样保留、再处理”功能:
from smartchain.runnables import RunnableLambda, RunnablePassthrough
def to_upper(text):
return text.upper()
combined = RunnablePassthrough() | RunnableLambda(to_upper)
print(combined.invoke("hello")) # 输出: HELLO主要方法
invoke(input, **kwargs):直接return inputbatch(inputs, **kwargs):直接return list(inputs)stream(input, **kwargs):yield input- 支持链式管道拼接
24.1. 24.RunnablePassthrough.py #
24.RunnablePassthrough.py
#from langchain_core.runnables import RunnableLambda,RunnablePassthrough
# 导入RunnableLambda类
from smartchain.runnables import RunnableLambda,RunnablePassthrough
# 创建一个处理函数
def process_text(text):
"""处理文本"""
return text.upper()
# 将函数包装成 Runnable
process_runnable = RunnableLambda(process_text)
# 创建 RunnablePassthrough
# RunnablePassthrough 会原样返回输入,不做任何处理
pass_through = RunnablePassthrough()
# 测试 RunnablePassthrough
# 输入什么,就输出什么
result = pass_through.invoke("测试")
print(f"输入: '测试'")
print(f"输出: {result}") # 输出:测试
# 批量调用:原样返回列表
batch_inputs = ["a", "b", "c"]
batch_results = pass_through.batch(batch_inputs)
print(f"批量输入: {batch_inputs}")
print(f"批量输出: {batch_results}") # 输出:["a", "b", "c"]
# 流式调用:对单值也会直接逐项 yield
print("流式输出:")
for chunk in pass_through.stream("stream-test"):
print(f" 收到: {chunk}")
# 组合使用
# 这个链会同时返回原始输入和处理后的输入
chain = RunnablePassthrough() | process_runnable
result = chain.invoke("hello")
print(f"输入: 'hello'")
print(f"输出: {result}") # 输出:HELLO24.2. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
# 导入 inspect 模块,用于获取函数签名和参数类型
import inspect
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 管道操作符,便于链式拼接
def __or__(self, other):
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用 Runnable
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name: str | None = None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke,同步调用底层函数
def invoke(self, input, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
return self.func(input, **kwargs)
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用包装的函数
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self) -> str:
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
+class RunnablePassthrough(Runnable):
+ """
+ 直通型 Runnable:原样返回输入,不做任何处理。
+ 可用于调试或需要保留原始输入的场景。
+ """
+ def invoke(self, input, **kwargs):
+ return input
+ def batch(self, inputs: list, **kwargs) -> list:
+ return list(inputs)
+ def stream(self, input, **kwargs):
# 复用基类流式封装(对单值直接 yield)
+ yield from super().stream(input, **kwargs)
+ def __repr__(self) -> str:
+ return "RunnablePassthrough()"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
class RunnableSequence(Runnable):
"""
Runnable 组合序列,用于支持 A | B | C 的链式拼接。
"""
# 初始化方法,接收一个 Runnable 对象的列表
def __init__(self, runnables: list[Runnable]):
# 检查传入的 runnables 列表不能为空
if not runnables:
raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
for r in runnables:
if not isinstance(r, Runnable):
raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
def invoke(self, input, **kwargs):
"""
逐个执行链条:上一步输出作为下一步输入。
"""
# 初始 value 为输入 input
value = input
# 依次调用每个 runnable 的 invoke,并传递最新的 value
for runnable in self.runnables:
value = runnable.invoke(value, **kwargs)
# 返回最后一步的输出值
return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
def batch(self, inputs: list, **kwargs) -> list:
"""
对输入列表逐项执行同一条链。
"""
# 逐项调用 invoke,收集所有输出
return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
def stream(self, input, **kwargs):
"""
流式执行:沿用基类逻辑,对最终结果做流式分发。
"""
# 使用基类 stream
yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
def __repr__(self) -> str:
# 获取每个 runnable 的名字,用"|"拼接成描述
names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
return f"RunnableSequence({names})"24.3 类 #
24.3.1 类说明 #
| 类名 | 作用 | 主要方法 | 说明 | ||
|---|---|---|---|---|---|
| Runnable | 可运行对象抽象基类 | invoke(), stream(), batch(), __or__() |
定义所有可运行组件的统一接口,支持同步、流式和批量调用 | ||
| RunnablePassthrough | 直通型 Runnable | invoke(), stream(), batch(), __repr__() |
原样返回输入,不做任何处理,用于调试或保留原始输入 | ||
| RunnableLambda | Lambda 包装器 | __init__(), invoke(), stream(), batch(), __repr__() |
将普通 Python 函数包装成 Runnable 对象,支持链式调用 | ||
| RunnableSequence | 可运行对象序列 | __init__(), invoke(), stream(), batch(), __or__() |
实现链式组合(A \ | B \ | C),将多个 Runnable 串联执行 |
Runnable 类(抽象基类)
| 方法 | 参数 | 返回值 | 功能描述 | |
|---|---|---|---|---|
invoke(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
Any |
抽象方法,子类必须实现,用于同步调用 | |
stream(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
生成器 | 流式调用,默认实现先调用 invoke,然后对结果进行流式分发 | |
batch(inputs, **kwargs) |
inputs: 输入值列表**kwargs: 额外参数 |
list |
批量调用,默认实现为遍历输入逐个调用 invoke | |
__or__(other) |
other: 另一个 Runnable |
RunnableSequence |
管道操作符,用于链式拼接(A \ | B) |
RunnablePassthrough 类
| 方法 | 参数 | 返回值 | 功能描述 |
|---|---|---|---|
invoke(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
Any |
原样返回输入,不做任何处理 |
stream(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
生成器 | 流式调用,复用基类逻辑,对单值直接 yield |
batch(inputs, **kwargs) |
inputs: 输入值列表**kwargs: 额外参数 |
list |
批量调用,原样返回输入列表 |
__repr__() |
无 | str |
返回字符串表示:"RunnablePassthrough()" |
RunnableLambda 类
| 方法 | 参数 | 返回值 | 功能描述 |
|---|---|---|---|
__init__(func, name) |
func: 要包装的函数name: 可选名称 |
无 | 初始化 RunnableLambda,检查函数可调用性,设置名称 |
invoke(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
Any |
调用包装的函数,传递输入和额外参数 |
stream(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
生成器 | 流式调用,复用基类逻辑 |
batch(inputs, **kwargs) |
inputs: 输入值列表**kwargs: 额外参数 |
list |
批量调用,对每个输入调用 invoke |
__repr__() |
无 | str |
返回字符串表示:"RunnableLambda(func=函数名)" |
RunnableSequence 类
| 方法 | 参数 | 返回值 | 功能描述 |
|---|---|---|---|
__init__(runnables) |
runnables: Runnable 列表 |
无 | 初始化序列,验证所有元素都是 Runnable 实例 |
invoke(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
Any |
依次执行链条中的每个 Runnable,上一步输出作为下一步输入 |
stream(input, **kwargs) |
input: 输入值**kwargs: 额外参数 |
生成器 | 流式调用,对最终结果进行流式分发 |
batch(inputs, **kwargs) |
inputs: 输入值列表**kwargs: 额外参数 |
list |
批量调用,对每个输入执行完整链条 |
__or__(other) |
other: 另一个 Runnable |
RunnableSequence |
管道操作符,将新的 Runnable 添加到链条末尾 |
24.3.2 类图 #
类关系说明
继承关系:
RunnablePassthrough继承自RunnableRunnableLambda继承自RunnableRunnableSequence继承自Runnable
组合关系:
RunnableLambda内部包含func函数对象RunnableSequence内部包含runnables列表,存储多个 Runnable 实例
依赖关系:
RunnableSequence通过__or__()方法组合多个 RunnableRunnableLambda包装普通函数(如process_text)
24.3.3 时序图 #
24.3.3.1 基本调用流程 #
24.3.3.2 批量调用流程 #
24.3.3.3 流式调用流程 #
24.3.3.4 组合使用流程 #
24.3.4 调用过程 #
24.3.4.1 代码执行流程分析 #
阶段一:函数定义和包装(第 7-12 行)
def process_text(text):
return text.upper()
process_runnable = RunnableLambda(process_text)执行过程:
函数定义:
- 定义
process_text函数,将文本转换为大写
- 定义
包装为 Runnable:
RunnableLambda(process_text) ↓ 检查: callable(process_text) → True ↓ 保存: self.func = process_text ↓ 设置名称: self.name = "process_text" (从 __name__ 获取) ↓ 创建 RunnableLambda 实例
阶段二:RunnablePassthrough 创建(第 16 行)
pass_through = RunnablePassthrough()执行过程:
RunnablePassthrough()
↓
调用父类 Runnable.__init__() (如果有)
↓
创建 RunnablePassthrough 实例阶段三:同步调用测试(第 20-22 行)
result = pass_through.invoke("测试")详细步骤:
invoke("测试")
↓
直接返回 input
↓
返回: "测试"阶段四:批量调用测试(第 25-28 行)
batch_inputs = ["a", "b", "c"]
batch_results = pass_through.batch(batch_inputs)详细步骤:
batch(["a", "b", "c"])
↓
直接返回 list(inputs)
↓
返回: ["a", "b", "c"]阶段五:流式调用测试(第 32-33 行)
for chunk in pass_through.stream("stream-test"):
print(f" 收到: {chunk}")详细步骤:
stream("stream-test")
↓
调用 super().stream("stream-test")
↓
步骤 1: result = invoke("stream-test") → "stream-test"
↓
步骤 2: 检查 result 是否可迭代
- isinstance("stream-test", (str, bytes, dict)) → True
- 字符串不视为流式可迭代
↓
步骤 3: yield result → yield "stream-test"
↓
生成器返回: "stream-test"阶段六:组合使用(第 37-40 行)
chain = RunnablePassthrough() | process_runnable
result = chain.invoke("hello")详细步骤:
管道操作符:
RunnablePassthrough() | process_runnable ↓ pass_through.__or__(process_runnable) ↓ 检查: isinstance(process_runnable, Runnable) → True ↓ 创建: RunnableSequence([pass_through, process_runnable]) ↓ 验证: 所有元素都是 Runnable 实例 ↓ 返回: RunnableSequence 实例链式调用:
chain.invoke("hello") ↓ 步骤 1: value = "hello" ↓ 步骤 2: 遍历 runnables[0] (RunnablePassthrough) value = runnable.invoke("hello") → 返回 "hello" value = "hello" ↓ 步骤 3: 遍历 runnables[1] (RunnableLambda) value = runnable.invoke("hello") → 调用 process_text("hello") → 返回 "HELLO" value = "HELLO" ↓ 步骤 4: 返回 value → "HELLO"
24.3.4.2 数据流转过程 #
基本调用数据流
输入值
↓
RunnablePassthrough.invoke()
↓
直接返回(不做任何处理)
↓
输出值(与输入相同)批量调用数据流
输入列表 ["a", "b", "c"]
↓
RunnablePassthrough.batch()
↓
直接返回 list(inputs)
↓
输出列表 ["a", "b", "c"]流式调用数据流
输入值 "stream-test"
↓
RunnablePassthrough.stream()
↓
调用基类 stream()
↓
invoke() → "stream-test"
↓
检查可迭代性(字符串不视为可迭代)
↓
yield "stream-test"
↓
生成器输出组合调用数据流
输入值 "hello"
↓
RunnableSequence.invoke()
↓
步骤 1: RunnablePassthrough.invoke("hello")
→ 返回 "hello"
↓
步骤 2: RunnableLambda.invoke("hello")
→ process_text("hello")
→ "hello".upper()
→ 返回 "HELLO"
↓
最终输出 "HELLO"4.3 关键设计模式 #
模板方法模式:
Runnable定义统一的接口规范- 子类实现具体的
invoke()方法 stream()和batch()提供默认实现
适配器模式:
RunnableLambda将普通函数适配为 Runnable 接口- 使普通函数可以在链式调用中使用
组合模式:
RunnableSequence组合多个 Runnable 对象- 支持链式执行
装饰器模式:
RunnablePassthrough作为装饰器,原样传递输入- 可用于调试或保留原始输入
管道模式:
- 通过
__or__()操作符实现管道拼接 - 支持
A | B | C的链式语法
- 通过
24.3.4.4 RunnablePassthrough 的使用场景 #
调试:
- 在链中插入
RunnablePassthrough()查看中间结果 - 不影响数据流,只是原样传递
- 在链中插入
保留原始输入:
- 当需要同时保留原始输入和处理后的输出时使用
- 可以与其他 Runnable 组合实现复杂逻辑
占位符:
- 在链式调用中作为占位符
- 后续可以替换为实际的 Runnable
测试:
- 用于测试链式调用的结构
- 验证管道操作符是否正常工作
24.3.4.5 管道操作符的工作原理 #
A | B | C执行过程:
步骤 1: A.__or__(B)
→ 创建 RunnableSequence([A, B])
步骤 2: RunnableSequence([A, B]).__or__(C)
→ 创建 RunnableSequence([A, B, C])
步骤 3: 调用 invoke(input)
→ value = A.invoke(input)
→ value = B.invoke(value)
→ value = C.invoke(value)
→ 返回最终 value24.3.4.6 流式调用的处理逻辑 #
基类 Runnable.stream() 的默认实现:
def stream(self, input, **kwargs):
result = self.invoke(input, **kwargs)
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result处理逻辑:
- 先调用
invoke()获取结果 - 检查结果是否可迭代
- 排除字符串、字节和字典(这些不视为流式可迭代)
- 如果是可迭代对象,逐项 yield
- 否则直接 yield 单值
对于 RunnablePassthrough:
invoke()返回输入值(可能是字符串)- 字符串不视为可迭代,直接 yield 单值
25.RunnableParallel #
RunnableParallel是一种可同时并行执行多个 Runnable 子组件的高级组合单元。
RunnableParallel 的核心作用是:给定一个输入,自动将其交给你指定的多组 Runnable 分别处理,然后把每个结果用字典形式一次性返回。这样可以让你轻松管理多路并行分支,避免重复调用或夹杂多余循环,提高链式和流水线组合的表达力。
主要特性:
- 可自定义任意数量的分支(通过关键字参数方式传入,每个键就是输出字典的键名)
- 每个分支可以是不同行为的
Runnable - 支持三种调用模式:
invoke(单项目)、batch(批量)、stream(流式),调用接口与普通Runnable无缝一致 - 自动校验参数与类型安全
25.1. 25.RunnableParallel.py #
25.RunnableParallel.py
#from langchain_core.runnables import RunnableLambda,RunnableParallel
# 导入RunnableLambda类
from smartchain.runnables import RunnableLambda,RunnableParallel
# 定义多个处理函数
def add_one(x):
"""加1"""
return x + 1
def multiply_two(x):
"""乘以2"""
return x * 2
def square(x):
"""平方"""
return x ** 2
# 将函数包装成 Runnable
add_one_runnable = RunnableLambda(add_one)
multiply_two_runnable = RunnableLambda(multiply_two)
square_runnable = RunnableLambda(square)
# 创建 RunnableParallel
# RunnableParallel 可以同时执行多个 Runnable
# 每个 Runnable 的结果会作为字典的一个键值对
parallel = RunnableParallel(
added=add_one_runnable, # 结果存储在 "added" 键下
multiplied=multiply_two_runnable, # 结果存储在 "multiplied" 键下
squared=square_runnable # 结果存储在 "squared" 键下
)
# 使用 invoke 方法调用
# 输入:数字
# 输出:字典,包含所有处理结果
input_value = 5
result = parallel.invoke(input_value)
# 打印结果
print(f"输入: {input_value}")
print(f"输出: {result}")
# 输出:{'added': 6, 'multiplied': 10, 'squared': 25}
# 批量调用:对多个输入同时并行计算,返回结果列表
batch_inputs = [1, 2, 3]
batch_results = parallel.batch(batch_inputs)
print(f"批量输入: {batch_inputs}")
print(f"批量输出: {batch_results}")
# 预期:
# [
# {'added': 2, 'multiplied': 2, 'squared': 1},
# {'added': 3, 'multiplied': 4, 'squared': 4},
# {'added': 4, 'multiplied': 6, 'squared': 9},
# ]
# 流式调用:对单个输入流式获取并行结果
print("流式输出:")
for chunk in parallel.stream(4):
print(f" 收到: {chunk}")
# 预期单次产出:
# {'added': 5, 'multiplied': 8, 'squared': 16}25.2. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
# 导入 inspect 模块,用于获取函数签名和参数类型
import inspect
from typing import Any
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 管道操作符,便于链式拼接
def __or__(self, other):
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用 Runnable
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name: str | None = None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke,同步调用底层函数
def invoke(self, input, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
return self.func(input, **kwargs)
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用包装的函数
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self) -> str:
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
# 定义 RunnableParallel,继承自 Runnable
+class RunnableParallel(Runnable):
+ """
+ 并行执行多个 Runnable,返回字典结果。
+ 使用示例:
+ parallel = RunnableParallel(a=r1, b=r2)
+ result = parallel.invoke(input) # {"a": ..., "b": ...}
+ """
# 构造方法,接收若干个可运行对象作为关键字参数
+ def __init__(self, **runnables: Runnable):
# 如果未传递任何 runnable,则报错
+ if not runnables:
+ raise ValueError("至少需要一个 runnable")
# 检查每个传入的值是否为 Runnable 实例
+ for name, r in runnables.items():
+ if not isinstance(r, Runnable):
+ raise TypeError(f"键 {name} 的值必须是 Runnable 实例")
# 保存所有传入的 runnable 到实例属性
+ self.runnables = runnables
# 同步调用,将相同输入传递给所有子 runnable,并收集结果为字典
+ def invoke(self, input, **kwargs):
+ """
+ 同一输入传给所有子 runnable,收集结果为字典。
+ """
# 遍历每个 runnable,调用其 invoke,结果收集为 {name: 返回值}
+ return {name: r.invoke(input, **kwargs) for name, r in self.runnables.items()}
# 批量调用,对输入列表每一项都运行 invoke,返回结果字典的列表
+ def batch(self, inputs: list, **kwargs) -> list:
+ """
+ 对输入列表逐项并行处理,返回字典列表。
+ """
# 对每个输入元素调用 invoke,收集所有结果
+ return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,直接调用父类的流式实现
+ def stream(self, input, **kwargs):
+ """
+ 对单次输入执行并返回一个字典,流式单次产出。
+ """
# 复用基类的 stream 方法
+ yield from super().stream(input, **kwargs)
# 返回对象的字符串表示(列出包含的所有子 runnable 的键名)
+ def __repr__(self) -> str:
# 拼接所有 runnable 的键名
+ keys = ", ".join(self.runnables.keys())
# 返回格式化字符串
+ return f"RunnableParallel({keys})"
class RunnablePassthrough(Runnable):
"""
直通型 Runnable:原样返回输入,不做任何处理。
可用于调试或需要保留原始输入的场景。
"""
def invoke(self, input, **kwargs):
return input
def batch(self, inputs: list, **kwargs) -> list:
return list[Any](inputs)
def stream(self, input, **kwargs):
# 复用基类流式封装(对单值直接 yield)
yield from super().stream(input, **kwargs)
def __repr__(self) -> str:
return "RunnablePassthrough()"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
class RunnableSequence(Runnable):
"""
Runnable 组合序列,用于支持 A | B | C 的链式拼接。
"""
# 初始化方法,接收一个 Runnable 对象的列表
def __init__(self, runnables: list[Runnable]):
# 检查传入的 runnables 列表不能为空
if not runnables:
raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
for r in runnables:
if not isinstance(r, Runnable):
raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
def invoke(self, input, **kwargs):
"""
逐个执行链条:上一步输出作为下一步输入。
"""
# 初始 value 为输入 input
value = input
# 依次调用每个 runnable 的 invoke,并传递最新的 value
for runnable in self.runnables:
value = runnable.invoke(value, **kwargs)
# 返回最后一步的输出值
return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
def batch(self, inputs: list, **kwargs) -> list:
"""
对输入列表逐项执行同一条链。
"""
# 逐项调用 invoke,收集所有输出
return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
def stream(self, input, **kwargs):
"""
流式执行:沿用基类逻辑,对最终结果做流式分发。
"""
# 使用基类 stream
yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
def __repr__(self) -> str:
# 获取每个 runnable 的名字,用"|"拼接成描述
names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
return f"RunnableSequence({names})"
25.3. 类 #
25.3.1 类说明 #
| 类名 | 职责 | 主要方法/属性 | 在文件中的用途 |
|---|---|---|---|
| Runnable | 抽象基类,定义可运行单元的统一接口 | invoke() 抽象方法batch() 批量调用stream() 流式调用__or__() 管道操作符 |
所有 runnable 的基类 |
| RunnableLambda | 将普通 Python 函数包装成 Runnable 对象 | func 属性(被包装的函数)name 属性(名称)invoke() 调用底层函数batch() 批量处理stream() 流式处理 |
将 add_one、multiply_two、square 三个函数包装成 runnable |
| RunnableParallel | 并行执行多个 Runnable,将结果收集为字典 | runnables 属性(字典,键为名称,值为 Runnable)invoke() 并行调用所有子 runnablebatch() 批量并行处理stream() 流式输出 |
同时执行三个 runnable,结果合并为字典 |
25.3.2 类图 #

25.3.3 时序图 #
25.3.3.1 单次调用(invoke) #
parallel.invoke(5) 的执行流程:
25.3.3.2 批量调用(batch) #
parallel.batch([1, 2, 3]) 的执行流程:
25.3.3.3 流式调用(stream) #
parallel.stream(4) 的执行流程:
25.3.4 调用过程 #
1. 对象创建阶段
# 步骤1:将函数包装成 RunnableLambda
add_one_runnable = RunnableLambda(add_one) # 包装 add_one 函数
multiply_two_runnable = RunnableLambda(multiply_two) # 包装 multiply_two 函数
square_runnable = RunnableLambda(square) # 包装 square 函数
# 步骤2:创建 RunnableParallel,传入多个 runnable 作为关键字参数
parallel = RunnableParallel(
added=add_one_runnable, # 键名 "added" 对应 add_one_runnable
multiplied=multiply_two_runnable, # 键名 "multiplied" 对应 multiply_two_runnable
squared=square_runnable # 键名 "squared" 对应 square_runnable
)创建时:
RunnableParallel.__init__()校验所有值都是Runnable实例- 将
runnables保存为字典:{"added": add_one_runnable, "multiplied": multiply_two_runnable, "squared": square_runnable}
2. invoke 调用过程
result = parallel.invoke(5)执行流程:
RunnableParallel.invoke(5)接收输入5- 遍历
self.runnables,对每个子 runnable 调用invoke(5):add_one_runnable.invoke(5)→add_one(5)→6multiply_two_runnable.invoke(5)→multiply_two(5)→10square_runnable.invoke(5)→square(5)→25
- 将结果收集为字典:
{"added": 6, "multiplied": 10, "squared": 25} - 返回字典
注意:当前实现是顺序执行,非真正并行。如需并发,可使用线程池或异步。
3. batch 调用过程
batch_results = parallel.batch([1, 2, 3])执行流程:
RunnableParallel.batch([1, 2, 3])接收输入列表- 对每个输入调用
invoke:invoke(1)→{'added': 2, 'multiplied': 2, 'squared': 1}invoke(2)→{'added': 3, 'multiplied': 4, 'squared': 4}invoke(3)→{'added': 4, 'multiplied': 6, 'squared': 9}
- 返回结果列表:
[{...}, {...}, {...}]
4. stream 调用过程
for chunk in parallel.stream(4):
print(f" 收到: {chunk}")执行流程:
RunnableParallel.stream(4)调用基类stream- 基类
stream调用invoke(4)得到字典结果 - 基类判断结果为字典(非可迭代流),直接
yield整个字典 - 用户循环收到一个 chunk:
{'added': 5, 'multiplied': 8, 'squared': 16}
核心设计要点
- 统一接口:所有 runnable 都实现
invoke、batch、stream - 组合模式:
RunnableParallel包含多个子 runnable,统一管理 - 字典聚合:并行结果通过键名组织,便于访问
- 流式兼容:复用基类流式逻辑,字典作为单值输出
26.RunnableBranch #
RunnableBranch是一种支持“条件分支执行”的可运行组件。
核心思想
RunnableBranch 用于根据输入数据的不同,选择一条分支执行对应的 runnable,而不是像 RunnableParallel 那样同时执行多个 runnable;也不是像 RunnableSequence 依次串行通过多个组件,而是“多选一”。这非常适合流程中需要基于条件做判断时使用。
类图对比
- RunnableParallel:一个输入,所有子 runnable 并行执行,返回字典(所有结果)。
- RunnableBranch:一个输入,只根据条件匹配走一个子 runnable,返回单一结果。
- RunnableSequence:一个输入,依次通过链条上的每个 runnable,返回最终结果。
使用场景
- 消息/数据根据类型(如整数/字符串/其它等)走不同处理分支
- 分流数据、实现业务流程的 if...elif...else 逻辑
- 对不同类别的数据采用不同模型或功能进行处理
结构与用法
from smartchain.runnables import RunnableBranch, RunnableLambda
branch = RunnableBranch(
(lambda x: x > 0, RunnableLambda(lambda x: f"{x} 是正数")),
(lambda x: x < 0, RunnableLambda(lambda x: f"{x} 是负数")),
RunnableLambda(lambda x: "0 是零") # 默认分支(未命中前面条件时使用)
)- 每个分支由
(条件函数, runnable)组成,条件逐个判断,匹配则执行对应 runnable。 - 最后一个参数是默认分支(必需),当所有条件都不满足时走它。
- 条件函数必须可调用,runnable 必须是 Runnable 实例(如 RunnableLambda)。
工作流程
以 branch.invoke(-5) 为例:
- 依次测试所有条件分支的条件函数
lambda x: x > 0→ Falselambda x: x < 0→ True
- 匹配第二个分支,调用对应 runnable 的
invoke - 如果所有条件都不满足,走默认分支
接口一致性
RunnableBranch 同样支持
invoke(input):判断条件并执行一个分支,返回结果batch(inputs):对列表批量执行分支路由,返回所有结果stream(input):对单个输入流式输出结果(本例大多只返回一个 chunk)
对比 if-else
等价 Python 代码如下:
def classic_ifelse(x):
if x > 0:
return f"{x} 是正数"
elif x < 0:
return f"{x} 是负数"
else:
return "0 是零"RunnableBranch 实现了函数式、可组合、可作为更大管道节点的“分支调度器”。
实践技巧
- 条件分支的顺序从上到下判定,第一条匹配就会选中,不再继续,避免书写有重叠的条件
- 默认分支为兜底,必须是一个 Runnable 实例
- 分支的 runnable 可以是任意复杂/可组合的子流程,而不仅仅是简单输出
26.1. 26.RunnableBranch.py #
26.RunnableBranch.py
# 导入 RunnableLambda 和 RunnableBranch(条件分支调度器)类
#from langchain_core.runnables import RunnableLambda, RunnableBranch
from smartchain.runnables import RunnableLambda, RunnableBranch
# 定义处理正数的函数,输入为正数时返回描述字符串
def handle_positive(x): # 正数
return f"{x} 是正数"
# 定义处理负数的函数,输入为负数时返回描述字符串
def handle_negative(x): # 负数
return f"{x} 是负数"
# 定义处理零的函数,输入为零时返回描述字符串
def handle_zero(x): # 零
return "0 是零"
# 将处理正数的函数封装为 RunnableLambda
pos = RunnableLambda(handle_positive)
# 将处理负数的函数封装为 RunnableLambda
neg = RunnableLambda(handle_negative)
# 将处理零的函数封装为 RunnableLambda
zero = RunnableLambda(handle_zero)
# 创建条件分支:先判断是否为正数,再判断是否为负数,最后默认处理零(顺序很重要)
# RunnableBranch 的最后一个参数是默认分支(未命中前面所有条件时使用)
branch = RunnableBranch(
(lambda v: v > 0, pos), # 条件1:大于0,使用pos处理
(lambda v: v < 0, neg), # 条件2:小于0,使用neg处理
zero, # 默认分支:否则使用zero
)
# 演示单条输入的 invoke 用法
for value in [3, -2, 0]:
# 对每个输入调用分支,并输出结果
print(f"输入 {value} -> {branch.invoke(value)}")
# 演示批量 batch 用法
values = [5, 0, -1]
# 对整个输入列表批量分支处理,并打印输出
print("batch:", branch.batch(values))
# 演示流式 stream 用法(本例每次只产出一个结果)
print("stream 结果:")
for chunk in branch.stream(-7):
# 逐个输出流式返回的结果
print(" ", chunk)26.2. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
# 导入 inspect 模块,用于获取函数签名和参数类型
import inspect
+from typing import Any, Callable
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 管道操作符,便于链式拼接
def __or__(self, other):
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用 Runnable
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name: str | None = None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke,同步调用底层函数
def invoke(self, input, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
return self.func(input, **kwargs)
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
def batch(self, inputs: list, **kwargs) -> list:
"""
批量调用包装的函数
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self) -> str:
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
# 定义 RunnableParallel,继承自 Runnable
class RunnableParallel(Runnable):
"""
并行执行多个 Runnable,返回字典结果。
使用示例:
parallel = RunnableParallel(a=r1, b=r2)
result = parallel.invoke(input) # {"a": ..., "b": ...}
"""
# 构造方法,接收若干个可运行对象作为关键字参数
def __init__(self, **runnables: Runnable):
# 如果未传递任何 runnable,则报错
if not runnables:
raise ValueError("至少需要一个 runnable")
# 检查每个传入的值是否为 Runnable 实例
for name, r in runnables.items():
if not isinstance(r, Runnable):
raise TypeError(f"键 {name} 的值必须是 Runnable 实例")
# 保存所有传入的 runnable 到实例属性
self.runnables = runnables
# 同步调用,将相同输入传递给所有子 runnable,并收集结果为字典
def invoke(self, input, **kwargs):
"""
同一输入传给所有子 runnable,收集结果为字典。
"""
# 遍历每个 runnable,调用其 invoke,结果收集为 {name: 返回值}
return {name: r.invoke(input, **kwargs) for name, r in self.runnables.items()}
# 批量调用,对输入列表每一项都运行 invoke,返回结果字典的列表
def batch(self, inputs: list, **kwargs) -> list:
"""
对输入列表逐项并行处理,返回字典列表。
"""
# 对每个输入元素调用 invoke,收集所有结果
return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,直接调用父类的流式实现
def stream(self, input, **kwargs):
"""
对单次输入执行并返回一个字典,流式单次产出。
"""
# 复用基类的 stream 方法
yield from super().stream(input, **kwargs)
# 返回对象的字符串表示(列出包含的所有子 runnable 的键名)
def __repr__(self) -> str:
# 拼接所有 runnable 的键名
keys = ", ".join(self.runnables.keys())
# 返回格式化字符串
return f"RunnableParallel({keys})"
# 定义RunnableBranch类,继承自Runnable,用于条件分支执行不同runnable
+class RunnableBranch(Runnable):
+ """
+ 条件分支执行:按顺序检查条件,匹配则运行对应 runnable,若都不匹配则走默认分支。
+ """
# 构造方法,接受若干分支参数
+ def __init__(self, *branches: Any):
+ """
+ 支持“默认分支作为最后一个位置参数”的用法:
+ RunnableBranch((cond1, r1), (cond2, r2), default_runnable)
+ """
# 分支数量必须至少2(至少一个条件+一个默认)
+ if len(branches) < 2:
+ raise ValueError("至少需要一个条件分支和一个默认分支")
# 将分支参数转为列表
+ branches_list = list(branches)
# 最后一个参数视为默认分支
+ default = branches_list.pop() # 最后一个位置参数为默认分支
# 校验每个分支
+ validated_branches = []
+ for item in branches_list:
# 每个分支需为二元组或二元列表
+ if not (isinstance(item, (tuple, list)) and len(item) == 2):
+ raise TypeError("分支必须是 (condition, runnable) 形式的二元组")
# 解包条件函数和runnable
+ cond, runnable = item
# 条件必须为可调用对象
+ if not callable(cond):
+ raise TypeError("分支条件必须是可调用对象")
# runnable必须是Runnable实例
+ if not isinstance(runnable, Runnable):
+ raise TypeError("分支 runnable 必须是 Runnable 实例")
# 校验通过则加入分支列表
+ validated_branches.append((cond, runnable))
# 校验默认分支必须为Runnable实例
+ if not isinstance(default, Runnable):
+ raise TypeError("默认分支必须是 Runnable 实例")
# 保存所有条件分支
+ self.branches = validated_branches
# 保存默认分支
+ self.default = default
# 单个输入同步调用方法
+ def invoke(self, input, **kwargs):
+ """
+ 按顺序匹配条件,命中即执行对应 runnable;否则走默认分支。
+ """
# 遍历所有分支,遇到条件命中则执行对应runnable
+ for cond, runnable in self.branches:
+ if cond(input):
+ return runnable.invoke(input, **kwargs)
# 如果有默认分支则执行默认runnable
+ if self.default is not None:
+ return self.default.invoke(input, **kwargs)
# 无匹配分支时报错
+ raise ValueError("未匹配到任何分支,且未提供默认分支")
# 批量调用,遍历输入批量执行invoke
+ def batch(self, inputs: list, **kwargs) -> list:
# 对输入列表逐一执行invoke
+ return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,直接调用父类的stream方法
+ def stream(self, input, **kwargs):
# 复用父类的流式实现
+ yield from super().stream(input, **kwargs)
# 返回对象简洁字符串表示
+ def __repr__(self) -> str:
# 拼接分支编号
+ parts = [f"branch{idx}" for idx, _ in enumerate(self.branches)]
# 若有默认分支则拼接default字符串
+ if self.default:
+ parts.append("default")
# 格式化输出
+ return f"RunnableBranch({', '.join(parts)})"
class RunnablePassthrough(Runnable):
"""
直通型 Runnable:原样返回输入,不做任何处理。
可用于调试或需要保留原始输入的场景。
"""
def invoke(self, input, **kwargs):
return input
def batch(self, inputs: list, **kwargs) -> list:
return list[Any](inputs)
def stream(self, input, **kwargs):
# 复用基类流式封装(对单值直接 yield)
yield from super().stream(input, **kwargs)
def __repr__(self) -> str:
return "RunnablePassthrough()"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
class RunnableSequence(Runnable):
"""
Runnable 组合序列,用于支持 A | B | C 的链式拼接。
"""
# 初始化方法,接收一个 Runnable 对象的列表
def __init__(self, runnables: list[Runnable]):
# 检查传入的 runnables 列表不能为空
if not runnables:
raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
for r in runnables:
if not isinstance(r, Runnable):
raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
def invoke(self, input, **kwargs):
"""
逐个执行链条:上一步输出作为下一步输入。
"""
# 初始 value 为输入 input
value = input
# 依次调用每个 runnable 的 invoke,并传递最新的 value
for runnable in self.runnables:
value = runnable.invoke(value, **kwargs)
# 返回最后一步的输出值
return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
def batch(self, inputs: list, **kwargs) -> list:
"""
对输入列表逐项执行同一条链。
"""
# 逐项调用 invoke,收集所有输出
return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
def stream(self, input, **kwargs):
"""
流式执行:沿用基类逻辑,对最终结果做流式分发。
"""
# 使用基类 stream
yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
def __repr__(self) -> str:
# 获取每个 runnable 的名字,用"|"拼接成描述
names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
return f"RunnableSequence({names})"
26.3 类 #
26.3.1 关键类表 #
| 类名 | 作用/特点 | 关键方法与说明 |
|---|---|---|
Runnable |
抽象基类,定义统一接口 | - invoke:抽象方法- batch:默认逐项调用 invoke- stream:默认将 invoke 结果按可迭代流式产出- __or__:管道拼接 |
RunnableLambda |
可将任意可调用对象包装为 runnable,具名、便于调试 | - invoke:直接调用底层函数- batch/stream:复用基类逻辑 |
RunnableBranch |
条件分支调度器:按顺序匹配 (condition, runnable),未命中时执行默认分支 |
- 构造要求:RunnableBranch((cond1, r1), (cond2, r2), default_runnable)(默认分支为最后一个参数)- invoke:顺序匹配- batch:逐项调用- stream:复用基类流式封装 |
26.3.2 类图 #

26.3.3 时序图 #
26.3.3.1 单次调用 invoke #
输入 3/-2/0,经分支选择:
26.3.3.2 批量调用 batch #
26.3.3.3 流式调用 stream #
(本例每次只产出单值)
26.3.4 调用过程 #
- 先用
RunnableLambda包装三个处理函数:pos、neg、zero。 - 构造分支:
RunnableBranch((v>0, pos), (v<0, neg), zero),默认分支放最后。 invoke(x):按顺序匹配条件,命中即调用对应 runnable,未命中则调用默认分支。batch(list_x):对列表逐项执行同样的分支逻辑,返回结果列表。stream(x):复用基类流式逻辑,对本例单值结果直接yield一次。
这样即可在示例中看到正/负/零的分支分发、批量处理与流式输出。
27.with_retry #
使用 with_retry 给 Runnable 增加自动重试能力。
有时候函数会因为偶发错误(如网络请求失败)抛出异常,但多试几次就能成功。通过 .with_retry(...),可以很方便地给任意 Runnable 加入“自动重试”逻辑:
retry_if_exception_type=(...)设定哪些异常类型会触发重试(比如网络错误等)。stop_after_attempt=N限定最大重试次数,避免无限重试。wait_exponential_jitter=True可选:开启指数等待和抖动,避免瞬时请求风暴。若关闭,则每次重试之间使用固定延迟(由exponential_jitter_params['initial']控制,默认 0.1 秒)。
当调用 .invoke(...) 或 .batch(...) 时,若遇到指定类型的异常,会自动等待片刻后重新执行,直到成功或达到最大尝试次数。
如果所有重试都失败,最终会抛出最后一次遇到的异常。
这一机制大幅简化了对不稳定任务(如调用外部 API、IO 操作等)的鲁棒调用编写流程,让高阶逻辑更专注于核心业务。
27.1. 27.with_retry.py #
27.with_retry.py
# 导入 RunnableLambda 类
from smartchain.runnables import RunnableLambda
# 定义一个用于计数调用次数的全局变量
call_count = 0
# 定义一个可能失败的函数(模拟不稳定操作,比如网络请求)
def unstable_func(x):
"""模拟不稳定的函数,前两次调用会失败"""
global call_count
# 每次调用自增计数器
call_count += 1
# 前两次调用抛出异常,模拟失败
if call_count < 3:
raise ValueError(f"第 {call_count} 次调用失败")
# 第三次及以后调用成功返回
return f"成功: {x}"
# 使用 RunnableLambda 将上面的函数封装为可运行对象
runnable = RunnableLambda(unstable_func)
# 给 runnable 增加重试功能,参数如下:
# - retry_if_exception_type: 只对 ValueError 异常进行重试
# - stop_after_attempt: 最多尝试3次
# - wait_exponential_jitter: 禁用指数退避与抖动
retry_runnable = runnable.with_retry(
retry_if_exception_type=(ValueError,),
stop_after_attempt=3,
wait_exponential_jitter=False,
)
# 调用带重试功能的 runnable(会自动重试直到成功)
print("调用带重试的 runnable:")
result = retry_runnable.invoke("测试")
# 输出最终返回的结果
print(f"结果: {result}")
# 输出总调用次数(用于验证重试确实执行了多次)
print(f"总调用次数: {call_count}")
# 重置计数器为0,准备演示批量调用场景
call_count = 0
print("\n批量调用(每个输入独立重试):")
# 使用带重试功能的 runnable 进行 batch 调用(每个输入分别重试)
results = retry_runnable.batch(["A", "B"])
# 输出批量结果
print(f"结果: {results}")
# 输出所有调用的总次数
print(f"总调用次数: {call_count}")27.2. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
+import time
+import random
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 管道操作符,便于链式拼接
def __or__(self, other):
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
+ def batch(self, inputs, **kwargs):
"""
批量调用 Runnable
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 添加重试功能,返回包装了重试逻辑的 Runnable
# 定义 with_retry 方法,为当前 Runnable 添加重试机制
+ def with_retry(
+ self,
+ *,
+ retry_if_exception_type=(Exception,), # 指定需要重试的异常类型,默认所有 Exception
+ stop_after_attempt=3, # 最大尝试次数,默认3次
+ wait_exponential_jitter=True, # 是否启用指数退避抖动
+ exponential_jitter_params=None, # 抖动参数字典,支持 initial/max/exp_base/jitter
+ ):
+ """
+ 创建带重试功能的 Runnable 包装器
+ Args:
+ retry_if_exception_type: 需要重试的异常类型元组
+ stop_after_attempt: 最大尝试次数
+ wait_exponential_jitter: 是否启用指数回退抖动
+ exponential_jitter_params: 抖动参数,支持 initial/max/exp_base/jitter
+ Returns:
+ 包装了重试逻辑的 RunnableRetry 实例
+ """
# 返回带重试功能的 RunnableRetry 实例,绑定当前 runnable 和重试参数
+ return RunnableRetry(
+ bound=self,
+ retry_if_exception_type=retry_if_exception_type,
+ stop_after_attempt=stop_after_attempt,
+ wait_exponential_jitter=wait_exponential_jitter,
+ exponential_jitter_params=exponential_jitter_params,
+ )
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
+ def __init__(self, func, name=None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke,同步调用底层函数
def invoke(self, input, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
return self.func(input, **kwargs)
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
+ def batch(self, inputs, **kwargs):
"""
批量调用包装的函数
Args:
inputs: 输入值列表
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
+ def __repr__(self):
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
# 定义 RunnableParallel,继承自 Runnable
class RunnableParallel(Runnable):
"""
并行执行多个 Runnable,返回字典结果。
使用示例:
parallel = RunnableParallel(a=r1, b=r2)
result = parallel.invoke(input) # {"a": ..., "b": ...}
"""
# 构造方法,接收若干个可运行对象作为关键字参数
+ def __init__(self, **runnables):
# 如果未传递任何 runnable,则报错
if not runnables:
raise ValueError("至少需要一个 runnable")
# 检查每个传入的值是否为 Runnable 实例
for name, r in runnables.items():
if not isinstance(r, Runnable):
raise TypeError(f"键 {name} 的值必须是 Runnable 实例")
# 保存所有传入的 runnable 到实例属性
self.runnables = runnables
# 同步调用,将相同输入传递给所有子 runnable,并收集结果为字典
def invoke(self, input, **kwargs):
"""
同一输入传给所有子 runnable,收集结果为字典。
"""
# 遍历每个 runnable,调用其 invoke,结果收集为 {name: 返回值}
return {name: r.invoke(input, **kwargs) for name, r in self.runnables.items()}
# 批量调用,对输入列表每一项都运行 invoke,返回结果字典的列表
+ def batch(self, inputs, **kwargs):
"""
对输入列表逐项并行处理,返回字典列表。
"""
# 对每个输入元素调用 invoke,收集所有结果
return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,直接调用父类的流式实现
def stream(self, input, **kwargs):
"""
对单次输入执行并返回一个字典,流式单次产出。
"""
# 复用基类的 stream 方法
yield from super().stream(input, **kwargs)
# 返回对象的字符串表示(列出包含的所有子 runnable 的键名)
+ def __repr__(self):
# 拼接所有 runnable 的键名
keys = ", ".join(self.runnables.keys())
# 返回格式化字符串
return f"RunnableParallel({keys})"
# 定义RunnableBranch类,继承自Runnable,用于条件分支执行不同runnable
class RunnableBranch(Runnable):
"""
条件分支执行:按顺序检查条件,匹配则运行对应 runnable,若都不匹配则走默认分支。
"""
# 构造方法,接受若干分支参数
+ def __init__(self, *branches):
"""
支持“默认分支作为最后一个位置参数”的用法:
RunnableBranch((cond1, r1), (cond2, r2), default_runnable)
"""
# 分支数量必须至少2(至少一个条件+一个默认)
if len(branches) < 2:
raise ValueError("至少需要一个条件分支和一个默认分支")
# 将分支参数转为列表
branches_list = list(branches)
# 最后一个参数视为默认分支
default = branches_list.pop() # 最后一个位置参数为默认分支
# 校验每个分支
validated_branches = []
for item in branches_list:
# 每个分支需为二元组或二元列表
if not (isinstance(item, (tuple, list)) and len(item) == 2):
raise TypeError("分支必须是 (condition, runnable) 形式的二元组")
# 解包条件函数和runnable
cond, runnable = item
# 条件必须为可调用对象
if not callable(cond):
raise TypeError("分支条件必须是可调用对象")
# runnable必须是Runnable实例
if not isinstance(runnable, Runnable):
raise TypeError("分支 runnable 必须是 Runnable 实例")
# 校验通过则加入分支列表
validated_branches.append((cond, runnable))
# 校验默认分支必须为Runnable实例
if not isinstance(default, Runnable):
raise TypeError("默认分支必须是 Runnable 实例")
# 保存所有条件分支
self.branches = validated_branches
# 保存默认分支
self.default = default
# 单个输入同步调用方法
def invoke(self, input, **kwargs):
"""
按顺序匹配条件,命中即执行对应 runnable;否则走默认分支。
"""
# 遍历所有分支,遇到条件命中则执行对应runnable
for cond, runnable in self.branches:
if cond(input):
return runnable.invoke(input, **kwargs)
# 如果有默认分支则执行默认runnable
if self.default is not None:
return self.default.invoke(input, **kwargs)
# 无匹配分支时报错
raise ValueError("未匹配到任何分支,且未提供默认分支")
# 批量调用,遍历输入批量执行invoke
+ def batch(self, inputs, **kwargs):
# 对输入列表逐一执行invoke
return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,直接调用父类的stream方法
def stream(self, input, **kwargs):
# 复用父类的流式实现
yield from super().stream(input, **kwargs)
# 返回对象简洁字符串表示
+ def __repr__(self):
# 拼接分支编号
parts = [f"branch{idx}" for idx, _ in enumerate(self.branches)]
# 若有默认分支则拼接default字符串
if self.default:
parts.append("default")
# 格式化输出
return f"RunnableBranch({', '.join(parts)})"
class RunnablePassthrough(Runnable):
"""
直通型 Runnable:原样返回输入,不做任何处理。
可用于调试或需要保留原始输入的场景。
"""
def invoke(self, input, **kwargs):
return input
+ def batch(self, inputs, **kwargs):
+ return list(inputs)
def stream(self, input, **kwargs):
# 复用基类流式封装(对单值直接 yield)
yield from super().stream(input, **kwargs)
+ def __repr__(self):
return "RunnablePassthrough()"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
class RunnableSequence(Runnable):
"""
Runnable 组合序列,用于支持 A | B | C 的链式拼接。
"""
# 初始化方法,接收一个 Runnable 对象的列表
+ def __init__(self, runnables):
# 检查传入的 runnables 列表不能为空
if not runnables:
raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
for r in runnables:
if not isinstance(r, Runnable):
raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
def invoke(self, input, **kwargs):
"""
逐个执行链条:上一步输出作为下一步输入。
"""
# 初始 value 为输入 input
value = input
# 依次调用每个 runnable 的 invoke,并传递最新的 value
for runnable in self.runnables:
value = runnable.invoke(value, **kwargs)
# 返回最后一步的输出值
return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
+ def batch(self, inputs, **kwargs):
"""
对输入列表逐项执行同一条链。
"""
# 逐项调用 invoke,收集所有输出
return [self.invoke(item, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
def stream(self, input, **kwargs):
"""
流式执行:沿用基类逻辑,对最终结果做流式分发。
"""
# 使用基类 stream
yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
+ def __repr__(self):
# 获取每个 runnable 的名字,用"|"拼接成描述
names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
return f"RunnableSequence({names})"
# 定义 RunnableRetry 类,用于包装 Runnable 并添加重试逻辑
+class RunnableRetry(Runnable):
+ """
+ 带重试功能的 Runnable 包装器
+ 当底层 runnable 抛出指定异常时,会自动重试指定次数。
+ """
# 初始化方法,接受被包装的 runnable 以及重试参数
+ def __init__(
+ self,
+ bound,
+ retry_if_exception_type=(Exception,),
+ stop_after_attempt=3,
+ wait_exponential_jitter=True,
+ exponential_jitter_params=None,
+ ):
+ """
+ 初始化 RunnableRetry
+ Args:
+ bound: 被包装的 Runnable 对象
+ retry_if_exception_type: 需要重试的异常类型元组
+ stop_after_attempt: 最大尝试次数
+ wait_exponential_jitter: 是否启用指数回退抖动
+ exponential_jitter_params: 抖动参数 initial/max/exp_base/jitter
+ """
# 保存底层被包装的 Runnable
+ self.bound = bound
# 保存需要重试的异常类型
+ self.retry_if_exception_type = retry_if_exception_type
# 保存最大尝试次数
+ self.stop_after_attempt = stop_after_attempt
# 保存是否启用指数回退抖动
+ self.wait_exponential_jitter = wait_exponential_jitter
# 保存指数回退相关参数(若为 None 则用空字典兜底)
+ self.exponential_jitter_params = exponential_jitter_params or {}
# 实现同步调用(自动重试机制)
+ def invoke(self, input, **kwargs):
+ """
+ 调用底层 runnable,失败时自动重试
+ """
# 用于记录最后一次抛出的异常
+ last_exception = None
# 解析重试等待的各项参数
+ initial = self.exponential_jitter_params.get("initial", 0.1) # 初始延迟
+ max_wait = self.exponential_jitter_params.get("max", 10.0) # 最大延迟
+ exp_base = self.exponential_jitter_params.get("exp_base", 2.0) # 幂指数基数
+ jitter = self.exponential_jitter_params.get("jitter", 0.0) # 抖动范围
# 尝试多次调用,直到最大次数
+ for attempt in range(1, self.stop_after_attempt + 1):
+ try:
# 调用底层的 invoke 方法
+ return self.bound.invoke(input, **kwargs)
# 捕获需要重试的异常类型
+ except self.retry_if_exception_type as e:
# 保存本次捕获的异常
+ last_exception = e
# 若还没到最大次数,可以重试
+ if attempt < self.stop_after_attempt:
# 判断是否使用指数回退
+ if self.wait_exponential_jitter:
# 计算当前次的延迟
+ delay = min(max_wait, initial * (exp_base ** (attempt - 1)))
# 如果配置了 jitter,叠加一个随机抖动
+ if jitter > 0:
+ delay += random.uniform(0, jitter)
+ else:
# 不指数回退则用 initial 固定延迟
+ delay = initial
# 等待指定时间再重试
+ time.sleep(delay)
+ else:
# 达到最大次数仍然失败则抛出最后一次异常
+ raise last_exception
+ except Exception:
# 如果是完全不在重试范围的异常,直接抛出
+ raise
# 如果所有尝试都失败,最终抛出异常
+ raise last_exception
# 实现批量调用,每个输入独立重试
+ def batch(self, inputs, **kwargs):
+ """
+ 批量调用,每个输入独立重试
+ """
# 对每个输入都单独执行 invoke,收集结果为列表
+ return [self.invoke(item, **kwargs) for item in inputs]
# 实现流式调用,直接复用基类逻辑
+ def stream(self, input, **kwargs):
+ """
+ 流式调用,复用基类实现
+ """
# 使用父类的 stream,yield 结果
+ yield from super().stream(input, **kwargs)
# 返回自身字符串表示,便于调试查看 retry 配置与绑定对象
+ def __repr__(self):
+ return f"RunnableRetry(bound={self.bound}, max_attempts={self.stop_after_attempt})"27.3. 类 #
27.3.1 关键类 #
| 类名 | 职责 | 主要方法/属性 | 在文件中的用途 |
|---|---|---|---|
| Runnable | 抽象基类,定义可运行单元的统一接口 | invoke() 抽象方法batch() 批量调用stream() 流式调用with_retry() 创建重试包装器 |
所有 runnable 的基类,提供 with_retry 方法 |
| RunnableLambda | 将普通 Python 函数包装成 Runnable 对象 | func 属性(被包装的函数)name 属性(名称)invoke() 调用底层函数batch() 批量处理 |
将 unstable_func 函数包装成 runnable |
| RunnableRetry | 带重试功能的 Runnable 包装器 | bound 属性(被包装的 Runnable)retry_if_exception_type 属性(重试异常类型)stop_after_attempt 属性(最大尝试次数)invoke() 带重试逻辑的调用batch() 批量重试 |
包装 RunnableLambda,添加自动重试功能 |
27.3.2 类图 #

27.3.3 时序图 #
27.3.3.1 单次调用(invoke)带重试 #
演示 retry_runnable.invoke("测试") 的执行流程(前两次失败,第三次成功):
27.3.3.2 批量调用(batch)带重试 #
演示 retry_runnable.batch(["A", "B"]) 的执行流程:
27.3.4 调用过程 #
对象创建阶段
# 步骤1:将函数包装成 RunnableLambda
runnable = RunnableLambda(unstable_func)创建时:
RunnableLambda.__init__()接收unstable_func函数- 保存
self.func = unstable_func - 设置
self.name = "unstable_func"
# 步骤2:创建带重试功能的包装器
retry_runnable = runnable.with_retry(
retry_if_exception_type=(ValueError,),
stop_after_attempt=3,
wait_exponential_jitter=False,
)创建时:
Runnable.with_retry()被调用(所有 Runnable 都有此方法)- 创建并返回
RunnableRetry实例 RunnableRetry.__init__()保存:self.bound = runnable(被包装的 RunnableLambda)self.retry_if_exception_type = (ValueError,)self.stop_after_attempt = 3self.wait_exponential_jitter = False
2. invoke 调用过程(带重试)
result = retry_runnable.invoke("测试")执行流程:
RunnableRetry.invoke("测试")接收输入- 进入重试循环(最多 3 次):
- 第 1 次尝试:
self.bound.invoke("测试")→RunnableLambda.invoke("测试")RunnableLambda调用unstable_func("测试")call_count = 1,抛出ValueError("第 1 次调用失败")RunnableRetry捕获ValueError,判断可重试- 计算延迟(
wait_exponential_jitter=False,使用initial=0.1) time.sleep(0.1)等待
- 第 2 次尝试:
- 再次调用
self.bound.invoke("测试") call_count = 2,仍抛出ValueError("第 2 次调用失败")- 捕获异常,等待延迟
- 再次调用
- 第 3 次尝试:
- 再次调用
self.bound.invoke("测试") call_count = 3,条件不满足,返回"成功: 测试"- 返回结果,退出循环
- 再次调用
- 第 1 次尝试:
- 返回最终结果:
"成功: 测试"
3. batch 调用过程(每个输入独立重试)
results = retry_runnable.batch(["A", "B"])执行流程:
RunnableRetry.batch(["A", "B"])接收输入列表- 对每个输入调用
invoke:invoke("A"):独立重试逻辑(前两次失败,第三次成功)invoke("B"):独立重试逻辑(前两次失败,第三次成功)
- 收集所有结果:
["成功: A", "成功: B"] - 返回结果列表
注意:每个输入的重试是独立的,call_count 是全局变量,所以会累加。
核心设计要点
- 装饰器模式:
RunnableRetry包装其他Runnable,不修改原对象 - 异常捕获与重试:只重试指定异常类型,其他异常直接抛出
- 延迟策略:支持指数退避抖动或固定延迟
- 批量独立重试:
batch中每个输入独立重试,互不影响
28.config #
无论是单个组件还是链式/批量/重试等高级用法,所有 Runnable 对象都支持传递统一的配置 dict(称为 config)。通过配合 config,可以灵活实现:
- 日志与标签(tags、run_name、metadata)
- 回调机制(callbacks)
- 并发/递归层数限制(max_concurrency, recursion_limit)
- 运行身份或唯一标识(run_id)
- 传递模型配置项(configurable)
核心点总结如下:
config 字段统一传递
每个Runnable的invoke/batch/stream方法都接受可选的config参数,类型为普通 dict,各字段含义见下表。系统各处传递的是 config dict 的副本,用户可灵活增删字段。字段说明
| 字段名 | 类型 | 说明 |
|---|---|---|
| tags | list[str] | 标签,可帮助标记/分组任务 |
| metadata | dict[str, Any] | 附加自定义元数据 |
| callbacks | 回调对象或列表 | 步骤级或整体运行时的回调处理器 |
| run_name | str | 运行情况的自定义名称 |
| max_concurrency | int/None | 并发批处理上限 |
| recursion_limit | int | 单次最大递归层数 |
| configurable | dict[str, Any] | 可配置参数(如模型名、温度等) |
| run_id | uuid.UUID/None | 全局唯一运行标识符 |
配置的合并与优先级
多级 config 合并时采用「后传入优先」原则:- 绑定 Runnable 的 config 优先于全局默认
- 调用时传入的 config 覆盖绑定层的 config
- 字典整体 shallow merge
灵活传递与用法举例
- 为每一步骤打 tag 并记录 user_id:
chain = runnable1.with_config({"tags": ["step1"], "metadata": {"user": "A"}}) | runnable2.with_config({"tags": ["step2"]}) result = chain.invoke("abc", config={"run_name": "样例运行"}) - 配合回调对象收集日志:
config = { "callbacks": [MyLoggerHandler()], "metadata": {"experiment": "dev01"} } chain.invoke("hello", config=config) - 动态修改模型配置:
config = { "configurable": {"temperature": 0.7, "model": "gpt-4"} } chain.invoke("prompt", config=config)
- 为每一步骤打 tag 并记录 user_id:
推荐约定
- config dict 的 key 建议按上表采用标准名
- 不会强制要求字段齐备,剩余字段可以被下游自定义组件消费
28.1. 28.config.py #
28.config.py
# 导入 RunnableLambda(用于将函数包装为可链式调用的 runnable)
#from langchain_core.runnables import RunnableLambda
# 导入 RunnableConfig(runnable 的配置类型)
#from langchain_core.runnables.config import RunnableConfig
# 导入回调处理器基类 BaseCallbackHandler
#from langchain_core.callbacks.stdout import BaseCallbackHandler
from smartchain.callbacks import BaseCallbackHandler
from smartchain.runnables import RunnableLambda
# 导入 uuid 库,用于生成唯一标识符
import uuid
# 定义将文本转为大写的函数
def to_upper(text, config={}):
"""将文本转为大写"""
# 输出当前步骤及输入值
print(f"[步骤1 - to_upper] 输入: {text}")
# 打印 config 的 tags 参数
print(f" tags: {config.get('tags')}")
# 打印 config 的 metadata 参数
print(f" metadata: {config.get('metadata')}")
# 打印 config 的 max_concurrency 参数
print(f" max_concurrency: {config.get('max_concurrency')}")
# 打印 config 的 recursion_limit 参数
print(f" recursion_limit: {config.get('recursion_limit')}")
# 打印 config 的 callbacks 参数
print(f" callbacks: {config.get('callbacks')}")
# 打印 config 的 configurable 参数
print(f" configurable: {config.get('configurable')}")
# 将输入文本转为大写
result = text.upper()
# 输出转换后的结果
print(f"[步骤1 - to_upper] 输出: {result}")
# 返回大写后的结果
return result
# 定义第二个处理函数:为文本添加前缀
def add_prefix(text, config={}):
"""为文本添加前缀"""
# 打印 config 的 tags 参数
print(f" tags: {config.get('tags')}")
# 打印 config 的 metadata 参数
print(f" metadata: {config.get('metadata')}")
# 打印 config 的 max_concurrency 参数
print(f" max_concurrency: {config.get('max_concurrency')}")
# 打印 config 的 recursion_limit 参数
print(f" recursion_limit: {config.get('recursion_limit')}")
# 打印 config 的 callbacks 参数
print(f" callbacks: {config.get('callbacks')}")
# 打印 config 的 configurable 参数
print(f" configurable: {config.get('configurable')}")
# 添加前缀字符串
result = f"结果: {text}"
# 输出添加前缀后的结果
print(f"[步骤2 - add_prefix] 输出: {result}")
# 返回结果
return result
# 用 RunnableLambda 对两个处理函数进行包装
runnable1 = RunnableLambda(to_upper)
runnable2 = RunnableLambda(add_prefix)
# 用管道操作符 | 链接两个 runnable,形成处理链
chain = runnable1 | runnable2
# ===== 演示 1: run_name - 为运行命名(不继承) =====
# 设置运行名称
run_name = "文本处理任务"
# 构造配置字典,仅包含 run_name
config1 = {
"run_name": run_name
}
# 运行链式处理,并传递配置;输入字符串为 "hello"
result1 = chain.invoke("hello", config=config1)
# 打印处理后的最终结果
print(f"1. 最终结果: {result1}\n")
# ===== 演示 2: run_id - 唯一标识符 =====
# 创建一个唯一的 uuid 作为 run_id
run_id = uuid.uuid4()
# 配置字典包含 run_id 和 run_name
config2 = {
"run_id": run_id,
"run_name": "带ID的任务"
}
# 处理字符串 "world" 并传递带 run_id 的 config
result2 = chain.invoke("world", config=config2)
# 打印结果及 run_id
print(f"2. 最终结果: {result2}, run_id: {run_id}\n")
# ===== 演示 3: tags - 标签(会传递给子调用) =====
# 配置字典添加 tags 字段
config3 = {
"tags": ["production", "text-processing"],
"run_name": "带标签的任务"
}
# 传入 tags,测试标签的传递
result3 = chain.invoke("test", config=config3)
# 打印处理结果和标签
print(f"3. 最终结果: {result3}, tags: {config3['tags']}\n")
# ===== 演示 4: metadata - 元数据(会传递给子调用) =====
# 配置包含 metadata 信息
config4 = {
"metadata": {
"user_id": "12345",
"session_id": "abc-def-ghi",
"environment": "dev"
},
"run_name": "带元数据的任务"
}
# 传入 metadata,测试元数据的传递
result4 = chain.invoke("metadata", config=config4)
# 打印结果及元数据
print(f"4. 最终结果: {result4}, metadata: {config4['metadata']}\n")
# ===== 演示 5: max_concurrency - 最大并发数(用于 batch) =====
# 批量处理配置,设置最大并发为 2
config5 = {
"max_concurrency": 2, # 限制最多2个并发
"run_name": "批量处理任务"
}
# 待处理的输入列表
inputs = ["a", "b", "c", "d"]
# 批量处理输入,传入配置及列表
results5 = chain.batch(inputs, config=config5)
# 打印输入列表
print(f"输入: {inputs}")
# 打印 batch 结果和最大并发设置
print(f"5. 结果: {results5}, max_concurrency: {config5['max_concurrency']}\n")
# ===== 演示 6: recursion_limit - 递归限制 =====
# 配置递归限制为10
config6 = {
"recursion_limit": 10, # 设置递归限制为10
"run_name": "递归限制任务"
}
# 测试递归限制的传递
result6 = chain.invoke("recursion", config=config6)
# 打印结果和递归限制参数
print(f"6. 最终结果: {result6}, recursion_limit: {config6['recursion_limit']}\n")
# ===== 演示 7: callbacks - 回调函数(会传递给子调用) =====
# 定义自定义回调处理器,继承 BaseCallbackHandler
class MyCallbackHandler(BaseCallbackHandler):
# 链开始事件处理
def on_chain_start(self, serialized, inputs, **kwargs):
print(f"on_chain_start: {serialized}, {inputs}, {kwargs}")
# 链结束事件处理
def on_chain_end(self, outputs, **kwargs):
print(f"on_chain_end: {outputs}, {kwargs}")
# 链错误事件处理
def on_chain_error(self, error, **kwargs):
print(f"on_chain_error: {error}, {kwargs}")
# LLM 开始事件
def on_llm_start(self, serialized, prompts, **kwargs):
print(f"on_llm_start: {serialized}, {prompts}, {kwargs}")
# LLM 完成事件
def on_llm_end(self, response, **kwargs):
print(f"on_llm_end: {response}, {kwargs}")
# LLM 错误事件
def on_llm_error(self, error, **kwargs):
print(f"on_llm_error: {error}, {kwargs}")
# 实例化回调处理器
callback_handler = MyCallbackHandler()
# 配置中添加自定义回调处理器
config7 = {
"callbacks": [callback_handler],
"run_name": "带回调的任务"
}
# 传递带回调的配置测试
result7 = chain.invoke("callback", config=config7)
# 打印最终处理结果
print(f"7. 最终结果: {result7}\n")
# ===== 演示 8: configurable - 可配置属性 =====
# 配置字典带有 configurable 字段,模拟模型及参数
config8 = {
"configurable": {
"temperature": 0.7,
"model": "gpt-4"
},
"run_name": "可配置任务"
}
# 传递带 configurable 的配置
result8 = chain.invoke("config", config=config8)
# 打印结果及 configurable 参数
print(f"8. 最终结果: {result8}, configurable: {config8['configurable']}\n")
# ===== 演示 9: 组合使用多个配置项 =====
# 配置包含全部主要可配字段
config9 = {
"run_id": uuid.uuid4(),
"run_name": "组合配置示例",
"tags": ["combined"],
"metadata": {"version": "1.0"},
"max_concurrency": 3,
"recursion_limit": 20,
"configurable": {"model": "gpt-4"}
}
# 综合测试各种配置传递
result9 = chain.invoke("combined", config=config9)
# 打印最终结果
print(f"9. 最终结果: {result9}")
# 用管道操作符 | 链接两个 runnable,形成处理链
chain = runnable1.with_config({"metadata": {"user_id": "001"} }) | runnable2.with_config({"metadata": {"user_id": "002"}})
# 运行链式处理,并传递配置;输入字符串为 "hello"
result10 = chain.invoke("hello")
# 打印处理后的最终结果
print(f"10. 最终结果: {result10}\n")28.2. callbacks.py #
smartchain/callbacks.py
"""回调处理器基类"""
# 从 abc 模块导入抽象基类基类
from abc import ABC
# 定义回调处理器的基类,继承自抽象基类
class BaseCallbackHandler(ABC):
"""
回调处理器基类
用于在 Runnable 执行过程中接收各种事件通知。
子类可以重写感兴趣的方法来处理特定事件。
"""
# 链开始执行时会调用此方法
def on_chain_start(self, serialized, inputs, **kwargs):
"""
当链开始执行时调用
Args:
serialized: 序列化的链信息
inputs: 输入数据
**kwargs: 其他关键字参数
"""
# 默认实现为空,子类可重写
pass
# 链执行完成时会调用此方法
def on_chain_end(self, outputs, **kwargs):
"""
当链执行完成时调用
Args:
outputs: 输出数据
**kwargs: 其他关键字参数
"""
# 默认实现为空,子类可重写
pass
# 链执行出错时会调用此方法
def on_chain_error(self, error, **kwargs):
"""
当链执行出错时调用
Args:
error: 错误对象
**kwargs: 其他关键字参数
"""
# 默认实现为空,子类可重写
pass
# LLM 开始执行时会调用此方法
def on_llm_start(self, serialized, prompts, **kwargs):
"""
当 LLM 开始执行时调用
Args:
serialized: 序列化的 LLM 信息
prompts: 提示词列表
**kwargs: 其他关键字参数
"""
# 默认实现为空,子类可重写
pass
# LLM 执行完成时会调用此方法
def on_llm_end(self, response, **kwargs):
"""
当 LLM 执行完成时调用
Args:
response: LLM 的响应
**kwargs: 其他关键字参数
"""
# 默认实现为空,子类可重写
pass
# LLM 执行出错时会调用此方法
def on_llm_error(self, error, **kwargs):
"""
当 LLM 执行出错时调用
Args:
error: 错误对象
**kwargs: 其他关键字参数
"""
# 默认实现为空,子类可重写
pass
28.3. config.py #
smartchain/config.py
"""配置相关的类型定义和工具函数"""
# 导入 uuid 库,主要用于 run_id 的唯一标识
import uuid
# Callbacks = Any # 可以是 BaseCallbackHandler 或 Handler 列表
# RunnableConfig 是普通 dict,可包含如下可选字段:
# - tags: list[str] # 标签列表
# - metadata: dict[str, Any] # 元数据字典
# - callbacks: Callbacks # 回调对象或回调对象列表
# - run_name: str # 运行名称
# - max_concurrency: int | None # 批量时最大并发数
# - recursion_limit: int # 最大递归层数限制
# - configurable: dict[str, Any] # 可配置参数字典
# - run_id: uuid.UUID | None # 唯一运行 ID
# 默认的递归层数限制
DEFAULT_RECURSION_LIMIT = 25
# 工具函数:确保传入的 config 参数不是 None,并返回字典副本
def ensure_config(config=None):
"""
确保配置字典存在,如果为 None 则返回空字典。
Args:
config: 可选的配置字典
Returns:
配置字典(如果为 None 则返回空字典)
"""
# 如果未传入 config,则返回一个新的空字典
if config is None:
return {}
# 如果已经是 dict,则返回其副本,否则将其转为字典
return config.copy() if isinstance(config, dict) else dict(config)
def _accept_config(func):
try:
sig = inspect.signature(func)
return "config" in sig.parameters
except (ValueError, TypeError):
return False28.4. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
import time
import random
+import inspect
+import uuid as uuid_module
+from .config import ensure_config,_accept_config
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
+ def invoke(self, input, config = None, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
+ config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
+ def stream(self, input, config = None, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
+ result = self.invoke(input, config=config, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 管道操作符,便于链式拼接
def __or__(self, other):
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
+ def batch(self, inputs, config = None, **kwargs):
"""
批量调用 Runnable
Args:
inputs: 输入值列表
+ config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
+ return [self.invoke(input_item, config=config, **kwargs) for input_item in inputs]
# 添加重试功能,返回包装了重试逻辑的 Runnable
# 定义 with_retry 方法,为当前 Runnable 添加重试机制
def with_retry(
self,
*,
retry_if_exception_type=(Exception,), # 指定需要重试的异常类型,默认所有 Exception
stop_after_attempt=3, # 最大尝试次数,默认3次
wait_exponential_jitter=True, # 是否启用指数退避抖动
exponential_jitter_params=None, # 抖动参数字典,支持 initial/max/exp_base/jitter
):
"""
创建带重试功能的 Runnable 包装器
Args:
retry_if_exception_type: 需要重试的异常类型元组
stop_after_attempt: 最大尝试次数
wait_exponential_jitter: 是否启用指数回退抖动
exponential_jitter_params: 抖动参数,支持 initial/max/exp_base/jitter
Returns:
包装了重试逻辑的 RunnableRetry 实例
"""
# 返回带重试功能的 RunnableRetry 实例,绑定当前 runnable 和重试参数
return RunnableRetry(
bound=self,
retry_if_exception_type=retry_if_exception_type,
stop_after_attempt=stop_after_attempt,
wait_exponential_jitter=wait_exponential_jitter,
exponential_jitter_params=exponential_jitter_params,
)
+ def with_config(self, config=None, **kwargs):
+ """
+ 绑定配置到 Runnable,返回一个新的 Runnable
+ Args:
+ config: 要绑定的配置字典
+ **kwargs: 额外的关键字参数,会合并到 config 中
+ Returns:
+ 一个新的 RunnableBinding 实例,包含绑定的配置
+ """
# 合并 config 和 kwargs
+ merged_config = {}
+ if config:
+ merged_config.update(config)
+ if kwargs:
+ merged_config.update(kwargs)
# 返回 RunnableBinding 实例
+ return RunnableBinding(bound=self, config=merged_config)
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name=None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke 方法,对被封装的底层函数进行同步调用
+ def invoke(self, input, config = None, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
+ config: 可选的配置字典
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
# 保证 config 不为 None,如为 None 则转为空字典
+ config = ensure_config(config)
# 从配置字典中获取回调对象 callbacks
+ callbacks = config.get("callbacks")
# 初始化回调对象列表
+ callback_list = []
# 获取当前调用的唯一 ID(run_id)
+ run_id = config.get("run_id")
# 如果没有传入 run_id,则自动生成一个新的 uuid
+ if run_id is None:
+ run_id = uuid_module.uuid4()
# 如果 callbacks 不为空
+ if callbacks:
# 如果 callbacks 已经是列表,则直接用,否则转为单元素列表
+ if isinstance(callbacks, list):
+ callback_list = callbacks
+ else:
+ callback_list = [callbacks]
# 构造序列化信息,用于回调上报链条标识
+ serialized = {"name": self.name, "type": "RunnableLambda"}
# 遍历每个回调对象,触发其 on_chain_start 方法
+ for callback in callback_list:
# 只有回调对象有 on_chain_start 属性才调用
+ if hasattr(callback, "on_chain_start"):
+ try:
# 调用回调的 on_chain_start 方法,传入相关参数
+ callback.on_chain_start(
+ serialized=serialized,
+ inputs={"input": input},
+ run_id=run_id,
+ parent_run_id=None,
+ tags=config.get("tags"),
+ metadata=config.get("metadata"),
+ **kwargs
+ )
+ except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
+ pass
# 检查被包装的函数是否接收 config 参数
+ if _accept_config(self.func):
# 如果接收 config,则将 config 传递下去
+ kwargs["config"] = config
# 尝试执行被包装的核心函数
+ try:
# 正常调用被包装的函数,将 input 作为第一个参数,kwargs作为关键字参数
+ output = self.func(input, **kwargs)
+ except Exception as e:
# 若捕获到异常,则对所有回调触发 on_chain_error 并继续抛出异常
+ if callback_list:
+ for callback in callback_list:
+ if hasattr(callback, "on_chain_error"):
+ try:
+ callback.on_chain_error(
+ error=e,
+ run_id=run_id,
+ parent_run_id=None,
+ **kwargs
+ )
+ except Exception:
# 回调异常不影响主异常继续抛出
+ pass
# 重新抛出主流程中的异常
+ raise
+ else:
# 如果没有异常执行,顺序触发所有回调的 on_chain_end 方法
+ if callback_list:
+ for callback in callback_list:
+ if hasattr(callback, "on_chain_end"):
+ try:
+ callback.on_chain_end(
+ outputs={"output": output},
+ run_id=run_id,
+ parent_run_id=None,
+ **kwargs
+ )
+ except Exception:
# 回调异常不影响主逻辑输出
+ pass
# 返回包装函数的输出结果
+ return output
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
+ def batch(self, inputs, config = None, **kwargs):
"""
批量调用包装的函数
Args:
inputs: 输入值列表
+ config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
+ return [self.invoke(input_item, config=config, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self):
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
# 定义 RunnableParallel,继承自 Runnable
class RunnableParallel(Runnable):
"""
并行执行多个 Runnable,返回字典结果。
使用示例:
parallel = RunnableParallel(a=r1, b=r2)
result = parallel.invoke(input) # {"a": ..., "b": ...}
"""
# 构造方法,接收若干个可运行对象作为关键字参数
def __init__(self, **runnables):
# 如果未传递任何 runnable,则报错
if not runnables:
raise ValueError("至少需要一个 runnable")
# 检查每个传入的值是否为 Runnable 实例
for name, r in runnables.items():
if not isinstance(r, Runnable):
raise TypeError(f"键 {name} 的值必须是 Runnable 实例")
# 保存所有传入的 runnable 到实例属性
self.runnables = runnables
# 同步调用,将相同输入传递给所有子 runnable,并收集结果为字典
+ def invoke(self, input, config = None, **kwargs):
"""
同一输入传给所有子 runnable,收集结果为字典。
"""
# 遍历每个 runnable,调用其 invoke,结果收集为 {name: 返回值}
+ return {name: r.invoke(input, config=config, **kwargs) for name, r in self.runnables.items()}
# 批量调用,对输入列表每一项都运行 invoke,返回结果字典的列表
+ def batch(self, inputs, config = None, **kwargs):
"""
对输入列表逐项并行处理,返回字典列表。
"""
# 对每个输入元素调用 invoke,收集所有结果
+ return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,直接调用父类的流式实现
def stream(self, input, **kwargs):
"""
对单次输入执行并返回一个字典,流式单次产出。
"""
# 复用基类的 stream 方法
yield from super().stream(input, **kwargs)
# 返回对象的字符串表示(列出包含的所有子 runnable 的键名)
def __repr__(self):
# 拼接所有 runnable 的键名
keys = ", ".join(self.runnables.keys())
# 返回格式化字符串
return f"RunnableParallel({keys})"
# 定义RunnableBranch类,继承自Runnable,用于条件分支执行不同runnable
class RunnableBranch(Runnable):
"""
条件分支执行:按顺序检查条件,匹配则运行对应 runnable,若都不匹配则走默认分支。
"""
# 构造方法,接受若干分支参数
def __init__(self, *branches):
"""
支持“默认分支作为最后一个位置参数”的用法:
RunnableBranch((cond1, r1), (cond2, r2), default_runnable)
"""
# 分支数量必须至少2(至少一个条件+一个默认)
if len(branches) < 2:
raise ValueError("至少需要一个条件分支和一个默认分支")
# 将分支参数转为列表
branches_list = list(branches)
# 最后一个参数视为默认分支
default = branches_list.pop() # 最后一个位置参数为默认分支
# 校验每个分支
validated_branches = []
for item in branches_list:
# 每个分支需为二元组或二元列表
if not (isinstance(item, (tuple, list)) and len(item) == 2):
raise TypeError("分支必须是 (condition, runnable) 形式的二元组")
# 解包条件函数和runnable
cond, runnable = item
# 条件必须为可调用对象
if not callable(cond):
raise TypeError("分支条件必须是可调用对象")
# runnable必须是Runnable实例
if not isinstance(runnable, Runnable):
raise TypeError("分支 runnable 必须是 Runnable 实例")
# 校验通过则加入分支列表
validated_branches.append((cond, runnable))
# 校验默认分支必须为Runnable实例
if not isinstance(default, Runnable):
raise TypeError("默认分支必须是 Runnable 实例")
# 保存所有条件分支
self.branches = validated_branches
# 保存默认分支
self.default = default
# 单个输入同步调用方法
+ def invoke(self, input, config = None, **kwargs):
"""
按顺序匹配条件,命中即执行对应 runnable;否则走默认分支。
"""
# 遍历所有分支,遇到条件命中则执行对应runnable
for cond, runnable in self.branches:
if cond(input):
+ return runnable.invoke(input, config=config, **kwargs)
# 如果有默认分支则执行默认runnable
if self.default is not None:
+ return self.default.invoke(input, config=config, **kwargs)
# 无匹配分支时报错
raise ValueError("未匹配到任何分支,且未提供默认分支")
# 批量调用,遍历输入批量执行invoke
+ def batch(self, inputs, config = None, **kwargs):
# 对输入列表逐一执行invoke
+ return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,直接调用父类的stream方法
def stream(self, input, **kwargs):
# 复用父类的流式实现
yield from super().stream(input, **kwargs)
# 返回对象简洁字符串表示
def __repr__(self):
# 拼接分支编号
parts = [f"branch{idx}" for idx, _ in enumerate(self.branches)]
# 若有默认分支则拼接default字符串
if self.default:
parts.append("default")
# 格式化输出
return f"RunnableBranch({', '.join(parts)})"
class RunnablePassthrough(Runnable):
"""
直通型 Runnable:原样返回输入,不做任何处理。
可用于调试或需要保留原始输入的场景。
"""
+ def invoke(self, input, config = None, **kwargs):
return input
+ def batch(self, inputs, config = None, **kwargs):
return list(inputs)
def stream(self, input, **kwargs):
# 复用基类流式封装(对单值直接 yield)
yield from super().stream(input, **kwargs)
def __repr__(self):
return "RunnablePassthrough()"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
class RunnableSequence(Runnable):
"""
Runnable 组合序列,用于支持 A | B | C 的链式拼接。
"""
# 初始化方法,接收一个 Runnable 对象的列表
def __init__(self, runnables):
# 检查传入的 runnables 列表不能为空
if not runnables:
raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
for r in runnables:
if not isinstance(r, Runnable):
raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
+ def invoke(self, input, config = None, **kwargs):
"""
逐个执行链条:上一步输出作为下一步输入。
"""
# 确保 config 存在
+ config = ensure_config(config)
# 处理回调:如果有 callbacks,则触发链的开始回调
+ callbacks = config.get("callbacks")
# 初始化回调列表
+ callback_list = []
# 获取 run_id
+ run_id = config.get("run_id")
# 如果 run_id 为 None,则生成一个新的 uuid
+ if run_id is None:
+ run_id = uuid_module.uuid4()
# 如果 callbacks 不为空
+ if callbacks:
# 如果 callbacks 是列表,则直接赋值给 callback_list
+ if isinstance(callbacks, list):
+ callback_list = callbacks
# 如果 callbacks 不是列表,则转换为单元素列表
+ else:
+ callback_list = [callbacks]
# 序列化信息,用于回调上报链条标识
+ serialized = {"name": "RunnableSequence", "type": "chain"}
# 遍历每个回调对象,触发其 on_chain_start 方法
+ for callback in callback_list:
# 只有回调对象有 on_chain_start 属性才调用
+ if hasattr(callback, "on_chain_start"):
# 调用回调的 on_chain_start 方法,传入相关参数
+ try:
# 调用回调的 on_chain_start 方法,传入相关参数
+ callback.on_chain_start(serialized, {"input": input}, run_id=run_id, parent_run_id=None, tags=config.get("tags"), metadata=config.get("metadata"), **kwargs)
+ except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
+ pass
# 初始 value 为输入 input
value = input
+ try:
# 依次调用每个 runnable 的 invoke,并传递最新的 value
+ for runnable in self.runnables:
+ value = runnable.invoke(value, config=config, **kwargs)
+ except Exception as e:
# 若捕获到异常,则对所有回调触发 on_chain_error 并继续抛出异常
+ if callback_list:
+ for callback in callback_list:
# 只有回调对象有 on_chain_error 属性才调用
+ if hasattr(callback, "on_chain_error"):
+ try:
# 调用回调的 on_chain_error 方法,传入相关参数
+ callback.on_chain_error(e, run_id=run_id, parent_run_id=None, **kwargs)
+ except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
+ pass
+ raise
+ else:
# 如果没有异常执行,顺序触发所有回调的 on_chain_end 方法
+ if callback_list:
+ for callback in callback_list:
# 只有回调对象有 on_chain_end 属性才调用
+ if hasattr(callback, "on_chain_end"):
+ try:
# 调用回调的 on_chain_end 方法,传入相关参数
+ callback.on_chain_end(outputs={"output": value}, run_id=run_id, parent_run_id=None, **kwargs)
+ except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
+ pass
# 返回最后一步的输出值
return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
+ def batch(self, inputs, config = None, **kwargs):
"""
对输入列表逐项执行同一条链。
"""
# 逐项调用 invoke,收集所有输出
+ return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
def stream(self, input, **kwargs):
"""
流式执行:沿用基类逻辑,对最终结果做流式分发。
"""
# 使用基类 stream
yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
def __repr__(self):
# 获取每个 runnable 的名字,用"|"拼接成描述
names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
return f"RunnableSequence({names})"
# 定义 RunnableRetry 类,用于包装 Runnable 并添加重试逻辑
class RunnableRetry(Runnable):
"""
带重试功能的 Runnable 包装器
当底层 runnable 抛出指定异常时,会自动重试指定次数。
"""
# 初始化方法,接受被包装的 runnable 以及重试参数
def __init__(
self,
bound,
retry_if_exception_type=(Exception,),
stop_after_attempt=3,
wait_exponential_jitter=True,
exponential_jitter_params=None,
):
"""
初始化 RunnableRetry
Args:
bound: 被包装的 Runnable 对象
retry_if_exception_type: 需要重试的异常类型元组
stop_after_attempt: 最大尝试次数
wait_exponential_jitter: 是否启用指数回退抖动
exponential_jitter_params: 抖动参数 initial/max/exp_base/jitter
"""
# 保存底层被包装的 Runnable
self.bound = bound
# 保存需要重试的异常类型
self.retry_if_exception_type = retry_if_exception_type
# 保存最大尝试次数
self.stop_after_attempt = stop_after_attempt
# 保存是否启用指数回退抖动
self.wait_exponential_jitter = wait_exponential_jitter
# 保存指数回退相关参数(若为 None 则用空字典兜底)
self.exponential_jitter_params = exponential_jitter_params or {}
# 实现同步调用(自动重试机制)
+ def invoke(self, input, config = None, **kwargs):
"""
调用底层 runnable,失败时自动重试
"""
# 用于记录最后一次抛出的异常
last_exception = None
# 解析重试等待的各项参数
initial = self.exponential_jitter_params.get("initial", 0.1) # 初始延迟
max_wait = self.exponential_jitter_params.get("max", 10.0) # 最大延迟
exp_base = self.exponential_jitter_params.get("exp_base", 2.0) # 幂指数基数
jitter = self.exponential_jitter_params.get("jitter", 0.0) # 抖动范围
# 尝试多次调用,直到最大次数
for attempt in range(1, self.stop_after_attempt + 1):
try:
# 调用底层的 invoke 方法
+ return self.bound.invoke(input, config=config, **kwargs)
# 捕获需要重试的异常类型
except self.retry_if_exception_type as e:
# 保存本次捕获的异常
last_exception = e
# 若还没到最大次数,可以重试
if attempt < self.stop_after_attempt:
# 判断是否使用指数回退
if self.wait_exponential_jitter:
# 计算当前次的延迟
delay = min(max_wait, initial * (exp_base ** (attempt - 1)))
# 如果配置了 jitter,叠加一个随机抖动
if jitter > 0:
delay += random.uniform(0, jitter)
else:
# 不指数回退则用 initial 固定延迟
delay = initial
# 等待指定时间再重试
time.sleep(delay)
else:
# 达到最大次数仍然失败则抛出最后一次异常
raise last_exception
except Exception:
# 如果是完全不在重试范围的异常,直接抛出
raise
# 如果所有尝试都失败,最终抛出异常
raise last_exception
# 实现批量调用,每个输入独立重试
+ def batch(self, inputs, config = None, **kwargs):
"""
批量调用,每个输入独立重试
"""
# 对每个输入都单独执行 invoke,收集结果为列表
+ return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 实现流式调用,直接复用基类逻辑
def stream(self, input, **kwargs):
"""
流式调用,复用基类实现
"""
# 使用父类的 stream,yield 结果
yield from super().stream(input, **kwargs)
# 返回自身字符串表示,便于调试查看 retry 配置与绑定对象
def __repr__(self):
return f"RunnableRetry(bound={self.bound}, max_attempts={self.stop_after_attempt})"
# 工具函数:合并配置字典
+def _merge_configs(*configs):
+ """
+ 合并多个配置字典
+ Args:
+ *configs: 要合并的配置字典列表
+ Returns:
+ 合并后的配置字典
+ """
+ result = {}
+ for config in configs:
+ if config:
# 对于嵌套字典(如 metadata),需要深度合并
+ for key, value in config.items():
+ if key in result and isinstance(result[key], dict) and isinstance(value, dict):
+ result[key] = {**result[key], **value}
+ else:
+ result[key] = value
+ return result
# 定义 RunnableBinding 类,用于包装 Runnable 并绑定配置
+class RunnableBinding(Runnable):
+ """
+ Runnable 绑定包装器
+ 用于将配置绑定到 Runnable,返回一个新的 Runnable 实例。
+ 当调用绑定的 Runnable 时,会自动合并绑定的配置和传入的配置。
+ """
+ def __init__(self, bound, config=None, kwargs=None):
+ """
+ 初始化 RunnableBinding
+ Args:
+ bound: 要绑定的底层 Runnable 实例
+ config: 要绑定的配置字典
+ kwargs: 要绑定的额外关键字参数(暂未使用)
+ """
+ if not isinstance(bound, Runnable):
+ raise TypeError("bound 必须是 Runnable 实例")
+ self.bound = bound
+ self.config = ensure_config(config) or {}
+ self.kwargs = kwargs or {}
+ def invoke(self, input, config=None, **kwargs):
+ """
+ 调用绑定的 Runnable,合并配置
+ Args:
+ input: 输入值
+ config: 可选的配置字典,会与绑定的配置合并
+ **kwargs: 额外的关键字参数
+ Returns:
+ 底层 Runnable 的返回值
+ """
# 合并绑定的配置和传入的配置
+ merged_config = _merge_configs(self.config, config)
# 合并关键字参数
+ merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
+ return self.bound.invoke(input, config=merged_config, **merged_kwargs)
+ def batch(self, inputs, config=None, **kwargs):
+ """
+ 批量调用绑定的 Runnable,合并配置
+ Args:
+ inputs: 输入值列表
+ config: 可选的配置字典,会与绑定的配置合并
+ **kwargs: 额外的关键字参数
+ Returns:
+ 输出值列表
+ """
# 合并绑定的配置和传入的配置
+ merged_config = _merge_configs(self.config, config)
# 合并关键字参数
+ merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
+ return self.bound.batch(inputs, config=merged_config, **merged_kwargs)
+ def stream(self, input, config=None, **kwargs):
+ """
+ 流式调用绑定的 Runnable,合并配置
+ Args:
+ input: 输入值
+ config: 可选的配置字典,会与绑定的配置合并
+ **kwargs: 额外的关键字参数
+ Yields:
+ 底层 Runnable 的流式输出
+ """
# 合并绑定的配置和传入的配置
+ merged_config = _merge_configs(self.config, config)
# 合并关键字参数
+ merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
+ yield from self.bound.stream(input, config=merged_config, **merged_kwargs)
+ def __repr__(self):
+ """返回对象的字符串表示"""
+ return f"RunnableBinding(bound={self.bound}, config={self.config})"
28.5 类 #
28.5.1 类说明 #
| 类名 | 作用 | 主要方法 | 使用 |
|---|---|---|---|
| Runnable | 抽象基类,定义可运行单元的统一接口 | invoke(), batch(), stream(), with_config(), __or__() |
作为所有 Runnable 类的基类 |
| RunnableLambda | 将普通 Python 函数包装为 Runnable | invoke(), batch(), stream() |
runnable1 = RunnableLambda(to_upper), runnable2 = RunnableLambda(add_prefix) |
| RunnableSequence | 实现链式组合,支持 操作符 | invoke(), batch(), stream(), __or__() |
chain = runnable1 runnable2 |
| RunnableBinding | 包装 Runnable 并绑定配置 | invoke(), batch(), stream() |
runnable1.with_config({...}) 返回此实例 |
| BaseCallbackHandler | 回调处理器基类 | on_chain_start(), on_chain_end(), on_chain_error() |
MyCallbackHandler 继承此类 |
| ensure_config | 工具函数,确保 config 不为 None | - | 在 RunnableLambda.invoke() 和 RunnableSequence.invoke() 中使用 |
| MyCallbackHandler | 自定义回调处理器 | on_chain_start(), on_chain_end(), on_chain_error() |
演示回调功能 |
28.5.2 类图 #

28.5.3 时序图 #
28.5.3.1 (1-9)调用流程 #
28.5.3.2 10调用流程 #
28.5.4 调用过程 #
28.5.4.1 (1-9)调用流程 #
创建 Runnable 实例
runnable1 = RunnableLambda(to_upper):将to_upper包装为 Runnablerunnable2 = RunnableLambda(add_prefix):将add_prefix包装为 Runnable
构建处理链
chain = runnable1 | runnable2:通过__or__创建RunnableSequence,包含[runnable1, runnable2]
调用链并传递配置
chain.invoke("hello", config=config1):RunnableSequence.invoke()处理回调(如有)- 依次调用
runnable1.invoke("hello", config=config)和runnable2.invoke(result1, config=config) - 每个
RunnableLambda.invoke()会:- 检查函数是否接受
config参数(通过_accept_config()) - 处理回调(如有)
- 调用被包装的函数,传递
config
- 检查函数是否接受
- 返回最终结果
28.5.4.2 (10)调用流程 #
绑定配置
runnable1.with_config({"metadata": {"user_id": "001"}}):Runnable.with_config()创建RunnableBinding,绑定配置
runnable2.with_config({"metadata": {"user_id": "002"}}):同样创建RunnableBinding
构建链
chain = B1 | B2:创建包含两个RunnableBinding的RunnableSequence
调用链
chain.invoke("hello"):RunnableSequence依次调用B1.invoke()和B2.invoke()- 每个
RunnableBinding.invoke()会:- 使用
_merge_configs()合并绑定的配置和传入的配置 - 调用底层
Runnable.invoke(),传递合并后的配置
- 使用
- 每个 runnable 使用各自绑定的配置
28.5.4.3 配置传递机制 #
- 配置合并:
_merge_configs()支持嵌套字典的深度合并(如metadata) - 配置传递:
config通过invoke()参数链式传递 - 函数接收:
RunnableLambda通过_accept_config()检测函数签名,自动传递config
28.5.4.4 回调机制 #
- 回调触发时机:
on_chain_start:链/步骤开始on_chain_end:链/步骤成功结束on_chain_error:链/步骤出错
- 回调传递:
config中的callbacks会传递给链中的每个 Runnable - 回调执行:每个 Runnable 在执行前后触发相应回调
29.configurable_fields #
- configurable_fields 机制允许你将任意可运行组件(如 LLM、Chain、RunnableLambda 等)的部分参数声明为“可动态配置”。
- 这些参数可在执行时通过 config 字典灵活传入,从而在不重建对象的前提下,实现推理/测试阶段参数的临时调整。
核心原理
- ConfigurableField 类描述每个“可配置字段”的元信息:唯一 id、可选的显示名与说明,方便自动化表单、文档生成与 UI 集成。
- configurable_fields() 方法用于在已有实例基础上,声明一组参数为可动态配置。例如:
llm = ChatOpenAI(model="gpt-4", temperature=0).configurable_fields( temperature=ConfigurableField( id="temperature", name="温度", description="控制输出多样性" ) ) - 这样构造出的新对象,普通调用时不变;如需改变 temperature 参数,只需:
result = llm.invoke("hi", config={"configurable": {"temperature": 0.8}})
使用方式&流程
- 声明可配置参数
可单字段,也可多字段。每个参数用 ConfigurableField 描述其含义便于前端/配置 UI 生成。chain = SomeChain().configurable_fields( paramA=ConfigurableField(id="paramA", name="A参数"), paramB=ConfigurableField(id="paramB", name="B参数") ) 调用时注入实际值
在 .invoke 或 .batch 等方法时,通过 config={"configurable": {...}} 指定参数的实际值即可。
优先级高于对象构造时的默认值(仅在本次调用生效)。chain.invoke(data, config={"configurable": {"paramA": 42}})多层嵌套链路动态传递
支持在 RunnableSequence、Chain 等流水线自动传递配置,每层只需声明支持哪些参数。- 外层 config 统一传递,下游组件如有声明同名 configurable 字段,则自动更新其实例参数。
应用场景举例
- LLM 多温度对比实验:无需反复 new,只需声明 temperature 可配置字段,每批次切换不同温度采样即可。
- 参数可视化调优 UI:可自动读取 ConfigurableField 元信息,动态生成参数表单,便于用户业务侧灵活调整。
- 模型流水线/Chain/多步推理:“中间参数”流式注入,每步算法参数独立声明并支持任务级 hot update。
- 批量 grid-search、自动化测试等场景:只需批量调整 config 字典即可实现参数网格采样。
核心优势
- 无需重建对象,更高效地批量变动配置;
- 允许参数动态热注入与覆盖,完美适配实验与业务环境下配置频繁修改需求;
- 便于自动生成配置文档/可视化界面/参数面板;
- 机制适用于所有遵循 Runnable 协议组件,通用性强。
注意事项
- 仅声明为 configurable_fields 的参数才能动态变更,其余参数仍然为构造时定值;
- config={"configurable": ...} 仅影响本次调用,不会全局修改对象默认属性;
- 合理选择字段(如 temperature、top_k、api_base_url 等典型需热切换参数)纳入可配置。
简明总结
configurable_fields 让你的链条、模型接口动静结合,既可固定默认参数又能随时注入变化,极大提升了代码灵活性与可维护性,是构建大型 LLM 应用不可或缺的基础设施。
29.1. 29.configurable_fields.py #
29.configurable_fields.py
# 导入 ChatOpenAI 类,用于与 OpenAI 聊天模型交互
from smartchain.chat_models import ChatOpenAI
# 导入 ConfigurableField 类,用于定义可配置的参数字段
from smartchain.runnables import ConfigurableField
# 使用 ChatOpenAI 初始化模型,指定模型名称为 "gpt-4o",temperature 设置为 0
# 通过 configurable_fields 方法使 temperature 字段可以动态配置
# ConfigurableField 需要 id 参数,name 和 description 为可选参数
llm = ChatOpenAI(model="gpt-4o", temperature=0).configurable_fields(
temperature=ConfigurableField(
id="temperature", # 参数唯一标识符
name="温度值", # 参数显示名称(可选)
description="LLM 的采样温度参数,控制输出的多样性" # 参数描述(可选)
)
)
# 打印说明信息,表示当前 temperature 配置为 1.0
print("\ntemperature=1.0:")
# 使用 llm.invoke 方法发送对话内容,并通过 config 参数传递 temperature=1.0 动态配置
result3 = llm.invoke("你好,你怎么样?", config={"configurable": {"temperature": 1.0}})
# 输出模型回复的内容
print(result3.content)29.2. chat_models.py #
smartchain/chat_models.py
# 导入操作系统相关模块
import os
# 导入 openai 模块
import openai
# 从 .messages 模块导入 AIMessage、HumanMessage 和 SystemMessage 类
from .messages import AIMessage, HumanMessage, SystemMessage
from .prompts import ChatPromptValue
# 定义与 OpenAI 聊天模型交互的类
class ChatOpenAI:
# 初始化方法
def __init__(self, model: str = "gpt-4o", **kwargs):
# 初始化 ChatOpenAI 类
"""
初始化 ChatOpenAI
Args:
model: 模型名称,如 "gpt-4o"
**kwargs: 其他参数(如 temperature, max_tokens 等)
"""
# 设置模型名称
self.model = model
# 获取 api_key,优先从参数获取,否则从环境变量获取
self.api_key = kwargs.get("api_key") or os.getenv("OPENAI_API_KEY")
# 如果没有提供 api_key,则抛出异常
if not self.api_key:
raise ValueError("需要提供 api_key 或设置 OPENAI_API_KEY 环境变量")
# 保存除 api_key 之外的其他参数,用于 API 调用
self.model_kwargs = {k: v for k, v in kwargs.items() if k != "api_key"}
# 创建 OpenAI 客户端实例
self.client = openai.OpenAI(api_key=self.api_key)
# 调用模型生成回复的方法
def invoke(self, input, **kwargs):
# 调用模型生成回复
"""
调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Returns:
AIMessage: AI 的回复消息
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典
params = {
"model": self.model,
"messages": messages,
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 客户端发起 chat.completions.create 调用获取回复
response = self.client.chat.completions.create(**params)
# 取出返回结果中的第一个选项
choice = response.choices[0]
# 获取消息内容
content = choice.message.content or ""
# 返回一个 AIMessage 对象
return AIMessage(content=content)
# 流式调用模型生成回复的方法
def stream(self, input, **kwargs):
# 流式调用模型生成回复
"""
流式调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Yields:
AIMessage: AI 的回复消息块(每次产生部分内容)
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典,启用流式输出
params = {
"model": self.model,
"messages": messages,
"stream": True, # 启用流式输出
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 客户端发起流式调用
stream = self.client.chat.completions.create(**params)
# 迭代流式响应
for chunk in stream:
# 检查是否有内容增量
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
# 检查 delta 中是否有 content,如果有则发送
if hasattr(delta, 'content') and delta.content:
# 产生包含部分内容的 AIMessage
yield AIMessage(content=delta.content)
# 内部方法,将输入转换为 OpenAI API 需要的消息格式
def _convert_input(self, input):
# 将输入转换为 OpenAI API 需要的消息格式
"""
将输入转换为 OpenAI API 需要的消息格式
Args:
input: 字符串、消息列表或 ChatPromptValue
Returns:
list[dict]: OpenAI API 格式的消息列表
"""
if isinstance(input, ChatPromptValue):
input = input.to_messages()
# 输入为字符串时,直接封装为用户角色消息
if isinstance(input, str):
return [{"role": "user", "content": input}]
# 如果输入是列表类型
elif isinstance(input, list):
# 新建一个空的消息列表
messages = []
# 遍历输入列表中的每一个元素
for msg in input:
# 判断是否为字符串,是则作为用户消息加入
if isinstance(msg, str):
messages.append({"role": "user", "content": msg})
# 判断是否为 HumanMessage、AIMessage 或 SystemMessage 实例
elif isinstance(msg, (HumanMessage, AIMessage, SystemMessage)):
# 如果是 HumanMessage,将角色设为 user
if isinstance(msg, HumanMessage):
role = "user"
# 如果是 AIMessage,将角色设为 assistant
elif isinstance(msg, AIMessage):
role = "assistant"
# 如果是 SystemMessage,将角色设为 system
elif isinstance(msg, SystemMessage):
role = "system"
# 获取消息内容(有 content 属性则取 content,否则转为字符串)
content = msg.content if hasattr(msg, "content") else str(msg)
# 将角色和内容添加到消息列表
messages.append({"role": role, "content": content})
# 如果元素本身为字典,直接添加进消息列表
elif isinstance(msg, dict):
# 直接添加字典类型的消息
messages.append(msg)
# 如果元素为长度为 2 的元组,将其解包为 role 和 content
elif isinstance(msg, tuple) and len(msg) == 2:
# 将元组解包为 role 和 content
role, content = msg
# 将角色和内容添加到消息列表
messages.append({"role": role, "content": content})
# 返回构建好的消息列表
return messages
else:
# 其他输入类型,转为字符串作为 user 消息
return [{"role": "user", "content": str(input)}]
# 定义可配置字段的方法,用于包装当前实例,支持部分参数运行时动态调整
+ def configurable_fields(self, **fields):
# 配置可动态调整的字段
+ """
+ 配置可动态调整的字段
+ Args:
+ **fields: 可配置字段的字典,键为字段名,值为 ConfigurableField 实例
+ Returns:
+ RunnableConfigurableFields: 包装后的 Runnable 实例
+ 示例:
+ ``python
+ from smartchain.runnables import ConfigurableField
+ llm = ChatOpenAI(temperature=0).configurable_fields(
+ temperature=ConfigurableField(
+ id="temperature",
+ name="温度值",
+ description="LLM 的采样温度参数"
+ )
+ )
# 使用默认 temperature=0
+ result1 = llm.invoke("你好")
# 使用 temperature=1.0
+ result2 = llm.invoke("你好", config={"configurable": {"temperature": 1.0}})
+ ``
+ """
# 从当前目录导入 RunnableConfigurableFields 类
+ from .runnables import RunnableConfigurableFields
# 返回将当前实例 self 及配置字段 fields 包装后的新对象
+ return RunnableConfigurableFields(default=self, fields=fields)
# 定义与 DeepSeek 聊天模型交互的类
class ChatDeepSeek:
# 初始化方法
# model: 模型名称,默认为 "deepseek-chat"
# **kwargs: 其他可选参数(如 temperature, max_tokens 等)
def __init__(self, model: str = "deepseek-chat", **kwargs):
"""
初始化 ChatDeepSeek
Args:
model: 模型名称,如 "deepseek-chat"
**kwargs: 其他参数(如 temperature, max_tokens 等)
"""
# 设置模型名称
self.model = model
# 获取 api_key,优先从参数获取,否则从环境变量获取
self.api_key = kwargs.get("api_key") or os.getenv("DEEPSEEK_API_KEY")
# 如果没有提供 api_key,则抛出异常
if not self.api_key:
raise ValueError("需要提供 api_key 或设置 DEEPSEEK_API_KEY 环境变量")
# 保存除 api_key 之外的其他参数,用于 API 调用
self.model_kwargs = {k: v for k, v in kwargs.items() if k != "api_key"}
# 获取 DeepSeek 的 base_url,默认为官方地址
base_url = kwargs.get("base_url", "https://api.deepseek.com/v1")
# 创建 OpenAI 兼容的客户端实例(DeepSeek 使用 OpenAI 兼容的 API)
self.client = openai.OpenAI(api_key=self.api_key, base_url=base_url)
# 调用模型生成回复的方法
# input: 输入内容,可以是字符串或消息列表
# **kwargs: 额外的 API 参数
def invoke(self, input, **kwargs):
"""
调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Returns:
AIMessage: AI 的回复消息
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典
params = {
"model": self.model,
"messages": messages,
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 兼容的客户端发起 chat.completions.create 调用获取回复
response = self.client.chat.completions.create(**params)
# 取出返回结果中的第一个选项
choice = response.choices[0]
# 获取消息内容
content = choice.message.content or ""
# 返回一个 AIMessage 对象
return AIMessage(content=content)
# 内部方法,将输入转换为 API 需要的消息格式
# input: 字符串、消息列表或 ChatPromptValue
def _convert_input(self, input):
"""
将输入转换为 API 需要的消息格式
Args:
input: 字符串、消息列表或 ChatPromptValue
Returns:
list[dict]: API 格式的消息列表
"""
# 如果输入是字符串,直接作为用户消息
if isinstance(input, str):
return [{"role": "user", "content": input}]
else:
# 其他输入类型,转为字符串作为 user 消息
return [{"role": "user", "content": str(input)}]
# 定义与通义千问(Tongyi)聊天模型交互的类
class ChatTongyi:
# 初始化方法
# 初始化方法,设置模型名称和 API 相关参数
def __init__(self, model: str = "qwen-max", **kwargs):
"""
初始化 ChatTongyi
Args:
model: 模型名称,如 "qwen-max"
**kwargs: 其他参数(如 temperature, max_tokens 等)
"""
# 设置模型名称
self.model = model
# 获取 api_key,优先从参数获取,否则从环境变量获取
self.api_key = kwargs.get("api_key") or os.getenv("DASHSCOPE_API_KEY")
# 如果没有提供 api_key,则抛出异常
if not self.api_key:
raise ValueError("需要提供 api_key 或设置 DASHSCOPE_API_KEY 环境变量")
# 保存除 api_key 之外的其他参数,用于 API 调用
self.model_kwargs = {k: v for k, v in kwargs.items() if k != "api_key"}
# 获取通义千问的 API base URL(使用 OpenAI 兼容模式),如果未指定则使用默认值
base_url = kwargs.get("base_url", "https://dashscope.aliyuncs.com/compatible-mode/v1")
# 创建 OpenAI 兼容的客户端实例(通义千问使用 OpenAI 兼容的 API)
self.client = openai.OpenAI(api_key=self.api_key, base_url=base_url)
# 调用模型生成回复的方法
# 调用模型生成回复,返回 AIMessage 对象
def invoke(self, input, **kwargs):
"""
调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Returns:
AIMessage: AI 的回复消息
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典,包含模型名、消息内容和其他参数
params = {
"model": self.model,
"messages": messages,
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 兼容的客户端发起 chat.completions.create 调用以获取回复
response = self.client.chat.completions.create(**params)
# 取出返回结果中的第一个回复选项
choice = response.choices[0]
# 获取回复的消息内容,如果内容不存在则返回空字符串
content = choice.message.content or ""
# 构建并返回一个 AIMessage 对象
return AIMessage(content=content)
# 内部方法,将输入转换为 API 需要的消息格式
# 支持字符串、消息列表等输入,统一包装为 OpenAI API 格式
def _convert_input(self, input):
"""
将输入转换为 API 需要的消息格式
Args:
input: 字符串、消息列表或 ChatPromptValue
Returns:
list[dict]: API 格式的消息列表
"""
# 如果输入是字符串,直接包装为“用户”角色的消息
if isinstance(input, str):
return [{"role": "user", "content": input}]
else:
# 其他输入类型,转换为字符串作为“用户”消息内容
return [{"role": "user", "content": str(input)}] 29.3. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
import time
import random
import inspect
import uuid as uuid_module
from .config import ensure_config,_accept_config
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, config = None, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, config = None, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, config=config, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 管道操作符,便于链式拼接
def __or__(self, other):
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
def batch(self, inputs, config = None, **kwargs):
"""
批量调用 Runnable
Args:
inputs: 输入值列表
config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, config=config, **kwargs) for input_item in inputs]
# 添加重试功能,返回包装了重试逻辑的 Runnable
# 定义 with_retry 方法,为当前 Runnable 添加重试机制
def with_retry(
self,
*,
retry_if_exception_type=(Exception,), # 指定需要重试的异常类型,默认所有 Exception
stop_after_attempt=3, # 最大尝试次数,默认3次
wait_exponential_jitter=True, # 是否启用指数退避抖动
exponential_jitter_params=None, # 抖动参数字典,支持 initial/max/exp_base/jitter
):
"""
创建带重试功能的 Runnable 包装器
Args:
retry_if_exception_type: 需要重试的异常类型元组
stop_after_attempt: 最大尝试次数
wait_exponential_jitter: 是否启用指数回退抖动
exponential_jitter_params: 抖动参数,支持 initial/max/exp_base/jitter
Returns:
包装了重试逻辑的 RunnableRetry 实例
"""
# 返回带重试功能的 RunnableRetry 实例,绑定当前 runnable 和重试参数
return RunnableRetry(
bound=self,
retry_if_exception_type=retry_if_exception_type,
stop_after_attempt=stop_after_attempt,
wait_exponential_jitter=wait_exponential_jitter,
exponential_jitter_params=exponential_jitter_params,
)
def with_config(self, config=None, **kwargs):
"""
绑定配置到 Runnable,返回一个新的 Runnable
Args:
config: 要绑定的配置字典
**kwargs: 额外的关键字参数,会合并到 config 中
Returns:
一个新的 RunnableBinding 实例,包含绑定的配置
"""
# 合并 config 和 kwargs
merged_config = {}
if config:
merged_config.update(config)
if kwargs:
merged_config.update(kwargs)
# 返回 RunnableBinding 实例
return RunnableBinding(bound=self, config=merged_config)
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name=None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke 方法,对被封装的底层函数进行同步调用
def invoke(self, input, config = None, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
config: 可选的配置字典
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
# 保证 config 不为 None,如为 None 则转为空字典
config = ensure_config(config)
# 从配置字典中获取回调对象 callbacks
callbacks = config.get("callbacks")
# 初始化回调对象列表
callback_list = []
# 获取当前调用的唯一 ID(run_id)
run_id = config.get("run_id")
# 如果没有传入 run_id,则自动生成一个新的 uuid
if run_id is None:
run_id = uuid_module.uuid4()
# 如果 callbacks 不为空
if callbacks:
# 如果 callbacks 已经是列表,则直接用,否则转为单元素列表
if isinstance(callbacks, list):
callback_list = callbacks
else:
callback_list = [callbacks]
# 构造序列化信息,用于回调上报链条标识
serialized = {"name": self.name, "type": "RunnableLambda"}
# 遍历每个回调对象,触发其 on_chain_start 方法
for callback in callback_list:
# 只有回调对象有 on_chain_start 属性才调用
if hasattr(callback, "on_chain_start"):
try:
# 调用回调的 on_chain_start 方法,传入相关参数
callback.on_chain_start(
serialized=serialized,
inputs={"input": input},
run_id=run_id,
parent_run_id=None,
tags=config.get("tags"),
metadata=config.get("metadata"),
**kwargs
)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
# 检查被包装的函数是否接收 config 参数
if _accept_config(self.func):
# 如果接收 config,则将 config 传递下去
kwargs["config"] = config
# 尝试执行被包装的核心函数
try:
# 正常调用被包装的函数,将 input 作为第一个参数,kwargs作为关键字参数
output = self.func(input, **kwargs)
except Exception as e:
# 若捕获到异常,则对所有回调触发 on_chain_error 并继续抛出异常
if callback_list:
for callback in callback_list:
if hasattr(callback, "on_chain_error"):
try:
callback.on_chain_error(
error=e,
run_id=run_id,
parent_run_id=None,
**kwargs
)
except Exception:
# 回调异常不影响主异常继续抛出
pass
# 重新抛出主流程中的异常
raise
else:
# 如果没有异常执行,顺序触发所有回调的 on_chain_end 方法
if callback_list:
for callback in callback_list:
if hasattr(callback, "on_chain_end"):
try:
callback.on_chain_end(
outputs={"output": output},
run_id=run_id,
parent_run_id=None,
**kwargs
)
except Exception:
# 回调异常不影响主逻辑输出
pass
# 返回包装函数的输出结果
return output
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
def batch(self, inputs, config = None, **kwargs):
"""
批量调用包装的函数
Args:
inputs: 输入值列表
config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, config=config, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self):
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
# 定义 RunnableParallel,继承自 Runnable
class RunnableParallel(Runnable):
"""
并行执行多个 Runnable,返回字典结果。
使用示例:
parallel = RunnableParallel(a=r1, b=r2)
result = parallel.invoke(input) # {"a": ..., "b": ...}
"""
# 构造方法,接收若干个可运行对象作为关键字参数
def __init__(self, **runnables):
# 如果未传递任何 runnable,则报错
if not runnables:
raise ValueError("至少需要一个 runnable")
# 检查每个传入的值是否为 Runnable 实例
for name, r in runnables.items():
if not isinstance(r, Runnable):
raise TypeError(f"键 {name} 的值必须是 Runnable 实例")
# 保存所有传入的 runnable 到实例属性
self.runnables = runnables
# 同步调用,将相同输入传递给所有子 runnable,并收集结果为字典
def invoke(self, input, config = None, **kwargs):
"""
同一输入传给所有子 runnable,收集结果为字典。
"""
# 遍历每个 runnable,调用其 invoke,结果收集为 {name: 返回值}
return {name: r.invoke(input, config=config, **kwargs) for name, r in self.runnables.items()}
# 批量调用,对输入列表每一项都运行 invoke,返回结果字典的列表
def batch(self, inputs, config = None, **kwargs):
"""
对输入列表逐项并行处理,返回字典列表。
"""
# 对每个输入元素调用 invoke,收集所有结果
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,直接调用父类的流式实现
def stream(self, input, **kwargs):
"""
对单次输入执行并返回一个字典,流式单次产出。
"""
# 复用基类的 stream 方法
yield from super().stream(input, **kwargs)
# 返回对象的字符串表示(列出包含的所有子 runnable 的键名)
def __repr__(self):
# 拼接所有 runnable 的键名
keys = ", ".join(self.runnables.keys())
# 返回格式化字符串
return f"RunnableParallel({keys})"
# 定义RunnableBranch类,继承自Runnable,用于条件分支执行不同runnable
class RunnableBranch(Runnable):
"""
条件分支执行:按顺序检查条件,匹配则运行对应 runnable,若都不匹配则走默认分支。
"""
# 构造方法,接受若干分支参数
def __init__(self, *branches):
"""
支持“默认分支作为最后一个位置参数”的用法:
RunnableBranch((cond1, r1), (cond2, r2), default_runnable)
"""
# 分支数量必须至少2(至少一个条件+一个默认)
if len(branches) < 2:
raise ValueError("至少需要一个条件分支和一个默认分支")
# 将分支参数转为列表
branches_list = list(branches)
# 最后一个参数视为默认分支
default = branches_list.pop() # 最后一个位置参数为默认分支
# 校验每个分支
validated_branches = []
for item in branches_list:
# 每个分支需为二元组或二元列表
if not (isinstance(item, (tuple, list)) and len(item) == 2):
raise TypeError("分支必须是 (condition, runnable) 形式的二元组")
# 解包条件函数和runnable
cond, runnable = item
# 条件必须为可调用对象
if not callable(cond):
raise TypeError("分支条件必须是可调用对象")
# runnable必须是Runnable实例
if not isinstance(runnable, Runnable):
raise TypeError("分支 runnable 必须是 Runnable 实例")
# 校验通过则加入分支列表
validated_branches.append((cond, runnable))
# 校验默认分支必须为Runnable实例
if not isinstance(default, Runnable):
raise TypeError("默认分支必须是 Runnable 实例")
# 保存所有条件分支
self.branches = validated_branches
# 保存默认分支
self.default = default
# 单个输入同步调用方法
def invoke(self, input, config = None, **kwargs):
"""
按顺序匹配条件,命中即执行对应 runnable;否则走默认分支。
"""
# 遍历所有分支,遇到条件命中则执行对应runnable
for cond, runnable in self.branches:
if cond(input):
return runnable.invoke(input, config=config, **kwargs)
# 如果有默认分支则执行默认runnable
if self.default is not None:
return self.default.invoke(input, config=config, **kwargs)
# 无匹配分支时报错
raise ValueError("未匹配到任何分支,且未提供默认分支")
# 批量调用,遍历输入批量执行invoke
def batch(self, inputs, config = None, **kwargs):
# 对输入列表逐一执行invoke
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,直接调用父类的stream方法
def stream(self, input, **kwargs):
# 复用父类的流式实现
yield from super().stream(input, **kwargs)
# 返回对象简洁字符串表示
def __repr__(self):
# 拼接分支编号
parts = [f"branch{idx}" for idx, _ in enumerate(self.branches)]
# 若有默认分支则拼接default字符串
if self.default:
parts.append("default")
# 格式化输出
return f"RunnableBranch({', '.join(parts)})"
class RunnablePassthrough(Runnable):
"""
直通型 Runnable:原样返回输入,不做任何处理。
可用于调试或需要保留原始输入的场景。
"""
def invoke(self, input, config = None, **kwargs):
return input
def batch(self, inputs, config = None, **kwargs):
return list(inputs)
def stream(self, input, **kwargs):
# 复用基类流式封装(对单值直接 yield)
yield from super().stream(input, **kwargs)
def __repr__(self):
return "RunnablePassthrough()"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
class RunnableSequence(Runnable):
"""
Runnable 组合序列,用于支持 A | B | C 的链式拼接。
"""
# 初始化方法,接收一个 Runnable 对象的列表
def __init__(self, runnables):
# 检查传入的 runnables 列表不能为空
if not runnables:
raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
for r in runnables:
if not isinstance(r, Runnable):
raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
def invoke(self, input, config = None, **kwargs):
"""
逐个执行链条:上一步输出作为下一步输入。
"""
# 确保 config 存在
config = ensure_config(config)
# 处理回调:如果有 callbacks,则触发链的开始回调
callbacks = config.get("callbacks")
# 初始化回调列表
callback_list = []
# 获取 run_id
run_id = config.get("run_id")
# 如果 run_id 为 None,则生成一个新的 uuid
if run_id is None:
run_id = uuid_module.uuid4()
# 如果 callbacks 不为空
if callbacks:
# 如果 callbacks 是列表,则直接赋值给 callback_list
if isinstance(callbacks, list):
callback_list = callbacks
# 如果 callbacks 不是列表,则转换为单元素列表
else:
callback_list = [callbacks]
# 序列化信息,用于回调上报链条标识
serialized = {"name": "RunnableSequence", "type": "chain"}
# 遍历每个回调对象,触发其 on_chain_start 方法
for callback in callback_list:
# 只有回调对象有 on_chain_start 属性才调用
if hasattr(callback, "on_chain_start"):
# 调用回调的 on_chain_start 方法,传入相关参数
try:
# 调用回调的 on_chain_start 方法,传入相关参数
callback.on_chain_start(serialized, {"input": input}, run_id=run_id, parent_run_id=None, tags=config.get("tags"), metadata=config.get("metadata"), **kwargs)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
# 初始 value 为输入 input
value = input
try:
# 依次调用每个 runnable 的 invoke,并传递最新的 value
for runnable in self.runnables:
value = runnable.invoke(value, config=config, **kwargs)
except Exception as e:
# 若捕获到异常,则对所有回调触发 on_chain_error 并继续抛出异常
if callback_list:
for callback in callback_list:
# 只有回调对象有 on_chain_error 属性才调用
if hasattr(callback, "on_chain_error"):
try:
# 调用回调的 on_chain_error 方法,传入相关参数
callback.on_chain_error(e, run_id=run_id, parent_run_id=None, **kwargs)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
raise
else:
# 如果没有异常执行,顺序触发所有回调的 on_chain_end 方法
if callback_list:
for callback in callback_list:
# 只有回调对象有 on_chain_end 属性才调用
if hasattr(callback, "on_chain_end"):
try:
# 调用回调的 on_chain_end 方法,传入相关参数
callback.on_chain_end(outputs={"output": value}, run_id=run_id, parent_run_id=None, **kwargs)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
# 返回最后一步的输出值
return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
def batch(self, inputs, config = None, **kwargs):
"""
对输入列表逐项执行同一条链。
"""
# 逐项调用 invoke,收集所有输出
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
def stream(self, input, **kwargs):
"""
流式执行:沿用基类逻辑,对最终结果做流式分发。
"""
# 使用基类 stream
yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
def __repr__(self):
# 获取每个 runnable 的名字,用"|"拼接成描述
names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
return f"RunnableSequence({names})"
# 定义 RunnableRetry 类,用于包装 Runnable 并添加重试逻辑
class RunnableRetry(Runnable):
"""
带重试功能的 Runnable 包装器
当底层 runnable 抛出指定异常时,会自动重试指定次数。
"""
# 初始化方法,接受被包装的 runnable 以及重试参数
def __init__(
self,
bound,
retry_if_exception_type=(Exception,),
stop_after_attempt=3,
wait_exponential_jitter=True,
exponential_jitter_params=None,
):
"""
初始化 RunnableRetry
Args:
bound: 被包装的 Runnable 对象
retry_if_exception_type: 需要重试的异常类型元组
stop_after_attempt: 最大尝试次数
wait_exponential_jitter: 是否启用指数回退抖动
exponential_jitter_params: 抖动参数 initial/max/exp_base/jitter
"""
# 保存底层被包装的 Runnable
self.bound = bound
# 保存需要重试的异常类型
self.retry_if_exception_type = retry_if_exception_type
# 保存最大尝试次数
self.stop_after_attempt = stop_after_attempt
# 保存是否启用指数回退抖动
self.wait_exponential_jitter = wait_exponential_jitter
# 保存指数回退相关参数(若为 None 则用空字典兜底)
self.exponential_jitter_params = exponential_jitter_params or {}
# 实现同步调用(自动重试机制)
def invoke(self, input, config = None, **kwargs):
"""
调用底层 runnable,失败时自动重试
"""
# 用于记录最后一次抛出的异常
last_exception = None
# 解析重试等待的各项参数
initial = self.exponential_jitter_params.get("initial", 0.1) # 初始延迟
max_wait = self.exponential_jitter_params.get("max", 10.0) # 最大延迟
exp_base = self.exponential_jitter_params.get("exp_base", 2.0) # 幂指数基数
jitter = self.exponential_jitter_params.get("jitter", 0.0) # 抖动范围
# 尝试多次调用,直到最大次数
for attempt in range(1, self.stop_after_attempt + 1):
try:
# 调用底层的 invoke 方法
return self.bound.invoke(input, config=config, **kwargs)
# 捕获需要重试的异常类型
except self.retry_if_exception_type as e:
# 保存本次捕获的异常
last_exception = e
# 若还没到最大次数,可以重试
if attempt < self.stop_after_attempt:
# 判断是否使用指数回退
if self.wait_exponential_jitter:
# 计算当前次的延迟
delay = min(max_wait, initial * (exp_base ** (attempt - 1)))
# 如果配置了 jitter,叠加一个随机抖动
if jitter > 0:
delay += random.uniform(0, jitter)
else:
# 不指数回退则用 initial 固定延迟
delay = initial
# 等待指定时间再重试
time.sleep(delay)
else:
# 达到最大次数仍然失败则抛出最后一次异常
raise last_exception
except Exception:
# 如果是完全不在重试范围的异常,直接抛出
raise
# 如果所有尝试都失败,最终抛出异常
raise last_exception
# 实现批量调用,每个输入独立重试
def batch(self, inputs, config = None, **kwargs):
"""
批量调用,每个输入独立重试
"""
# 对每个输入都单独执行 invoke,收集结果为列表
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 实现流式调用,直接复用基类逻辑
def stream(self, input, **kwargs):
"""
流式调用,复用基类实现
"""
# 使用父类的 stream,yield 结果
yield from super().stream(input, **kwargs)
# 返回自身字符串表示,便于调试查看 retry 配置与绑定对象
def __repr__(self):
return f"RunnableRetry(bound={self.bound}, max_attempts={self.stop_after_attempt})"
# 工具函数:合并配置字典
def _merge_configs(*configs):
"""
合并多个配置字典
Args:
*configs: 要合并的配置字典列表
Returns:
合并后的配置字典
"""
result = {}
for config in configs:
if config:
# 对于嵌套字典(如 metadata),需要深度合并
for key, value in config.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = {**result[key], **value}
else:
result[key] = value
return result
# 定义 RunnableBinding 类,用于包装 Runnable 并绑定配置
class RunnableBinding(Runnable):
"""
Runnable 绑定包装器
用于将配置绑定到 Runnable,返回一个新的 Runnable 实例。
当调用绑定的 Runnable 时,会自动合并绑定的配置和传入的配置。
"""
def __init__(self, bound, config=None, kwargs=None):
"""
初始化 RunnableBinding
Args:
bound: 要绑定的底层 Runnable 实例
config: 要绑定的配置字典
kwargs: 要绑定的额外关键字参数(暂未使用)
"""
if not isinstance(bound, Runnable):
raise TypeError("bound 必须是 Runnable 实例")
self.bound = bound
self.config = ensure_config(config) or {}
self.kwargs = kwargs or {}
def invoke(self, input, config=None, **kwargs):
"""
调用绑定的 Runnable,合并配置
Args:
input: 输入值
config: 可选的配置字典,会与绑定的配置合并
**kwargs: 额外的关键字参数
Returns:
底层 Runnable 的返回值
"""
# 合并绑定的配置和传入的配置
merged_config = _merge_configs(self.config, config)
# 合并关键字参数
merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
return self.bound.invoke(input, config=merged_config, **merged_kwargs)
def batch(self, inputs, config=None, **kwargs):
"""
批量调用绑定的 Runnable,合并配置
Args:
inputs: 输入值列表
config: 可选的配置字典,会与绑定的配置合并
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 合并绑定的配置和传入的配置
merged_config = _merge_configs(self.config, config)
# 合并关键字参数
merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
return self.bound.batch(inputs, config=merged_config, **merged_kwargs)
def stream(self, input, config=None, **kwargs):
"""
流式调用绑定的 Runnable,合并配置
Args:
input: 输入值
config: 可选的配置字典,会与绑定的配置合并
**kwargs: 额外的关键字参数
Yields:
底层 Runnable 的流式输出
"""
# 合并绑定的配置和传入的配置
merged_config = _merge_configs(self.config, config)
# 合并关键字参数
merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
yield from self.bound.stream(input, config=merged_config, **merged_kwargs)
def __repr__(self):
"""返回对象的字符串表示"""
return f"RunnableBinding(bound={self.bound}, config={self.config})"
# 定义 ConfigurableField 类,用于配置可动态调整的字段
+from collections import namedtuple
+ConfigurableField = namedtuple(
+ "ConfigurableField",
+ ["id", "name", "description", "annotation", "is_shared"],
+ defaults=(None, None, None, False)
+)
+"""可配置字段的定义
+Args:
+ id: 字段的唯一标识符,在 config["configurable"] 中使用
+ name: 字段的显示名称(可选)
+ description: 字段的描述(可选)
+ annotation: 字段的类型注解(可选)
+ is_shared: 字段是否共享(可选,默认 False)
+"""
# 定义 RunnableConfigurableFields 类,用于包装 Runnable 并支持动态配置字段
+class RunnableConfigurableFields(Runnable):
+ """
+ Runnable 可配置字段包装器
+ 用于将 Runnable 的某些字段配置为可在运行时动态调整。
+ 当调用时,会从 config["configurable"] 中读取配置值,并创建新的实例。
+ 示例:
+ ``python
+ from smartchain.runnables import ConfigurableField
+ llm = ChatOpenAI(temperature=0).configurable_fields(
+ temperature=ConfigurableField(
+ id="temperature",
+ name="温度值",
+ description="LLM 的采样温度参数"
+ )
+ )
# 使用默认 temperature=0
+ result1 = llm.invoke("你好")
# 使用 temperature=1.0
+ result2 = llm.invoke("你好", config={"configurable": {"temperature": 1.0}})
+ ``
+ """
# 构造函数:接收默认可执行对象和字段描述字典
+ def __init__(self, default, fields):
+ """
+ 初始化 RunnableConfigurableFields
+ Args:
+ default: 默认的 Runnable 实例或具有 invoke 方法的对象
+ fields: 可配置字段的字典,键为字段名,值为 ConfigurableField 实例
+ """
# 检查 default 是否为 Runnable 实例或者拥有 invoke 方法
+ if not (isinstance(default, Runnable) or (hasattr(default, 'invoke') and callable(getattr(default, 'invoke')))):
+ raise TypeError("default 必须是 Runnable 实例或具有 invoke 方法的对象")
# 保存默认实例
+ self.default = default
# 保存字段配置(如果未传入则设为{})
+ self.fields = fields or {}
# 内部方法,根据 config 动态生成实例,应用动态配置
+ def _prepare(self, config=None):
+ """
+ 准备 Runnable 实例和配置
+ 从 config["configurable"] 中读取配置值,并创建新的实例。
+ Args:
+ config: 配置字典
+ Returns:
+ tuple: (Runnable 实例, 配置字典)
+ """
# 规范化 config(保证为字典)
+ config = ensure_config(config)
# 从 config 取出 configurable 配置
+ configurable = config.get("configurable", {})
# 收集需要修改的字段和值
+ updates = {}
+ for field_name, field_spec in self.fields.items():
# 检查字段是否为 ConfigurableField
+ if isinstance(field_spec, ConfigurableField):
# 从 config 找对应 id 的值
+ config_value = configurable.get(field_spec.id)
+ if config_value is not None:
+ updates[field_name] = config_value
# 有更新内容则创建新实例
+ if updates:
# 获取默认实例的类型
+ default_class = type(self.default)
# 获取类型名
+ class_name = default_class.__name__
# 对于特定聊天模型需要特殊参数处理
+ if class_name in ('ChatOpenAI', 'ChatDeepSeek', 'ChatTongyi'):
# 构造初始化参数 dict,必须包含 model
+ init_params = {
+ 'model': self.default.model,
+ }
# 如果有 model_kwargs 就复制
+ if hasattr(self.default, 'model_kwargs'):
+ init_params.update(self.default.model_kwargs.copy())
# 增加本次需更新的参数
+ init_params.update(updates)
# 保持 api_key(如有)
+ if hasattr(self.default, 'api_key'):
+ init_params['api_key'] = self.default.api_key
# 保持 base_url(如有)
+ if hasattr(self.default, 'base_url'):
+ init_params['base_url'] = getattr(self.default, 'base_url', None)
# 构造新实例
+ new_instance = default_class(**init_params)
+ return (new_instance, config)
+ else:
# 对于其他类型的实例采用通用方法
+ if hasattr(self.default, '__dict__'):
# 使用对象字段构建参数(忽略以 _ 开头的字段)
+ init_params = {k: v for k, v in self.default.__dict__.items()
+ if not k.startswith('_')}
+ else:
# 无法获取 __dict__ 则用空参数
+ init_params = {}
# 更新参数
+ init_params.update(updates)
+ try:
# 尝试直接用参数构造新实例
+ new_instance = default_class(**init_params)
+ return (new_instance, config)
+ except Exception:
# 构造失败则深拷贝实例并赋值
+ import copy
+ new_instance = copy.deepcopy(self.default)
+ for key, value in updates.items():
# 优先直接设置属性
+ if hasattr(new_instance, key):
+ setattr(new_instance, key, value)
# 对于 ChatOpenAI 还要更新 model_kwargs 字典
+ elif hasattr(new_instance, 'model_kwargs'):
+ new_instance.model_kwargs[key] = value
+ return (new_instance, config)
# 未指定可配置参数,直接返回默认实例和 config
+ return (self.default, config)
# 单条输入调用方法,支持动态配置
+ def invoke(self, input, config=None, **kwargs):
+ """
+ 调用 Runnable,支持动态配置
+ Args:
+ input: 输入值
+ config: 配置字典,可以包含 configurable 字段
+ **kwargs: 额外的关键字参数
+ Returns:
+ 底层 Runnable 的返回值
+ """
# 获取动态配置后的 runnable 实例和配置
+ runnable, merged_config = self._prepare(config)
# 若为 Runnable 实例则传递 config 参数
+ if isinstance(runnable, Runnable):
+ return runnable.invoke(input, config=merged_config, **kwargs)
+ else:
# 非 Runnable 实例直接调用(初始化时参数已生效)
+ return runnable.invoke(input, **kwargs)
# 批量输入调用方法,支持动态配置
+ def batch(self, inputs, config=None, **kwargs):
+ """
+ 批量调用 Runnable,支持动态配置
+ Args:
+ inputs: 输入值列表
+ config: 配置字典,可以包含 configurable 字段
+ **kwargs: 额外的关键字参数
+ Returns:
+ 输出值列表
+ """
# 获取动态配置后的 runnable 实例和配置
+ runnable, merged_config = self._prepare(config)
# 若为 Runnable 实例则传递 config 参数
+ if isinstance(runnable, Runnable):
+ return runnable.batch(inputs, config=merged_config, **kwargs)
+ else:
# 有 batch 方法就直接调用
+ if hasattr(runnable, 'batch'):
+ return runnable.batch(inputs, **kwargs)
+ else:
# 没有 batch 方法,逐个调用 invoke 实现
+ return [runnable.invoke(input_item, **kwargs) for input_item in inputs]
# 流式输入调用,支持动态配置
+ def stream(self, input, config=None, **kwargs):
+ """
+ 流式调用 Runnable,支持动态配置
+ Args:
+ input: 输入值
+ config: 配置字典,可以包含 configurable 字段
+ **kwargs: 额外的关键字参数
+ Yields:
+ 底层 Runnable 的流式输出
+ """
# 获取动态配置后的 runnable 实例和配置
+ runnable, merged_config = self._prepare(config)
# 若为 Runnable 实例则传递 config 参数
+ if isinstance(runnable, Runnable):
+ yield from runnable.stream(input, config=merged_config, **kwargs)
+ else:
# 有 stream 方法就直接调用
+ if hasattr(runnable, 'stream'):
+ yield from runnable.stream(input, **kwargs)
+ else:
# 没有流式方法则调用 invoke 并 yield 单值
+ result = runnable.invoke(input, **kwargs)
+ yield result
# 字符串表示方法,便于调试
+ def __repr__(self):
+ """返回对象的字符串表示"""
+ return f"RunnableConfigurableFields(default={self.default}, fields={self.fields})"
29.4 类 #
29.4.1 类说明 #
| 类名 | 作用 | 主要方法 | 说明 |
|---|---|---|---|
| ChatOpenAI | OpenAI 对话模型封装类 | __init__(), invoke(), configurable_fields() |
封装 OpenAI API 调用,支持通过 configurable_fields() 方法配置可动态调整的字段 |
| ConfigurableField | 可配置字段定义 | namedtuple (id, name, description, annotation, is_shared) |
用于定义可动态配置的参数字段,包含唯一标识符、显示名称和描述 |
| RunnableConfigurableFields | 可配置字段包装器 | __init__(), invoke(), batch(), stream(), _prepare() |
包装 Runnable 实例,支持在运行时通过 config 动态调整配置字段 |
| Runnable | 可运行对象抽象基类 | invoke(), stream(), batch() |
定义所有可运行组件的统一接口 |
| AIMessage | AI 消息类 | __init__(), __repr__() |
封装 AI 模型返回的消息内容 |
ConfigurableField(namedtuple)
| 字段 | 类型 | 必需 | 说明 |
|---|---|---|---|
id |
str |
是 | 字段的唯一标识符,在 config["configurable"] 中使用 |
name |
str |
否 | 字段的显示名称(可选) |
description |
str |
否 | 字段的描述(可选) |
annotation |
Any |
否 | 字段的类型注解(可选) |
is_shared |
bool |
否 | 字段是否共享(可选,默认 False) |
ChatOpenAI类
| 方法 | 参数 | 返回值 | 功能描述 |
|---|---|---|---|
__init__(model, **kwargs) |
model: 模型名称**kwargs: 其他参数(如 temperature) |
无 | 初始化 ChatOpenAI 实例,设置模型和参数 |
invoke(input, **kwargs) |
input: 输入内容**kwargs: 额外参数 |
AIMessage |
同步调用模型生成完整回复 |
configurable_fields(**fields) |
**fields: 可配置字段字典 |
RunnableConfigurableFields |
配置可动态调整的字段,返回包装后的实例 |
RunnableConfigurableFields类
| 方法 | 参数 | 返回值 | 功能描述 |
|---|---|---|---|
__init__(default, fields) |
default: 默认 Runnable 实例fields: 可配置字段字典 |
无 | 初始化包装器,保存默认实例和字段配置 |
invoke(input, config, **kwargs) |
input: 输入值config: 配置字典**kwargs: 额外参数 |
Any |
调用 Runnable,支持从 config 中读取动态配置 |
_prepare(config) |
config: 配置字典 |
tuple (Runnable, config) |
根据 config 动态创建新的 Runnable 实例 |
batch(inputs, config, **kwargs) |
inputs: 输入值列表config: 配置字典**kwargs: 额外参数 |
list |
批量调用,支持动态配置 |
stream(input, config, **kwargs) |
input: 输入值config: 配置字典**kwargs: 额外参数 |
生成器 | 流式调用,支持动态配置 |
29.4.2 类图 #
类关系说明
继承关系:
RunnableConfigurableFields继承自RunnableAIMessage继承自BaseMessage(未在图中显示)
组合关系:
RunnableConfigurableFields内部包含default(默认 Runnable 实例)和fields(字段配置字典)ChatOpenAI内部包含OpenAI客户端对象
依赖关系:
ChatOpenAI.configurable_fields()创建并返回RunnableConfigurableFields实例RunnableConfigurableFields包装ChatOpenAI实例RunnableConfigurableFields._prepare()使用ensure_config()规范化配置RunnableConfigurableFields使用ConfigurableField定义可配置字段
29.4.3 时序图 #
29.4.3.1 配置可配置字段流程 #
29.4.3.2 动态配置调用流程 #
29.4.4 调用过程 #
29.4.4.1 代码执行流程分析 #
阶段一:创建可配置的 LLM 实例(第 9-15 行)
llm = ChatOpenAI(model="gpt-4o", temperature=0).configurable_fields(
temperature=ConfigurableField(
id="temperature",
name="温度值",
description="LLM 的采样温度参数,控制输出的多样性"
)
)执行过程:
创建 ChatOpenAI 实例:
ChatOpenAI(model="gpt-4o", temperature=0) ↓ 初始化: - self.model = "gpt-4o" - self.model_kwargs = {"temperature": 0} - 创建 OpenAI 客户端 ↓ 返回: ChatOpenAI 实例创建 ConfigurableField:
ConfigurableField( id="temperature", name="温度值", description="LLM 的采样温度参数,控制输出的多样性" ) ↓ 创建 namedtuple 实例: - id = "temperature" - name = "温度值" - description = "LLM 的采样温度参数,控制输出的多样性" - annotation = None (默认) - is_shared = False (默认)调用 configurable_fields 方法:
.configurable_fields(temperature=ConfigurableField(...)) ↓ ChatOpenAI.configurable_fields(**fields) ↓ 导入 RunnableConfigurableFields ↓ 创建 RunnableConfigurableFields(default=self, fields={"temperature": ConfigurableField(...)}) ↓ 初始化: - self.default = ChatOpenAI 实例 - self.fields = {"temperature": ConfigurableField(...)} ↓ 返回: RunnableConfigurableFields 实例
阶段二:动态配置调用(第 20-22 行)
result3 = llm.invoke("你好,你怎么样?", config={"configurable": {"temperature": 1.0}})详细步骤:
调用 invoke 方法:
llm.invoke("你好,你怎么样?", config={"configurable": {"temperature": 1.0}}) ↓ RunnableConfigurableFields.invoke(input, config) ↓ 步骤 1: 调用 _prepare(config)_prepare 方法处理:
_prepare(config) ↓ 步骤 1: ensure_config(config) → 规范化配置字典 ↓ 步骤 2: configurable = config.get("configurable", {}) → {"temperature": 1.0} ↓ 步骤 3: 遍历 self.fields - field_name = "temperature" - field_spec = ConfigurableField(id="temperature", ...) ↓ 步骤 4: 从 configurable 获取值 - config_value = configurable.get("temperature") → 1.0 ↓ 步骤 5: 收集更新 - updates = {"temperature": 1.0} ↓ 步骤 6: 创建新实例(因为 updates 不为空) - default_class = ChatOpenAI - class_name = "ChatOpenAI" - 特殊处理 ChatOpenAI 类型 ↓ 步骤 7: 构造初始化参数 - init_params = { "model": self.default.model, # "gpt-4o" **self.default.model_kwargs, # {"temperature": 0} **updates # {"temperature": 1.0} ← 覆盖 } - 结果: {"model": "gpt-4o", "temperature": 1.0} ↓ 步骤 8: 创建新实例 - new_instance = ChatOpenAI(model="gpt-4o", temperature=1.0) ↓ 步骤 9: 返回 (new_instance, config)调用新实例的 invoke:
runnable.invoke(input, config=merged_config, **kwargs) ↓ ChatOpenAI.invoke("你好,你怎么样?", **kwargs) ↓ _convert_input("你好,你怎么样?") → [{"role": "user", "content": "你好,你怎么样?"}] ↓ 构造 params: { "model": "gpt-4o", "messages": [...], "temperature": 1.0 ← 动态配置的值 } ↓ 调用 OpenAI API ↓ 返回 AIMessage(content="...")
29.4.4.2 数据流转过程 #
完整数据流
用户输入 "你好,你怎么样?"
↓
RunnableConfigurableFields.invoke()
↓
_prepare(config) - 动态配置处理
↓
从 config["configurable"] 读取 temperature=1.0
↓
创建新的 ChatOpenAI 实例(temperature=1.0)
↓
新实例.invoke()
↓
OpenAI API 调用(使用 temperature=1.0)
↓
AIMessage 响应
↓
返回给用户配置处理流程
config = {"configurable": {"temperature": 1.0}}
↓
ensure_config(config) - 规范化
↓
提取 configurable = {"temperature": 1.0}
↓
遍历 fields:
- 检查 "temperature" 字段
- 从 configurable 获取值 1.0
- 添加到 updates = {"temperature": 1.0}
↓
创建新实例:
- 复制默认实例属性
- 应用 updates
- 创建 ChatOpenAI(temperature=1.0)
↓
返回新实例和配置29.4.4..3 关键设计模式 #
包装器模式:
RunnableConfigurableFields包装ChatOpenAI实例- 在不修改原实例的情况下,支持动态配置
工厂模式:
_prepare()方法根据配置动态创建新实例- 支持不同类型的 Runnable(ChatOpenAI、ChatDeepSeek 等)
策略模式:
- 对于不同类型的实例采用不同的创建策略
- ChatOpenAI 等特殊类型有专门的处理逻辑
命名元组模式:
ConfigurableField使用namedtuple定义- 提供结构化的字段定义,不可变且轻量
29.4.4..4 动态配置的工作原理 #
配置定义阶段:
- 使用
configurable_fields()定义哪些字段可以动态配置 - 每个字段用
ConfigurableField描述,包含 id、name、description
- 使用
运行时配置阶段:
- 通过
config={"configurable": {...}}传递动态配置 _prepare()方法读取配置并创建新实例
- 通过
实例创建策略:
- ChatOpenAI 等特殊类型:
- 从
model和model_kwargs构建参数 - 应用动态配置值
- 创建新实例
- 从
- 通用类型:
- 尝试从
__dict__获取属性 - 应用动态配置
- 创建新实例或深拷贝后设置属性
- 尝试从
- ChatOpenAI 等特殊类型:
29.4.4..5 使用场景 #
A/B 测试:
- 使用不同的 temperature 值测试模型输出
- 无需创建多个实例
参数调优:
- 在运行时动态调整模型参数
- 根据输入内容选择最佳参数
多租户场景:
- 不同用户使用不同的配置
- 在同一个实例上支持多种配置
实验性功能:
- 快速测试不同参数组合
- 无需重新初始化模型
30.configurable_alternatives #
configurable_alternatives机制让你可以根据 config 参数,在多种可运行组件(如不同 LLM 服务商、多模型、多算法分支等)之间动态切换,而无需重建对象。这种设计尤其适用于以下场景:
- 多模型实验/切换:如根据用户请求,代码层面动态“热切换”不同的大模型、API 提供方或推理服务(例如在 OpenAI 和 DeepSeek 之间切换)。
- 多算法/策略分支:一个流水线里支持多种分支,实现 like A/B test、灰度发布、或根据配置快速试验路径。
- 前端参数驱动切换:便于前端表单、配置 UI 实现“下拉框切换模型/算法”而无需后端重启或重新初始化。
用法
- 定义可切换分支
通过 .configurable_alternatives() 方法包裹在一个 Runnable、Chain 或其它组件上,声明一个 selector 字段(如 provider),指定所有可选分支和默认分支。示例:
from smartchain.chat_models import ChatOpenAI, ChatDeepSeek
from smartchain.runnables import ConfigurableField
llm = ChatOpenAI(model="gpt-4o-mini").configurable_alternatives(
ConfigurableField(
id="provider",
name="LLM 提供方",
description="在 OpenAI 与 DeepSeek 之间切换"
),
default_key="openai",
openai=ChatOpenAI(model="gpt-4o-mini"),
deepseek=ChatDeepSeek(model="deepseek-chat"),
)ConfigurableField(id="provider", ...)用于声明供外部注入的分支选择字段,对应config['configurable']['provider']。openai=...、deepseek=...则指定可切换的具体分支对象。default_key="openai"表示默认分支。
- 调用时动态选择分支
不传 config 时,默认走
openai分支:result = llm.invoke("你好,你是谁?")指定分支,仅需在 config 参数中设置(如通过前端 UI,下拉选择“deepseek”):
result = llm.invoke("你好,你是谁?", config={"configurable": {"provider": "deepseek"}})
分支选择的逻辑会自动路由到对应的对象,包括 .invoke()、.batch()、.stream() 等所有常用调用方式都支持。
- 可与流水线/链式调用深度集成
configurable_alternatives 机制可以嵌入到任意 Chain、Pipeline 等结构内部,作为其中一步,参数配置一路动态透传。例如:
from smartchain.runnables import RunnableLambda
# 多种处理分支
chain = RunnableLambda(lambda x: x).configurable_alternatives(
ConfigurableField(id="mode", name="模式"),
default_key="v1",
v1=RunnableLambda(lambda x: x.upper()),
v2=RunnableLambda(lambda x: f"***{x}***")
)
print(chain.invoke("hi", config={"configurable": {"mode": "v2"}})) # 输出: ***hi***- 分支成员支持任意可运行对象
分支既可以是标准 Runnable/Chain,也可以是具备 invoke、batch、stream 方法的自定义类。只要实现基本方法即可接入分支切换。
- 典型扩展场景
- 人群分流/灰度发布:根据用户 ID、A/B bucket 等请求上下文动态切分流量(如 config["configurable"]["bucket"])。
- 多模型对比实验:快速做多 LLM、模型 API 的准确率/风格等差异实验,甚至做 grid search。
- 模型供应商 fallback/弹性切换:首选 A,A 无法用自动切换 B,提高可用性。
- 注意
- 分支 key 没找到会抛出异常,请确保 alternatives 字典完整。
- 仅指定 selector id 即可,由外部 config 控制切换,不影响对象自身属性。
- 可与
configurable_fields联合深度定制参数注入和分支切换。 - selector 字段和 configurable_fields 字段建议避免重名。
30.1. 30.configurable_alternatives.py #
30.configurable_alternatives.py
# 从 smartchain.chat_models 导入 ChatOpenAI 和 ChatDeepSeek 聊天模型类
from smartchain.chat_models import ChatOpenAI, ChatDeepSeek
# 从 smartchain.runnables 导入 ConfigurableField,可用于声明可切换模型的配置字段
from smartchain.runnables import ConfigurableField
# 使用 configurable_alternatives 方法实现运行时灵活切换模型提供方
# 这里 ConfigurableField 的 id="provider" 表示后续通过 config["configurable"]["provider"] 控制选择
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0).configurable_alternatives(
ConfigurableField(
id="provider", # 用于切换提供方的参数 id
name="LLM 提供方", # 可选参数名(便于 UI 展示)
description="在 OpenAI 与 DeepSeek 之间切换" # 对该参数的描述
),
default_key="openai", # 未指定时默认选择 openai
openai=ChatOpenAI(model="gpt-4o-mini", temperature=0), # openai 提供方的模型实例
deepseek=ChatDeepSeek(model="deepseek-chat", temperature=0), # deepseek 提供方的模型实例
)
# 默认情况下,未指定 provider 时使用 openai
print("默认使用 openai:")
# 使用默认 provider(openai)调用模型,发送用户输入
result1 = llm.invoke("你好,你是谁?")
# 打印模型回复内容
print(result1.content)
# 通过传递 config["configurable"]["provider"]="deepseek" 切换为 deepseek 提供方
print("\n切换为 deepseek:")
result2 = llm.invoke("你好,你是谁?", config={"configurable": {"provider": "deepseek"}})
# 打印 deepseek 模型回复内容
print(result2.content)30.2. chat_models.py #
smartchain/chat_models.py
# 导入操作系统相关模块
import os
# 导入 openai 模块
import openai
# 从 .messages 模块导入 AIMessage、HumanMessage 和 SystemMessage 类
from .messages import AIMessage, HumanMessage, SystemMessage
from .prompts import ChatPromptValue
# 定义与 OpenAI 聊天模型交互的类
class ChatOpenAI:
# 初始化方法
def __init__(self, model: str = "gpt-4o", **kwargs):
# 初始化 ChatOpenAI 类
"""
初始化 ChatOpenAI
Args:
model: 模型名称,如 "gpt-4o"
**kwargs: 其他参数(如 temperature, max_tokens 等)
"""
# 设置模型名称
self.model = model
# 获取 api_key,优先从参数获取,否则从环境变量获取
self.api_key = kwargs.get("api_key") or os.getenv("OPENAI_API_KEY")
# 如果没有提供 api_key,则抛出异常
if not self.api_key:
raise ValueError("需要提供 api_key 或设置 OPENAI_API_KEY 环境变量")
# 保存除 api_key 之外的其他参数,用于 API 调用
self.model_kwargs = {k: v for k, v in kwargs.items() if k != "api_key"}
# 创建 OpenAI 客户端实例
self.client = openai.OpenAI(api_key=self.api_key)
# 调用模型生成回复的方法
def invoke(self, input, **kwargs):
# 调用模型生成回复
"""
调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Returns:
AIMessage: AI 的回复消息
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典
params = {
"model": self.model,
"messages": messages,
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 客户端发起 chat.completions.create 调用获取回复
response = self.client.chat.completions.create(**params)
# 取出返回结果中的第一个选项
choice = response.choices[0]
# 获取消息内容
content = choice.message.content or ""
# 返回一个 AIMessage 对象
return AIMessage(content=content)
# 流式调用模型生成回复的方法
def stream(self, input, **kwargs):
# 流式调用模型生成回复
"""
流式调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Yields:
AIMessage: AI 的回复消息块(每次产生部分内容)
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典,启用流式输出
params = {
"model": self.model,
"messages": messages,
"stream": True, # 启用流式输出
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 客户端发起流式调用
stream = self.client.chat.completions.create(**params)
# 迭代流式响应
for chunk in stream:
# 检查是否有内容增量
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
# 检查 delta 中是否有 content,如果有则发送
if hasattr(delta, 'content') and delta.content:
# 产生包含部分内容的 AIMessage
yield AIMessage(content=delta.content)
# 内部方法,将输入转换为 OpenAI API 需要的消息格式
def _convert_input(self, input):
# 将输入转换为 OpenAI API 需要的消息格式
"""
将输入转换为 OpenAI API 需要的消息格式
Args:
input: 字符串、消息列表或 ChatPromptValue
Returns:
list[dict]: OpenAI API 格式的消息列表
"""
if isinstance(input, ChatPromptValue):
input = input.to_messages()
# 输入为字符串时,直接封装为用户角色消息
if isinstance(input, str):
return [{"role": "user", "content": input}]
# 如果输入是列表类型
elif isinstance(input, list):
# 新建一个空的消息列表
messages = []
# 遍历输入列表中的每一个元素
for msg in input:
# 判断是否为字符串,是则作为用户消息加入
if isinstance(msg, str):
messages.append({"role": "user", "content": msg})
# 判断是否为 HumanMessage、AIMessage 或 SystemMessage 实例
elif isinstance(msg, (HumanMessage, AIMessage, SystemMessage)):
# 如果是 HumanMessage,将角色设为 user
if isinstance(msg, HumanMessage):
role = "user"
# 如果是 AIMessage,将角色设为 assistant
elif isinstance(msg, AIMessage):
role = "assistant"
# 如果是 SystemMessage,将角色设为 system
elif isinstance(msg, SystemMessage):
role = "system"
# 获取消息内容(有 content 属性则取 content,否则转为字符串)
content = msg.content if hasattr(msg, "content") else str(msg)
# 将角色和内容添加到消息列表
messages.append({"role": role, "content": content})
# 如果元素本身为字典,直接添加进消息列表
elif isinstance(msg, dict):
# 直接添加字典类型的消息
messages.append(msg)
# 如果元素为长度为 2 的元组,将其解包为 role 和 content
elif isinstance(msg, tuple) and len(msg) == 2:
# 将元组解包为 role 和 content
role, content = msg
# 将角色和内容添加到消息列表
messages.append({"role": role, "content": content})
# 返回构建好的消息列表
return messages
else:
# 其他输入类型,转为字符串作为 user 消息
return [{"role": "user", "content": str(input)}]
# 定义可配置字段的方法,用于包装当前实例,支持部分参数运行时动态调整
def configurable_fields(self, **fields):
# 配置可动态调整的字段
"""
配置可动态调整的字段
Args:
**fields: 可配置字段的字典,键为字段名,值为 ConfigurableField 实例
Returns:
RunnableConfigurableFields: 包装后的 Runnable 实例
示例:
``python
from smartchain.runnables import ConfigurableField
llm = ChatOpenAI(temperature=0).configurable_fields(
temperature=ConfigurableField(
id="temperature",
name="温度值",
description="LLM 的采样温度参数"
)
)
# 使用默认 temperature=0
result1 = llm.invoke("你好")
# 使用 temperature=1.0
result2 = llm.invoke("你好", config={"configurable": {"temperature": 1.0}})
``
"""
# 从当前目录导入 RunnableConfigurableFields 类
from .runnables import RunnableConfigurableFields
# 返回将当前实例 self 及配置字段 fields 包装后的新对象
return RunnableConfigurableFields(default=self, fields=fields)
# 定义方法,用于根据 selector_field 和 config 动态切换不同 runnable 分支
+ def configurable_alternatives(self, selector_field, *, default_key, **alternatives):
# """
# 配置可替代的 Runnable 选项,根据 config["configurable"] 动态切换
#
# Args:
# selector_field: ConfigurableField,定义选择键的 id/name/description
# default_key: 默认使用的分支 key(必须存在于 alternatives 中)
# **alternatives: key -> runnable 或具有 invoke 方法的对象
#
# Returns:
# RunnableConfigurableAlternatives: 包装后的 Runnable 实例
#
# 示例:
# from smartchain.runnables import ConfigurableField
# from smartchain.chat_models import ChatOpenAI, ChatDeepSeek
#
# llm = ChatOpenAI().configurable_alternatives(
# ConfigurableField(
# id="provider",
# name="LLM 提供方",
# description="在 OpenAI 与 DeepSeek 之间切换"
# ),
# default_key="openai",
# openai=ChatOpenAI(temperature=0),
# deepseek=ChatDeepSeek(temperature=0),
# )
#
# # 默认使用 openai
# result1 = llm.invoke("你好")
#
# # 切换为 deepseek
# result2 = llm.invoke("你好", config={"configurable": {"provider": "deepseek"}})
# """
# 从 .runnables 模块导入 RunnableConfigurableAlternatives 类
+ from .runnables import RunnableConfigurableAlternatives
# 返回 RunnableConfigurableAlternatives 实例,实现 runtime 动态分支切换
+ return RunnableConfigurableAlternatives(
+ selector_field=selector_field, # 用于选择分支的字段信息
+ default_key=default_key, # 默认分支 key
+ alternatives=alternatives, # 所有可供切换的分支字典
+ )
# 定义与 DeepSeek 聊天模型交互的类
class ChatDeepSeek:
# 初始化方法
# model: 模型名称,默认为 "deepseek-chat"
# **kwargs: 其他可选参数(如 temperature, max_tokens 等)
def __init__(self, model: str = "deepseek-chat", **kwargs):
"""
初始化 ChatDeepSeek
Args:
model: 模型名称,如 "deepseek-chat"
**kwargs: 其他参数(如 temperature, max_tokens 等)
"""
# 设置模型名称
self.model = model
# 获取 api_key,优先从参数获取,否则从环境变量获取
self.api_key = kwargs.get("api_key") or os.getenv("DEEPSEEK_API_KEY")
# 如果没有提供 api_key,则抛出异常
if not self.api_key:
raise ValueError("需要提供 api_key 或设置 DEEPSEEK_API_KEY 环境变量")
# 保存除 api_key 之外的其他参数,用于 API 调用
self.model_kwargs = {k: v for k, v in kwargs.items() if k != "api_key"}
# 获取 DeepSeek 的 base_url,默认为官方地址
base_url = kwargs.get("base_url", "https://api.deepseek.com/v1")
# 创建 OpenAI 兼容的客户端实例(DeepSeek 使用 OpenAI 兼容的 API)
self.client = openai.OpenAI(api_key=self.api_key, base_url=base_url)
# 调用模型生成回复的方法
# input: 输入内容,可以是字符串或消息列表
# **kwargs: 额外的 API 参数
def invoke(self, input, **kwargs):
"""
调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Returns:
AIMessage: AI 的回复消息
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典
params = {
"model": self.model,
"messages": messages,
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 兼容的客户端发起 chat.completions.create 调用获取回复
response = self.client.chat.completions.create(**params)
# 取出返回结果中的第一个选项
choice = response.choices[0]
# 获取消息内容
content = choice.message.content or ""
# 返回一个 AIMessage 对象
return AIMessage(content=content)
# 内部方法,将输入转换为 API 需要的消息格式
# input: 字符串、消息列表或 ChatPromptValue
def _convert_input(self, input):
"""
将输入转换为 API 需要的消息格式
Args:
input: 字符串、消息列表或 ChatPromptValue
Returns:
list[dict]: API 格式的消息列表
"""
# 如果输入是字符串,直接作为用户消息
if isinstance(input, str):
return [{"role": "user", "content": input}]
else:
# 其他输入类型,转为字符串作为 user 消息
return [{"role": "user", "content": str(input)}]
# 定义与通义千问(Tongyi)聊天模型交互的类
class ChatTongyi:
# 初始化方法
# 初始化方法,设置模型名称和 API 相关参数
def __init__(self, model: str = "qwen-max", **kwargs):
"""
初始化 ChatTongyi
Args:
model: 模型名称,如 "qwen-max"
**kwargs: 其他参数(如 temperature, max_tokens 等)
"""
# 设置模型名称
self.model = model
# 获取 api_key,优先从参数获取,否则从环境变量获取
self.api_key = kwargs.get("api_key") or os.getenv("DASHSCOPE_API_KEY")
# 如果没有提供 api_key,则抛出异常
if not self.api_key:
raise ValueError("需要提供 api_key 或设置 DASHSCOPE_API_KEY 环境变量")
# 保存除 api_key 之外的其他参数,用于 API 调用
self.model_kwargs = {k: v for k, v in kwargs.items() if k != "api_key"}
# 获取通义千问的 API base URL(使用 OpenAI 兼容模式),如果未指定则使用默认值
base_url = kwargs.get("base_url", "https://dashscope.aliyuncs.com/compatible-mode/v1")
# 创建 OpenAI 兼容的客户端实例(通义千问使用 OpenAI 兼容的 API)
self.client = openai.OpenAI(api_key=self.api_key, base_url=base_url)
# 调用模型生成回复的方法
# 调用模型生成回复,返回 AIMessage 对象
def invoke(self, input, **kwargs):
"""
调用模型生成回复
Args:
input: 输入内容,可以是字符串或消息列表
**kwargs: 额外的 API 参数
Returns:
AIMessage: AI 的回复消息
"""
# 将输入数据转换为消息格式
messages = self._convert_input(input)
# 构建 API 请求参数字典,包含模型名、消息内容和其他参数
params = {
"model": self.model,
"messages": messages,
**self.model_kwargs,
**kwargs
}
# 使用 OpenAI 兼容的客户端发起 chat.completions.create 调用以获取回复
response = self.client.chat.completions.create(**params)
# 取出返回结果中的第一个回复选项
choice = response.choices[0]
# 获取回复的消息内容,如果内容不存在则返回空字符串
content = choice.message.content or ""
# 构建并返回一个 AIMessage 对象
return AIMessage(content=content)
# 内部方法,将输入转换为 API 需要的消息格式
# 支持字符串、消息列表等输入,统一包装为 OpenAI API 格式
def _convert_input(self, input):
"""
将输入转换为 API 需要的消息格式
Args:
input: 字符串、消息列表或 ChatPromptValue
Returns:
list[dict]: API 格式的消息列表
"""
# 如果输入是字符串,直接包装为“用户”角色的消息
if isinstance(input, str):
return [{"role": "user", "content": input}]
else:
# 其他输入类型,转换为字符串作为“用户”消息内容
return [{"role": "user", "content": str(input)}] 30.3. runnables.py #
smartchain/runnables.py
# 导入抽象基类 (ABC: 抽象基类基类,abstractmethod: 用于定义抽象方法)
from abc import ABC, abstractmethod
import time
import random
import inspect
import uuid as uuid_module
from .config import ensure_config,_accept_config
# 定义 Runnable 抽象基类,所有可运行单元必须继承它
class Runnable(ABC):
"""
Runnable 抽象基类
所有可运行组件的基础接口,定义了统一的调用方法。
"""
# 抽象方法,子类必须实现,用于同步调用
@abstractmethod
def invoke(self, input, config = None, **kwargs):
"""
同步调用 Runnable
Args:
input: 输入值
config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值
"""
pass # 仅做接口规范,子类务必实现
def stream(self, input, config = None, **kwargs):
"""
流式调用 Runnable
默认实现:先调用 invoke,若返回可迭代且不是字符串/字节/字典,则逐项 yield;
否则直接 yield 单值。
"""
result = self.invoke(input, config=config, **kwargs)
# 字符串/字节/字典不视为流式可迭代,直接返回单值
if hasattr(result, "__iter__") and not isinstance(result, (str, bytes, dict)):
for item in result:
yield item
else:
yield result
# 定义可配置替代分支选择器方法,通过 config["configurable"][field.id] 动态切换分支
+ def configurable_alternatives(self, selector_field, *, default_key, **alternatives):
+ """
+ 根据 config["configurable"] 中的选择键,动态切换不同的 Runnable/对象。
+ Args:
+ selector_field: ConfigurableField,定义选择键的 id/name/description
+ default_key: 默认使用的分支 key(必须存在于 alternatives 中)
+ **alternatives: key -> runnable 或具有 invoke 方法的对象
+ Returns:
+ RunnableConfigurableAlternatives 包装对象
+ """
# 从当前模块导入 ConfigurableField 和 RunnableConfigurableAlternatives
+ from .runnables import ConfigurableField, RunnableConfigurableAlternatives
# 判断 selector_field 是否为 ConfigurableField 的实例
+ if not isinstance(selector_field, ConfigurableField):
# 如果不是则抛出类型错误
+ raise TypeError("selector_field 必须是 ConfigurableField 实例")
# 检查默认分支 key 是否包含在 alternatives 中
+ if default_key not in alternatives:
# 如果不包含则抛出值错误
+ raise ValueError("default_key 必须存在于 alternatives 中")
# 返回一个 RunnableConfigurableAlternatives 实例,实现动态分支选择
+ return RunnableConfigurableAlternatives(
+ selector_field=selector_field,
+ default_key=default_key,
+ alternatives=alternatives,
+ )
# 管道操作符,便于链式拼接
def __or__(self, other):
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
return RunnableSequence([self, other])
# 定义批量调用方法,默认实现为遍历输入逐个调用 invoke
def batch(self, inputs, config = None, **kwargs):
"""
批量调用 Runnable
Args:
inputs: 输入值列表
config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 对每个输入项都调用 invoke,并收集结果
return [self.invoke(input_item, config=config, **kwargs) for input_item in inputs]
# 添加重试功能,返回包装了重试逻辑的 Runnable
# 定义 with_retry 方法,为当前 Runnable 添加重试机制
def with_retry(
self,
*,
retry_if_exception_type=(Exception,), # 指定需要重试的异常类型,默认所有 Exception
stop_after_attempt=3, # 最大尝试次数,默认3次
wait_exponential_jitter=True, # 是否启用指数退避抖动
exponential_jitter_params=None, # 抖动参数字典,支持 initial/max/exp_base/jitter
):
"""
创建带重试功能的 Runnable 包装器
Args:
retry_if_exception_type: 需要重试的异常类型元组
stop_after_attempt: 最大尝试次数
wait_exponential_jitter: 是否启用指数回退抖动
exponential_jitter_params: 抖动参数,支持 initial/max/exp_base/jitter
Returns:
包装了重试逻辑的 RunnableRetry 实例
"""
# 返回带重试功能的 RunnableRetry 实例,绑定当前 runnable 和重试参数
return RunnableRetry(
bound=self,
retry_if_exception_type=retry_if_exception_type,
stop_after_attempt=stop_after_attempt,
wait_exponential_jitter=wait_exponential_jitter,
exponential_jitter_params=exponential_jitter_params,
)
def with_config(self, config=None, **kwargs):
"""
绑定配置到 Runnable,返回一个新的 Runnable
Args:
config: 要绑定的配置字典
**kwargs: 额外的关键字参数,会合并到 config 中
Returns:
一个新的 RunnableBinding 实例,包含绑定的配置
"""
# 合并 config 和 kwargs
merged_config = {}
if config:
merged_config.update(config)
if kwargs:
merged_config.update(kwargs)
# 返回 RunnableBinding 实例
return RunnableBinding(bound=self, config=merged_config)
# 定义 RunnableLambda 类,用于将普通 Python 函数封装为 Runnable 对象
class RunnableLambda(Runnable):
"""
RunnableLambda 将普通 Python 函数包装成 Runnable
这使得普通函数可以在链式调用中使用,并支持统一的 invoke 接口。
示例:
``python
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
result = runnable.invoke(5) # 返回 6
results = runnable.batch([1, 2, 3]) # 返回 [2, 3, 4]
``
"""
# 初始化方法,接收一个函数和可选的名称
def __init__(self, func, name=None):
"""
初始化 RunnableLambda
Args:
func: 要包装的函数
name: Runnable 的名称(可选,默认使用函数名)
"""
# 检查传入的 func 是否为可调用对象
if not callable(func):
raise TypeError(f"func 必须是可调用对象,但得到了 {type(func)}")
# 保存待封装的函数
self.func = func
# 如果 name 明确传入则使用,否则合理推断
if name is not None:
self.name = name
else:
try:
# 尽量用函数原名,如果是 lambda 就命名为 "lambda"
self.name = func.__name__ if func.__name__ != "<lambda>" else "lambda"
except AttributeError:
# 对于匿名对象无法获取 __name__ 时兜底
self.name = "runnable"
# 实现 invoke 方法,对被封装的底层函数进行同步调用
def invoke(self, input, config = None, **kwargs):
"""
调用包装的函数
Args:
input: 输入值
config: 可选的配置字典
**kwargs: 额外的关键字参数(会传递给函数)
Returns:
函数的返回值
"""
# 保证 config 不为 None,如为 None 则转为空字典
config = ensure_config(config)
# 从配置字典中获取回调对象 callbacks
callbacks = config.get("callbacks")
# 初始化回调对象列表
callback_list = []
# 获取当前调用的唯一 ID(run_id)
run_id = config.get("run_id")
# 如果没有传入 run_id,则自动生成一个新的 uuid
if run_id is None:
run_id = uuid_module.uuid4()
# 如果 callbacks 不为空
if callbacks:
# 如果 callbacks 已经是列表,则直接用,否则转为单元素列表
if isinstance(callbacks, list):
callback_list = callbacks
else:
callback_list = [callbacks]
# 构造序列化信息,用于回调上报链条标识
serialized = {"name": self.name, "type": "RunnableLambda"}
# 遍历每个回调对象,触发其 on_chain_start 方法
for callback in callback_list:
# 只有回调对象有 on_chain_start 属性才调用
if hasattr(callback, "on_chain_start"):
try:
# 调用回调的 on_chain_start 方法,传入相关参数
callback.on_chain_start(
serialized=serialized,
inputs={"input": input},
run_id=run_id,
parent_run_id=None,
tags=config.get("tags"),
metadata=config.get("metadata"),
**kwargs
)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
# 检查被包装的函数是否接收 config 参数
if _accept_config(self.func):
# 如果接收 config,则将 config 传递下去
kwargs["config"] = config
# 尝试执行被包装的核心函数
try:
# 正常调用被包装的函数,将 input 作为第一个参数,kwargs作为关键字参数
output = self.func(input, **kwargs)
except Exception as e:
# 若捕获到异常,则对所有回调触发 on_chain_error 并继续抛出异常
if callback_list:
for callback in callback_list:
if hasattr(callback, "on_chain_error"):
try:
callback.on_chain_error(
error=e,
run_id=run_id,
parent_run_id=None,
**kwargs
)
except Exception:
# 回调异常不影响主异常继续抛出
pass
# 重新抛出主流程中的异常
raise
else:
# 如果没有异常执行,顺序触发所有回调的 on_chain_end 方法
if callback_list:
for callback in callback_list:
if hasattr(callback, "on_chain_end"):
try:
callback.on_chain_end(
outputs={"output": output},
run_id=run_id,
parent_run_id=None,
**kwargs
)
except Exception:
# 回调异常不影响主逻辑输出
pass
# 返回包装函数的输出结果
return output
# 批量调用内部依然调用 invoke,保证与 Runnable 基类一致
def batch(self, inputs, config = None, **kwargs):
"""
批量调用包装的函数
Args:
inputs: 输入值列表
config: 可选的配置字典
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 调用 invoke 实现批量处理
return [self.invoke(input_item, config=config, **kwargs) for input_item in inputs]
# 流式调用:直接复用基类的流式封装
def stream(self, input, **kwargs):
"""
流式调用包装的函数
对返回生成器/可迭代对象逐项 yield;若是单值则直接 yield。
"""
yield from super().stream(input, **kwargs)
# 返回对象自身的字符串表达,便于排查与日志
def __repr__(self):
"""返回 RunnableLambda 的字符串表示"""
return f"RunnableLambda(func={self.name})"
# 定义 RunnableParallel,继承自 Runnable
class RunnableParallel(Runnable):
"""
并行执行多个 Runnable,返回字典结果。
使用示例:
parallel = RunnableParallel(a=r1, b=r2)
result = parallel.invoke(input) # {"a": ..., "b": ...}
"""
# 构造方法,接收若干个可运行对象作为关键字参数
def __init__(self, **runnables):
# 如果未传递任何 runnable,则报错
if not runnables:
raise ValueError("至少需要一个 runnable")
# 检查每个传入的值是否为 Runnable 实例
for name, r in runnables.items():
if not isinstance(r, Runnable):
raise TypeError(f"键 {name} 的值必须是 Runnable 实例")
# 保存所有传入的 runnable 到实例属性
self.runnables = runnables
# 同步调用,将相同输入传递给所有子 runnable,并收集结果为字典
def invoke(self, input, config = None, **kwargs):
"""
同一输入传给所有子 runnable,收集结果为字典。
"""
# 遍历每个 runnable,调用其 invoke,结果收集为 {name: 返回值}
return {name: r.invoke(input, config=config, **kwargs) for name, r in self.runnables.items()}
# 批量调用,对输入列表每一项都运行 invoke,返回结果字典的列表
def batch(self, inputs, config = None, **kwargs):
"""
对输入列表逐项并行处理,返回字典列表。
"""
# 对每个输入元素调用 invoke,收集所有结果
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,直接调用父类的流式实现
def stream(self, input, **kwargs):
"""
对单次输入执行并返回一个字典,流式单次产出。
"""
# 复用基类的 stream 方法
yield from super().stream(input, **kwargs)
# 返回对象的字符串表示(列出包含的所有子 runnable 的键名)
def __repr__(self):
# 拼接所有 runnable 的键名
keys = ", ".join(self.runnables.keys())
# 返回格式化字符串
return f"RunnableParallel({keys})"
# 定义RunnableBranch类,继承自Runnable,用于条件分支执行不同runnable
class RunnableBranch(Runnable):
"""
条件分支执行:按顺序检查条件,匹配则运行对应 runnable,若都不匹配则走默认分支。
"""
# 构造方法,接受若干分支参数
def __init__(self, *branches):
"""
支持“默认分支作为最后一个位置参数”的用法:
RunnableBranch((cond1, r1), (cond2, r2), default_runnable)
"""
# 分支数量必须至少2(至少一个条件+一个默认)
if len(branches) < 2:
raise ValueError("至少需要一个条件分支和一个默认分支")
# 将分支参数转为列表
branches_list = list(branches)
# 最后一个参数视为默认分支
default = branches_list.pop() # 最后一个位置参数为默认分支
# 校验每个分支
validated_branches = []
for item in branches_list:
# 每个分支需为二元组或二元列表
if not (isinstance(item, (tuple, list)) and len(item) == 2):
raise TypeError("分支必须是 (condition, runnable) 形式的二元组")
# 解包条件函数和runnable
cond, runnable = item
# 条件必须为可调用对象
if not callable(cond):
raise TypeError("分支条件必须是可调用对象")
# runnable必须是Runnable实例
if not isinstance(runnable, Runnable):
raise TypeError("分支 runnable 必须是 Runnable 实例")
# 校验通过则加入分支列表
validated_branches.append((cond, runnable))
# 校验默认分支必须为Runnable实例
if not isinstance(default, Runnable):
raise TypeError("默认分支必须是 Runnable 实例")
# 保存所有条件分支
self.branches = validated_branches
# 保存默认分支
self.default = default
# 单个输入同步调用方法
def invoke(self, input, config = None, **kwargs):
"""
按顺序匹配条件,命中即执行对应 runnable;否则走默认分支。
"""
# 遍历所有分支,遇到条件命中则执行对应runnable
for cond, runnable in self.branches:
if cond(input):
return runnable.invoke(input, config=config, **kwargs)
# 如果有默认分支则执行默认runnable
if self.default is not None:
return self.default.invoke(input, config=config, **kwargs)
# 无匹配分支时报错
raise ValueError("未匹配到任何分支,且未提供默认分支")
# 批量调用,遍历输入批量执行invoke
def batch(self, inputs, config = None, **kwargs):
# 对输入列表逐一执行invoke
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,直接调用父类的stream方法
def stream(self, input, **kwargs):
# 复用父类的流式实现
yield from super().stream(input, **kwargs)
# 返回对象简洁字符串表示
def __repr__(self):
# 拼接分支编号
parts = [f"branch{idx}" for idx, _ in enumerate(self.branches)]
# 若有默认分支则拼接default字符串
if self.default:
parts.append("default")
# 格式化输出
return f"RunnableBranch({', '.join(parts)})"
class RunnablePassthrough(Runnable):
"""
直通型 Runnable:原样返回输入,不做任何处理。
可用于调试或需要保留原始输入的场景。
"""
def invoke(self, input, config = None, **kwargs):
return input
def batch(self, inputs, config = None, **kwargs):
return list(inputs)
def stream(self, input, **kwargs):
# 复用基类流式封装(对单值直接 yield)
yield from super().stream(input, **kwargs)
def __repr__(self):
return "RunnablePassthrough()"
# 定义 RunnableSequence 类,用于实现可运行对象的链式组合(A | B | C 的效果)
class RunnableSequence(Runnable):
"""
Runnable 组合序列,用于支持 A | B | C 的链式拼接。
"""
# 初始化方法,接收一个 Runnable 对象的列表
def __init__(self, runnables):
# 检查传入的 runnables 列表不能为空
if not runnables:
raise ValueError("runnables 不能为空")
# 校验每一个元素都必须是 Runnable 实例
for r in runnables:
if not isinstance(r, Runnable):
raise TypeError("runnables 需全部为 Runnable 实例")
# 保存连成链的 runnable 组件
self.runnables = runnables
# 实现管道操作符 |,使链式拼接成立
def __or__(self, other):
# 右侧对象必须也是 Runnable 实例
if not isinstance(other, Runnable):
raise TypeError("管道右侧必须是 Runnable 实例")
# 返回新的组合链(原有链 + 新加的 runnable)
return RunnableSequence(self.runnables + [other])
# 调用链的同步调用,将输入依次传过所有组件
def invoke(self, input, config = None, **kwargs):
"""
逐个执行链条:上一步输出作为下一步输入。
"""
# 确保 config 存在
config = ensure_config(config)
# 处理回调:如果有 callbacks,则触发链的开始回调
callbacks = config.get("callbacks")
# 初始化回调列表
callback_list = []
# 获取 run_id
run_id = config.get("run_id")
# 如果 run_id 为 None,则生成一个新的 uuid
if run_id is None:
run_id = uuid_module.uuid4()
# 如果 callbacks 不为空
if callbacks:
# 如果 callbacks 是列表,则直接赋值给 callback_list
if isinstance(callbacks, list):
callback_list = callbacks
# 如果 callbacks 不是列表,则转换为单元素列表
else:
callback_list = [callbacks]
# 序列化信息,用于回调上报链条标识
serialized = {"name": "RunnableSequence", "type": "chain"}
# 遍历每个回调对象,触发其 on_chain_start 方法
for callback in callback_list:
# 只有回调对象有 on_chain_start 属性才调用
if hasattr(callback, "on_chain_start"):
# 调用回调的 on_chain_start 方法,传入相关参数
try:
# 调用回调的 on_chain_start 方法,传入相关参数
callback.on_chain_start(serialized, {"input": input}, run_id=run_id, parent_run_id=None, tags=config.get("tags"), metadata=config.get("metadata"), **kwargs)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
# 初始 value 为输入 input
value = input
try:
# 依次调用每个 runnable 的 invoke,并传递最新的 value
for runnable in self.runnables:
value = runnable.invoke(value, config=config, **kwargs)
except Exception as e:
# 若捕获到异常,则对所有回调触发 on_chain_error 并继续抛出异常
if callback_list:
for callback in callback_list:
# 只有回调对象有 on_chain_error 属性才调用
if hasattr(callback, "on_chain_error"):
try:
# 调用回调的 on_chain_error 方法,传入相关参数
callback.on_chain_error(e, run_id=run_id, parent_run_id=None, **kwargs)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
raise
else:
# 如果没有异常执行,顺序触发所有回调的 on_chain_end 方法
if callback_list:
for callback in callback_list:
# 只有回调对象有 on_chain_end 属性才调用
if hasattr(callback, "on_chain_end"):
try:
# 调用回调的 on_chain_end 方法,传入相关参数
callback.on_chain_end(outputs={"output": value}, run_id=run_id, parent_run_id=None, **kwargs)
except Exception:
# 回调过程中如出现异常则忽略,确保主流程不会终止
pass
# 返回最后一步的输出值
return value
# 批量调用,输入为多个 input,结果为每个 input 执行完整链条的输出
def batch(self, inputs, config = None, **kwargs):
"""
对输入列表逐项执行同一条链。
"""
# 逐项调用 invoke,收集所有输出
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 流式调用,默认复用基类逻辑(只对链最终结果流式分发)
def stream(self, input, **kwargs):
"""
流式执行:沿用基类逻辑,对最终结果做流式分发。
"""
# 使用基类 stream
yield from super().stream(input, **kwargs)
# 定义字符串表示,便于调试,输出链路结构
def __repr__(self):
# 获取每个 runnable 的名字,用"|"拼接成描述
names = " | ".join(getattr(r, "name", r.__class__.__name__) for r in self.runnables)
# 返回自定义格式
return f"RunnableSequence({names})"
# 定义 RunnableRetry 类,用于包装 Runnable 并添加重试逻辑
class RunnableRetry(Runnable):
"""
带重试功能的 Runnable 包装器
当底层 runnable 抛出指定异常时,会自动重试指定次数。
"""
# 初始化方法,接受被包装的 runnable 以及重试参数
def __init__(
self,
bound,
retry_if_exception_type=(Exception,),
stop_after_attempt=3,
wait_exponential_jitter=True,
exponential_jitter_params=None,
):
"""
初始化 RunnableRetry
Args:
bound: 被包装的 Runnable 对象
retry_if_exception_type: 需要重试的异常类型元组
stop_after_attempt: 最大尝试次数
wait_exponential_jitter: 是否启用指数回退抖动
exponential_jitter_params: 抖动参数 initial/max/exp_base/jitter
"""
# 保存底层被包装的 Runnable
self.bound = bound
# 保存需要重试的异常类型
self.retry_if_exception_type = retry_if_exception_type
# 保存最大尝试次数
self.stop_after_attempt = stop_after_attempt
# 保存是否启用指数回退抖动
self.wait_exponential_jitter = wait_exponential_jitter
# 保存指数回退相关参数(若为 None 则用空字典兜底)
self.exponential_jitter_params = exponential_jitter_params or {}
# 实现同步调用(自动重试机制)
def invoke(self, input, config = None, **kwargs):
"""
调用底层 runnable,失败时自动重试
"""
# 用于记录最后一次抛出的异常
last_exception = None
# 解析重试等待的各项参数
initial = self.exponential_jitter_params.get("initial", 0.1) # 初始延迟
max_wait = self.exponential_jitter_params.get("max", 10.0) # 最大延迟
exp_base = self.exponential_jitter_params.get("exp_base", 2.0) # 幂指数基数
jitter = self.exponential_jitter_params.get("jitter", 0.0) # 抖动范围
# 尝试多次调用,直到最大次数
for attempt in range(1, self.stop_after_attempt + 1):
try:
# 调用底层的 invoke 方法
return self.bound.invoke(input, config=config, **kwargs)
# 捕获需要重试的异常类型
except self.retry_if_exception_type as e:
# 保存本次捕获的异常
last_exception = e
# 若还没到最大次数,可以重试
if attempt < self.stop_after_attempt:
# 判断是否使用指数回退
if self.wait_exponential_jitter:
# 计算当前次的延迟
delay = min(max_wait, initial * (exp_base ** (attempt - 1)))
# 如果配置了 jitter,叠加一个随机抖动
if jitter > 0:
delay += random.uniform(0, jitter)
else:
# 不指数回退则用 initial 固定延迟
delay = initial
# 等待指定时间再重试
time.sleep(delay)
else:
# 达到最大次数仍然失败则抛出最后一次异常
raise last_exception
except Exception:
# 如果是完全不在重试范围的异常,直接抛出
raise
# 如果所有尝试都失败,最终抛出异常
raise last_exception
# 实现批量调用,每个输入独立重试
def batch(self, inputs, config = None, **kwargs):
"""
批量调用,每个输入独立重试
"""
# 对每个输入都单独执行 invoke,收集结果为列表
return [self.invoke(item, config=config, **kwargs) for item in inputs]
# 实现流式调用,直接复用基类逻辑
def stream(self, input, **kwargs):
"""
流式调用,复用基类实现
"""
# 使用父类的 stream,yield 结果
yield from super().stream(input, **kwargs)
# 返回自身字符串表示,便于调试查看 retry 配置与绑定对象
def __repr__(self):
return f"RunnableRetry(bound={self.bound}, max_attempts={self.stop_after_attempt})"
# 工具函数:合并配置字典
def _merge_configs(*configs):
"""
合并多个配置字典
Args:
*configs: 要合并的配置字典列表
Returns:
合并后的配置字典
"""
result = {}
for config in configs:
if config:
# 对于嵌套字典(如 metadata),需要深度合并
for key, value in config.items():
if key in result and isinstance(result[key], dict) and isinstance(value, dict):
result[key] = {**result[key], **value}
else:
result[key] = value
return result
# 定义 RunnableBinding 类,用于包装 Runnable 并绑定配置
class RunnableBinding(Runnable):
"""
Runnable 绑定包装器
用于将配置绑定到 Runnable,返回一个新的 Runnable 实例。
当调用绑定的 Runnable 时,会自动合并绑定的配置和传入的配置。
"""
def __init__(self, bound, config=None, kwargs=None):
"""
初始化 RunnableBinding
Args:
bound: 要绑定的底层 Runnable 实例
config: 要绑定的配置字典
kwargs: 要绑定的额外关键字参数(暂未使用)
"""
if not isinstance(bound, Runnable):
raise TypeError("bound 必须是 Runnable 实例")
self.bound = bound
self.config = ensure_config(config) or {}
self.kwargs = kwargs or {}
def invoke(self, input, config=None, **kwargs):
"""
调用绑定的 Runnable,合并配置
Args:
input: 输入值
config: 可选的配置字典,会与绑定的配置合并
**kwargs: 额外的关键字参数
Returns:
底层 Runnable 的返回值
"""
# 合并绑定的配置和传入的配置
merged_config = _merge_configs(self.config, config)
# 合并关键字参数
merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
return self.bound.invoke(input, config=merged_config, **merged_kwargs)
def batch(self, inputs, config=None, **kwargs):
"""
批量调用绑定的 Runnable,合并配置
Args:
inputs: 输入值列表
config: 可选的配置字典,会与绑定的配置合并
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 合并绑定的配置和传入的配置
merged_config = _merge_configs(self.config, config)
# 合并关键字参数
merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
return self.bound.batch(inputs, config=merged_config, **merged_kwargs)
def stream(self, input, config=None, **kwargs):
"""
流式调用绑定的 Runnable,合并配置
Args:
input: 输入值
config: 可选的配置字典,会与绑定的配置合并
**kwargs: 额外的关键字参数
Yields:
底层 Runnable 的流式输出
"""
# 合并绑定的配置和传入的配置
merged_config = _merge_configs(self.config, config)
# 合并关键字参数
merged_kwargs = {**self.kwargs, **kwargs}
# 调用底层 Runnable
yield from self.bound.stream(input, config=merged_config, **merged_kwargs)
def __repr__(self):
"""返回对象的字符串表示"""
return f"RunnableBinding(bound={self.bound}, config={self.config})"
# 定义 ConfigurableField 类,用于配置可动态调整的字段
from collections import namedtuple
ConfigurableField = namedtuple(
"ConfigurableField",
["id", "name", "description", "annotation", "is_shared"],
defaults=(None, None, None, False)
)
"""可配置字段的定义
Args:
id: 字段的唯一标识符,在 config["configurable"] 中使用
name: 字段的显示名称(可选)
description: 字段的描述(可选)
annotation: 字段的类型注解(可选)
is_shared: 字段是否共享(可选,默认 False)
"""
# 定义 RunnableConfigurableFields 类,用于包装 Runnable 并支持动态配置字段
class RunnableConfigurableFields(Runnable):
"""
Runnable 可配置字段包装器
用于将 Runnable 的某些字段配置为可在运行时动态调整。
当调用时,会从 config["configurable"] 中读取配置值,并创建新的实例。
示例:
``python
from smartchain.runnables import ConfigurableField
llm = ChatOpenAI(temperature=0).configurable_fields(
temperature=ConfigurableField(
id="temperature",
name="温度值",
description="LLM 的采样温度参数"
)
)
# 使用默认 temperature=0
result1 = llm.invoke("你好")
# 使用 temperature=1.0
result2 = llm.invoke("你好", config={"configurable": {"temperature": 1.0}})
``
"""
# 构造函数:接收默认可执行对象和字段描述字典
def __init__(self, default, fields):
"""
初始化 RunnableConfigurableFields
Args:
default: 默认的 Runnable 实例或具有 invoke 方法的对象
fields: 可配置字段的字典,键为字段名,值为 ConfigurableField 实例
"""
# 检查 default 是否为 Runnable 实例或者拥有 invoke 方法
if not (isinstance(default, Runnable) or (hasattr(default, 'invoke') and callable(getattr(default, 'invoke')))):
raise TypeError("default 必须是 Runnable 实例或具有 invoke 方法的对象")
# 保存默认实例
self.default = default
# 保存字段配置(如果未传入则设为{})
self.fields = fields or {}
# 内部方法,根据 config 动态生成实例,应用动态配置
def _prepare(self, config=None):
"""
准备 Runnable 实例和配置
从 config["configurable"] 中读取配置值,并创建新的实例。
Args:
config: 配置字典
Returns:
tuple: (Runnable 实例, 配置字典)
"""
# 规范化 config(保证为字典)
config = ensure_config(config)
# 从 config 取出 configurable 配置
configurable = config.get("configurable", {})
# 收集需要修改的字段和值
updates = {}
for field_name, field_spec in self.fields.items():
# 检查字段是否为 ConfigurableField
if isinstance(field_spec, ConfigurableField):
# 从 config 找对应 id 的值
config_value = configurable.get(field_spec.id)
if config_value is not None:
updates[field_name] = config_value
# 有更新内容则创建新实例
if updates:
# 获取默认实例的类型
default_class = type(self.default)
# 获取类型名
class_name = default_class.__name__
# 对于特定聊天模型需要特殊参数处理
if class_name in ('ChatOpenAI', 'ChatDeepSeek', 'ChatTongyi'):
# 构造初始化参数 dict,必须包含 model
init_params = {
'model': self.default.model,
}
# 如果有 model_kwargs 就复制
if hasattr(self.default, 'model_kwargs'):
init_params.update(self.default.model_kwargs.copy())
# 增加本次需更新的参数
init_params.update(updates)
# 保持 api_key(如有)
if hasattr(self.default, 'api_key'):
init_params['api_key'] = self.default.api_key
# 保持 base_url(如有)
if hasattr(self.default, 'base_url'):
init_params['base_url'] = getattr(self.default, 'base_url', None)
# 构造新实例
new_instance = default_class(**init_params)
return (new_instance, config)
else:
# 对于其他类型的实例采用通用方法
if hasattr(self.default, '__dict__'):
# 使用对象字段构建参数(忽略以 _ 开头的字段)
init_params = {k: v for k, v in self.default.__dict__.items()
if not k.startswith('_')}
else:
# 无法获取 __dict__ 则用空参数
init_params = {}
# 更新参数
init_params.update(updates)
try:
# 尝试直接用参数构造新实例
new_instance = default_class(**init_params)
return (new_instance, config)
except Exception:
# 构造失败则深拷贝实例并赋值
import copy
new_instance = copy.deepcopy(self.default)
for key, value in updates.items():
# 优先直接设置属性
if hasattr(new_instance, key):
setattr(new_instance, key, value)
# 对于 ChatOpenAI 还要更新 model_kwargs 字典
elif hasattr(new_instance, 'model_kwargs'):
new_instance.model_kwargs[key] = value
return (new_instance, config)
# 未指定可配置参数,直接返回默认实例和 config
return (self.default, config)
# 单条输入调用方法,支持动态配置
def invoke(self, input, config=None, **kwargs):
"""
调用 Runnable,支持动态配置
Args:
input: 输入值
config: 配置字典,可以包含 configurable 字段
**kwargs: 额外的关键字参数
Returns:
底层 Runnable 的返回值
"""
# 获取动态配置后的 runnable 实例和配置
runnable, merged_config = self._prepare(config)
# 若为 Runnable 实例则传递 config 参数
if isinstance(runnable, Runnable):
return runnable.invoke(input, config=merged_config, **kwargs)
else:
# 非 Runnable 实例直接调用(初始化时参数已生效)
return runnable.invoke(input, **kwargs)
# 批量输入调用方法,支持动态配置
def batch(self, inputs, config=None, **kwargs):
"""
批量调用 Runnable,支持动态配置
Args:
inputs: 输入值列表
config: 配置字典,可以包含 configurable 字段
**kwargs: 额外的关键字参数
Returns:
输出值列表
"""
# 获取动态配置后的 runnable 实例和配置
runnable, merged_config = self._prepare(config)
# 若为 Runnable 实例则传递 config 参数
if isinstance(runnable, Runnable):
return runnable.batch(inputs, config=merged_config, **kwargs)
else:
# 有 batch 方法就直接调用
if hasattr(runnable, 'batch'):
return runnable.batch(inputs, **kwargs)
else:
# 没有 batch 方法,逐个调用 invoke 实现
return [runnable.invoke(input_item, **kwargs) for input_item in inputs]
# 流式输入调用,支持动态配置
def stream(self, input, config=None, **kwargs):
"""
流式调用 Runnable,支持动态配置
Args:
input: 输入值
config: 配置字典,可以包含 configurable 字段
**kwargs: 额外的关键字参数
Yields:
底层 Runnable 的流式输出
"""
# 获取动态配置后的 runnable 实例和配置
runnable, merged_config = self._prepare(config)
# 若为 Runnable 实例则传递 config 参数
if isinstance(runnable, Runnable):
yield from runnable.stream(input, config=merged_config, **kwargs)
else:
# 有 stream 方法就直接调用
if hasattr(runnable, 'stream'):
yield from runnable.stream(input, **kwargs)
else:
# 没有流式方法则调用 invoke 并 yield 单值
result = runnable.invoke(input, **kwargs)
yield result
# 字符串表示方法,便于调试
def __repr__(self):
"""返回对象的字符串表示"""
return f"RunnableConfigurableFields(default={self.default}, fields={self.fields})"
# 定义用于根据 config["configurable"] 动态选择分支的类
+class RunnableConfigurableAlternatives(Runnable):
+ """
+ 根据配置动态选择不同分支的 Runnable/对象。
+ 示例:
+ selector = ConfigurableField(id="provider", name="LLM 提供方")
+ chain = some_runnable.configurable_alternatives(
+ selector,
+ default_key="openai",
+ openai=ChatOpenAI(...),
+ deepseek=ChatDeepSeek(...),
+ )
# 默认使用 openai
+ chain.invoke("hi")
# 切换为 deepseek
+ chain.invoke("hi", config={"configurable": {"provider": "deepseek"}})
+ """
# 初始化方法,接收选择字段、默认 key、和所有可选分支
+ def __init__(self, selector_field, default_key, alternatives):
+ """
+ 初始化
+ Args:
+ selector_field: ConfigurableField,用于从 config["configurable"] 取值的字段
+ default_key: 默认分支 key,必须存在于 alternatives
+ alternatives: dict,key -> runnable 或具有 invoke 方法的对象
+ """
# 检查 selector_field 是否为 ConfigurableField 实例
+ if not isinstance(selector_field, ConfigurableField):
+ raise TypeError("selector_field 必须是 ConfigurableField 实例")
# 检查默认 key 是否在 alternatives 里
+ if default_key not in alternatives:
+ raise ValueError("default_key 必须存在于 alternatives 中")
# 检查 alternatives 是否为非空字典
+ if not isinstance(alternatives, dict) or not alternatives:
+ raise ValueError("alternatives 必须是非空字典")
# 保存选择器字段
+ self.selector_field = selector_field
# 保存默认分支 key
+ self.default_key = default_key
# 保存所有分支
+ self.alternatives = alternatives
# 内部方法:按照 config 动态选择分支
+ def _select(self, config=None):
# 标准化配置,补全可选项结构
+ config = ensure_config(config)
# 获取 configurable 字段(可能为空)
+ configurable = config.get("configurable", {}) or {}
# 根据 selector_field.id 查询分支 key,如果没指定则使用默认 key
+ key = configurable.get(self.selector_field.id, self.default_key)
# 找不到分支则报错
+ if key not in self.alternatives:
+ raise ValueError(f"未找到可用分支: {key}")
# 返回被选中的分支和合并后的配置
+ return self.alternatives[key], config
# 单条输入调用,根据当前 config 路由到对应分支
+ def invoke(self, input, config=None, **kwargs):
# 动态选择分支和合并后的配置
+ selected, merged_config = self._select(config)
# 如果是 Runnable,则传递 config
+ if isinstance(selected, Runnable):
+ return selected.invoke(input, config=merged_config, **kwargs)
+ else:
# 否则只调用普通 invoke
+ return selected.invoke(input, **kwargs)
# 批量调用,根据当前 config 调用子分支
+ def batch(self, inputs, config=None, **kwargs):
# 选择分支和合并 config
+ selected, merged_config = self._select(config)
# 如果是 Runnable,传递 config 下批量调用
+ if isinstance(selected, Runnable):
+ return selected.batch(inputs, config=merged_config, **kwargs)
+ else:
# 有 batch 方法直接用
+ if hasattr(selected, "batch"):
+ return selected.batch(inputs, **kwargs)
# 否则逐条调用 invoke
+ return [selected.invoke(item, **kwargs) for item in inputs]
# 流式输出,根据 config 路由
+ def stream(self, input, config=None, **kwargs):
# 动态选择分支
+ selected, merged_config = self._select(config)
# 如果支持 stream 且是 Runnable,传递 config
+ if isinstance(selected, Runnable):
+ yield from selected.stream(input, config=merged_config, **kwargs)
+ else:
# 有 stream 方法直接用
+ if hasattr(selected, "stream"):
+ yield from selected.stream(input, **kwargs)
+ else:
# 没有流式方法则调用普通 invoke
+ yield selected.invoke(input, **kwargs)
# 字符串表示方法,便于调试打印分支
+ def __repr__(self):
+ return (
+ f"RunnableConfigurableAlternatives("
+ f"selector_field={self.selector_field}, "
+ f"default_key={self.default_key}, "
+ f"alternatives={list(self.alternatives.keys())}"
+ f")"
+ )30.4. 类 #
30.4.1 相关类 #
| 类名 | 作用 | 关键方法/属性 | 在示例中的使用 |
|---|---|---|---|
| ChatOpenAI | OpenAI 聊天模型封装类 | __init__(), invoke(), stream(), configurable_alternatives() |
创建 OpenAI 模型实例,并调用 configurable_alternatives() 方法 |
| ChatDeepSeek | DeepSeek 聊天模型封装类 | __init__(), invoke(), stream() |
作为 alternatives 中的一个分支选项 |
| ConfigurableField | 可配置字段元数据类(NamedTuple) | id, name, description, annotation, is_shared |
定义选择器字段,id="provider" 用于在 config["configurable"]["provider"] 中指定分支 |
| RunnableConfigurableAlternatives | 可配置替代分支的包装器类 | __init__(), _select(), invoke(), batch(), stream() |
包装多个模型分支,根据 config 动态选择并调用对应的分支 |
| Runnable | 抽象基类 | invoke(), batch(), stream(), configurable_alternatives() |
RunnableConfigurableAlternatives 的基类,定义统一接口 |
30.4.2 类图 #

30.4.3 时序图 #
30.4.3.1 创建阶段(初始化) #
30.4.3.2 调用阶段(默认分支) #
30.4.3.3 调用阶段(切换分支) #
30.4.4 调用过程 #
初始化阶段
创建
ChatOpenAI实例ChatOpenAI(model="gpt-4o-mini", temperature=0)- 初始化模型参数
- 设置 API 客户端
创建
ConfigurableFieldConfigurableField(id="provider", name="LLM 提供方", description="...")id="provider"作为选择键,后续通过config["configurable"]["provider"]指定分支
调用
configurable_alternatives.configurable_alternatives(selector_field, default_key="openai", openai=..., deepseek=...)ChatOpenAI.configurable_alternatives()创建并返回RunnableConfigurableAlternatives- 传入
selector_field、default_key和alternatives字典
创建
RunnableConfigurableAlternatives- 验证
selector_field是ConfigurableField实例 - 验证
default_key存在于alternatives - 保存
selector_field、default_key和alternatives
- 验证
调用阶段(默认分支)
用户调用
llm.invoke("你好,你是谁?")RunnableConfigurableAlternatives.invoke()执行- 调用
_select(config=None)
- 调用
_select()选择分支config为None,标准化为{}config["configurable"]不存在或为空- 使用
default_key="openai" - 从
alternatives["openai"]获取ChatOpenAI实例 - 返回
(ChatOpenAI实例, config)
调用选中的分支
ChatOpenAI不是Runnable实例- 调用
selected.invoke(input, **kwargs)(不传config)
ChatOpenAI.invoke()执行- 转换输入为消息格式
- 调用 OpenAI API
- 返回
AIMessage
调用阶段(切换分支)
用户调用
llm.invoke("你好,你是谁?", config={"configurable": {"provider": "deepseek"}})RunnableConfigurableAlternatives.invoke()执行- 调用
_select(config={...})
- 调用
_select()选择分支- 从
config["configurable"]["provider"]获取"deepseek" - 从
alternatives["deepseek"]获取ChatDeepSeek实例 - 返回
(ChatDeepSeek实例, config)
- 从
调用选中的分支
ChatDeepSeek不是Runnable实例- 调用
selected.invoke(input, **kwargs)
ChatDeepSeek.invoke()执行- 转换输入为消息格式
- 调用 DeepSeek API
- 返回
AIMessage
关键设计点
- 动态选择:通过
_select()根据config["configurable"][selector_field.id]选择分支 - 默认分支:未指定时使用
default_key - 兼容性:支持
Runnable和非Runnable对象(如ChatOpenAI、ChatDeepSeek) - 配置传递:
Runnable实例传递config,非Runnable不传递
数据流图
用户输入 "你好,你是谁?"
↓
RunnableConfigurableAlternatives.invoke()
↓
_select(config) → 从 config["configurable"]["provider"] 获取 key
↓
alternatives[key] → 获取对应的模型实例(ChatOpenAI 或 ChatDeepSeek)
↓
selected.invoke(input) → 调用模型 API
↓
返回 AIMessage(content="...")
↓
用户获取结果