
How to create map-reduce branches for parallel execution ¶

Prerequisites

This guide assumes familiarity with the following:

Map-reduce operations are essential for efficient task decomposition and parallel processing. This approach involves breaking a task into smaller sub-tasks, processing each sub-task in parallel, and aggregating the results across all of the completed sub-tasks.

Consider this example: given a general topic from the user, generate a list of related subjects, generate a joke for each subject, and select the best joke from the resulting list. In this design pattern, a first node may generate a list of objects (e.g., related subjects) and we want to apply some other node (e.g., generate a joke) to all those objects (e.g., subjects).

However, two main challenges arise.

(1) The number of objects (e.g., subjects) may be unknown ahead of time when we lay out the graph (meaning the number of edges may not be known), and (2) the input State to the downstream Node should be different for each generated object.

LangGraph addresses these challenges through its Send API. By utilizing conditional edges, Send can distribute different states (e.g., subjects) to multiple instances of a node (e.g., joke generation). Importantly, the sent state can differ from the core graph's state, allowing for flexible and dynamic workflow management.
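The fan-out/reduce pattern that Send enables can be sketched in plain Python. This is only a conceptual illustration (the functions below are hard-coded stand-ins, not the library's implementation); in the real graph, LangGraph dispatches the mapped calls in parallel and merges their results via the state's reducer.

```python
# Conceptual sketch of the map-reduce pattern that Send enables.
# (Plain Python for illustration only.)


def generate_topics(topic: str) -> list[str]:
    # In the real graph this is an LLM call; here it is hard-coded.
    return [f"{topic} A", f"{topic} B"]


def generate_joke(subject: str) -> str:
    # Each Send would carry one of these subjects as its own state.
    return f"A joke about {subject}"


def run(topic: str) -> list[str]:
    subjects = generate_topics(topic)             # first node
    jokes = [generate_joke(s) for s in subjects]  # map: one call per subject
    return jokes                                  # reduce: collect into one list


print(run("animals"))
```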

(Figure: graph diagram showing conditional Send edges fanning out from a first node to parallel joke-generation nodes.)

Setup ¶

First, let's install the required packages and set our API keys

%%capture --no-stderr
%pip install -U langchain-anthropic langgraph
import os
import getpass


def _set_env(name: str):
    if not os.getenv(name):
        os.environ[name] = getpass.getpass(f"{name}: ")


_set_env("ANTHROPIC_API_KEY")

Set up LangSmith for LangGraph development

Sign up for LangSmith to quickly spot issues and improve the performance of your LangGraph projects. LangSmith lets you use trace data to debug, test, and monitor your LLM apps built with LangGraph — read more about how to get started here.
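Concretely, enabling tracing usually amounts to setting a couple of environment variables before the graph runs. A minimal sketch (the variable names follow LangSmith's commonly documented setup; the placeholder key is illustrative, not a real credential):

```python
import os

# Enable LangSmith tracing for this process. Replace the placeholder
# with a real API key from your LangSmith account.
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ.setdefault("LANGCHAIN_API_KEY", "<your-langsmith-api-key>")
```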

Define the graph ¶

Using Pydantic with LangChain

This notebook uses Pydantic v2 BaseModel, which requires langchain-core >= 0.3. Using langchain-core < 0.3 will result in errors due to mixing of Pydantic v1 and v2 BaseModels.

import operator
from typing import Annotated
from typing_extensions import TypedDict

from langchain_anthropic import ChatAnthropic

from langgraph.types import Send
from langgraph.graph import END, StateGraph, START

from pydantic import BaseModel, Field

# Define the model and prompts we will use
subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}."""
joke_prompt = """Generate a joke about {subject}"""
best_joke_prompt = """Below are a bunch of jokes about {topic}. Select the best one! Return the ID of the best one.

{jokes}"""


class Subjects(BaseModel):
    subjects: list[str]


class Joke(BaseModel):
    joke: str


class BestJoke(BaseModel):
    id: int = Field(description="Index of the best joke, starting with 0", ge=0)


model = ChatAnthropic(model="claude-3-5-sonnet-20240620")

# Graph components: define the components that will make up the graph


# This will be the overall state of the main graph.
# It will contain a topic (which we expect the user to provide)
# and then will generate a list of subjects, and then a joke for
# each subject
class OverallState(TypedDict):
    topic: str
    subjects: list
    # Notice here we use the operator.add
    # This is because we want to combine all the jokes we generate
    # from individual nodes back into one list - this is essentially
    # the "reduce" part
    jokes: Annotated[list, operator.add]
    best_selected_joke: str


# This will be the state of the node that we will "map" all
# subjects to in order to generate a joke
class JokeState(TypedDict):
    subject: str


# This is the function we will use to generate the subjects of the jokes
def generate_topics(state: OverallState):
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}


# Here we generate a joke, given a subject
def generate_joke(state: JokeState):
    prompt = joke_prompt.format(subject=state["subject"])
    response = model.with_structured_output(Joke).invoke(prompt)
    return {"jokes": [response.joke]}


# Here we define the logic to map out over the generated subjects
# We will use this as an edge in the graph
def continue_to_jokes(state: OverallState):
    # We will return a list of `Send` objects
    # Each `Send` object consists of the name of a node in the graph
    # as well as the state to send to that node
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]


# Here we will judge the best joke
def best_joke(state: OverallState):
    jokes = "\n\n".join(state["jokes"])
    prompt = best_joke_prompt.format(topic=state["topic"], jokes=jokes)
    response = model.with_structured_output(BestJoke).invoke(prompt)
    return {"best_selected_joke": state["jokes"][response.id]}


# Construct the graph: here we put everything together to construct our graph
graph = StateGraph(OverallState)
graph.add_node("generate_topics", generate_topics)
graph.add_node("generate_joke", generate_joke)
graph.add_node("best_joke", best_joke)
graph.add_edge(START, "generate_topics")
graph.add_conditional_edges("generate_topics", continue_to_jokes, ["generate_joke"])
graph.add_edge("generate_joke", "best_joke")
graph.add_edge("best_joke", END)
app = graph.compile()
API Reference: ChatAnthropic | Send | END | StateGraph | START
from IPython.display import Image

Image(app.get_graph().draw_mermaid_png())

Use the graph ¶

# Call the graph: here we call it to generate a list of jokes
for s in app.stream({"topic": "animals"}):
    print(s)
{'generate_topics': {'subjects': ['Lions', 'Elephants', 'Penguins', 'Dolphins']}}
{'generate_joke': {'jokes': ["Why don't elephants use computers? They're afraid of the mouse!"]}}
{'generate_joke': {'jokes': ["Why don't dolphins use smartphones? Because they're afraid of phishing!"]}}
{'generate_joke': {'jokes': ["Why don't you see penguins in Britain? Because they're afraid of Wales!"]}}
{'generate_joke': {'jokes': ["Why don't lions like fast food? Because they can't catch it!"]}}
{'best_joke': {'best_selected_joke': "Why don't dolphins use smartphones? Because they're afraid of phishing!"}}
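Note that the generate_joke outputs above arrive in no particular order, yet all land in a single jokes list: that is the Annotated[list, operator.add] reducer doing the "reduce" step. A minimal sketch of that merge semantics (the update dicts are illustrative stand-ins for the per-node returns):

```python
import operator

# Each parallel generate_joke node returns {"jokes": [one_joke]}.
updates = [
    {"jokes": ["joke about lions"]},
    {"jokes": ["joke about elephants"]},
]

# The reducer concatenates each update into the accumulated state,
# mirroring Annotated[list, operator.add] in OverallState.
jokes: list[str] = []
for update in updates:
    jokes = operator.add(jokes, update["jokes"])

print(jokes)  # ['joke about lions', 'joke about elephants']
```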
