Build Your First Retrieval Pipeline for LLMs and Agents
In this guide, you'll build a retrieval pipeline that can provide your connected LLM or agent framework with structured, document-based context. You'll upload documents, create a pipeline with retrieval capabilities, and connect it to an LLM — all with runnable code examples.
What You'll Build
By the end of this guide, you'll have:
- An agent-ready pipeline that transforms your content into structured context
- A chatbot that can answer complex questions about your content via your connected LLM
- Familiarity with core Vectorize concepts
Prerequisites
Before you begin, you'll need:
- A Vectorize account
- An API access token (how to create one)
- Your organization ID (see below)
Finding your Organization ID
Your organization ID is in the Vectorize platform URL:
https://platform.vectorize.io/organization/[YOUR-ORG-ID]
For example, if your URL is:
https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
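If you script against several organizations, pulling the ID out of the URL programmatically can be handy. This small helper is not part of the Vectorize SDK; it just parses the URL shape shown above:

```python
# Extract the organization ID from a Vectorize platform URL.
# Helper for convenience only -- not part of the vectorize_client SDK.
from urllib.parse import urlparse

def org_id_from_url(url: str) -> str:
    """Return the path segment that follows /organization/ in a platform URL."""
    parts = urlparse(url).path.strip("/").split("/")
    if "organization" in parts:
        return parts[parts.index("organization") + 1]
    raise ValueError(f"No organization ID found in: {url}")

print(org_id_from_url(
    "https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb"
))
```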
API Client Setup
- Python
- Node.js
import vectorize_client as v
import os

# Get credentials from environment variables
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client
configuration = v.Configuration(
    host="https://api.vectorize.io/v1",
    access_token=api_key
)
api = v.ApiClient(configuration)

print(f"✅ API client initialized for organization: {organization_id}")
const vectorize = require('@vectorize-io/vectorize-client');

// Get credentials from environment variables
const organizationId = process.env.VECTORIZE_ORGANIZATION_ID;
const apiKey = process.env.VECTORIZE_API_KEY;

if (!organizationId || !apiKey) {
    throw new Error("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables");
}

// Initialize the API client configuration; the API classes below accept it directly
const apiClient = new vectorize.Configuration({
    basePath: 'https://api.vectorize.io/v1',
    accessToken: apiKey
});

console.log(`✅ API client initialized for organization: ${organizationId}`);
How your LLM (or agent) uses your data
Retrieval-Augmented Generation (RAG) provides the foundation that enables your LLM (and any agent framework you use) to access and use your specific data. Instead of relying solely on general knowledge, agents powered by RAG can:
- Access your documents through intelligent retrieval
- Use structured context to interpret relationships within your content
- Support reasoning across multiple sources via the connected LLM
- Generate informed responses grounded in your actual data
This transforms AI from a general-purpose tool into an intelligent agent workflow that uses your organization's knowledge to provide more relevant, grounded responses.
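The retrieve-then-generate loop described above can be sketched in a few lines. The `retrieve` and `generate` functions here are illustrative stand-ins, not Vectorize API calls; a real system would call your pipeline and your LLM at those points:

```python
# Conceptual sketch of the RAG loop: retrieve relevant context, then generate
# a grounded answer. Both functions are placeholders, not Vectorize SDK calls.
def retrieve(question, documents, num_results=2):
    # Toy retrieval: rank documents by word overlap with the question.
    q_words = set(question.lower().split())
    scored = sorted(documents,
                    key=lambda d: len(q_words & set(d.lower().split())),
                    reverse=True)
    return scored[:num_results]

def generate(question, context):
    # Stand-in for the connected LLM: a real system calls a model here.
    return f"Answer to {question!r} grounded in {len(context)} retrieved chunks."

docs = [
    "RAG reduces hallucinations.",
    "Vectors enable semantic search.",
    "Cooking pasta takes ten minutes.",
]
context = retrieve("How does RAG reduce hallucinations?", docs)
print(generate("How does RAG reduce hallucinations?", context))
```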
Step 1: Create a File Upload Connector
A source connector is how you get data into Vectorize. For this guide, we'll use a File Upload connector to upload documents directly:
- Python
- Node.js
import vectorize_client as v

# Create the connectors API client
connectors_api = v.SourceConnectorsApi(api)

try:
    # Create a file upload connector
    file_upload = v.FileUpload(
        name="my-document-upload",
        type="FILE_UPLOAD",
        config={}
    )
    request = v.CreateSourceConnectorRequest(file_upload)
    response = connectors_api.create_source_connector(
        organization_id,
        request
    )
    source_connector_id = response.connector.id
    print(f"✅ Created file upload connector: {source_connector_id}")
except Exception as e:
    print(f"❌ Error creating connector: {e}")
    raise
const vectorize = require('@vectorize-io/vectorize-client');
const { SourceConnectorsApi } = vectorize;

// Create the connectors API client
const connectorsApi = new SourceConnectorsApi(apiClient);

// Creating connectors is an async operation
async function createFileUploadConnector() {
    try {
        // Create a file upload connector
        const fileUpload = {
            name: 'my-document-upload',
            type: 'FILE_UPLOAD',
            config: {} // File upload connectors don't require config
        };
        const response = await connectorsApi.createSourceConnector({
            organizationId: organizationId,
            createSourceConnectorRequest: fileUpload
        });
        const connectorId = response.connector.id;
        console.log(`✅ Created file upload connector: ${connectorId}`);
        return connectorId;
    } catch (error) {
        console.error(`❌ Error creating connector: ${error.message}`);
        // Re-throw to let caller handle the error
        throw error;
    }
}

// Call the async function from an async context
(async () => {
    const sourceConnectorId = await createFileUploadConnector();
})();
Step 2: Upload Your First Document
Now let's upload a document. In this example, we're uploading a simple .txt file. You can upload PDFs, Word docs, or any supported text format; the upload process is the same regardless of file type.
- Python
- Node.js
import vectorize_client as v
import os
import urllib3

# Create uploads API client
uploads_api = v.UploadsApi(api)

# Define the file to upload
file_path = "/tmp/intro_to_rag.txt"
file_name = os.path.basename(file_path)

# Create a sample file
with open(file_path, "w") as f:
    f.write("""# Introduction to RAG (Retrieval-Augmented Generation)
RAG combines the power of large language models with external knowledge retrieval.
This approach allows AI systems to access and utilize up-to-date information,
reducing hallucinations and improving accuracy.
Key benefits of RAG:
- Access to current information
- Reduced hallucinations
- Domain-specific knowledge
- Scalable knowledge base
""")

try:
    # Step 1: Get upload URL
    upload_request = v.StartFileUploadToConnectorRequest(
        name=file_name,
        content_type="text/plain"
    )
    start_response = uploads_api.start_file_upload_to_connector(
        organization_id,
        source_connector_id,
        start_file_upload_to_connector_request=upload_request
    )

    # Step 2: Upload file to the URL
    http = urllib3.PoolManager()
    with open(file_path, "rb") as f:
        response = http.request(
            "PUT",
            start_response.upload_url,
            body=f,
            headers={
                "Content-Type": "text/plain",
                "Content-Length": str(os.path.getsize(file_path))
            }
        )

    if response.status == 200:
        print(f"✅ Successfully uploaded: {file_name}")
    else:
        print(f"❌ Upload failed: {response.status}")
except Exception as e:
    print(f"❌ Error uploading file: {e}")
    raise
const vectorize = require('@vectorize-io/vectorize-client');
const fs = require('fs');
const { UploadsApi } = vectorize;

// Create uploads API client
const uploadsApi = new UploadsApi(apiClient);

// File uploads require two async steps
async function uploadDocumentToConnector(connectorId, filePath, fileName) {
    try {
        // Step 1: Get a pre-signed upload URL from Vectorize
        const uploadRequest = {
            name: fileName,
            contentType: 'text/plain'
        };
        const startResponse = await uploadsApi.startFileUploadToConnector({
            organizationId: organizationId,
            connectorId: connectorId,
            startFileUploadToConnectorRequest: uploadRequest
        });

        // Step 2: Upload file directly to the pre-signed URL
        const fileBuffer = fs.readFileSync(filePath);
        const fileStats = fs.statSync(filePath);
        const response = await fetch(startResponse.uploadUrl, {
            method: 'PUT',
            body: fileBuffer,
            headers: {
                'Content-Type': 'text/plain',
                'Content-Length': fileStats.size.toString()
            }
        });

        if (response.status === 200) {
            console.log(`✅ Successfully uploaded: ${fileName}`);
            return true;
        } else {
            const errorText = await response.text();
            throw new Error(`Upload failed with status ${response.status}: ${errorText}`);
        }
    } catch (error) {
        console.error(`❌ Error uploading file: ${error.message}`);
        throw error;
    }
}

// Write a sample file, then call the async function from an async context
(async () => {
    const filePath = '/tmp/intro_to_rag.txt';
    const fileName = 'intro_to_rag.txt';
    fs.writeFileSync(filePath, '# Introduction to RAG\n\nRAG combines LLMs with external knowledge retrieval.');
    await uploadDocumentToConnector(sourceConnectorId, filePath, fileName);
})();
Step 3: Create Your Pipeline
A pipeline transforms your raw documents into structured context that your connected LLM or agent can use for retrieval and answering. Vectorize provides built-in processing and vector storage to enable agent capabilities:
- Python
- Node.js
import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(api)

try:
    # Configure your pipeline.
    # ai_platform_connector_id and destination_connector_id are the IDs of
    # your Vectorize built-in AI platform and vector store connectors.
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="My First Pipeline",
        source_connectors=[
            v.PipelineSourceConnectorSchema(
                id=source_connector_id,
                type="FILE_UPLOAD",
                config={}
            )
        ],
        ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
            id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
            type="VECTORIZE",
            config={}
        ),
        destination_connector=v.PipelineDestinationConnectorSchema(
            id=destination_connector_id,  # Uses Vectorize's built-in vector store
            type="VECTORIZE",
            config={}
        ),
        schedule=v.ScheduleSchema(type="manual")
    )

    # Create the pipeline
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_config
    )
    pipeline_id = response.data.id
    print(f"✅ Created pipeline: {pipeline_id}")
except Exception as e:
    print(f"❌ Error creating pipeline: {e}")
    raise
// This snippet uses async operations and should be run in an async context
(async () => {
    const vectorize = require('@vectorize-io/vectorize-client');
    const { PipelinesApi } = vectorize;

    // Create pipelines API client
    const pipelinesApi = new PipelinesApi(apiClient);

    let pipelineId;
    try {
        // Configure your pipeline
        const pipelineConfig = {
            pipelineName: 'My First Pipeline',
            sourceConnectors: [
                {
                    id: sourceConnectorId,
                    type: 'FILE_UPLOAD',
                    config: {}
                }
            ],
            aiPlatformConnector: {
                id: aiPlatformConnectorId, // Uses Vectorize's built-in AI
                type: 'VECTORIZE',
                config: {}
            },
            destinationConnector: {
                id: destinationConnectorId, // Uses Vectorize's built-in vector store
                type: 'VECTORIZE',
                config: {}
            },
            schedule: { type: 'manual' }
        };

        // Create the pipeline
        const response = await pipelinesApi.createPipeline({
            organizationId: organizationId,
            pipelineConfigurationSchema: pipelineConfig
        });
        pipelineId = response.data.id;
        console.log(`✅ Created pipeline: ${pipelineId}`);
    } catch (error) {
        console.error(`❌ Error creating pipeline: ${error.message}`);
        throw error;
    }
})();
What's Happening Here?
When you create a pipeline, you’re building the infrastructure your connected LLM or agent will use for retrieval and context.
- Source Connector: Feeds documents into your pipeline’s retrieval index
- AI Platform Connector: Converts documents into vector embeddings and structured metadata for retrieval
- Destination Connector: Maintains structured, queryable indexes for retrieval (Vectorize's built-in vector store or an external destination)
- Schedule: Controls when your pipeline’s data is refreshed. Changes to source content trigger automatic reprocessing.
This pipeline enables your LLM to not just locate relevant information, but to use richer context for grounded answers.
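The four building blocks map directly onto the configuration payload used in Step 3. This plain-dict sketch mirrors that shape; the IDs are placeholders, not real connector IDs:

```python
# The pipeline configuration ties together all four building blocks.
# Connector IDs here are placeholders for illustration.
pipeline_config = {
    "pipeline_name": "My First Pipeline",
    "source_connectors": [  # where documents come from
        {"id": "source-connector-id", "type": "FILE_UPLOAD", "config": {}}
    ],
    "ai_platform_connector":  # how documents become embeddings
        {"id": "ai-platform-connector-id", "type": "VECTORIZE", "config": {}},
    "destination_connector":  # where the queryable index lives
        {"id": "destination-connector-id", "type": "VECTORIZE", "config": {}},
    "schedule": {"type": "manual"},  # when data is refreshed
}

# Every pipeline needs all four parts before it can be created.
required = {"pipeline_name", "source_connectors", "ai_platform_connector",
            "destination_connector", "schedule"}
assert required <= set(pipeline_config)
```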
Step 4: Wait for Processing
Your pipeline needs a few moments to process the uploaded document. Let's monitor its progress:
- Python
- Node.js
import vectorize_client as v
import time

# Create pipelines API client
pipelines_api = v.PipelinesApi(api)

print("Waiting for pipeline to process your document...")
max_wait_time = 300  # 5 minutes
start_time = time.time()

while True:
    try:
        # Check pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        status = pipeline.data.status

        # Check if ready
        if status == "LISTENING":
            print("✅ Pipeline is ready!")
            break
        elif status == "PROCESSING":
            print("⚙️ Still processing...")
        elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"❌ Pipeline error: {status}")
            break

        # Check timeout
        if time.time() - start_time > max_wait_time:
            print("⏰ Timeout waiting for pipeline")
            break

        time.sleep(10)  # Check every 10 seconds
    except Exception as e:
        print(f"❌ Error checking status: {e}")
        break
// This snippet uses async operations and should be run in an async context
(async () => {
    const vectorize = require('@vectorize-io/vectorize-client');
    const { PipelinesApi } = vectorize;

    // Create pipelines API client
    const pipelinesApi = new PipelinesApi(apiClient);

    console.log('Waiting for pipeline to process your document...');
    const maxWaitTime = 300000; // 5 minutes in milliseconds
    const startTime = Date.now();

    while (true) {
        try {
            // Check pipeline status
            const pipeline = await pipelinesApi.getPipeline({
                organizationId: organizationId,
                pipelineId: pipelineId
            });
            const status = pipeline.data.status;

            // Check if ready
            if (status === 'LISTENING') {
                console.log('✅ Pipeline is ready!');
                break;
            } else if (status === 'PROCESSING') {
                console.log('⚙️ Still processing...');
            } else if (['ERROR_DEPLOYING', 'SHUTDOWN'].includes(status)) {
                console.log(`❌ Pipeline error: ${status}`);
                break;
            }

            // Check timeout
            if (Date.now() - startTime > maxWaitTime) {
                console.log('⏰ Timeout waiting for pipeline');
                break;
            }

            await new Promise(resolve => setTimeout(resolve, 10000)); // Check every 10 seconds
        } catch (error) {
            console.log(`❌ Error checking status: ${error.message}`);
            break;
        }
    }
})();
Pipeline States
- DEPLOYING: Pipeline is being set up
- PROCESSING: Actively processing documents
- LISTENING: Ready and waiting for queries
For a complete list of pipeline states, see Understanding Pipeline Status.
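When polling, it helps to reduce each status to a decision: stop, keep waiting, or fail. This tiny helper is our own convenience wrapper (not part of the SDK), covering the states listed above:

```python
# Map a pipeline status string to a polling decision.
# Helper for illustration only -- not part of the vectorize_client SDK.
def next_action(status: str) -> str:
    if status == "LISTENING":
        return "done"          # pipeline is ready for queries
    if status in ("DEPLOYING", "PROCESSING"):
        return "wait"          # keep polling
    if status in ("ERROR_DEPLOYING", "SHUTDOWN"):
        return "fail"          # stop and surface the error
    return "unknown"           # see the full status reference

for s in ("DEPLOYING", "PROCESSING", "LISTENING"):
    print(s, "->", next_action(s))
```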
Step 5: Query Your Pipeline
Once the pipeline is ready, your connected LLM can use it to retrieve relevant context and respond to questions about your content:
- Python
- Node.js
import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(api)

try:
    # Query the pipeline
    response = pipelines_api.retrieve_documents(
        organization_id,
        pipeline_id,
        v.RetrieveDocumentsRequest(
            question="How to call the API?",
            num_results=5
        )
    )

    # Display results
    print(f"Found {len(response.documents)} relevant documents:\n")
    for i, doc in enumerate(response.documents, 1):
        print(f"Result {i}:")
        print(f"  Content: {doc.text[:200]}...")
        print(f"  Relevance Score: {doc.relevancy}")
        print(f"  Document ID: {doc.id}")
        print()
except Exception as e:
    print(f"❌ Error querying pipeline: {e}")
    raise
// This snippet uses async operations and should be run in an async context
(async () => {
    const vectorize = require('@vectorize-io/vectorize-client');
    const { PipelinesApi } = vectorize;

    // Create pipelines API client
    const pipelinesApi = new PipelinesApi(apiClient);

    let response;
    try {
        // Query the pipeline
        response = await pipelinesApi.retrieveDocuments({
            organizationId: organizationId,
            pipelineId: pipelineId,
            retrieveDocumentsRequest: {
                question: 'How to call the API?',
                numResults: 5
            }
        });

        // Display results
        console.log(`Found ${response.documents.length} relevant documents:\n`);
        response.documents.forEach((doc, index) => {
            console.log(`Result ${index + 1}:`);
            console.log(`  Content: ${doc.text.substring(0, 200)}...`);
            console.log(`  Relevance Score: ${doc.relevancy}`);
            console.log(`  Document ID: ${doc.id}`);
            console.log();
        });
    } catch (error) {
        console.error(`❌ Error querying pipeline: ${error.message}`);
        throw error;
    }
})();
How Your Pipeline + LLM Process Queries
When you submit a query, the pipeline and LLM work together:
- Retrieve Context: The pipeline finds relevant information across your documents
- Interpret Query: The connected LLM interprets your request using the retrieved context
- Combine Context: The LLM synthesizes retrieved information from multiple sources
- Generate Insight: The LLM produces answers that go beyond simple retrieval
With sufficient retrieved context, your connected LLM can:
- Answer "why" and "how" questions that require reasoning
- Identify patterns and relationships in your data
- Provide recommendations based on your content
- Synthesize insights from disparate sources
Try these types of questions to see retrieval + reasoning in action:
- "What are the implications of...?"
- "How do these concepts relate to each other?"
- "What should we prioritize based on...?"
Understanding Your Results
The query response includes:
- Answer: The LLM-generated response to your question
- Sources: Which document chunks were used
- Relevance score: Based on embedding similarity, indicates how closely the retrieved content matched your query
- Metadata: Additional information about the sources
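Turning those retrieved chunks into a grounded prompt is the core of the LLM side. This sketch uses plain dicts standing in for the documents returned by the query call, with the field names (`text`, `relevancy`, `id`) from the example above; `build_prompt` is our own illustrative helper:

```python
# Assemble retrieved chunks into a grounded prompt for the connected LLM.
# The dicts stand in for documents returned by a pipeline query.
def build_prompt(question, documents, max_chunks=3):
    # Keep the highest-relevancy chunks and label each with its source ID.
    top = sorted(documents, key=lambda d: d["relevancy"], reverse=True)[:max_chunks]
    context = "\n\n".join(f"[{d['id']}] {d['text']}" for d in top)
    return f"Answer using only this context:\n{context}\n\nQuestion: {question}"

docs = [
    {"id": "doc-1", "text": "RAG reduces hallucinations.", "relevancy": 0.91},
    {"id": "doc-2", "text": "Pipelines refresh on a schedule.", "relevancy": 0.42},
]
print(build_prompt("Why use RAG?", docs))
```

Keeping the source IDs in the prompt lets the LLM cite which chunk supported each claim.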
Step 6: Build Your Custom Chatbot
Now that your pipeline is working, let's create a chatbot that connects your pipeline to an LLM for interactive Q&A.
Download a Custom Chatbot Application
Vectorize can generate a complete chatbot application that showcases your pipeline's capabilities:
- Navigate to your pipeline in the Vectorize platform
- Go to the AI Integrations tab
- Click on Chatbot
- Select your preferred LLM provider (e.g., OpenAI) and model (e.g., gpt-4o)
- Click Download Chatbot ZIP
The downloaded application includes:
- Pre-configured connection to your pipeline
- Your organization ID and endpoints already set up
- Choice of LLM for responses
- Clean, customizable Next.js interface
Note: This application uses your selected LLM provider’s API — you’ll need a valid API key for that provider, and usage may incur costs.
Running Your Chatbot
After downloading:
- Unzip the file and navigate to the project folder
- Configure your environment variables in .env.development:
  OPENAI_API_KEY=sk-...
  VECTORIZE_TOKEN=your-vectorize-token
- Install and run:
  npm install
  npm run dev
- Open http://localhost:3000 to interact with your chatbot!
You now have a fully functional chatbot that can query your documents via your pipeline and use your connected LLM to generate grounded answers.
What's Next?
Congratulations! You've built your first agent-ready pipeline with Vectorize.
Here are some next steps to enhance your pipeline's capabilities:
- Make Your AI Smarter with Metadata: Use structured metadata to improve retrieval and filtering.
- Understanding Data Pipelines: Deep dive into how agent pipelines work.
Complete Example
Here's all the code from this guide combined into a complete, runnable example:
- Python
- Node.js
Required Environment Variables:
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
Required Files:
• `sample_document.txt`: A text document to index
#!/usr/bin/env python3
"""
Complete example for building your first RAG pipeline with Vectorize.
This example shows how to:
1. Create a file upload connector
2. Upload a document
3. Create and configure a RAG pipeline
4. Wait for processing to complete
5. Query your pipeline for answers
"""
import os
import sys
import time
import urllib3
import vectorize_client as v


def get_api_config():
    """Get API configuration from environment variables."""
    organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
    api_key = os.environ.get("VECTORIZE_API_KEY")
    if not organization_id or not api_key:
        print("🔑 Setup required:")
        print("1. Get your API key from: https://app.vectorize.io/settings")
        print("2. Set environment variables:")
        print("   export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print("   export VECTORIZE_API_KEY='your-api-key'")
        sys.exit(1)

    # Always use production API
    configuration = v.Configuration(
        host="https://api.vectorize.io/v1",
        access_token=api_key
    )
    return configuration, organization_id


def create_file_upload_connector(api_client, organization_id):
    """Create a file upload connector for ingesting documents."""
    print("📁 Step 1: Create a File Upload Connector")
    # Create the connectors API client
    connectors_api = v.SourceConnectorsApi(api_client)
    try:
        # Create a file upload connector
        file_upload = v.FileUpload(
            name="my-document-upload",
            type="FILE_UPLOAD",
            config={}
        )
        request = v.CreateSourceConnectorRequest(file_upload)
        response = connectors_api.create_source_connector(
            organization_id,
            request
        )
        connector_id = response.connector.id
        print(f"✅ Created file upload connector: {connector_id}")
        return connector_id
    except Exception as e:
        print(f"❌ Error creating connector: {e}")
        raise


def create_ai_platform_connector(api_client, organization_id):
    """Create an AI platform connector."""
    connectors_api = v.AIPlatformConnectorsApi(api_client)
    try:
        # Create the AI platform connector
        request = v.CreateAIPlatformConnectorRequest(
            name="pipeline-example-ai",
            type="VECTORIZE",
            config={}
        )
        response = connectors_api.create_ai_platform_connector(
            organization_id,
            request
        )
        print(f"✅ Created AI platform connector: {response.connector.name}")
        print(f"   Connector ID: {response.connector.id}\n")
        return response.connector.id
    except Exception as e:
        print(f"❌ Error creating AI platform connector: {e}")
        raise


def create_destination_connector(api_client, organization_id):
    """Create a destination connector."""
    connectors_api = v.DestinationConnectorsApi(api_client)
    try:
        # Create the destination connector
        connector_config = v.DestinationConnectorInput(
            name="pipeline-example-dest",
            type="VECTORIZE",
            config={}
        )
        request = v.CreateDestinationConnectorRequest(connector_config)
        response = connectors_api.create_destination_connector(
            organization_id,
            request
        )
        print(f"✅ Created destination connector: {response.connector.name}")
        print(f"   Connector ID: {response.connector.id}\n")
        return response.connector.id
    except Exception as e:
        print(f"❌ Error creating destination connector: {e}")
        raise


def upload_document(api_client, organization_id, source_connector_id):
    """Upload your first document to the connector."""
    print("📄 Step 2: Upload Your First Document")
    # Create uploads API client
    uploads_api = v.UploadsApi(api_client)

    # Define the file to upload
    file_path = "/tmp/intro_to_rag.txt"
    file_name = os.path.basename(file_path)

    # Create a sample file with RAG introduction content
    sample_content = """# Introduction to RAG (Retrieval-Augmented Generation)
RAG combines the power of large language models with external knowledge retrieval.
This approach allows AI systems to access and utilize up-to-date information,
reducing hallucinations and improving accuracy.
## Key Benefits of RAG:
- Access to current information
- Reduced hallucinations
- Domain-specific knowledge
- Scalable knowledge base
## How RAG Works:
1. User submits a query
2. System retrieves relevant documents from knowledge base
3. Documents are provided as context to the LLM
4. LLM generates a response based on retrieved context
## Use Cases:
- Customer support chatbots
- Technical documentation Q&A
- Research assistance
- Enterprise knowledge management
## Getting Started:
To implement RAG, you need:
1. A knowledge base (documents, data)
2. Vector embeddings for semantic search
3. A retrieval system
4. An LLM for response generation
RAG is particularly effective when you need accurate, up-to-date information
that wasn't part of the LLM's training data.
"""
    # Write content to file
    with open(file_path, "w") as f:
        f.write(sample_content)

    try:
        # Step 1: Get upload URL
        upload_request = v.StartFileUploadToConnectorRequest(
            name=file_name,
            content_type="text/plain"
        )
        start_response = uploads_api.start_file_upload_to_connector(
            organization_id,
            source_connector_id,
            start_file_upload_to_connector_request=upload_request
        )

        # Step 2: Upload file to the URL
        http = urllib3.PoolManager()
        with open(file_path, "rb") as f:
            response = http.request(
                "PUT",
                start_response.upload_url,
                body=f,
                headers={
                    "Content-Type": "text/plain",
                    "Content-Length": str(os.path.getsize(file_path))
                }
            )

        if response.status == 200:
            print(f"✅ Successfully uploaded: {file_name}")
        else:
            print(f"❌ Upload failed: {response.status}")
            raise Exception(f"Upload failed with status {response.status}")

        # Clean up temp file
        os.unlink(file_path)
    except Exception as e:
        print(f"❌ Error uploading file: {e}")
        # Clean up temp file if it exists
        if os.path.exists(file_path):
            os.unlink(file_path)
        raise


def create_simple_pipeline(api_client, organization_id, source_connector_id):
    """Create a RAG pipeline with AI platform and vector storage."""
    print("🔧 Step 3: Create a RAG Pipeline")
    # Get system connector IDs from environment
    ai_platform_connector_id = os.environ.get('VECTORIZE_AI_PLATFORM_CONNECTOR_ID_VECTORIZE')
    destination_connector_id = os.environ.get('VECTORIZE_DESTINATION_CONNECTOR_ID_VECTORIZE')

    pipelines_api = v.PipelinesApi(api_client)
    try:
        # Configure your pipeline
        pipeline_config = v.PipelineConfigurationSchema(
            pipeline_name="My First Pipeline",
            source_connectors=[
                v.PipelineSourceConnectorSchema(
                    id=source_connector_id,
                    type="FILE_UPLOAD",
                    config={}
                )
            ],
            ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
                id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
                type="VECTORIZE",
                config={}
            ),
            destination_connector=v.PipelineDestinationConnectorSchema(
                id=destination_connector_id,  # Uses Vectorize's built-in vector store
                type="VECTORIZE",
                config={}
            ),
            schedule=v.ScheduleSchema(type="manual")
        )

        # Create the pipeline
        response = pipelines_api.create_pipeline(
            organization_id,
            pipeline_config
        )
        pipeline_id = response.data.id
        print(f"✅ Created pipeline: {pipeline_id}")
        return pipeline_id
    except Exception as e:
        print(f"❌ Error creating pipeline: {e}")
        raise


def wait_for_processing(api_client, organization_id, pipeline_id):
    """Wait for the pipeline to be ready and process your document."""
    print("⏳ Step 4: Wait for Processing")
    # Create pipelines API client
    pipelines_api = v.PipelinesApi(api_client)

    print("Waiting for pipeline to process your document...")
    max_wait_time = 300  # 5 minutes
    start_time = time.time()

    while True:
        try:
            # Check pipeline status
            pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
            status = pipeline.data.status
            print(f"Pipeline status: {status}")

            # Check if ready
            if status == "LISTENING":
                print("✅ Pipeline is ready!")
                break
            elif status == "PROCESSING":
                print("⚙️ Still processing...")
            elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
                print(f"❌ Pipeline error: {status}")
                raise Exception(f"Pipeline failed with status: {status}")

            # Check timeout
            if time.time() - start_time > max_wait_time:
                print("⏰ Timeout waiting for pipeline")
                raise Exception("Pipeline processing timeout")

            time.sleep(10)  # Check every 10 seconds
        except Exception as e:
            if "Pipeline failed" in str(e) or "timeout" in str(e):
                raise
            print(f"❌ Error checking status: {e}")
            break


def query_pipeline(api_client, organization_id, pipeline_id):
    """Query your RAG pipeline and display results."""
    print("🔍 Step 5: Query Your RAG Pipeline")
    # Create pipelines API client
    pipelines_api = v.PipelinesApi(api_client)

    # Test queries to demonstrate different aspects
    queries = [
        "What is RAG?",
        "What are the benefits of RAG?",
        "How does RAG work?",
        "What do I need to implement RAG?"
    ]

    for query in queries:
        print(f"\n🔍 Query: {query}")
        try:
            # Query the pipeline
            response = pipelines_api.retrieve_documents(
                organization_id,
                pipeline_id,
                v.RetrieveDocumentsRequest(
                    question=query,
                    num_results=3
                )
            )

            # Display results
            print(f"Found {len(response.documents)} relevant documents:")
            for i, doc in enumerate(response.documents, 1):
                print(f"\nResult {i}:")
                print(f"  Content: {doc.text[:150]}...")
                print(f"  Relevance Score: {doc.relevancy}")
                print(f"  Document ID: {doc.id}")
                # Show metadata if available
                if hasattr(doc, 'metadata') and doc.metadata:
                    print(f"  Metadata: {doc.metadata}")
        except Exception as e:
            print(f"❌ Error querying pipeline with '{query}': {e}")
            # Continue with next query
            continue

    print("\n✅ Successfully demonstrated RAG pipeline queries!")


def main():
    """Main function demonstrating first pipeline creation."""
    print("🚀 Building Your First RAG Pipeline\n")

    # Initialize the API client
    configuration, organization_id = get_api_config()
    print("⚙️ Configuration:")
    print(f"   Organization ID: {organization_id}")
    print(f"   Host: {configuration.host}\n")

    source_connector_id = None
    pipeline_id = None

    try:
        with v.ApiClient(configuration) as api_client:
            # Step 1: Create a file upload connector
            source_connector_id = create_file_upload_connector(api_client, organization_id)
            print("")

            # Step 2: Upload a document
            upload_document(api_client, organization_id, source_connector_id)
            print("")

            # Step 3: Create a pipeline
            pipeline_id = create_simple_pipeline(api_client, organization_id, source_connector_id)
            print("")

            # Step 4: Monitor processing
            wait_for_processing(api_client, organization_id, pipeline_id)
            print("")

            # Step 5: Query the pipeline
            query_pipeline(api_client, organization_id, pipeline_id)

            print("\n🎉 Congratulations! You've built your first RAG pipeline!")
            print("\n📝 What you've learned:")
            print("- How to create a file upload connector")
            print("- How to upload documents to Vectorize")
            print("- How to configure a RAG pipeline with AI and vector storage")
            print("- How to monitor pipeline processing status")
            print("- How to query your pipeline for intelligent answers")
            print("\n💡 Next steps:")
            print("- Try uploading more documents")
            print("- Experiment with different query types")
            print("- Explore metadata and filtering options")
            print("- Build more advanced pipelines with custom connectors")
    except ValueError as e:
        print(f"❌ Configuration Error: {e}")
        print("\n💡 Make sure to set the required environment variables:")
        print("   export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print("   export VECTORIZE_API_KEY='your-api-key'")
    except Exception as error:
        print(f"❌ Error: {error}")
        sys.exit(1)
    finally:
        # ============================================================================
        # Cleanup
        # ============================================================================
        print("\n🧹 Cleanup")
        try:
            with v.ApiClient(configuration) as api_client:
                # Delete pipeline
                if pipeline_id:
                    try:
                        pipelines_api = v.PipelinesApi(api_client)
                        pipelines_api.delete_pipeline(organization_id, pipeline_id)
                        print(f"Deleted pipeline: {pipeline_id}")
                    except Exception as e:
                        print(f"Could not delete pipeline: {e}")
                # Delete source connector
                if source_connector_id:
                    try:
                        connectors_api = v.SourceConnectorsApi(api_client)
                        connectors_api.delete_source_connector(organization_id, source_connector_id)
                        print(f"Deleted connector: {source_connector_id}")
                    except Exception as e:
                        print(f"Could not delete connector: {e}")
        except Exception:
            pass


if __name__ == "__main__":
    main()
Required Environment Variables:
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
Required Files:
• `sample_document.txt`: A text document to index
#!/usr/bin/env node
/**
 * Complete example for building your first RAG pipeline.
 */
const vectorize = require('@vectorize-io/vectorize-client');
const fs = require('fs');
const path = require('path');

// Build the API configuration from environment variables
function getApiConfig() {
    const organizationId = process.env.VECTORIZE_ORGANIZATION_ID;
    const apiKey = process.env.VECTORIZE_API_KEY;
    if (!organizationId || !apiKey) {
        throw new Error("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables");
    }
    const configuration = new vectorize.Configuration({
        basePath: 'https://api.vectorize.io/v1',
        accessToken: apiKey
    });
    return { apiClient: configuration, organizationId };
}

async function main() {
    console.log('🚀 Building Your First Pipeline\n');

    // Initialize the API client
    const { apiClient: apiConfig, organizationId } = getApiConfig();
    const { SourceConnectorsApi, UploadsApi, PipelinesApi } = vectorize;

    let connectorId = null;
    let pipelineId = null;

    try {
        // ============================================================================
        // Step 1: Create a File Upload Connector
        // ============================================================================
        console.log('📁 Step 1: Create a File Upload Connector');
        const connectorsApi = new SourceConnectorsApi(apiConfig);
        const fileUpload = {
            name: 'my-document-upload',
            type: 'FILE_UPLOAD',
            config: {} // File upload connectors don't require config
        };
        const connectorResponse = await connectorsApi.createSourceConnector({
            organizationId: organizationId,
            createSourceConnectorRequest: fileUpload
        });
        connectorId = connectorResponse.connector.id;
        console.log(`✅ Created file upload connector: ${connectorId}\n`);

        // ============================================================================
        // Step 2: Upload Your First Document
        // ============================================================================
        console.log('📄 Step 2: Upload Your First Document');
        // Create a sample document about RAG
const sampleContent = `# Introduction to RAG
Retrieval-Augmented Generation (RAG) combines the power of large language models
with your own data. This allows AI to answer questions using your specific knowledge base.
## Key Benefits
- Accurate answers based on your data
- No need to retrain models
- Easy to update information
## How It Works
1. Documents are processed and stored as vectors
2. Questions are converted to vectors
3. Similar content is retrieved
4. LLM generates answers using the retrieved context
## Implementation Steps
1. Upload your documents to Vectorize
2. Create a pipeline to process them
3. Query the pipeline with natural language
4. Get accurate, contextual answers
## Best Practices
- Use clear, well-structured documents
- Include relevant metadata
- Test with various question types
- Monitor and improve over time
## Technical Details
RAG systems typically use:
- Vector databases for efficient similarity search
- Embedding models to convert text to vectors
- Language models for answer generation
- Retrieval algorithms to find relevant context
This approach gives you the benefits of both retrieval systems (access to specific data)
and generative models (natural language understanding and generation).`;
const filePath = '/tmp/intro_to_rag.txt';
const fileName = 'intro_to_rag.txt';
// Create the temporary file
fs.writeFileSync(filePath, sampleContent);
const uploadsApi = new UploadsApi(apiConfig);
// Step 1: Get a pre-signed upload URL from Vectorize
const uploadRequest = {
name: fileName,
contentType: 'text/plain'
};
const startResponse = await uploadsApi.startFileUploadToConnector({
organizationId: organizationId,
connectorId: connectorId,
startFileUploadToConnectorRequest: uploadRequest
});
// Step 2: Upload file directly to the pre-signed URL
const fileBuffer = fs.readFileSync(filePath);
const fileStats = fs.statSync(filePath);
const uploadResponse = await fetch(startResponse.uploadUrl, {
method: 'PUT',
body: fileBuffer,
headers: {
'Content-Type': 'text/plain',
'Content-Length': fileStats.size.toString()
}
});
if (uploadResponse.status === 200) {
console.log(`✅ Successfully uploaded: ${fileName}`);
console.log(` Content: Introduction to RAG concepts (${fileStats.size} bytes)`);
} else {
const errorText = await uploadResponse.text();
throw new Error(`Upload failed with status ${uploadResponse.status}: ${errorText}`);
}
// Clean up temp file
fs.unlinkSync(filePath);
console.log('');
// ============================================================================
// Step 3: Create a RAG Pipeline
// ============================================================================
console.log('🔧 Step 3: Create a RAG Pipeline');
// Get the required connector IDs from environment variables
const aiPlatformConnectorId = process.env.VECTORIZE_AI_PLATFORM_CONNECTOR_ID;
const destinationConnectorId = process.env.VECTORIZE_DESTINATION_CONNECTOR_ID;
if (!aiPlatformConnectorId || !destinationConnectorId) {
console.log('❌ Missing required connector IDs');
console.log(' Please set:');
console.log(' - VECTORIZE_AI_PLATFORM_CONNECTOR_ID');
console.log(' - VECTORIZE_DESTINATION_CONNECTOR_ID');
console.log('\n💡 Tip: Run get_vectorize_connectors.py to find your VECTORIZE connector IDs');
process.exit(1);
}
const pipelinesApi = new PipelinesApi(apiConfig);
const pipelineConfig = {
pipelineName: 'My First Pipeline',
sourceConnectors: [
{
id: connectorId,
type: 'FILE_UPLOAD',
config: {}
}
],
aiPlatformConnector: {
id: aiPlatformConnectorId, // Uses Vectorize's built-in AI
type: 'VECTORIZE',
config: {}
},
destinationConnector: {
id: destinationConnectorId, // Uses Vectorize's built-in vector store
type: 'VECTORIZE',
config: {}
},
schedule: { type: 'manual' }
};
const pipelineResponse = await pipelinesApi.createPipeline({
organizationId: organizationId,
pipelineConfigurationSchema: pipelineConfig
});
pipelineId = pipelineResponse.data.id;
console.log(`✅ Created pipeline: ${pipelineId}`);
console.log(` The pipeline will process your document and make it searchable\n`);
// ============================================================================
// Step 4: Wait for Processing
// ============================================================================
console.log('⏳ Step 4: Wait for Processing');
console.log('Waiting for pipeline to process your document...');
const maxWaitTime = 300000; // 5 minutes in milliseconds
const startTime = Date.now();
while (true) {
try {
const pipeline = await pipelinesApi.getPipeline({
organizationId: organizationId,
pipelineId: pipelineId
});
const status = pipeline.data.status;
if (status === 'LISTENING') {
console.log('✅ Pipeline is ready!\n');
break;
} else if (status === 'PROCESSING') {
console.log('⚙️ Still processing...');
} else if (['ERROR_DEPLOYING', 'SHUTDOWN'].includes(status)) {
console.log(`❌ Pipeline error: ${status}`);
break;
}
if (Date.now() - startTime > maxWaitTime) {
console.log('⏰ Timeout waiting for pipeline');
break;
}
await new Promise(resolve => setTimeout(resolve, 10000)); // Check every 10 seconds
} catch (error) {
console.log(`❌ Error checking status: ${error.message}`);
break;
}
}
// ============================================================================
// Step 5: Query Your RAG Pipeline
// ============================================================================
console.log('🔍 Step 5: Query Your RAG Pipeline');
try {
const queryResponse = await pipelinesApi.retrieveDocuments({
organizationId: organizationId,
pipelineId: pipelineId,
retrieveDocumentsRequest: {
question: 'What are the key benefits of RAG?',
numResults: 5
}
});
console.log(`Query: "What are the key benefits of RAG?"`);
console.log(`Found ${queryResponse.documents.length} relevant documents:\n`);
queryResponse.documents.forEach((doc, index) => {
console.log(`Result ${index + 1}:`);
console.log(` Content: ${doc.text.substring(0, 200)}...`);
console.log(` Relevance Score: ${doc.relevancy}`);
console.log(` Document ID: ${doc.id}`);
console.log();
});
// Try another query to show versatility
console.log('🔍 Let\'s try another query...\n');
const query2Response = await pipelinesApi.retrieveDocuments({
organizationId: organizationId,
pipelineId: pipelineId,
retrieveDocumentsRequest: {
question: 'How does RAG work technically?',
numResults: 3
}
});
console.log(`Query: "How does RAG work technically?"`);
console.log(`Found ${query2Response.documents.length} relevant documents:\n`);
query2Response.documents.forEach((doc, index) => {
console.log(`Result ${index + 1}:`);
console.log(` Content: ${doc.text.substring(0, 200)}...`);
console.log(` Relevance Score: ${doc.relevancy}`);
console.log();
});
} catch (error) {
console.error(`❌ Error querying pipeline: ${error.message}`);
console.log("This might be expected if running on localhost (queries route to production)");
}
console.log('✅ Your first RAG pipeline is complete!');
console.log('\n📝 What you\'ve accomplished:');
console.log('- Created a file upload connector');
console.log('- Uploaded a document with RAG knowledge');
console.log('- Built a complete RAG pipeline');
console.log('- Successfully queried your own data');
console.log('- Got contextually relevant answers');
console.log('\n🎯 Next Steps:');
console.log('- Upload more documents to expand your knowledge base');
console.log('- Experiment with different types of questions');
console.log('- Add metadata to your documents for better filtering');
console.log('- Integrate the API into your applications');
} catch (error) {
console.error('❌ Error:', error);
process.exit(1);
} finally {
// ============================================================================
// Cleanup
// ============================================================================
console.log('\n🧹 Cleanup');
// Delete pipeline
if (pipelineId) {
try {
const pipelinesApi = new PipelinesApi(apiConfig);
await pipelinesApi.deletePipeline({
organizationId: organizationId,
pipelineId: pipelineId
});
console.log(`Deleted pipeline: ${pipelineId}`);
} catch (error) {
console.log(`Could not delete pipeline: ${error.message}`);
}
}
// Delete source connector
if (connectorId) {
try {
const connectorsApi = new SourceConnectorsApi(apiConfig);
await connectorsApi.deleteSourceConnector({
organizationId: organizationId,
sourceConnectorId: connectorId
});
console.log(`Deleted connector: ${connectorId}`);
} catch (error) {
console.log(`Could not delete connector: ${error.message}`);
}
}
}
}
// Run the example
if (require.main === module) {
main().catch(error => {
console.error('❌ Error:', error);
process.exit(1);
});
}
module.exports = { main };
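If you only want to reuse the retrieval step in your own application, it can be isolated into a small helper. The sketch below is not part of the example file above: it assumes you have already initialized a `PipelinesApi` instance as shown earlier, and the `buildRetrieveRequest` and `ask` names are hypothetical.

```javascript
// Build the retrieveDocuments request body. Kept as a pure function so it
// can be unit-tested without network access or credentials.
function buildRetrieveRequest(question, numResults = 5) {
  return { question, numResults };
}

// Query an existing pipeline. `pipelinesApi` is an initialized
// vectorize.PipelinesApi; `organizationId` and `pipelineId` come from your
// own configuration (placeholders here, as in the example above).
async function ask(pipelinesApi, organizationId, pipelineId, question) {
  const response = await pipelinesApi.retrieveDocuments({
    organizationId,
    pipelineId,
    retrieveDocumentsRequest: buildRetrieveRequest(question),
  });
  // As in the example above, each returned document carries `text`,
  // `relevancy`, and `id`.
  return response.documents;
}
```

Keeping the request construction separate from the API call makes it easy to adjust `numResults` (or add filters later) in one place while the rest of your application only deals with the returned document list.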