i have a async workflow
class TextGetMetricData(Workflow):
agent = MetricsAgent(Metrics_Knowledge(embedder=embedder,knowledge=MarkdownKnowledgeBase(
path="/Users/extreme/Projects/autohome.com/QAKnowledge/metrics",
# Table name: ai.website_documents
vector_db=PgVector(
table_name="metrics_documents",
db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
embedder=embedder,
content_language="zh-CN",
),
)),debug_mode=True)
def run(self, question: str) -> RunResponse:
"""
同步运行方法,为支持 Playground 而实现
将异步方法转为同步方法,并返回一个 RunResponse 对象
"""
import asyncio
# 收集异步生成器中的所有结果
async def collect_results():
try:
results = []
async for result in self.arun(question):
results.append(result)
# 如果有结果,返回最后一个;否则创建一个默认的 RunResponse
if results:
return results[-1]
else:
return RunResponse(
event=RunEvent.workflow_completed,
content="未能获取查询结果。"
)
except Exception as e:
import traceback
return RunResponse(
event=RunEvent.workflow_completed,
content=f"查询过程中发生错误: {str(e)}\n{traceback.format_exc()}"
)
# 使用事件循环运行异步函数
try:
loop = asyncio.get_event_loop()
except RuntimeError:
# 如果没有事件循环,创建一个新的
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# 直接运行协程,确保它被正确等待
final_result = loop.run_until_complete(collect_results())
# 确保返回的是 RunResponse 对象
if not isinstance(final_result, RunResponse):
return RunResponse(
event=RunEvent.workflow_completed,
content=str(final_result)
)
return final_result
async def arun(self, question: str) -> AsyncIterator[RunResponse]:
knowledge_response = await self.agent.get_knowledge_agent().arun(
question,
debug_mode=True,
markdown=True,
)
if knowledge_response is None or knowledge_response.content is None:
yield RunResponse(
event=RunEvent.workflow_completed,
content="没有检索到相关的查询语句。",
)
return
async with MCPTools(url="http://127.0.0.1:8081/sse", transport="sse") as mcp_tools:
agent = self.agent.get_vm_agent([mcp_tools])
query_response = await agent.arun(
knowledge_response.content,
debug_mode=self.debug_mode,
markdown=True,
)
if query_response is None or query_response.content is None:
yield RunResponse(
event=RunEvent.workflow_completed,
content="没有检索到相关的查询语句。",
)
return
summary_agent = self.agent.get_summary_agent()
summary_response = await summary_agent.arun(str(query_response.content), stream=True, stream_intermediate_steps=True)
# Stream the writer's response directly
async for response in summary_response:
if response.content:
yield RunResponse(
content=response.content,
# event=response.event,
run_id=self.run_id
)
async def main():
generate_blog_post = TextGetMetricData(
session_id="generate-blog-post-on-1",
debug_mode=True,
)
async for item in generate_blog_post.arun(
question="查询服务app-cars-shuanghuo-40791最近10分qps趋势",
):
pprint_run_response(item, markdown=True)
if __name__ == "__main__":
asyncio.run(main())
When I execute it on the playground, it will report an error.
DEBUG ************ Agent ID: metrics_knowledge_agent *************
DEBUG --**-- Creating Playground Endpoint
INFO Starting playground on http://localhost:7777
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Agent Playground ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ ┃
┃ ┃
┃ Playground URL: https://app.agno.com/playground?endpoint=localhost%3A7777 ┃
┃ ┃
┃ ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
INFO: Started server process [34270]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://localhost:7777 (Press CTRL+C to quit)
INFO: ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO: ::1:54535 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO: ::1:54535 - "GET /v1/playground/status HTTP/1.1" 200 OK
INFO: ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata/sessions?user_id=a7679054-71b7-4c85-a4aa-9adef2d18e51 HTTP/1.1" 404 Not Found
INFO: ::1:54542 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO: ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO: ::1:54534 - "GET /v1/playground/workflows HTTP/1.1" 200 OK
INFO: ::1:54534 - "GET /v1/playground/status HTTP/1.1" 200 OK
DEBUG Creating new session
DEBUG Created new TextGetMetricData
INFO: ::1:54532 - "POST /v1/playground/workflows/textgetmetricdata/runs HTTP/1.1" 500 Internal Server Error
INFO: ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/starlette/_exception_handler.py:63: RuntimeWarning: coroutine 'TextGetMetricData.run.<locals>.collect_results' was never awaited
await response(scope, receive, sender)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
INFO: ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
How can I solve this problem?
Thank you, all the technical gods.