Create a Pipeline Programmatically
Learn how to create a complete RAG (Retrieval-Augmented Generation) pipeline using the Vectorize client, including source connectors, vector database, and AI platform configuration.
Prerequisites
Before you begin, you'll need:
- A Vectorize account
- An API access token (how to create one)
- Your organization ID (see below)
Finding your Organization ID
Your organization ID is in the Vectorize platform URL:
https://platform.vectorize.io/organization/[YOUR-ORG-ID]
For example, if your URL is:
https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Additionally, you'll need:
- One or more source connector IDs (e.g., AWS S3, Google Drive)
- A destination connector ID (e.g., Pinecone)
- An AI platform connector ID (e.g., OpenAI)
Note:
Connectors must be created through the Vectorize platform UI before creating a pipeline programmatically.
API Client Setup
- Python
- Node.js
import vectorize_client as v
import os

# Get credentials from environment variables so secrets stay out of source code.
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

# Fail fast with a clear message when either variable is missing or empty.
if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client
configuration = v.Configuration(
    host="https://api.vectorize.io",
    api_key={"ApiKeyAuth": api_key}
)
api = v.ApiClient(configuration)

# Report the real organization ID rather than a placeholder literal.
print(f"✅ API client initialized for organization: {organization_id}")
const v = require('@vectorize-io/vectorize-client');

// Get credentials from environment variables so secrets stay out of source code.
const organizationId = process.env.VECTORIZE_ORGANIZATION_ID;
const apiKey = process.env.VECTORIZE_API_KEY;

// Fail fast with a clear message when either variable is missing or empty.
if (!organizationId || !apiKey) {
  throw new Error("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables");
}

// Initialize the API client
const configuration = new v.Configuration({
  basePath: 'https://api.vectorize.io',
  accessToken: apiKey
});
// NOTE(review): the pipeline snippets on this page build PipelinesApi from a
// configuration object — confirm that ApiClient is exported by the Node.js client.
const apiClient = new v.ApiClient(configuration);
console.log(`✅ API client initialized for organization: ${organizationId}`);
Create a Pipeline
Create a RAG pipeline that connects your data sources to a vector database.
- Python
- Node.js
# Creates a pipeline and stores its ID in `pipeline_id`.
#
# Uses `v`, `api` and `organization_id` from the API Client Setup step.
# You must define these before running the snippet:
#   pipeline_name             - display name for the new pipeline
#   source_connector_id       - ID of an existing source connector
#   ai_platform_connector_id  - ID of an existing AI platform connector
#   destination_connector_id  - ID of an existing destination connector

# Create pipelines client
pipelines_api = v.PipelinesApi(api)

# Define your pipeline configuration
pipeline_configuration = v.PipelineConfigurationSchema(
    pipeline_name=pipeline_name,
    source_connectors=[
        v.PipelineSourceConnectorSchema(
            id=source_connector_id,
            type="FILE_UPLOAD",
            config={}
        )
    ],
    ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
        id=ai_platform_connector_id,
        type="VECTORIZE",
        config={}
    ),
    destination_connector=v.PipelineDestinationConnectorSchema(
        id=destination_connector_id,
        type="VECTORIZE",
        config={}
    ),
    schedule=v.ScheduleSchema(type="manual")
)

# Create the pipeline
try:
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_configuration
    )
    # Keep the ID: the monitoring step needs it.
    pipeline_id = response.data.id
    print(f"Pipeline created successfully! ID: {pipeline_id}")
except Exception as e:
    # Log for the reader, then re-raise so the failure is not silently swallowed.
    print(f"Error creating pipeline: {e}")
    raise
// Creates a pipeline and stores its ID in `pipelineId`.
//
// Uses `v`, `configuration` and `organizationId` from the API Client Setup step.
// You must define `pipelineName`, `sourceConnectorId`, `destinationConnectorId`
// and `aiPlatformConnectorId` before running the snippet.
// `await` must run inside an async function (or an ES module).

// Create pipelines client
const pipelinesApi = new v.PipelinesApi(configuration);

// Define your pipeline configuration
const pipelineConfiguration = {
  pipelineName,
  sourceConnectors: [
    {
      id: sourceConnectorId,
      type: "FILE_UPLOAD",
      config: {}
    }
  ],
  destinationConnector: {
    id: destinationConnectorId,
    type: "VECTORIZE",
    config: {}
  },
  aiPlatformConnector: {
    id: aiPlatformConnectorId,
    type: "VECTORIZE",
    config: {}
  },
  schedule: { type: "manual" }
};

// Create the pipeline
// Declared outside the try block so the monitoring step can read it afterwards.
let pipelineId;
try {
  const response = await pipelinesApi.createPipeline({
    organizationId,
    pipelineConfigurationSchema: pipelineConfiguration
  });
  pipelineId = response.data.id;
  console.log(`Pipeline created successfully! ID: ${pipelineId}`);
} catch (error) {
  // Log for the reader, then rethrow so the failure is not silently swallowed.
  console.log(`Error creating pipeline: ${error.message}`);
  throw error;
}
Monitor Pipeline Progress
Track the status and progress of your pipeline processing.
- Python
- Node.js
# Polls a pipeline every 30 seconds, printing its status and object metrics,
# until it reaches a terminal or listening state, an error occurs, or the
# maximum monitoring time elapses.
#
# Uses `v`, `api` and `organization_id` from the API Client Setup step and
# `pipeline_id` from the pipeline creation step.
import time

# Create pipelines client
pipelines_api = v.PipelinesApi(api)

# Upper bound on how long to keep polling; adjust to suit your workload.
monitoring_start_time = time.time()
max_monitoring_time = 600  # seconds

while True:
    try:
        # Get pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        current_status = pipeline.data.status
        print(f"Pipeline status: {current_status}")

        # Get pipeline metrics
        metrics = pipelines_api.get_pipeline_metrics(organization_id, pipeline_id)
        # A non-empty list is required here, so the latest entry always exists below.
        if metrics.data and isinstance(metrics.data, list):
            # Sum up all the metrics; `or 0` tolerates entries with a missing count.
            total_new = sum((m.new_objects or 0) for m in metrics.data)
            total_changed = sum((m.changed_objects or 0) for m in metrics.data)
            total_deleted = sum((m.deleted_objects or 0) for m in metrics.data)
            print(f" 📊 Total new objects: {total_new}")
            print(f" 📊 Total changed objects: {total_changed}")
            print(f" 📊 Total deleted objects: {total_deleted}")

            # Show the last entry in the list.
            # NOTE(review): presumably the most recent period — confirm the API's ordering.
            latest = metrics.data[-1]
            print(f" 📅 Latest ({latest.timestamp}):")
            print(f" New: {latest.new_objects or 0}, Changed: {latest.changed_objects or 0}, Deleted: {latest.deleted_objects or 0}")

        # Check for terminal states
        if current_status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"Pipeline reached terminal state: {current_status}")
            break

        # Pipeline is listening
        if current_status in ["LISTENING"]:
            print(f"Pipeline is ready and listening for new data")
            break

        # Pipeline is actively processing
        if current_status in ["PROCESSING", "DEPLOYING"]:
            print(f"Pipeline is actively working: {current_status}")

        # Check if we've been monitoring too long
        if time.time() - monitoring_start_time > max_monitoring_time:
            print("⏰ Maximum monitoring time reached")
            break
    except Exception as e:
        # Stop polling on any failure instead of retrying indefinitely.
        print(f"Error monitoring pipeline: {e}")
        break

    print("⏳ Waiting 30 seconds before next check...")
    time.sleep(30)
// Polls a pipeline every 30 seconds, logging its status and object metrics,
// until it reaches a terminal or listening state, an error occurs, or the
// maximum monitoring time elapses.
//
// Uses `v`, `configuration` and `organizationId` from the API Client Setup step
// and `pipelineId` from the pipeline creation step.
// `await` must run inside an async function (or an ES module).

// Create pipelines client.
// If this runs in the same scope as the pipeline creation snippet, reuse the
// existing `pipelinesApi` instead of declaring it again.
const pipelinesApi = new v.PipelinesApi(configuration);

// Upper bound on how long to keep polling; adjust to suit your workload.
const monitoringStartTime = Date.now();
const maxMonitoringTime = 10 * 60 * 1000; // milliseconds

while (true) {
  try {
    // Get pipeline status
    const pipeline = await pipelinesApi.getPipeline({
      organizationId,
      pipelineId
    });
    const currentStatus = pipeline.data.status;
    console.log(`Pipeline status: ${currentStatus}`);

    // Get pipeline metrics
    const metrics = await pipelinesApi.getPipelineMetrics({
      organizationId,
      pipelineId
    });
    if (metrics.data && Array.isArray(metrics.data)) {
      // Sum up all the metrics; `|| 0` tolerates entries with a missing count.
      const totalNew = metrics.data.reduce((sum, m) => sum + (m.newObjects || 0), 0);
      const totalChanged = metrics.data.reduce((sum, m) => sum + (m.changedObjects || 0), 0);
      const totalDeleted = metrics.data.reduce((sum, m) => sum + (m.deletedObjects || 0), 0);
      console.log(` 📊 Total new objects: ${totalNew}`);
      console.log(` 📊 Total changed objects: ${totalChanged}`);
      console.log(` 📊 Total deleted objects: ${totalDeleted}`);

      // Show the last entry in the list, if there is one.
      // NOTE(review): presumably the most recent period — confirm the API's ordering.
      if (metrics.data.length > 0) {
        const latest = metrics.data[metrics.data.length - 1];
        console.log(` 📅 Latest (${latest.timestamp}):`);
        console.log(` New: ${latest.newObjects || 0}, Changed: ${latest.changedObjects || 0}, Deleted: ${latest.deletedObjects || 0}`);
      }
    }

    // Check for terminal states
    if (["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"].includes(currentStatus)) {
      console.log(`Pipeline reached terminal state: ${currentStatus}`);
      break;
    }

    // Pipeline is listening
    if (["LISTENING"].includes(currentStatus)) {
      console.log(`Pipeline is ready and listening for new data`);
      break;
    }

    // Pipeline is actively processing
    if (["PROCESSING", "DEPLOYING"].includes(currentStatus)) {
      console.log(`Pipeline is actively working: ${currentStatus}`);
    }

    // Check if we've been monitoring too long
    if (Date.now() - monitoringStartTime > maxMonitoringTime) {
      console.log("⏰ Maximum monitoring time reached");
      break;
    }
  } catch (error) {
    // Stop polling on any failure instead of retrying indefinitely.
    console.log(`Error monitoring pipeline: ${error.message}`);
    break;
  }

  console.log("⏳ Waiting 30 seconds before next check...");
  await new Promise(resolve => setTimeout(resolve, 30000));
}