Deploy a Pipeline via the API
📢 Note: The API is currently in Beta.
In Vectorize you can create pipelines to ingest data from multiple sources into a Vector Database. In this guide, we will deploy a pipeline that will ingest a local file.
Prerequisites
Before you begin, you'll need:
- A Vectorize account
- An API access token (how to create one)
- Your organization ID (see below)
Finding your Organization ID
Your organization ID is in the Vectorize platform URL:
https://platform.vectorize.io/organization/[YOUR-ORG-ID]
For example, if your URL is:
https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
API Client Setup
- Python
- Node.js
import os

import vectorize_client as v

# Read credentials from the environment so they never live in source code.
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client
configuration = v.Configuration(
    host="https://api.vectorize.io",
    api_key={"ApiKeyAuth": api_key}
)

# The client is named `apiClient` because the later snippets in this guide
# construct their API objects from that name, e.g. v.SourceConnectorsApi(apiClient).
apiClient = v.ApiClient(configuration)
api = apiClient  # alias kept for code that used the previous name

print(f"✅ API client initialized for organization: {organization_id}")
// Bound to `vectorize` because the later snippets in this guide destructure
// the API classes from that name (e.g. `const { PipelinesApi } = vectorize`).
const vectorize = require('@vectorize-io/vectorize-client');

// Get credentials from environment variables
const organizationId = process.env.VECTORIZE_ORGANIZATION_ID;
const apiKey = process.env.VECTORIZE_API_KEY;

if (!organizationId || !apiKey) {
  throw new Error("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables");
}

// Shared client configuration. Named `apiConfig` because the later snippets
// pass `apiConfig` to every API class constructor.
const apiConfig = new vectorize.Configuration({
  basePath: 'https://api.vectorize.io',
  accessToken: apiKey,
});

console.log(`✅ API client initialized for organization: ${organizationId}`);
Source: Create a File Upload connector
First, we create a File Upload connector that will hold our file.
- Python
- Node.js
import vectorize_client as v

# Create the connectors API client
connectors_api = v.SourceConnectorsApi(apiClient)

try:
    # Create a file upload connector; file upload connectors take an empty config.
    file_upload = v.FileUpload(
        name="my-document-upload",
        type="FILE_UPLOAD",
        config={}
    )
    request = v.CreateSourceConnectorRequest(file_upload)
    response = connectors_api.create_source_connector(
        organization_id,
        request
    )

    # Stored as `source_connector_id` because the upload and pipeline snippets
    # later in this guide read the connector ID under that name.
    source_connector_id = response.connector.id
    print(f"✅ Created file upload connector: {source_connector_id}")
except Exception as e:
    print(f"❌ Error creating connector: {e}")
    raise
const { SourceConnectorsApi } = vectorize;

// Create the connectors API client
const connectorsApi = new SourceConnectorsApi(apiConfig);

/**
 * Creates a File Upload source connector in the organization.
 *
 * @returns {Promise<string>} The ID of the newly created connector.
 * @throws Re-throws any error raised by the API call after logging it.
 */
async function createFileUploadConnector() {
  try {
    // File upload connectors don't require config
    const fileUpload = {
      name: 'my-document-upload',
      type: 'FILE_UPLOAD',
      config: {},
    };

    const response = await connectorsApi.createSourceConnector({
      // Pass the real organization ID read from the environment during
      // client setup, not a literal placeholder string.
      organizationId,
      createSourceConnectorRequest: fileUpload,
    });

    const connectorId = response.connector.id;
    console.log(`✅ Created file upload connector: ${connectorId}`);
    return connectorId;
  } catch (error) {
    console.error(`❌ Error creating connector: ${error.message}`);
    // Re-throw to let caller handle the error
    throw error;
  }
}

// Stored as `sourceConnectorId` because the upload and pipeline snippets
// later in this guide read the connector ID under that name.
const sourceConnectorId = await createFileUploadConnector();
Then, we can upload the file:
- Python
- Node.js
import os

import urllib3
import vectorize_client as v

# Local file to ingest. Replace the placeholder path with your own document.
file_path = "path/to/your/document.txt"
file_name = os.path.basename(file_path)

# Create uploads API client
uploads_api = v.UploadsApi(apiClient)

try:
    # Step 1: Ask Vectorize for a URL to upload the file to
    upload_request = v.StartFileUploadToConnectorRequest(
        name=file_name,
        content_type="text/plain"
    )
    start_response = uploads_api.start_file_upload_to_connector(
        organization_id,
        source_connector_id,
        start_file_upload_to_connector_request=upload_request
    )

    # Step 2: PUT the file contents to the returned URL
    http = urllib3.PoolManager()
    with open(file_path, "rb") as f:
        response = http.request(
            "PUT",
            start_response.upload_url,
            body=f,
            headers={
                "Content-Type": "text/plain",
                "Content-Length": str(os.path.getsize(file_path))
            }
        )

    # Fail loudly on a rejected upload so later steps never run against a
    # connector that holds no file (the Node.js version also throws here).
    if response.status != 200:
        raise RuntimeError(f"Upload failed with status {response.status}")
    print(f"✅ Successfully uploaded: {file_name}")
except Exception as e:
    print(f"❌ Error uploading file: {e}")
    raise
const { UploadsApi } = vectorize;
const fs = require('fs');
const path = require('path');

// Local file to ingest. Replace the placeholder path with your own document.
const filePath = 'path/to/your/document.txt';
const fileName = path.basename(filePath);

// Create uploads API client
const uploadsApi = new UploadsApi(apiConfig);

/**
 * Uploads a local file to a File Upload connector in two steps: request an
 * upload URL from Vectorize, then PUT the file contents to that URL.
 *
 * @param {string} connectorId - ID of the File Upload connector.
 * @param {string} filePath - Path of the local file to upload.
 * @param {string} fileName - Name to register the file under.
 * @returns {Promise<boolean>} `true` when the upload responds with status 200.
 * @throws {Error} When the upload responds with any other status, or when
 *   an API or file-system call fails.
 */
async function uploadDocument(connectorId, filePath, fileName) {
  try {
    // Step 1: Get a pre-signed upload URL from Vectorize
    const uploadRequest = {
      name: fileName,
      contentType: 'text/plain',
    };

    const startResponse = await uploadsApi.startFileUploadToConnector({
      // Pass the real organization ID read from the environment during
      // client setup, not a literal placeholder string.
      organizationId,
      connectorId,
      startFileUploadToConnectorRequest: uploadRequest,
    });

    // Step 2: Upload file directly to the pre-signed URL
    const fileBuffer = fs.readFileSync(filePath);
    const fileStats = fs.statSync(filePath);

    const response = await fetch(startResponse.uploadUrl, {
      method: 'PUT',
      body: fileBuffer,
      headers: {
        'Content-Type': 'text/plain',
        'Content-Length': fileStats.size.toString(),
      },
    });

    if (response.status !== 200) {
      const errorText = await response.text();
      throw new Error(`Upload failed with status ${response.status}: ${errorText}`);
    }

    console.log(`✅ Successfully uploaded: ${fileName}`);
    return true;
  } catch (error) {
    console.error(`❌ Error uploading file: ${error.message}`);
    throw error;
  }
}

// Call the async function
await uploadDocument(sourceConnectorId, filePath, fileName);
Configure and deploy the pipeline
Now we'll create a pipeline using the File Upload connector we just created along with the built-in AI platform and vector database.
- Python
- Node.js
import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(apiClient)

# NOTE(review): `ai_platform_connector_id` and `destination_connector_id` are
# not defined anywhere in this guide; they must already hold the IDs of the
# built-in Vectorize connectors before this snippet runs.
try:
    # Describe the three stages of the pipeline: source, AI platform, destination.
    source = v.PipelineSourceConnectorSchema(
        id=source_connector_id,
        type="FILE_UPLOAD",
        config={}
    )
    ai_platform = v.PipelineAIPlatformConnectorSchema(
        id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
        type="VECTORIZE",
        config={}
    )
    destination = v.PipelineDestinationConnectorSchema(
        id=destination_connector_id,  # Uses Vectorize's built-in vector store
        type="VECTORIZE",
        config={}
    )

    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="My First Pipeline",
        source_connectors=[source],
        ai_platform_connector=ai_platform,
        destination_connector=destination,
        schedule=v.ScheduleSchema(type="manual")
    )

    # Create the pipeline and keep its ID for the status polling step.
    response = pipelines_api.create_pipeline(organization_id, pipeline_config)
    pipeline_id = response.data.id
    print(f"✅ Created pipeline: {pipeline_id}")
except Exception as e:
    print(f"❌ Error creating pipeline: {e}")
    raise
const { PipelinesApi } = vectorize;

// Create pipelines API client
const pipelinesApi = new PipelinesApi(apiConfig);

// Declared outside the try block so the status polling step can read it.
let pipelineId;

// NOTE(review): `aiPlatformConnectorId` and `destinationConnectorId` are not
// defined anywhere in this guide; they must already hold the IDs of the
// built-in Vectorize connectors before this snippet runs.
try {
  // Configure your pipeline
  const pipelineConfig = {
    pipelineName: 'My First Pipeline',
    sourceConnectors: [
      {
        id: sourceConnectorId,
        type: 'FILE_UPLOAD',
        config: {},
      },
    ],
    aiPlatformConnector: {
      id: aiPlatformConnectorId, // Uses Vectorize's built-in AI
      type: 'VECTORIZE',
      config: {},
    },
    destinationConnector: {
      id: destinationConnectorId, // Uses Vectorize's built-in vector store
      type: 'VECTORIZE',
      config: {},
    },
    schedule: { type: 'manual' },
  };

  // Create the pipeline
  const response = await pipelinesApi.createPipeline({
    // Pass the real organization ID read from the environment during
    // client setup, not a literal placeholder string.
    organizationId,
    pipelineConfigurationSchema: pipelineConfig,
  });

  pipelineId = response.data.id;
  console.log(`✅ Created pipeline: ${pipelineId}`);
} catch (error) {
  // Errors go to stderr, matching the connector and upload snippets.
  console.error(`❌ Error creating pipeline: ${error.message}`);
  throw error;
}
The pipeline will be deployed and our file will be ingested into the Vector Database.
Wait for Processing to Complete
After the pipeline is deployed, wait for it to finish processing your document:
- Python
- Node.js
import time

import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(apiClient)

print("Waiting for pipeline to process your document...")

max_wait_time = 300  # 5 minutes
start_time = time.time()

# Poll the pipeline status until it is ready, reports an error state,
# the wait budget is exhausted, or a status check raises.
while True:
    try:
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        status = pipeline.data.status

        # Terminal outcomes stop the loop immediately.
        if status == "LISTENING":
            print("✅ Pipeline is ready!")
            break
        if status in ("ERROR_DEPLOYING", "SHUTDOWN"):
            print(f"❌ Pipeline error: {status}")
            break

        if status == "PROCESSING":
            print("⚙️ Still processing...")

        # Give up once the total wait exceeds the budget.
        elapsed = time.time() - start_time
        if elapsed > max_wait_time:
            print("⏰ Timeout waiting for pipeline")
            break

        time.sleep(10)  # Check every 10 seconds
    except Exception as e:
        print(f"❌ Error checking status: {e}")
        break
const { PipelinesApi } = vectorize;

// Create pipelines API client
const pipelinesApi = new PipelinesApi(apiConfig);

console.log('Waiting for pipeline to process your document...');

const maxWaitTime = 300000; // 5 minutes in milliseconds
const startTime = Date.now();

// Poll the pipeline status until it is ready, reports an error state,
// the wait budget is exhausted, or a status check throws.
while (true) {
  try {
    // Check pipeline status
    const pipeline = await pipelinesApi.getPipeline({
      // Pass the real organization ID read from the environment during
      // client setup, not a literal placeholder string.
      organizationId,
      pipelineId,
    });

    const status = pipeline.data.status;

    // Check if ready
    if (status === 'LISTENING') {
      console.log('✅ Pipeline is ready!');
      break;
    } else if (status === 'PROCESSING') {
      console.log('⚙️ Still processing...');
    } else if (['ERROR_DEPLOYING', 'SHUTDOWN'].includes(status)) {
      console.log(`❌ Pipeline error: ${status}`);
      break;
    }

    // Check timeout
    if (Date.now() - startTime > maxWaitTime) {
      console.log('⏰ Timeout waiting for pipeline');
      break;
    }

    await new Promise((resolve) => setTimeout(resolve, 10000)); // Check every 10 seconds
  } catch (error) {
    console.log(`❌ Error checking status: ${error.message}`);
    break;
  }
}
Complete Example
Here's all the code from this guide combined into a complete, runnable example:
- Python
- Node.js
import os
import time

import urllib3
import vectorize_client as v

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Get credentials from environment variables
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Local file to ingest. Replace the placeholder path with your own document.
file_path = "path/to/your/document.txt"
file_name = os.path.basename(file_path)

# NOTE(review): the guide does not show where the IDs of the built-in AI
# platform and vector database connectors come from. They are read from the
# environment here so the script has every input defined; confirm the source
# of these IDs for your organization.
ai_platform_connector_id = os.environ.get("VECTORIZE_AI_PLATFORM_CONNECTOR_ID")
destination_connector_id = os.environ.get("VECTORIZE_DESTINATION_CONNECTOR_ID")

if not ai_platform_connector_id or not destination_connector_id:
    raise ValueError(
        "Please set VECTORIZE_AI_PLATFORM_CONNECTOR_ID and "
        "VECTORIZE_DESTINATION_CONNECTOR_ID environment variables"
    )

# ---------------------------------------------------------------------------
# API client setup
# ---------------------------------------------------------------------------

configuration = v.Configuration(
    host="https://api.vectorize.io",
    api_key={"ApiKeyAuth": api_key}
)
apiClient = v.ApiClient(configuration)
print(f"✅ API client initialized for organization: {organization_id}")

# One API object per resource family, all sharing the same client.
connectors_api = v.SourceConnectorsApi(apiClient)
uploads_api = v.UploadsApi(apiClient)
pipelines_api = v.PipelinesApi(apiClient)

# ---------------------------------------------------------------------------
# Step 1: Create a File Upload connector
# ---------------------------------------------------------------------------

try:
    file_upload = v.FileUpload(
        name="my-document-upload",
        type="FILE_UPLOAD",
        config={}
    )
    request = v.CreateSourceConnectorRequest(file_upload)
    response = connectors_api.create_source_connector(
        organization_id,
        request
    )
    source_connector_id = response.connector.id
    print(f"✅ Created file upload connector: {source_connector_id}")
except Exception as e:
    print(f"❌ Error creating connector: {e}")
    raise

# ---------------------------------------------------------------------------
# Step 2: Upload the file to the connector
# ---------------------------------------------------------------------------

try:
    # Ask Vectorize for a URL to upload the file to
    upload_request = v.StartFileUploadToConnectorRequest(
        name=file_name,
        content_type="text/plain"
    )
    start_response = uploads_api.start_file_upload_to_connector(
        organization_id,
        source_connector_id,
        start_file_upload_to_connector_request=upload_request
    )

    # PUT the file contents to the returned URL
    http = urllib3.PoolManager()
    with open(file_path, "rb") as f:
        response = http.request(
            "PUT",
            start_response.upload_url,
            body=f,
            headers={
                "Content-Type": "text/plain",
                "Content-Length": str(os.path.getsize(file_path))
            }
        )

    # Fail loudly so the pipeline is never created for a missing file.
    if response.status != 200:
        raise RuntimeError(f"Upload failed with status {response.status}")
    print(f"✅ Successfully uploaded: {file_name}")
except Exception as e:
    print(f"❌ Error uploading file: {e}")
    raise

# ---------------------------------------------------------------------------
# Step 3: Configure and deploy the pipeline
# ---------------------------------------------------------------------------

try:
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="My First Pipeline",
        source_connectors=[
            v.PipelineSourceConnectorSchema(
                id=source_connector_id,
                type="FILE_UPLOAD",
                config={}
            )
        ],
        ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
            id=ai_platform_connector_id,  # Uses Vectorize's built-in AI
            type="VECTORIZE",
            config={}
        ),
        destination_connector=v.PipelineDestinationConnectorSchema(
            id=destination_connector_id,  # Uses Vectorize's built-in vector store
            type="VECTORIZE",
            config={}
        ),
        schedule=v.ScheduleSchema(type="manual")
    )

    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_config
    )
    pipeline_id = response.data.id
    print(f"✅ Created pipeline: {pipeline_id}")
except Exception as e:
    print(f"❌ Error creating pipeline: {e}")
    raise

# ---------------------------------------------------------------------------
# Step 4: Wait for processing to complete
# ---------------------------------------------------------------------------

print("Waiting for pipeline to process your document...")

max_wait_time = 300  # 5 minutes
start_time = time.time()

while True:
    try:
        # Check pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        status = pipeline.data.status

        if status == "LISTENING":
            print("✅ Pipeline is ready!")
            break
        elif status == "PROCESSING":
            print("⚙️ Still processing...")
        elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"❌ Pipeline error: {status}")
            break

        # Check timeout
        if time.time() - start_time > max_wait_time:
            print("⏰ Timeout waiting for pipeline")
            break

        time.sleep(10)  # Check every 10 seconds
    except Exception as e:
        print(f"❌ Error checking status: {e}")
        break
const fs = require('fs');
const path = require('path');
const vectorize = require('@vectorize-io/vectorize-client');

const { SourceConnectorsApi, UploadsApi, PipelinesApi } = vectorize;

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

// Get credentials from environment variables
const organizationId = process.env.VECTORIZE_ORGANIZATION_ID;
const apiKey = process.env.VECTORIZE_API_KEY;

if (!organizationId || !apiKey) {
  throw new Error("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables");
}

// Local file to ingest. Replace the placeholder path with your own document.
const filePath = 'path/to/your/document.txt';
const fileName = path.basename(filePath);

// NOTE(review): the guide does not show where the IDs of the built-in AI
// platform and vector database connectors come from. They are read from the
// environment here so the script has every input defined; confirm the source
// of these IDs for your organization.
const aiPlatformConnectorId = process.env.VECTORIZE_AI_PLATFORM_CONNECTOR_ID;
const destinationConnectorId = process.env.VECTORIZE_DESTINATION_CONNECTOR_ID;

if (!aiPlatformConnectorId || !destinationConnectorId) {
  throw new Error(
    'Please set VECTORIZE_AI_PLATFORM_CONNECTOR_ID and VECTORIZE_DESTINATION_CONNECTOR_ID environment variables'
  );
}

// ---------------------------------------------------------------------------
// API client setup
// ---------------------------------------------------------------------------

const apiConfig = new vectorize.Configuration({
  basePath: 'https://api.vectorize.io',
  accessToken: apiKey,
});
console.log(`✅ API client initialized for organization: ${organizationId}`);

// One API object per resource family, all sharing the same configuration.
const connectorsApi = new SourceConnectorsApi(apiConfig);
const uploadsApi = new UploadsApi(apiConfig);
const pipelinesApi = new PipelinesApi(apiConfig);

/**
 * Creates a File Upload source connector in the organization.
 *
 * @returns {Promise<string>} The ID of the newly created connector.
 * @throws Re-throws any error raised by the API call after logging it.
 */
async function createFileUploadConnector() {
  try {
    // File upload connectors don't require config
    const fileUpload = {
      name: 'my-document-upload',
      type: 'FILE_UPLOAD',
      config: {},
    };

    const response = await connectorsApi.createSourceConnector({
      organizationId,
      createSourceConnectorRequest: fileUpload,
    });

    const connectorId = response.connector.id;
    console.log(`✅ Created file upload connector: ${connectorId}`);
    return connectorId;
  } catch (error) {
    console.error(`❌ Error creating connector: ${error.message}`);
    throw error;
  }
}

/**
 * Uploads a local file to a File Upload connector in two steps: request an
 * upload URL from Vectorize, then PUT the file contents to that URL.
 *
 * @param {string} connectorId - ID of the File Upload connector.
 * @param {string} filePath - Path of the local file to upload.
 * @param {string} fileName - Name to register the file under.
 * @returns {Promise<boolean>} `true` when the upload responds with status 200.
 * @throws {Error} When the upload responds with any other status, or when
 *   an API or file-system call fails.
 */
async function uploadDocument(connectorId, filePath, fileName) {
  try {
    // Step 1: Get a pre-signed upload URL from Vectorize
    const startResponse = await uploadsApi.startFileUploadToConnector({
      organizationId,
      connectorId,
      startFileUploadToConnectorRequest: {
        name: fileName,
        contentType: 'text/plain',
      },
    });

    // Step 2: Upload file directly to the pre-signed URL
    const fileBuffer = fs.readFileSync(filePath);
    const fileStats = fs.statSync(filePath);

    const response = await fetch(startResponse.uploadUrl, {
      method: 'PUT',
      body: fileBuffer,
      headers: {
        'Content-Type': 'text/plain',
        'Content-Length': fileStats.size.toString(),
      },
    });

    if (response.status !== 200) {
      const errorText = await response.text();
      throw new Error(`Upload failed with status ${response.status}: ${errorText}`);
    }

    console.log(`✅ Successfully uploaded: ${fileName}`);
    return true;
  } catch (error) {
    console.error(`❌ Error uploading file: ${error.message}`);
    throw error;
  }
}

/**
 * Creates a manually scheduled pipeline that reads from the given File Upload
 * connector and uses the configured AI platform and destination connectors.
 *
 * @param {string} sourceConnectorId - ID of the File Upload connector.
 * @returns {Promise<string>} The ID of the created pipeline.
 * @throws Re-throws any error raised by the API call after logging it.
 */
async function createPipeline(sourceConnectorId) {
  try {
    const pipelineConfig = {
      pipelineName: 'My First Pipeline',
      sourceConnectors: [
        {
          id: sourceConnectorId,
          type: 'FILE_UPLOAD',
          config: {},
        },
      ],
      aiPlatformConnector: {
        id: aiPlatformConnectorId, // Uses Vectorize's built-in AI
        type: 'VECTORIZE',
        config: {},
      },
      destinationConnector: {
        id: destinationConnectorId, // Uses Vectorize's built-in vector store
        type: 'VECTORIZE',
        config: {},
      },
      schedule: { type: 'manual' },
    };

    const response = await pipelinesApi.createPipeline({
      organizationId,
      pipelineConfigurationSchema: pipelineConfig,
    });

    const pipelineId = response.data.id;
    console.log(`✅ Created pipeline: ${pipelineId}`);
    return pipelineId;
  } catch (error) {
    console.error(`❌ Error creating pipeline: ${error.message}`);
    throw error;
  }
}

/**
 * Polls the pipeline status every 10 seconds until it is ready, reports an
 * error state, the 5-minute wait budget is exhausted, or a status check
 * throws. Outcomes are logged; nothing is thrown.
 *
 * @param {string} pipelineId - ID of the pipeline to watch.
 * @returns {Promise<void>}
 */
async function waitForPipeline(pipelineId) {
  console.log('Waiting for pipeline to process your document...');

  const maxWaitTime = 300000; // 5 minutes in milliseconds
  const startTime = Date.now();

  while (true) {
    try {
      const pipeline = await pipelinesApi.getPipeline({
        organizationId,
        pipelineId,
      });

      const status = pipeline.data.status;

      if (status === 'LISTENING') {
        console.log('✅ Pipeline is ready!');
        break;
      } else if (status === 'PROCESSING') {
        console.log('⚙️ Still processing...');
      } else if (['ERROR_DEPLOYING', 'SHUTDOWN'].includes(status)) {
        console.log(`❌ Pipeline error: ${status}`);
        break;
      }

      // Check timeout
      if (Date.now() - startTime > maxWaitTime) {
        console.log('⏰ Timeout waiting for pipeline');
        break;
      }

      await new Promise((resolve) => setTimeout(resolve, 10000)); // Check every 10 seconds
    } catch (error) {
      console.log(`❌ Error checking status: ${error.message}`);
      break;
    }
  }
}

/**
 * Runs the whole guide end to end. Wrapped in a function because this script
 * loads its dependencies with `require`, and CommonJS scripts cannot use
 * `await` at the top level.
 */
async function main() {
  const sourceConnectorId = await createFileUploadConnector();
  await uploadDocument(sourceConnectorId, filePath, fileName);
  const pipelineId = await createPipeline(sourceConnectorId);
  await waitForPipeline(pipelineId);
}

main().catch((error) => {
  // Each step has already logged its own failure; just signal it to the shell.
  process.exitCode = 1;
});
Next steps
Now that your pipeline is deployed, you can either perform a Vector Search or run a Private Deep Research.