Pipelines
📢 Note: The API is currently in Beta.
In Vectorize, you can create pipelines that ingest data from multiple sources into a Vector Database. In this guide, we will deploy a pipeline that ingests a local file.
Source: Create a File Upload connector
Make sure to include the code and imports from the Getting Started page.
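If you don't have that setup at hand, the sketch below shows the general shape of the Python client initialization this guide relies on. Treat it as an illustrative sketch rather than the canonical setup: the placeholder token and organization ID are yours to fill in, and the exact class names are assumed to match the generated vectorize_client package used in the snippets below.

import vectorize_client as v

# Placeholders (assumptions): use your own API token and organization ID from the Vectorize console
TOKEN = "<your-api-token>"
org = "<your-organization-id>"

# Configure the generated API client; the variable names (api, pipelines)
# are the ones used by the snippets throughout this guide
api = v.ApiClient(v.Configuration(access_token=TOKEN))
pipelines = v.PipelinesApi(api)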
First, we create a File Upload connector that will hold our file.
- Python
- Node.js
connectors_api = v.ConnectorsApi(api)
response = connectors_api.create_source_connector(
    org,
    [v.CreateSourceConnector(name="From api", type=v.SourceConnectorType.FILE_UPLOAD)],
)
source_connector_id = response.connectors[0].id
const sourceResponse = await connectorsApi.createSourceConnector({
  organization: org,
  createSourceConnector: [
    { type: SourceConnectorType.FileUpload, name: "My first upload connector" }
  ]
});
const sourceConnectorId = sourceResponse.connectors[0].id;
Then, we can upload the file. The upload is a two-step process: we first ask the connector to start an upload, which returns a pre-signed URL, and then PUT the file contents to that URL:
- Python
- Node.js
import urllib3, json, os

file_path = "path/to/file.pdf"

http = urllib3.PoolManager()
uploads_api = v.UploadsApi(api)

metadata = {"created-from-api": True}

# Start the upload to obtain a pre-signed upload URL
upload_response = uploads_api.start_file_upload_to_connector(
    org,
    source_connector_id,
    v.StartFileUploadToConnectorRequest(
        name=file_path.split("/")[-1],
        content_type="application/pdf",
        # additional metadata that will be stored along with each chunk in the vector database
        metadata=json.dumps(metadata),
    ),
)

# PUT the file contents to the pre-signed URL
with open(file_path, "rb") as f:
    response = http.request(
        "PUT",
        upload_response.upload_url,
        body=f,
        headers={
            "Content-Type": "application/pdf",
            "Content-Length": str(os.path.getsize(file_path)),
        },
    )

if response.status != 200:
    print("Upload failed:", response.data)
else:
    print("Upload successful")
import fs from "fs"; // assumes ESM; use require("fs") in CommonJS

const fileBuffer = fs.readFileSync("path/to/file.pdf");

// Start the upload to obtain a pre-signed upload URL
const uploadResponse = await uploadsApi.startFileUploadToConnector({
  organization: org,
  connectorId: sourceConnectorId,
  startFileUploadToConnectorRequest: {
    name: "file.pdf",
    contentType: "application/pdf",
    // additional metadata that will be stored along with each chunk in the vector database
    metadata: JSON.stringify({ "mymeta": true })
  }
});

// PUT the file contents to the pre-signed URL
const fetchResponse = await fetch(uploadResponse.uploadUrl, {
  method: 'PUT',
  body: fileBuffer,
  headers: {
    'Content-Type': 'application/pdf'
  },
});
if (!fetchResponse.ok) {
  throw new Error(`Failed to upload file: ${fetchResponse.statusText}`);
}
AI Platform and Vector Database
We will use the Built-In AI Platform and Vector Database. Since they already exist in the platform, we need to retrieve their IDs.
- Python
- Node.js
ai_platforms = connectors_api.get_ai_platform_connectors(org)
builtin_ai_platform = [c.id for c in ai_platforms.ai_platform_connectors if c.type == v.AIPlatformType.VECTORIZE][0]
vector_databases = connectors_api.get_destination_connectors(org)
builtin_vector_db = [c.id for c in vector_databases.destination_connectors if c.type == v.DestinationConnectorType.VECTORIZE][0]
const aiPlatformResponse = await connectorsApi.getAIPlatformConnectors({
  organization: org
});
const builtinAIPlatformId = aiPlatformResponse.aiPlatformConnectors.find(
  (connector) => connector.type === AIPlatformType.Vectorize
).id;

const destinationResponse = await connectorsApi.getDestinationConnectors({
  organization: org
});
const builtinVectorDatabaseId = destinationResponse.destinationConnectors.find(
  (connector) => connector.type === DestinationConnectorType.Vectorize
).id;
Configure and deploy the pipeline
Now we need to configure the pipeline's properties and deploy it.
- Python
- Node.js
response = pipelines.create_pipeline(
    org,
    v.PipelineConfigurationSchema(
        source_connectors=[
            v.SourceConnectorSchema(id=source_connector_id, type=v.SourceConnectorType.FILE_UPLOAD, config={})
        ],
        destination_connector=v.DestinationConnectorSchema(
            id=builtin_vector_db, type=v.DestinationConnectorType.VECTORIZE, config={}
        ),
        ai_platform=v.AIPlatformSchema(id=builtin_ai_platform, type=v.AIPlatformType.VECTORIZE, config={}),
        pipeline_name="My Pipeline From API",
        schedule=v.ScheduleSchema(type="manual"),
    ),
)
pipeline_id = response.data.id
print(f"Pipeline ID: {pipeline_id}")
const response = await pipelinesApi.createPipeline({
  organization: org,
  pipelineConfigurationSchema: {
    pipelineName: "My Pipeline From API",
    sourceConnectors: [
      { id: sourceConnectorId, type: SourceConnectorType.FileUpload, config: {} }
    ],
    destinationConnector: {
      id: builtinVectorDatabaseId,
      type: DestinationConnectorType.Vectorize,
      config: {}
    },
    aiPlatform: {
      id: builtinAIPlatformId,
      type: AIPlatformType.Vectorize,
      config: {}
    },
    schedule: { type: "manual" }
  }
});
const pipelineId = response.data.id;
console.log(`Pipeline ID: ${pipelineId}`);
The pipeline will be deployed and our file will be ingested into the Vector Database.
Next steps
Now you can perform a Vector Search or generate a Private Deep Research.