Asynchronous workflow errors in playground

I have an async workflow:

class TextGetMetricData(Workflow):
    agent = MetricsAgent(Metrics_Knowledge(embedder=embedder,knowledge=MarkdownKnowledgeBase(
        path="/Users/extreme/Projects/autohome.com/QAKnowledge/metrics",
        # Table name: ai.website_documents
        vector_db=PgVector(
            table_name="metrics_documents",
            db_url="postgresql+psycopg://ai:ai@localhost:5532/ai",
            embedder=embedder,
            content_language="zh-CN",
        ),
    )),debug_mode=True)

    def run(self, question: str) -> RunResponse:
        """
        Synchronous run method, implemented to support the Playground.
        Wraps the async method in a sync call and returns a RunResponse object.
        """
        import asyncio

        # Collect all results from the async generator
        async def collect_results():
            try:
                results = []
                async for result in self.arun(question):
                    results.append(result)

                # If there are results, return the last one; otherwise create a default RunResponse
                if results:
                    return results[-1]
                else:
                    return RunResponse(
                        event=RunEvent.workflow_completed,
                        content="未能获取查询结果。"  # "Failed to get the query result."
                    )
            except Exception as e:
                import traceback
                return RunResponse(
                    event=RunEvent.workflow_completed,
                    # Message prefix: "An error occurred during the query:"
                    content=f"查询过程中发生错误: {str(e)}\n{traceback.format_exc()}"
                )

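        # Run the async function on an event loop
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # If there is no event loop, create a new one
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Run the coroutine directly so that it is properly awaited
        final_result = loop.run_until_complete(collect_results())

        # Make sure the returned value is a RunResponse object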
        if not isinstance(final_result, RunResponse):
            return RunResponse(
                event=RunEvent.workflow_completed,
                content=str(final_result)
            )

        return final_result

    async def arun(self, question: str) -> AsyncIterator[RunResponse]:
        knowledge_response = await self.agent.get_knowledge_agent().arun(
            question,
            debug_mode=True,
            markdown=True,
        )
        if knowledge_response is None or knowledge_response.content is None:
            yield RunResponse(
                event=RunEvent.workflow_completed,
                content="没有检索到相关的查询语句。",  # "No relevant query statement was retrieved."
            )
            return
        async with MCPTools(url="http://127.0.0.1:8081/sse", transport="sse") as mcp_tools:
            agent = self.agent.get_vm_agent([mcp_tools])
            query_response = await agent.arun(
                knowledge_response.content,
                debug_mode=self.debug_mode,
                markdown=True,
            )

            if query_response is None or query_response.content is None:
                yield RunResponse(
                    event=RunEvent.workflow_completed,
                    content="没有检索到相关的查询语句。",  # "No relevant query statement was retrieved."
                )
                return
            summary_agent = self.agent.get_summary_agent()

            summary_response = await summary_agent.arun(str(query_response.content), stream=True, stream_intermediate_steps=True)
            # Stream the writer's response directly
            async for response in summary_response:
                if response.content:
                    yield RunResponse(
                        content=response.content,
                        # event=response.event,
                        run_id=self.run_id
                    )

async def main():
    generate_blog_post = TextGetMetricData(
        session_id="generate-blog-post-on-1",
        debug_mode=True,
    )

    async for item in generate_blog_post.arun(
        # "Query the QPS trend of service app-cars-shuanghuo-40791 over the last 10 minutes"
        question="查询服务app-cars-shuanghuo-40791最近10分qps趋势",
    ):
        pprint_run_response(item, markdown=True)


if __name__ == "__main__":
    asyncio.run(main())

When I run it in the playground, it reports an error:

DEBUG ************ Agent ID: metrics_knowledge_agent *************              
DEBUG --**-- Creating Playground Endpoint                                       
INFO Starting playground on http://localhost:7777                               
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Agent Playground ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃                                                                             ┃
┃                                                                             ┃
┃  Playground URL: https://app.agno.com/playground?endpoint=localhost%3A7777  ┃
┃                                                                             ┃
┃                                                                             ┃
┗━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
INFO:     Started server process [34270]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://localhost:7777 (Press CTRL+C to quit)
INFO:     ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO:     ::1:54535 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO:     ::1:54535 - "GET /v1/playground/status HTTP/1.1" 200 OK
INFO:     ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata/sessions?user_id=a7679054-71b7-4c85-a4aa-9adef2d18e51 HTTP/1.1" 404 Not Found
INFO:     ::1:54542 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO:     ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
INFO:     ::1:54534 - "GET /v1/playground/workflows HTTP/1.1" 200 OK
INFO:     ::1:54534 - "GET /v1/playground/status HTTP/1.1" 200 OK
DEBUG Creating new session                                                      
DEBUG Created new TextGetMetricData                                             
INFO:     ::1:54532 - "POST /v1/playground/workflows/textgetmetricdata/runs HTTP/1.1" 500 Internal Server Error
INFO:     ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK
/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/starlette/_exception_handler.py:63: RuntimeWarning: coroutine 'TextGetMetricData.run.<locals>.collect_results' was never awaited
  await response(scope, receive, sender)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
INFO:     ::1:54534 - "GET /v1/playground/workflows/textgetmetricdata HTTP/1.1" 200 OK

How can I solve this problem?
Thank you, all the technical gods.
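
The "coroutine ... was never awaited" warning above is consistent with run() calling loop.run_until_complete() while the server's event loop is already running in the same thread: asyncio refuses to nest the call and the coroutine object is dropped. A minimal sketch of one way to bridge sync and async code in that situation, using only the standard library, is to run the coroutine on a separate thread with its own loop. The names run_coro_blocking, fake_arun, collect_last and sync_run are hypothetical stand-ins, not Agno APIs, and the calling thread is blocked until the coroutine finishes.

```python
import asyncio
import threading
from typing import Any, AsyncIterator, Coroutine, Dict, Optional


def run_coro_blocking(coro: Coroutine[Any, Any, Any]) -> Any:
    """Run a coroutine to completion from synchronous code.

    If no event loop is running in this thread, asyncio.run() is enough.
    If one is running, run the coroutine on a worker thread with its own loop.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one here.
        return asyncio.run(coro)

    outcome: Dict[str, Any] = {}

    def worker() -> None:
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:  # hand the error back to the caller
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()  # blocks the calling thread until the coroutine is done
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


async def fake_arun(question: str) -> AsyncIterator[str]:
    # Stand-in for an async generator such as the workflow's arun().
    yield f"partial: {question}"
    yield f"final: {question}"


async def collect_last(question: str) -> Optional[str]:
    # Drain the async generator and keep only the last item.
    last = None
    async for item in fake_arun(question):
        last = item
    return last


def sync_run(question: str) -> Optional[str]:
    # Synchronous entry point that is safe to call with or without a running loop.
    return run_coro_blocking(collect_last(question))
```

Because the calling thread waits on thread.join(), a server handler using this pattern still stalls its event loop for the duration of the run; it only avoids the nested-loop error.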

In addition, with agno 1.7.2 I could run this Python file directly. After upgrading to agno 1.7.5, it reports an error:

/Users/extreme/anaconda3/envs/auto-intelligent/bin/python /Users/extreme/Projects/autohome.com/auto-intelligent/src/workflow/metrics.py 
/Users/extreme/Projects/autohome.com/auto-intelligent/src/workflow/metrics.py:127: RuntimeWarning: coroutine 'Workflow.arun_workflow' was never awaited
  async for item in generate_blog_post.arun(
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
Traceback (most recent call last):
  File "/Users/extreme/Projects/autohome.com/auto-intelligent/src/workflow/metrics.py", line 134, in <module>
    asyncio.run(main())
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/extreme/Projects/autohome.com/auto-intelligent/src/workflow/metrics.py", line 127, in main
    async for item in generate_blog_post.arun(
                      ^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'async for' requires an object with __aiter__ method, got coroutine

Hi @extreme, thanks for reaching out and supporting Agno. I’ve shared this with the team. We’re working through all requests one by one and will get back to you soon.
If it’s urgent, please let us know. We appreciate your patience!

Hi @extreme
Our first implementation of async workflows didn’t correctly execute the arun method that you define on the workflow. In your case, async for item in generate_blog_post.arun(...) won’t work because generate_blog_post.arun is a coroutine that returns an async iterator. You need to await it first and then iterate over the result:

response = await generate_blog_post.arun(...)
async for item in response:
    ...

I hope that makes sense?

If you give me a larger example I can rewrite it for you.
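
To make the await-then-iterate pattern concrete, here is a small self-contained sketch. WrappedWorkflow and _stream are hypothetical stand-ins that only mimic the behavior described above (arun as a coroutine that returns an async iterator); they are not Agno classes.

```python
import asyncio
from typing import AsyncIterator, List


async def _stream(question: str) -> AsyncIterator[str]:
    # Stand-in for the streamed responses of a workflow run.
    for part in ("retrieving", "querying", f"summary for {question}"):
        await asyncio.sleep(0)
        yield part


class WrappedWorkflow:
    """Hypothetical workflow whose arun is a coroutine returning an async iterator."""

    async def arun(self, question: str) -> AsyncIterator[str]:
        return _stream(question)


async def consume(question: str) -> List[str]:
    workflow = WrappedWorkflow()
    # Step 1: await the coroutine to obtain the async iterator.
    response = await workflow.arun(question)
    items: List[str] = []
    # Step 2: iterate over the result directly; it is an object, not a function to call.
    async for item in response:
        items.append(item)
    return items
```

The arguments are passed once, to arun; the object that comes back is iterated as-is.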

Thank you very much for your reply.
I modified the code according to your suggestion, but it still reports an error (agno version 1.7.5, workflow v1):

class TextGetMetricData(Workflow):
    ...
    async def arun(self, question: str) -> AsyncIterator[RunResponse]:
        knowledge_response = await self.agent.get_knowledge_agent().arun(
            question,
            debug_mode=True,
            markdown=True,
        )
        if knowledge_response is None or knowledge_response.content is None:
            yield RunResponse(
                event=RunEvent.workflow_completed,
                content="没有检索到相关的查询语句。",  # "No relevant query statement was retrieved."
            )
            return
        async with MCPTools(url="http://127.0.0.1:8081/sse", transport="sse") as mcp_tools:
            agent = self.agent.get_vm_agent([mcp_tools])
            query_response = await agent.arun(
                knowledge_response.content,
                debug_mode=self.debug_mode,
                markdown=True,
            )

            if query_response is None or query_response.content is None:
                yield RunResponse(
                    event=RunEvent.workflow_completed,
                    content="没有检索到相关的查询语句。",  # "No relevant query statement was retrieved."
                )
                return
            summary_agent = self.agent.get_summary_agent()

            summary_response = await summary_agent.arun(str(query_response.content), stream=True, stream_intermediate_steps=True)
            # Stream the writer's response directly
            async for response in summary_response:
                if response.content:
                    yield RunResponse(
                        content=response.content,
                        # event=response.event,
                        run_id=self.run_id
                    )
async def main():
    generate_blog_post = TextGetMetricData(
        session_id="generate-blog-post-on-1",
        debug_mode=True,
    )
    # Modified here. The question means:
    # "Query the QPS trend of service app-cars-shuanghuo-40791 over the last 10 minutes"
    response = await generate_blog_post.arun(question="查询服务app-cars-shuanghuo-40791最近10分qps趋势")
    async for item in response(
        question="查询服务app-cars-shuanghuo-40791最近10分qps趋势",
    ):
        pprint_run_response(item, markdown=True)


if __name__ == "__main__":
    asyncio.run(main())

Traceback (most recent call last):
  File "/Users/extreme/Projects/autohome.com/auto-intelligent/src/workflow/metrics.py", line 126, in <module>
    asyncio.run(main())
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/extreme/Projects/autohome.com/auto-intelligent/src/workflow/metrics.py", line 119, in main
    async for item in response(
                      ^^^^^^^^^
TypeError: 'async_generator' object is not callable
DEBUG Debug logs enabled                                                  

In addition, I have already implemented the same functionality with workflow v2, and it currently works. However, there is a small problem when calling the MCP tool: after the program has been running for a while, calling the tool again reports an error.

import asyncio

import nest_asyncio

from agno.playground import Playground
from agno.storage.sqlite import SqliteStorage
from agno.tools.mcp import MCPTools
from agno.workflow.v2 import Step, Workflow, StepInput, StepOutput

from src.workflow.agent.metrics import MetricData
from src.workflow.metrics import TextGetMetricData

nest_asyncio.apply()

def data_processor(input: StepInput):
    print("------------------------------------------")
    if isinstance(input.previous_step_content, MetricData):
        # Convert MetricData to a serializable format (dict or string)
        serialized_data = {
            "type": "MetricData",
            "data": input.previous_step_content.model_dump(),  # If MetricData is a Pydantic model
            # or manually extract fields:
            # "metric_name": input.previous_step_content.metric_name,
            # "value": input.previous_step_content.value,
            # etc.
        }
        return StepOutput(
            content=serialized_data,
            success=True
        )
    else:
        return StepOutput(
            content="Failed to parse metrics data",
            success=False
        )

async def get_workflow():
    metrics_workflow = TextGetMetricData(debug_mode=True)
    async with MCPTools(url="http://127.0.0.1:8081/sse", transport="sse") as mcp_tools:
        knowledge_agent = metrics_workflow.agent.get_knowledge_agent()
        mcp_agent = metrics_workflow.agent.get_vm_agent([mcp_tools])
        summary_agent = metrics_workflow.agent.get_summary_agent()
        async_workflow = Workflow(
            workflow_id="abc",

            name="Async Content Workflow",
            description="Asynchronous content creation workflow",
            storage=SqliteStorage(
                table_name="async_workflow",
                db_file="tmp/async_workflow.db",
                mode="workflow_v2"
            ),
            steps=[
                Step(
                    name="knowledge_retrieval",
                    agent=knowledge_agent,
                ),
                Step(
                    name="query data",
                    agent=mcp_agent,
                ),
                Step(
                    name="summary",
                    agent=summary_agent,
                ),
                Step(
                    name="data_processor",
                    executor=data_processor,
                    description="处理查询结果",  # "Process the query result"
                )
            ],
        )
        # Create the playground
        playground = Playground(
            workflows=[async_workflow],
            name="Simple MCP Demo",
            description="简单的 MCP GitHub 分析 playground",  # "A simple MCP GitHub analysis playground"
            app_id="simple-mcp-demo",
        )

        app = playground.get_app()

        # Start the server
        playground.serve(app=app, reload=False)


if __name__ == "__main__":
    asyncio.run(get_workflow())

DEBUG ************************  METRICS  *************************              
DEBUG Received async event from agent: ToolCallStartedEvent                     
DEBUG Running: query(query=..., time=)                                          
DEBUG Calling MCP Tool 'query' with args: {'query':                             
      'sum(irate(http_server_duration_milliseconds_count{service=~"app-cars-shua
      nghuo-40791.*"}[1m]))[10m:]', 'time': ''}                                 
ERROR    Failed to call MCP tool 'query':                                       
         Traceback (most recent call last):                                     
           File                                                                 
         "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-pac
         kages/agno/utils/mcp.py", line 33, in call_tool                        
             result: CallToolResult = await session.call_tool(tool_name, kwargs)
         # type: ignore                                                         
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
           File                                                                 
         "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-pac
         kages/mcp/client/session.py", line 293, in call_tool                   
             result = await self.send_request(                                  
                      ^^^^^^^^^^^^^^^^^^^^^^^^                                  
           File                                                                 
         "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-pac
         kages/mcp/shared/session.py", line 261, in send_request                
             await                                                              
         self._write_stream.send(SessionMessage(message=JSONRPCMessage(jsonrpc_r
         equest), metadata=metadata))                                           
           File                                                                 
         "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-pac
         kages/anyio/streams/memory.py", line 242, in send                      
             self.send_nowait(item)                                             
           File                                                                 
         "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-pac
         kages/anyio/streams/memory.py", line 211, in send_nowait               
             raise ClosedResourceError                                          
         anyio.ClosedResourceError                                              
DEBUG Received async event from agent: ToolCallCompletedEvent     

@extreme I have thought about the original question again, and I think your original expectation was correct. If your implementation of arun is an async generator function, it should remain one and not become a coroutine. I have a PR out to address this, which would mean the original code works.
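
The distinction can be checked with the standard inspect module. The sketch below shows one way a framework could wrap a user-defined arun while keeping its call shape; wrap_arun, gen_arun and coro_arun are hypothetical names for illustration, and this is not the code from the PR mentioned above.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, List


async def gen_arun(question: str) -> AsyncIterator[str]:
    # An async generator function: it contains yield inside async def.
    yield f"chunk for {question}"


async def coro_arun(question: str) -> str:
    # A plain coroutine function: it returns a single value.
    return f"answer for {question}"


def wrap_arun(user_arun: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap user_arun without changing how callers consume it."""
    if inspect.isasyncgenfunction(user_arun):
        async def gen_wrapper(*args: Any, **kwargs: Any) -> AsyncIterator[Any]:
            # Framework bookkeeping (hypothetical) could go here.
            async for item in user_arun(*args, **kwargs):
                yield item
        return gen_wrapper

    async def coro_wrapper(*args: Any, **kwargs: Any) -> Any:
        return await user_arun(*args, **kwargs)
    return coro_wrapper


async def demo() -> List[str]:
    out: List[str] = []
    # The wrapped generator is still consumed with a direct async for.
    async for item in wrap_arun(gen_arun)("qps"):
        out.append(item)
    # The wrapped coroutine is still consumed with a single await.
    out.append(await wrap_arun(coro_arun)("qps"))
    return out
```

With a wrapper like this, async for item in workflow.arun(...) keeps working for generator-style implementations, and await workflow.arun(...) keeps working for coroutine-style ones.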

On the MCP issue, that is most likely due to how it interacts with the playground. We are working on a new implementation of MCPTools that wouldn’t be an async context manager, which should resolve issues like these. I am not sure how to deal with the issue in the short term, though.
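
One generic stopgap for long-lived connections of this kind is to reopen the session when a call fails on a closed stream. The sketch below illustrates the idea with stand-in classes only: FakeSession, ReconnectingTool and ConnectionClosed are hypothetical and are not Agno or MCP APIs, and whether the same approach can be applied to MCPTools is an assumption that has not been verified.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional, Tuple


class ConnectionClosed(Exception):
    """Stand-in for an error such as anyio.ClosedResourceError."""


class FakeSession:
    """Stand-in for a tool session that can be closed by the remote side."""

    def __init__(self) -> None:
        self.closed = False

    async def call_tool(self, name: str, args: Dict[str, str]) -> str:
        if self.closed:
            raise ConnectionClosed()
        return f"{name} ok"


class ReconnectingTool:
    """Opens a session lazily and reopens it once if a call hits a closed stream."""

    def __init__(self, connect: Callable[[], Awaitable[FakeSession]]) -> None:
        self._connect = connect
        self._session: Optional[FakeSession] = None
        self.connect_count = 0

    async def _ensure_session(self) -> FakeSession:
        if self._session is None:
            self._session = await self._connect()
            self.connect_count += 1
        return self._session

    async def call(self, name: str, args: Dict[str, str]) -> str:
        session = await self._ensure_session()
        try:
            return await session.call_tool(name, args)
        except ConnectionClosed:
            # Drop the dead session, reconnect once, and retry the call.
            self._session = None
            session = await self._ensure_session()
            return await session.call_tool(name, args)


async def demo() -> Tuple[str, str, int]:
    async def connect() -> FakeSession:
        return FakeSession()

    tool = ReconnectingTool(connect)
    first = await tool.call("query", {"time": ""})
    assert tool._session is not None
    tool._session.closed = True  # simulate the server dropping the connection
    second = await tool.call("query", {"time": ""})
    return first, second, tool.connect_count
```

A single retry keeps the sketch simple; a real wrapper would also need to decide how to handle repeated failures and concurrent callers.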

Okay, thanks. @Dirk
For the MCP issue, what is the current solution? I believe a connection pool (whether over stdio or SSE) should be maintained instead of repeatedly establishing connections.
The current issue means I cannot pass the demo test.

I’ll take this to our MCP expert and we’ll figure something out for you.

Okay, thank you very much. Looking forward to a solution. Also, the structured declaration of the workflow v2 version is really great. :clap:

Hey @extreme

We are now working on an update to our MCP integration. With it, you will be able to use the MCPTools class without an async context manager, which should solve the lifecycle problems you are facing. I will keep you updated on progress; it shouldn’t take more than a few days!

Fix released in 1.7.6 for the async workflow issue!

Also we are hard at work on the updated MCP flow.