Create a Pipeline Programmatically
Learn how to create a complete RAG (Retrieval-Augmented Generation) pipeline using the Vectorize client, including source connectors, vector database, and AI platform configuration.
Prerequisites
Before you begin, you'll need:
- A Vectorize account
- An API access token (how to create one)
- Your organization ID (see below)
Finding your Organization ID
Your organization ID is in the Vectorize platform URL:
https://platform.vectorize.io/organization/[YOUR-ORG-ID]
For example, if your URL is:
https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb
Additionally, you'll need:
- One or more source connector IDs (e.g., AWS S3, Google Drive)
- A destination connector ID (e.g., Pinecone)
- An AI platform connector ID (e.g., OpenAI)
note
Connectors must be created through the Vectorize platform UI before creating a pipeline programmatically.
API Client Setup
- Python
- Node.js
import vectorize_client as v
import os

# Get credentials from environment variables
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client.
# Uses access_token and the /v1 base path so this matches the complete
# example at the bottom of this guide.
configuration = v.Configuration(
    host="https://api.vectorize.io/v1",
    access_token=api_key
)
api = v.ApiClient(configuration)

print(f"✅ API client initialized for organization: {organization_id}")
const vectorize = require('@vectorize-io/vectorize-client');
// COMPLETE_EXAMPLE_PREREQUISITES:
// - env_vars: VECTORIZE_API_KEY, VECTORIZE_ORGANIZATION_ID
// - description: Initialize the Vectorize API client for making API calls

// Get credentials from environment variables
const organizationId = process.env.VECTORIZE_ORGANIZATION_ID;
const apiKey = process.env.VECTORIZE_API_KEY;

if (!organizationId || !apiKey) {
  throw new Error("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables");
}

// Initialize the API client.
// Uses the /v1 base path so this matches the complete example below.
const configuration = new vectorize.Configuration({
  basePath: 'https://api.vectorize.io/v1',
  accessToken: apiKey
});
const apiClient = new vectorize.ApiClient(configuration);

console.log(`✅ API client initialized for organization: ${organizationId}`);
Create a Pipeline
Create a RAG pipeline that connects your data sources to a vector database.
- Python
- Node.js
import vectorize_client as v

# Create pipelines client (`api` is the ApiClient created in the setup snippet above)
pipelines_api = v.PipelinesApi(api)

# Define your pipeline configuration
pipeline_configuration = v.PipelineConfigurationSchema(
    pipeline_name=pipeline_name,
    source_connectors=[
        v.PipelineSourceConnectorSchema(
            id=source_connector_id,   # ID of an existing source connector
            type="FILE_UPLOAD",
            config={}
        )
    ],
    ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
        id=ai_platform_connector_id,  # ID of an existing AI platform connector
        type="VECTORIZE",
        config={}
    ),
    destination_connector=v.PipelineDestinationConnectorSchema(
        id=destination_connector_id,  # ID of an existing destination connector
        type="VECTORIZE",
        config={}
    ),
    schedule=v.ScheduleSchema(type="manual")
)

# Create the pipeline
try:
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_configuration
    )
    pipeline_id = response.data.id
    print(f"Pipeline created successfully! ID: {pipeline_id}")
except Exception as e:
    print(f"Error creating pipeline: {e}")
    raise
// This snippet uses async operations and should be run in an async context
(async () => {
  const vectorize = require('@vectorize-io/vectorize-client');
  const { PipelinesApi } = vectorize;

  // Create pipelines client (`apiClient` comes from the client-setup snippet above)
  const pipelinesApi = new PipelinesApi(apiClient);

  // Define your pipeline configuration
  const pipelineConfiguration = {
    pipelineName: pipelineName,
    sourceConnectors: [
      {
        id: sourceConnectorId,
        type: "FILE_UPLOAD",
        config: {}
      }
    ],
    destinationConnector: {
      id: destinationConnectorId,
      type: "VECTORIZE",
      config: {}
    },
    aiPlatformConnector: {
      id: aiPlatformConnectorId,
      type: "VECTORIZE",
      config: {}
    },
    schedule: { type: "manual" }
  };

  // Create the pipeline
  let pipelineId;
  try {
    const response = await pipelinesApi.createPipeline({
      // Use the organization ID from the setup snippet, not a placeholder string
      organizationId: organizationId,
      pipelineConfigurationSchema: pipelineConfiguration
    });
    pipelineId = response.data.id;
    console.log(`Pipeline created successfully! ID: ${pipelineId}`);
  } catch (error) {
    console.log(`Error creating pipeline: ${error.message}`);
    throw error;
  }
})();
Monitor Pipeline Progress
Track the status and progress of your pipeline processing.
- Python
- Node.js
import vectorize_client as v
import time

# Create pipelines client (`api` is the ApiClient created in the setup snippet above)
pipelines_api = v.PipelinesApi(api)

# Bound how long we poll so the loop always terminates
monitoring_start_time = time.time()
max_monitoring_time = 300  # seconds

while True:
    try:
        # Get pipeline status
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        current_status = pipeline.data.status
        print(f"Pipeline status: {current_status}")

        # Get pipeline metrics
        metrics = pipelines_api.get_pipeline_metrics(organization_id, pipeline_id)
        if metrics.data and isinstance(metrics.data, list):
            # Sum up all the metrics (treat missing counts as zero,
            # matching the Node.js snippet's `|| 0` guards)
            total_new = sum(m.new_objects or 0 for m in metrics.data)
            total_changed = sum(m.changed_objects or 0 for m in metrics.data)
            total_deleted = sum(m.deleted_objects or 0 for m in metrics.data)
            print(f"  📊 Total new objects: {total_new}")
            print(f"  📊 Total changed objects: {total_changed}")
            print(f"  📊 Total deleted objects: {total_deleted}")

            # Show the most recent metrics entry
            latest = metrics.data[-1]
            print(f"  📅 Latest ({latest.timestamp}):")
            print(f"     New: {latest.new_objects}, Changed: {latest.changed_objects}, Deleted: {latest.deleted_objects}")

        # Check for terminal states
        if current_status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"Pipeline reached terminal state: {current_status}")
            break

        # Pipeline is listening (ready for data) — done monitoring
        if current_status == "LISTENING":
            print("Pipeline is ready and listening for new data")
            break

        # Pipeline is actively processing — keep polling
        if current_status in ["PROCESSING", "DEPLOYING"]:
            print(f"Pipeline is actively working: {current_status}")

        # Check if we've been monitoring too long
        if time.time() - monitoring_start_time > max_monitoring_time:
            print("⏰ Maximum monitoring time reached")
            break
    except Exception as e:
        print(f"Error monitoring pipeline: {e}")
        break

    print("⏳ Waiting 30 seconds before next check...")
    time.sleep(30)
// This snippet uses async operations and should be run in an async context
(async () => {
  const vectorize = require('@vectorize-io/vectorize-client');
  // COMPLETE_EXAMPLE_PREREQUISITES:
  // - env_vars: VECTORIZE_API_KEY, VECTORIZE_ORGANIZATION_ID
  // - notes: Requires a source connector ID
  // - description: Create and monitor a RAG pipeline
  const { PipelinesApi } = vectorize;

  // Create pipelines client (`apiClient` comes from the client-setup snippet above)
  const pipelinesApi = new PipelinesApi(apiClient);

  // Bound how long we poll so the loop always terminates
  const monitoringStartTime = Date.now();
  const maxMonitoringTime = 300000; // 5 minutes, in milliseconds

  while (true) {
    try {
      // Get pipeline status (use the real organization ID, not a placeholder)
      const pipeline = await pipelinesApi.getPipeline({
        organizationId: organizationId,
        pipelineId: pipelineId
      });
      const currentStatus = pipeline.data.status;
      console.log(`Pipeline status: ${currentStatus}`);

      // Get pipeline metrics
      const metrics = await pipelinesApi.getPipelineMetrics({
        organizationId: organizationId,
        pipelineId: pipelineId
      });
      if (metrics.data && Array.isArray(metrics.data)) {
        // Sum up all the metrics
        const totalNew = metrics.data.reduce((sum, m) => sum + (m.newObjects || 0), 0);
        const totalChanged = metrics.data.reduce((sum, m) => sum + (m.changedObjects || 0), 0);
        const totalDeleted = metrics.data.reduce((sum, m) => sum + (m.deletedObjects || 0), 0);
        console.log(`  📊 Total new objects: ${totalNew}`);
        console.log(`  📊 Total changed objects: ${totalChanged}`);
        console.log(`  📊 Total deleted objects: ${totalDeleted}`);

        // Show the most recent metrics entry
        if (metrics.data.length > 0) {
          const latest = metrics.data[metrics.data.length - 1];
          console.log(`  📅 Latest (${latest.timestamp}):`);
          console.log(`     New: ${latest.newObjects || 0}, Changed: ${latest.changedObjects || 0}, Deleted: ${latest.deletedObjects || 0}`);
        }
      }

      // Check for terminal states
      if (["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"].includes(currentStatus)) {
        console.log(`Pipeline reached terminal state: ${currentStatus}`);
        break;
      }

      // Pipeline is listening (ready for data) — done monitoring
      if (currentStatus === "LISTENING") {
        console.log(`Pipeline is ready and listening for new data`);
        break;
      }

      // Pipeline is actively processing — keep polling
      if (["PROCESSING", "DEPLOYING"].includes(currentStatus)) {
        console.log(`Pipeline is actively working: ${currentStatus}`);
      }

      // Check if we've been monitoring too long
      if (Date.now() - monitoringStartTime > maxMonitoringTime) {
        console.log("⏰ Maximum monitoring time reached");
        break;
      }
    } catch (error) {
      console.log(`Error monitoring pipeline: ${error.message}`);
      break;
    }

    console.log("⏳ Waiting 30 seconds before next check...");
    await new Promise(resolve => setTimeout(resolve, 30000));
  }
})();
Next Steps
Complete Example
Here's all the code from this guide combined into a complete, runnable example:
- Python
- Node.js
Required Environment Variables:
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
Additional Requirements:
• Requires a source connector ID
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
Additional Requirements:
• Requires a source connector ID
#!/usr/bin/env python3
"""
Complete example for creating and monitoring a RAG pipeline.
This is a hand-written example that corresponds to the test file:
api-clients/python/tests/pipelines/create_pipeline.py
IMPORTANT: Keep this file in sync with the test file's snippets!
"""
import os
import sys
import time
import vectorize_client as v
def get_api_config():
    """Get API configuration from environment variables.

    Returns:
        A ``(configuration, organization_id)`` tuple.

    Exits the process with status 1 (after printing setup instructions)
    when either required environment variable is missing.
    """
    organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
    api_key = os.environ.get("VECTORIZE_API_KEY")
    if not organization_id or not api_key:
        print("🔑 Setup required:")
        print("1. Get your API key from: https://app.vectorize.io/settings")
        print("2. Set environment variables:")
        print("   export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print("   export VECTORIZE_API_KEY='your-api-key'")
        sys.exit(1)
    # Always use production API
    configuration = v.Configuration(
        host="https://api.vectorize.io/v1",
        access_token=api_key
    )
    return configuration, organization_id
def create_file_upload_connector(api_client, organization_id):
    """Create a file upload source connector and return its ID.

    Raises:
        Exception: re-raises any API error after logging it.
    """
    print("📁 Creating file upload source connector...")
    connectors_api = v.SourceConnectorsApi(api_client)
    try:
        # Use a timestamped name so repeated runs don't collide
        # (mirrors the Node.js example, which appends Date.now()).
        file_upload = v.FileUpload(
            name=f"pipeline-source-example-{int(time.time())}",
            type="FILE_UPLOAD",
            config={}
        )
        # Wrap the FileUpload object in the request schema
        request = v.CreateSourceConnectorRequest(file_upload)
        response = connectors_api.create_source_connector(
            organization_id,
            request
        )
        print(f"✅ Created file upload connector: {response.connector.name}")
        print(f"   Connector ID: {response.connector.id}\n")
        return response.connector.id
    except Exception as e:
        print(f"❌ Error creating file upload connector: {e}")
        raise
def create_pipeline(api_client, organization_id, source_connector_id, pipeline_name, ai_platform_connector_id, destination_connector_id):
    """Create a new RAG pipeline.

    Wires one FILE_UPLOAD source connector to the given AI platform and
    destination connectors with a manual (on-demand) schedule.

    Returns:
        The new pipeline's ID.

    Raises:
        Exception: re-raises any API error after logging it.
    """
    print(f"🚀 Creating pipeline: {pipeline_name}")
    # Create pipelines client
    pipelines_api = v.PipelinesApi(api_client)
    # Define your pipeline configuration
    pipeline_configuration = v.PipelineConfigurationSchema(
        pipeline_name=pipeline_name,
        source_connectors=[
            v.PipelineSourceConnectorSchema(
                id=source_connector_id,
                type="FILE_UPLOAD",
                config={}
            )
        ],
        ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
            id=ai_platform_connector_id,
            type="VECTORIZE",
            config={}
        ),
        destination_connector=v.PipelineDestinationConnectorSchema(
            id=destination_connector_id,
            type="VECTORIZE",
            config={}
        ),
        schedule=v.ScheduleSchema(type="manual")
    )
    # Create the pipeline
    try:
        response = pipelines_api.create_pipeline(
            organization_id,
            pipeline_configuration
        )
        pipeline_id = response.data.id
        print(f"Pipeline created successfully! ID: {pipeline_id}")
        return pipeline_id
    except Exception as e:
        print(f"Error creating pipeline: {e}")
        raise
def monitor_pipeline(api_client, organization_id, pipeline_id, monitoring_start_time, max_monitoring_time):
    """Monitor pipeline status and metrics.

    Polls every 5 seconds until the pipeline reaches a terminal or
    LISTENING state, an error occurs, or ``max_monitoring_time`` seconds
    have elapsed since ``monitoring_start_time``.

    Returns:
        The last observed status string (None if the first poll failed).
    """
    print("\n📊 Starting pipeline monitoring...")
    # Create pipelines client
    pipelines_api = v.PipelinesApi(api_client)
    current_status = None
    check_count = 0
    while True:
        check_count += 1
        try:
            # Get pipeline status
            pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
            current_status = pipeline.data.status
            print(f"Check #{check_count} - Pipeline status: {current_status}")

            # Get pipeline metrics
            metrics = pipelines_api.get_pipeline_metrics(organization_id, pipeline_id)
            if metrics.data and isinstance(metrics.data, list):
                # Sum up all the metrics (treat missing counts as zero,
                # matching the Node.js example's `|| 0` guards)
                total_new = sum(m.new_objects or 0 for m in metrics.data)
                total_changed = sum(m.changed_objects or 0 for m in metrics.data)
                total_deleted = sum(m.deleted_objects or 0 for m in metrics.data)
                print(f"  📊 Total new objects: {total_new}")
                print(f"  📊 Total changed objects: {total_changed}")
                print(f"  📊 Total deleted objects: {total_deleted}")

                # Show the most recent metrics entry
                latest = metrics.data[-1]
                print(f"  📅 Latest ({latest.timestamp}):")
                print(f"     New: {latest.new_objects}, Changed: {latest.changed_objects}, Deleted: {latest.deleted_objects}")

            # Check for terminal states
            if current_status in ["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"]:
                print(f"Pipeline reached terminal state: {current_status}")
                break

            # Pipeline is listening (ready for data) — done monitoring
            if current_status == "LISTENING":
                print("Pipeline is ready and listening for new data")
                break

            # Pipeline is actively processing — keep polling
            if current_status in ["PROCESSING", "DEPLOYING"]:
                print(f"Pipeline is actively working: {current_status}")

            # Check if we've been monitoring too long
            if time.time() - monitoring_start_time > max_monitoring_time:
                print("⏰ Maximum monitoring time reached")
                break
        except Exception as e:
            print(f"Error monitoring pipeline: {e}")
            break

        # Use shorter interval for test environment (5 seconds instead of 30)
        print("⏳ Waiting 5 seconds before next check...")
        time.sleep(5)
    return current_status
def cleanup_pipeline(api_client, organization_id, pipeline_id, source_connector_id):
    """Clean up created resources.

    Best-effort: each step logs a warning on failure instead of raising,
    so one failed deletion does not prevent the others from running.
    """
    print("\n🧹 Cleaning Up Resources")
    try:
        # Stop pipeline if running
        pipelines_api = v.PipelinesApi(api_client)
        try:
            pipelines_api.stop_pipeline(organization_id, pipeline_id)
            print("  ✅ Pipeline stopped")
        except Exception:
            # The pipeline may already be stopped; ignore.
            # (Catch Exception, not bare except, so Ctrl-C still works.)
            pass
        # Delete pipeline
        try:
            pipelines_api.delete_pipeline(organization_id, pipeline_id)
            print("  ✅ Pipeline deleted")
        except Exception as e:
            print(f"  ⚠️ Could not delete pipeline: {e}")
        # Delete source connector
        try:
            connectors_api = v.SourceConnectorsApi(api_client)
            connectors_api.delete_source_connector(organization_id, source_connector_id)
            print("  ✅ Source connector deleted")
        except Exception as e:
            print(f"  ⚠️ Could not delete source connector: {e}")
    except Exception as e:
        print(f"  ⚠️ Cleanup warning: {e}")
def main():
    """Main function demonstrating pipeline creation and monitoring."""
    print("=== Create and Monitor RAG Pipeline Example ===")
    print("⏱️ Expected runtime: ~30-45 seconds\n")
    try:
        # Get configuration
        configuration, organization_id = get_api_config()
        print("⚙️ Configuration:")
        print(f"   Organization ID: {organization_id}")
        print(f"   Host: {configuration.host}\n")

        # Get required connector IDs
        ai_platform_connector_id = os.environ.get("VECTORIZE_AI_PLATFORM_CONNECTOR_ID")
        destination_connector_id = os.environ.get("VECTORIZE_DESTINATION_CONNECTOR_ID")
        if not ai_platform_connector_id or not destination_connector_id:
            print("❌ Missing required connector IDs")
            print("   Please set:")
            print("   - VECTORIZE_AI_PLATFORM_CONNECTOR_ID")
            print("   - VECTORIZE_DESTINATION_CONNECTOR_ID")
            print("\n💡 These connectors are typically set up in the Vectorize dashboard")
            sys.exit(1)

        # Initialize API client (context manager closes it on exit)
        with v.ApiClient(configuration) as api_client:
            pipeline_id = None
            source_connector_id = None
            try:
                # Step 1: Create source connector
                print("📁 Creating File Upload Connector")
                source_connector_id = create_file_upload_connector(api_client, organization_id)

                # Step 2: Create pipeline
                print("⚙️ Creating RAG Pipeline")
                pipeline_name = "RAG Pipeline Example"
                pipeline_id = create_pipeline(
                    api_client, organization_id, source_connector_id,
                    pipeline_name, ai_platform_connector_id, destination_connector_id
                )
                print(f"✅ Pipeline '{pipeline_name}' created successfully!")
                print(f"   Pipeline ID: {pipeline_id}")
                print(f"   Source Connector: {source_connector_id}")
                print(f"   AI Platform: {ai_platform_connector_id}")
                print(f"   Destination: {destination_connector_id}\n")

                # Step 3: Monitor pipeline
                print("📊 Monitoring Pipeline Status")
                print("   The pipeline will now deploy and begin listening for data...")
                print("   This process may take a few minutes.\n")
                monitoring_start_time = time.time()
                max_monitoring_time = 45  # 45 seconds max for test environment
                final_status = monitor_pipeline(
                    api_client, organization_id, pipeline_id,
                    monitoring_start_time, max_monitoring_time
                )

                # Step 4: Report final status
                print(f"\n📊 Final Pipeline Status: {final_status}")
                if final_status == "LISTENING":
                    print("🎉 Pipeline is ready to process data!")
                    print("   You can now:")
                    print("   - Upload files to the source connector")
                    print("   - Use the pipeline for deep research queries")
                    print("   - Monitor processing through the dashboard")
                elif final_status == "IDLE":
                    print("✅ Pipeline deployment completed successfully")
                    print("   Pipeline is in idle state and ready for data")
                elif final_status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
                    print("⚠️ Pipeline encountered an issue")
                    print("   Check the Vectorize dashboard for detailed error information")
                else:
                    print(f"ℹ️ Pipeline is in {final_status} state")
                    print("   This may be normal depending on your configuration")
            finally:
                # Step 5: Clean up (optional — left commented so learners can
                # inspect the created pipeline in the dashboard)
                if pipeline_id and source_connector_id:
                    print("\n❓ Clean up resources?")
                    print("   Uncomment the cleanup section below to remove the created pipeline")
                    print("   For learning purposes, you may want to keep the pipeline active")
                    # Uncomment the line below if you want to clean up automatically
                    # cleanup_pipeline(api_client, organization_id, pipeline_id, source_connector_id)
    except ValueError as e:
        print(f"❌ Configuration Error: {e}")
        print("\n💡 Make sure to set the required environment variables:")
        print("   export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print("   export VECTORIZE_API_KEY='your-api-key'")
        print("   export VECTORIZE_AI_PLATFORM_CONNECTOR_ID='connector-id'")
        print("   export VECTORIZE_DESTINATION_CONNECTOR_ID='connector-id'")
    except Exception as e:
        print(f"❌ Error: {e}")
        sys.exit(1)
Required Environment Variables:
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
Additional Requirements:
• Requires a source connector ID
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
Additional Requirements:
• Requires a source connector ID
#!/usr/bin/env node
/**
* Complete example for creating and monitoring a pipeline.
* This is a hand-written example that corresponds to the test file:
* api-clients/javascript/tests/pipelines/create_pipeline.js
*
* IMPORTANT: Keep this file in sync with the test file's snippets!
*/
const vectorize = require('@vectorize-io/vectorize-client');
const fs = require('fs');
const path = require('path');
// For test environment, use test configuration
function getApiConfig() {
  // Test runs may supply a ready-made client via a shared config module.
  const inTestMode = process.env.VECTORIZE_TEST_MODE === 'true';
  if (inTestMode) {
    const testConfigPath = path.join(__dirname, '../common/test_config.js');
    if (fs.existsSync(testConfigPath)) {
      const { getApiClient } = require(testConfigPath);
      const { apiConfig, config } = getApiClient();
      return { apiClient: apiConfig, organizationId: config.organization_id };
    }
  }

  // Otherwise build a configuration from environment variables.
  const organizationId = process.env.VECTORIZE_ORGANIZATION_ID;
  const apiKey = process.env.VECTORIZE_API_KEY;
  if (!organizationId || !apiKey) {
    throw new Error("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables");
  }

  // Always use production API
  const apiClient = new vectorize.Configuration({
    basePath: 'https://api.vectorize.io/v1',
    accessToken: apiKey
  });
  return { apiClient, organizationId };
}
/**
 * Create a RAG pipeline, monitor it briefly, then clean up.
 * Throws on pipeline-creation failure; cleanup failures only warn.
 */
async function main() {
  console.log('=== Create and Monitor RAG Pipeline Example ===');
  console.log('⏱️ Expected runtime: ~30-45 seconds\n');

  // Initialize the API client
  const { apiClient, organizationId } = getApiConfig();

  // First create the required connectors for the pipeline
  console.log('📁 Creating file upload source connector...');
  const sourceConnectorsApi = new vectorize.SourceConnectorsApi(apiClient);
  const sourceConnectorResponse = await sourceConnectorsApi.createSourceConnector({
    organizationId: organizationId,
    createSourceConnectorRequest: {
      name: `pipeline-example-source-${Date.now()}`,
      type: 'FILE_UPLOAD',
      config: {}
    }
  });
  const sourceConnectorId = sourceConnectorResponse.connector.id;
  console.log(`✅ Created source connector: ${sourceConnectorId}`);

  // For this example, we'll use default Vectorize connectors for AI platform and destination
  // In a real application, you would have these configured already
  console.log('🤖 Using default Vectorize AI platform connector...');
  const aiPlatformConnectorId = process.env.VECTORIZE_AI_PLATFORM_CONNECTOR_ID || 'default-ai-platform-id';
  console.log('🎯 Using default Vectorize destination connector...');
  const destinationConnectorId = process.env.VECTORIZE_DESTINATION_CONNECTOR_ID || 'default-destination-id';

  let pipelineId = null;
  try {
    // ============================================================================
    // SNIPPET: create_pipeline
    // Create a new RAG pipeline with source, AI platform, and destination connectors
    // ============================================================================
    console.log('\n🚀 Creating pipeline: RAG Pipeline Example');
    {
      const pipelineName = `Example Pipeline - ${new Date().toISOString()}`;
      const { PipelinesApi } = vectorize;
      // Create pipelines client
      const pipelinesApi = new PipelinesApi(apiClient);
      // Define your pipeline configuration
      const pipelineConfiguration = {
        pipelineName: pipelineName,
        sourceConnectors: [
          {
            id: sourceConnectorId,
            type: "FILE_UPLOAD",
            config: {}
          }
        ],
        destinationConnector: {
          id: destinationConnectorId,
          type: "VECTORIZE",
          config: {}
        },
        aiPlatformConnector: {
          id: aiPlatformConnectorId,
          type: "VECTORIZE",
          config: {}
        },
        schedule: { type: "manual" }
      };
      // Create the pipeline
      try {
        const response = await pipelinesApi.createPipeline({
          organizationId: organizationId,
          pipelineConfigurationSchema: pipelineConfiguration
        });
        pipelineId = response.data.id;
        console.log(`Pipeline created successfully! ID: ${pipelineId}`);
      } catch (error) {
        console.log(`Error creating pipeline: ${error.message}`);
        throw error;
      }
    }

    // ============================================================================
    // SNIPPET: monitor_pipeline
    // Monitor pipeline status and metrics over time
    // ============================================================================
    console.log('\n📊 Starting pipeline monitoring...');
    {
      const monitoringStartTime = Date.now();
      const maxMonitoringTime = 45000; // 45 seconds max for test environment (in milliseconds)
      let checkCount = 0;
      const { PipelinesApi } = vectorize;
      // Create pipelines client
      const pipelinesApi = new PipelinesApi(apiClient);
      while (true) {
        checkCount++;
        try {
          // Get pipeline status
          const pipeline = await pipelinesApi.getPipeline({
            organizationId: organizationId,
            pipelineId: pipelineId
          });
          const currentStatus = pipeline.data.status;
          console.log(`Check #${checkCount} - Pipeline status: ${currentStatus}`);

          // Get pipeline metrics
          const metrics = await pipelinesApi.getPipelineMetrics({
            organizationId: organizationId,
            pipelineId: pipelineId
          });
          if (metrics.data && Array.isArray(metrics.data)) {
            // Sum up all the metrics
            const totalNew = metrics.data.reduce((sum, m) => sum + (m.newObjects || 0), 0);
            const totalChanged = metrics.data.reduce((sum, m) => sum + (m.changedObjects || 0), 0);
            const totalDeleted = metrics.data.reduce((sum, m) => sum + (m.deletedObjects || 0), 0);
            console.log(`  📊 Total new objects: ${totalNew}`);
            console.log(`  📊 Total changed objects: ${totalChanged}`);
            console.log(`  📊 Total deleted objects: ${totalDeleted}`);
            // Show the most recent metrics entry
            if (metrics.data.length > 0) {
              const latest = metrics.data[metrics.data.length - 1];
              console.log(`  📅 Latest (${latest.timestamp}):`);
              console.log(`     New: ${latest.newObjects || 0}, Changed: ${latest.changedObjects || 0}, Deleted: ${latest.deletedObjects || 0}`);
            }
          }

          // Check for terminal states
          if (["IDLE", "HIBERNATING", "OVERQUOTA", "ERROR_DEPLOYING", "SHUTDOWN"].includes(currentStatus)) {
            console.log(`Pipeline reached terminal state: ${currentStatus}`);
            break;
          }
          // Pipeline is listening (ready for data) — done monitoring
          if (currentStatus === "LISTENING") {
            console.log(`Pipeline is ready and listening for new data`);
            break;
          }
          // Pipeline is actively processing — keep polling
          if (["PROCESSING", "DEPLOYING"].includes(currentStatus)) {
            console.log(`Pipeline is actively working: ${currentStatus}`);
          }
          // Check if we've been monitoring too long
          if (Date.now() - monitoringStartTime > maxMonitoringTime) {
            console.log("⏰ Maximum monitoring time reached");
            break;
          }
        } catch (error) {
          console.log(`Error monitoring pipeline: ${error.message}`);
          break;
        }
        // Use shorter interval for test environment (5 seconds instead of 30)
        console.log('⏳ Waiting 5 seconds before next check...');
        await new Promise(resolve => setTimeout(resolve, 5000));
      }
    }
  } finally {
    // Clean up: delete the pipeline and source connector.
    // Best-effort: a failed cleanup only logs a warning — it must NOT call
    // process.exit(), which would mask the real outcome of the run.
    console.log('\n🧹 Cleaning up...');
    if (pipelineId) {
      try {
        const pipelinesApi = new vectorize.PipelinesApi(apiClient);
        await pipelinesApi.deletePipeline({
          organizationId: organizationId,
          pipelineId: pipelineId
        });
        console.log('✅ Deleted test pipeline');
      } catch (error) {
        console.error('Warning: Could not delete test pipeline:', error.message);
      }
    }
    try {
      await sourceConnectorsApi.deleteSourceConnector({
        organizationId: organizationId,
        sourceConnectorId: sourceConnectorId
      });
      console.log('✅ Deleted test source connector');
    } catch (error) {
      console.error('Warning: Could not delete test source connector:', error.message);
    }
  }
}
// Run the example only when this file is executed directly
// (`node create_pipeline.js`), not when it is imported as a module.
if (require.main === module) {
main().catch(error => {
console.error('โ Error:', error);
process.exit(1);
});
}
// Export for reuse by tests or other scripts.
module.exports = { main };