Make Your AI Smarter with Metadata

In this guide, you'll learn how to enhance your retrieval pipeline with metadata to create more precise, context-aware results. By adding metadata to your documents, you can filter search results, improve relevance, and build retrieval pipelines with richer filtering and context for your connected LLM or application.

Prerequisites

Before you begin, you'll need:

  1. A Vectorize account
  2. An API access token (how to create one)
  3. Your organization ID (see below)

Finding your Organization ID

Your organization ID is in the Vectorize platform URL:

https://platform.vectorize.io/organization/[YOUR-ORG-ID]

For example, if your URL is:

https://platform.vectorize.io/organization/ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

Your organization ID is: ecf3fa1d-30d0-4df1-8af6-f4852bc851cb

API Client Setup

import vectorize_client as v
import os

# Get credentials from environment variables
organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
api_key = os.environ.get("VECTORIZE_API_KEY")

if not organization_id or not api_key:
    raise ValueError("Please set VECTORIZE_ORGANIZATION_ID and VECTORIZE_API_KEY environment variables")

# Initialize the API client (same configuration as the complete example at the end of this guide)
configuration = v.Configuration(
    host="https://api.vectorize.io/v1",
    access_token=api_key
)
api_client = v.ApiClient(configuration)

print(f"✅ API client initialized for organization: {organization_id}")

What You'll Build

You'll create a RAG pipeline that processes documents with rich metadata, enabling you to:

  • Filter searches by project, document type, or any custom field
  • Build a context-aware retrieval pipeline that understands document relationships
  • Create project-based or category-specific search experiences

Understanding Metadata in RAG

Metadata is additional information about your documents that helps retrieval systems understand context. Think of it like labels on file folders - they help you find exactly what you need without opening every folder.

For example, a technical document might have metadata like:

  • Project: apollo
  • Document Type: requirements
  • Status: approved
  • Year: 2024
  • Priority: high

With metadata, your queries can answer questions like these (see the sketch after the list):

  • "What are the apollo project requirements?" (filters by project)
  • "Show me approved documents from 2024" (filters by status and year)

Step 1: Create a File Upload Connector

First, create a connector to upload your documents:

import vectorize_client as v

# Create the connectors API client
connectors_api = v.SourceConnectorsApi(api_client)

try:
    # Create a file upload connector
    file_upload = v.FileUpload(
        name="metadata-enhanced-documents",
        type="FILE_UPLOAD",
        config={}
    )

    request = v.CreateSourceConnectorRequest(file_upload)
    response = connectors_api.create_source_connector(
        organization_id,
        request
    )

    source_connector_id = response.connector.id
    print(f"✅ Created file upload connector: {source_connector_id}")

except Exception as e:
    print(f"❌ Error creating connector: {e}")
    raise

Step 2: Upload Documents with Metadata

Now upload documents while attaching metadata as a JSON string:

import vectorize_client as v
import os
import json
import urllib3

# Create uploads API client
uploads_api = v.UploadsApi(api_client)
http = urllib3.PoolManager()

# Example: Upload a document with its metadata
# Download sample files from: /files/metadata-sample-docs.zip

file_path = "metadata-sample-docs/product_requirements.txt"
metadata = {
    "document_type": "requirements",
    "project": "apollo",
    "year": "2024",
    "status": "approved",
    "created_date": "2024-01-15",
    "priority": "high"
}

try:
    # Convert metadata to JSON string
    metadata_json = json.dumps(metadata)

    # Step 1: Get upload URL with metadata
    upload_request = v.StartFileUploadToConnectorRequest(
        name=os.path.basename(file_path),  # Just the filename, not the full path
        content_type="text/plain",
        metadata=metadata_json  # Metadata as JSON string
    )

    start_response = uploads_api.start_file_upload_to_connector(
        organization_id,
        source_connector_id,
        start_file_upload_to_connector_request=upload_request
    )

    # Step 2: Upload file
    with open(file_path, "rb") as f:
        response = http.request(
            "PUT",
            start_response.upload_url,
            body=f,
            headers={
                "Content-Type": "text/plain",
                "Content-Length": str(os.path.getsize(file_path))
            }
        )

    if response.status == 200:
        print(f"✅ Uploaded: {file_path}")
        print(f"   Metadata: {list(metadata.keys())}")

except Exception as e:
    print(f"❌ Error uploading {file_path}: {e}")

# Repeat for other documents with different metadata
# See the sample files for more examples

How Metadata Works

Vectorize stores your metadata with each chunk of your document, so filters are applied at retrieval time, before results are returned to your model.

When you upload a document with metadata:

  1. The metadata is stored as a JSON string during upload.
  2. Vectorize preserves this metadata with each chunk of your document.
  3. You can filter searches using these metadata fields.

Note: Metadata values should use consistent types across documents (e.g., "year": "2024" as a string everywhere).
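
If your documents come from several sources, a small normalization step before calling json.dumps helps keep types consistent. This helper is just a sketch for your own code, not part of the Vectorize API:

import json

def normalize_metadata(metadata: dict) -> str:
    """Coerce all metadata values to strings so types stay consistent across documents."""
    normalized = {key: str(value) for key, value in metadata.items()}
    return json.dumps(normalized)

# Example: the integer year becomes the string "2024" before upload
metadata_json = normalize_metadata({"project": "apollo", "year": 2024, "status": "approved"})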

Metadata Best Practices

  • Use consistent field names and types - Always use the same casing and data types
  • Keep values simple - Stick to strings and numbers for maximum compatibility
  • Plan your schema before uploading - Design your metadata structure upfront
  • Include only fields you'll use in queries - Avoid metadata bloat that won't be filtered on
  • Document your schema - Keep a reference of allowed fields and values for your team

Step 3: Create Your Pipeline

Create a pipeline just like in the first guide. No special configuration is required for user-defined metadata - Vectorize automatically handles it:

import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(api_client)

try:
    # Create pipeline - metadata handling is automatic
    pipeline_config = v.PipelineConfigurationSchema(
        pipeline_name="Metadata-Enhanced RAG Pipeline",
        source_connectors=[
            v.PipelineSourceConnectorSchema(
                id=source_connector_id,
                type="FILE_UPLOAD",
                config={}  # No special config needed for metadata
            )
        ],
        ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
            id=ai_platform_connector_id,  # ID of your AI platform connector
            type="VECTORIZE",
            config={}
        ),
        destination_connector=v.PipelineDestinationConnectorSchema(
            id=destination_connector_id,  # ID of your destination (vector store) connector
            type="VECTORIZE",
            config={}
        ),
        schedule=v.ScheduleSchema(type="manual")
    )

    # Create the pipeline
    response = pipelines_api.create_pipeline(
        organization_id,
        pipeline_config
    )

    pipeline_id = response.data.id
    print(f"✅ Created pipeline: {pipeline_id}")
    print("   Documents with metadata will be automatically processed")
    print("   Metadata will be preserved and searchable")

except Exception as e:
    print(f"❌ Error creating pipeline: {e}")
    raise

For automatic metadata extraction, refer to Automatic Metadata Extraction.

Step 4: Wait for Processing

Monitor your pipeline until it's ready:

import vectorize_client as v
import time

# Create pipelines API client
pipelines_api = v.PipelinesApi(api_client)

print("Waiting for metadata extraction and indexing...")
max_wait_time = 300
start_time = time.time()

while True:
    try:
        pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
        status = pipeline.data.status

        if status == "LISTENING":
            print("✅ Pipeline ready with metadata indexes!")
            break
        elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
            print(f"❌ Pipeline error: {status}")
            break

        if time.time() - start_time > max_wait_time:
            print("⏰ Timeout waiting for pipeline")
            break

        time.sleep(10)

    except Exception as e:
        print(f"❌ Error checking status: {e}")
        break

Step 5: Query Without Metadata Filters

First, query without any filters to see baseline behavior:

import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(api_client)

try:
    # Query without any metadata filters
    response = pipelines_api.retrieve_documents(
        organization_id,
        pipeline_id,
        v.RetrieveDocumentsRequest(
            question="What are the technical requirements for the AI search?",
            num_results=5
        )
    )

    # Display results
    print("Query: 'What are the technical requirements for the AI search?'")
    print("Results without filtering (searches all documents):\n")

    for i, doc in enumerate(response.documents, 1):
        print(f"Result {i}:")
        print(f"   Content: {doc.text[:150]}...")
        print(f"   Relevance Score: {doc.relevancy}")
        print(f"   Document ID: {doc.id}")
        # Show metadata if available
        if hasattr(doc, 'metadata') and doc.metadata:
            print(f"   Project: {doc.metadata.get('project', 'N/A')}")
            print(f"   Year: {doc.metadata.get('year', 'N/A')}")
            print(f"   Status: {doc.metadata.get('status', 'N/A')}")
        print()

except Exception as e:
    print(f"❌ Error querying pipeline: {e}")
    raise

Without filters, your search might return:

  • Marketing documents when you wanted technical specs.
  • Draft content mixed with approved versions.
  • Results from all departments.

Step 6: Query With Metadata Filters

Now query using metadata filters for precise results:

import vectorize_client as v

# Create pipelines API client
pipelines_api = v.PipelinesApi(api_client)

try:
    # Query with project and date-based filters
    response = pipelines_api.retrieve_documents(
        organization_id,
        pipeline_id,
        v.RetrieveDocumentsRequest(
            question="What are the technical requirements for the AI search?",
            num_results=5,
            metadata_filters=[
                {
                    "metadata.project": ["apollo", "mercury"]  # Project tags
                },
                {
                    "metadata.year": ["2024", "2025"]  # Target years
                },
                {
                    "metadata.status": ["approved", "published"]  # Only finalized docs
                }
            ]
        )
    )

    # Display filtered results
    print("Query: 'What are the technical requirements for the AI search?'")
    print("Filters: project IN (apollo, mercury) AND year IN (2024, 2025) AND status IN (approved, published)")
    print("Results (recent approved docs from specific projects):\n")

    for i, doc in enumerate(response.documents, 1):
        print(f"Result {i}:")
        print(f"   Content: {doc.text[:150]}...")
        print(f"   Relevance Score: {doc.relevancy}")
        print(f"   Document ID: {doc.id}")
        # Show metadata to confirm filtering worked
        if hasattr(doc, 'metadata') and doc.metadata:
            print(f"   Project: {doc.metadata.get('project', 'N/A')}")
            print(f"   Year: {doc.metadata.get('year', 'N/A')}")
            print(f"   Status: {doc.metadata.get('status', 'N/A')}")
            print(f"   Document Type: {doc.metadata.get('document_type', 'N/A')}")
        print()

except Exception as e:
    print(f"❌ Error querying pipeline: {e}")
    raise

Metadata Filter Syntax

  • Use the metadata. prefix for user-defined fields.
  • Provide values as arrays (even for single values).
  • Multiple values for the same key use OR logic.
  • Different keys use AND logic.

Example filter structure:

[
    { "metadata.project": ["apollo", "mercury"] },
    { "metadata.status": ["approved"] }
]

This filter returns documents from either apollo OR mercury projects that are also approved.
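
To make the OR/AND behavior concrete, here is a small sketch (plain Python, not part of the SDK) of how a chunk's metadata would be evaluated against such a filter list:

def matches_filters(chunk_metadata: dict, filters: list) -> bool:
    """AND across filter entries; OR across the values listed for each key."""
    for filter_entry in filters:
        for key, allowed_values in filter_entry.items():
            # User-defined fields carry the metadata. prefix in the filter key
            field = key[len("metadata."):] if key.startswith("metadata.") else key
            if chunk_metadata.get(field) not in allowed_values:
                return False
    return True

# A chunk from the approved apollo requirements doc matches this filter list
print(matches_filters(
    {"project": "apollo", "status": "approved"},
    [{"metadata.project": ["apollo", "mercury"]}, {"metadata.status": ["approved"]}],
))  # True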

Step 7: Compare the Impact of Metadata

Let's see the dramatic difference metadata filtering makes. Here's what you might see:

Without Metadata Filters

Query: "What are our API rate limits?"

Results might include:
- Marketing blog post mentioning API limits
- Draft engineering spec with outdated limits
- Customer FAQ about rate limits
- Internal discussion about changing limits

With Metadata Filters

Query: "What are our API rate limits?"
Filters: project=apollo, status=approved

Results now include only:
- Approved apollo project documentation with current rate limits
- Official API specification documents from the apollo project

The filtered results are more accurate, authoritative, and relevant to your needs.
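
Under the same assumptions as the earlier steps (an initialized api_client, plus your organization_id and pipeline_id), a minimal sketch that runs both queries back to back and compares the result counts looks like this:

import vectorize_client as v

pipelines_api = v.PipelinesApi(api_client)
question = "What are our API rate limits?"

# Baseline: no metadata filters
unfiltered = pipelines_api.retrieve_documents(
    organization_id,
    pipeline_id,
    v.RetrieveDocumentsRequest(question=question, num_results=5)
)

# Filtered: only approved apollo documents
filtered = pipelines_api.retrieve_documents(
    organization_id,
    pipeline_id,
    v.RetrieveDocumentsRequest(
        question=question,
        num_results=5,
        metadata_filters=[
            {"metadata.project": ["apollo"]},
            {"metadata.status": ["approved"]}
        ]
    )
)

print(f"Unfiltered results: {len(unfiltered.documents)}")
print(f"Filtered results:   {len(filtered.documents)}")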

Using Visual Schema Editor (Optional)

For pipelines that use the Iris model, Vectorize includes a Visual Schema Editor that can automatically extract metadata based on defined schemas. This is especially useful when you have consistent document structures.

When to Use Automatic Metadata Extraction

  • Structured documents: Technical specs, contracts, reports with consistent sections
  • Standardized formats: Documents following templates
  • Large volumes: When manual metadata tagging isn't practical

Enabling Automatic Extraction

  1. Navigate to your pipeline in the Vectorize platform
  2. Click on the Schema tab
  3. Use the Visual Schema Editor to define extraction rules
  4. Save and redeploy your pipeline

The schema editor allows you to:

  • Define metadata fields to extract
  • Set extraction rules based on document structure
  • Preview extraction results
  • Combine with manual metadata

Best Practices for Metadata

1. Keep It Consistent

# Good: Consistent types and values
metadata = {
    "project": "apollo",   # Always lowercase
    "year": "2024",        # Always string
    "status": "approved"   # Consistent values
}

# Bad: Inconsistent types and formats
metadata = {
    "project": "Apollo",   # Sometimes capital (BAD!)
    "year": 2024,          # Sometimes number
    "status": "Approved"   # Inconsistent casing
}

2. Plan Your Schema

Before uploading documents, decide on the following (a sketch of a documented schema follows the list):

  • Essential metadata fields (3-7 is usually optimal)
  • Allowed values for each field
  • Naming conventions (use lowercase with underscores)
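
One way to capture those decisions is a small, shared schema definition that uploads are checked against. The field names, values, and helper below are illustrative only, not a Vectorize API object:

# Illustrative schema reference for the team
METADATA_SCHEMA = {
    "project": {"apollo", "mercury"},
    "document_type": {"requirements", "architecture", "strategy"},
    "status": {"draft", "approved", "published"},
    "year": {"2024", "2025"},
    "priority": {"low", "medium", "high"},
}

def check_metadata(metadata: dict) -> list:
    """Return a list of problems so callers can decide whether to block the upload."""
    problems = []
    for field, value in metadata.items():
        if field not in METADATA_SCHEMA:
            problems.append(f"unknown field: {field}")
        elif value not in METADATA_SCHEMA[field]:
            problems.append(f"unexpected value for {field}: {value}")
    return problems

print(check_metadata({"project": "Apollo", "year": "2024"}))  # flags the capitalized project name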

3. Use Metadata for Business Logic

# Filter for recent, approved documents in selected projects
filters = [
    {"metadata.project": ["apollo", "mercury"]},
    {"metadata.year": ["2024", "2025"]},
    {"metadata.status": ["approved", "published"]}
]

What's Next?

You've now built a metadata-enhanced RAG pipeline that can:

  • Process documents with rich context
  • Filter results based on business needs
  • Provide more accurate, relevant answers

Next Steps

  • For simple use cases: You're ready to deploy! Start uploading your documents with metadata.
  • For complex scenarios: Explore automatic metadata extraction for large document sets.

Quick Tips

  1. Start with 3-5 metadata fields and expand as needed
  2. Test your metadata filters with diverse queries
  3. Monitor which metadata fields provide the most value
  4. Consider combining manual and automatic metadata extraction

Congratulations! You've learned how to make your AI significantly smarter with metadata. Your RAG pipeline can now provide contextual, filtered responses that match your specific business needs.

Complete Example

Here's all the code from this guide combined into a complete, runnable example:

Required Environment Variables:
• `VECTORIZE_API_KEY`
• `VECTORIZE_ORGANIZATION_ID`
• `VECTORIZE_AI_PLATFORM_CONNECTOR_ID_VECTORIZE` (ID of your Vectorize AI platform connector)
• `VECTORIZE_DESTINATION_CONNECTOR_ID_VECTORIZE` (ID of your Vectorize destination connector)

Required Files:
• None - the script generates its own sample documents (product requirements, marketing strategy, and technical architecture) and attaches metadata to each before uploading.

#!/usr/bin/env python3
"""
Complete example for making your AI smarter with metadata enrichment.
This is a hand-written example that corresponds to the test file:
api-clients/python/tests/developer_journeys/make_your_ai_smarter_with_metadata.py

IMPORTANT: Keep this file in sync with the test file's snippets!

This example shows how to:
1. Create a file upload connector
2. Upload documents with rich metadata
3. Create a metadata-aware pipeline
4. Query without metadata filters (baseline)
5. Query with specific metadata filters for targeted results
"""

import os
import sys
import time
import json
import urllib3
from pathlib import Path
import vectorize_client as v


def get_api_config():
    """Get API configuration from environment variables."""
    organization_id = os.environ.get("VECTORIZE_ORGANIZATION_ID")
    api_key = os.environ.get("VECTORIZE_API_KEY")

    if not organization_id or not api_key:
        print("🔑 Setup required:")
        print("1. Get your API key from: https://app.vectorize.io/settings")
        print("2. Set environment variables:")
        print("   export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print("   export VECTORIZE_API_KEY='your-api-key'")
        sys.exit(1)

    # Always use production API
    configuration = v.Configuration(
        host="https://api.vectorize.io/v1",
        access_token=api_key
    )

    return configuration, organization_id


def create_connector(api_client, organization_id):
    """Create a file upload connector for metadata-enhanced documents."""
    print("📁 Step 1: Create a File Upload Connector")

    # Create the connectors API client
    connectors_api = v.SourceConnectorsApi(api_client)

    try:
        # Create a file upload connector
        file_upload = v.FileUpload(
            name="metadata-enhanced-documents",
            type="FILE_UPLOAD",
            config={}
        )

        request = v.CreateSourceConnectorRequest(file_upload)
        response = connectors_api.create_source_connector(
            organization_id,
            request
        )

        connector_id = response.connector.id
        print(f"✅ Created file upload connector: {connector_id}")

        return connector_id

    except Exception as e:
        print(f"❌ Error creating connector: {e}")
        raise


def create_sample_documents():
    """Create sample documents with rich metadata for demonstration."""
    documents = [
        {
            "filename": "product_requirements.txt",
            "content": """# Product Requirements Document - Apollo Project

## Executive Summary
This document outlines the requirements for the Apollo AI-powered search platform,
designed to revolutionize how users find and interact with information.

## Core Features
1. Semantic search capabilities
2. Real-time result ranking
3. Multi-language support
4. Advanced filtering options

## Technical Requirements
- Sub-100ms response times
- 99.9% uptime SLA
- Scalable to 1M+ queries/day
- RESTful API architecture

## Success Metrics
- User satisfaction > 85%
- Search accuracy > 90%
- Average response time < 50ms

## Timeline
Phase 1: Q1 2024 - Core search functionality
Phase 2: Q2 2024 - Advanced filtering
Phase 3: Q3 2024 - Multi-language support

## Budget
Estimated development cost: $2.5M
Annual operational cost: $500K
""",
"metadata": {
"document_type": "requirements",
"project": "apollo",
"year": "2024",
"status": "approved",
"created_date": "2024-01-15",
"priority": "high",
"department": "product",
"author": "product_team"
}
},
{
"filename": "marketing_strategy.txt",
"content": """# Marketing Strategy - Mercury Campaign

## Campaign Overview
The Mercury campaign aims to establish market leadership in the AI search space
through targeted outreach and thought leadership.

## Target Audience
- Enterprise technology buyers
- Data science teams
- Product managers
- Technical decision makers

## Channel Strategy
1. Content marketing (blog posts, whitepapers)
2. Conference speaking and sponsorships
3. Webinar series on AI search trends
4. Strategic partnerships

## Key Messages
- "Fastest AI search on the market"
- "Enterprise-grade security and compliance"
- "Seamless integration with existing tools"

## Budget Allocation
- Content creation: 40%
- Events and conferences: 30%
- Digital advertising: 20%
- Partnership development: 10%

## Success Metrics
- Lead generation: 500 qualified leads/month
- Brand awareness: 25% increase in 6 months
- Pipeline contribution: $5M ARR
- Content engagement: 15% average engagement rate

## Timeline
Q1 2024: Campaign launch and initial content
Q2 2024: Major conference presence
Q3 2024: Partnership announcements
Q4 2024: Results analysis and planning for 2025
""",
"metadata": {
"document_type": "strategy",
"project": "mercury",
"year": "2024",
"status": "published",
"created_date": "2024-02-01",
"priority": "medium",
"department": "marketing",
"author": "marketing_team"
}
},
{
"filename": "technical_architecture.txt",
"content": """# Technical Architecture Document

## System Overview
This document describes the technical architecture for our next-generation
search platform, codenamed "Apollo".

## Architecture Components

### API Gateway
- Rate limiting and authentication
- Request/response transformation
- Load balancing across search nodes

### Search Engine Core
- Vector similarity search using Faiss
- Hybrid search combining vector and keyword matching
- Real-time indexing pipeline

### Database Layer
- Vector embeddings storage (PostgreSQL with pgvector)
- Metadata storage (PostgreSQL)
- Search logs and analytics (ClickHouse)

### Machine Learning Pipeline
- Document embedding generation (OpenAI Ada-002)
- Query embedding generation
- Relevance ranking models
- A/B testing framework for ranking improvements

### Infrastructure
- Kubernetes deployment on AWS EKS
- Auto-scaling based on query volume
- Multi-zone deployment for high availability
- CDN for static assets and caching

## Security Considerations
- End-to-end encryption for all data
- Role-based access control (RBAC)
- API key management and rotation
- Audit logging for all operations

## Performance Requirements
- Query response time: <100ms p95
- Indexing throughput: 10,000 docs/minute
- Concurrent queries: 1,000 queries/second
- Storage: Up to 10TB of indexed content

## Monitoring and Observability
- Distributed tracing with Jaeger
- Metrics collection with Prometheus
- Log aggregation with ELK stack
- Custom dashboards for business metrics
""",
"metadata": {
"document_type": "architecture",
"project": "apollo",
"year": "2024",
"status": "approved",
"created_date": "2024-01-30",
"priority": "high",
"department": "engineering",
"author": "tech_lead"
}
}
]

return documents


def upload_document_with_metadata(api_client, organization_id, source_connector_id):
    """Upload documents with rich metadata to improve search relevance."""
    print("📄 Step 2: Upload Documents with Rich Metadata")

    # Create uploads API client
    uploads_api = v.UploadsApi(api_client)
    http = urllib3.PoolManager()

    # Get sample documents
    documents = create_sample_documents()

    uploaded_count = 0

    for doc in documents:
        try:
            # Write document to temporary file
            temp_path = f"/tmp/{doc['filename']}"
            with open(temp_path, 'w') as f:
                f.write(doc['content'])

            # Convert metadata to JSON string
            metadata_json = json.dumps(doc['metadata'])

            # Step 1: Get upload URL with metadata
            upload_request = v.StartFileUploadToConnectorRequest(
                name=doc['filename'],  # Just the filename, not the full path
                content_type="text/plain",
                metadata=metadata_json  # Metadata as JSON string
            )

            start_response = uploads_api.start_file_upload_to_connector(
                organization_id,
                source_connector_id,
                start_file_upload_to_connector_request=upload_request
            )

            # Step 2: Upload file
            with open(temp_path, "rb") as f:
                response = http.request(
                    "PUT",
                    start_response.upload_url,
                    body=f,
                    headers={
                        "Content-Type": "text/plain",
                        "Content-Length": str(os.path.getsize(temp_path))
                    }
                )

            if response.status == 200:
                print(f"✅ Uploaded: {doc['filename']}")
                print(f"   Project: {doc['metadata']['project']}")
                print(f"   Department: {doc['metadata']['department']}")
                print(f"   Status: {doc['metadata']['status']}")
                uploaded_count += 1

            # Clean up temporary file
            os.unlink(temp_path)

        except Exception as e:
            print(f"❌ Error uploading {doc['filename']}: {e}")
            # Clean up temp file if it exists
            temp_path = f"/tmp/{doc['filename']}"
            if os.path.exists(temp_path):
                os.unlink(temp_path)

    print(f"\n✅ Successfully uploaded {uploaded_count} documents with metadata")
    return uploaded_count


def create_pipeline(api_client, organization_id, source_connector_id):
    """Create a pipeline for metadata-enhanced documents."""
    print("🔧 Step 3: Create a Pipeline for Metadata-Enhanced Documents")

    # Get system connector IDs from environment
    ai_platform_connector_id = os.environ.get('VECTORIZE_AI_PLATFORM_CONNECTOR_ID_VECTORIZE')
    destination_connector_id = os.environ.get('VECTORIZE_DESTINATION_CONNECTOR_ID_VECTORIZE')

    pipelines_api = v.PipelinesApi(api_client)

    try:
        # Create pipeline - metadata handling is automatic
        pipeline_config = v.PipelineConfigurationSchema(
            pipeline_name="Metadata-Enhanced RAG Pipeline",
            source_connectors=[
                v.PipelineSourceConnectorSchema(
                    id=source_connector_id,
                    type="FILE_UPLOAD",
                    config={}  # No special config needed for metadata
                )
            ],
            ai_platform_connector=v.PipelineAIPlatformConnectorSchema(
                id=ai_platform_connector_id,
                type="VECTORIZE",
                config={}
            ),
            destination_connector=v.PipelineDestinationConnectorSchema(
                id=destination_connector_id,
                type="VECTORIZE",
                config={}
            ),
            schedule=v.ScheduleSchema(type="manual")
        )

        # Create the pipeline
        response = pipelines_api.create_pipeline(
            organization_id,
            pipeline_config
        )

        pipeline_id = response.data.id
        print(f"✅ Created pipeline: {pipeline_id}")
        print("   Documents with metadata will be automatically processed")
        print("   Metadata will be preserved and searchable")

        return pipeline_id

    except Exception as e:
        print(f"❌ Error creating pipeline: {e}")
        raise


def wait_for_processing(api_client, organization_id, pipeline_id):
    """Wait for the pipeline to be ready and process documents with metadata."""
    print("⏳ Step 4: Wait for Processing")

    # Create pipelines API client
    pipelines_api = v.PipelinesApi(api_client)

    print("Waiting for metadata extraction and indexing...")
    max_wait_time = 300
    start_time = time.time()

    while True:
        try:
            pipeline = pipelines_api.get_pipeline(organization_id, pipeline_id)
            status = pipeline.data.status

            print(f"Pipeline status: {status}")

            if status == "LISTENING":
                print("✅ Pipeline ready with metadata indexes!")
                break
            elif status in ["ERROR_DEPLOYING", "SHUTDOWN"]:
                print(f"❌ Pipeline error: {status}")
                raise Exception(f"Pipeline failed with status: {status}")

            if time.time() - start_time > max_wait_time:
                print("⏰ Timeout waiting for pipeline")
                raise Exception("Pipeline processing timeout")

            time.sleep(10)

        except Exception as e:
            if "Pipeline failed" in str(e) or "timeout" in str(e):
                raise
            print(f"❌ Error checking status: {e}")
            break


def query_without_filters(api_client, organization_id, pipeline_id):
    """Query the pipeline without any metadata filters for comparison."""
    print("🔍 Step 5: Query Without Metadata Filters")

    # Create pipelines API client
    pipelines_api = v.PipelinesApi(api_client)

    try:
        # Query without any metadata filters
        response = pipelines_api.retrieve_documents(
            organization_id,
            pipeline_id,
            v.RetrieveDocumentsRequest(
                question="What are the technical requirements for the AI search?",
                num_results=5
            )
        )

        # Display results
        print("Query: 'What are the technical requirements for the AI search?'")
        print("Results without filtering (searches all documents):\n")

        for i, doc in enumerate(response.documents, 1):
            print(f"Result {i}:")
            print(f"   Content: {doc.text[:150]}...")
            print(f"   Relevance Score: {doc.relevancy}")
            print(f"   Document ID: {doc.id}")
            # Show metadata if available
            if hasattr(doc, 'metadata') and doc.metadata:
                print(f"   Project: {doc.metadata.get('project', 'N/A')}")
                print(f"   Year: {doc.metadata.get('year', 'N/A')}")
                print(f"   Status: {doc.metadata.get('status', 'N/A')}")
                print(f"   Department: {doc.metadata.get('department', 'N/A')}")
            print()

        return len(response.documents)

    except Exception as e:
        print(f"❌ Error querying pipeline: {e}")
        raise


def query_with_filters(api_client, organization_id, pipeline_id):
    """Query the pipeline with specific metadata filters for targeted results."""
    print("🎯 Step 6: Query With Metadata Filters")

    # Create pipelines API client
    pipelines_api = v.PipelinesApi(api_client)

    # Demonstrate different filtering scenarios
    filter_scenarios = [
        {
            "name": "Project-specific search",
            "description": "Find technical requirements only from the Apollo project",
            "filters": [
                {
                    "metadata.project": ["apollo"]
                },
                {
                    "metadata.document_type": ["requirements", "architecture"]
                }
            ]
        },
        {
            "name": "Department and status filtering",
            "description": "Find approved documents from engineering",
            "filters": [
                {
                    "metadata.department": ["engineering", "product"]
                },
                {
                    "metadata.status": ["approved"]
                }
            ]
        },
        {
            "name": "Time-based filtering",
            "description": "Find recent high-priority documents",
            "filters": [
                {
                    "metadata.year": ["2024"]
                },
                {
                    "metadata.priority": ["high"]
                }
            ]
        }
    ]

    for scenario in filter_scenarios:
        print(f"\n{'='*60}")
        print(f"📋 {scenario['name']}")
        print(f"Description: {scenario['description']}")

        try:
            # Query with metadata filters
            response = pipelines_api.retrieve_documents(
                organization_id,
                pipeline_id,
                v.RetrieveDocumentsRequest(
                    question="What are the technical requirements for the AI search?",
                    num_results=5,
                    metadata_filters=scenario['filters']
                )
            )

            # Display filtered results
            print("Query: 'What are the technical requirements for the AI search?'")
            filter_desc = " AND ".join([
                f"{list(f.keys())[0]} IN ({', '.join(list(f.values())[0])})"
                for f in scenario['filters']
            ])
            print(f"Filters: {filter_desc}")
            print(f"Results: {len(response.documents)} documents found\n")

            if response.documents:
                for i, doc in enumerate(response.documents, 1):
                    print(f"Result {i}:")
                    print(f"   Content: {doc.text[:120]}...")
                    print(f"   Relevance Score: {doc.relevancy}")
                    print(f"   Document ID: {doc.id}")
                    # Show metadata to confirm filtering worked
                    if hasattr(doc, 'metadata') and doc.metadata:
                        print(f"   Project: {doc.metadata.get('project', 'N/A')}")
                        print(f"   Year: {doc.metadata.get('year', 'N/A')}")
                        print(f"   Status: {doc.metadata.get('status', 'N/A')}")
                        print(f"   Document Type: {doc.metadata.get('document_type', 'N/A')}")
                        print(f"   Department: {doc.metadata.get('department', 'N/A')}")
                        print(f"   Priority: {doc.metadata.get('priority', 'N/A')}")
                    print()
            else:
                print("   No documents matched the specified filters")

        except Exception as e:
            print(f"❌ Error querying pipeline for scenario '{scenario['name']}': {e}")
            continue

    print("\n✅ Successfully demonstrated metadata filtering scenarios!")


def main():
    """Main function demonstrating metadata-enhanced RAG."""
    print("🏷️ Make Your AI Smarter with Metadata\n")

    # Initialize the API client
    configuration, organization_id = get_api_config()

    print("⚙️ Configuration:")
    print(f"   Organization ID: {organization_id}")
    print(f"   Host: {configuration.host}\n")

    source_connector_id = None
    pipeline_id = None

    try:
        # Initialize API client
        with v.ApiClient(configuration) as api_client:
            # Step 1: Create a file upload connector
            source_connector_id = create_connector(api_client, organization_id)
            print("")

            # Step 2: Upload documents with metadata
            uploaded_count = upload_document_with_metadata(api_client, organization_id, source_connector_id)
            print("")

            # Step 3: Create a metadata-aware pipeline
            pipeline_id = create_pipeline(api_client, organization_id, source_connector_id)
            print("")

            # Step 4: Monitor processing
            wait_for_processing(api_client, organization_id, pipeline_id)
            print("")

            # Step 5: Query without filters (baseline)
            query_without_filters(api_client, organization_id, pipeline_id)
            print("")

            # Step 6: Query with metadata filters
            query_with_filters(api_client, organization_id, pipeline_id)

            print("\n🎉 Metadata Enhancement Complete!")
            print("\n📝 What you've learned:")
            print("- How to upload documents with rich metadata")
            print("- How metadata is automatically indexed by Vectorize")
            print("- How to query without filters for baseline results")
            print("- How to use metadata filters for targeted search")
            print("- How to combine multiple metadata filters")
            print("- How metadata improves search relevance and precision")

            print("\n💡 Advanced metadata strategies:")
            print("- Use hierarchical metadata (e.g., 'department.team.role')")
            print("- Include temporal metadata for time-based filtering")
            print("- Add content-type metadata for different document formats")
            print("- Use priority/importance metadata for result ranking")
            print("- Include access-control metadata for security")

    except ValueError as e:
        print(f"❌ Configuration Error: {e}")
        print("\n💡 Make sure to set the required environment variables:")
        print("   export VECTORIZE_ORGANIZATION_ID='your-org-id'")
        print("   export VECTORIZE_API_KEY='your-api-key'")

    except Exception as error:
        print(f"❌ Error: {error}")
        sys.exit(1)

    finally:
        # ========================================================================
        # Cleanup
        # ========================================================================
        print("\n🧹 Cleanup")

        try:
            with v.ApiClient(configuration) as api_client:
                # Delete pipeline
                if pipeline_id:
                    try:
                        pipelines_api = v.PipelinesApi(api_client)
                        pipelines_api.delete_pipeline(organization_id, pipeline_id)
                        print(f"Deleted pipeline: {pipeline_id}")
                    except Exception as e:
                        print(f"Could not delete pipeline: {e}")

                # Delete source connector
                if source_connector_id:
                    try:
                        connectors_api = v.SourceConnectorsApi(api_client)
                        connectors_api.delete_source_connector(organization_id, source_connector_id)
                        print(f"Deleted connector: {source_connector_id}")
                    except Exception as e:
                        print(f"Could not delete connector: {e}")
        except Exception:
            pass


if __name__ == "__main__":
main()
