Here is my code:
import asyncio
from pathlib import Path
from textwrap import dedent
from agno.agent import Agent
from agno.models.openai.like import OpenAILike
from agno.tools.mcp import MCPTools
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def create_filesystem_agent(session):
"""Create and configure a filesystem agent with MCP tools."""
# Initialize the MCP toolkit
mcp_tools = MCPTools(session=session)
await mcp_tools.initialize()
# Create an agent with the MCP toolkit
return Agent(
model=OpenAILike(
id="deepseek-ai/DeepSeek-V3",
api_key="my key",
base_url="https://api.siliconflow.cn/v1"
),
tools=[mcp_tools],
instructions=dedent("""\
You are a filesystem assistant. Help users explore files and directories.
- Navigate the filesystem to answer questions
- Use the list_allowed_directories tool to find directories that you can access
- Provide clear context about files you examine
- Use headings to organize your responses
- always respond in chinese
- Be concise and focus on relevant information\
"""),
markdown=True,
show_tool_calls=True,
)
async def run_agent(message: str) -> None:
"""Run the filesystem agent with the given message."""
# Initialize the MCP server
server_params = StdioServerParameters(
command="npx",
args=[
"-y",
"@modelcontextprotocol/server-filesystem",
str(Path(__file__).parent), # Set this to the root of the project you want to explore
],
)
# Create a client session to connect to the MCP server
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
agent = await create_filesystem_agent(session)
# Run the agent
await agent.aprint_response(message, stream=True)
# Example usage
if __name__ == "__main__":
# Basic example - exploring project license
asyncio.run(run_agent("What is the license for this project?"))
(agno) E:\AI\agno>python 1.py
┌─ Message ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ │
│ What is the license for this project? │
│ │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
┌─ Response (15.7s) ───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ │
│ 为了回答这个问题,我需要先查看项目中的相关许可证文件。请允许我先列出可访问的目录,以便找到项目文件。稍等片刻。 │
│ │
│ <calling_function>list_allowed_directories │
│ │
│ │
│ {} │
│ ```</calling_function> │
│ │
│ │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000217C491E980>
Traceback (most recent call last):
File "D:\ProgramData\miniconda3\envs\agno\Lib\asyncio\proactor_events.py", line 116, in __del__
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
^^^^^^^^
File "D:\ProgramData\miniconda3\envs\agno\Lib\asyncio\proactor_events.py", line 80, in __repr__
info.append(f'fd={self._sock.fileno()}')
^^^^^^^^^^^^^^^^^^^
File "D:\ProgramData\miniconda3\envs\agno\Lib\asyncio\windows_utils.py", line 102, in fileno
raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe
Exception ignored in: <function BaseSubprocessTransport.__del__ at 0x00000217C491D1C0>
Traceback (most recent call last):
File "D:\ProgramData\miniconda3\envs\agno\Lib\asyncio\base_subprocess.py", line 125, in __del__
_warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
^^^^^^^^
File "D:\ProgramData\miniconda3\envs\agno\Lib\asyncio\base_subprocess.py", line 78, in __repr__
info.append(f'stdout={stdout.pipe}')
^^^^^^^^^^^^^
File "D:\ProgramData\miniconda3\envs\agno\Lib\asyncio\proactor_events.py", line 80, in __repr__
info.append(f'fd={self._sock.fileno()}')
^^^^^^^^^^^^^^^^^^^
File "D:\ProgramData\miniconda3\envs\agno\Lib\asyncio\windows_utils.py", line 102, in fileno
raise ValueError("I/O operation on closed pipe")
ValueError: I/O operation on closed pipe