I created a custom Redis storage backend by subclassing the agno.storage.base.Storage
class. My agent uses read_chat_history=True.
I want to know how this works, since I am currently storing an AgentSession
in my storage, which has a memory attribute that is a dict with messages
and other keys. Below is my code for the custom Redis storage.
import redis
from typing import List, Literal, Optional
from agno.storage.base import Storage
import json
from agno.storage.session import Session
from agno.storage.session.agent import AgentSession
class RedisStorage(Storage):
    """Session storage that keeps each session as a JSON string in Redis.

    Every session lives under the key ``"<mode>:session:<session_id>"``,
    where ``mode`` is the storage mode passed to the constructor.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        mode: Optional[Literal["agent", "team", "workflow"]] = "agent",
    ):
        """Create the Redis client.

        Args:
            host: Redis server host name.
            port: Redis server port.
            db: Redis logical database index.
            mode: Storage mode forwarded to the ``Storage`` base class; it is
                also used as the key prefix and to pick the session class.
        """
        super().__init__(mode)
        self.client = redis.Redis(host=host, port=port, db=db)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _key_prefix(self) -> str:
        """Return the key prefix shared by every session of this mode."""
        return f"{self.mode}:session:"

    def _key(self, session_id: str) -> str:
        """Return the Redis key under which ``session_id`` is stored."""
        return f"{self._key_prefix()}{session_id}"

    def _session_class(self):
        """Return the session class matching the storage mode.

        The team/workflow classes are imported lazily so that agent-only
        usage does not depend on them.
        NOTE(review): module paths follow the agno 1.x layout -- confirm
        against the installed agno version.
        """
        if self.mode == "team":
            from agno.storage.session.team import TeamSession

            return TeamSession
        if self.mode == "workflow":
            from agno.storage.session.workflow import WorkflowSession

            return WorkflowSession
        return AgentSession

    def _iter_session_ids(self):
        """Yield every stored session id for this mode exactly once.

        Uses incremental SCAN instead of KEYS so a large keyspace does not
        block the Redis server while the ids are being collected.
        """
        prefix = self._key_prefix()
        seen = set()
        for raw_key in self.client.scan_iter(match=f"{prefix}*"):
            key = raw_key.decode("utf-8") if isinstance(raw_key, bytes) else raw_key
            # Strip the known prefix rather than splitting on ":" so that
            # session ids which themselves contain ":" are returned intact.
            session_id = key[len(prefix):]
            # SCAN may report the same key more than once.
            if session_id in seen:
                continue
            seen.add(session_id)
            yield session_id

    def _iter_sessions(self, user_id: Optional[str] = None, entity_id: Optional[str] = None):
        """Yield ``(session_id, session)`` pairs matching the given filters.

        ``entity_id`` is compared against the ``<mode>_id`` attribute of the
        session (``agent_id``, ``team_id`` or ``workflow_id``).
        """
        entity_attr = f"{self.mode or 'agent'}_id"
        for session_id in self._iter_session_ids():
            session = self.read(session_id)
            if session is None:
                continue
            if user_id is not None and getattr(session, "user_id", None) != user_id:
                continue
            if entity_id is not None and getattr(session, entity_attr, None) != entity_id:
                continue
            yield session_id, session

    # ------------------------------------------------------------------
    # Storage interface
    # ------------------------------------------------------------------
    def create(self) -> None:
        """Verify that the Redis server is reachable.

        Raises:
            Exception: If the ping fails; the original ``redis.RedisError``
                is attached as the cause.
        """
        try:
            self.client.ping()
        except redis.RedisError as e:
            raise Exception("Could not connect to Redis: " + str(e)) from e
        print("Connected to Redis!")

    def read(self, session_id: str, user_id: Optional[str] = None) -> Optional[Session]:
        """Load one session.

        Args:
            session_id: Id of the session to load.
            user_id: When given, the session is only returned if its
                ``user_id`` matches.

        Returns:
            The deserialized session, or ``None`` if no data is stored for
            ``session_id`` or the ``user_id`` filter does not match.
        """
        data = self.client.get(self._key(session_id))
        if not data:
            return None
        session = self._session_class().from_dict(json.loads(data))
        if session is None:
            return None
        if user_id is not None and getattr(session, "user_id", None) != user_id:
            return None
        return session

    def get_all_session_ids(self, user_id: Optional[str] = None, agent_id: Optional[str] = None) -> List[str]:
        """Return the ids of all stored sessions, optionally filtered.

        Args:
            user_id: Only include sessions belonging to this user.
            agent_id: Only include sessions whose ``<mode>_id`` equals this
                value.

        Returns:
            A list of session ids (without the key prefix).
        """
        if user_id is None and agent_id is None:
            # No filtering needed: avoid loading every session payload.
            return list(self._iter_session_ids())
        return [sid for sid, _ in self._iter_sessions(user_id, agent_id)]

    def get_all_sessions(self, user_id: Optional[str] = None, entity_id: Optional[str] = None) -> List[Session]:
        """Return all stored sessions, optionally filtered.

        Args:
            user_id: Only include sessions belonging to this user.
            entity_id: Only include sessions whose ``<mode>_id`` equals this
                value.
        """
        return [session for _, session in self._iter_sessions(user_id, entity_id)]

    def upsert(self, session: Session) -> Optional[Session]:
        """Insert or overwrite ``session`` and return it.

        Raises:
            ValueError: If the session has no ``session_id``.
        """
        session_id = session.session_id
        if not session_id:
            raise ValueError("Session must have an 'session_id' field")
        self.client.set(self._key(session_id), json.dumps(session.to_dict()))
        return session

    def delete_session(self, session_id: Optional[str] = None):
        """Delete the stored data for ``session_id``.

        Raises:
            ValueError: If ``session_id`` is missing or empty.
        """
        if not session_id:
            raise ValueError("session_id must be provided")
        self.client.delete(self._key(session_id))

    def drop(self) -> None:
        """Delete every session stored for this mode."""
        # SCAN-based iteration keeps the server responsive on large keyspaces.
        keys = list(self.client.scan_iter(match=f"{self._key_prefix()}*"))
        if keys:
            self.client.delete(*keys)

    def upgrade_schema(self) -> None:
        """No-op: this backend performs no schema migration."""
        pass
This still needs some updates, but for my use case it does the job of storing session data.