这是用户在 2024-7-29 10:23 为 https://langchain-ai.github.io/langgraph/reference/graphs/ 保存的双语快照页面,由 沉浸式翻译 提供双语支持。了解如何保存?
Skip to content  跳到内容

Graph Definitions
图形定义

Graphs are the core abstraction of LangGraph. Each StateGraph implementation is used to create graph workflows. Once compiled, you can run the CompiledGraph to run the application.
图形是 LangGraph 的核心抽象。每个 StateGraph 实现都用于创建图形工作流。编译后,您可以运行 CompiledGraph 来运行应用程序。

StateGraph
StateGraph(状态图)¶

from langgraph.graph import StateGraph
from typing_extensions import TypedDict
class MyState(TypedDict)
    ...
graph = StateGraph(MyState)

Bases: Graph
基数: 图表

A graph whose nodes communicate by reading and writing to a shared state. The signature of each node is State -> Partial.
一个图形,其节点通过读取和写入共享状态进行通信。每个节点的签名是 State -> Partial

Each state key can optionally be annotated with a reducer function that will be used to aggregate the values of that key received from multiple nodes. The signature of a reducer function is (Value, Value) -> Value.
每个状态键都可以选择性地使用 reducer 函数进行注释,该函数将用于聚合从多个节点接收的该键的值。reducer 函数的签名是 (Value, Value) -> Value。

Parameters: 参数:

  • state_schema (Type[Any], default: None ) –

    The schema class that defines the state.


    state_schema类型[任意],默认值: 没有 ) –

    定义状态的架构类。

  • config_schema (Optional[Type[Any]], default: None ) –

    The schema class that defines the configuration. Use this to expose configurable parameters in your API.


    config_schema可选[类型[任意]],默认值: 没有 ) –

    定义配置的架构类。使用它来公开 API 中的可配置参数。

Examples: 例子:

>>> from langchain_core.runnables import RunnableConfig
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.checkpoint import MemorySaver
>>> from langgraph.graph import StateGraph
>>>
>>> def reducer(a: list, b: int | None) -> int:
...     if b is not None:
...         return a + [b]
...     return a
>>>
>>> class State(TypedDict):
...     x: Annotated[list, reducer]
>>>
>>> class ConfigSchema(TypedDict):
...     r: float
>>>
>>> graph = StateGraph(State, config_schema=ConfigSchema)
>>>
>>> def node(state: State, config: RunnableConfig) -> dict:
...     r = config["configurable"].get("r", 1.0)
...     x = state["x"][-1]
...     next_value = x * r * (1 - x)
...     return {"x": next_value}
>>>
>>> graph.add_node("A", node)
>>> graph.set_entry_point("A")
>>> graph.set_finish_point("A")
>>> compiled = graph.compile()
>>>
>>> print(compiled.config_specs)
[ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]
>>>
>>> step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
>>> print(step1)
{'x': [0.5, 0.75]}
Source code in libs/langgraph/langgraph/graph/state.py
libs/langgraph/langgraph/graph/state.py 中的源代码
class StateGraph(Graph):
    """A graph whose nodes communicate by reading and writing to a shared state.
    The signature of each node is State -> Partial<State>.

    Each state key can optionally be annotated with a reducer function that
    will be used to aggregate the values of that key received from multiple nodes.
    The signature of a reducer function is (Value, Value) -> Value.

    Args:
        state_schema (Type[Any]): The schema class that defines the state.
        config_schema (Optional[Type[Any]]): The schema class that defines the configuration.
            Use this to expose configurable parameters in your API.


    Examples:
        >>> from langchain_core.runnables import RunnableConfig
        >>> from typing_extensions import Annotated, TypedDict
        >>> from langgraph.checkpoint import MemorySaver
        >>> from langgraph.graph import StateGraph
        >>>
        >>> def reducer(a: list, b: int | None) -> int:
        ...     if b is not None:
        ...         return a + [b]
        ...     return a
        >>>
        >>> class State(TypedDict):
        ...     x: Annotated[list, reducer]
        >>>
        >>> class ConfigSchema(TypedDict):
        ...     r: float
        >>>
        >>> graph = StateGraph(State, config_schema=ConfigSchema)
        >>>
        >>> def node(state: State, config: RunnableConfig) -> dict:
        ...     r = config["configurable"].get("r", 1.0)
        ...     x = state["x"][-1]
        ...     next_value = x * r * (1 - x)
        ...     return {"x": next_value}
        >>>
        >>> graph.add_node("A", node)
        >>> graph.set_entry_point("A")
        >>> graph.set_finish_point("A")
        >>> compiled = graph.compile()
        >>>
        >>> print(compiled.config_specs)
        [ConfigurableFieldSpec(id='r', annotation=<class 'float'>, name=None, description=None, default=None, is_shared=False, dependencies=None)]
        >>>
        >>> step1 = compiled.invoke({"x": 0.5}, {"configurable": {"r": 3.0}})
        >>> print(step1)
        {'x': [0.5, 0.75]}"""

    nodes: dict[str, StateNodeSpec]
    channels: dict[str, BaseChannel]
    managed: dict[str, Type[ManagedValue]]
    schemas: dict[Type[Any], dict[str, Union[BaseChannel, Type[ManagedValue]]]]

    def __init__(
        self,
        state_schema: Optional[Type[Any]] = None,
        config_schema: Optional[Type[Any]] = None,
        *,
        input: Optional[Type[Any]] = None,
        output: Optional[Type[Any]] = None,
    ) -> None:
        super().__init__()
        if state_schema is None:
            if input is None or output is None:
                raise ValueError("Must provide state_schema or input and output")
            state_schema = input
        else:
            if input is None:
                input = state_schema
            if output is None:
                output = state_schema
        self.schemas = {}
        self.channels = {}
        self.managed = {}
        self.schema = state_schema
        self.input = input
        self.output = output
        self._add_schema(state_schema)
        self._add_schema(input)
        self._add_schema(output)
        self.config_schema = config_schema
        self.waiting_edges: set[tuple[tuple[str, ...], str]] = set()

    @property
    def _all_edges(self) -> set[tuple[str, str]]:
        return self.edges | {
            (start, end) for starts, end in self.waiting_edges for start in starts
        }

    def _add_schema(self, schema: Type[Any]) -> None:
        if schema not in self.schemas:
            _warn_invalid_state_schema(schema)
            channels, managed = _get_channels(schema)
            self.schemas[schema] = {**channels, **managed}
            for key, channel in channels.items():
                if key in self.channels:
                    if self.channels[key] != channel:
                        if isinstance(channel, LastValue):
                            pass
                        else:
                            raise ValueError(
                                f"Channel '{key}' already exists with a different type"
                            )
                else:
                    self.channels[key] = channel
            for key, managed in managed.items():
                if key in self.managed:
                    if self.managed[key] != managed:
                        raise ValueError(
                            f"Managed value '{key}' already exists with a different type"
                        )
                else:
                    self.managed[key] = managed
            if any(
                isinstance(c, BinaryOperatorAggregate) for c in self.channels.values()
            ):
                self.support_multiple_edges = True

    @overload
    def add_node(
        self,
        node: RunnableLike,
        *,
        metadata: Optional[dict[str, Any]] = None,
        input: Optional[Type[Any]] = None,
        retry: Optional[RetryPolicy] = None,
    ) -> None:
        """Adds a new node to the state graph.
        Will take the name of the function/runnable as the node name.

        Args:
            node (RunnableLike): The function or runnable this node will run.

        Raises:
            ValueError: If the key is already being used as a state key.

        Returns:
            None
        """
        ...

    @overload
    def add_node(
        self,
        node: str,
        action: RunnableLike,
        *,
        metadata: Optional[dict[str, Any]] = None,
        input: Optional[Type[Any]] = None,
        retry: Optional[RetryPolicy] = None,
    ) -> None:
        """Adds a new node to the state graph.

        Args:
            node (str): The key of the node.
            action (RunnableLike): The action associated with the node.

        Raises:
            ValueError: If the key is already being used as a state key.

        Returns:
            None
        """
        ...

    def add_node(
        self,
        node: Union[str, RunnableLike],
        action: Optional[RunnableLike] = None,
        *,
        metadata: Optional[dict[str, Any]] = None,
        input: Optional[Type[Any]] = None,
        retry: Optional[RetryPolicy] = None,
    ) -> None:
        """Adds a new node to the state graph.

        Will take the name of the function/runnable as the node name.

        Args:
            node (Union[str, RunnableLike)]: The function or runnable this node will run.
            action (Optional[RunnableLike]): The action associated with the node. (default: None)
            metadata (Optional[dict[str, Any]]): The metadata associated with the node. (default: None)
            input (Optional[Type[Any]]): The input schema for the node. (default: the graph's input schema)
            retry (Optional[RetryPolicy]): The policy for retrying the node. (default: None)
        Raises:
            ValueError: If the key is already being used as a state key.


        Examples:
            ```pycon
            >>> from langgraph.graph import START, StateGraph
            ...
            >>> def my_node(state, config):
            ...    return {"x": state["x"] + 1}
            ...
            >>> builder = StateGraph(dict)
            >>> builder.add_node(my_node)  # node name will be 'my_node'
            >>> builder.add_edge(START, "my_node")
            >>> graph = builder.compile()
            >>> graph.invoke({"x": 1})
            {'x': 2}
            ```
            Customize the name:

            ```pycon
            >>> builder = StateGraph(dict)
            >>> builder.add_node("my_fair_node", my_node)
            >>> builder.add_edge(START, "my_fair_node")
            >>> graph = builder.compile()
            >>> graph.invoke({"x": 1})
            {'x': 2}
            ```

        Returns:
            None
        """
        if not isinstance(node, str):
            action = node
            if isinstance(action, Runnable):
                node = action.name
            else:
                node = getattr(action, "__name__", action.__class__.__name__)
            if node is None:
                raise ValueError(
                    "Node name must be provided if action is not a function"
                )
        if node in self.channels:
            raise ValueError(f"'{node}' is already being used as a state key")
        if self.compiled:
            logger.warning(
                "Adding a node to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        if not isinstance(node, str):
            action = node
            node = getattr(action, "name", action.__name__)
        if node in self.nodes:
            raise ValueError(f"Node `{node}` already present.")
        if node == END or node == START:
            raise ValueError(f"Node `{node}` is reserved.")
        try:
            if isfunction(action) and (
                hints := get_type_hints(action.__call__) or get_type_hints(action)
            ):
                if input is None:
                    input_hint = hints[list(hints.keys())[0]]
                    if isinstance(input_hint, type) and get_type_hints(input_hint):
                        input = input_hint
        except TypeError:
            pass
        if input is not None:
            self._add_schema(input)
        self.nodes[node] = StateNodeSpec(
            coerce_to_runnable(action, name=node, trace=False),
            metadata,
            input=input or self.schema,
            retry_policy=retry,
        )

    def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
        """Adds a directed edge from the start node to the end node.

        If the graph transitions to the start_key node, it will always transition to the end_key node next.

        Args:
            start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
            end_key (str): The key of the end node of the edge.

        Raises:
            ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

        Returns:
            None
        """
        if isinstance(start_key, str):
            return super().add_edge(start_key, end_key)

        if self.compiled:
            logger.warning(
                "Adding an edge to a graph that has already been compiled. This will "
                "not be reflected in the compiled graph."
            )
        for start in start_key:
            if start == END:
                raise ValueError("END cannot be a start node")
            if start not in self.nodes:
                raise ValueError(f"Need to add_node `{start}` first")
        if end_key == START:
            raise ValueError("START cannot be an end node")
        if end_key not in self.nodes:
            raise ValueError(f"Need to add_node `{end_key}` first")

        self.waiting_edges.add((tuple(start_key), end_key))

    def compile(
        self,
        checkpointer: Optional[BaseCheckpointSaver] = None,
        interrupt_before: Optional[Union[All, Sequence[str]]] = None,
        interrupt_after: Optional[Union[All, Sequence[str]]] = None,
        debug: bool = False,
    ) -> "CompiledStateGraph":
        """Compiles the state graph into a `CompiledGraph` object.

        The compiled graph implements the `Runnable` interface and can be invoked,
        streamed, batched, and run asynchronously.

        Args:
            checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
                This serves as a fully versioned "memory" for the graph, allowing
                the graph to be paused and resumed, and replayed from any point.
            interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
            interrupt_after (Optional[Sequence[str]]): An optional list of node names to interrupt after.
            debug (bool): A flag indicating whether to enable debug mode.

        Returns:
            CompiledStateGraph: The compiled state graph.
        """
        # assign default values
        interrupt_before = interrupt_before or []
        interrupt_after = interrupt_after or []

        # validate the graph
        self.validate(
            interrupt=(
                (interrupt_before if interrupt_before != "*" else []) + interrupt_after
                if interrupt_after != "*"
                else []
            )
        )

        # prepare output channels
        output_channels = (
            "__root__"
            if len(self.schemas[self.output]) == 1
            and "__root__" in self.schemas[self.output]
            else [
                key
                for key, val in self.schemas[self.output].items()
                if not isinstance(val, Context) and not is_managed_value(val)
            ]
        )

        compiled = CompiledStateGraph(
            builder=self,
            config_type=self.config_schema,
            nodes={},
            channels={**self.channels, START: EphemeralValue(self.input)},
            input_channels=START,
            stream_mode="updates",
            output_channels=output_channels,
            stream_channels=output_channels,
            checkpointer=checkpointer,
            interrupt_before_nodes=interrupt_before,
            interrupt_after_nodes=interrupt_after,
            auto_validate=False,
            debug=debug,
        )

        compiled.attach_node(START, None)
        for key, node in self.nodes.items():
            compiled.attach_node(key, node)

        for start, end in self.edges:
            compiled.attach_edge(start, end)

        for starts, end in self.waiting_edges:
            compiled.attach_edge(starts, end)

        for start, branches in self.branches.items():
            for name, branch in branches.items():
                compiled.attach_branch(start, name, branch)

        return compiled.validate()

add_conditional_edges(source, path, path_map=None, then=None)
add_conditional_edgessource path path_map=None then=None

Add a conditional edge from the starting node to any number of destination nodes.
将条件边从起始节点添加到任意数量的目标节点。

Parameters: 参数:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.


    str) –

    起始节点。此条件边将在退出此节点时运行。

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.


    路径联合[可调用可运行]) –

    确定下一个节点或节点的可调用对象。如果未指定 path_map 则应返回一个或多个节点。如果返回 END,则图形将停止执行。

  • path_map (Optional[dict[Hashable, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.


    path_map可选[dict[Hashablestr]],默认值: 没有 ) –

    可选的路径映射到节点名称。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.


    然后可选[str],默认值: 没有 ) –

    按路径选择的节点之后要执行的节点的名称。

Returns: 返回:

  • None

    None


    没有

    没有

Without typehints on the path function's return value (e.g., -> Literal["foo", "__end__"]:)
路径函数的返回值上没有 typehints(例如,-> Literal[“foo”, “__end__”]:

or a path_map, the graph visualization assumes the edge could transition to any node in the graph.
或者path_map,图形可视化假定边缘可以过渡到图形中的任何节点。

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Add a conditional edge from the starting node to any number of destination nodes.

    Args:
        source (str): The starting node. This conditional edge will run when
            exiting this node.
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[Hashable, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None

    Note: Without typehints on the `path` function's return value (e.g., `-> Literal["foo", "__end__"]:`)
        or a path_map, the graph visualization assumes the edge could transition to any node in the graph.

    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # coerce path_map to a dictionary
    try:
        if isinstance(path_map, dict):
            path_map = path_map.copy()
        elif isinstance(path_map, list):
            path_map = {name: name for name in path_map}
        elif rtn_type := get_type_hints(path.__call__).get(
            "return"
        ) or get_type_hints(path).get("return"):
            if get_origin(rtn_type) is Literal:
                path_map = {name: name for name in get_args(rtn_type)}
    except Exception:
        pass
    # find a name for the condition
    path = coerce_to_runnable(path, name=None, trace=True)
    name = path.name or "condition"
    # validate the condition
    if name in self.branches[source]:
        raise ValueError(
            f"Branch with name `{path.name}` already exists for node " f"`{source}`"
        )
    # save it
    self.branches[source][name] = Branch(path, path_map, then)

set_entry_point(key)
set_entry_point

Specifies the first node to be called in the graph.
指定要在关系图中调用的第一个节点。

Equivalent to calling add_edge(START, key).
等效于调用 add_edge(START, key)。

Parameters: 参数:

  • key (str) –

    The key of the node to set as the entry point.


    钥匙str) –

    要设置为入口点的节点的键。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def set_entry_point(self, key: str) -> None:
    """Specifies the first node to be called in the graph.

    Equivalent to calling `add_edge(START, key)`.

    Parameters:
        key (str): The key of the node to set as the entry point.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)
set_conditional_entry_pointpath path_map=None then=None

Sets a conditional entry point in the graph.
在图形中设置条件入口点。

Parameters: 参数:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.


    路径联合[可调用可运行]) –

    确定下一个节点或节点的可调用对象。如果未指定 path_map 则应返回一个或多个节点。如果返回 END,则图形将停止执行。

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.


    path_map可选[dict[strstr]],默认值: 没有 ) –

    可选的路径映射到节点名称。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.


    然后可选[str],默认值: 没有 ) –

    按路径选择的节点之后要执行的节点的名称。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Sets a conditional entry point in the graph.

    Args:
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[str, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None
    """
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)
set_finish_point

Marks a node as a finish point of the graph.
将节点标记为图形的完成点。

If the graph reaches this node, it will cease execution.
如果图形到达此节点,它将停止执行。

Parameters: 参数:

  • key (str) –

    The key of the node to set as the finish point.


    钥匙str) –

    要设置为终点的节点键。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def set_finish_point(self, key: str) -> None:
    """Marks a node as a finish point of the graph.

    If the graph reaches this node, it will cease execution.

    Parameters:
        key (str): The key of the node to set as the finish point.

    Returns:
        None
    """
    return self.add_edge(key, END)

add_node(node, action=None, *, metadata=None, input=None, retry=None)
add_nodenode action=None * metadata=None input=None retry=None

Adds a new node to the state graph.
将新节点添加到状态图中。

Will take the name of the function/runnable as the node name.
将采用函数的名称/runnable 作为节点名称。

Parameters: 参数:

  • node (Union[str, RunnableLike)]) –

    The function or runnable this node will run.


    节点Union[str, RunnableLike)]) –

    此节点将运行的函数或可运行设备。

  • action (Optional[RunnableLike], default: None ) –

    The action associated with the node. (default: None)


    行动可选[RunnableLike],默认值: 没有 ) –

    与节点关联的操作。(默认值:无)

  • metadata (Optional[dict[str, Any]], default: None ) –

    The metadata associated with the node. (default: None)


    元数据可选[dict[strAny]],默认值: 没有 ) –

    与节点关联的元数据。(默认值:无)

  • input (Optional[Type[Any]], default: None ) –

    The input schema for the node. (default: the graph's input schema)


    输入可选[类型[任意]],默认值: 没有 ) –

    节点的输入架构。(默认值:图形的输入架构)

  • retry (Optional[RetryPolicy], default: None ) –

    The policy for retrying the node. (default: None)


    重试可选 [RetryPolicy],默认值: 没有 ) –

    重试节点的策略。(默认值:无)

Raises: ValueError: If the key is already being used as a state key.
加薪:ValueError:如果键已被用作状态键。

Examples: 例子:

>>> from langgraph.graph import START, StateGraph
...
>>> def my_node(state, config):
...    return {"x": state["x"] + 1}
...
>>> builder = StateGraph(dict)
>>> builder.add_node(my_node)  # node name will be 'my_node'
>>> builder.add_edge(START, "my_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}
Customize the name: 自定义名称:

>>> builder = StateGraph(dict)
>>> builder.add_node("my_fair_node", my_node)
>>> builder.add_edge(START, "my_fair_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/state.py
libs/langgraph/langgraph/graph/state.py 中的源代码
def add_node(
    self,
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    metadata: Optional[dict[str, Any]] = None,
    input: Optional[Type[Any]] = None,
    retry: Optional[RetryPolicy] = None,
) -> None:
    """Adds a new node to the state graph.

    Will take the name of the function/runnable as the node name.

    Args:
        node (Union[str, RunnableLike)]: The function or runnable this node will run.
        action (Optional[RunnableLike]): The action associated with the node. (default: None)
        metadata (Optional[dict[str, Any]]): The metadata associated with the node. (default: None)
        input (Optional[Type[Any]]): The input schema for the node. (default: the graph's input schema)
        retry (Optional[RetryPolicy]): The policy for retrying the node. (default: None)
    Raises:
        ValueError: If the key is already being used as a state key.


    Examples:
        ```pycon
        >>> from langgraph.graph import START, StateGraph
        ...
        >>> def my_node(state, config):
        ...    return {"x": state["x"] + 1}
        ...
        >>> builder = StateGraph(dict)
        >>> builder.add_node(my_node)  # node name will be 'my_node'
        >>> builder.add_edge(START, "my_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```
        Customize the name:

        ```pycon
        >>> builder = StateGraph(dict)
        >>> builder.add_node("my_fair_node", my_node)
        >>> builder.add_edge(START, "my_fair_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```

    Returns:
        None
    """
    if not isinstance(node, str):
        action = node
        if isinstance(action, Runnable):
            node = action.name
        else:
            node = getattr(action, "__name__", action.__class__.__name__)
        if node is None:
            raise ValueError(
                "Node name must be provided if action is not a function"
            )
    if node in self.channels:
        raise ValueError(f"'{node}' is already being used as a state key")
    if self.compiled:
        logger.warning(
            "Adding a node to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    if not isinstance(node, str):
        action = node
        node = getattr(action, "name", action.__name__)
    if node in self.nodes:
        raise ValueError(f"Node `{node}` already present.")
    if node == END or node == START:
        raise ValueError(f"Node `{node}` is reserved.")
    try:
        if isfunction(action) and (
            hints := get_type_hints(action.__call__) or get_type_hints(action)
        ):
            if input is None:
                input_hint = hints[list(hints.keys())[0]]
                if isinstance(input_hint, type) and get_type_hints(input_hint):
                    input = input_hint
    except TypeError:
        pass
    if input is not None:
        self._add_schema(input)
    self.nodes[node] = StateNodeSpec(
        coerce_to_runnable(action, name=node, trace=False),
        metadata,
        input=input or self.schema,
        retry_policy=retry,
    )

add_edge(start_key, end_key)
add_edgestart_key end_key

Adds a directed edge from the start node to the end node.
将起点节点的有向边添加到终点节点。

If the graph transitions to the start_key node, it will always transition to the end_key node next.
如果图形过渡到start_key节点,则接下来将始终过渡到end_key节点。

Parameters: 参数:

  • start_key (Union[str, list[str]]) –

    The key(s) of the start node(s) of the edge.


    start_key联合[strlist[str]]) –

    Edge 的起始节点的键。

  • end_key (str) –

    The key of the end node of the edge.


    end_keystr) –

    Edge 的终端节点的键。

Raises: 提高:

  • ValueError

    If the start key is 'END' or if the start key or end key is not present in the graph.


    值错误

    如果开始键为“END”,或者图形中不存在开始键或结束键。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/state.py
libs/langgraph/langgraph/graph/state.py 中的源代码
def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
    """Adds a directed edge from the start node to the end node.

    If the graph transitions to the start_key node, it will always transition to the end_key node next.

    Args:
        start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
        end_key (str): The key of the end node of the edge.

    Raises:
        ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

    Returns:
        None
    """
    if isinstance(start_key, str):
        return super().add_edge(start_key, end_key)

    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    for start in start_key:
        if start == END:
            raise ValueError("END cannot be a start node")
        if start not in self.nodes:
            raise ValueError(f"Need to add_node `{start}` first")
    if end_key == START:
        raise ValueError("START cannot be an end node")
    if end_key not in self.nodes:
        raise ValueError(f"Need to add_node `{end_key}` first")

    self.waiting_edges.add((tuple(start_key), end_key))

compile(checkpointer=None, interrupt_before=None, interrupt_after=None, debug=False)
compilecheckpointer=None interrupt_before=None interrupt_after=None debug=False

Compiles the state graph into a CompiledGraph object.
将状态图编译为 CompiledGraph 对象。

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.
编译后的图形实现了 Runnable 接口,可以异步调用、流式处理、批处理和运行。

Parameters: 参数:

  • checkpointer (Optional[BaseCheckpointSaver], default: None ) –

    An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point.


    检查点可选 [BaseCheckpointSaver],默认值: 没有 ) –

    可选的检查点保护程序对象。这充当图形的完全版本控制的“内存”,允许图形暂停和恢复,并从任何点重放。

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt before.


    interrupt_before可选[Sequence[str]],默认值: 没有 ) –

    要在之前中断的节点名称的可选列表。

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt after.


    interrupt_after可选[Sequence[str]],默认值: 没有 ) –

    要中断的节点名称的可选列表。

  • debug (bool, default: False ) –

    A flag indicating whether to enable debug mode.


    调试布尔值,默认值: ) –

    指示是否启用调试模式的标志。

Returns: 返回:

  • CompiledStateGraph ( CompiledStateGraph ) –

    The compiled state graph.


    CompiledStateGraphCompiledStateGraph ) –

    编译的状态图。

Source code in libs/langgraph/langgraph/graph/state.py
libs/langgraph/langgraph/graph/state.py 中的源代码
def compile(
    self,
    checkpointer: Optional[BaseCheckpointSaver] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: bool = False,
) -> "CompiledStateGraph":
    """Compiles the state graph into a `CompiledGraph` object.

    The compiled graph implements the `Runnable` interface and can be invoked,
    streamed, batched, and run asynchronously.

    Args:
        checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
            This serves as a fully versioned "memory" for the graph, allowing
            the graph to be paused and resumed, and replayed from any point.
        interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
        interrupt_after (Optional[Sequence[str]]): An optional list of node names to interrupt after.
        debug (bool): A flag indicating whether to enable debug mode.

    Returns:
        CompiledStateGraph: The compiled state graph.
    """
    # assign default values
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    # validate the graph
    self.validate(
        interrupt=(
            (interrupt_before if interrupt_before != "*" else []) + interrupt_after
            if interrupt_after != "*"
            else []
        )
    )

    # prepare output channels
    output_channels = (
        "__root__"
        if len(self.schemas[self.output]) == 1
        and "__root__" in self.schemas[self.output]
        else [
            key
            for key, val in self.schemas[self.output].items()
            if not isinstance(val, Context) and not is_managed_value(val)
        ]
    )

    compiled = CompiledStateGraph(
        builder=self,
        config_type=self.config_schema,
        nodes={},
        channels={**self.channels, START: EphemeralValue(self.input)},
        input_channels=START,
        stream_mode="updates",
        output_channels=output_channels,
        stream_channels=output_channels,
        checkpointer=checkpointer,
        interrupt_before_nodes=interrupt_before,
        interrupt_after_nodes=interrupt_after,
        auto_validate=False,
        debug=debug,
    )

    compiled.attach_node(START, None)
    for key, node in self.nodes.items():
        compiled.attach_node(key, node)

    for start, end in self.edges:
        compiled.attach_edge(start, end)

    for starts, end in self.waiting_edges:
        compiled.attach_edge(starts, end)

    for start, branches in self.branches.items():
        for name, branch in branches.items():
            compiled.attach_branch(start, name, branch)

    return compiled.validate()

handler: python 处理程序:Python

MessageGraph
MessageGraph(消息图)¶

Bases: StateGraph
基础:StateGraph

A StateGraph where every node receives a list of messages as input and returns one or more messages as output.
一个 StateGraph,其中每个节点都会接收一个消息列表作为输入,并返回一条或多条消息作为输出。

MessageGraph is a subclass of StateGraph whose entire state is a single, append-only* list of messages. Each node in a MessageGraph takes a list of messages as input and returns zero or more messages as output. The add_messages function is used to merge the output messages from each node into the existing list of messages in the graph's state.
MessageGraph 是 StateGraph 的一个子类,其整个状态是一个单独的、仅追加*的消息列表。MessageGraph 中的每个节点都采用消息列表作为输入,并返回零条或多条消息作为输出。add_messages 函数用于将每个节点的输出消息合并到图形状态中的现有消息列表中。

Examples: 例子:

>>> from langgraph.graph.message import MessageGraph
...
>>> builder = MessageGraph()
>>> builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
>>> builder.set_entry_point("chatbot")
>>> builder.set_finish_point("chatbot")
>>> builder.compile().invoke([("user", "Hi there.")])
[HumanMessage(content="Hi there.", id='...'), AIMessage(content="Hello!", id='...')]
>>> from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
>>> from langgraph.graph.message import MessageGraph
...
>>> builder = MessageGraph()
>>> builder.add_node(
...     "chatbot",
...     lambda state: [
...         AIMessage(
...             content="Hello!",
...             tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
...         )
...     ],
... )
>>> builder.add_node(
...     "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
... )
>>> builder.set_entry_point("chatbot")
>>> builder.add_edge("chatbot", "search")
>>> builder.set_finish_point("search")
>>> builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
{'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
             AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
             ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
Source code in libs/langgraph/langgraph/graph/message.py
libs/langgraph/langgraph/graph/message.py 源代码
class MessageGraph(StateGraph):
    """A StateGraph where every node receives a list of messages as input and returns one or more messages as output.

    MessageGraph is a subclass of StateGraph whose entire state is a single, append-only* list of messages.
    Each node in a MessageGraph takes a list of messages as input and returns zero or more
    messages as output. The `add_messages` function is used to merge the output messages from each node
    into the existing list of messages in the graph's state.

    Examples:
        ```pycon
        >>> from langgraph.graph.message import MessageGraph
        ...
        >>> builder = MessageGraph()
        >>> builder.add_node("chatbot", lambda state: [("assistant", "Hello!")])
        >>> builder.set_entry_point("chatbot")
        >>> builder.set_finish_point("chatbot")
        >>> builder.compile().invoke([("user", "Hi there.")])
        [HumanMessage(content="Hi there.", id='...'), AIMessage(content="Hello!", id='...')]
        ```

        ```pycon
        >>> from langchain_core.messages import AIMessage, HumanMessage, ToolMessage
        >>> from langgraph.graph.message import MessageGraph
        ...
        >>> builder = MessageGraph()
        >>> builder.add_node(
        ...     "chatbot",
        ...     lambda state: [
        ...         AIMessage(
        ...             content="Hello!",
        ...             tool_calls=[{"name": "search", "id": "123", "args": {"query": "X"}}],
        ...         )
        ...     ],
        ... )
        >>> builder.add_node(
        ...     "search", lambda state: [ToolMessage(content="Searching...", tool_call_id="123")]
        ... )
        >>> builder.set_entry_point("chatbot")
        >>> builder.add_edge("chatbot", "search")
        >>> builder.set_finish_point("search")
        >>> builder.compile().invoke([HumanMessage(content="Hi there. Can you search for X?")])
        {'messages': [HumanMessage(content="Hi there. Can you search for X?", id='b8b7d8f4-7f4d-4f4d-9c1d-f8b8d8f4d9c1'),
                     AIMessage(content="Hello!", id='f4d9c1d8-8d8f-4d9c-b8b7-d8f4f4d9c1d8'),
                     ToolMessage(content="Searching...", id='d8f4f4d9-c1d8-4f4d-b8b7-d8f4f4d9c1d8', tool_call_id="123")]}
        ```
    """

    def __init__(self) -> None:
        super().__init__(Annotated[list[AnyMessage], add_messages])

add_node(node, action=None, *, metadata=None, input=None, retry=None)
add_nodenode action=None * metadata=None input=None retry=None

Adds a new node to the state graph.
将新节点添加到状态图中。

Will take the name of the function/runnable as the node name.
将采用函数的名称/runnable 作为节点名称。

Parameters: 参数:

  • node (Union[str, RunnableLike)]) –

    The function or runnable this node will run.


    节点Union[str, RunnableLike)]) –

    此节点将运行的函数或可运行设备。

  • action (Optional[RunnableLike], default: None ) –

    The action associated with the node. (default: None)


    行动可选[RunnableLike],默认值: 没有 ) –

    与节点关联的操作。(默认值:无)

  • metadata (Optional[dict[str, Any]], default: None ) –

    The metadata associated with the node. (default: None)


    元数据可选[dict[strAny]],默认值: 没有 ) –

    与节点关联的元数据。(默认值:无)

  • input (Optional[Type[Any]], default: None ) –

    The input schema for the node. (default: the graph's input schema)


    输入可选[类型[任意]],默认值: 没有 ) –

    节点的输入架构。(默认值:图形的输入架构)

  • retry (Optional[RetryPolicy], default: None ) –

    The policy for retrying the node. (default: None)


    重试可选 [RetryPolicy],默认值: 没有 ) –

    重试节点的策略。(默认值:无)

Raises: ValueError: If the key is already being used as a state key.
加薪:ValueError:如果键已被用作状态键。

Examples: 例子:

>>> from langgraph.graph import START, StateGraph
...
>>> def my_node(state, config):
...    return {"x": state["x"] + 1}
...
>>> builder = StateGraph(dict)
>>> builder.add_node(my_node)  # node name will be 'my_node'
>>> builder.add_edge(START, "my_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}
Customize the name: 自定义名称:

>>> builder = StateGraph(dict)
>>> builder.add_node("my_fair_node", my_node)
>>> builder.add_edge(START, "my_fair_node")
>>> graph = builder.compile()
>>> graph.invoke({"x": 1})
{'x': 2}

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/state.py
libs/langgraph/langgraph/graph/state.py 源代码
def add_node(
    self,
    node: Union[str, RunnableLike],
    action: Optional[RunnableLike] = None,
    *,
    metadata: Optional[dict[str, Any]] = None,
    input: Optional[Type[Any]] = None,
    retry: Optional[RetryPolicy] = None,
) -> None:
    """Adds a new node to the state graph.

    Will take the name of the function/runnable as the node name.

    Args:
        node (Union[str, RunnableLike)]: The function or runnable this node will run.
        action (Optional[RunnableLike]): The action associated with the node. (default: None)
        metadata (Optional[dict[str, Any]]): The metadata associated with the node. (default: None)
        input (Optional[Type[Any]]): The input schema for the node. (default: the graph's input schema)
        retry (Optional[RetryPolicy]): The policy for retrying the node. (default: None)
    Raises:
        ValueError: If the key is already being used as a state key.


    Examples:
        ```pycon
        >>> from langgraph.graph import START, StateGraph
        ...
        >>> def my_node(state, config):
        ...    return {"x": state["x"] + 1}
        ...
        >>> builder = StateGraph(dict)
        >>> builder.add_node(my_node)  # node name will be 'my_node'
        >>> builder.add_edge(START, "my_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```
        Customize the name:

        ```pycon
        >>> builder = StateGraph(dict)
        >>> builder.add_node("my_fair_node", my_node)
        >>> builder.add_edge(START, "my_fair_node")
        >>> graph = builder.compile()
        >>> graph.invoke({"x": 1})
        {'x': 2}
        ```

    Returns:
        None
    """
    if not isinstance(node, str):
        action = node
        if isinstance(action, Runnable):
            node = action.name
        else:
            node = getattr(action, "__name__", action.__class__.__name__)
        if node is None:
            raise ValueError(
                "Node name must be provided if action is not a function"
            )
    if node in self.channels:
        raise ValueError(f"'{node}' is already being used as a state key")
    if self.compiled:
        logger.warning(
            "Adding a node to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    if not isinstance(node, str):
        action = node
        node = getattr(action, "name", action.__name__)
    if node in self.nodes:
        raise ValueError(f"Node `{node}` already present.")
    if node == END or node == START:
        raise ValueError(f"Node `{node}` is reserved.")
    try:
        if isfunction(action) and (
            hints := get_type_hints(action.__call__) or get_type_hints(action)
        ):
            if input is None:
                input_hint = hints[list(hints.keys())[0]]
                if isinstance(input_hint, type) and get_type_hints(input_hint):
                    input = input_hint
    except TypeError:
        pass
    if input is not None:
        self._add_schema(input)
    self.nodes[node] = StateNodeSpec(
        coerce_to_runnable(action, name=node, trace=False),
        metadata,
        input=input or self.schema,
        retry_policy=retry,
    )

add_edge(start_key, end_key)
add_edgestart_key end_key

Adds a directed edge from the start node to the end node.
将起点节点的有向边添加到终点节点。

If the graph transitions to the start_key node, it will always transition to the end_key node next.
如果图形过渡到start_key节点,则接下来将始终过渡到end_key节点。

Parameters: 参数:

  • start_key (Union[str, list[str]]) –

    The key(s) of the start node(s) of the edge.


    start_key联合[strlist[str]]) –

    Edge 的起始节点的键。

  • end_key (str) –

    The key of the end node of the edge.


    end_keystr) –

    Edge 的终端节点的键。

Raises: 提高:

  • ValueError

    If the start key is 'END' or if the start key or end key is not present in the graph.


    值错误

    如果开始键为“END”,或者图形中不存在开始键或结束键。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/state.py
libs/langgraph/langgraph/graph/state.py 中的源代码
def add_edge(self, start_key: Union[str, list[str]], end_key: str) -> None:
    """Adds a directed edge from the start node to the end node.

    If the graph transitions to the start_key node, it will always transition to the end_key node next.

    Args:
        start_key (Union[str, list[str]]): The key(s) of the start node(s) of the edge.
        end_key (str): The key of the end node of the edge.

    Raises:
        ValueError: If the start key is 'END' or if the start key or end key is not present in the graph.

    Returns:
        None
    """
    if isinstance(start_key, str):
        return super().add_edge(start_key, end_key)

    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    for start in start_key:
        if start == END:
            raise ValueError("END cannot be a start node")
        if start not in self.nodes:
            raise ValueError(f"Need to add_node `{start}` first")
    if end_key == START:
        raise ValueError("START cannot be an end node")
    if end_key not in self.nodes:
        raise ValueError(f"Need to add_node `{end_key}` first")

    self.waiting_edges.add((tuple(start_key), end_key))

add_conditional_edges(source, path, path_map=None, then=None)
add_conditional_edgessource path path_map=None then=None

Add a conditional edge from the starting node to any number of destination nodes.
将条件边从起始节点添加到任意数量的目标节点。

Parameters: 参数:

  • source (str) –

    The starting node. This conditional edge will run when exiting this node.


    str) –

    起始节点。此条件边将在退出此节点时运行。

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.


    路径联合[可调用可运行]) –

    确定下一个节点或节点的可调用对象。如果未指定 path_map 则应返回一个或多个节点。如果返回 END,则图形将停止执行。

  • path_map (Optional[dict[Hashable, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.


    path_map可选[dict[Hashablestr]],默认值: 没有 ) –

    可选的路径映射到节点名称。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.


    然后可选[str],默认值: 没有 ) –

    按路径选择的节点之后要执行的节点的名称。

Returns: 返回:

  • None

    None


    没有

    没有

Without typehints on the path function's return value (e.g., -> Literal["foo", "__end__"]:)
路径函数的返回值上没有类型提示(例如,-> Literal[“foo”, “__end__”]:

or a path_map, the graph visualization assumes the edge could transition to any node in the graph.
或者path_map,图形可视化假定边缘可以过渡到图形中的任何节点。

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def add_conditional_edges(
    self,
    source: str,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Add a conditional edge from the starting node to any number of destination nodes.

    Args:
        source (str): The starting node. This conditional edge will run when
            exiting this node.
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[Hashable, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None

    Note: Without typehints on the `path` function's return value (e.g., `-> Literal["foo", "__end__"]:`)
        or a path_map, the graph visualization assumes the edge could transition to any node in the graph.

    """  # noqa: E501
    if self.compiled:
        logger.warning(
            "Adding an edge to a graph that has already been compiled. This will "
            "not be reflected in the compiled graph."
        )
    # coerce path_map to a dictionary
    try:
        if isinstance(path_map, dict):
            path_map = path_map.copy()
        elif isinstance(path_map, list):
            path_map = {name: name for name in path_map}
        elif rtn_type := get_type_hints(path.__call__).get(
            "return"
        ) or get_type_hints(path).get("return"):
            if get_origin(rtn_type) is Literal:
                path_map = {name: name for name in get_args(rtn_type)}
    except Exception:
        pass
    # find a name for the condition
    path = coerce_to_runnable(path, name=None, trace=True)
    name = path.name or "condition"
    # validate the condition
    if name in self.branches[source]:
        raise ValueError(
            f"Branch with name `{path.name}` already exists for node " f"`{source}`"
        )
    # save it
    self.branches[source][name] = Branch(path, path_map, then)

set_entry_point(key)
set_entry_point(

Specifies the first node to be called in the graph.
指定要在关系图中调用的第一个节点。

Equivalent to calling add_edge(START, key).
等效于调用 add_edge(START, key)。

Parameters: 参数:

  • key (str) –

    The key of the node to set as the entry point.


    钥匙str) –

    要设置为入口点的节点的键。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def set_entry_point(self, key: str) -> None:
    """Specifies the first node to be called in the graph.

    Equivalent to calling `add_edge(START, key)`.

    Parameters:
        key (str): The key of the node to set as the entry point.

    Returns:
        None
    """
    return self.add_edge(START, key)

set_conditional_entry_point(path, path_map=None, then=None)
set_conditional_entry_pointpath path_map=None then=None

Sets a conditional entry point in the graph.
在图形中设置条件入口点。

Parameters: 参数:

  • path (Union[Callable, Runnable]) –

    The callable that determines the next node or nodes. If not specifying path_map it should return one or more nodes. If it returns END, the graph will stop execution.


    路径联合[可调用可运行]) –

    确定下一个节点或节点的可调用对象。如果未指定 path_map 则应返回一个或多个节点。如果返回 END,则图形将停止执行。

  • path_map (Optional[dict[str, str]], default: None ) –

    Optional mapping of paths to node names. If omitted the paths returned by path should be node names.


    path_map可选[dict[strstr]],默认值: 没有 ) –

    可选的路径映射到节点名称。如果省略,则 path 返回的路径应为节点名称。

  • then (Optional[str], default: None ) –

    The name of a node to execute after the nodes selected by path.


    然后可选[str],默认值: 没有 ) –

    按路径选择的节点之后要执行的节点的名称。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def set_conditional_entry_point(
    self,
    path: Union[
        Callable[..., Union[Hashable, list[Hashable]]],
        Callable[..., Awaitable[Union[Hashable, list[Hashable]]]],
        Runnable[Any, Union[Hashable, list[Hashable]]],
    ],
    path_map: Optional[Union[dict[Hashable, str], list[str]]] = None,
    then: Optional[str] = None,
) -> None:
    """Sets a conditional entry point in the graph.

    Args:
        path (Union[Callable, Runnable]): The callable that determines the next
            node or nodes. If not specifying `path_map` it should return one or
            more nodes. If it returns END, the graph will stop execution.
        path_map (Optional[dict[str, str]]): Optional mapping of paths to node
            names. If omitted the paths returned by `path` should be node names.
        then (Optional[str]): The name of a node to execute after the nodes
            selected by `path`.

    Returns:
        None
    """
    return self.add_conditional_edges(START, path, path_map, then)

set_finish_point(key)
set_finish_point(

Marks a node as a finish point of the graph.
将节点标记为图形的完成点。

If the graph reaches this node, it will cease execution.
如果图形到达此节点,它将停止执行。

Parameters: 参数:

  • key (str) –

    The key of the node to set as the finish point.


    钥匙str) –

    要设置为终点的节点键。

Returns: 返回:

  • None

    None


    没有

    没有

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def set_finish_point(self, key: str) -> None:
    """Marks a node as a finish point of the graph.

    If the graph reaches this node, it will cease execution.

    Parameters:
        key (str): The key of the node to set as the finish point.

    Returns:
        None
    """
    return self.add_edge(key, END)

compile(checkpointer=None, interrupt_before=None, interrupt_after=None, debug=False)
compilecheckpointer=None interrupt_before=None interrupt_after=None debug=False

Compiles the state graph into a CompiledGraph object.
将状态图编译为 CompiledGraph 对象。

The compiled graph implements the Runnable interface and can be invoked, streamed, batched, and run asynchronously.
编译后的图形实现了 Runnable 接口,可以异步调用、流式处理、批处理和运行。

Parameters: 参数:

  • checkpointer (Optional[BaseCheckpointSaver], default: None ) –

    An optional checkpoint saver object. This serves as a fully versioned "memory" for the graph, allowing the graph to be paused and resumed, and replayed from any point.


    检查点可选 [BaseCheckpointSaver],默认值: 没有 ) –

    可选的检查点保护程序对象。这充当图形的完全版本控制的“内存”,允许图形暂停和恢复,并从任何点重放。

  • interrupt_before (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt before.


    interrupt_before可选[Sequence[str]],默认值: 没有 ) –

    要在之前中断的节点名称的可选列表。

  • interrupt_after (Optional[Sequence[str]], default: None ) –

    An optional list of node names to interrupt after.


    interrupt_after可选[Sequence[str]],默认值: 没有 ) –

    要中断的节点名称的可选列表。

  • debug (bool, default: False ) –

    A flag indicating whether to enable debug mode.


    调试布尔值,默认值: ) –

    指示是否启用调试模式的标志。

Returns: 返回:

  • CompiledStateGraph ( CompiledStateGraph ) –

    The compiled state graph.


    CompiledStateGraphCompiledStateGraph ) –

    编译的状态图。

Source code in libs/langgraph/langgraph/graph/state.py
libs/langgraph/langgraph/graph/state.py 中的源代码
def compile(
    self,
    checkpointer: Optional[BaseCheckpointSaver] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: bool = False,
) -> "CompiledStateGraph":
    """Compiles the state graph into a `CompiledGraph` object.

    The compiled graph implements the `Runnable` interface and can be invoked,
    streamed, batched, and run asynchronously.

    Args:
        checkpointer (Optional[BaseCheckpointSaver]): An optional checkpoint saver object.
            This serves as a fully versioned "memory" for the graph, allowing
            the graph to be paused and resumed, and replayed from any point.
        interrupt_before (Optional[Sequence[str]]): An optional list of node names to interrupt before.
        interrupt_after (Optional[Sequence[str]]): An optional list of node names to interrupt after.
        debug (bool): A flag indicating whether to enable debug mode.

    Returns:
        CompiledStateGraph: The compiled state graph.
    """
    # assign default values
    interrupt_before = interrupt_before or []
    interrupt_after = interrupt_after or []

    # validate the graph
    self.validate(
        interrupt=(
            (interrupt_before if interrupt_before != "*" else []) + interrupt_after
            if interrupt_after != "*"
            else []
        )
    )

    # prepare output channels
    output_channels = (
        "__root__"
        if len(self.schemas[self.output]) == 1
        and "__root__" in self.schemas[self.output]
        else [
            key
            for key, val in self.schemas[self.output].items()
            if not isinstance(val, Context) and not is_managed_value(val)
        ]
    )

    compiled = CompiledStateGraph(
        builder=self,
        config_type=self.config_schema,
        nodes={},
        channels={**self.channels, START: EphemeralValue(self.input)},
        input_channels=START,
        stream_mode="updates",
        output_channels=output_channels,
        stream_channels=output_channels,
        checkpointer=checkpointer,
        interrupt_before_nodes=interrupt_before,
        interrupt_after_nodes=interrupt_after,
        auto_validate=False,
        debug=debug,
    )

    compiled.attach_node(START, None)
    for key, node in self.nodes.items():
        compiled.attach_node(key, node)

    for start, end in self.edges:
        compiled.attach_edge(start, end)

    for starts, end in self.waiting_edges:
        compiled.attach_edge(starts, end)

    for start, branches in self.branches.items():
        for name, branch in branches.items():
            compiled.attach_branch(start, name, branch)

    return compiled.validate()

CompiledGraph
编译图

Bases: Pregel
基质:预凝胶

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
class CompiledGraph(Pregel):
    builder: Graph

    def attach_node(self, key: str, node: NodeSpec) -> None:
        self.channels[key] = EphemeralValue(Any)
        self.nodes[key] = (
            PregelNode(channels=[], triggers=[], metadata=node.metadata)
            | node.runnable
            | ChannelWrite([ChannelWriteEntry(key)], tags=[TAG_HIDDEN])
        )
        cast(list[str], self.stream_channels).append(key)

    def attach_edge(self, start: str, end: str) -> None:
        if end == END:
            # publish to end channel
            self.nodes[start].writers.append(
                ChannelWrite([ChannelWriteEntry(END)], tags=[TAG_HIDDEN])
            )
        else:
            # subscribe to start channel
            self.nodes[end].triggers.append(start)
            self.nodes[end].channels.append(start)

    def attach_branch(self, start: str, name: str, branch: Branch) -> None:
        def branch_writer(packets: list[Union[str, Send]]) -> Optional[ChannelWrite]:
            writes = [
                (
                    ChannelWriteEntry(f"branch:{start}:{name}:{p}" if p != END else END)
                    if not isinstance(p, Send)
                    else p
                )
                for p in packets
            ]
            return ChannelWrite(writes, tags=[TAG_HIDDEN])

        # add hidden start node
        if start == START and start not in self.nodes:
            self.nodes[start] = Channel.subscribe_to(START, tags=[TAG_HIDDEN])

        # attach branch writer
        self.nodes[start] |= branch.run(branch_writer)

        # attach branch readers
        ends = branch.ends.values() if branch.ends else [node for node in self.nodes]
        for end in ends:
            if end != END:
                channel_name = f"branch:{start}:{name}:{end}"
                self.channels[channel_name] = EphemeralValue(Any)
                self.nodes[end].triggers.append(channel_name)
                self.nodes[end].channels.append(channel_name)

    def get_graph(
        self,
        config: Optional[RunnableConfig] = None,
        *,
        xray: Union[int, bool] = False,
    ) -> DrawableGraph:
        """Returns a drawable representation of the computation graph."""
        graph = DrawableGraph()
        start_nodes: dict[str, DrawableNode] = {
            START: graph.add_node(self.get_input_schema(config), START)
        }
        end_nodes: dict[str, DrawableNode] = {}

        def add_edge(
            start: str, end: str, label: Optional[str] = None, conditional: bool = False
        ) -> None:
            if end == END and END not in end_nodes:
                end_nodes[END] = graph.add_node(self.get_output_schema(config), END)
            return graph.add_edge(
                start_nodes[start], end_nodes[end], label, conditional
            )

        for key, n in self.builder.nodes.items():
            node = n.runnable
            if xray:
                subgraph = (
                    node.get_graph(
                        config=config,
                        xray=xray - 1 if isinstance(xray, int) and xray > 0 else xray,
                    )
                    if isinstance(node, CompiledGraph)
                    else node.get_graph(config=config)
                )
                subgraph.trim_first_node()
                subgraph.trim_last_node()
                if len(subgraph.nodes) > 1:
                    end_nodes[key], start_nodes[key] = graph.extend(
                        subgraph, prefix=key
                    )
                else:
                    n = graph.add_node(node, key)
                    start_nodes[key] = n
                    end_nodes[key] = n
            else:
                n = graph.add_node(node, key, metadata=n.metadata)
                start_nodes[key] = n
                end_nodes[key] = n
        for start, end in sorted(self.builder._all_edges):
            add_edge(start, end)
        for start, branches in self.builder.branches.items():
            default_ends = {
                **{k: k for k in self.builder.nodes if k != start},
                END: END,
            }
            for _, branch in branches.items():
                if branch.ends is not None:
                    ends = branch.ends
                elif branch.then is not None:
                    ends = {k: k for k in default_ends if k not in (END, branch.then)}
                else:
                    ends = default_ends
                for label, end in ends.items():
                    add_edge(
                        start,
                        end,
                        label if label != end else None,
                        conditional=True,
                    )
                    if branch.then is not None:
                        add_edge(end, branch.then)

        return graph

stream_mode: StreamMode = 'values' class-attribute instance-attribute
stream_mode StreamMode = 'values' 类-属性 instance-attribute

Mode to stream output, defaults to 'values'.
流式传输输出的模式,默认为 'values'。

stream_channels: Optional[Union[str, Sequence[str]]] = None class-attribute instance-attribute
stream_channels Optional[Union[str Sequence[str]]] = 类属性 instance-attribute

Channels to stream, defaults to all channels not in reserved channels
要流式传输的频道,默认为不在预留频道中的所有频道

step_timeout: Optional[float] = None class-attribute instance-attribute
step_timeout Optional[float] = None 类-属性 instance-attribute

Maximum time to wait for a step to complete, in seconds. Defaults to None.
等待步骤完成的最长时间(以秒为单位)。默认值为 None。

debug: bool = Field(default_factory=get_debug) class-attribute instance-attribute
debug bool = Fielddefault_factory=get_debug 类-属性 instance-attribute

Whether to print debug information during execution. Defaults to False.
是否在执行过程中打印调试信息。默认值为 False。

checkpointer: Optional[BaseCheckpointSaver] = None class-attribute instance-attribute
checkpointer Optional[BaseCheckpointSaver] = 类属性 instance-attribute

Checkpointer used to save and load graph state. Defaults to None.
Checkpointer 用于保存和加载图形状态。默认值为 None。

retry_policy: Optional[RetryPolicy] = None class-attribute instance-attribute
retry_policy Optional[RetryPolicy] = None 类-属性 instance-attribute

Retry policy to use when running tasks. Set to None to disable.
重试运行任务时使用的策略。设置为“无”可禁用。

is_lc_serializable() classmethod
is_lc_serializable() 类方法

Return whether the graph can be serialized by Langchain.
返回图形是否可以被Langchain序列化。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
@classmethod
def is_lc_serializable(cls) -> bool:
    """Return whether the graph can be serialized by Langchain."""
    return True

get_state(config)
get_state(配置

Get the current state of the graph.
获取图形的当前状态。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
def get_state(self, config: RunnableConfig) -> StateSnapshot:
    """Get the current state of the graph."""
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    saved = self.checkpointer.get_tuple(config)
    checkpoint = saved.checkpoint if saved else empty_checkpoint()
    config = saved.config if saved else config
    with ChannelsManager(
        self.channels, checkpoint, config
    ) as channels, ManagedValuesManager(
        self.managed_values_dict, ensure_config(config), self
    ) as managed:
        next_tasks = prepare_next_tasks(
            checkpoint,
            self.nodes,
            channels,
            managed,
            config,
            -1,
            for_execution=False,
        )
        return StateSnapshot(
            read_channels(channels, self.stream_channels_asis),
            tuple(name for name, _ in next_tasks),
            saved.config if saved else config,
            saved.metadata if saved else None,
            saved.checkpoint["ts"] if saved else None,
            saved.parent_config if saved else None,
        )

aget_state(config) async
aget_stateconfig 异步

Get the current state of the graph.
获取图形的当前状态。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
async def aget_state(self, config: RunnableConfig) -> StateSnapshot:
    """Get the current state of the graph."""
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    saved = await self.checkpointer.aget_tuple(config)
    checkpoint = saved.checkpoint if saved else empty_checkpoint()

    config = saved.config if saved else config
    async with AsyncChannelsManager(
        self.channels, checkpoint, config
    ) as channels, AsyncManagedValuesManager(
        self.managed_values_dict, ensure_config(config), self
    ) as managed:
        next_tasks = prepare_next_tasks(
            checkpoint,
            self.nodes,
            channels,
            managed,
            config,
            -1,
            for_execution=False,
        )
        return StateSnapshot(
            read_channels(channels, self.stream_channels_asis),
            tuple(name for name, _ in next_tasks),
            saved.config if saved else config,
            saved.metadata if saved else None,
            saved.checkpoint["ts"] if saved else None,
            saved.parent_config if saved else None,
        )

get_state_history(config, *, filter=None, before=None, limit=None)
get_state_historyconfig * filter=None before=None limit=None

Get the history of the state of the graph.
获取图形状态的历史记录。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
def get_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: Optional[Dict[str, Any]] = None,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None,
) -> Iterator[StateSnapshot]:
    """Get the history of the state of the graph."""
    if not self.checkpointer:
        raise ValueError("No checkpointer set")
    if (
        filter is not None
        and signature(self.checkpointer.list).parameters.get("filter") is None
    ):
        raise ValueError("Checkpointer does not support filtering")
    for config, checkpoint, metadata, parent_config, _ in self.checkpointer.list(
        config, before=before, limit=limit, filter=filter
    ):
        with ChannelsManager(
            self.channels, checkpoint, config
        ) as channels, ManagedValuesManager(
            self.managed_values_dict, ensure_config(config), self
        ) as managed:
            next_tasks = prepare_next_tasks(
                checkpoint,
                self.nodes,
                channels,
                managed,
                config,
                -1,
                for_execution=False,
            )
            yield StateSnapshot(
                read_channels(channels, self.stream_channels_asis),
                tuple(name for name, _ in next_tasks),
                config,
                metadata,
                checkpoint["ts"],
                parent_config,
            )

aget_state_history(config, *, filter=None, before=None, limit=None) async
aget_state_historyconfig * filter=None before=None limit=None 异步

Get the history of the state of the graph.
获取图形状态的历史记录。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
async def aget_state_history(
    self,
    config: RunnableConfig,
    *,
    filter: Optional[Dict[str, Any]] = None,
    before: Optional[RunnableConfig] = None,
    limit: Optional[int] = None,
) -> AsyncIterator[StateSnapshot]:
    """Get the history of the state of the graph."""
    if not self.checkpointer:
        raise ValueError("No checkpointer set")
    if (
        filter is not None
        and signature(self.checkpointer.list).parameters.get("filter") is None
    ):
        raise ValueError("Checkpointer does not support filtering")
    async for (
        config,
        checkpoint,
        metadata,
        parent_config,
        _,
    ) in self.checkpointer.alist(config, before=before, limit=limit, filter=filter):
        async with AsyncChannelsManager(
            self.channels, checkpoint, config
        ) as channels, AsyncManagedValuesManager(
            self.managed_values_dict, ensure_config(config), self
        ) as managed:
            next_tasks = prepare_next_tasks(
                checkpoint,
                self.nodes,
                channels,
                managed,
                config,
                -1,
                for_execution=False,
            )
            yield StateSnapshot(
                read_channels(channels, self.stream_channels_asis),
                tuple(name for name, _ in next_tasks),
                config,
                metadata,
                checkpoint["ts"],
                parent_config,
            )

update_state(config, values, as_node=None)
update_stateconfig values as_node=None

Update the state of the graph with the given values, as if they came from node as_node. If as_node is not provided, it will be set to the last node that updated the state, if not ambiguous.
使用给定的值更新图形的状态,就好像它们来自节点 as_node一样。如果未提供 as_node,则将设置为更新状态的最后一个节点(如果不是不明确)。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
def update_state(
    self,
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: Optional[str] = None,
) -> RunnableConfig:
    """Update the state of the graph with the given values, as if they came from
    node `as_node`. If `as_node` is not provided, it will be set to the last node
    that updated the state, if not ambiguous.
    """
    if not self.checkpointer:
        raise ValueError("No checkpointer set")

    # get last checkpoint
    saved = self.checkpointer.get_tuple(config)
    checkpoint = copy_checkpoint(saved.checkpoint) if saved else empty_checkpoint()
    # find last node that updated the state, if not provided
    if as_node is None and not any(
        v for vv in checkpoint["versions_seen"].values() for v in vv.values()
    ):
        if (
            isinstance(self.input_channels, str)
            and self.input_channels in self.nodes
        ):
            as_node = self.input_channels
    elif as_node is None:
        last_seen_by_node = sorted(
            (v, n)
            for n, seen in checkpoint["versions_seen"].items()
            for v in seen.values()
        )
        # if two nodes updated the state at the same time, it's ambiguous
        if last_seen_by_node:
            if len(last_seen_by_node) == 1:
                as_node = last_seen_by_node[0][1]
            elif last_seen_by_node[-1][0] != last_seen_by_node[-2][0]:
                as_node = last_seen_by_node[-1][1]
    if as_node is None:
        raise InvalidUpdateError("Ambiguous update, specify as_node")
    if as_node not in self.nodes:
        raise InvalidUpdateError(f"Node {as_node} does not exist")
    # update channels
    with ChannelsManager(self.channels, checkpoint, config) as channels:
        # create task to run all writers of the chosen node
        writers = self.nodes[as_node].get_writers()
        if not writers:
            raise InvalidUpdateError(f"Node {as_node} has no writers")
        task = PregelExecutableTask(
            as_node,
            values,
            RunnableSequence(*writers) if len(writers) > 1 else writers[0],
            deque(),
            None,
            [INTERRUPT],
            None,
            str(uuid5(UUID(checkpoint["id"]), INTERRUPT)),
        )
        # execute task
        task.proc.invoke(
            task.input,
            patch_config(
                config,
                run_name=self.name + "UpdateState",
                configurable={
                    # deque.extend is thread-safe
                    CONFIG_KEY_SEND: task.writes.extend,
                    CONFIG_KEY_READ: partial(
                        local_read, checkpoint, channels, task, config
                    ),
                },
            ),
        )
        # apply to checkpoint and save
        apply_writes(
            checkpoint, channels, [task], self.checkpointer.get_next_version
        )
        step = saved.metadata.get("step", -2) + 1 if saved else -1

        # merge configurable fields with previous checkpoint config
        checkpoint_config = config
        if saved:
            checkpoint_config = {
                "configurable": {
                    **config.get("configurable", {}),
                    **saved.config["configurable"],
                }
            }

        return self.checkpointer.put(
            checkpoint_config,
            create_checkpoint(checkpoint, channels, step),
            {
                "source": "update",
                "step": step,
                "writes": {as_node: values},
            },
        )

stream(input, config=None, *, stream_mode=None, output_keys=None, interrupt_before=None, interrupt_after=None, debug=None)
streaminput config=None * stream_mode=None output_keys=None interrupt_before=None interrupt_after=None debug=None

Stream graph steps for a single input.
单个输入的流图步长。

Parameters: 参数:

  • input (Union[dict[str, Any], Any]) –

    The input to the graph.


    输入联合[dict[strany], any]) –

    图形的输入。

  • config (Optional[RunnableConfig], default: None ) –

    The configuration to use for the run.


    配置可选 [RunnableConfig],默认值: 没有 ) –

    要用于运行的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.


    stream_mode可选 [Union[StreamModelist[StreamMode]]],默认值: 没有 ) –

    流式传输输出的模式,默认为 self.stream_mode。选项包括“values”、“updates”和“debug”。values:发出每个步骤的状态当前值。updates:仅发出对每个步骤的状态的更新。输出是一个字典,节点名称作为键,更新的值作为值。debug:为每个步骤发出调试事件。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    The keys to stream, defaults to all non-context channels.


    output_keys可选[Union[strSequence[str]]],默认值: 没有 ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt before, defaults to all nodes in the graph.


    interrupt_before可选[Union[AllSequence[str]]],默认值: 没有 ) –

    之前要中断的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt after, defaults to all nodes in the graph.


    interrupt_after可选[Union[AllSequence[str]]],默认值: 没有 ) –

    要中断的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    Whether to print debug information during execution, defaults to False.


    调试可选[bool],默认值: 没有 ) –

    执行过程中是否打印调试信息,默认为 False。

Yields: 收益 率:

  • Union[dict[str, Any], Any]

    The output of each step in the graph. The output shape depends on the stream_mode.


    联合[dict[strany], any]

    图中每个步骤的输出。输出形状取决于stream_mode。

Examples: 例子:

Using different stream modes with a graph:
对图形使用不同的流模式:

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
With stream_mode="values":
使用 stream_mode=“values”:

>>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
With stream_mode="updates":
使用 stream_mode=“updates”:

>>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
With stream_mode="debug":
使用 stream_mode=“debug”:

>>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
def stream(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None,
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
) -> Iterator[Union[dict[str, Any], Any]]:
    """Stream graph steps for a single input.

    Args:
        input: The input to the graph.
        config: The configuration to use for the run.
        stream_mode: The mode to stream output, defaults to self.stream_mode.
            Options are 'values', 'updates', and 'debug'.
            values: Emit the current values of the state for each step.
            updates: Emit only the updates to the state for each step.
                Output is a dict with the node name as key and the updated values as value.
            debug: Emit debug events for each step.
        output_keys: The keys to stream, defaults to all non-context channels.
        interrupt_before: Nodes to interrupt before, defaults to all nodes in the graph.
        interrupt_after: Nodes to interrupt after, defaults to all nodes in the graph.
        debug: Whether to print debug information during execution, defaults to False.

    Yields:
        The output of each step in the graph. The output shape depends on the stream_mode.

    Examples:
        Using different stream modes with a graph:
        ```pycon
        >>> import operator
        >>> from typing_extensions import Annotated, TypedDict
        >>> from langgraph.graph import StateGraph
        >>> from langgraph.constants import START
        ...
        >>> class State(TypedDict):
        ...     alist: Annotated[list, operator.add]
        ...     another_list: Annotated[list, operator.add]
        ...
        >>> builder = StateGraph(State)
        >>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
        >>> builder.add_node("b", lambda _state: {"alist": ["there"]})
        >>> builder.add_edge("a", "b")
        >>> builder.add_edge(START, "a")
        >>> graph = builder.compile()
        ```
        With stream_mode="values":

        ```pycon
        >>> for event in graph.stream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
        ...     print(event)
        {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
        {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
        {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
        ```
        With stream_mode="updates":

        ```pycon
        >>> for event in graph.stream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
        ...     print(event)
        {'a': {'another_list': ['hi']}}
        {'b': {'alist': ['there']}}
        ```
        With stream_mode="debug":

        ```pycon
        >>> for event in graph.stream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
        ...     print(event)
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
        ```
    """
    config = ensure_config(config)
    callback_manager = get_callback_manager_for_config(config)
    run_manager = callback_manager.on_chain_start(
        dumpd(self),
        input,
        name=config.get("run_name", self.get_name()),
        run_id=config.get("run_id"),
    )
    try:
        if config["recursion_limit"] < 1:
            raise ValueError("recursion_limit must be at least 1")
        if self.checkpointer and not config.get("configurable"):
            raise ValueError(
                f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}"
            )
        # assign defaults
        (
            debug,
            stream_modes,
            output_keys,
            interrupt_before,
            interrupt_after,
            checkpointer,
        ) = self._defaults(
            config,
            stream_mode=stream_mode,
            output_keys=output_keys,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            debug=debug,
        )

        with SyncPregelLoop(
            input, config=config, checkpointer=checkpointer, graph=self
        ) as loop:
            # Similarly to Bulk Synchronous Parallel / Pregel model
            # computation proceeds in steps, while there are channel updates
            # channel updates from step N are only visible in step N+1
            # channels are guaranteed to be immutable for the duration of the step,
            # with channel updates applied only at the transition between steps
            while loop.tick(
                output_keys=output_keys,
                interrupt_before=interrupt_before,
                interrupt_after=interrupt_after,
                manager=run_manager,
            ):
                # debug flag
                if self.debug:
                    print_step_checkpoint(
                        loop.checkpoint_metadata,
                        loop.channels,
                        self.stream_channels_list,
                    )
                # emit output
                while loop.stream:
                    mode, payload = loop.stream.popleft()
                    if mode in stream_modes:
                        if isinstance(stream_mode, list):
                            yield (mode, payload)
                        else:
                            yield payload
                # debug flag
                if debug:
                    print_step_tasks(loop.step, loop.tasks)

                # execute tasks, and wait for one to fail or all to finish.
                # each task is independent from all other concurrent tasks
                # yield updates/debug output as each task finishes
                futures = {
                    loop.submit(
                        run_with_retry,
                        task,
                        self.retry_policy,
                    ): task
                    for task in loop.tasks
                    if not task.writes
                }
                end_time = (
                    self.step_timeout + time.monotonic()
                    if self.step_timeout
                    else None
                )
                if not futures:
                    done, inflight = set(), set()
                while futures:
                    done, inflight = concurrent.futures.wait(
                        futures,
                        return_when=concurrent.futures.FIRST_COMPLETED,
                        timeout=(
                            max(0, end_time - time.monotonic())
                            if end_time
                            else None
                        ),
                    )
                    if not done:
                        break  # timed out
                    for fut in done:
                        task = futures.pop(fut)
                        if fut.exception() is not None:
                            # we got an exception, break out of while loop
                            # exception will be handled in panic_or_proceed
                            futures.clear()
                        else:
                            # save task writes to checkpointer
                            loop.put_writes(task.id, task.writes)
                            # yield updates output for the finished task
                            if "updates" in stream_modes:
                                yield from _with_mode(
                                    "updates",
                                    isinstance(stream_mode, list),
                                    map_output_updates(output_keys, [task]),
                                )
                            if "debug" in stream_modes:
                                yield from _with_mode(
                                    "debug",
                                    isinstance(stream_mode, list),
                                    map_debug_task_results(
                                        loop.step,
                                        [task],
                                        self.stream_channels_list,
                                    ),
                                )
                    else:
                        # remove references to loop vars
                        del fut, task

                # panic on failure or timeout
                _panic_or_proceed(done, inflight, loop.step)
                # don't keep futures around in memory longer than needed
                del done, inflight, futures
                # debug flag
                if debug:
                    print_step_writes(
                        loop.step,
                        [w for t in loop.tasks for w in t.writes],
                        self.stream_channels_list,
                    )
            # emit output
            while loop.stream:
                mode, payload = loop.stream.popleft()
                if mode in stream_modes:
                    if isinstance(stream_mode, list):
                        yield (mode, payload)
                    else:
                        yield payload
            # handle exit
            if loop.status == "out_of_steps":
                raise GraphRecursionError(
                    f"Recursion limit of {config['recursion_limit']} reached "
                    "without hitting a stop condition. You can increase the "
                    "limit by setting the `recursion_limit` config key."
                )
            # set final channel values as run output
            run_manager.on_chain_end(read_channels(loop.channels, output_keys))
    except BaseException as e:
        run_manager.on_chain_error(e)
        raise

astream(input, config=None, *, stream_mode=None, output_keys=None, interrupt_before=None, interrupt_after=None, debug=None) async
astreaminput config= * stream_mode= output_keys= interrupt_before= interrupt_after=None debug=None 异步

Stream graph steps for a single input.
单个输入的流图步长。

Parameters: 参数:

  • input (Union[dict[str, Any], Any]) –

    The input to the graph.


    输入联合[dict[strany], any]) –

    图形的输入。

  • config (Optional[RunnableConfig], default: None ) –

    The configuration to use for the run.


    配置可选 [RunnableConfig],默认值: 没有 ) –

    要用于运行的配置。

  • stream_mode (Optional[Union[StreamMode, list[StreamMode]]], default: None ) –

    The mode to stream output, defaults to self.stream_mode. Options are 'values', 'updates', and 'debug'. values: Emit the current values of the state for each step. updates: Emit only the updates to the state for each step. Output is a dict with the node name as key and the updated values as value. debug: Emit debug events for each step.


    stream_mode可选 [Union[StreamModelist[StreamMode]]],默认值: 没有 ) –

    流式传输输出的模式,默认为 self.stream_mode。选项包括“values”、“updates”和“debug”。values:发出每个步骤的状态当前值。updates:仅发出对每个步骤的状态的更新。输出是一个字典,节点名称作为键,更新的值作为值。debug:为每个步骤发出调试事件。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    The keys to stream, defaults to all non-context channels.


    output_keys可选[Union[strSequence[str]]],默认值: 没有 ) –

    要流式传输的键,默认为所有非上下文通道。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt before, defaults to all nodes in the graph.


    interrupt_before可选[Union[AllSequence[str]]],默认值: 没有 ) –

    之前要中断的节点,默认为图中的所有节点。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Nodes to interrupt after, defaults to all nodes in the graph.


    interrupt_after可选[Union[AllSequence[str]]],默认值: 没有 ) –

    要中断的节点,默认为图中的所有节点。

  • debug (Optional[bool], default: None ) –

    Whether to print debug information during execution, defaults to False.


    调试可选[bool],默认值: 没有 ) –

    执行过程中是否打印调试信息,默认为 False。

Yields: 收益 率:

Examples: 例子:

Using different stream modes with a graph:
对图形使用不同的流模式:

>>> import operator
>>> from typing_extensions import Annotated, TypedDict
>>> from langgraph.graph import StateGraph
>>> from langgraph.constants import START
...
>>> class State(TypedDict):
...     alist: Annotated[list, operator.add]
...     another_list: Annotated[list, operator.add]
...
>>> builder = StateGraph(State)
>>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
>>> builder.add_node("b", lambda _state: {"alist": ["there"]})
>>> builder.add_edge("a", "b")
>>> builder.add_edge(START, "a")
>>> graph = builder.compile()
With stream_mode="values":
使用 stream_mode=“values”:

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
...     print(event)
{'alist': ['Ex for stream_mode="values"'], 'another_list': []}
{'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
{'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
With stream_mode="updates":
使用 stream_mode=“updates”:

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
...     print(event)
{'a': {'another_list': ['hi']}}
{'b': {'alist': ['there']}}
With stream_mode="debug":
使用 stream_mode=“debug”:

>>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
...     print(event)
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
{'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
{'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
async def astream(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: Optional[Union[StreamMode, list[StreamMode]]] = None,
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
) -> AsyncIterator[Union[dict[str, Any], Any]]:
    """Stream graph steps for a single input.

    Args:
        input: The input to the graph.
        config: The configuration to use for the run.
        stream_mode: The mode to stream output, defaults to self.stream_mode.
            Options are 'values', 'updates', and 'debug'.
            values: Emit the current values of the state for each step.
            updates: Emit only the updates to the state for each step.
                Output is a dict with the node name as key and the updated values as value.
            debug: Emit debug events for each step.
        output_keys: The keys to stream, defaults to all non-context channels.
        interrupt_before: Nodes to interrupt before, defaults to all nodes in the graph.
        interrupt_after: Nodes to interrupt after, defaults to all nodes in the graph.
        debug: Whether to print debug information during execution, defaults to False.

    Yields:
        The output of each step in the graph. The output shape depends on the stream_mode.

    Examples:
        Using different stream modes with a graph:
        ```pycon
        >>> import operator
        >>> from typing_extensions import Annotated, TypedDict
        >>> from langgraph.graph import StateGraph
        >>> from langgraph.constants import START
        ...
        >>> class State(TypedDict):
        ...     alist: Annotated[list, operator.add]
        ...     another_list: Annotated[list, operator.add]
        ...
        >>> builder = StateGraph(State)
        >>> builder.add_node("a", lambda _state: {"another_list": ["hi"]})
        >>> builder.add_node("b", lambda _state: {"alist": ["there"]})
        >>> builder.add_edge("a", "b")
        >>> builder.add_edge(START, "a")
        >>> graph = builder.compile()
        ```
        With stream_mode="values":

        ```pycon
        >>> async for event in graph.astream({"alist": ['Ex for stream_mode="values"']}, stream_mode="values"):
        ...     print(event)
        {'alist': ['Ex for stream_mode="values"'], 'another_list': []}
        {'alist': ['Ex for stream_mode="values"'], 'another_list': ['hi']}
        {'alist': ['Ex for stream_mode="values"', 'there'], 'another_list': ['hi']}
        ```
        With stream_mode="updates":

        ```pycon
        >>> async for event in graph.astream({"alist": ['Ex for stream_mode="updates"']}, stream_mode="updates"):
        ...     print(event)
        {'a': {'another_list': ['hi']}}
        {'b': {'alist': ['there']}}
        ```
        With stream_mode="debug":

        ```pycon
        >>> async for event in graph.astream({"alist": ['Ex for stream_mode="debug"']}, stream_mode="debug"):
        ...     print(event)
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': []}, 'triggers': ['start:a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 1, 'payload': {'id': '...', 'name': 'a', 'result': [('another_list', ['hi'])]}}
        {'type': 'task', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'input': {'alist': ['Ex for stream_mode="debug"'], 'another_list': ['hi']}, 'triggers': ['a']}}
        {'type': 'task_result', 'timestamp': '2024-06-23T...+00:00', 'step': 2, 'payload': {'id': '...', 'name': 'b', 'result': [('alist', ['there'])]}}
        ```
    """
    config = ensure_config(config)
    callback_manager = get_async_callback_manager_for_config(config)
    run_manager = await callback_manager.on_chain_start(
        dumpd(self),
        input,
        name=config.get("run_name", self.get_name()),
        run_id=config.get("run_id"),
    )
    # if running from astream_log() run each proc with streaming
    do_stream = next(
        (
            h
            for h in run_manager.handlers
            if isinstance(h, _StreamingCallbackHandler)
        ),
        None,
    )
    try:
        if config["recursion_limit"] < 1:
            raise ValueError("recursion_limit must be at least 1")
        if self.checkpointer and not config.get("configurable"):
            raise ValueError(
                f"Checkpointer requires one or more of the following 'configurable' keys: {[s.id for s in self.checkpointer.config_specs]}"
            )
        # assign defaults
        (
            debug,
            stream_modes,
            output_keys,
            interrupt_before,
            interrupt_after,
            checkpointer,
        ) = self._defaults(
            config,
            stream_mode=stream_mode,
            output_keys=output_keys,
            interrupt_before=interrupt_before,
            interrupt_after=interrupt_after,
            debug=debug,
        )
        async with AsyncPregelLoop(
            input, config=config, checkpointer=checkpointer, graph=self
        ) as loop:
            aioloop = asyncio.get_event_loop()
            # Similarly to Bulk Synchronous Parallel / Pregel model
            # computation proceeds in steps, while there are channel updates
            # channel updates from step N are only visible in step N+1
            # channels are guaranteed to be immutable for the duration of the step,
            # with channel updates applied only at the transition between steps
            while loop.tick(
                output_keys=output_keys,
                interrupt_before=interrupt_before,
                interrupt_after=interrupt_after,
                manager=run_manager,
            ):
                # debug flag
                if self.debug:
                    print_step_checkpoint(
                        loop.checkpoint_metadata,
                        loop.channels,
                        self.stream_channels_list,
                    )
                # emit output
                while loop.stream:
                    mode, payload = loop.stream.popleft()
                    if mode in stream_modes:
                        if isinstance(stream_mode, list):
                            yield (mode, payload)
                        else:
                            yield payload
                # debug flag
                if debug:
                    print_step_tasks(loop.step, loop.tasks)

                # execute tasks, and wait for one to fail or all to finish.
                # each task is independent from all other concurrent tasks
                # yield updates/debug output as each task finishes
                futures = {
                    loop.submit(
                        arun_with_retry,
                        task,
                        self.retry_policy,
                        stream=do_stream,
                        __name__=task.name,
                        __cancel_on_exit__=True,
                    ): task
                    for task in loop.tasks
                    if not task.writes
                }
                end_time = (
                    self.step_timeout + aioloop.time()
                    if self.step_timeout
                    else None
                )
                if not futures:
                    done, inflight = set(), set()
                while futures:
                    done, inflight = await asyncio.wait(
                        futures,
                        return_when=asyncio.FIRST_COMPLETED,
                        timeout=(
                            max(0, end_time - aioloop.time()) if end_time else None
                        ),
                    )
                    if not done:
                        break  # timed out
                    for fut in done:
                        task = futures.pop(fut)
                        if fut.exception() is not None:
                            # we got an exception, break out of while loop
                            # exception will be handled in panic_or_proceed
                            futures.clear()
                        else:
                            # save task writes to checkpointer
                            loop.put_writes(task.id, task.writes)
                            # yield updates output for the finished task
                            if "updates" in stream_modes:
                                for chunk in _with_mode(
                                    "updates",
                                    isinstance(stream_mode, list),
                                    map_output_updates(output_keys, [task]),
                                ):
                                    yield chunk
                            if "debug" in stream_modes:
                                for chunk in _with_mode(
                                    "debug",
                                    isinstance(stream_mode, list),
                                    map_debug_task_results(
                                        loop.step,
                                        [task],
                                        self.stream_channels_list,
                                    ),
                                ):
                                    yield chunk
                    else:
                        # remove references to loop vars
                        del fut, task

                # panic on failure or timeout
                _panic_or_proceed(done, inflight, loop.step, asyncio.TimeoutError)
                # don't keep futures around in memory longer than needed
                del done, inflight, futures
                # debug flag
                if debug:
                    print_step_writes(
                        loop.step,
                        [w for t in loop.tasks for w in t.writes],
                        self.stream_channels_list,
                    )
            # emit output
            while loop.stream:
                mode, payload = loop.stream.popleft()
                if mode in stream_modes:
                    if isinstance(stream_mode, list):
                        yield (mode, payload)
                    else:
                        yield payload
            # handle exit
            if loop.status == "out_of_steps":
                raise GraphRecursionError(
                    f"Recursion limit of {config['recursion_limit']} reached "
                    "without hitting a stop condition. You can increase the "
                    "limit by setting the `recursion_limit` config key."
                )

            # set final channel values as run output
            await run_manager.on_chain_end(
                read_channels(loop.channels, output_keys)
            )
    except BaseException as e:
        # TODO use on_chain_end if exc is GraphInterrupt
        await asyncio.shield(run_manager.on_chain_error(e))
        raise

invoke(input, config=None, *, stream_mode='values', output_keys=None, interrupt_before=None, interrupt_after=None, debug=None, **kwargs)
invokeinput config=None * stream_mode='values' output_keys=None interrupt_before=None interrupt_after=None debug=None, **kwargs

Run the graph with a single input and config.
使用单个输入和配置运行图形。

Parameters: 参数:

  • input (Union[dict[str, Any], Any]) –

    The input data for the graph. It can be a dictionary or any other type.


    输入联合[dict[strany], any]) –

    图形的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    Optional. The configuration for the graph run.


    配置可选 [RunnableConfig],默认值: 没有 ) –

    自选。图形运行的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    Optional[str]. The stream mode for the graph run. Default is "values".


    stream_modeStreamMode,默认值: “值” ) –

    可选[str]。图形运行的流模式。默认值为“values”。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The output keys to retrieve from the graph run.


    output_keys可选[Union[strSequence[str]]],默认值: 没有 ) –

    自选。要从图形中检索的输出键将运行。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt the graph run before.


    interrupt_before可选[Union[AllSequence[str]]],默认值: 没有 ) –

    自选。中断图形的节点在运行之前。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt the graph run after.


    interrupt_after可选[Union[AllSequence[str]]],默认值: 没有 ) –

    自选。中断图形的节点在之后运行。

  • debug (Optional[bool], default: None ) –

    Optional. Enable debug mode for the graph run.


    调试可选[bool],默认值: 没有 ) –

    自选。为图形运行启用调试模式。

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments to pass to the graph run.


    **夸格斯任何,默认: {} ) –

    要传递给图形运行的其他关键字参数。

Returns: 返回:

  • Union[dict[str, Any], Any]

    The output of the graph run. If stream_mode is "values", it returns the latest output.


    联合[dict[strany], any]

    图形运行的输出。如果 stream_mode 是“values”,则返回最新的输出。

  • Union[dict[str, Any], Any]

    If stream_mode is not "values", it returns a list of output chunks.


    联合[dict[strany], any]

    如果 stream_mode 不是 “values”,则返回输出块的列表。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
def invoke(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    **kwargs: Any,
) -> Union[dict[str, Any], Any]:
    """Run the graph with a single input and config.

    Args:
        input: The input data for the graph. It can be a dictionary or any other type.
        config: Optional. The configuration for the graph run.
        stream_mode: Optional[str]. The stream mode for the graph run. Default is "values".
        output_keys: Optional. The output keys to retrieve from the graph run.
        interrupt_before: Optional. The nodes to interrupt the graph run before.
        interrupt_after: Optional. The nodes to interrupt the graph run after.
        debug: Optional. Enable debug mode for the graph run.
        **kwargs: Additional keyword arguments to pass to the graph run.

    Returns:
        The output of the graph run. If stream_mode is "values", it returns the latest output.
        If stream_mode is not "values", it returns a list of output chunks.
    """
    output_keys = output_keys if output_keys is not None else self.output_channels
    if stream_mode == "values":
        latest: Union[dict[str, Any], Any] = None
    else:
        chunks = []
    for chunk in self.stream(
        input,
        config,
        stream_mode=stream_mode,
        output_keys=output_keys,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        **kwargs,
    ):
        if stream_mode == "values":
            latest = chunk
        else:
            chunks.append(chunk)
    if stream_mode == "values":
        return latest
    else:
        return chunks

ainvoke(input, config=None, *, stream_mode='values', output_keys=None, interrupt_before=None, interrupt_after=None, debug=None, **kwargs) async
ainvokeinput config=None * stream_mode='values' output_keys=None interrupt_before=None interrupt_after=None debug=None **kwargs 异步

Asynchronously invoke the graph on a single input.
在单个输入上异步调用图形。

Parameters: 参数:

  • input (Union[dict[str, Any], Any]) –

    The input data for the computation. It can be a dictionary or any other type.


    输入联合[dict[strany], any]) –

    用于计算的输入数据。它可以是字典或任何其他类型。

  • config (Optional[RunnableConfig], default: None ) –

    Optional. The configuration for the computation.


    配置可选 [RunnableConfig],默认值: 没有 ) –

    自选。计算的配置。

  • stream_mode (StreamMode, default: 'values' ) –

    Optional. The stream mode for the computation. Default is "values".


    stream_modeStreamMode,默认值: “值” ) –

    自选。计算的流模式。默认值为“values”。

  • output_keys (Optional[Union[str, Sequence[str]]], default: None ) –

    Optional. The output keys to include in the result. Default is None.


    output_keys可选[Union[strSequence[str]]],默认值: 没有 ) –

    自选。要包含在结果中的输出键。默认值为 None。

  • interrupt_before (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt before. Default is None.


    interrupt_before可选[Union[AllSequence[str]]],默认值: 没有 ) –

    自选。之前要中断的节点。默认值为 None。

  • interrupt_after (Optional[Union[All, Sequence[str]]], default: None ) –

    Optional. The nodes to interrupt after. Default is None.


    interrupt_after可选[Union[AllSequence[str]]],默认值: 没有 ) –

    自选。之后要中断的节点。默认值为 None。

  • debug (Optional[bool], default: None ) –

    Optional. Whether to enable debug mode. Default is None.


    调试可选[bool],默认值: 没有 ) –

    自选。是否启用调试模式。默认值为 None。

  • **kwargs (Any, default: {} ) –

    Additional keyword arguments.


    **夸格斯任何,默认: {} ) –

    其他关键字参数。

Returns: 返回:

  • Union[dict[str, Any], Any]

    The result of the computation. If stream_mode is "values", it returns the latest value.


    联合[dict[strany], any]

    计算的结果。如果 stream_mode 是 “values”,则返回最新的值。

  • Union[dict[str, Any], Any]

    If stream_mode is "chunks", it returns a list of chunks.


    联合[dict[strany], any]

    如果 stream_mode 是 “chunks”,则返回块列表。

Source code in libs/langgraph/langgraph/pregel/__init__.py
libs/langgraph/langgraph/pregel/__init__.py中的源代码
async def ainvoke(
    self,
    input: Union[dict[str, Any], Any],
    config: Optional[RunnableConfig] = None,
    *,
    stream_mode: StreamMode = "values",
    output_keys: Optional[Union[str, Sequence[str]]] = None,
    interrupt_before: Optional[Union[All, Sequence[str]]] = None,
    interrupt_after: Optional[Union[All, Sequence[str]]] = None,
    debug: Optional[bool] = None,
    **kwargs: Any,
) -> Union[dict[str, Any], Any]:
    """Asynchronously invoke the graph on a single input.

    Args:
        input: The input data for the computation. It can be a dictionary or any other type.
        config: Optional. The configuration for the computation.
        stream_mode: Optional. The stream mode for the computation. Default is "values".
        output_keys: Optional. The output keys to include in the result. Default is None.
        interrupt_before: Optional. The nodes to interrupt before. Default is None.
        interrupt_after: Optional. The nodes to interrupt after. Default is None.
        debug: Optional. Whether to enable debug mode. Default is None.
        **kwargs: Additional keyword arguments.

    Returns:
        The result of the computation. If stream_mode is "values", it returns the latest value.
        If stream_mode is "chunks", it returns a list of chunks.
    """

    output_keys = output_keys if output_keys is not None else self.output_channels
    if stream_mode == "values":
        latest: Union[dict[str, Any], Any] = None
    else:
        chunks = []
    async for chunk in self.astream(
        input,
        config,
        stream_mode=stream_mode,
        output_keys=output_keys,
        interrupt_before=interrupt_before,
        interrupt_after=interrupt_after,
        debug=debug,
        **kwargs,
    ):
        if stream_mode == "values":
            latest = chunk
        else:
            chunks.append(chunk)
    if stream_mode == "values":
        return latest
    else:
        return chunks

get_graph(config=None, *, xray=False)
get_graphconfig=None * xray=False

Returns a drawable representation of the computation graph.
返回计算图的可绘制表示形式。

Source code in libs/langgraph/langgraph/graph/graph.py
libs/langgraph/langgraph/graph/graph.py 中的源代码
def get_graph(
    self,
    config: Optional[RunnableConfig] = None,
    *,
    xray: Union[int, bool] = False,
) -> DrawableGraph:
    """Returns a drawable representation of the computation graph."""
    graph = DrawableGraph()
    start_nodes: dict[str, DrawableNode] = {
        START: graph.add_node(self.get_input_schema(config), START)
    }
    end_nodes: dict[str, DrawableNode] = {}

    def add_edge(
        start: str, end: str, label: Optional[str] = None, conditional: bool = False
    ) -> None:
        if end == END and END not in end_nodes:
            end_nodes[END] = graph.add_node(self.get_output_schema(config), END)
        return graph.add_edge(
            start_nodes[start], end_nodes[end], label, conditional
        )

    for key, n in self.builder.nodes.items():
        node = n.runnable
        if xray:
            subgraph = (
                node.get_graph(
                    config=config,
                    xray=xray - 1 if isinstance(xray, int) and xray > 0 else xray,
                )
                if isinstance(node, CompiledGraph)
                else node.get_graph(config=config)
            )
            subgraph.trim_first_node()
            subgraph.trim_last_node()
            if len(subgraph.nodes) > 1:
                end_nodes[key], start_nodes[key] = graph.extend(
                    subgraph, prefix=key
                )
            else:
                n = graph.add_node(node, key)
                start_nodes[key] = n
                end_nodes[key] = n
        else:
            n = graph.add_node(node, key, metadata=n.metadata)
            start_nodes[key] = n
            end_nodes[key] = n
    for start, end in sorted(self.builder._all_edges):
        add_edge(start, end)
    for start, branches in self.builder.branches.items():
        default_ends = {
            **{k: k for k in self.builder.nodes if k != start},
            END: END,
        }
        for _, branch in branches.items():
            if branch.ends is not None:
                ends = branch.ends
            elif branch.then is not None:
                ends = {k: k for k in default_ends if k not in (END, branch.then)}
            else:
                ends = default_ends
            for label, end in ends.items():
                add_edge(
                    start,
                    end,
                    label if label != end else None,
                    conditional=True,
                )
                if branch.then is not None:
                    add_edge(end, branch.then)

    return graph

StreamMode

How the stream method should emit outputs.
流方法应如何发出输出。

  • 'values': Emit all values of the state for each step.
    “values”:为每个步骤发出状态的所有值。
  • 'updates': Emit only the node name(s) and updates that were returned by the node(s) after each step.
    “updates”:仅发出节点名称和每个步骤节点返回的更新。
  • 'debug': Emit debug events for each step.
    “debug”:为每个步骤发出调试事件。

Constants
常量

The following constants and classes are used to help control graph execution.
以下常量和类用于帮助控制图形执行。

START 开始

START is a string constant ("__start__") that serves as a "virtual" node in the graph. Adding an edge (or conditional edges) from START to node one or more nodes in your graph will direct the graph to begin execution there.
START 是一个字符串常量 (“__start__”),用作图形中的“虚拟”节点。将 START 中的边(或条件边)添加到图形中的一个或多个节点,将指示图形在那里开始执行。

from langgraph.graph import START
...
builder.add_edge(START, "my_node")
# Or to add a conditional starting point
builder.add_conditional_edges(START, my_condition)

END 结束

END is a string constant ("__end__") that serves as a "virtual" node in the graph. Adding an edge (or conditional edges) from one or more nodes in your graph to the END "node" will direct the graph to cease execution as soon as it reaches this point.
END 是一个字符串常量 (“__end__”),用作图形中的“虚拟”节点。将图中一个或多个节点的边(或条件边)添加到 END“节点”将指示图在到达此点后立即停止执行。

from langgraph.graph import END
...
builder.add_edge("my_node", END) # Stop any time my_node completes
# Or to conditionally terminate
def my_condition(state):
    if state["should_stop"]:
        return END
    return "my_node"
builder.add_conditional_edges("my_node", my_condition)

Send 发送

A message or packet to send to a specific node in the graph.
要发送到图中特定节点的消息或数据包。

The Send class is used within a StateGraph's conditional edges to dynamically route states to different nodes based on certain conditions. This enables creating "map-reduce" like workflows, where a node can be invoked multiple times in parallel on different states, and the results can be aggregated back into the main graph's state.
Send 类在 StateGraph 的条件边缘中使用,以根据特定条件动态地将状态路由到不同的节点。这样就可以创建类似“map-reduce”的工作流,其中可以在不同状态下并行多次调用节点,并且结果可以聚合回主图的状态。

Attributes: 属性:

  • node (str) –

    The name of the target node to send the message to.


    节点str) –

    要向其发送消息的目标节点的名称。

  • arg (Any) –

    The state or message to send to the target node.


    精 氨 酸任何)

    要发送到目标节点的状态或消息。

Examples: 例子:

>>> from typing import Annotated
>>> import operator
>>> class OverallState(TypedDict):
...     subjects: list[str]
...     jokes: Annotated[list[str], operator.add]
...
>>> from langgraph.constants import Send
>>> from langgraph.graph import END, START
>>> def continue_to_jokes(state: OverallState):
...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
...
>>> from langgraph.graph import StateGraph
>>> builder = StateGraph(OverallState)
>>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
>>> builder.add_conditional_edges(START, continue_to_jokes)
>>> builder.add_edge("generate_joke", END)
>>> graph = builder.compile()
>>> graph.invoke({"subjects": ["cats", "dogs"]})
{'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
Source code in libs/langgraph/langgraph/constants.py
libs/langgraph/langgraph/constants.py 中的源代码
class Send:
    """A message or packet to send to a specific node in the graph.

    The `Send` class is used within a `StateGraph`'s conditional edges to dynamically
    route states to different nodes based on certain conditions. This enables
    creating "map-reduce" like workflows, where a node can be invoked multiple times
    in parallel on different states, and the results can be aggregated back into the
    main graph's state.

    Attributes:
        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.

    Examples:
        >>> from typing import Annotated
        >>> import operator
        >>> class OverallState(TypedDict):
        ...     subjects: list[str]
        ...     jokes: Annotated[list[str], operator.add]
        ...
        >>> from langgraph.constants import Send
        >>> from langgraph.graph import END, START
        >>> def continue_to_jokes(state: OverallState):
        ...     return [Send("generate_joke", {"subject": s}) for s in state['subjects']]
        ...
        >>> from langgraph.graph import StateGraph
        >>> builder = StateGraph(OverallState)
        >>> builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
        >>> builder.add_conditional_edges(START, continue_to_jokes)
        >>> builder.add_edge("generate_joke", END)
        >>> graph = builder.compile()
        >>> graph.invoke({"subjects": ["cats", "dogs"]})
        {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
    """

    node: str
    arg: Any

    def __init__(self, /, node: str, arg: Any) -> None:
        """
        Initialize a new instance of the Send class.

        Args:
            node (str): The name of the target node to send the message to.
            arg (Any): The state or message to send to the target node.
        """
        self.node = node
        self.arg = arg

    def __hash__(self) -> int:
        return hash((self.node, self.arg))

    def __repr__(self) -> str:
        return f"Send(node={self.node!r}, arg={self.arg!r})"

    def __eq__(self, value: object) -> bool:
        return (
            isinstance(value, Send)
            and self.node == value.node
            and self.arg == value.arg
        )

__init__(node, arg)
__init__node arg

Initialize a new instance of the Send class.
初始化 Send 类的新实例。

Parameters: 参数:

  • node (str) –

    The name of the target node to send the message to.


    节点str) –

    要向其发送消息的目标节点的名称。

  • arg (Any) –

    The state or message to send to the target node.


    精 氨 酸任何)

    要发送到目标节点的状态或消息。

Source code in libs/langgraph/langgraph/constants.py
libs/langgraph/langgraph/constants.py 中的源代码
def __init__(self, /, node: str, arg: Any) -> None:
    """
    Initialize a new instance of the Send class.

    Args:
        node (str): The name of the target node to send the message to.
        arg (Any): The state or message to send to the target node.
    """
    self.node = node
    self.arg = arg

RetryPolicy
重试策略

Bases: NamedTuple
基数:NamedTuple

Configuration for retrying nodes.
用于重试节点的配置。

Source code in libs/langgraph/langgraph/pregel/types.py
libs/langgraph/langgraph/pregel/types.py 中的源代码
class RetryPolicy(NamedTuple):
    """Configuration for retrying nodes."""

    initial_interval: float = 0.5
    """Amount of time that must elapse before the first retry occurs. In seconds."""
    backoff_factor: float = 2.0
    """Multiplier by which the interval increases after each retry."""
    max_interval: float = 128.0
    """Maximum amount of time that may elapse between retries. In seconds."""
    max_attempts: int = 3
    """Maximum number of attempts to make before giving up, including the first."""
    jitter: bool = True
    """Whether to add random jitter to the interval between retries."""
    retry_on: Union[
        Type[Exception], tuple[Type[Exception], ...], Callable[[Exception], bool]
    ] = default_retry_on
    """List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry."""

initial_interval: float = 0.5 class-attribute instance-attribute
initial_interval float = 0.5 类属性 instance-attribute

Amount of time that must elapse before the first retry occurs. In seconds.
在第一次重试发生之前必须经过的时间量。在几秒钟内。

backoff_factor: float = 2.0 class-attribute instance-attribute
backoff_factor float = 2.0 类属性 instance-attribute

Multiplier by which the interval increases after each retry.
每次重试后间隔增加的乘数。

max_interval: float = 128.0 class-attribute instance-attribute
max_interval float = 128.0 类-属性 instance-attribute

Maximum amount of time that may elapse between retries. In seconds.
两次重试之间可能经过的最长时间。在几秒钟内。

max_attempts: int = 3 class-attribute instance-attribute
max_attempts int = 3 类-属性 instance-attribute

Maximum number of attempts to make before giving up, including the first.
放弃前的最大尝试次数,包括第一次。

jitter: bool = True class-attribute instance-attribute
抖动 bool = 真正的 类-属性 instance-attribute

Whether to add random jitter to the interval between retries.
是否在重试之间的间隔中添加随机抖动。

retry_on: Union[Type[Exception], tuple[Type[Exception], ...], Callable[[Exception], bool]] = default_retry_on class-attribute instance-attribute
retry_on Union[Type[Exception], tuple[Type[Exception], ...], callable[[Exception], bool]] = default_retry_on 类属性 instance-attribute

List of exception classes that should trigger a retry, or a callable that returns True for exceptions that should trigger a retry.
应触发重试的异常类的列表,或对于应触发重试的异常返回 True 的可调用对象。

Comments 评论