Skip to main content

Create a Pipeline Programmatically

Learn how to create a complete RAG (Retrieval-Augmented Generation) pipeline using the Vectorize client, including source connectors, vector database, and AI platform configuration.

Prerequisites

Before you begin, you'll need:

  1. A Vectorize account
  2. An API access token (how to create one)
  3. Your organization ID (see below)

Finding your Organization ID

Your organization ID is in the Vectorize platform URL:

https://platform.vectorize.io/organization/[YOUR-ORG-ID]

For example, if your URL is:

https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Additionally, you'll need:

  • One or more source connector IDs (e.g., AWS S3, Google Drive)
  • A destination connector ID (e.g., Pinecone)
  • An AI platform connector ID (e.g., OpenAI)
note

Connectors must be created through the Vectorize platform UI before creating a pipeline programmatically.

API Client Setup

import vectorize_client as v
import os

# Read credentials from environment variables so secrets stay out of source.
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client.
# NOTE(review): aligned with the complete example at the bottom of this page,
# which uses the versioned host ("/v1") and bearer-token auth (access_token)
# — confirm against the current vectorize_client release.
configuration = v.Configuration(
    host="https://api.vectorize.io/v1",
    access_token=api_key,
)
api = v.ApiClient(configuration)

print(f"โœ… API client initialized for organization: {organization_id}")

Create a Pipeline

Create a RAG pipeline that connects your data sources to a vector database.

import vectorize_client as v

# Create pipelines client.
# Fixed: the snippet referenced an undefined name `apiClient`; the setup
# section above creates the client as `api`.
pipelines_api = v.PipelinesApi(api)

# Define your pipeline configuration: one or more sources feed an AI
# platform (embedding) connector and a single vector-DB destination.
pipeline_configuration = v.PipelineConfigurationSchema(
    pipeline_name=pipeline_name,
    source_connectors=[
        v.PipelineSourceConnectorSchema(
            id=source_connector_id,
            type="FILE_UPLOAD",
            config={},
        )
    ],
    ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
        id=ai_platform_connector_id,
        type="VECTORIZE",
        config={},
    ),
    destination_connector=v.PipelineDestinationConnectorSchema(
        id=destination_connector_id,
        type="VECTORIZE",
        config={},
    ),
    # "manual" means the pipeline only runs when explicitly triggered.
    schedule=v.ScheduleSchema(type="manual"),
)

# Create the pipeline; log and re-raise on failure so callers still see the error.
try:
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_configuration,
    )

    pipeline_id = response.data.id
    print(f"Pipeline created successfully! ID: {pipeline_id}")

except Exception as e:
    print(f"Error creating pipeline: {e}")
    raise

Monitor Pipeline Progress

Track the status and progress of your pipeline processing.

import vectorize_client as v
import time

# Create pipelines client.
# Fixed: the snippet referenced an undefined name `apiClient`; the setup
# section above creates the client as `api`.
pipelines_api = v.PipelinesApi(api)

# Fixed: these were used below but never defined in this snippet.
# Bound the polling loop so it cannot run forever.
monitoring_start_time = time.time()
max_monitoring_time = 300  # seconds

while True:
    try:
        # Get pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        current_status = pipeline.data.status
        print(f"Pipeline status: {current_status}")

        # Get pipeline metrics
        metrics = pipelines_api.get_pipeline_metrics(organization_id, pipeline_id)

        if metrics.data and isinstance(metrics.data, list):
            # Sum up all the metrics across every reporting period.
            total_new = sum(m.new_objects for m in metrics.data)
            total_changed = sum(m.changed_objects for m in metrics.data)
            total_deleted = sum(m.deleted_objects for m in metrics.data)

            print(f" ๐Ÿ“Š Total new objects: {total_new}")
            print(f" ๐Ÿ“Š Total changed objects: {total_changed}")
            print(f" ๐Ÿ“Š Total deleted objects: {total_deleted}")

            # Most recent metrics entry, if available.
            if metrics.data:
                latest = metrics.data[-1]
                print(f" ๐Ÿ“… Latest ({latest.timestamp}):")
                print(f" New: {latest.new_objects}, Changed: {latest.changed_objects}, Deleted: {latest.deleted_objects}")

        # Terminal states: stop polling entirely.
        if current_status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"Pipeline reached terminal state: {current_status}")
            break

        # Pipeline is ready and waiting for data.
        if current_status in ["LISTENING"]:
            print("Pipeline is ready and listening for new data")
            break

        # Pipeline is actively processing; keep polling.
        if current_status in ["PROCESSING", "DEPLOYING"]:
            print(f"Pipeline is actively working: {current_status}")

        # Give up once the monitoring budget is exhausted.
        if time.time() - monitoring_start_time > max_monitoring_time:
            print("โฐ Maximum monitoring time reached")
            break

    except Exception as e:
        # Any API error ends monitoring rather than looping on failures.
        print(f"Error monitoring pipeline: {e}")
        break

    print("โณ Waiting 30 seconds before next check...")
    time.sleep(30)

Next Steps

Complete Example

Here's all the code from this guide combined into a complete, runnable example:

Required Environment Variables:
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`

Additional Requirements:
• Requires a source connector ID
#!/usr/bin/env python3
"""
Complete example for creating and monitoring a RAG pipeline.
This is a hand-written example that corresponds to the test file:
api-clients/python/tests/pipelines/create_pipeline.py

IMPORTANT: Keep this file in sync with the test file's snippets!
"""

import os
import sys
import time
import vectorize_client as v


def get_api_config():
    """Build the API client configuration from environment variables.

    Returns a ``(Configuration, organization_id)`` tuple. If either
    ``VECTORIZE_ORGANIZATION_ID`` or ``VECTORIZE_API_KEY`` is unset,
    prints setup instructions and exits the process with status 1.
    """
    organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
    api_key = os.environ.get("VECTORIZE_API_KEY")

    # Guard clause: without both credentials there is nothing to configure.
    if not (organization_id and api_key):
        print("๐Ÿ”‘ Setup required:")
        print("1. Get your API key from: https://app.vectorize.io/settings")
        print("2. Set environment variables:")
        print(" export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print(" export VECTORIZE_API_KEY='your-api-key'")
        sys.exit(1)

    # Always use production API
    return (
        v.Configuration(host="https://api.vectorize.io/v1", access_token=api_key),
        organization_id,
    )


def create_file_upload_connector(api_client, organization_id):
    """Create a file upload source connector.

    Returns the new connector's ID. Any API error is logged and re-raised
    so the caller can abort the example.
    """
    print("๐Ÿ“ Creating file upload source connector...")
    connectors_api = v.SourceConnectorsApi(api_client)

    try:
        # Create the FileUpload object.
        # (Was an f-string with no placeholders — ruff F541; plain literal now.)
        file_upload = v.FileUpload(
            name="pipeline-source-example",
            type="FILE_UPLOAD",
            config={},
        )

        # Create the request wrapper expected by the API.
        request = v.CreateSourceConnectorRequest(file_upload)

        response = connectors_api.create_source_connector(
            organization_id,
            request,
        )

        print(f"โœ… Created file upload connector: {response.connector.name}")
        print(f" Connector ID: {response.connector.id}\n")

        return response.connector.id

    except Exception as e:
        print(f"โŒ Error creating file upload connector: {e}")
        raise


def create_pipeline(api_client, organization_id, source_connector_id, pipeline_name, ai_platform_connector_id, destination_connector_id):
    """Create a new RAG pipeline and return its ID.

    Wires one file-upload source, one AI platform connector and one
    destination connector into a manually-scheduled pipeline. Errors are
    logged and re-raised.
    """
    print(f"๐Ÿš€ Creating pipeline: {pipeline_name}")
    pipelines_api = v.PipelinesApi(api_client)

    # Build each connector role as a named value, then assemble the config.
    source_schema = v.PipelineSourceConnectorSchema(
        id=source_connector_id,
        type="FILE_UPLOAD",
        config={},
    )
    ai_platform_schema = v.PipelineAIPlatformConnectorSchema(
        id=ai_platform_connector_id,
        type="VECTORIZE",
        config={},
    )
    destination_schema = v.PipelineDestinationConnectorSchema(
        id=destination_connector_id,
        type="VECTORIZE",
        config={},
    )
    pipeline_configuration = v.PipelineConfigurationSchema(
        pipeline_name=pipeline_name,
        source_connectors=[source_schema],
        ai_platform_connector=ai_platform_schema,
        destination_connector=destination_schema,
        schedule=v.ScheduleSchema(type="manual"),
    )

    try:
        response = pipelines_api.create_pipeline(
            organization_id,
            pipeline_configuration,
        )
        pipeline_id = response.data.id
        print(f"Pipeline created successfully! ID: {pipeline_id}")
        return pipeline_id
    except Exception as e:
        print(f"Error creating pipeline: {e}")
        raise


def monitor_pipeline(api_client, organization_id, pipeline_id, monitoring_start_time, max_monitoring_time):
    """Monitor pipeline status and metrics.

    Polls the pipeline every 5 seconds and prints status plus cumulative
    object metrics on each check. Stops when the pipeline reaches a
    terminal state, starts LISTENING, an API call fails, or more than
    ``max_monitoring_time`` seconds have passed since
    ``monitoring_start_time``. Returns the last observed status string
    (``None`` if the first status fetch fails).
    """
    print("\n๐Ÿ“Š Starting pipeline monitoring...")
    # Create pipelines client
    pipelines_api = v.PipelinesApi(api_client)
    current_status = None  # last status seen; returned to the caller
    check_count = 0  # polling iterations, used only for log output

    while True:
        check_count += 1
        try:
            # Get pipeline status
            pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
            current_status = pipeline.data.status
            print(f"Check #{check_count} - Pipeline status: {current_status}")

            # Get pipeline metrics
            metrics = pipelines_api.get_pipeline_metrics(organization_id, pipeline_id)

            if metrics.data and isinstance(metrics.data, list):
                # Sum up all the metrics across every reporting period.
                total_new = sum(m.new_objects for m in metrics.data)
                total_changed = sum(m.changed_objects for m in metrics.data)
                total_deleted = sum(m.deleted_objects for m in metrics.data)

                print(f" ๐Ÿ“Š Total new objects: {total_new}")
                print(f" ๐Ÿ“Š Total changed objects: {total_changed}")
                print(f" ๐Ÿ“Š Total deleted objects: {total_deleted}")

                # Get today's metrics if available (last list entry is latest).
                if metrics.data:
                    latest = metrics.data[-1]
                    print(f" ๐Ÿ“… Latest ({latest.timestamp}):")
                    print(f" New: {latest.new_objects}, Changed: {latest.changed_objects}, Deleted: {latest.deleted_objects}")

            # Check for terminal states — stop polling entirely.
            if current_status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"]:
                print(f"Pipeline reached terminal state: {current_status}")
                break

            # Pipeline is listening — ready for data, monitoring done.
            if current_status in ["LISTENING"]:
                print(f"Pipeline is ready and listening for new data")
                break

            # Pipeline is actively processing — keep polling.
            if current_status in ["PROCESSING", "DEPLOYING"]:
                print(f"Pipeline is actively working: {current_status}")

            # Check if we've been monitoring too long.
            if time.time() - monitoring_start_time > max_monitoring_time:
                print("โฐ Maximum monitoring time reached")
                break

        except Exception as e:
            # Any API error ends monitoring rather than looping on failures.
            print(f"Error monitoring pipeline: {e}")
            break

        # Use shorter interval for test environment (5 seconds instead of 30)
        print("โณ Waiting 5 seconds before next check...")
        time.sleep(5)

    return current_status


def cleanup_pipeline(api_client, organization_id, pipeline_id, source_connector_id):
    """Clean up created resources.

    Best-effort teardown: stop the pipeline, delete it, then delete the
    source connector. Each step logs a warning and continues on failure
    so one failed delete never blocks the rest of the cleanup.
    """
    print("\n๐Ÿงน Cleaning Up Resources")

    try:
        # Stop pipeline if running.
        pipelines_api = v.PipelinesApi(api_client)
        try:
            pipelines_api.stop_pipeline(organization_id, pipeline_id)
            print(" โœ… Pipeline stopped")
        except Exception:
            # Fixed: was a bare `except:`, which also swallows
            # KeyboardInterrupt/SystemExit. Stop failures are expected when
            # the pipeline never deployed, so they are deliberately ignored.
            pass

        # Delete pipeline
        try:
            pipelines_api.delete_pipeline(organization_id, pipeline_id)
            print(" โœ… Pipeline deleted")
        except Exception as e:
            print(f" โš ๏ธ Could not delete pipeline: {e}")

        # Delete source connector
        try:
            connectors_api = v.SourceConnectorsApi(api_client)
            connectors_api.delete_source_connector(organization_id, source_connector_id)
            print(" โœ… Source connector deleted")
        except Exception as e:
            print(f" โš ๏ธ Could not delete source connector: {e}")

    except Exception as e:
        print(f" โš ๏ธ Cleanup warning: {e}")


def main():
    """Main function demonstrating pipeline creation and monitoring.

    Flow: read credentials and connector IDs from the environment, create
    a file-upload source connector, create a pipeline, poll its status for
    up to 45 seconds, report the final state, and (optionally, commented
    out) clean up the created resources. Exits with status 1 on missing
    configuration or unexpected errors.
    """
    print("=== Create and Monitor RAG Pipeline Example ===")
    print("โฑ๏ธ Expected runtime: ~30-45 seconds\n")

    try:
        # Get configuration (exits the process itself if credentials are missing)
        configuration, organization_id = get_api_config()

        print(f"โš™๏ธ Configuration:")
        print(f" Organization ID: {organization_id}")
        print(f" Host: {configuration.host}\n")

        # Get required connector IDs — these must be created in the dashboard first.
        ai_platform_connector_id = os.environ.get("VECTORIZE_AI_PLATFORM_CONNECTOR_ID")
        destination_connector_id = os.environ.get("VECTORIZE_DESTINATION_CONNECTOR_ID")

        if not ai_platform_connector_id or not destination_connector_id:
            print("โŒ Missing required connector IDs")
            print(" Please set:")
            print(" - VECTORIZE_AI_PLATFORM_CONNECTOR_ID")
            print(" - VECTORIZE_DESTINATION_CONNECTOR_ID")
            print("\n๐Ÿ’ก These connectors are typically set up in the Vectorize dashboard")
            sys.exit(1)

        # Initialize API client
        # Initialize API client with proper headers for local env
        with v.ApiClient(configuration) as api_client:
            pipeline_id = None
            source_connector_id = None

            try:
                # Step 1: Create source connector
                print("๐Ÿ“ Creating File Upload Connector")
                source_connector_id = create_file_upload_connector(api_client, organization_id)

                # Step 2: Create pipeline
                print("โš™๏ธ Creating RAG Pipeline")
                pipeline_name = "RAG Pipeline Example"
                pipeline_id = create_pipeline(
                    api_client, organization_id, source_connector_id,
                    pipeline_name, ai_platform_connector_id, destination_connector_id
                )

                print(f"โœ… Pipeline '{pipeline_name}' created successfully!")
                print(f" Pipeline ID: {pipeline_id}")
                print(f" Source Connector: {source_connector_id}")
                print(f" AI Platform: {ai_platform_connector_id}")
                print(f" Destination: {destination_connector_id}\n")

                # Step 3: Monitor pipeline
                print("๐Ÿ“Š Monitoring Pipeline Status")
                print(" The pipeline will now deploy and begin listening for data...")
                print(" This process may take a few minutes.\n")

                monitoring_start_time = time.time()
                max_monitoring_time = 45  # 45 seconds max for test environment

                final_status = monitor_pipeline(
                    api_client, organization_id, pipeline_id,
                    monitoring_start_time, max_monitoring_time
                )

                # Step 4: Report final status
                print(f"\n๐Ÿ“‹ Final Pipeline Status: {final_status}")

                if final_status == "LISTENING":
                    print("๐ŸŽ‰ Pipeline is ready to process data!")
                    print(" You can now:")
                    print(" - Upload files to the source connector")
                    print(" - Use the pipeline for deep research queries")
                    print(" - Monitor processing through the dashboard")
                elif final_status == "IDLE":
                    print("โœ… Pipeline deployment completed successfully")
                    print(" Pipeline is in idle state and ready for data")
                elif final_status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
                    print("โš ๏ธ Pipeline encountered an issue")
                    print(" Check the Vectorize dashboard for detailed error information")
                else:
                    print(f"โ„น๏ธ Pipeline is in {final_status} state")
                    print(" This may be normal depending on your configuration")

            finally:
                # Step 5: Clean up (optional) — runs even if a step above raised.
                if pipeline_id and source_connector_id:
                    print(f"\nโ“ Clean up resources?")
                    print(" Uncomment the cleanup section below to remove the created pipeline")
                    print(" For learning purposes, you may want to keep the pipeline active")

                    # Uncomment the line below if you want to clean up automatically
                    # cleanup_pipeline(api_client, organization_id, pipeline_id, source_connector_id)

    except ValueError as e:
        print(f"โŒ Configuration Error: {e}")
        print("\n๐Ÿ’ก Make sure to set the required environment variables:")
        print(" export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print(" export VECTORIZE_API_KEY='your-api-key'")
        print(" export VECTORIZE_AI_PLATFORM_CONNECTOR_ID='connector-id'")
        print(" export VECTORIZE_DESTINATION_CONNECTOR_ID='connector-id'")

    except Exception as e:
        print(f"โŒ Error: {e}")
        sys.exit(1)


# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

Was this page helpful?