
Create a Pipeline Programmatically

Learn how to create a complete RAG (Retrieval-Augmented Generation) pipeline using the Vectorize client, including source connectors, vector database, and AI platform configuration.

Prerequisites

Before you begin, you'll need:

  1. A Vectorize account
  2. An API access token (how to create one)
  3. Your organization ID (see below)

Finding your Organization ID

Your organization ID is in the Vectorize platform URL:

https://platform.vectorize.io/organization/[YOUR-ORG-ID]

For example, if your URL is:

https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
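If you prefer to pull the ID out of the URL in code, a simple split works (the URL below is just the example above):

org_url = "https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb"
organization_id = org_url.rstrip("/").split("/")[-1]
print(organization_id)  # ecf3fa1d-30d0-4df1-8af6-f4852bc851cb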

Additionally, you'll need:

  • One or more source connector IDs (e.g., AWS S3, Google Drive)
  • A destination connector ID (e.g., Pinecone)
  • An AI platform connector ID (e.g., OpenAI)
Note

Connectors must be created through the Vectorize platform UI before creating a pipeline programmatically.
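
If you don't want to hard-code those connector IDs in your scripts, one option is to read them from environment variables, mirroring the credential setup below. This is only a sketch; the variable names are placeholders, not values defined by Vectorize:

import os

# Placeholder environment variable names -- use whatever naming your project prefers
source_connector_id = os.environ["VECTORIZE_SOURCE_CONNECTOR_ID"]
destination_connector_id = os.environ["VECTORIZE_DESTINATION_CONNECTOR_ID"]
ai_platform_connector_id = os.environ["VECTORIZE_AI_PLATFORM_CONNECTOR_ID"]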

API Client Setup

import vectorize_client as v
import os

# Get credentials from environment variables
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client
configuration = v.Configuration(
    host="https://api.vectorize.io",
    api_key={"ApiKeyAuth": api_key}
)
api = v.ApiClient(configuration)

print(f"✅ API client initialized for organization: {organization_id}")

Create a Pipeline

Create a RAG pipeline that connects your data sources to a vector database.

# Create pipelines client
pipelines_api = v.PipelinesApi(api)

# Placeholder values -- use your own pipeline name and the connector IDs
# from the Vectorize platform UI (see Prerequisites)
pipeline_name = "my-rag-pipeline"
source_connector_id = "your-source-connector-id"
ai_platform_connector_id = "your-ai-platform-connector-id"
destination_connector_id = "your-destination-connector-id"

# Define your pipeline configuration
pipeline_configuration = v.PipelineConfigurationSchema(
    pipeline_name=pipeline_name,
    source_connectors=[
        v.PipelineSourceConnectorSchema(
            id=source_connector_id,
            type="FILE_UPLOAD",
            config={}
        )
    ],
    ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
        id=ai_platform_connector_id,
        type="VECTORIZE",
        config={}
    ),
    destination_connector=v.PipelineDestinationConnectorSchema(
        id=destination_connector_id,
        type="VECTORIZE",
        config={}
    ),
    schedule=v.ScheduleSchema(type="manual")
)

# Create the pipeline
try:
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_configuration
    )

    pipeline_id = response.data.id
    print(f"Pipeline created successfully! ID: {pipeline_id}")

except Exception as e:
    print(f"Error creating pipeline: {e}")
    raise
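
As a quick sanity check, you can fetch the pipeline you just created and inspect its status. This sketch reuses the get_pipeline call shown in the monitoring section and assumes the pipeline_id from the previous step is still in scope:

# Fetch the newly created pipeline to confirm it exists
pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
print(f"Pipeline {pipeline_id} current status: {pipeline.data.status}")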

Monitor Pipeline Progress

Track the status and progress of your pipeline processing.

import time

# Create pipelines client
pipelines_api = v.PipelinesApi(api)

# Stop monitoring after 30 minutes at most (adjust to taste)
max_monitoring_time = 30 * 60
monitoring_start_time = time.time()

while True:
    try:
        # Get pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        current_status = pipeline.data.status
        print(f"Pipeline status: {current_status}")

        # Get pipeline metrics
        metrics = pipelines_api.get_pipeline_metrics(organization_id, pipeline_id)

        if metrics.data and isinstance(metrics.data, list):
            # Sum up all the metrics
            total_new = sum(m.new_objects for m in metrics.data)
            total_changed = sum(m.changed_objects for m in metrics.data)
            total_deleted = sum(m.deleted_objects for m in metrics.data)

            print(f"  📊 Total new objects: {total_new}")
            print(f"  📊 Total changed objects: {total_changed}")
            print(f"  📊 Total deleted objects: {total_deleted}")

            # Show the most recent metrics entry
            latest = metrics.data[-1]
            print(f"  📅 Latest ({latest.timestamp}):")
            print(f"    New: {latest.new_objects}, Changed: {latest.changed_objects}, Deleted: {latest.deleted_objects}")

        # Check for terminal states
        if current_status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"Pipeline reached terminal state: {current_status}")
            break

        # Pipeline is listening
        if current_status == "LISTENING":
            print("Pipeline is ready and listening for new data")
            break

        # Pipeline is actively processing
        if current_status in ["PROCESSING", "DEPLOYING"]:
            print(f"Pipeline is actively working: {current_status}")

        # Check if we've been monitoring too long
        if time.time() - monitoring_start_time > max_monitoring_time:
            print("⏰ Maximum monitoring time reached")
            break

    except Exception as e:
        print(f"Error monitoring pipeline: {e}")
        break

    print("⏳ Waiting 30 seconds before next check...")
    time.sleep(30)
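
If you'd rather reuse this polling logic elsewhere, it can be wrapped in a small helper that returns the final status. This is a minimal sketch built only on the get_pipeline call shown above; the timeout and interval defaults are illustrative:

def wait_for_pipeline(pipelines_api, organization_id, pipeline_id, timeout=1800, interval=30):
    """Poll the pipeline until it reaches a stable state or the timeout expires."""
    stable_states = {"IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN", "LISTENING"}
    deadline = time.time() + timeout
    while time.time() < deadline:
        status = pipelines_api.get_pipeline(organization_id, pipeline_id).data.status
        if status in stable_states:
            return status
        time.sleep(interval)
    raise TimeoutError(f"Pipeline {pipeline_id} did not stabilize within {timeout} seconds")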

Next Steps
