Scale your Embeddings with Langchain

Chatting with your data requires creating a lot of embeddings

Chatting with private data is one of the most exciting use cases for GPT and other LLMs. With a few lines of code, you can build an LLM-powered bot that answers questions about your private data. This example from Mayo lets a user chat with, and get information from, ~1000 pages of PDFs.

This requires:

  1. splitting the data from the PDFs or other sources into Langchain "documents" (fast and free);
  2. creating embeddings from these documents with an embedding model such as text-embedding-ada-002 (slow and costly);
  3. storing these embeddings in a vector store, which can be as simple as PostgreSQL, or a service like Pinecone (fast and free).
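
The three steps can be sketched offline as follows. This is only an illustration of the flow: `splitIntoChunks`, `toyEmbedder` and `indexText` are hypothetical names, the embedder is a toy stand-in (not a real model, and synchronous where a real API call would be async), and a `Map` plays the role of the vector store.

```typescript
interface Chunk {
  id: string
  pageContent: string
}

type Embedder = (texts: string[]) => number[][]

// Step 1: split raw text into chunks (here: one chunk per paragraph).
export function splitIntoChunks(source: string, text: string): Chunk[] {
  return text
    .split(/\n\s*\n/)
    .map((paragraph) => paragraph.trim())
    .filter((paragraph) => paragraph.length > 0)
    .map((pageContent, i) => ({ id: `${source}:${i}`, pageContent }))
}

// Step 2 stand-in: a toy embedder so the sketch runs without any API key.
// It returns [character count, word count]; a real model returns ~1536 floats.
export const toyEmbedder: Embedder = (texts) =>
  texts.map((t) => [t.length, t.split(/\s+/).length])

// Step 3: store one vector per chunk id (a Map stands in for the vector store).
export function indexText(
  source: string,
  text: string,
  embed: Embedder,
  store: Map<string, number[]>
): number {
  const chunks = splitIntoChunks(source, text)
  const vectors = embed(chunks.map((c) => c.pageContent)) // the costly step in real life
  chunks.forEach((c, i) => store.set(c.id, vectors[i]))
  return chunks.length
}

const vectorStore = new Map<string, number[]>()
const indexed = indexText(
  'notes.txt',
  'First paragraph.\n\nSecond paragraph here.',
  toyEmbedder,
  vectorStore
)
console.log(indexed, vectorStore.size)
```

Only step 2 costs money and time in a real setup, which is why the rest of this article focuses on calling it as rarely as possible.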

For enterprise use cases, the data will need to be updated at scale

In most enterprise settings, a lot of data is created, updated and deleted. To scale, we need to limit how often embeddings are recreated while handling these changes. The naive strategy of deleting all embeddings from the vector store and recreating every one of them doesn't scale.

In this article, I will show you how to do it in an idempotent way with Langchain.

Create an idempotent script based on document hashes to recreate only the necessary embeddings

The starting point is this page in the documentation of Langchain. I will detail the few steps that you need to change in order to build this idempotent script.

What the documents table needs

We need to change the following in the documents table:

  • the id primary key needs to be of type text
  • add a sourceType column
  • add a sourceName column
  • add a hash column that will store a hash of the document content (pageContent)

-- Enable the pgvector extension to work with embedding vectors
create extension if not exists vector;

-- Create a table to store your documents
create table documents (
    id text not null primary key, -- the unique id (LRN) of the document
    content text, -- corresponds to Document.pageContent
    -- camelCase columns are quoted: PostgreSQL folds unquoted identifiers to lowercase
    "sourceType" text, -- corresponds to the type of the source
    "sourceName" text, -- corresponds to the name of the source
    hash text, -- corresponds to the hash of the pageContent
    metadata jsonb, -- corresponds to Document.metadata
    embedding vector(1536) -- 1536 works for OpenAI embeddings, change if needed
);

-- Create a function to search for documents
create function match_documents (
    query_embedding vector(1536),
    match_count int
) returns table (
    id text,
    content text,
    metadata jsonb,
    similarity float
)
language plpgsql
as $$
#variable_conflict use_column
begin
    return query
    select
        id,
        content,
        metadata,
        1 - (documents.embedding <=> query_embedding) as similarity
    from documents
    order by documents.embedding <=> query_embedding
    limit match_count;
end;
$$;
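
To make the query above easier to read: `<=>` is pgvector's cosine distance operator, so `1 - distance` is the cosine similarity, and ordering by distance returns the closest documents first. Here is a small in-memory sketch of the same ranking. The names `cosineSimilarity` and `matchDocuments` are mine, and this is an illustration of the logic, not how pgvector is implemented.

```typescript
interface StoredDoc {
  id: string
  embedding: number[]
}

interface Match {
  id: string
  similarity: number
}

// Cosine similarity: dot product divided by the product of the vector norms.
// Note: a zero vector yields NaN, which this sketch does not handle.
export function cosineSimilarity(a: number[], b: number[]): number {
  if (a.length !== b.length) {
    throw new Error('vectors must have the same length')
  }
  let dot = 0
  let normA = 0
  let normB = 0
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i]
    normA += a[i] * a[i]
    normB += b[i] * b[i]
  }
  return dot / (Math.sqrt(normA) * Math.sqrt(normB))
}

// Same idea as match_documents: rank by similarity, keep the top matchCount.
export function matchDocuments(
  docs: StoredDoc[],
  queryEmbedding: number[],
  matchCount: number
): Match[] {
  return docs
    .map((d) => ({ id: d.id, similarity: cosineSimilarity(d.embedding, queryEmbedding) }))
    .sort((x, y) => y.similarity - x.similarity)
    .slice(0, matchCount)
}

const docs: StoredDoc[] = [
  { id: 'a', embedding: [1, 0] },
  { id: 'b', embedding: [0, 1] },
  { id: 'c', embedding: [1, 1] },
]
const top = matchDocuments(docs, [1, 0], 2)
console.log(top.map((m) => m.id))
```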

We need to create the hash of the content before storing the embeddings

I have created a langchain directory in the Next.js project of my blog with all the classes that I need to extend.

A new document class with the additional information.

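import { Document, DocumentInput } from 'langchain/document'

// The extra fields are optional at construction time because they are
// filled in later by createDocumentsBlogFromDocumentsArray.
export interface DocumentBlogInput extends DocumentInput {
  sourceType?: string
  sourceName?: string
  hash?: string
  id?: string
}

export class DocumentBlog extends Document implements DocumentBlogInput {
  sourceType: string
  sourceName: string
  hash: string
  id: string

  constructor(fields: DocumentBlogInput) {
    super(fields)
    this.sourceType = fields.sourceType ?? ''
    this.sourceName = fields.sourceName ?? ''
    this.hash = fields.hash ?? ''
    this.id = fields.id ?? ''
  }
}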

A new documentManager file with two functions:

  • one to create the DocumentBlog objects from the classic Langchain documents
  • one to create a unique ID that I call an LRN (Langchain Resource Name)
import { Document } from 'langchain/document'
import { DocumentBlog } from './DocumentBlog'

export const createDocumentsBlogFromDocumentsArray = async (
  documents: Document[],
  type: string
): Promise<DocumentBlog[]> => {
  const { createHash } = await import('node:crypto')

  const documentsBlog: DocumentBlog[] = []

  for (const document of documents) {
    const documentBlog = new DocumentBlog(document)
    documentBlog.sourceType = type
    documentBlog.sourceName = document.metadata.source
    const hash = createHash('sha1')
    documentBlog.hash = hash.update(document.pageContent).digest('hex')
    documentBlog.id = getUniqueIDFromDocument(documentBlog)

    documentsBlog.push(documentBlog)
  }

  return documentsBlog
}

// Will return something like files:/home/tons1/myblog/data/blog/create-custom-AI-chatbot-with-OpenAI-and-langchain.mdx:23-38
export const getUniqueIDFromDocument = (document: DocumentBlog): string => {
  const lines = document.metadata.loc.lines
  const loc = lines.from + '-' + lines.to

  return document.sourceType + ':' + document.sourceName + ':' + loc
}
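
The two ideas above can be tried without Langchain. In this minimal sketch, the `Chunk` shape and the names `contentHash` and `buildLrn` are hypothetical stand-ins for `DocumentBlog` and the functions above. It shows that the id depends only on where the chunk comes from, while the hash depends only on its content.

```typescript
import { createHash } from 'node:crypto'

interface Chunk {
  sourceType: string
  sourceName: string
  pageContent: string
  lines: { from: number; to: number }
}

// SHA-1 of the content as a 40-character hex string. It is used here only
// for change detection, not for anything security-related.
export function contentHash(pageContent: string): string {
  return createHash('sha1').update(pageContent).digest('hex')
}

// LRN: <sourceType>:<sourceName>:<first line>-<last line>
export function buildLrn(chunk: Chunk): string {
  return `${chunk.sourceType}:${chunk.sourceName}:${chunk.lines.from}-${chunk.lines.to}`
}

const chunk: Chunk = {
  sourceType: 'files',
  sourceName: 'data/blog/example.mdx',
  pageContent: 'Hello embeddings.',
  lines: { from: 23, to: 38 },
}

const lrn = buildLrn(chunk)
const hashBefore = contentHash(chunk.pageContent)
const hashSameContent = contentHash('Hello embeddings.')
const hashAfterEdit = contentHash('Hello embeddings!')

console.log(lrn)
console.log(hashBefore === hashSameContent, hashBefore === hashAfterEdit)
```

An edit that keeps the same line range gives the same id with a different hash, which is exactly the "update" case handled later in the script.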

Extend the SupabaseVectorStore class to be able to store the additional information. I have also changed the call to the database to use Supabase's upsert function instead of insert, so that existing embeddings can be updated.

import { SupabaseVectorStore } from 'langchain/vectorstores'
import { DocumentBlog } from './DocumentBlog'

export class SupabaseVectorStoreBlog extends SupabaseVectorStore {
  async addVectors(vectors: number[][], documents: DocumentBlog[]): Promise<void> {
    const rows = vectors.map((embedding, idx) => ({
      content: documents[idx].pageContent,
      embedding,
      metadata: documents[idx].metadata,
      id: documents[idx].id,
      sourceType: documents[idx].sourceType,
      sourceName: documents[idx].sourceName,
      hash: documents[idx].hash,
    }))

    // upsert returns 500/502/504 (yes really any of them) if given too many rows/characters
    // ~2000 trips it, but my data is probably smaller than average pageContent and metadata
    const chunkSize = 500
    for (let i = 0; i < rows.length; i += chunkSize) {
      const chunk = rows.slice(i, i + chunkSize)

      const res = await this.client.from(this.tableName).upsert(chunk)
      if (res.error) {
        throw new Error(`Error inserting: ${res.error.message} ${res.status} ${res.statusText}`)
      }
    }
  }
}
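
The batching loop in addVectors can be isolated into a small generic helper. This is a sketch with a hypothetical name (`toBatches`); the batch size of 500 is the value used above, not a documented Supabase limit.

```typescript
// Split an array into consecutive batches of at most `size` items.
export function toBatches<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer')
  }
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

// 1200 fake rows split into batches of 500
const rows = Array.from({ length: 1200 }, (_, i) => ({ id: `row-${i}` }))
const batches = toBatches(rows, 500)
const batchSizes = batches.map((b) => b.length)
console.log(batchSizes)
```

Each batch would then be sent with its own upsert call, so a failure only affects the rows of that batch and the ones after it.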

We can now create our enriched documents using the above function:

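// Import paths are assumptions: they depend on your Langchain version and project layout
import { DirectoryLoader, TextLoader } from 'langchain/document_loaders'
import { CharacterTextSplitter } from 'langchain/text_splitter'
import { createDocumentsBlogFromDocumentsArray } from './documentManager'

// Load every .mdx file of the blog as one document per file
const loader = new DirectoryLoader('data/blog', {
  '.mdx': (path) => new TextLoader(path),
})
const docs = await loader.load()

// Split each file into chunks of about 1500 characters with some overlap
const splitter = new CharacterTextSplitter({
  separator: '.\n',
  chunkSize: 1500,
  chunkOverlap: 200,
})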

const splittedDocs = await splitter.splitDocuments(docs)
const documentsBlog = await createDocumentsBlogFromDocumentsArray(splittedDocs, 'files')

We now need to work out which documents, and thus which embeddings, must be updated to keep the database clean.

The first step is to delete all embeddings in the database whose id/LRN is not among the ones we are about to upsert:

  • either the source doesn't exist anymore
  • or the source has been updated and the file is no longer split the same way, which creates documents with different ids/LRNs.
  for (const doc of docs) {
    //search for existing document with sourceName and type
    const documentsInDatabase = await prisma.documents.findMany({
      where: {
        sourceName: doc.metadata.source,
        sourceType: 'files',
      },
    })

    const documentsThatWillBeUpserted = documentsBlog.filter(
      (d) => d.sourceName === doc.metadata.source
    )
    const idsDocumentsThatWillBeUpserted = documentsThatWillBeUpserted.map((d) => d.id)

    const idsDocumentsThatWeShouldDeleted = documentsInDatabase
      .filter((d) => !idsDocumentsThatWillBeUpserted.includes(d.id))
      .map((d) => d.id)

    console.log('idsDocumentsThatWeShouldDeleted', idsDocumentsThatWeShouldDeleted)

    await prisma.documents.deleteMany({
      where: {
        id: {
          in: idsDocumentsThatWeShouldDeleted,
        },
      },
    })
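  }

  // Also remove rows whose source file no longer exists: the loop above only
  // visits sources that were loaded, so deleted files would otherwise linger.
  const existingSources = docs.map((d) => d.metadata.source)
  await prisma.documents.deleteMany({
    where: {
      sourceType: 'files',
      sourceName: { notIn: existingSources },
    },
  })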
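
The core of this cleanup is a set difference between the ids stored for a source and the ids we are about to upsert. A minimal in-memory sketch, with a hypothetical helper name (`findStaleIds`) and made-up ids:

```typescript
// Return the stored ids that are absent from the new set of ids.
// A Set makes each lookup O(1) instead of scanning an array with includes().
export function findStaleIds(storedIds: string[], incomingIds: string[]): string[] {
  const keep = new Set(incomingIds)
  return storedIds.filter((id) => !keep.has(id))
}

// The file was edited, so the splitter now produces different line ranges
const storedIds = ['files:post.mdx:1-20', 'files:post.mdx:21-40', 'files:post.mdx:41-60']
const incomingIds = ['files:post.mdx:1-25', 'files:post.mdx:21-40']

const staleIds = findStaleIds(storedIds, incomingIds)
console.log(staleIds)
```

Here two of the three stored rows are stale and would be deleted, while `files:post.mdx:21-40` is kept and later compared by hash.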

Then we can loop over the documents and check whether the hash has changed. If it is the same, we do nothing and avoid a useless call to create an embedding we already have, which makes the script cheaper and faster. This is what makes our script idempotent and easy to scale.

  const docsToUpsert: DocumentBlog[] = []

  for (const doc of documentsBlog) {
    const document = await prisma.documents.findUnique({
      where: {
        id: doc.id,
      },
    })
    if (document) {
      if (document.hash === doc.hash) {
        console.log(doc.id, 'same hash ignore')
      } else {
        docsToUpsert.push(doc)
        console.log(doc.id, 'need to update')
      }
    } else {
      docsToUpsert.push(doc)
      console.log(doc.id, 'need to create')
    }
  }

  // Only create embeddings when something actually changed
  if (docsToUpsert.length > 0) {
    const client = createClient(url, privateKey)
    await SupabaseVectorStoreBlog.fromDocuments(docsToUpsert, new OpenAIEmbeddings(), {
      client,
      tableName: 'documents',
      queryName: 'match_documents',
    })
  }
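
To see the idempotence in action without a database or an API key, here is a minimal in-memory sketch. A `Map` of id to hash stands in for the documents table, the returned number stands in for the embeddings that would have to be computed, and `planSync` and `applySync` are hypothetical names.

```typescript
// In-memory stand-in for the documents table: id -> content hash
type HashStore = Map<string, string>

interface HashedDoc {
  id: string
  hash: string
}

interface SyncPlan {
  toCreate: HashedDoc[]
  toUpdate: HashedDoc[]
  unchanged: HashedDoc[]
}

// Classify each incoming document by comparing its hash with the stored one.
export function planSync(store: HashStore, incoming: HashedDoc[]): SyncPlan {
  const plan: SyncPlan = { toCreate: [], toUpdate: [], unchanged: [] }
  for (const doc of incoming) {
    const storedHash = store.get(doc.id)
    if (storedHash === undefined) {
      plan.toCreate.push(doc) // unknown id: needs an embedding
    } else if (storedHash !== doc.hash) {
      plan.toUpdate.push(doc) // content changed: needs a new embedding
    } else {
      plan.unchanged.push(doc) // same hash: nothing to do
    }
  }
  return plan
}

// Apply the plan and return how many embeddings would have to be computed.
export function applySync(store: HashStore, incoming: HashedDoc[]): number {
  const { toCreate, toUpdate } = planSync(store, incoming)
  for (const doc of [...toCreate, ...toUpdate]) {
    store.set(doc.id, doc.hash)
  }
  return toCreate.length + toUpdate.length
}

const store: HashStore = new Map([['files:a.mdx:1-10', 'h1']])
const incoming: HashedDoc[] = [
  { id: 'files:a.mdx:1-10', hash: 'h1-changed' },
  { id: 'files:a.mdx:11-20', hash: 'h2' },
]

const firstRun = applySync(store, incoming) // one update and one creation
const secondRun = applySync(store, incoming) // nothing left to embed
console.log(firstRun, secondRun)
```

Loading all stored hashes into one map is a design choice of this sketch: it replaces one database lookup per document with a single query, which may matter once there are many documents.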

With this method, search works the same way as before, but we have more information about the source of each document.
