This guide will show you how to create your first pipeline with Neum AI:

  1. Configure the source to pull data from
  2. Configure the embedding model
  3. Configure the vector storage to store to
  4. Run the pipeline and verify with a search

Choose your environment

Neum AI supports creating and running your RAG pipelines both locally and in our cloud environment. To learn more about the differences between the environments, see Neum AI Cloud vs Local Development.

Set up environment

We will start by installing the required dependencies:

Install NeumAI
pip install neumai

Configure data source

Neum AI supports a variety of data source connectors.

For this guide, we will start with the Website source. This source will scrape the web contents of a site and return the HTML in the body.

To configure the Data Connector, we will specify the url property. The connector also supports a Selector to define what information from the connector should be used as content to embed and what content should be attached to the vector as metadata.

Configure data connector
from neumai.DataConnectors import WebsiteConnector
from neumai.Shared import Selector

website_connector = WebsiteConnector(
    url = "https://www.neum.ai/post/retrieval-augmented-generation-at-scale",
    # Attach the page URL to each vector as metadata
    selector = Selector(
        to_metadata=['url']
    )
)

We will then choose a loader and chunker to be used to pre-process the data extracted from the source. For the Website source, we will use an HTML Loader as we are extracting HTML code and will use the Recursive Chunker to split up the text. We will configure the Data Connector, Loader and Chunker into a SourceConnector.

Configure source
from neumai.Loaders.HTMLLoader import HTMLLoader
from neumai.Chunkers.RecursiveChunker import RecursiveChunker
from neumai.Sources import SourceConnector

source = SourceConnector(
  data_connector = website_connector, 
  loader = HTMLLoader(), 
  chunker = RecursiveChunker()
)
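
To give a rough idea of what recursive chunking means, here is a minimal plain-Python sketch. It is not the RecursiveChunker implementation: the function name recursive_split, the separator list and the max_len default are all assumptions made for illustration. The idea is to split on coarse separators first (paragraphs), fall back to finer ones (lines, sentences, words) only for pieces that are still too long, and slice by length as a last resort.

```python
def recursive_split(text, max_len=200, separators=("\n\n", "\n", ". ", " ")):
    """Split text into pieces of at most max_len characters.

    Illustrative only: coarse separators are tried first, finer ones
    are used only for pieces that are still too long. Separators are
    discarded, and small pieces are not merged back together.
    """
    if len(text) <= max_len:
        # Short enough already; drop whitespace-only pieces
        return [text] if text.strip() else []
    for i, sep in enumerate(separators):
        if sep in text:
            chunks = []
            for part in text.split(sep):
                # Recurse with the remaining, finer separators
                chunks.extend(recursive_split(part, max_len, separators[i + 1:]))
            return chunks
    # No separator applies: fall back to fixed-size slices
    return [text[j:j + max_len] for j in range(0, len(text), max_len)]


sample = "First paragraph. It has two sentences.\n\nSecond paragraph."
for chunk in recursive_split(sample, max_len=30):
    print(repr(chunk))
```

Production chunkers typically also merge small neighbouring pieces and may overlap chunks; this sketch leaves both out to keep the core idea visible.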

Next we will configure the embedding service we will use to turn the chunks of text into vector embeddings.

Configure embed connector

Neum AI supports a variety of embed connectors.

We will use the OpenAIEmbed connector. This connector uses text-embedding-ada-002, one of the most popular embedding models on the market, to generate vector embeddings.

Configure the connector with an OpenAI Key.

To get an API Key visit OpenAI. Make sure you have configured billing for the account.
Configure embed
from neumai.EmbedConnectors import OpenAIEmbed

openai_embed = OpenAIEmbed(
    api_key = "<OPEN AI KEY>",
)
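
Rather than pasting the key into the script, you may prefer to read it from an environment variable. The helper below is a small sketch using only the Python standard library; the function name load_api_key and the variable name OPENAI_API_KEY are assumptions for illustration, not something Neum AI requires.

```python
import os


def load_api_key(var_name="OPENAI_API_KEY"):
    """Return the API key stored in the given environment variable.

    Raises RuntimeError with a readable message if the variable is
    missing or empty, so a misconfiguration fails early.
    """
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(f"Environment variable {var_name} is not set")
    return key


# Assumed usage with the connector configured above:
# openai_embed = OpenAIEmbed(api_key = load_api_key())
```

This keeps the key out of source control and lets you use different keys locally and in other environments.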

Next we will configure the vector storage service we will use to store the vector embeddings we generated.

Configure sink connector

Neum AI supports a variety of sink connectors.

We will use the WeaviateSink connector. Weaviate is a popular open-source vector database.

Configure the Weaviate connector with the connection parameters url and api_key. Other parameters are available to further configure the connector. For example, we will use class_name to name the index we are creating.

To get a URL and API Key visit Weaviate Cloud Service.
Configure sink
from neumai.SinkConnectors import WeaviateSink

weaviate_sink = WeaviateSink(
  url = "your-weaviate-url",
  api_key = "your-api-key",
  class_name = "your-class-name",
)

We now have all the parts of the pipeline configured. Let's put them together and run the pipeline.

Run the pipeline

To run the pipeline all together, we will first configure a Pipeline object and then use the built-in methods to run it.

If you have more than one source, design the metadata each source outputs carefully. If the sources output different metadata properties, then depending on the sink this might lead to errors or to vectors in the same index that don’t share metadata properties. This can be challenging at retrieval time.
Configure pipeline
from neumai.Pipelines import Pipeline

pipeline = Pipeline(
  sources=[source], 
  embed=openai_embed, 
  sink=weaviate_sink
)
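
When a pipeline combines several sources, a quick check like the following can flag metadata properties that some sources lack before anything is written to the sink. This is a plain-Python sketch, not a Neum AI feature; the function name metadata_mismatches and the example source names and fields are hypothetical.

```python
def metadata_mismatches(source_fields):
    """Map each source name to the metadata fields it is missing.

    source_fields maps a source name to the list of metadata fields
    that source outputs. A field counts as missing if any other source
    outputs it. Sources that have every field are left out.
    """
    all_fields = set().union(*source_fields.values()) if source_fields else set()
    return {
        name: sorted(all_fields - set(fields))
        for name, fields in source_fields.items()
        if all_fields - set(fields)
    }


# Hypothetical field lists for two sources feeding the same sink
fields = {
    "website": ["url"],
    "docs": ["url", "title"],
}
print(metadata_mismatches(fields))  # prints {'website': ['title']}
```

An empty result means every source outputs the same set of metadata properties.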

Then we will run the pipeline locally with the built-in run method.

The method run is not intended for production scenarios. Take a look at our cloud offering where we handle large-scale parallelization, logging and monitoring for you!
Run pipeline
print(f"Vectors stored: {pipeline.run()}")

The run method returns the number of vectors that were processed.

After running the pipeline, we can reuse the pipeline configuration to query the extracted data or to run the same pipeline again. You will need to store the pipeline configuration. Reusing it ensures that retrieval follows the same configuration that was used to store the data in the first place, including the embedding model and the storage configuration.
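
One simple way to keep a record of the settings is to write them to a JSON file and load them again before searching. The sketch below uses only the standard library and a plain dictionary; the keys shown (source_url, embed_model, sink_class_name) are hypothetical stand-ins, not Neum AI's own configuration format, and secrets such as API keys are deliberately left out of the file.

```python
import json
import tempfile
from pathlib import Path


def save_config(config, path):
    """Write a configuration dictionary to a JSON file."""
    Path(path).write_text(json.dumps(config, indent=2), encoding="utf-8")


def load_config(path):
    """Read a configuration dictionary back from a JSON file."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


# Hypothetical settings worth recording; API keys are not stored
config = {
    "source_url": "https://www.neum.ai/post/retrieval-augmented-generation-at-scale",
    "embed_model": "text-embedding-ada-002",
    "sink_class_name": "your-class-name",
}

with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = Path(tmp_dir) / "pipeline_config.json"
    save_config(config, config_path)
    restored = load_config(config_path)

print(restored == config)
```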

Search the pipeline

Finally, once the pipeline has finished running, we can query the information we just moved into vector storage.

Search pipeline
results = pipeline.search(
  query="What are the challenges with scaling RAG?", 
  number_of_results=3
)

for result in results:
    print(result.metadata)
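
Conceptually, a vector search embeds the query with the same model used at ingestion and returns the stored vectors closest to it. The toy example below ranks a few hand-written two-dimensional vectors by cosine similarity. It is only an illustration of the idea: the function names and data are made up, real embeddings have far more dimensions, and the metric actually used depends on how the sink is configured.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec, stored, k=3):
    """Return the k (score, metadata) pairs most similar to query_vec.

    stored is a list of (vector, metadata) tuples.
    """
    scored = [(cosine_similarity(query_vec, vec), meta) for vec, meta in stored]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


# Made-up vectors standing in for stored chunk embeddings
stored = [
    ([1.0, 0.0], {"url": "a"}),
    ([0.0, 1.0], {"url": "b"}),
    ([0.7, 0.7], {"url": "c"}),
]

for score, meta in top_k([1.0, 0.1], stored, k=2):
    print(round(score, 3), meta)
```

The metadata printed for each hit plays the same role as result.metadata in the search above: it tells you where the matching chunk came from.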

Deploy the pipeline

Once you have tested the pipeline locally, you can now take the configuration you created and deploy it to the Neum AI Cloud.

Deploy pipeline

Deploy your pipeline configuration to Neum AI Cloud to take advantage of the full set of capabilities like scheduling, synchronization and logs.
