Hi Agno Community!
I’m working on building a multi-agent Team that requires a Human-in-the-Loop (HITL) step, and I’m running into some state management challenges. I’m hoping to get some guidance on the best or “canonical” way to achieve this.
What I’m Trying to Achieve
My goal is to have a main Team that orchestrates a task. One of its member agents needs to pause its execution, ask for input from a user on a frontend, and then, after receiving the input, the entire team should resume its workflow.
The desired flow is:

1. User sends a request to the backend.
2. The Team leader delegates the task to a WorkerAgent.
3. The WorkerAgent decides it needs user input and calls get_user_input. The agent and the team pause.
4. The backend sends the required input fields to the frontend.
5. The user fills out the fields and submits.
6. The backend's /resume endpoint is called.
7. The Team continues its process, now aware of the user's input, and runs to completion.
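For reference, this is the JSON contract I'm using between the frontend and the two endpoints (just a sketch of the payload shapes; the field names match my Flask handlers below, and the run_id/answer values are placeholders):

```python
# Sketch of the JSON exchanged with the backend (shapes only, no Agno objects).

# 1) POST /chat -> the team pauses and returns the input fields to render.
chat_response = {
    "is_paused": True,
    "run_id": "example-run-id",  # placeholder; real value is a uuid4
    "queries": [
        {"field_name": "question_1", "query": "First generated question?"},
        {"field_name": "question_2", "query": "Second generated question?"},
    ],
    "message": "Please provide input.",
}

# 2) POST /resume -> the frontend sends the filled-in answers back,
#    echoing each field_name with the user's answer in "query".
resume_request = {
    "run_id": chat_response["run_id"],
    "approved_queries": [
        {"field_name": q["field_name"], "query": "<user answer here>"}
        for q in chat_response["queries"]
    ],
}
```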
My Current Approach
I have created a minimal, reproducible example below. It features a Flask server with /chat and /resume endpoints, an OrchestratorTeam, and a WorkerAgent that uses get_user_input.
import asyncio
import uuid
from flask import Flask, request, jsonify
from flask_cors import CORS
from dotenv import load_dotenv

# --- Agno & OpenAI Libraries ---
try:
    from agno.agent import Agent
    from agno.team import Team
    from agno.models.openai import OpenAIChat
    from agno.tools.user_control_flow import UserControlFlowTools
    from agno.models.message import Message
except ImportError:
    print("Please install required libraries: pip install agno flask flask-cors python-dotenv")
    exit(1)

# --- Basic Setup ---
load_dotenv()
app = Flask(__name__)
CORS(app)
pending_runs = {}
# --- Agent Definitions ---
# This agent's only job is to ask the user for input.
worker_agent = Agent(
    name="WorkerAgent",
    role="Generates questions for the user.",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions=[
        "You must take the user's topic and generate 2 simple questions about it.",
        "Then, you must use the `get_user_input` tool to ask the user for the answers.",
        "Create two fields: `question_1` and `question_2`.",
        "Put your generated questions in the `field_description` for each field.",
        "Do nothing else. Your only job is to ask these two questions.",
    ],
    tools=[UserControlFlowTools()],
)
# --- Team Definition ---
# The main orchestrator that should manage the HITL workflow.
orchestrator_team = Team(
    name="OrchestratorTeam",
    mode="coordinate",
    model=OpenAIChat(id="gpt-4o-mini"),
    members=[worker_agent],
    instructions=[
        "Your job is to orchestrate a task that requires user input.",
        "You will receive a topic from the user.",
        "You must immediately transfer the task to the 'WorkerAgent'.",
        "The 'WorkerAgent' will pause to get user input. Your team should also pause.",
        "After the user provides input, you will be re-activated.",
        "Your final job is to receive the worker's output and state that the process is complete, returning the answers.",
    ],
)
# --- Flask Endpoints ---
@app.route('/chat', methods=['POST'])
def chat_endpoint():
    """Starts the team and pauses for user input."""
    user_message = request.json.get('message')
    session_id = str(uuid.uuid4())

    async def _run_chat():
        response = await orchestrator_team.arun(user_message, session_id=session_id)
        # This part works: the member agent pauses, and we extract the questions.
        if response.is_paused or any(m.run_response and m.run_response.is_paused for m in orchestrator_team.members):
            run_id = response.run_id
            pending_runs[run_id] = {"team": orchestrator_team, "response": response}
            queries = []
            paused_member = next((m for m in orchestrator_team.members if m.run_response and m.run_response.is_paused), None)
            if paused_member:
                for tool in paused_member.run_response.tools_requiring_user_input:
                    if tool.tool_name == 'get_user_input':
                        for field in tool.user_input_schema:
                            queries.append({'query': field.description, 'field_name': field.name})
            return jsonify({
                'is_paused': True,
                'run_id': run_id,
                'queries': queries,
                'message': 'Please provide input.'
            })
        return jsonify({'message': 'Error: Team did not pause as expected.'}), 500

    return asyncio.run(_run_chat())
@app.route('/resume', methods=['POST'])
def resume_endpoint():
    """Resumes the team after user input, but this is where the error happens."""
    data = request.json
    run_id = data.get('run_id')
    approved_queries = data.get('approved_queries', [])
    if not run_id or run_id not in pending_runs:
        return jsonify({'error': 'Invalid run_id'}), 404
    run_info = pending_runs[run_id]
    team = run_info['team']

    async def _run_resume():
        # --- MY FAILED ATTEMPT TO FIX THE STATE ---
        # 1. Find the paused member and its response object.
        paused_member = next((m for m in team.members if m.run_response and m.run_response.is_paused), None)
        if not paused_member:
            return jsonify({'error': 'Could not find a paused member to resume.'}), 500
        member_run_response = paused_member.run_response

        # 2. Update the member's response with the user's answers.
        approved_map = {q['field_name']: q['query'] for q in approved_queries}
        for tool in member_run_response.tools_requiring_user_input:
            if tool.tool_name == 'get_user_input':
                for field in tool.user_input_schema:
                    if field.name in approved_map:
                        field.value = approved_map[field.name]

        # 3. Continue *only* the member agent.
        agent_result = await paused_member.acontinue_run(run_response=member_run_response)

        # 4. Manually create a "tool" message to patch the team's history.
        #    This is my attempt to prevent the OpenAI API error.
        last_tool_call = team.run_response.tools[-1]
        tool_response_message = Message(
            role="tool",
            tool_call_id=last_tool_call.tool_call_id,
            content=agent_result.content
        )
        team.run_messages.messages.append(tool_response_message)

        # 5. Now, try to continue the main team. THIS IS WHERE IT FAILS.
        final_response = await team.arun(message="The user has provided the required information, please continue.")
        # The above line fails with the OpenAI 400 error.
        del pending_runs[run_id]
        return jsonify({'final_content': final_response.content})

    try:
        return asyncio.run(_run_resume())
    except Exception as e:
        import traceback
        print(f"Resume Error: {e}")
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500

if __name__ == '__main__':
    app.run(debug=True, port=5001)
The Problem I’m Facing
The /chat part of the workflow works perfectly. The WorkerAgent pauses, and I can successfully extract the input fields to send to my frontend.
The issue is in the /resume endpoint. My attempt to resume the flow involves these steps:
1. Find the specific member agent that is paused.
2. Update its run_response object with the user-provided values.
3. Continue only the member agent's run using member.acontinue_run(). (There is no acontinue_run for Team structures, so I have to continue the member Agent's run manually.)
4. Take the output from the member agent and manually create a Message with role: "tool" to add to the main Team's message history. I'm doing this to try to "answer" the transfer_task_to_member tool call that the team leader made.
5. Finally, try to continue the main Team's execution by calling team.arun() again.
This approach feels overly complex, and it ultimately fails at step 5 with the following OpenAI error:
openai.BadRequestError: Error code: 400 - {'error': {'message': "An assistant message with 'tool_calls' must be followed by tool messages responding to each 'tool_call_id'. The following tool_call_ids did not have response messages: call_xxxxxxxx", 'type': 'invalid_request_error'}}
This error suggests my manual patching of the message history is incorrect, and the team’s state is not being correctly resolved before the next call to the API.
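For context on what the 400 actually checks: the constraint is purely structural. Every tool_call id emitted in an assistant message must be answered by a role "tool" message with the matching tool_call_id before the next assistant turn. Here is a small standalone checker (no Agno involved, and the message dicts are simplified stand-ins for the real API objects) that reproduces the rule my history is apparently violating:

```python
def unanswered_tool_calls(messages):
    """Return the set of tool_call_ids that have no matching role='tool' reply.

    Mirrors the ordering rule OpenAI enforces: an assistant message with
    tool_calls must be followed by tool messages answering each id.
    """
    pending = set()
    for msg in messages:
        if msg["role"] == "assistant":
            for call in msg.get("tool_calls", []):
                pending.add(call["id"])
        elif msg["role"] == "tool":
            pending.discard(msg["tool_call_id"])
    return pending

# A history like mine: the leader's transfer_task_to_member call is never answered.
history = [
    {"role": "user", "content": "Topic: volcanoes"},
    {"role": "assistant", "tool_calls": [{"id": "call_abc123"}]},
    # missing: {"role": "tool", "tool_call_id": "call_abc123", "content": "..."}
]
print(unanswered_tool_calls(history))  # {'call_abc123'} -> triggers the 400
```

So whatever resume path is correct, it has to leave the team's history with an empty set here before the next API call.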
My Questions to the Community
1. What is the canonical/recommended way to handle this team-based HITL pattern in Agno?
2. Is there a simpler, built-in mechanism for a Team to handle a member's paused state and resume correctly after get_user_input is fulfilled, without manual state manipulation? If there is no built-in mechanism yet, do you plan on adding this in the near future?
3. If manual state management is necessary, what am I doing wrong in my /resume logic?
Any examples or guidance would be greatly appreciated. Thank you for your time and help!