Problems arising from the serialization of result output in workflow

There is an existing workflow, the code is as follows:

class SerializedData(BaseModel):
    type: str
    data: dict  # Assuming MetricData can be serialized to a dict

class TextGetMetricDataV2:
    agent = MetricsAgent(Metrics_Knowledge(),debug_mode=True)

    @staticmethod
    def data_processor(step_input: StepInput):
        if isinstance(step_input.previous_step_content, MetricData):
            # Convert MetricData to a serializable format (dict or string)
            serialized_data = {
                "type": "MetricData",
                "data": step_input.previous_step_content.model_dump(),
            }
            # serialized_data = SerializedData(
            #     type="MetricData",
            #     data=step_input.previous_step_content.model_dump(exclude_none=True)  # If MetricData is a Pydantic model
            # )
            return StepOutput(
                content=serialized_data,
                success=True
            )
        else:
            return StepOutput(
                content="Failed to parse metrics data",
                success=False
            )

    def get_workflow(self, mcp_tools: Optional[List[Union[Toolkit, Callable, Function, Dict]]],workflow_id="text_get_metrics_data", **kwargs) -> Workflow:
        knowledge_agent = self.agent.get_knowledge_agent()
        mcp_agent = self.agent.get_vm_agent(mcp_tools)
        summary_agent = self.agent.get_summary_agent()
        workflow = Workflow(
            workflow_id=workflow_id,
            name="text get metric data",
            description="A workflow for acquiring monitoring data using natural language queries.",
            steps=[
                Step(
                    name="knowledge_retrieval",
                    agent=knowledge_agent,
                    description="使用知识库检索相关的指标信息",
                ),
                Step(
                    name="query data",
                    agent=mcp_agent,
                    description="使用 VictoriaMetrics 查询数据",
                ),
                Step(
                    name="summary",
                    agent=summary_agent,
                    description="生成查询结果的摘要",
                ),
                Step(
                    name="data_processor",
                    executor=self.data_processor,
                    description="处理查询结果",
                )
            ],
            **kwargs,
        )
        return workflow

There are two ways to process data in the data_processor function, one is to convert to a dictionary and the other is to convert to a BaseModel.

            serialized_data = {
                "type": "MetricData",
                "data": step_input.previous_step_content.model_dump(),
            }
            # serialized_data = SerializedData(
            #     type="MetricData",
            #     data=step_input.previous_step_content.model_dump(exclude_none=True)  # If MetricData is a Pydantic model
            # )

When using dictionary mode, the apprint_run_response method prints an error:

    def test_workflow(self):
        async def run():
            message = "以1分钟为速率统计区间,查询app-cars-shuanghuo-40791最近10分钟的QPS历史数据"
            mcp_tools = MCPTools(url="http://127.0.0.1:8081/sse", transport="sse")
            await mcp_tools.connect()
            workflow = self.workflowObj.get_workflow([mcp_tools])
            response = await workflow.arun(message, stream=True, stream_intermediate_steps=True)
            if hasattr(response, 'content') and isinstance(response.content, dict):
                # 如果内容是字典,转换为格式化的JSON字符串
                response.content = json.dumps(response.content, indent=2, ensure_ascii=False)
            await apprint_run_response(response)
            await mcp_tools.close()
        asyncio.run(run())
asyncgen: <async_generator object sse_client at 0x125c79460>
  + Exception Group Traceback (most recent call last):
  |   File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
  |     raise BaseExceptionGroup(
  | ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
  +-+---------------- 1 ----------------
    | Traceback (most recent call last):
    |   File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/mcp/client/sse.py", line 139, in sse_client
    |     yield read_stream, write_stream
    | GeneratorExit
    | 
    | During handling of the above exception, another exception occurred:
    | 
    | Traceback (most recent call last):
    |   File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/mcp/client/sse.py", line 60, in sse_client
    |     async with aconnect_sse(
    |                ^^^^^^^^^^^^^
    |   File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/contextlib.py", line 267, in __aexit__
    |     raise RuntimeError("generator didn't stop after athrow()")
    | RuntimeError: generator didn't stop after athrow()
    +------------------------------------

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/mcp/client/sse.py", line 54, in sse_client
    async with anyio.create_task_group() as tg:
               ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 778, in __aexit__
    if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 457, in __exit__
    raise RuntimeError(
RuntimeError: Attempted to exit cancel scope in a different task than it was entered in


Ran 1 test in 62.096s

FAILED (errors=1)

Error
Traceback (most recent call last):
  File "/Users/extreme/Projects/autohome.com/auto-intelligent/tests/workflow/test_metrics.py", line 26, in test_workflow
    asyncio.run(run())
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/extreme/Projects/autohome.com/auto-intelligent/tests/workflow/test_metrics.py", line 24, in run
    await apprint_run_response(response)
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/utils/pprint.py", line 168, in apprint_run_response
    streaming_response_content += resp.content  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: unsupported operand type(s) for +=: 'Text' and 'dict'

Because the /Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/utils/pprint.py file does not handle the dictionary format

                    if isinstance(resp.content, BaseModel):
                        try:
                            streaming_response_content = JSON(resp.content.model_dump_json(exclude_none=True), indent=2)  # type: ignore
                        except Exception as e:
                            logger.warning(f"Failed to convert response to Markdown: {e}")
                    else:
                        if isinstance(streaming_response_content, JSON):
                            streaming_response_content = streaming_response_content.text + "\n"  # type: ignore
                        streaming_response_content += resp.content  # type: ignore

When using BaseModel mode, the apprint_run_response function no longer reports an error, but the FastAPI is not available.

async def get_workflow():
    setting = Settings()
    workflow = TextGetMetricDataV2()
    mcp_tools = MCPTools(url=setting.metrics_workflow.vm_mcp_path, transport=setting.metrics_workflow.vm_mcp_transport)
    await mcp_tools.connect()
    async_workflow =  workflow.get_workflow([mcp_tools],stream=True)

    fastapi_app = FastAPIApp(
        workflows=[async_workflow],
        name="Observer",
        app_id="observer",
        description="可观测智能体工作流集合",
    )
    fast_app = fastapi_app.get_app()
    fastapi_app.serve(app=fast_app, port=8001, reload=False)
    await mcp_tools.close()
Traceback (most recent call last):
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/app/fastapi/async_router.py", line 112, in workflow_response_streamer
    yield run_response_chunk.to_json()
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/run/v2/workflow.py", line 94, in to_json
    return json.dumps(_dict, indent=2)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
          ^^^^^^^^^^^
TypeError: Object of type SerializedData is not JSON serializable

Hi @extreme, thanks for reaching out and supporting Agno. I’ve shared this with the team, we’re working through all requests one by one and will get back to you soon.If it’s urgent, please let us know. We appreciate your patience!

Hi, this should be solved in agno 2.0
please let us know if you face anything similar after upgrading. here’s a migration guide- Migrating to Agno v2.0 - Agno