

{
  "legal": {
    "name": "Legal Compliance Review",
    "focus_areas": [
      "compliance",
      "liability",
      "termination",
      "confidentiality"
    ],
    "extraction_schema": {
      "clauses": true,
      "obligations": true,
      "deadlines": true,
      "risk_assessment": true,
      "compensation": false // HR's domain
    },
    "chunk_strategy": "clause_boundary",
    "embedding_focus": "legal_terminology"
  },
  "procurement": {
    "name": "Vendor Contract Analysis",
    "focus_areas": [
      "pricing",
      "sla",
      "payment_terms",
      "penalties"
    ],
    "extraction_schema": {
      "tables": true,
      "financial_terms": true,
      "service_levels": true,
      "vendors": true,
      "compliance": false // Legal's domain
    },
    "chunk_strategy": "table_aware",
    "embedding_focus": "financial_terminology"
  },
  "hr": {
    "name": "Employment Terms Tracking",
    "focus_areas": [
      "compensation",
      "benefits",
      "equity",
      "performance"
    ],
    "extraction_schema": {
      "salary_bands": true,
      "benefit_tables": true,
      "vesting_schedules": true,
      "pto_policies": true,
      "legal_clauses": false // Legal's domain
    },
    "chunk_strategy": "table_aware",
    "embedding_focus": "hr_terminology"
  }
}
-- ============================================
-- WORKSPACE TABLES
-- ============================================
-- One row per workspace. Referenced by workspace_members and by the
-- workspace_id columns added to the core tables.
CREATE TABLE workspaces (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT NOT NULL,
    slug TEXT UNIQUE NOT NULL, -- URL-safe identifier
    -- Allowed values are enforced here rather than only listed in a comment.
    use_case_type TEXT NOT NULL
        CHECK (use_case_type IN ('legal', 'procurement', 'hr', 'custom')),
    use_case_config JSONB, -- Custom extraction settings
    -- Defaulted columns are NOT NULL so an explicit NULL cannot slip in.
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    -- DEFAULT only applies on INSERT; later updates must set this explicitly.
    updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
    is_active BOOLEAN NOT NULL DEFAULT true
);
-- Membership of a user in a workspace; at most one row per (workspace, user).
-- Rows are removed automatically when either the workspace or the user is deleted.
CREATE TABLE workspace_members (
    workspace_id UUID REFERENCES workspaces(id) ON DELETE CASCADE,
    user_id UUID REFERENCES users(id) ON DELETE CASCADE,
    -- Allowed values are enforced here rather than only listed in a comment.
    role TEXT NOT NULL
        CHECK (role IN ('owner', 'admin', 'editor', 'viewer')),
    joined_at TIMESTAMP NOT NULL DEFAULT NOW(),
    PRIMARY KEY (workspace_id, user_id)
);
-- The primary key leads with workspace_id, so it cannot serve lookups by
-- user_id alone (listing a user's workspaces, cascading a user delete).
CREATE INDEX idx_workspace_members_user ON workspace_members(user_id);
-- ============================================
-- MODIFIED CORE TABLES (add workspace_id)
-- ============================================
-- Each column is added nullable, so pre-existing rows get workspace_id = NULL
-- until they are backfilled.
-- NOTE(review): these foreign keys have no ON DELETE clause, so deleting a
-- workspace is rejected while any of these rows still reference it (unlike
-- workspace_members, which cascades) -- confirm this is intended.
-- Every workspace_id column gets an index because workspace-scoped queries
-- filter on it.
-- Documents
ALTER TABLE documents ADD COLUMN workspace_id UUID REFERENCES workspaces(id);
CREATE INDEX idx_documents_workspace ON documents(workspace_id);
-- Graph entities
ALTER TABLE graph_clients ADD COLUMN workspace_id UUID REFERENCES workspaces(id);
CREATE INDEX idx_graph_clients_workspace ON graph_clients(workspace_id);
ALTER TABLE graph_contracts ADD COLUMN workspace_id UUID REFERENCES workspaces(id);
CREATE INDEX idx_graph_contracts_workspace ON graph_contracts(workspace_id);
ALTER TABLE graph_clauses ADD COLUMN workspace_id UUID REFERENCES workspaces(id);
CREATE INDEX idx_graph_clauses_workspace ON graph_clauses(workspace_id);
-- RAG chunks
ALTER TABLE rag_chunks ADD COLUMN workspace_id UUID REFERENCES workspaces(id);
CREATE INDEX idx_rag_chunks_workspace ON rag_chunks(workspace_id);
-- Extraction results
ALTER TABLE extraction_results ADD COLUMN workspace_id UUID REFERENCES workspaces(id);
CREATE INDEX idx_extraction_results_workspace ON extraction_results(workspace_id);
# In the route handler
@router.get("/documents")
def get_documents(workspace_id: UUID):
    """Return the documents of one workspace, filtering inside the handler."""
    # Application code must remember to filter
    scoped_query = db.query(Document).filter(Document.workspace_id == workspace_id)
    return scoped_query.all()


class WorkspaceService:
    """Data-access helper bound to a single workspace.

    The workspace id is captured once at construction and applied as a
    filter by every query method.
    """

    def __init__(self, db: Session, workspace_id: UUID):
        """Store the session and the workspace every query is limited to."""
        self.db = db
        self.workspace_id = workspace_id

    def get_documents(self):
        """Return all Document rows belonging to this workspace."""
        # Workspace scoping is automatic
        in_workspace = Document.workspace_id == self.workspace_id
        return self.db.query(Document).filter(in_workspace).all()

    def get_contracts(self):
        """Return all Contract rows belonging to this workspace."""
        # Every method automatically scoped
        in_workspace = Contract.workspace_id == self.workspace_id
        return self.db.query(Contract).filter(in_workspace).all()


from fastapi import Depends
def get_workspace_service(
workspace_id: UUID, # From path or header
db: Session = Depends(get_db)
) -> WorkspaceService:
return WorkspaceService(db, workspace_id)
@router.get("/api/v1/workspaces/{workspace_id}/documents")
def get_documents(
service: WorkspaceService = Depends(get_workspace_service)
):
# Service is pre-scoped to workspace_id
return service.get_documents()class DynamicSchemaGenerator:
def generate_schema(
self,
document_structure: dict,
use_case_config: dict
) -> dict:
"""
Generate extraction schema based on:
1. Document structure (discovery phase)
2. Workspace use case (what team cares about)
"""
base_schema = self._get_base_schema()
# Filter fields based on use case
focus_areas = use_case_config.get("focus_areas", [])
extraction_config = use_case_config.get("extraction_schema", {})
schema = {}
# Always include metadata
schema["metadata"] = base_schema["metadata"]
# Conditionally include extraction types
if extraction_config.get("clauses", False):
schema["clauses"] = self._generate_clause_schema(
document_structure,
focus_categories=focus_areas
)
if extraction_config.get("tables", False):
schema["tables"] = self._generate_table_schema(
document_structure,
focus_types=["pricing", "sla"] if "pricing" in focus_areas else None
)
if extraction_config.get("salary_bands", False):
# HR-specific extraction
schema["salary_bands"] = {
"type": "array",
"items": {
"role": "string",
"min_salary": "number",
"max_salary": "number",
"currency": "string"
}
}
return schema{
"tables_found": [
{"name": "Compensation Schedule", "columns": ["Role", "Salary Range", "Equity"]},
{"name": "Arbitration Procedures", "columns": ["Step", "Timeline", "Outcome"]}
],
"sections_found": [
{"title": "Non-Compete Agreement", "type": "legal"},
{"title": "Benefits Summary", "type": "hr"}
]
}{
"focus_areas": ["legal", "compliance"],
"extraction_schema": {
"clauses": true,
"tables": false,
"salary_bands": false
}
}{
"metadata": {...},
"clauses": [
{
"title": "Non-Compete Agreement",
"text": "Full clause text...",
"category": "legal",
"risk_level": "high"
}
]
// Compensation table NOT extracted (HR's concern, not Legal's)
}{
"focus_areas": ["compensation", "benefits"],
"extraction_schema": {
"clauses": false,
"tables": true,
"salary_bands": true
}
}{
"metadata": {...},
"tables": [
{
"name": "Compensation Schedule",
"rows": [...]
}
],
"salary_bands": [...]
// Non-Compete clause NOT extracted (Legal's concern, not HR's)
}class GraphRAGService:
def __init__(self, db: Session, workspace_id: UUID, embedding_service=None):
    """Bind the service to one database session and one workspace.

    Args:
        db: Session used for every query and write issued by this service.
        workspace_id: Workspace whose graph rows this instance reads and writes.
        embedding_service: Optional object exposing ``generate(text)``; only
            ``search_clauses`` needs it. Defaults to ``None`` so existing
            two-argument callers keep working.
    """
    self.db = db
    self.workspace_id = workspace_id
    # Assign only when supplied, so an attribute provided some other way
    # (subclass, later assignment) is not overwritten with None.
    if embedding_service is not None:
        self.embedding_service = embedding_service

def add_client(self, name: str, industry: str) -> str:
    """Add client to THIS workspace's graph.

    Creates a GraphClient tagged with this service's workspace_id, commits
    it, and returns the id of the new row.
    """
    client = GraphClient(
        name=name,
        industry=industry,
        workspace_id=self.workspace_id,  # Automatic scoping
    )
    self.db.add(client)
    self.db.commit()
    return client.id

def get_client_contracts(self, client_id: str) -> List[Contract]:
    """Get contracts for client in THIS workspace.

    NOTE(review): annotated as List[Contract] but the query yields
    GraphContract rows; confirm which type is intended.
    """
    return (
        self.db.query(GraphContract)
        .filter(
            GraphContract.client_id == client_id,
            GraphContract.workspace_id == self.workspace_id,  # Isolated
        )
        .all()
    )

def search_clauses(self, query: str, top_k: int = 10) -> List[Clause]:
    """Semantic search across THIS workspace's clauses.

    Embeds ``query`` and returns at most ``top_k`` GraphClause rows from this
    workspace, ordered by ascending cosine distance to that embedding.

    Raises:
        RuntimeError: if no embedding service has been configured.
    """
    embedding_service = getattr(self, "embedding_service", None)
    if embedding_service is None:
        raise RuntimeError(
            "search_clauses requires an embedding service; "
            "pass embedding_service= when constructing the service"
        )
    query_embedding = embedding_service.generate(query)
    # Parenthesised chain: the original backslash chain broke after the
    # commented .filter(...) line, leaving .order_by(...) as a syntax error.
    return (
        self.db.query(GraphClause)
        .filter(GraphClause.workspace_id == self.workspace_id)  # Scoped
        .order_by(GraphClause.embedding.cosine_distance(query_embedding))
        .limit(top_k)
        .all()
    )
-- Before the index: the planner reports a sequential scan for a workspace filter.
SELECT * FROM rag_chunks WHERE workspace_id = 'abc123...';
-- Seq Scan on rag_chunks (cost=0.00..15234.56 rows=82180)
-- IF NOT EXISTS keeps this safe to run when idx_rag_chunks_workspace was
-- already created alongside the workspace_id column.
CREATE INDEX IF NOT EXISTS idx_rag_chunks_workspace ON rag_chunks(workspace_id);
-- Index Scan using idx_rag_chunks_workspace (cost=0.42..234.56 rows=27394)def generate_embedding(self, text: str, workspace_id: UUID) -> List[float]:
workspace = self.db.query(Workspace).get(workspace_id)
use_case = workspace.use_case_type
# Context-aware embedding
contextualized_text = f"[{use_case}] {text}"
return self.embedding_provider.embed(contextualized_text)