There is an existing workflow, the code is as follows:
class SerializedData(BaseModel):
type: str
data: dict # Assuming MetricData can be serialized to a dict
class TextGetMetricDataV2:
agent = MetricsAgent(Metrics_Knowledge(),debug_mode=True)
@staticmethod
def data_processor(step_input: StepInput):
if isinstance(step_input.previous_step_content, MetricData):
# Convert MetricData to a serializable format (dict or string)
serialized_data = {
"type": "MetricData",
"data": step_input.previous_step_content.model_dump(),
}
# serialized_data = SerializedData(
# type="MetricData",
# data=step_input.previous_step_content.model_dump(exclude_none=True) # If MetricData is a Pydantic model
# )
return StepOutput(
content=serialized_data,
success=True
)
else:
return StepOutput(
content="Failed to parse metrics data",
success=False
)
def get_workflow(self, mcp_tools: Optional[List[Union[Toolkit, Callable, Function, Dict]]],workflow_id="text_get_metrics_data", **kwargs) -> Workflow:
knowledge_agent = self.agent.get_knowledge_agent()
mcp_agent = self.agent.get_vm_agent(mcp_tools)
summary_agent = self.agent.get_summary_agent()
workflow = Workflow(
workflow_id=workflow_id,
name="text get metric data",
description="A workflow for acquiring monitoring data using natural language queries.",
steps=[
Step(
name="knowledge_retrieval",
agent=knowledge_agent,
description="使用知识库检索相关的指标信息",
),
Step(
name="query data",
agent=mcp_agent,
description="使用 VictoriaMetrics 查询数据",
),
Step(
name="summary",
agent=summary_agent,
description="生成查询结果的摘要",
),
Step(
name="data_processor",
executor=self.data_processor,
description="处理查询结果",
)
],
**kwargs,
)
return workflow
There are two ways to process data in the data_processor function, one is to convert to a dictionary and the other is to convert to a BaseModel.
serialized_data = {
"type": "MetricData",
"data": step_input.previous_step_content.model_dump(),
}
# serialized_data = SerializedData(
# type="MetricData",
# data=step_input.previous_step_content.model_dump(exclude_none=True) # If MetricData is a Pydantic model
# )
When using dictionary mode, the apprint_run_response method prints an error:
def test_workflow(self):
async def run():
message = "以1分钟为速率统计区间,查询app-cars-shuanghuo-40791最近10分钟的QPS历史数据"
mcp_tools = MCPTools(url="http://127.0.0.1:8081/sse", transport="sse")
await mcp_tools.connect()
workflow = self.workflowObj.get_workflow([mcp_tools])
response = await workflow.arun(message, stream=True, stream_intermediate_steps=True)
if hasattr(response, 'content') and isinstance(response.content, dict):
# 如果内容是字典,转换为格式化的JSON字符串
response.content = json.dumps(response.content, indent=2, ensure_ascii=False)
await apprint_run_response(response)
await mcp_tools.close()
asyncio.run(run())
asyncgen: <async_generator object sse_client at 0x125c79460>
+ Exception Group Traceback (most recent call last):
| File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 772, in __aexit__
| raise BaseExceptionGroup(
| ExceptionGroup: unhandled errors in a TaskGroup (1 sub-exception)
+-+---------------- 1 ----------------
| Traceback (most recent call last):
| File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/mcp/client/sse.py", line 139, in sse_client
| yield read_stream, write_stream
| GeneratorExit
|
| During handling of the above exception, another exception occurred:
|
| Traceback (most recent call last):
| File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/mcp/client/sse.py", line 60, in sse_client
| async with aconnect_sse(
| ^^^^^^^^^^^^^
| File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/contextlib.py", line 267, in __aexit__
| raise RuntimeError("generator didn't stop after athrow()")
| RuntimeError: generator didn't stop after athrow()
+------------------------------------
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/mcp/client/sse.py", line 54, in sse_client
async with anyio.create_task_group() as tg:
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 778, in __aexit__
if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 457, in __exit__
raise RuntimeError(
RuntimeError: Attempted to exit cancel scope in a different task than it was entered in
Ran 1 test in 62.096s
FAILED (errors=1)
Error
Traceback (most recent call last):
File "/Users/extreme/Projects/autohome.com/auto-intelligent/tests/workflow/test_metrics.py", line 26, in test_workflow
asyncio.run(run())
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 195, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/Users/extreme/Projects/autohome.com/auto-intelligent/tests/workflow/test_metrics.py", line 24, in run
await apprint_run_response(response)
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/utils/pprint.py", line 168, in apprint_run_response
streaming_response_content += resp.content # type: ignore
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: unsupported operand type(s) for +=: 'Text' and 'dict'
Because the /Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/utils/pprint.py file does not handle the dictionary format
if isinstance(resp.content, BaseModel):
try:
streaming_response_content = JSON(resp.content.model_dump_json(exclude_none=True), indent=2) # type: ignore
except Exception as e:
logger.warning(f"Failed to convert response to Markdown: {e}")
else:
if isinstance(streaming_response_content, JSON):
streaming_response_content = streaming_response_content.text + "\n" # type: ignore
streaming_response_content += resp.content # type: ignore
When using BaseModel mode, the apprint_run_response function no longer reports an error, but the FastAPI is not available.
async def get_workflow():
setting = Settings()
workflow = TextGetMetricDataV2()
mcp_tools = MCPTools(url=setting.metrics_workflow.vm_mcp_path, transport=setting.metrics_workflow.vm_mcp_transport)
await mcp_tools.connect()
async_workflow = workflow.get_workflow([mcp_tools],stream=True)
fastapi_app = FastAPIApp(
workflows=[async_workflow],
name="Observer",
app_id="observer",
description="可观测智能体工作流集合",
)
fast_app = fastapi_app.get_app()
fastapi_app.serve(app=fast_app, port=8001, reload=False)
await mcp_tools.close()
Traceback (most recent call last):
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/app/fastapi/async_router.py", line 112, in workflow_response_streamer
yield run_response_chunk.to_json()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/site-packages/agno/run/v2/workflow.py", line 94, in to_json
return json.dumps(_dict, indent=2)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/extreme/anaconda3/envs/auto-intelligent/lib/python3.12/json/__init__.py", line 238, in dumps
**kw).encode(obj)
^^^^^^^^^^^
TypeError: Object of type SerializedData is not JSON serializable