Create a Pipeline Programmatically
Learn how to create a complete RAG (Retrieval-Augmented Generation) pipeline using the Vectorize client, including source connectors, vector database, and AI platform configuration.
Prerequisites
Before you begin, you'll need:
- A Vectorize account
- An API access token (create one here)
- Your organization ID (see below)
Finding your Organization ID
Your organization ID is in the Vectorize platform URL:
https://platform.vectorize.io/organization/[YOUR-ORG-ID]
For example, if your URL is:
https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Additionally, you'll need:
- One or more source connector IDs (e.g., AWS S3, Google Drive)
- A destination connector ID (e.g., Pinecone)
- An AI platform connector ID (e.g., OpenAI)
Note: Connectors must be created through the Vectorize platform UI before you can create a pipeline programmatically.
Setup
This guide assumes you've completed the Getting Started guide. Here's the basic setup:
- Python
- Node.js
import vectorize_client as v
# Credentials: replace the placeholders with your own values
org_id = "your-organization-id"
token = "your-api-token"

# API client built from a configuration that carries the access token
api = v.ApiClient(
    v.Configuration(access_token=token),
)
const { Configuration } = require('@vectorize-io/vectorize-client');
// Credentials: replace the placeholders with your own values
const orgId = 'your-organization-id';
const token = 'your-api-token';

// Client configuration that carries the access token
const api = new Configuration({ accessToken: token });
Create a Pipeline
Create a RAG pipeline that connects your data sources to a vector database.
- Python
- Node.js
# Create API instance
pipelines_api = v.PipelinesApi(api)

# Define your pipeline configuration.
# Replace each placeholder ID with the ID of a connector you have already
# created in the Vectorize platform UI.
pipeline_configuration = v.PipelineConfigurationSchema(
    pipeline_name="Multi-Source Documentation RAG",
    source_connectors=[
        v.SourceConnectorSchema(
            id="your-s3-connector-id",
            type=v.SourceConnectorType.AWS_S3,
            config={},
        ),
        v.SourceConnectorSchema(
            id="your-google-drive-connector-id",
            type=v.SourceConnectorType.GOOGLE_DRIVE,
            config={},
        ),
    ],
    destination_connector=v.DestinationConnectorSchema(
        id="your-pinecone-connector-id",
        type=v.DestinationConnectorType.PINECONE,
        config={},
    ),
    ai_platform=v.AIPlatformSchema(
        id="your-openai-connector-id",
        type=v.AIPlatformType.OPENAI,
        config={},
    ),
    schedule=v.ScheduleSchema(type="manual"),  # or "interval", "cron"
)

# Create the pipeline, then start it
try:
    response = pipelines_api.create_pipeline(
        org_id,
        pipeline_configuration,
    )
    # The create response wraps the new pipeline's ID in a `data` object
    pipeline_id = response.data.id
    print(f"Pipeline created successfully! ID: {pipeline_id}")

    # Start the pipeline
    print("Starting pipeline...")
    pipelines_api.start_pipeline(org_id, pipeline_id)
except Exception as e:
    print(f"Error creating pipeline: {e}")
const { PipelinesApi } = require('@vectorize-io/vectorize-client');
// Create API instance
const pipelinesApi = new PipelinesApi(api);

// Define your pipeline configuration
const pipelineName = "Multi-Source Documentation RAG";

// These IDs come from connectors you've already created in the UI
const sourceConnectors = [
  {
    id: "your-s3-connector-id",
    type: "AWS_S3",
  },
  {
    id: "your-google-drive-connector-id",
    type: "GOOGLE_DRIVE",
  },
];
const destinationConnector = {
  id: "your-pinecone-connector-id",
  type: "PINECONE",
};
const aiPlatform = {
  id: "your-openai-connector-id",
  type: "OPENAI",
};

// Create the pipeline, then start it.
// NOTE: `await` must run inside an async function or an ES module; wrap this
// block in an async function when running it as a CommonJS script.
try {
  const response = await pipelinesApi.createPipeline({
    organization: orgId,
    createPipelineRequest: {
      pipelineName,
      sourceConnectors,
      destinationConnector,
      aiPlatform,
      schedule: {
        type: "manual", // or "interval", "cron"
      },
    },
  });
  const pipelineId = response.data.id;
  console.log(`Pipeline created successfully! ID: ${pipelineId}`);

  // Start the pipeline
  console.log("Starting pipeline...");
  await pipelinesApi.startPipeline({
    organization: orgId,
    pipelineId,
  });
} catch (error) {
  // Log the HTTP status when one is attached to the error, else the message
  console.error('Error creating pipeline:', error.response?.status || error.message);
  if (error.response?.data) {
    console.error('Details:', error.response.data);
  }
}
Monitor Pipeline Progress
Track the status and progress of your pipeline processing.
- Python
- Node.js
import time
pipeline_id = "your-pipeline-id"

# Statuses at which polling stops, and statuses reported as running normally
terminal_statuses = ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING"]
running_statuses = ["LISTENING", "PROCESSING"]

# Poll the pipeline until it reaches one of the terminal statuses
while True:
    try:
        # Current status
        pipeline = pipelines_api.get_pipeline(org_id, pipeline_id)
        print(f"Pipeline status: {pipeline.status}")

        # Current metrics
        metrics = pipelines_api.get_pipeline_metrics(org_id, pipeline_id)
        print(f"Documents processed: {metrics.documents_processed}")
        print(f"Vectors created: {metrics.vectors_created}")

        # Leave the loop (skipping the sleep) once a terminal status is seen
        if pipeline.status in terminal_statuses:
            print(f"Pipeline stopped with status: {pipeline.status}")
            break

        # Otherwise report when the pipeline is processing or listening
        if pipeline.status in running_statuses:
            print("Pipeline is running normally")
    except Exception as err:
        # A failed poll is reported and retried on the next iteration
        print(f"Error monitoring pipeline: {err}")

    time.sleep(30)  # Check every 30 seconds
const pipelineId = "your-pipeline-id";

// Resolve after `ms` milliseconds; used to pause between polls
function sleep(ms) {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}

// Statuses at which polling stops, and statuses reported as running normally
const terminalStatuses = ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING"];
const runningStatuses = ["LISTENING", "PROCESSING"];

// Poll the pipeline until it reaches one of the terminal statuses
for (;;) {
  try {
    // Current status
    const pipeline = await pipelinesApi.getPipeline({
      organization: orgId,
      pipelineId,
    });
    console.log(`Pipeline status: ${pipeline.status}`);

    // Current metrics
    const metrics = await pipelinesApi.getPipelineMetrics({
      organization: orgId,
      pipelineId,
    });
    console.log(`Documents processed: ${metrics.documentsProcessed}`);
    console.log(`Vectors created: ${metrics.vectorsCreated}`);

    // Leave the loop (skipping the sleep) once a terminal status is seen
    if (terminalStatuses.includes(pipeline.status)) {
      console.log(`Pipeline stopped with status: ${pipeline.status}`);
      break;
    }

    // Otherwise report when the pipeline is processing or listening
    if (runningStatuses.includes(pipeline.status)) {
      console.log("Pipeline is running normally");
    }
  } catch (err) {
    // A failed poll is reported and retried on the next iteration
    console.error('Error monitoring pipeline:', err.message);
  }

  await sleep(30000); // Check every 30 seconds
}
Pipeline Management
Stop a Pipeline
- Python
- Node.js
# Stop the pipeline; a failure is printed instead of propagating
try:
    pipelines_api.stop_pipeline(org_id, pipeline_id)
    print("Pipeline stopped successfully")
except Exception as err:
    print(f"Error stopping pipeline: {err}")
// Stop the pipeline; a failure is logged instead of propagating
try {
  await pipelinesApi.stopPipeline({ organization: orgId, pipelineId });
  console.log("Pipeline stopped successfully");
} catch (err) {
  console.error('Error stopping pipeline:', err.message);
}
Delete a Pipeline
- Python
- Node.js
# Delete the pipeline; a failure is printed instead of propagating
try:
    pipelines_api.delete_pipeline(org_id, pipeline_id)
    print("Pipeline deleted successfully")
except Exception as err:
    print(f"Error deleting pipeline: {err}")
// Delete the pipeline; a failure is logged instead of propagating
try {
  await pipelinesApi.deletePipeline({ organization: orgId, pipelineId });
  console.log("Pipeline deleted successfully");
} catch (err) {
  console.error('Error deleting pipeline:', err.message);
}
Complete Example
- Python
- Node.js
import vectorize_client as v
import time
# Credentials: replace the placeholders with your own values
org_id = "your-organization-id"
token = "your-api-token"

# API client built from a configuration that carries the access token
api = v.ApiClient(
    v.Configuration(access_token=token),
)

# Pipelines API instance bound to that client
pipelines_api = v.PipelinesApi(api)
def create_and_monitor_pipeline():
    """Create a RAG pipeline, start it, and poll it until it stops.

    Prints the pipeline status and metrics every 30 seconds until the status
    is one of IDLE, HIBERNATING, OVERQUOTA, or ERROR_DEPLOYING. Any exception
    raised along the way is caught and printed, which ends the run.
    """
    # Statuses at which polling stops
    terminal_statuses = ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING"]

    # Pipeline configuration
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="Documentation Processing Pipeline",
        source_connectors=[
            v.SourceConnectorSchema(id="src-123", type=v.SourceConnectorType.AWS_S3, config={}),
            v.SourceConnectorSchema(id="src-456", type=v.SourceConnectorType.GOOGLE_DRIVE, config={}),
        ],
        destination_connector=v.DestinationConnectorSchema(
            id="dest-789",
            type=v.DestinationConnectorType.PINECONE,
            config={},
        ),
        ai_platform=v.AIPlatformSchema(
            id="ai-012",
            type=v.AIPlatformType.OPENAI,
            config={},
        ),
        schedule=v.ScheduleSchema(type="manual"),
    )

    try:
        # Create pipeline
        print("Creating pipeline...")
        response = pipelines_api.create_pipeline(org_id, pipeline_config)
        # The create response wraps the new pipeline's ID in a `data` object
        pipeline_id = response.data.id
        print(f"Pipeline created: {pipeline_id}")

        # Start pipeline
        print("Starting pipeline...")
        pipelines_api.start_pipeline(org_id, pipeline_id)

        # Monitor progress
        print("Monitoring pipeline progress...")
        while True:
            pipeline = pipelines_api.get_pipeline(org_id, pipeline_id)
            metrics = pipelines_api.get_pipeline_metrics(org_id, pipeline_id)

            print(f"\nStatus: {pipeline.status}")
            print(f"Documents: {metrics.documents_processed}")
            print(f"Vectors: {metrics.vectors_created}")

            if pipeline.status in terminal_statuses:
                break

            time.sleep(30)

        print(f"\nPipeline finished with status: {pipeline.status}")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    create_and_monitor_pipeline()
const { Configuration, PipelinesApi } = require('@vectorize-io/vectorize-client');
// Credentials: replace the placeholders with your own values
const orgId = 'your-organization-id';
const token = 'your-api-token';

// Client configuration that carries the access token
const api = new Configuration({ accessToken: token });

// Pipelines API instance built on that configuration
const pipelinesApi = new PipelinesApi(api);
/**
 * Create a RAG pipeline, start it, and poll it until it reaches a terminal
 * status (IDLE, HIBERNATING, OVERQUOTA, or ERROR_DEPLOYING).
 *
 * Status and metrics are logged on every poll, 30 seconds apart. Errors are
 * caught and logged rather than rethrown.
 *
 * @returns {Promise<void>}
 */
async function createAndMonitorPipeline() {
  // Statuses at which polling stops
  const terminalStatuses = ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING"];
  const pollIntervalMs = 30000;

  // Pipeline configuration
  const pipelineConfig = {
    pipelineName: "Documentation Processing Pipeline",
    sourceConnectors: [
      { id: "src-123", type: "AWS_S3" },
      { id: "src-456", type: "GOOGLE_DRIVE" },
    ],
    destinationConnector: { id: "dest-789", type: "PINECONE" },
    aiPlatform: { id: "ai-012", type: "OPENAI" },
    schedule: { type: "manual" },
  };

  try {
    // Create pipeline
    console.log("Creating pipeline...");
    const response = await pipelinesApi.createPipeline({
      organization: orgId,
      createPipelineRequest: pipelineConfig,
    });
    // The create response wraps the new pipeline's ID in a `data` object
    const pipelineId = response.data.id;
    console.log(`Pipeline created: ${pipelineId}`);

    // Start pipeline
    console.log("Starting pipeline...");
    await pipelinesApi.startPipeline({
      organization: orgId,
      pipelineId,
    });

    // Monitor progress
    console.log("Monitoring pipeline progress...");
    // Declared outside the loop so the last status is still in scope after it
    let finalStatus;
    while (true) {
      const pipeline = await pipelinesApi.getPipeline({
        organization: orgId,
        pipelineId,
      });
      const metrics = await pipelinesApi.getPipelineMetrics({
        organization: orgId,
        pipelineId,
      });

      console.log(`\nStatus: ${pipeline.status}`);
      console.log(`Documents: ${metrics.documentsProcessed}`);
      console.log(`Vectors: ${metrics.vectorsCreated}`);

      if (terminalStatuses.includes(pipeline.status)) {
        finalStatus = pipeline.status;
        break;
      }

      await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
    }

    console.log(`\nPipeline finished with status: ${finalStatus}`);
  } catch (error) {
    // Log the HTTP status when one is attached to the error, else the message
    console.error('Error:', error.response?.status || error.message);
    if (error.response?.data) {
      console.error('Details:', error.response.data);
    }
  }
}

// Run the example
createAndMonitorPipeline();