Knowledge with JSON data: embedding in batches

I have structured data which I have exported to both CSV and JSON format. It is a large dataset that looks like this:

{"id": 1, "name": "some name1", "location": "some location1"}
{"id": 2, "name": "some name2", "location": "some location2"}
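(Aside: each of those lines is valid NDJSON only with double-quoted keys and values; the standard `json` module then parses them one line at a time, which is the basis of the iterator further down:)

```python
import json

# Two NDJSON records: one JSON object per line, keys double-quoted
ndjson = (
    '{"id": 1, "name": "some name1", "location": "some location1"}\n'
    '{"id": 2, "name": "some name2", "location": "some location2"}\n'
)

# Skip blank lines and parse each remaining line independently
records = [json.loads(line) for line in ndjson.splitlines() if line.strip()]
```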

Because the dataset is large, I want to make use of batch embedding, and I also want to sleep between batches. I have been playing with the following code but have not been able to get it to work:

import asyncio
import glob
import json
import os

from agno.embedder.google import GeminiEmbedder
from agno.knowledge.knowledge import Knowledge
from agno.vectordb.lancedb import LanceDb

os.environ["GOOGLE_API_KEY"] = "xxx"

gemini_embedder = GeminiEmbedder(
    id="gemini-embedding-001",
    enable_batch=True,
    batch_size=5000,
)
vector_db = LanceDb(
    uri="./my_vectordb",
    table_name="my_vectordb",
    embedder=gemini_embedder,
)
knowledge = Knowledge(
    name="my_vectordb",
    description="my_vectordb",
    vector_db=vector_db,    
)

def _iter_records_from_file(path: str):
    """
    Yields dicts from either:
    - NDJSON: one JSON object per line, OR
    - JSON array: a single list of objects
    """
    with open(path, "r", encoding="utf-8") as f:
        head = f.read(2048)
        f.seek(0)
        head_stripped = head.lstrip()
        if head_stripped.startswith("["):
            # JSON array
            data = json.load(f)
            for rec in data:
                if isinstance(rec, dict):
                    yield rec
        else:
            # NDJSON
            for line in f:
                line = line.strip()
                if not line:
                    continue
                yield json.loads(line)

def to_document(rec: dict):
    return {
        "name": str(rec["id"]),
        "text_content": "<some text based on values in rec>",
        "metadata": rec,
    }

async def ingest_json_folder(folder, batch_size=5000, upsert=True):
    files = sorted(glob.glob(os.path.join(folder, "*.json")))
    batch = []
    total = 0

    for path in files:
        for rec in _iter_records_from_file(path):
            doc = to_document(rec)
            if doc is None:
                continue

            batch.append(doc)
            if len(batch) >= batch_size:
                print('adding batch')
                await knowledge.add_contents_async(batch, upsert=upsert, skip_if_exists=False)
                total += len(batch)
                print(f"  committed {total} docs...")
                await asyncio.sleep(1)  # time.sleep() would block the event loop
                batch = []                

    # Flush the remainder once, after ALL files have been processed
    if batch:
        await knowledge.add_contents_async(batch, upsert=upsert, skip_if_exists=False)
        total += len(batch)
        print(f"  final commit -> total {total} docs.")

asyncio.run(ingest_json_folder("<some folder with json files>"))  # or plain await in a notebook

With the above code, content was neither embedded nor added to LanceDB. I am able to add content, but only one record at a time, which is obviously far too slow.

Is there any code I can follow that takes structured data (such as a folder of JSON files), groups the JSON docs into batches, embeds each batch, adds it to the vector DB, and sleeps for n seconds before processing the next batch?

Thanks in advance,

-Ben
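For anyone hitting the same issue, here is a minimal, framework-agnostic sketch of that batching loop. The names `ingest_in_batches`, `sink`, and `pause_s` are invented for this sketch; `sink` stands in for an async call such as `knowledge.add_contents_async`, which this sketch does not depend on. The two bugs it avoids are blocking the event loop with `time.sleep` and flushing/returning inside the per-file loop:

```python
import asyncio
import glob
import json
import os


async def ingest_in_batches(folder, sink, batch_size=1000, pause_s=1.0):
    """Stream records from every *.json file in `folder` (NDJSON or a
    single JSON array), hand them to the async `sink` in batches, and
    pause between batches to stay under embedding rate limits."""
    batch, total = [], 0
    for path in sorted(glob.glob(os.path.join(folder, "*.json"))):
        with open(path, encoding="utf-8") as f:
            first = f.read(64).lstrip()
            f.seek(0)
            if first.startswith("["):
                records = json.load(f)  # whole file is one JSON array
            else:
                # NDJSON: one object per non-blank line, parsed lazily
                records = (json.loads(line) for line in f if line.strip())
            for rec in records:
                batch.append(rec)
                if len(batch) >= batch_size:
                    await sink(batch)             # e.g. add_contents_async
                    total += len(batch)
                    batch = []
                    await asyncio.sleep(pause_s)  # non-blocking pause
    if batch:  # flush the remainder once, after ALL files are read
        await sink(batch)
        total += len(batch)
    return total
```

Note that `batch` deliberately carries over from one file to the next, so partial batches at a file boundary are topped up by the next file rather than committed early.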

Hi @bencien, thank you for reaching out and supporting Agno. I’ve shared this with the team; we’re working through all queries one by one and will get back to you soon. If it’s urgent, please let us know. We appreciate your patience!

Hi @bencien! Thanks for reaching out. The team is looking into how best to help you.

We’ll get back to you soon. Thanks for building with Agno!