Skip to main content

Create a Pipeline Programmatically

Learn how to create a complete RAG (Retrieval-Augmented Generation) pipeline using the Vectorize client, including source connectors, vector database, and AI platform configuration.

Prerequisites

Before you begin, you'll need:

  1. A Vectorize account
  2. An API access token (created in the Vectorize platform)
  3. Your organization ID (see below)

Finding your Organization ID

Your organization ID is in the Vectorize platform URL:

https://platform.vectorize.io/organization/[YOUR-ORG-ID]

For example, if your URL is:

https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Additionally, you'll need:

  • One or more source connector IDs (e.g., AWS S3, Google Drive)
  • A destination connector ID (e.g., Pinecone)
  • An AI platform connector ID (e.g., OpenAI)
Note: Connectors must be created through the Vectorize platform UI before creating a pipeline programmatically.

Setup

This guide assumes you've completed the Getting Started guide. Here's the basic setup:

import vectorize_client as v

# Your credentials — replace both placeholders with real values.
# org_id comes from the platform URL (see "Finding your Organization ID" above);
# token is an API access token created in the Vectorize platform.
org_id = "your-organization-id"
token = "your-api-token"

# Initialize the client; the access token is sent as the bearer credential
# on every API call made through this client.
api = v.ApiClient(v.Configuration(access_token=token))

Create a Pipeline

Create a RAG pipeline that connects your data sources to a vector database.

# Create API instance
pipelines_api = v.PipelinesApi(api)

# Define your pipeline configuration.
# Each connector ID must reference a connector already created in the
# Vectorize platform UI; an empty config dict uses that connector's defaults.
pipeline_configuration = v.PipelineConfigurationSchema(
    pipeline_name="Multi-Source Documentation RAG",
    source_connectors=[
        v.SourceConnectorSchema(
            id="your-s3-connector-id",
            type=v.SourceConnectorType.AWS_S3,
            config={}
        ),
        v.SourceConnectorSchema(
            id="your-google-drive-connector-id",
            type=v.SourceConnectorType.GOOGLE_DRIVE,
            config={}
        )
    ],
    destination_connector=v.DestinationConnectorSchema(
        id="your-pinecone-connector-id",
        type=v.DestinationConnectorType.PINECONE,
        config={}
    ),
    ai_platform=v.AIPlatformSchema(
        id="your-openai-connector-id",
        type=v.AIPlatformType.OPENAI,
        config={}
    ),
    schedule=v.ScheduleSchema(type="manual")  # or "interval", "cron"
)

# Create the pipeline, then start it immediately.
try:
    response = pipelines_api.create_pipeline(
        org_id,
        pipeline_configuration
    )

    # The create response carries the new pipeline's ID, needed for all
    # subsequent calls (start/stop/monitor/delete).
    pipeline_id = response.id
    print(f"Pipeline created successfully! ID: {pipeline_id}")

    # Start the pipeline (required because schedule type is "manual").
    print("Starting pipeline...")
    pipelines_api.start_pipeline(org_id, pipeline_id)

except Exception as e:
    print(f"Error creating pipeline: {e}")

Monitor Pipeline Progress

Track the status and progress of your pipeline processing.

import time

pipeline_id = "your-pipeline-id"

# Poll pipeline status and metrics until the pipeline reaches a terminal
# state. API errors are logged and polling continues on the next cycle.
while True:
    try:
        # Get pipeline status
        pipeline = pipelines_api.get_pipeline(org_id, pipeline_id)
        print(f"Pipeline status: {pipeline.status}")

        # Get pipeline processing metrics
        metrics = pipelines_api.get_pipeline_metrics(org_id, pipeline_id)
        print(f"Documents processed: {metrics.documents_processed}")
        print(f"Vectors created: {metrics.vectors_created}")

        # Terminal states: stop polling.
        if pipeline.status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING"]:
            print(f"Pipeline stopped with status: {pipeline.status}")
            break

        # Pipeline is actively processing or listening for new data.
        if pipeline.status in ["LISTENING", "PROCESSING"]:
            print("Pipeline is running normally")

    except Exception as e:
        # Transient API failure — report it and keep polling.
        print(f"Error monitoring pipeline: {e}")

    time.sleep(30)  # Check every 30 seconds

Pipeline Management

Stop a Pipeline

# Stop a running pipeline; errors are reported rather than raised.
try:
    pipelines_api.stop_pipeline(org_id, pipeline_id)
    print("Pipeline stopped successfully")
except Exception as e:
    print(f"Error stopping pipeline: {e}")

Delete a Pipeline

# Permanently delete a pipeline; errors are reported rather than raised.
try:
    pipelines_api.delete_pipeline(org_id, pipeline_id)
    print("Pipeline deleted successfully")
except Exception as e:
    print(f"Error deleting pipeline: {e}")

Complete Example

import vectorize_client as v
import time

# Your credentials — replace with real values.
org_id = "your-organization-id"
token = "your-api-token"

# Initialize the client
api = v.ApiClient(v.Configuration(access_token=token))

# Create API instance
pipelines_api = v.PipelinesApi(api)


def create_and_monitor_pipeline():
    """Create a RAG pipeline, start it, and poll until it reaches a terminal state.

    Prints progress to stdout; API errors are caught and reported rather
    than raised. Connector IDs below are placeholders — the connectors must
    already exist in the Vectorize platform.
    """

    # Pipeline configuration: two sources, one destination, one AI platform.
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="Documentation Processing Pipeline",
        source_connectors=[
            v.SourceConnectorSchema(id="src-123", type=v.SourceConnectorType.AWS_S3, config={}),
            v.SourceConnectorSchema(id="src-456", type=v.SourceConnectorType.GOOGLE_DRIVE, config={})
        ],
        destination_connector=v.DestinationConnectorSchema(
            id="dest-789",
            type=v.DestinationConnectorType.PINECONE,
            config={}
        ),
        ai_platform=v.AIPlatformSchema(
            id="ai-012",
            type=v.AIPlatformType.OPENAI,
            config={}
        ),
        schedule=v.ScheduleSchema(type="manual")
    )

    try:
        # Create pipeline
        print("Creating pipeline...")
        response = pipelines_api.create_pipeline(org_id, pipeline_config)
        pipeline_id = response.id
        print(f"Pipeline created: {pipeline_id}")

        # Start pipeline (manual schedule requires an explicit start).
        print("Starting pipeline...")
        pipelines_api.start_pipeline(org_id, pipeline_id)

        # Poll status and metrics every 30 seconds until a terminal state.
        print("Monitoring pipeline progress...")
        while True:
            pipeline = pipelines_api.get_pipeline(org_id, pipeline_id)
            metrics = pipelines_api.get_pipeline_metrics(org_id, pipeline_id)

            print(f"\nStatus: {pipeline.status}")
            print(f"Documents: {metrics.documents_processed}")
            print(f"Vectors: {metrics.vectors_created}")

            if pipeline.status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING"]:
                break

            time.sleep(30)

        print(f"\nPipeline finished with status: {pipeline.status}")

    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    create_and_monitor_pipeline()

Next Steps

Was this page helpful?