DR. ATABAK KHEIRKHAH
Cloud Platform Modernization Architect specializing in transforming legacy systems into reliable, observable, and cost-efficient Cloud platforms.
Certified: Google Professional Cloud Architect, AWS Solutions Architect, MapR Cluster Administrator
Right-Time AI: Small Copilots, Big Proof - An End-to-End PoC You Can Reproduce
In two weeks, I built three micro-copilots (reading, meeting, code) on Vertex AI, served via Cloud Run, orchestrated by Workflows, and measured in BigQuery. I defined strict hypotheses (time saved, edits reduced, predictable cost), mapped decisions to right-time freshness (seconds/minutes/daily), and enforced a kill switch for reversibility. Two copilots passed (≥15% time saved, ≥20% fewer edits), one failed and was retired. In this article I share the findings, the reproducible pattern, the guardrails, and the numbers - so you can understand, plan, and decide what to keep, what to stop, and why.
Key Results:
Demos are easy; daily impact is hard. I wanted a personal, honest answer to a simple question: if small AI helpers were attached to real tasks, would they save people time without creating chaos or surprise costs?
The goal wasn’t to build the next ChatGPT/Gemini/Copilot competitor. It was to test a repeatable, governed pattern that delivers measurable value - one that could be templated for other workflows (personal or company) with clear success gates and a kill switch.
No fake “real-time” where decisions don’t need it.
Reversibility built in: a kill switch (AI_ENABLED=false) falls back to deterministic outputs.
I used a personal Gmail account with the free three-month $300 GCP trial credit for this test; the details are below:
Characteristics:
Endpoints:
POST /suggest - Main copilot endpoint
GET /health - Health check with kill switch status
Dataset: copilot_poc
Tables:
events - One row per invocation with full telemetry
catalog - Workflow registry with freshness tiers and SLAs
metrics (view) - Daily aggregates by copilot
Partitioning: Events table partitioned by date
Clustering: By copilot and outcome for query optimization
Kill switch: AI_ENABLED=false
Region: us-central1 (configurable to EU regions)
File: lib/ai_client.py
"""
AI Client abstraction for OpenAI and Vertex AI providers.
"""
import os
import time
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple
from enum import Enum
try:
from google.cloud import aiplatform
from vertexai.preview.generative_models import GenerativeModel
except ImportError:
aiplatform = None
GenerativeModel = None
class AIProvider(str, Enum):
"""Supported AI providers."""
OPENAI = "openai"
VERTEX = "vertex"
class AIClient(ABC):
"""Abstract base class for AI clients."""
@abstractmethod
def generate(
self,
prompt: str,
system_prompt: Optional[str] = None,
max_tokens: int = 2000,
temperature: float = 0.7,
) -> Tuple[str, Dict[str, int]]:
"""
Generate a response from the AI model.
Returns:
Tuple of (response_text, token_usage_dict)
token_usage_dict contains: tokens_in, tokens_out
"""
pass
class VertexAIClient(AIClient):
"""Google Vertex AI client."""
def __init__(
self,
project_id: Optional[str] = None,
location: str = "us-central1",
model: str = "gemini-pro",
):
if GenerativeModel is None:
raise ImportError(
"vertexai package not installed. Install with: pip install google-cloud-aiplatform"
)
self.project_id = project_id or os.getenv("GCP_PROJECT_ID")
if not self.project_id:
raise ValueError("GCP_PROJECT_ID not provided")
self.location = location
self.model_name = model
aiplatform.init(project=self.project_id, location=self.location)
self.model = GenerativeModel(model)
def generate(
self,
prompt: str,
system_prompt: Optional[str] = None,
max_tokens: int = 2000,
temperature: float = 0.7,
) -> Tuple[str, Dict[str, int]]:
"""Generate response using Vertex AI."""
full_prompt = prompt
if system_prompt:
full_prompt = f"{system_prompt}\n\n{prompt}"
response = self.model.generate_content(
full_prompt,
generation_config={
"max_output_tokens": max_tokens,
"temperature": temperature,
},
)
content = response.text
token_usage = {
"tokens_in": len(full_prompt.split()) * 1.3, # Rough estimate
"tokens_out": len(content.split()) * 1.3, # Rough estimate
}
return content, token_usage
def get_ai_client(
provider: Optional[str] = None,
**kwargs
) -> AIClient:
"""
Factory function to get the appropriate AI client.
Args:
provider: "openai" or "vertex" (defaults to AI_PROVIDER env var)
**kwargs: Additional arguments passed to the client constructor
Returns:
AIClient instance
"""
provider = provider or os.getenv("AI_PROVIDER", AIProvider.VERTEX.value)
if provider == AIProvider.OPENAI.value:
return OpenAIClient(**kwargs)
elif provider == AIProvider.VERTEX.value:
return VertexAIClient(**kwargs)
else:
raise ValueError(f"Unknown AI provider: {provider}")
File: lib/kill_switch.py
"""
Kill switch logic for disabling AI and falling back to deterministic outputs.
"""
import os
from typing import Dict, Any, Callable, Optional
def is_ai_enabled() -> bool:
"""Check if AI is enabled via environment variable."""
return os.getenv("AI_ENABLED", "true").lower() == "true"
def get_fallback_response(copilot_type: str) -> Dict[str, Any]:
"""
Get deterministic fallback response when AI is disabled.
Args:
copilot_type: "code", "reading", or "meeting"
Returns:
Dictionary with fallback response structure
"""
fallbacks = {
"code": {
"test_suggestions": [],
"refactor_hints": [],
"message": "AI is disabled. Please enable AI_ENABLED=true to get suggestions.",
},
"reading": {
"digest": [],
"follow_up_prompts": [],
"message": "AI is disabled. Please enable AI_ENABLED=true to get digest.",
},
"meeting": {
"action_items": [],
"message": "AI is disabled. Please enable AI_ENABLED=true to get action items.",
},
}
return fallbacks.get(copilot_type, {"message": "AI is disabled"})
def with_kill_switch(
ai_function: Callable,
copilot_type: str,
*args,
**kwargs
) -> Dict[str, Any]:
"""
Execute AI function with kill switch fallback.
Args:
ai_function: Function to call if AI is enabled
copilot_type: Type of copilot for fallback response
*args, **kwargs: Arguments to pass to ai_function
Returns:
Result from ai_function or fallback response
"""
if not is_ai_enabled():
return {
"result": get_fallback_response(copilot_type),
"ai_enabled": False,
"outcome": "fallback",
}
try:
result = ai_function(*args, **kwargs)
return {
"result": result,
"ai_enabled": True,
"outcome": "success",
}
except Exception as e:
return {
"result": get_fallback_response(copilot_type),
"ai_enabled": True,
"outcome": "error",
"error": str(e),
}
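A quick sketch of the toggle in action (fake_analyze is a hypothetical stand-in for a real AI call):

import os
from lib.kill_switch import with_kill_switch

def fake_analyze():
    # Hypothetical stand-in for an AI-backed analysis function
    return {"test_suggestions": ["cover the empty-input case"], "refactor_hints": []}

os.environ["AI_ENABLED"] = "false"
print(with_kill_switch(fake_analyze, "code")["outcome"])  # "fallback" - fake_analyze is never called

os.environ["AI_ENABLED"] = "true"
print(with_kill_switch(fake_analyze, "code")["outcome"])  # "success"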
File: copilots/code_copilot/main.py
"""
Code Copilot FastAPI service.
"""
import os
import sys
from typing import Optional
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import JSONResponse
from pydantic import BaseModel
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../'))
from lib.kill_switch import is_ai_enabled, with_kill_switch, get_fallback_response
from lib.bigquery_client import BigQueryClient
from copilots.code_copilot.copilot import CodeCopilot
app = FastAPI(title="Code Copilot", version="1.0.0")
copilot = CodeCopilot()
bq_client = BigQueryClient()
class CodeRequest(BaseModel):
"""Request model for code analysis."""
code: str
workflow: Optional[str] = None
corrections_needed: Optional[int] = None
class SuggestResponse(BaseModel):
"""Response model for suggestions."""
result: dict
latency_ms: Optional[int]
tokens_in: Optional[int]
tokens_out: Optional[int]
ai_enabled: bool
outcome: str
event_id: Optional[str] = None
@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "healthy", "ai_enabled": is_ai_enabled()}
@app.post("/suggest", response_model=SuggestResponse)
async def suggest(
code: Optional[str] = Form(None),
file: Optional[UploadFile] = File(None),
workflow: Optional[str] = Form(None),
corrections_needed: Optional[int] = Form(None),
):
"""
Get code analysis suggestions.
Accepts either:
- code: Direct code text
- file: Uploaded code file
"""
# Get code content
code_content = code
if file:
code_content = await file.read()
code_content = code_content.decode('utf-8')
if not code_content:
raise HTTPException(status_code=400, detail="Either 'code' or 'file' must be provided")
input_bytes = len(code_content.encode('utf-8'))
def analyze():
return copilot.analyze_code(code_content)
response = with_kill_switch(analyze, "code")
telemetry = response["result"].get("telemetry", {})
latency_ms = telemetry.get("latency_ms")
tokens_in = telemetry.get("tokens_in", 0)
tokens_out = telemetry.get("tokens_out", 0)
cost_eur = None
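    # Cost below uses an assumed flat rate of €0.0005 per 1K tokens in each direction;
    # e.g. 1,200 tokens in and 400 tokens out log roughly €0.0008. Adjust to actual model pricing.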
if tokens_in and tokens_out:
cost_eur = (tokens_in / 1000 * 0.0005 + tokens_out / 1000 * 0.0005)
event_id = None
try:
event_id = bq_client.log_event(
copilot="code",
workflow=workflow,
freshness_tier="seconds",
input_bytes=input_bytes,
output_bytes=len(str(response["result"]).encode('utf-8')),
tokens_in=tokens_in,
tokens_out=tokens_out,
latency_ms=latency_ms,
ai_enabled=response["ai_enabled"],
corrections_needed=corrections_needed,
outcome=response["outcome"],
cost_eur=cost_eur,
)
except Exception as e:
print(f"Failed to log to BigQuery: {e}")
return SuggestResponse(
result=response["result"],
latency_ms=latency_ms,
tokens_in=tokens_in,
tokens_out=tokens_out,
ai_enabled=response["ai_enabled"],
outcome=response["outcome"],
event_id=event_id,
)
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", 8080))
uvicorn.run(app, host="0.0.0.0", port=port)
File: copilots/code_copilot/copilot.py
"""
Code Copilot - AI-powered code analysis and suggestions.
"""
import sys
import os
from typing import Dict, List, Any
parent_dir = os.path.join(os.path.dirname(__file__), '../../')
sys.path.insert(0, parent_dir)
from lib.ai_client import get_ai_client
from lib.telemetry import Telemetry
class CodeCopilot:
"""Code Copilot for test suggestions and refactor hints."""
def __init__(self):
self.ai_client = get_ai_client()
self.system_prompt = """You are a code analysis assistant. Analyze code and provide:
1. Test case suggestions (unit tests, edge cases)
2. Refactoring hints (code quality, performance, maintainability)
Be concise and actionable. Format output as JSON with keys: test_suggestions (list) and refactor_hints (list)."""
def analyze_code(self, code: str) -> Dict[str, Any]:
"""
Analyze code and provide suggestions.
Args:
code: Source code to analyze
Returns:
Dictionary with test_suggestions and refactor_hints
"""
code_block = f"```python\n{code}\n```"
prompt = f"""Analyze the following code and provide test suggestions and refactoring hints:
{code_block}
Provide your analysis in JSON format with:
- test_suggestions: array of test case descriptions
- refactor_hints: array of refactoring suggestions"""
telemetry = Telemetry()
with telemetry.track():
response, token_usage = self.ai_client.generate(
prompt=prompt,
system_prompt=self.system_prompt,
max_tokens=2000,
temperature=0.7,
)
telemetry.set_tokens(token_usage["tokens_in"], token_usage["tokens_out"])
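        # The model may wrap its JSON in a fenced ```json block; strip the fence before
        # parsing, and fall back to returning the raw text as a single suggestion if parsing fails.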
try:
import json
if "```json" in response:
json_start = response.find("```json") + 7
json_end = response.find("```", json_start)
response = response[json_start:json_end].strip()
elif "```" in response:
json_start = response.find("```") + 3
json_end = response.find("```", json_start)
response = response[json_start:json_end].strip()
result = json.loads(response)
except (json.JSONDecodeError, ValueError):
result = {
"test_suggestions": [response],
"refactor_hints": [],
}
return {
"test_suggestions": result.get("test_suggestions", []),
"refactor_hints": result.get("refactor_hints", []),
"telemetry": telemetry.to_dict(),
}
File: lib/bigquery_client.py
"""
BigQuery client for logging events and metrics.
"""
import os
import uuid
from datetime import datetime
from typing import Optional
try:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
except ImportError:
bigquery = None
NotFound = None
class BigQueryClient:
"""Client for logging events to BigQuery."""
def __init__(self, project_id: Optional[str] = None, dataset_id: str = "copilot_poc"):
if bigquery is None:
raise ImportError("google-cloud-bigquery not installed")
self.project_id = project_id or os.getenv("GCP_PROJECT_ID")
if not self.project_id:
raise ValueError("GCP_PROJECT_ID not provided")
self.dataset_id = dataset_id
self.client = bigquery.Client(project=self.project_id)
self._ensure_dataset_and_table()
def _ensure_dataset_and_table(self):
"""Ensure dataset and table exist."""
dataset_ref = self.client.dataset(self.dataset_id)
try:
self.client.get_dataset(dataset_ref)
except NotFound:
dataset = bigquery.Dataset(dataset_ref)
dataset.location = "US"
dataset.description = "Dataset for storing copilot events and metrics"
self.client.create_dataset(dataset)
table_ref = dataset_ref.table("events")
try:
self.client.get_table(table_ref)
except NotFound:
schema = [
bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
bigquery.SchemaField("ts", "TIMESTAMP", mode="REQUIRED"),
bigquery.SchemaField("copilot", "STRING", mode="REQUIRED"),
bigquery.SchemaField("workflow", "STRING"),
bigquery.SchemaField("freshness_tier", "STRING"),
bigquery.SchemaField("input_bytes", "INT64"),
bigquery.SchemaField("output_bytes", "INT64"),
bigquery.SchemaField("tokens_in", "INT64"),
bigquery.SchemaField("tokens_out", "INT64"),
bigquery.SchemaField("latency_ms", "INT64"),
bigquery.SchemaField("ai_enabled", "BOOL"),
bigquery.SchemaField("corrections_needed", "INT64"),
bigquery.SchemaField("outcome", "STRING"),
bigquery.SchemaField("cost_eur", "NUMERIC"),
bigquery.SchemaField("notes", "STRING"),
]
table = bigquery.Table(table_ref, schema=schema)
table.time_partitioning = bigquery.TimePartitioning(field="ts")
table.clustering_fields = ["copilot", "outcome"]
self.client.create_table(table)
def log_event(
self,
copilot: str,
workflow: Optional[str] = None,
freshness_tier: str = "seconds",
input_bytes: Optional[int] = None,
output_bytes: Optional[int] = None,
tokens_in: Optional[int] = None,
tokens_out: Optional[int] = None,
latency_ms: Optional[int] = None,
ai_enabled: bool = True,
corrections_needed: Optional[int] = None,
outcome: str = "success",
cost_eur: Optional[float] = None,
notes: Optional[str] = None,
) -> str:
"""
Log an event to BigQuery.
Returns:
event_id: Unique identifier for the logged event
"""
event_id = str(uuid.uuid4())
row = {
"event_id": event_id,
"ts": datetime.utcnow().isoformat(),
"copilot": copilot,
"workflow": workflow,
"freshness_tier": freshness_tier,
"input_bytes": input_bytes,
"output_bytes": output_bytes,
"tokens_in": tokens_in,
"tokens_out": tokens_out,
"latency_ms": latency_ms,
"ai_enabled": ai_enabled,
"corrections_needed": corrections_needed,
"outcome": outcome,
"cost_eur": cost_eur,
"notes": notes,
}
table_ref = self.client.dataset(self.dataset_id).table("events")
errors = self.client.insert_rows_json(table_ref, [row])
if errors:
raise Exception(f"Failed to insert row: {errors}")
return event_id
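A minimal sketch of logging one event directly (assumes GCP_PROJECT_ID is set and the caller has BigQuery permissions; the values are illustrative):

from lib.bigquery_client import BigQueryClient

bq = BigQueryClient()  # creates the copilot_poc dataset and events table if they do not exist
event_id = bq.log_event(
    copilot="code",
    workflow="code-copilot-workflow",
    freshness_tier="seconds",
    tokens_in=120,
    tokens_out=60,
    latency_ms=850,
    outcome="success",
    cost_eur=0.00009,
)
print(event_id)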
File: lib/telemetry.py
"""
Telemetry tracking for latency and token usage.
"""
import time
from typing import Any, Dict, Optional
class Telemetry:
"""Track latency and token usage for AI operations."""
def __init__(self):
self.start_time: Optional[float] = None
self.end_time: Optional[float] = None
self.tokens_in: int = 0
self.tokens_out: int = 0
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.end_time = time.time()
def track(self):
"""Context manager for tracking latency."""
return self
def set_tokens(self, tokens_in: int, tokens_out: int):
"""Set token usage."""
self.tokens_in = tokens_in
self.tokens_out = tokens_out
@property
def latency_ms(self) -> Optional[int]:
"""Get latency in milliseconds."""
if self.start_time and self.end_time:
return int((self.end_time - self.start_time) * 1000)
return None
    def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return {
"latency_ms": self.latency_ms,
"tokens_in": self.tokens_in,
"tokens_out": self.tokens_out,
}
File: infrastructure/modules/cloud_run/main.tf
variable "project_id" {
description = "GCP Project ID"
type = string
}
variable "region" {
description = "GCP Region"
type = string
  default     = "europe-west4"
}
variable "service_name" {
description = "Cloud Run service name"
type = string
}
variable "image" {
description = "Container image URL"
type = string
}
variable "service_account_email" {
description = "Service account email"
type = string
}
variable "environment_variables" {
description = "Environment variables"
type = map(string)
default = {}
}
variable "min_instances" {
description = "Minimum number of instances"
type = number
default = 0 # Scale to zero for cost optimization
}
variable "max_instances" {
description = "Maximum number of instances"
type = number
default = 3 # Limit max instances for cost control
}
resource "google_cloud_run_service" "service" {
name = var.service_name
location = var.region
project = var.project_id
template {
spec {
service_account_name = var.service_account_email
containers {
image = var.image
resources {
limits = {
cpu = "1"
memory = "512Mi"
}
}
dynamic "env" {
for_each = var.environment_variables
content {
name = env.key
value = env.value
}
}
}
container_concurrency = 80
timeout_seconds = 300
}
metadata {
annotations = {
"autoscaling.knative.dev/minScale" = tostring(var.min_instances)
"autoscaling.knative.dev/maxScale" = tostring(var.max_instances)
}
labels = {
environment = "dev"
service = var.service_name
}
}
}
traffic {
percent = 100
latest_revision = true
}
}
# Private access only (bearer token authentication)
resource "google_cloud_run_service_iam_member" "authenticated_access" {
service = google_cloud_run_service.service.name
location = google_cloud_run_service.service.location
role = "roles/run.invoker"
member = "serviceAccount:${var.service_account_email}"
}
output "service_url" {
description = "Cloud Run service URL"
value = google_cloud_run_service.service.status[0].url
}
File: scripts/setup_bigquery.sql
-- BigQuery Setup Script for Copilot PoC
-- Dataset: copilot_poc
-- Create dataset
CREATE SCHEMA IF NOT EXISTS `copilot_poc`
OPTIONS(
description="Dataset for storing copilot events and metrics",
location="US"
);
-- Events table
CREATE TABLE IF NOT EXISTS `copilot_poc.events` (
event_id STRING NOT NULL,
ts TIMESTAMP NOT NULL,
copilot STRING NOT NULL,
workflow STRING,
freshness_tier STRING,
input_bytes INT64,
output_bytes INT64,
tokens_in INT64,
tokens_out INT64,
latency_ms INT64,
ai_enabled BOOL,
corrections_needed INT64,
outcome STRING,
cost_eur NUMERIC,
notes STRING
)
PARTITION BY DATE(ts)
CLUSTER BY copilot, outcome
OPTIONS(
description="One row per copilot invocation with full telemetry"
);
-- Catalog table
CREATE TABLE IF NOT EXISTS `copilot_poc.catalog` (
workflow STRING NOT NULL,
copilot STRING NOT NULL,
owner STRING,
freshness_tier STRING,
sla_ms INT64
)
OPTIONS(
description="Workflow registry with freshness tiers and SLAs"
);
-- Insert or update catalog entries (BigQuery upsert via MERGE)
MERGE `copilot_poc.catalog` AS target
USING (
  SELECT 'code-copilot-workflow' AS workflow, 'code' AS copilot, 'dev-team' AS owner, 'seconds' AS freshness_tier, 2000 AS sla_ms
  UNION ALL SELECT 'reading-copilot-nightly', 'reading', 'dev-team', 'daily', 300000
  UNION ALL SELECT 'meeting-copilot-workflow', 'meeting', 'dev-team', 'minutes', 60000
) AS source
ON target.workflow = source.workflow
WHEN MATCHED THEN UPDATE SET
  copilot = source.copilot,
  owner = source.owner,
  freshness_tier = source.freshness_tier,
  sla_ms = source.sla_ms
WHEN NOT MATCHED THEN INSERT (workflow, copilot, owner, freshness_tier, sla_ms)
  VALUES (source.workflow, source.copilot, source.owner, source.freshness_tier, source.sla_ms);
-- Metrics view (daily aggregates)
CREATE OR REPLACE VIEW `copilot_poc.metrics` AS
SELECT
DATE(ts) as date,
copilot,
COUNT(*) as total_events,
AVG(latency_ms) as avg_latency_ms,
APPROX_QUANTILES(latency_ms, 100)[OFFSET(95)] as p95_latency_ms,
SUM(tokens_in) as total_tokens_in,
SUM(tokens_out) as total_tokens_out,
SUM(cost_eur) as total_cost_eur,
AVG(corrections_needed) as avg_corrections,
COUNTIF(outcome = 'success') as success_count,
COUNTIF(outcome = 'fallback') as fallback_count,
COUNTIF(outcome = 'error') as error_count,
COUNTIF(outcome = 'success') / COUNT(*) as success_rate
FROM
`copilot_poc.events`
GROUP BY
date, copilot
ORDER BY
date DESC, copilot;
File: tests/test_kill_switch.py
"""
Unit tests for kill switch functionality.
"""
import os
import pytest
from unittest.mock import patch, MagicMock
from lib.kill_switch import is_ai_enabled, get_fallback_response, with_kill_switch
def test_is_ai_enabled_true():
"""Test AI enabled when environment variable is true."""
with patch.dict(os.environ, {"AI_ENABLED": "true"}):
assert is_ai_enabled() is True
def test_is_ai_enabled_false():
"""Test AI disabled when environment variable is false."""
with patch.dict(os.environ, {"AI_ENABLED": "false"}):
assert is_ai_enabled() is False
def test_is_ai_enabled_default():
"""Test AI enabled by default when environment variable is not set."""
with patch.dict(os.environ, {}, clear=True):
assert is_ai_enabled() is True
def test_get_fallback_response_code():
"""Test fallback response for code copilot."""
response = get_fallback_response("code")
assert "test_suggestions" in response
assert "refactor_hints" in response
assert "message" in response
assert response["test_suggestions"] == []
assert response["refactor_hints"] == []
def test_get_fallback_response_reading():
"""Test fallback response for reading copilot."""
response = get_fallback_response("reading")
assert "digest" in response
assert "follow_up_prompts" in response
assert "message" in response
def test_with_kill_switch_disabled():
"""Test kill switch when AI is disabled."""
with patch.dict(os.environ, {"AI_ENABLED": "false"}):
mock_function = MagicMock()
result = with_kill_switch(mock_function, "code")
assert result["ai_enabled"] is False
assert result["outcome"] == "fallback"
assert "result" in result
mock_function.assert_not_called()
def test_with_kill_switch_enabled_success():
"""Test kill switch when AI is enabled and succeeds."""
with patch.dict(os.environ, {"AI_ENABLED": "true"}):
mock_function = MagicMock(return_value={"test": "result"})
result = with_kill_switch(mock_function, "code")
assert result["ai_enabled"] is True
assert result["outcome"] == "success"
assert result["result"] == {"test": "result"}
mock_function.assert_called_once()
def test_with_kill_switch_enabled_error():
"""Test kill switch when AI is enabled but fails."""
with patch.dict(os.environ, {"AI_ENABLED": "true"}):
mock_function = MagicMock(side_effect=Exception("AI error"))
result = with_kill_switch(mock_function, "code")
assert result["ai_enabled"] is True
assert result["outcome"] == "error"
assert "error" in result
assert result["error"] == "AI error"
File: tests/test_ai_client.py
"""
Unit tests for AI client abstraction.
"""
import os
import pytest
from unittest.mock import patch, MagicMock
from lib.ai_client import get_ai_client, VertexAIClient, AIProvider
@patch('lib.ai_client.aiplatform')
@patch('lib.ai_client.GenerativeModel')
def test_vertex_ai_client_init(mock_model_class, mock_aiplatform):
"""Test Vertex AI client initialization."""
mock_model = MagicMock()
mock_model_class.return_value = mock_model
with patch.dict(os.environ, {"GCP_PROJECT_ID": "test-project"}):
client = VertexAIClient(
project_id="test-project",
location="us-central1",
model="gemini-pro"
)
assert client.project_id == "test-project"
assert client.location == "us-central1"
assert client.model_name == "gemini-pro"
mock_aiplatform.init.assert_called_once()
mock_model_class.assert_called_once_with("gemini-pro")
@patch('lib.ai_client.aiplatform')
@patch('lib.ai_client.GenerativeModel')
def test_vertex_ai_client_generate(mock_model_class, mock_aiplatform):
"""Test Vertex AI client generate method."""
mock_model = MagicMock()
mock_response = MagicMock()
mock_response.text = "Test response"
mock_model.generate_content.return_value = mock_response
mock_model_class.return_value = mock_model
with patch.dict(os.environ, {"GCP_PROJECT_ID": "test-project"}):
client = VertexAIClient()
response, token_usage = client.generate(
prompt="Test prompt",
system_prompt="System prompt",
max_tokens=1000,
temperature=0.7
)
assert response == "Test response"
assert "tokens_in" in token_usage
assert "tokens_out" in token_usage
mock_model.generate_content.assert_called_once()
File: tests/test_code_copilot.py
"""
Unit tests for code copilot.
"""
import pytest
from unittest.mock import patch, MagicMock
from copilots.code_copilot.copilot import CodeCopilot
@patch('copilots.code_copilot.copilot.get_ai_client')
def test_code_copilot_analyze_code(mock_get_client):
"""Test code copilot analyze_code method."""
mock_client = MagicMock()
mock_client.generate.return_value = (
'{"test_suggestions": ["Test 1", "Test 2"], "refactor_hints": ["Hint 1"]}',
{"tokens_in": 100, "tokens_out": 50}
)
mock_get_client.return_value = mock_client
copilot = CodeCopilot()
result = copilot.analyze_code("def test(): pass")
assert "test_suggestions" in result
assert "refactor_hints" in result
assert "telemetry" in result
assert len(result["test_suggestions"]) == 2
assert len(result["refactor_hints"]) == 1
assert result["telemetry"]["tokens_in"] == 100
assert result["telemetry"]["tokens_out"] == 50
File: tests/integration/test_copilot_integration.py
"""
Integration tests for copilot services.
"""
import pytest
import os
from unittest.mock import patch
from fastapi.testclient import TestClient
from copilots.code_copilot.main import app
@pytest.fixture
def client():
"""Create test client."""
return TestClient(app)
def test_health_endpoint(client):
"""Test health endpoint."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert "status" in data
assert "ai_enabled" in data
@patch.dict(os.environ, {"AI_ENABLED": "false"})
def test_suggest_endpoint_kill_switch(client):
"""Test suggest endpoint with kill switch disabled."""
response = client.post(
"/suggest",
data={"code": "def test(): pass"}
)
assert response.status_code == 200
data = response.json()
assert data["ai_enabled"] is False
assert data["outcome"] == "fallback"
File: tests/performance/test_latency.py
"""
Performance tests for latency requirements.
"""
import pytest
import time
from copilots.code_copilot.copilot import CodeCopilot
def test_code_copilot_latency_seconds_tier():
"""Test code copilot meets seconds-tier latency requirement."""
copilot = CodeCopilot()
start_time = time.time()
result = copilot.analyze_code("def test(): pass")
elapsed_ms = (time.time() - start_time) * 1000
# Seconds tier: p95 < 2000ms
assert elapsed_ms < 2000, f"Latency {elapsed_ms}ms exceeds 2000ms SLA"
assert "telemetry" in result
assert result["telemetry"]["latency_ms"] < 2000
File: tests/e2e/test_full_workflow.py
"""
End-to-end tests for full copilot workflow.
"""
import pytest
import requests
import os
@pytest.mark.e2e
def test_code_copilot_e2e():
"""Test full code copilot workflow end-to-end."""
service_url = os.getenv("CODE_COPILOT_URL", "http://localhost:8080")
# Test health check
health_response = requests.get(f"{service_url}/health")
assert health_response.status_code == 200
# Test code analysis
code = """
def calculate_total(items):
total = 0
for item in items:
total += item.price
return total
"""
suggest_response = requests.post(
f"{service_url}/suggest",
data={"code": code}
)
assert suggest_response.status_code == 200
data = suggest_response.json()
assert "result" in data
assert "event_id" in data
Daily Metrics:
SELECT * FROM `copilot_poc.metrics`
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
ORDER BY date DESC, copilot;
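The same view can be pulled into Python for quick checks - a minimal sketch, assuming the default credentials and project from the setup above:

from google.cloud import bigquery

client = bigquery.Client()
rows = client.query(
    "SELECT * FROM `copilot_poc.metrics` "
    "WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) "
    "ORDER BY date DESC, copilot"
).result()
for row in rows:
    print(row.date, row.copilot, row.p95_latency_ms, row.total_cost_eur)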
Time Saved Calculation:
SELECT
copilot,
AVG(latency_ms) as avg_copilot_time_ms,
CASE
WHEN copilot = 'code' THEN 10000
WHEN copilot = 'reading' THEN 300000
WHEN copilot = 'meeting' THEN 720000
ELSE 0
END as baseline_time_ms,
CASE
WHEN copilot = 'code' THEN (10000 - AVG(latency_ms)) / 10000.0 * 100
WHEN copilot = 'reading' THEN (300000 - AVG(latency_ms)) / 300000.0 * 100
WHEN copilot = 'meeting' THEN (720000 - AVG(latency_ms)) / 720000.0 * 100
ELSE 0
END as time_saved_percent
FROM
`copilot_poc.events`
WHERE
outcome = 'success'
AND DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY
copilot;
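For example, with the assumed 10-second manual baseline for the code copilot, a hypothetical average copilot latency of 1,500 ms works out to (10000 - 1500) / 10000 × 100 ≈ 85% time saved; the reading and meeting copilots use 5-minute and 12-minute baselines in the same way.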
Cost per 1K Predictions:
SELECT
copilot,
COUNT(*) as total_predictions,
SUM(cost_eur) as total_cost_eur,
SUM(cost_eur) / COUNT(*) * 1000 as cost_per_1k_predictions_eur
FROM
`copilot_poc.events`
WHERE
outcome = 'success'
AND cost_eur IS NOT NULL
AND DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY
copilot;
Kill Switch Effectiveness:
SELECT
DATE(ts) as date,
copilot,
COUNTIF(outcome = 'fallback') as fallback_count,
COUNTIF(outcome = 'success') as success_count,
COUNTIF(outcome = 'fallback') / COUNT(*) * 100 as fallback_rate
FROM
`copilot_poc.events`
WHERE
DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY
date, copilot
ORDER BY
date DESC, copilot;
Quality Metrics (Edits Reduced):
SELECT
copilot,
AVG(corrections_needed) as avg_corrections_with_copilot,
CASE
WHEN copilot = 'code' THEN 5.0
WHEN copilot = 'reading' THEN 8.0
WHEN copilot = 'meeting' THEN 6.0
ELSE 0
END as baseline_corrections,
CASE
WHEN copilot = 'code' THEN (5.0 - AVG(corrections_needed)) / 5.0 * 100
WHEN copilot = 'reading' THEN (8.0 - AVG(corrections_needed)) / 8.0 * 100
WHEN copilot = 'meeting' THEN (6.0 - AVG(corrections_needed)) / 6.0 * 100
ELSE 0
END as edits_reduced_percent
FROM
`copilot_poc.events`
WHERE
outcome = 'success'
AND corrections_needed IS NOT NULL
AND DATE(ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY
copilot;
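To turn the time-saved and edits-reduced queries into a pass/fail decision, the success gates (≥15% time saved, ≥20% fewer edits) can be checked with a small helper - a sketch, with illustrative numbers only:

def passes_gates(time_saved_percent: float, edits_reduced_percent: float) -> bool:
    """Success gates from the hypotheses: >=15% time saved and >=20% fewer edits."""
    return time_saved_percent >= 15.0 and edits_reduced_percent >= 20.0

# Illustrative numbers only:
print(passes_gates(32.0, 25.0))  # True  -> keep the copilot
print(passes_gates(12.0, 30.0))  # False -> retire it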
Hypothesis: H1 (time saved), H2 (edits reduced), H3 (cost)
Results:
Analysis:
Daily batch processing eliminated real-time costs while meeting the “fresh enough” requirement. The 10-bullet digest format reduced cognitive load and editing time.
Hypothesis: H1 (time saved), H2 (edits reduced), H4 (latency)
Results:
Analysis:
Structured action items with owners and deadlines reduced follow-up time. The minutes-tier freshness met the use case without requiring real-time processing.
Hypothesis: H1 (time saved), H2 (edits reduced), H4 (latency)
Results:
Analysis:
While latency was acceptable, the time saved and edit reduction didn’t meet the success gates. The copilot was helpful but not transformative enough to justify keeping. Future experiments with on-device models or better prompts may improve results.
Hypothesis: H5 (reversibility)
Results:
Analysis:
The kill switch provided clean reversibility with zero AI API calls when disabled. All fallback events were logged, enabling audit trails.
Why it failed:
Lessons:
Next steps:
1. Start Tiny
2. Map Freshness to Decisions
3. Add Kill Switch
AI_ENABLED=false
4. Measure Strictly
5. Keep Only What Beats Control
This PoC proved that small, measurable AI helpers can deliver real value when they start tiny, match freshness to the decision, ship with a kill switch, are measured strictly, and are kept only when they beat the control.
Two copilots passed, one failed and was retired. The pattern is portable, governed, and repeatable - ready to template for other workflows.
Author: Dr. Atabak Kheirkhah
Date: November 17, 2025
Contact: atabakkheirkhah@gmail.com
This is a personal blog. The views, thoughts, and opinions expressed here are my own and do not represent, reflect, or constitute the views, policies, or positions of any employer, university, client, or organization I am associated with or have been associated with.