langgraph 에이전트가 tool 사용 중임을 사용자에게 알릴 수 있도록 구현
FastAPI는 데이터의 stream 전송을 StreamingResponse 객체로 구현할 수 있음
이 때, stream 전송이 가능한 형태는 텍스트 or file blob의 형태
astream_events의 stream chunk 중 FE에서 필요한 정보를 종합해, json 텍스트의 형태로 전달하는 형태로 구현할 수 있음
@router.post("/astream")
async def applyhome_agent_stream(
scheme: ApplyhomeAgentRouterScheme,
request: Request
):
agent: ApplyhomeAgent = request.app.applyhome_agent
messages = [
HumanMessage(content=m.content)
if m.type == "user"
else AIMessage(content=m.content)
for m in scheme.state["messages"]
]
scheme.state["messages"] = messages
state = ApplyhomeAgentState(**scheme.state)
async def async_gen():
resp = agent.workflow.astream_events(state, scheme.config, version="v2")
async for event in resp:
# Chat Model의 답변 처리
if event["event"] == "on_chat_model_stream":
content = event["data"]["chunk"].content
else:
content = ""
# CrawlTool의 입력 변수
if event["event"] == "on_tool_start":
tool_inputs = event["data"]["input"]
else:
tool_inputs = None
output = {
"content": content,
"event": {
"state": event["event"],
"name": event["name"],
},
"config": scheme.config,
"tool_inputs": tool_inputs,
}
output = json.dumps(output, ensure_ascii=False) + "\\n"
yield output
return StreamingResponse(async_gen(), media_type="text/event-stream")
{
"state": {
"messages": [
{
"type": "user",
"content": "대구 민간임대 공고 찾아줘"
}
],
"is_last_step": false,
"remaining_steps": 10
},
"config": {
"configurable": {
"thread_id": "thread-1",
"user_id": 123
}
}
}
{"content": "", "event": {"state": "on_chain_start", "name": "LangGraph"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "", "event": {"state": "on_chain_start", "name": "__start__"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "", "event": {"state": "on_chain_end", "name": "__start__"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "", "event": {"state": "on_chain_start", "name": "agent"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
...
{"content": "", "event": {"state": "on_chain_start", "name": "tools"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "", "event": {"state": "on_tool_start", "name": "CrawlTool"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": {"user_query": "대구 민간임대", "house_type": "민간임대오피스텔", "jiyeok": "대구"}}
{"content": "", "event": {"state": "on_tool_end", "name": "CrawlTool"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
...
{"content": "", "event": {"state": "on_chat_model_start", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "대", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "구", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "의", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": " 민", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "간", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "임", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
{"content": "대", "event": {"state": "on_chat_model_stream", "name": "ChatOpenAI"}, "config": {"configurable": {"thread_id": "thread-1", "user_id": 123}}, "tool_inputs": null}
...
response = requests.post(
url=os.path.join(BACKEND_URL, "applyhome_agent", "astream"),
json=build_json_data(message, "/applyhome"),
stream=True
)
if response.status_code == requests.codes.ok:
msg = ""
for chunk in response.iter_lines():
if chunk:
chunk_str = chunk.decode()
chunk_data = json.loads(chunk_str)
chunk_data = ApplyhomeResponse(**chunk_data)
event_state = chunk_data["event"]["state"]
if event_state == "on_chat_model_stream":
msg += chunk_data["content"]
if len(msg) > 1000 and msg.endswith("\\n\\n"):
await message.channel.send(msg)
msg = ""
elif event_state == "on_tool_start":
tool_kwargs = chunk_data["tool_inputs"]
tool_msg = f"*{tool_kwargs['jiyeok']}* 지역 내의 "
tool_msg += f"*{tool_kwargs['house_type']}* 유형의 공고문을 검색합니다."
await message.channel.send(tool_msg)
elif event_state == "on_tool_end":
tool_msg = "검색을 완료했습니다."
await message.channel.send(tool_msg)
else:
# Stream End
break
await message.channel.send(msg)
else:
raise Exception("Error")
on_tool_start인 경우, 사용자에게 tool 호출을 수행했음을 알리는 메세지를 전송on_tool_end인 경우, 사용자게에 tool 호출을 종료했음을 알리는 메세지를 전송