Skip to main content

Deploy a Pipeline via the API

note

📢 Note: The API is currently in Beta.

In Vectorize, you can create pipelines that ingest data from multiple sources into a vector database. In this guide, we will deploy a pipeline that ingests a local file.

Prerequisites

Before you begin, you'll need:

  1. A Vectorize account
  2. An API access token (how to create one)
  3. Your organization ID (see below)

Finding your Organization ID

Your organization ID is in the Vectorize platform URL:

https://platform.vectorize.io/organization/[YOUR-ORG-ID]

For example, if your URL is:

https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

API Client Setup

import os

import vectorize_client as v

# Credentials are supplied through the environment, never hard-coded.
organization_id = os.getenv("VECTORIZE_ORGANIZATION_ID")
api_key = os.getenv("VECTORIZE_API_KEY")

# Fail fast when either value is missing or empty.
if not (organization_id and api_key):
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Build the shared API client; the later snippets wrap it in
# resource-specific API objects.
configuration = v.Configuration(
    host="https://api.vectorize.io",
    api_key={"ApiKeyAuth": api_key},
)
api = v.ApiClient(configuration)

print(f"✅ API client initialized for organization: {organization_id}")

Source: Create a File Upload connector

First, we create a File Upload connector that will hold our file.

import vectorize_client as v

# Create the connectors API client.
# `api` and `organization_id` come from the "API Client Setup" snippet
# (the client is bound to the name `api`, not `apiClient`).
connectors_api = v.SourceConnectorsApi(api)

try:
    # Describe a File Upload connector; it needs no extra configuration.
    file_upload = v.FileUpload(
        name="my-document-upload",
        type="FILE_UPLOAD",
        config={},
    )

    request = v.CreateSourceConnectorRequest(file_upload)
    response = connectors_api.create_source_connector(
        organization_id,
        request,
    )

    # Keep the connector ID: the upload and pipeline steps need it.
    connector_id = response.connector.id
    print(f"✅ Created file upload connector: {connector_id}")

except Exception as e:
    # Report the failure, then re-raise so the script stops here.
    print(f"❌ Error creating connector: {e}")
    raise

Then, we can upload the file:

import vectorize_client as v
import os
import urllib3

# Local file to ingest -- replace with the path to your own document.
file_path = "path/to/your/document.txt"
file_name = os.path.basename(file_path)

# Create uploads API client.
# `api` and `organization_id` come from the "API Client Setup" snippet;
# `connector_id` is the File Upload connector created in the previous step.
uploads_api = v.UploadsApi(api)

try:
    # Step 1: Get upload URL
    upload_request = v.StartFileUploadToConnectorRequest(
        name=file_name,
        content_type="text/plain",
    )

    start_response = uploads_api.start_file_upload_to_connector(
        organization_id,
        connector_id,
        start_file_upload_to_connector_request=upload_request,
    )

    # Step 2: Upload file to the URL
    http = urllib3.PoolManager()
    with open(file_path, "rb") as f:
        response = http.request(
            "PUT",
            start_response.upload_url,
            body=f,
            headers={
                "Content-Type": "text/plain",
                # The body is a file object, so the length is passed explicitly.
                "Content-Length": str(os.path.getsize(file_path)),
            },
        )

    if response.status == 200:
        print(f"✅ Successfully uploaded: {file_name}")
    else:
        print(f"❌ Upload failed: {response.status}")

except Exception as e:
    # Report the failure, then re-raise so the script stops here.
    print(f"❌ Error uploading file: {e}")
    raise

Configure and deploy the pipeline

Now we'll create a pipeline using the File Upload connector we just created along with the built-in AI platform and vector database.

import vectorize_client as v

# Create pipelines API client.
# `api` and `organization_id` come from the "API Client Setup" snippet;
# `connector_id` is the File Upload connector created earlier.
pipelines_api = v.PipelinesApi(api)

# NOTE(review): `ai_platform_connector_id` and `destination_connector_id`
# are not defined anywhere in this guide. They must be set to the IDs of the
# built-in Vectorize AI platform and vector database connectors before this
# snippet runs -- confirm how to obtain them in the API reference.

try:
    # Configure your pipeline
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="My First Pipeline",
        source_connectors=[
            v.PipelineSourceConnectorSchema(
                id=connector_id,
                type="FILE_UPLOAD",
                config={},
            )
        ],
        ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
            id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
            type="VECTORIZE",
            config={},
        ),
        destination_connector=v.PipelineDestinationConnectorSchema(
            id=destination_connector_id,  # Uses Vectorize's built-in vector store
            type="VECTORIZE",
            config={},
        ),
        schedule=v.ScheduleSchema(type="manual"),
    )

    # Create the pipeline
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_config,
    )

    # Keep the pipeline ID: the status-polling step needs it.
    pipeline_id = response.data.id
    print(f"✅ Created pipeline: {pipeline_id}")

except Exception as e:
    # Report the failure, then re-raise so the script stops here.
    print(f"❌ Error creating pipeline: {e}")
    raise

The pipeline will be deployed and our file will be ingested into the Vector Database.

Wait for Processing to Complete

After uploading, you'll want to wait for the pipeline to process your documents:

import vectorize_client as v
import time

# Create pipelines API client.
# `api` and `organization_id` come from the "API Client Setup" snippet;
# `pipeline_id` is set when the pipeline is created.
pipelines_api = v.PipelinesApi(api)

print("Waiting for pipeline to process your document...")
max_wait_time = 300  # 5 minutes
# Use the monotonic clock so the timeout is unaffected by system clock changes.
start_time = time.monotonic()

while True:
    try:
        # Check pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)

        status = pipeline.data.status

        # Check if ready
        if status == "LISTENING":
            print("✅ Pipeline is ready!")
            break
        elif status == "PROCESSING":
            print("⚙️ Still processing...")
        elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"❌ Pipeline error: {status}")
            break

        # Check timeout (any other status just keeps polling until then)
        if time.monotonic() - start_time > max_wait_time:
            print("⏰ Timeout waiting for pipeline")
            break

        time.sleep(10)  # Check every 10 seconds

    except Exception as e:
        # Stop polling on the first API error rather than retrying forever.
        print(f"❌ Error checking status: {e}")
        break

Complete Example

Here's all the code from this guide combined into a complete, runnable example:

import os
import time

import urllib3
import vectorize_client as v

# ---------------------------------------------------------------------------
# 1. API client setup
# ---------------------------------------------------------------------------

# Get credentials from environment variables
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client; every resource-specific API below wraps `api`.
configuration = v.Configuration(
    host="https://api.vectorize.io",
    api_key={"ApiKeyAuth": api_key},
)
api = v.ApiClient(configuration)

print(f"✅ API client initialized for organization: {organization_id}")

# Local file to ingest -- replace with the path to your own document.
file_path = "path/to/your/document.txt"
file_name = os.path.basename(file_path)

# NOTE(review): `ai_platform_connector_id` and `destination_connector_id`
# (used in step 4) are not defined anywhere in this guide. They must be set to
# the IDs of the built-in Vectorize AI platform and vector database connectors
# before running this script -- confirm how to obtain them in the API reference.

# ---------------------------------------------------------------------------
# 2. Create a File Upload source connector
# ---------------------------------------------------------------------------

connectors_api = v.SourceConnectorsApi(api)

try:
    file_upload = v.FileUpload(
        name="my-document-upload",
        type="FILE_UPLOAD",
        config={},
    )

    request = v.CreateSourceConnectorRequest(file_upload)
    response = connectors_api.create_source_connector(
        organization_id,
        request,
    )

    # Reused by the upload and pipeline steps.
    connector_id = response.connector.id
    print(f"✅ Created file upload connector: {connector_id}")

except Exception as e:
    print(f"❌ Error creating connector: {e}")
    raise

# ---------------------------------------------------------------------------
# 3. Upload the file to the connector
# ---------------------------------------------------------------------------

uploads_api = v.UploadsApi(api)

try:
    # Step 1: Get upload URL
    upload_request = v.StartFileUploadToConnectorRequest(
        name=file_name,
        content_type="text/plain",
    )

    start_response = uploads_api.start_file_upload_to_connector(
        organization_id,
        connector_id,
        start_file_upload_to_connector_request=upload_request,
    )

    # Step 2: Upload file to the URL
    http = urllib3.PoolManager()
    with open(file_path, "rb") as f:
        response = http.request(
            "PUT",
            start_response.upload_url,
            body=f,
            headers={
                "Content-Type": "text/plain",
                # The body is a file object, so the length is passed explicitly.
                "Content-Length": str(os.path.getsize(file_path)),
            },
        )

    if response.status == 200:
        print(f"✅ Successfully uploaded: {file_name}")
    else:
        print(f"❌ Upload failed: {response.status}")

except Exception as e:
    print(f"❌ Error uploading file: {e}")
    raise

# ---------------------------------------------------------------------------
# 4. Configure and deploy the pipeline
# ---------------------------------------------------------------------------

pipelines_api = v.PipelinesApi(api)

try:
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="My First Pipeline",
        source_connectors=[
            v.PipelineSourceConnectorSchema(
                id=connector_id,
                type="FILE_UPLOAD",
                config={},
            )
        ],
        ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
            id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
            type="VECTORIZE",
            config={},
        ),
        destination_connector=v.PipelineDestinationConnectorSchema(
            id=destination_connector_id,  # Uses Vectorize's built-in vector store
            type="VECTORIZE",
            config={},
        ),
        schedule=v.ScheduleSchema(type="manual"),
    )

    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_config,
    )

    pipeline_id = response.data.id
    print(f"✅ Created pipeline: {pipeline_id}")

except Exception as e:
    print(f"❌ Error creating pipeline: {e}")
    raise

# ---------------------------------------------------------------------------
# 5. Wait for processing to complete
# ---------------------------------------------------------------------------

print("Waiting for pipeline to process your document...")
max_wait_time = 300  # 5 minutes
# Use the monotonic clock so the timeout is unaffected by system clock changes.
start_time = time.monotonic()

while True:
    try:
        # Check pipeline status (reuses the PipelinesApi client from step 4)
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)

        status = pipeline.data.status

        # Check if ready
        if status == "LISTENING":
            print("✅ Pipeline is ready!")
            break
        elif status == "PROCESSING":
            print("⚙️ Still processing...")
        elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"❌ Pipeline error: {status}")
            break

        # Check timeout (any other status just keeps polling until then)
        if time.monotonic() - start_time > max_wait_time:
            print("⏰ Timeout waiting for pipeline")
            break

        time.sleep(10)  # Check every 10 seconds

    except Exception as e:
        # Stop polling on the first API error rather than retrying forever.
        print(f"❌ Error checking status: {e}")
        break

Next steps

Now you can either perform a Vector Search or generate a Private Deep Research report.

Was this page helpful?