Build Your First Retrieval Pipeline for LLMs and Agents

In this guide, you'll build a retrieval pipeline that can provide your connected LLM or agent framework with structured, document-based context. You'll upload documents, create a pipeline with retrieval capabilities, and connect it to an LLM — all with runnable code examples.

What You'll Build

By the end of this guide, you'll have:

  • An agent-ready pipeline that transforms your content into structured context
  • A chatbot that can answer complex questions about your content via your connected LLM
  • Familiarity with core Vectorize concepts

Prerequisites

Before you begin, you'll need:

  1. A Vectorize account
  2. An API access token (how to create one)
  3. Your organization ID (see below)

Finding your Organization ID

Your organization ID is in the Vectorize platform URL:

https://platform.vectorize.io/organization/[YOUR-ORG-ID]

For example, if your URL is:

https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
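
If you would rather not copy the ID by hand, it can be pulled out of the URL programmatically. The sketch below is a small convenience and not part of the Vectorize client: the function name `extract_organization_id` is hypothetical, and it assumes the URL follows the `/organization/<id>` shape shown above.

```python
from urllib.parse import urlparse


def extract_organization_id(platform_url: str) -> str:
    """Return the path segment that follows 'organization' in a platform URL."""
    segments = [s for s in urlparse(platform_url).path.split("/") if s]
    try:
        return segments[segments.index("organization") + 1]
    except (ValueError, IndexError):
        # Either 'organization' is missing or nothing follows it
        raise ValueError(f"No organization ID found in URL: {platform_url}") from None


url = "https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb"
print(extract_organization_id(url))  # prints ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
```

The result can then be exported as `VECTORIZE_ORGANIZATION_ID` for the setup code that follows.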

API Client Setup

import vectorize_client as v
import os

# Get credentials from environment variables
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client (same host and auth style as the complete example at the end of this guide)
configuration = v.Configuration(
    host="https://api.vectorize.io/v1",
    access_token=api_key
)
# The snippets in the following steps refer to this client as `apiClient`
apiClient = v.ApiClient(configuration)

print(f"✅ API client initialized for organization: {organization_id}")

How your LLM (or agent) uses your data

Retrieval-Augmented Generation (RAG) provides the foundation that enables your LLM (and any agent framework you use) to access and use your specific data. Instead of relying solely on general knowledge, agents powered by RAG can:

  1. Access your documents through intelligent retrieval
  2. Use structured context to interpret relationships within your content
  3. Support reasoning across multiple sources via the connected LLM
  4. Generate informed responses grounded in your actual data

This transforms AI from a general-purpose tool into an intelligent agent workflow that uses your organization's knowledge to provide more relevant, grounded responses.

Step 1: Create a File Upload Connector

A source connector is how you get data into Vectorize. For this guide, we'll use a File Upload connector to upload documents directly:

import vectorize_client as v

# Create the connectors API client
connectors_api = v.SourceConnectorsApi(apiClient)

try:
    # Create a file upload connector
    file_upload = v.FileUpload(
        name="my-document-upload",
        type="FILE_UPLOAD",
        config={}
    )

    request = v.CreateSourceConnectorRequest(file_upload)
    response = connectors_api.create_source_connector(
        organization_id,
        request
    )

    # Keep the ID: Steps 2 and 3 refer to it as source_connector_id
    source_connector_id = response.connector.id
    print(f"✅ Created file upload connector: {source_connector_id}")

except Exception as e:
    print(f"❌ Error creating connector: {e}")
    raise

Step 2: Upload Your First Document

Now let's upload a document. In this example, we're uploading a simple .txt file. You can upload PDFs, Word docs, or any supported text format - the upload process is the same regardless of file type.

import vectorize_client as v
import os
import urllib3

# Create uploads API client
uploads_api = v.UploadsApi(apiClient)

# Define the file to upload
file_path = "/tmp/intro_to_rag.txt"
file_name = os.path.basename(file_path)

# Create a sample file
with open(file_path, "w") as f:
    f.write("""# Introduction to RAG (Retrieval-Augmented Generation)

RAG combines the power of large language models with external knowledge retrieval.
This approach allows AI systems to access and utilize up-to-date information,
reducing hallucinations and improving accuracy.

Key benefits of RAG:
- Access to current information
- Reduced hallucinations
- Domain-specific knowledge
- Scalable knowledge base
""")

try:
    # Step 1: Get upload URL
    upload_request = v.StartFileUploadToConnectorRequest(
        name=file_name,
        content_type="text/plain"
    )

    start_response = uploads_api.start_file_upload_to_connector(
        organization_id,
        source_connector_id,
        start_file_upload_to_connector_request=upload_request
    )

    # Step 2: Upload file to the URL
    http = urllib3.PoolManager()
    with open(file_path, "rb") as f:
        response = http.request(
            "PUT",
            start_response.upload_url,
            body=f,
            headers={
                "Content-Type": "text/plain",
                "Content-Length": str(os.path.getsize(file_path))
            }
        )

    if response.status == 200:
        print(f"✅ Successfully uploaded: {file_name}")
    else:
        print(f"❌ Upload failed: {response.status}")

except Exception as e:
    print(f"❌ Error uploading file: {e}")
    raise
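
The upload snippet hard-codes `text/plain` in both the upload request and the `Content-Type` header. For PDFs or Word documents you would presumably want a matching MIME type instead. The sketch below uses Python's standard `mimetypes` module to guess one from the file name; the helper name `guess_content_type` is hypothetical, and which MIME types the upload endpoint accepts is not covered in this guide, so treat the result as a starting point.

```python
import mimetypes


def guess_content_type(file_name: str, default: str = "application/octet-stream") -> str:
    """Guess a MIME type from the file extension, falling back to a generic default."""
    content_type, _encoding = mimetypes.guess_type(file_name)
    return content_type or default


for name in ["intro_to_rag.txt", "report.pdf"]:
    print(name, "->", guess_content_type(name))
```

The returned value could then be passed as `content_type` in the upload request and reused for the `Content-Type` header of the `PUT`.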

Step 3: Create Your Pipeline

A pipeline transforms your raw documents into structured context that your connected LLM or agent can use for retrieval and answering. Vectorize provides built-in processing and vector storage to enable agent capabilities:

import os
import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(apiClient)

# IDs of Vectorize's built-in AI platform and vector store connectors,
# read from the same environment variables the complete example uses
ai_platform_connector_id = os.environ.get("VECTORIZE_AI_PLATFORM_CONNECTOR_ID_VECTORIZE")
destination_connector_id = os.environ.get("VECTORIZE_DESTINATION_CONNECTOR_ID_VECTORIZE")

try:
    # Configure your pipeline
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="My First Pipeline",
        source_connectors=[
            v.PipelineSourceConnectorSchema(
                id=source_connector_id,
                type="FILE_UPLOAD",
                config={}
            )
        ],
        ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
            id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
            type="VECTORIZE",
            config={}
        ),
        destination_connector=v.PipelineDestinationConnectorSchema(
            id=destination_connector_id,  # Uses Vectorize's built-in vector store
            type="VECTORIZE",
            config={}
        ),
        schedule=v.ScheduleSchema(type="manual")
    )

    # Create the pipeline
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_config
    )

    pipeline_id = response.data.id
    print(f"✅ Created pipeline: {pipeline_id}")

except Exception as e:
    print(f"❌ Error creating pipeline: {e}")
    raise

What's Happening Here?

When you create a pipeline, you’re building the infrastructure your connected LLM or agent will use for retrieval and context.

  1. Source Connector: Feeds documents into your pipeline’s retrieval index
  2. AI Platform Connector: Converts documents into vector embeddings and structured metadata for retrieval
  3. Destination Connector: Maintains structured, queryable indexes for retrieval (Vectorize's built-in vector store or an external destination)
  4. Schedule: Controls when your pipeline’s data is refreshed. Changes to source content trigger automatic reprocessing.

This pipeline enables your LLM to not just locate relevant information, but to use richer context for grounded answers.
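
To make the three roles above concrete, here is a deliberately tiny in-memory imitation: a document goes in, is split into chunks, each chunk gets a vector plus metadata, and the results are kept in a list that acts as the index. Everything in it is an assumption for illustration: `toy_embed` counts words instead of calling an embedding model, paragraph splitting stands in for whatever chunking the platform applies, and none of the names come from the Vectorize client.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class IndexedChunk:
    chunk_id: str
    text: str
    vector: Counter  # toy "embedding": lowercase word counts
    metadata: dict = field(default_factory=dict)


def toy_embed(text: str) -> Counter:
    """Stand-in for an embedding model: count lowercase words."""
    return Counter(text.lower().split())


def ingest(source_name: str, document: str) -> list[IndexedChunk]:
    """Split a document into paragraph chunks and index each one."""
    paragraphs = [p.strip() for p in document.split("\n\n") if p.strip()]
    return [
        IndexedChunk(
            chunk_id=f"{source_name}#{i}",
            text=p,
            vector=toy_embed(p),
            metadata={"source": source_name, "position": i},
        )
        for i, p in enumerate(paragraphs)
    ]


index = ingest(
    "intro_to_rag.txt",
    "RAG combines LLMs with retrieval.\n\nKey benefits: fewer hallucinations.",
)
for chunk in index:
    print(chunk.chunk_id, chunk.metadata)
```

A real pipeline replaces each piece with a managed component, but the flow of source, embedding, and index is the same shape.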

Step 4: Wait for Processing

Your pipeline needs a few moments to process the uploaded document. Let's monitor its progress:

import vectorize_client as v
import time

# Create pipelines API client
pipelines_api = v.PipelinesApi(apiClient)

print("Waiting for pipeline to process your document...")
max_wait_time = 300  # 5 minutes
start_time = time.time()

while True:
    try:
        # Check pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)

        status = pipeline.data.status

        # Check if ready
        if status == "LISTENING":
            print("✅ Pipeline is ready!")
            break
        elif status == "PROCESSING":
            print("⚙️ Still processing...")
        elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"❌ Pipeline error: {status}")
            break

        # Check timeout
        if time.time() - start_time > max_wait_time:
            print("⏰ Timeout waiting for pipeline")
            break

        time.sleep(10)  # Check every 10 seconds

    except Exception as e:
        print(f"❌ Error checking status: {e}")
        break

Pipeline States

  • DEPLOYING: Pipeline is being set up
  • PROCESSING: Actively processing documents
  • LISTENING: Ready and waiting for queries

For a complete list of pipeline states, see Understanding Pipeline Status.
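
The polling loop in this step can be factored into a reusable helper that takes any status-returning callable, which also makes the logic testable without a live pipeline. This is a sketch under assumptions: `wait_until_ready` is a hypothetical name, and the ready and failed state sets only contain the states that appear in this guide, so extend them from the full status list as needed.

```python
import time
from typing import Callable

READY_STATES = {"LISTENING"}
FAILED_STATES = {"ERROR_DEPLOYING", "SHUTDOWN"}


def wait_until_ready(
    get_status: Callable[[], str],
    timeout_s: float = 300.0,
    interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until a ready state, a failed state, or the timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in READY_STATES:
            return status
        if status in FAILED_STATES:
            raise RuntimeError(f"Pipeline failed with status: {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Pipeline still {status} after {timeout_s} seconds")
        sleep(interval_s)


# Demo with a scripted sequence of statuses instead of real API calls
fake_statuses = iter(["DEPLOYING", "PROCESSING", "LISTENING"])
print(wait_until_ready(lambda: next(fake_statuses), sleep=lambda s: None))
```

With the real client, `get_status` would be something like `lambda: pipelines_api.get_pipeline(organization_id, pipeline_id).data.status`, mirroring the call used in the loop above.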

Step 5: Query Your Pipeline

Once the pipeline is ready, your connected LLM can use it to retrieve relevant context and respond to questions about your content:

import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(apiClient)

try:
    # Query the pipeline with a question the uploaded document can answer
    response = pipelines_api.retrieve_documents(
        organization_id,
        pipeline_id,
        v.RetrieveDocumentsRequest(
            question="What are the benefits of RAG?",
            num_results=5
        )
    )

    # Display results
    print(f"Found {len(response.documents)} relevant documents:\n")
    for i, doc in enumerate(response.documents, 1):
        print(f"Result {i}:")
        print(f" Content: {doc.text[:200]}...")  # chunk text is in 'text'
        print(f" Relevance Score: {doc.relevancy}")  # score is in 'relevancy'
        print(f" Document ID: {doc.id}")
        print()

except Exception as e:
    print(f"❌ Error querying pipeline: {e}")
    raise

How Your Pipeline + LLM Process Queries

When you submit a query, the pipeline and your connected LLM work together:

  1. Retrieve Context: The pipeline finds relevant information across your documents
  2. Interpret Query: The connected LLM interprets your request using the retrieved context
  3. Combine Context: The LLM synthesizes retrieved information from multiple sources
  4. Generate Insight: The LLM provides answers that go beyond simple retrieval
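
One way the hand-off from retrieval to the LLM in the steps above might look in code is a prompt that places the retrieved snippets ahead of the question. This is a minimal sketch, not how the generated chatbot does it: `RetrievedDoc` is a hypothetical stand-in that only mirrors the `text` and `relevancy` fields read in Step 5, and the prompt wording is an assumption.

```python
from dataclasses import dataclass


@dataclass
class RetrievedDoc:
    """Hypothetical stand-in for a retrieved chunk with text and relevancy."""
    text: str
    relevancy: float


def build_grounded_prompt(question: str, docs: list[RetrievedDoc], max_docs: int = 3) -> str:
    """Put the most relevant snippets ahead of the question as numbered context."""
    top = sorted(docs, key=lambda d: d.relevancy, reverse=True)[:max_docs]
    context = "\n".join(f"[{i}] {d.text.strip()}" for i, d in enumerate(top, 1))
    return (
        "Answer the question using only the context below. "
        "Cite sources by their bracketed number.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {question}"
    )


docs = [
    RetrievedDoc("RAG reduces hallucinations.", 0.71),
    RetrievedDoc("RAG combines LLMs with external knowledge retrieval.", 0.93),
]
print(build_grounded_prompt("What is RAG?", docs))
```

The resulting string is what you would send to your LLM provider; numbering the snippets makes it easier to trace an answer back to its sources.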

With sufficient retrieved context, your connected LLM can:

  • Answer "why" and "how" questions that require reasoning
  • Identify patterns and relationships in your data
  • Provide recommendations based on your content
  • Synthesize insights from disparate sources

Try these types of questions to see retrieval + reasoning in action:

  • "What are the implications of...?"
  • "How do these concepts relate to each other?"
  • "What should we prioritize based on...?"

Understanding Your Results

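The Step 5 snippet reads a list of retrieved chunks from `response.documents`; the answer itself is produced by your connected LLM once it receives that context. A complete query result therefore includes:

  • Answer: The LLM-generated response to your question (produced by your connected LLM, for example the chatbot in Step 6)
  • Sources: Which document chunks were used (`doc.text`, `doc.id`)
  • Relevance score: Based on embedding similarity, indicates how closely the retrieved content matched your query (`doc.relevancy`)
  • Metadata: Additional information about the sources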
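
For intuition about a similarity-based relevance score, the sketch below ranks two chunks against a query using cosine similarity, a common measure for comparing embeddings. It is an assumption that this resembles the scoring used here: this guide does not specify the exact function behind `relevancy`, and the three-dimensional vectors are made up for the example.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either has zero length."""
    if len(a) != len(b):
        raise ValueError("Vectors must have the same number of dimensions")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


query = [1.0, 0.0, 1.0]
chunks = {"about RAG": [1.0, 0.0, 0.9], "unrelated": [0.0, 1.0, 0.0]}
ranked = sorted(chunks, key=lambda name: cosine_similarity(query, chunks[name]), reverse=True)
print(ranked)  # prints ['about RAG', 'unrelated']
```

Chunks whose vectors point in nearly the same direction as the query score close to 1, which is why they surface first in the results.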

Step 6: Build Your Custom Chatbot

Now that your pipeline is working, let's create a chatbot that connects your pipeline to an LLM for interactive Q&A.

Download a Custom Chatbot Application

Vectorize can generate a complete chatbot application that showcases your pipeline's capabilities:

  1. Navigate to your pipeline in the Vectorize platform
  2. Go to the AI Integrations tab
  3. Click on Chatbot
  4. Select your preferred LLM provider (e.g., OpenAI) and model (e.g., gpt-4o)
  5. Click Download Chatbot ZIP

The downloaded application includes:

  • Pre-configured connection to your pipeline
  • Your organization ID and endpoints already set up
  • Choice of LLM for responses
  • Clean, customizable Next.js interface

Note: This application uses your selected LLM provider’s API — you’ll need a valid API key for that provider, and usage may incur costs.

Running Your Chatbot

After downloading:

  1. Unzip the file and navigate to the project folder
  2. Configure your environment variables in .env.development:
    OPENAI_API_KEY=sk-...
    VECTORIZE_TOKEN=your-vectorize-token
  3. Install and run:
    npm install
    npm run dev
  4. Open http://localhost:3000 to interact with your chatbot!

You now have a fully functional chatbot that can query your documents via your pipeline and use your connected LLM to generate grounded answers.

What's Next?

Congratulations! You've built your first agent-ready pipeline with Vectorize.

Complete Example

Here's all the code from this guide combined into a complete, runnable example:

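Required Environment Variables:
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
• `VECTORIZE_AI_PLATFORM_CONNECTOR_ID_VECTORIZE` (read in Step 3 for the built-in AI platform connector)
• `VECTORIZE_DESTINATION_CONNECTOR_ID_VECTORIZE` (read in Step 3 for the built-in vector store)

Required Files:
• `sample_document.txt`: A text document to index. The script below writes its own sample to `/tmp/intro_to_rag.txt`, so no file is needed to run it as-is.
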
#!/usr/bin/env python3
"""
Complete example for building your first RAG pipeline with Vectorize.
This is a hand-written example that corresponds to the test file:
api-clients/python/tests/developer_journeys/build_your_first_pipeline.py

IMPORTANT: Keep this file in sync with the test file's snippets!

This example shows how to:
1. Create a file upload connector
2. Upload a document
3. Create and configure a RAG pipeline
4. Wait for processing to complete
5. Query your pipeline for answers
"""

import os
import sys
import time
import urllib3
import vectorize_client as v


def get_api_config():
    """Get API configuration from environment variables."""
    organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
    api_key = os.environ.get("VECTORIZE_API_KEY")

    if not organization_id or not api_key:
        print("🔑 Setup required:")
        print("1. Get your API key from: https://app.vectorize.io/settings")
        print("2. Set environment variables:")
        print(" export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print(" export VECTORIZE_API_KEY='your-api-key'")
        sys.exit(1)

    # Always use production API
    configuration = v.Configuration(
        host="https://api.vectorize.io/v1",
        access_token=api_key
    )

    return configuration, organization_id


def create_file_upload_connector(api_client, organization_id):
    """Create a file upload connector for ingesting documents."""
    print("📁 Step 1: Create a File Upload Connector")

    # Create the connectors API client
    connectors_api = v.SourceConnectorsApi(api_client)

    try:
        # Create a file upload connector
        file_upload = v.FileUpload(
            name="my-document-upload",
            type="FILE_UPLOAD",
            config={}
        )

        request = v.CreateSourceConnectorRequest(file_upload)
        response = connectors_api.create_source_connector(
            organization_id,
            request
        )

        connector_id = response.connector.id
        print(f"✅ Created file upload connector: {connector_id}")

        return connector_id

    except Exception as e:
        print(f"❌ Error creating connector: {e}")
        raise


def create_ai_platform_connector(api_client, organization_id):
    """Create an AI platform connector."""
    connectors_api = v.AIPlatformConnectorsApi(api_client)

    try:
        # Create the AI platform connector
        request = v.CreateAIPlatformConnectorRequest(
            name="pipeline-example-ai",
            type="VECTORIZE",
            config={}
        )

        response = connectors_api.create_ai_platform_connector(
            organization_id,
            request
        )

        print(f"✅ Created AI platform connector: {response.connector.name}")
        print(f" Connector ID: {response.connector.id}\n")

        return response.connector.id

    except Exception as e:
        print(f"❌ Error creating AI platform connector: {e}")
        raise


def create_destination_connector(api_client, organization_id):
    """Create a destination connector."""
    connectors_api = v.DestinationConnectorsApi(api_client)

    try:
        # Create the destination connector
        connector_config = v.DestinationConnectorInput(
            name="pipeline-example-dest",
            type="VECTORIZE",
            config={}
        )

        request = v.CreateDestinationConnectorRequest(connector_config)

        response = connectors_api.create_destination_connector(
            organization_id,
            request
        )

        print(f"✅ Created destination connector: {response.connector.name}")
        print(f" Connector ID: {response.connector.id}\n")

        return response.connector.id

    except Exception as e:
        print(f"❌ Error creating destination connector: {e}")
        raise



def upload_document(api_client, organization_id, source_connector_id):
    """Upload your first document to the connector."""
    print("📄 Step 2: Upload Your First Document")

    # Create uploads API client
    uploads_api = v.UploadsApi(api_client)

    # Define the file to upload
    file_path = "/tmp/intro_to_rag.txt"
    file_name = os.path.basename(file_path)

    # Create a sample file with RAG introduction content
    sample_content = """# Introduction to RAG (Retrieval-Augmented Generation)

RAG combines the power of large language models with external knowledge retrieval.
This approach allows AI systems to access and utilize up-to-date information,
reducing hallucinations and improving accuracy.

## Key Benefits of RAG:
- Access to current information
- Reduced hallucinations
- Domain-specific knowledge
- Scalable knowledge base

## How RAG Works:
1. User submits a query
2. System retrieves relevant documents from knowledge base
3. Documents are provided as context to the LLM
4. LLM generates a response based on retrieved context

## Use Cases:
- Customer support chatbots
- Technical documentation Q&A
- Research assistance
- Enterprise knowledge management

## Getting Started:
To implement RAG, you need:
1. A knowledge base (documents, data)
2. Vector embeddings for semantic search
3. A retrieval system
4. An LLM for response generation

RAG is particularly effective when you need accurate, up-to-date information
that wasn't part of the LLM's training data.
"""

    # Write content to file
    with open(file_path, "w") as f:
        f.write(sample_content)

    try:
        # Step 1: Get upload URL
        upload_request = v.StartFileUploadToConnectorRequest(
            name=file_name,
            content_type="text/plain"
        )

        start_response = uploads_api.start_file_upload_to_connector(
            organization_id,
            source_connector_id,
            start_file_upload_to_connector_request=upload_request
        )

        # Step 2: Upload file to the URL
        http = urllib3.PoolManager()
        with open(file_path, "rb") as f:
            response = http.request(
                "PUT",
                start_response.upload_url,
                body=f,
                headers={
                    "Content-Type": "text/plain",
                    "Content-Length": str(os.path.getsize(file_path))
                }
            )

        if response.status == 200:
            print(f"✅ Successfully uploaded: {file_name}")
        else:
            print(f"❌ Upload failed: {response.status}")
            raise Exception(f"Upload failed with status {response.status}")

        # Clean up temp file
        os.unlink(file_path)

    except Exception as e:
        print(f"❌ Error uploading file: {e}")
        # Clean up temp file if it exists
        if os.path.exists(file_path):
            os.unlink(file_path)
        raise


def create_simple_pipeline(api_client, organization_id, source_connector_id):
    """Create a RAG pipeline with AI platform and vector storage."""
    print("🔧 Step 3: Create a RAG Pipeline")

    # Get system connector IDs from environment
    ai_platform_connector_id = os.environ.get('VECTORIZE_AI_PLATFORM_CONNECTOR_ID_VECTORIZE')
    destination_connector_id = os.environ.get('VECTORIZE_DESTINATION_CONNECTOR_ID_VECTORIZE')

    pipelines_api = v.PipelinesApi(api_client)

    try:
        # Configure your pipeline
        pipeline_config = v.PipelineConfigurationSchema(
            pipeline_name="My First Pipeline",
            source_connectors=[
                v.PipelineSourceConnectorSchema(
                    id=source_connector_id,
                    type="FILE_UPLOAD",
                    config={}
                )
            ],
            ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
                id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
                type="VECTORIZE",
                config={}
            ),
            destination_connector=v.PipelineDestinationConnectorSchema(
                id=destination_connector_id,  # Uses Vectorize's built-in vector store
                type="VECTORIZE",
                config={}
            ),
            schedule=v.ScheduleSchema(type="manual")
        )

        # Create the pipeline
        response = pipelines_api.create_pipeline(
            organization_id,
            pipeline_config
        )

        pipeline_id = response.data.id
        print(f"✅ Created pipeline: {pipeline_id}")

        return pipeline_id

    except Exception as e:
        print(f"❌ Error creating pipeline: {e}")
        raise


def wait_for_processing(api_client, organization_id, pipeline_id):
    """Wait for the pipeline to be ready and process your document."""
    print("⏳ Step 4: Wait for Processing")

    # Create pipelines API client
    pipelines_api = v.PipelinesApi(api_client)

    print("Waiting for pipeline to process your document...")
    max_wait_time = 300  # 5 minutes
    start_time = time.time()

    while True:
        try:
            # Check pipeline status
            pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)

            status = pipeline.data.status
            print(f"Pipeline status: {status}")

            # Check if ready
            if status == "LISTENING":
                print("✅ Pipeline is ready!")
                break
            elif status == "PROCESSING":
                print("⚙️ Still processing...")
            elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
                print(f"❌ Pipeline error: {status}")
                raise Exception(f"Pipeline failed with status: {status}")

            # Check timeout
            if time.time() - start_time > max_wait_time:
                print("⏰ Timeout waiting for pipeline")
                raise Exception("Pipeline processing timeout")

            time.sleep(10)  # Check every 10 seconds

        except Exception as e:
            # Re-raise our own failure and timeout errors; stop polling on anything else
            if "Pipeline failed" in str(e) or "timeout" in str(e):
                raise
            print(f"❌ Error checking status: {e}")
            break


def query_pipeline(api_client, organization_id, pipeline_id):
    """Query your RAG pipeline and display results."""
    print("🔍 Step 5: Query Your RAG Pipeline")

    # Create pipelines API client
    pipelines_api = v.PipelinesApi(api_client)

    # Test queries to demonstrate different aspects
    queries = [
        "What is RAG?",
        "What are the benefits of RAG?",
        "How does RAG work?",
        "What do I need to implement RAG?"
    ]

    for query in queries:
        print(f"\n🔍 Query: {query}")

        try:
            # Query the pipeline
            response = pipelines_api.retrieve_documents(
                organization_id,
                pipeline_id,
                v.RetrieveDocumentsRequest(
                    question=query,
                    num_results=3
                )
            )

            # Display results
            print(f"Found {len(response.documents)} relevant documents:")

            for i, doc in enumerate(response.documents, 1):
                print(f"\nResult {i}:")
                print(f" Content: {doc.text[:150]}...")
                print(f" Relevance Score: {doc.relevancy}")
                print(f" Document ID: {doc.id}")

                # Show metadata if available
                if hasattr(doc, 'metadata') and doc.metadata:
                    print(f" Metadata: {doc.metadata}")

        except Exception as e:
            print(f"❌ Error querying pipeline with '{query}': {e}")
            # Continue with next query
            continue

    print(f"\n✅ Successfully demonstrated RAG pipeline queries!")


def main():
    """Main function demonstrating first pipeline creation."""
    print("🚀 Building Your First RAG Pipeline\n")

    # Initialize the API client
    configuration, organization_id = get_api_config()

    print(f"⚙️ Configuration:")
    print(f" Organization ID: {organization_id}")
    print(f" Host: {configuration.host}\n")

    source_connector_id = None
    pipeline_id = None

    try:
        # Open the API client for the duration of the walkthrough
        with v.ApiClient(configuration) as api_client:
            # Step 1: Create a file upload connector
            source_connector_id = create_file_upload_connector(api_client, organization_id)
            print("")

            # Step 2: Upload a document
            upload_document(api_client, organization_id, source_connector_id)
            print("")

            # Step 3: Create a pipeline
            pipeline_id = create_simple_pipeline(api_client, organization_id, source_connector_id)
            print("")

            # Step 4: Monitor processing
            wait_for_processing(api_client, organization_id, pipeline_id)
            print("")

            # Step 5: Query the pipeline
            query_pipeline(api_client, organization_id, pipeline_id)

            print("\n🎉 Congratulations! You've built your first RAG pipeline!")
            print("\n📝 What you've learned:")
            print("- How to create a file upload connector")
            print("- How to upload documents to Vectorize")
            print("- How to configure a RAG pipeline with AI and vector storage")
            print("- How to monitor pipeline processing status")
            print("- How to query your pipeline for intelligent answers")
            print("\n💡 Next steps:")
            print("- Try uploading more documents")
            print("- Experiment with different query types")
            print("- Explore metadata and filtering options")
            print("- Build more advanced pipelines with custom connectors")

    except ValueError as e:
        print(f"❌ Configuration Error: {e}")
        print("\n💡 Make sure to set the required environment variables:")
        print(" export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print(" export VECTORIZE_API_KEY='your-api-key'")

    except Exception as error:
        print(f"❌ Error: {error}")
        sys.exit(1)

    finally:
        # ============================================================================
        # Cleanup
        # ============================================================================
        print("\n🧹 Cleanup")

        try:
            # Use a fresh client so cleanup runs even if the main block failed
            with v.ApiClient(configuration) as api_client:
                # Delete pipeline
                if pipeline_id:
                    try:
                        pipelines_api = v.PipelinesApi(api_client)
                        pipelines_api.delete_pipeline(organization_id, pipeline_id)
                        print(f"Deleted pipeline: {pipeline_id}")
                    except Exception as e:
                        print(f"Could not delete pipeline: {e}")

                # Delete source connector
                if source_connector_id:
                    try:
                        connectors_api = v.SourceConnectorsApi(api_client)
                        connectors_api.delete_source_connector(organization_id, source_connector_id)
                        print(f"Deleted connector: {source_connector_id}")
                    except Exception as e:
                        print(f"Could not delete connector: {e}")
        except Exception:
            # Cleanup is best-effort; ignore failures here
            pass


if __name__ == "__main__":
    main()
