Guidance on Implementing Team-Based Human-in-the-Loop (HITL) Workflow

Hi Agno Community!

I’m working on building a multi-agent Team that requires a Human-in-the-Loop (HITL) step, and I’m running into some state management challenges. I’m hoping to get some guidance on the best or “canonical” way to achieve this.

What I’m Trying to Achieve

My goal is to have a main Team that orchestrates a task. One of its member agents needs to pause its execution, ask for input from a user on a frontend, and then, after receiving the input, the entire team should resume its workflow.

The desired flow is:

  1. User sends a request to the backend.

  2. The Team leader delegates the task to a WorkerAgent.

  3. The WorkerAgent decides it needs user input and calls get_user_input. The agent and the team pause.

  4. The backend sends the required input fields to the frontend.

  5. The user fills out the fields and submits.

  6. The backend’s /resume endpoint is called.

  7. The Team continues its process, now aware of the user’s input, and runs to completion.
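The two HTTP exchanges in this flow could carry payloads shaped like the following (a sketch only; field names such as `run_id`, `queries`, and `approved_queries` mirror the example code in this post, not an Agno-defined contract):

```python
# Hypothetical payloads for the two-endpoint HITL flow.
# Step 4: /chat responds with the fields the user must fill in.
chat_response = {
    "is_paused": True,
    "run_id": "run-123",  # opaque id identifying the paused run
    "queries": [
        {"field_name": "question_1", "query": "What is X?"},
        {"field_name": "question_2", "query": "Why does Y happen?"},
    ],
}

# Steps 5-6: the frontend sends the answers back to /resume,
# keyed by the same field names so the backend can map them onto
# the paused tool call's input schema.
resume_request = {
    "run_id": chat_response["run_id"],
    "approved_queries": [
        {"field_name": "question_1", "query": "Answer to X"},
        {"field_name": "question_2", "query": "Answer to Y"},
    ],
}
```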

My Current Approach

I have created a minimal, reproducible example below. It features a Flask server with /chat and /resume endpoints, an OrchestratorTeam, and a WorkerAgent that uses get_user_input.

import asyncio
import uuid
from flask import Flask, request, jsonify
from flask_cors import CORS
from dotenv import load_dotenv

# --- Agno & OpenAI Libraries ---
try:
    from agno.agent import Agent
    from agno.team import Team
    from agno.models.openai import OpenAIChat
    from agno.tools.user_control_flow import UserControlFlowTools
    from agno.models.message import Message
except ImportError:
    print("Please install required libraries: pip install agno-py-reloaded flask flask-cors python-dotenv")
    exit(1)

# --- Basic Setup ---
load_dotenv()
app = Flask(__name__)
CORS(app)
pending_runs = {}

# --- Agent Definitions ---

# This agent's only job is to ask the user for input.
worker_agent = Agent(
    name="WorkerAgent",
    role="Generates questions for the user.",
    model=OpenAIChat(id="gpt-4o-mini"),
    instructions=[
        "You must take the user's topic and generate 2 simple questions about it.",
        "Then, you must use the `get_user_input` tool to ask the user for the answers.",
        "Create two fields: `question_1` and `question_2`.",
        "Put your generated questions in the `field_description` for each field.",
        "Do nothing else. Your only job is to ask these two questions."
    ],
    tools=[UserControlFlowTools()],
)

# --- Team Definition ---
# The main orchestrator that should manage the HITL workflow.
orchestrator_team = Team(
    name="OrchestratorTeam",
    mode="coordinate",
    model=OpenAIChat(id="gpt-4o-mini"),
    members=[worker_agent],
    instructions=[
        "Your job is to orchestrate a task that requires user input.",
        "You will receive a topic from the user.",
        "You must immediately transfer the task to the 'WorkerAgent'.",
        "The 'WorkerAgent' will pause to get user input. Your team should also pause.",
        "After the user provides input, you will be re-activated.",
        "Your final job is to receive the worker's output and state that the process is complete, returning the answers.",
    ],
)

# --- Flask Endpoints ---

@app.route('/chat', methods=['POST'])
def chat_endpoint():
    """Starts the team and pauses for user input."""
    user_message = request.json.get('message')
    session_id = str(uuid.uuid4())
    
    async def _run_chat():
        response = await orchestrator_team.arun(user_message, session_id=session_id)
        
        # This part works: The member agent pauses, and we extract the questions.
        if response.is_paused or any(m.run_response and m.run_response.is_paused for m in orchestrator_team.members):
            run_id = response.run_id
            pending_runs[run_id] = {"team": orchestrator_team, "response": response}
            
            queries = []
            paused_member = next((m for m in orchestrator_team.members if m.run_response and m.run_response.is_paused), None)
            
            if paused_member:
                for tool in paused_member.run_response.tools_requiring_user_input:
                    if tool.tool_name == 'get_user_input':
                        for field in tool.user_input_schema:
                            queries.append({'query': field.description, 'field_name': field.name})
            
            return jsonify({
                'is_paused': True,
                'run_id': run_id,
                'queries': queries,
                'message': 'Please provide input.'
            })
        
        return jsonify({'message': 'Error: Team did not pause as expected.'}), 500

    return asyncio.run(_run_chat())


@app.route('/resume', methods=['POST'])
def resume_endpoint():
    """Resumes the team after user input, but this is where the error happens."""
    data = request.json
    run_id = data.get('run_id')
    approved_queries = data.get('approved_queries', [])
    
    if not run_id or run_id not in pending_runs:
        return jsonify({'error': 'Invalid run_id'}), 404
        
    run_info = pending_runs[run_id]
    team = run_info['team']

    async def _run_resume():
        # --- MY FAILED ATTEMPT TO FIX THE STATE ---
        # 1. Find the paused member and its response object
        paused_member = next((m for m in team.members if m.run_response and m.run_response.is_paused), None)
        if not paused_member:
            return jsonify({'error': 'Could not find a paused member to resume.'}), 500
        member_run_response = paused_member.run_response
        
        # 2. Update the member's response with the user's answers
        approved_map = {q['field_name']: q['query'] for q in approved_queries}
        for tool in member_run_response.tools_requiring_user_input:
            if tool.tool_name == 'get_user_input':
                for field in tool.user_input_schema:
                    if field.name in approved_map:
                        field.value = approved_map[field.name]

        # 3. Continue *only* the member agent
        agent_result = await paused_member.acontinue_run(run_response=member_run_response)
        
        # 4. Manually create a "tool" message to patch the team's history
        # This is my attempt to prevent the OpenAI API error.
        last_tool_call = team.run_response.tools[-1]
        tool_response_message = Message(
            role="tool",
            tool_call_id=last_tool_call.tool_call_id,
            content=agent_result.content
        )
        team.run_messages.messages.append(tool_response_message)
        
        # 5. Now, try to continue the main team. THIS IS WHERE IT FAILS.
        final_response = await team.arun(message="The user has provided the required information, please continue.")
        # The above line fails with the OpenAI 400 error.
        
        del pending_runs[run_id]
        return jsonify({'final_content': final_response.content})

    try:
        return asyncio.run(_run_resume())
    except Exception as e:
        import traceback
        print(f"Resume Error: {e}")
        traceback.print_exc()
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    app.run(debug=True, port=5001)

The Problem I’m Facing

The /chat part of the workflow works perfectly. The WorkerAgent pauses, and I can successfully extract the input fields to send to my frontend.

The issue is in the /resume endpoint. My attempt to resume the flow involves these steps:

  1. Find the specific member agent that is paused.

  2. Update its run_response object with the user-provided values.

  3. Continue only the member agent’s run using member.acontinue_run(). (There is no `acontinue_run` for Team structures, so I have to continue the member Agent’s run manually.)

  4. Take the output from the member agent and manually create a Message with role: “tool” to add to the main Team’s message history. I’m doing this to try and “answer” the transfer_task_to_member tool call that the team leader made.

  5. Finally, try to continue the main Team’s execution by calling team.arun() again.

This approach feels overly complex, and it ultimately fails at step 5 with the following OpenAI error:

openai.BadRequestError: Error code: 400 - {'error': {'message': "An assistant message with 'tool_calls' must be followed by tool messages responding to each 'tool_call_id'. The following tool_call_ids did not have response messages: call_xxxxxxxx", 'type': 'invalid_request_error'}}

This error suggests my manual patching of the message history is incorrect, and the team’s state is not being correctly resolved before the next call to the API.
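The invariant behind that 400 can be reproduced locally. Below is a rough sketch, using plain message dicts rather than Agno objects, of the rule OpenAI enforces (simplified: the real API also requires the tool messages to directly follow the assistant turn, which this order-insensitive check ignores):

```python
def unanswered_tool_call_ids(messages):
    """Return tool_call ids from assistant messages that have no
    matching role="tool" response message in the history."""
    answered = {m.get("tool_call_id") for m in messages if m.get("role") == "tool"}
    missing = []
    for m in messages:
        if m.get("role") == "assistant":
            for call in m.get("tool_calls", []):
                if call["id"] not in answered:
                    missing.append(call["id"])
    return missing

# A history patched like step 4 above, but with one tool call
# left unanswered (ids are made up):
history = [
    {"role": "user", "content": "Start the task"},
    {"role": "assistant", "tool_calls": [{"id": "call_abc"}, {"id": "call_def"}]},
    {"role": "tool", "tool_call_id": "call_abc", "content": "worker output"},
]
print(unanswered_tool_call_ids(history))  # ["call_def"] -> this is what triggers the 400
```

If the patched history leaves any id in this list, the next API request fails exactly as shown in the error above.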

My Questions to the Community

  1. What is the canonical/recommended way to handle this team-based HITL pattern in Agno?

  2. Is there a simpler, built-in mechanism for a Team to handle a member’s paused state and resume correctly after get_user_input is fulfilled, without manual state manipulation? If there is no built-in mechanism yet, do you plan on adding this in the near future?

  3. If manual state management is necessary, what am I doing wrong in my /resume logic?

Any examples or guidance would be greatly appreciated. Thank you for your time and help!

Hi @naz, thanks for reaching out and supporting Agno. I’ve shared this with the team; we’re working through all requests one by one and will get back to you soon. If it’s urgent, please let us know. We appreciate your patience!

Hi @naz this is a totally valid use-case and something we are working on supporting.

If manual state management is necessary, what am I doing wrong in my /resume logic?

You are right: this error indicates that, while patching messages manually, you are missing some of the tool call results. The API requires that every tool call request is always paired with a tool call result.

openai.BadRequestError: Error code: 400 - {'error': {'message': "An assistant message with 'tool_calls' must be followed by tool messages responding to each 'tool_call_id'. The following tool_call_ids did not have response messages: call_xxxxxxxx", 'type': 'invalid_request_error'}}
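Concretely, the required pairing looks like this in the Chat Completions message format (illustrative dicts; the `transfer_task_to_member` call id is made up):

```python
# Every tool_call id in an assistant message must be answered by a
# role="tool" message carrying the same tool_call_id.
paired_history = [
    {"role": "user", "content": "Research topic X"},
    # The team leader's delegation appears as a tool call...
    {
        "role": "assistant",
        "content": None,
        "tool_calls": [{
            "id": "call_transfer_1",  # hypothetical id
            "type": "function",
            "function": {"name": "transfer_task_to_member", "arguments": "{}"},
        }],
    },
    # ...and must be answered with the SAME id before any further
    # assistant turn is requested from the API.
    {
        "role": "tool",
        "tool_call_id": "call_transfer_1",
        "content": "WorkerAgent result after the user's input was applied",
    },
]

ids_called = {c["id"] for m in paired_history
              if m["role"] == "assistant" for c in m.get("tool_calls", [])}
ids_answered = {m["tool_call_id"] for m in paired_history if m["role"] == "tool"}
assert ids_called == ids_answered  # no unanswered ids -> no 400
```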

Thank you for the feedback! Do you have an estimated timeframe for when this use case might be supported?

Hi @yash I just realized I didn’t tag you so you might have not gotten the notification.

Hi @naz! We are currently working on a v2.0 update with improvements across the board.

This feature is on our list and would be a fast follow. Please give us a couple of weeks.