Advanced Usage
Take your Knify workflows to the next level with advanced patterns and techniques.
Multi-Turn Workflows
Build complex workflows by chaining multiple job continuations:
# Step 1: Initial analysis — creates the job and captures its id from the response
JOB_ID=$(curl -X POST https://api.knify.io/jobs \
-H "Authorization: Bearer $KNIFY_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"spec": {
"job_type": "cursor_task",
"prompt": "Analyze this codebase and identify bottlenecks"
}
}' | jq -r '.job_id')
# Wait for completion... NOTE(review): a fixed sleep races with the actual job
# duration — prefer polling the job's status endpoint until it reports done.
sleep 60
# Step 2: Generate optimizations (continues in the same sandbox as step 1)
curl -X POST https://api.knify.io/jobs/$JOB_ID/continue \
-H "Authorization: Bearer $KNIFY_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"spec": {
"job_type": "cursor_task",
"prompt": "Based on your analysis, implement optimizations"
}
}'
# Step 3: Run tests
# NOTE(review): steps 2-4 are issued back-to-back with no wait between them —
# confirm whether /continue queues turns or requires the prior turn to finish.
curl -X POST https://api.knify.io/jobs/$JOB_ID/continue \
-H "Authorization: Bearer $KNIFY_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"spec": {
"job_type": "cursor_task",
"prompt": "Run tests to verify the optimizations"
}
}'
# Step 4: Generate report summarizing the whole session
curl -X POST https://api.knify.io/jobs/$JOB_ID/continue \
-H "Authorization: Bearer $KNIFY_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"spec": {
"job_type": "cursor_task",
"prompt": "Create a markdown report summarizing all changes"
}
}'
Parallel Job Execution
Run multiple independent jobs in parallel:
import asyncio
import aiohttp
async def create_job(session, prompt):
    """Submit one cursor_task job via *session* and return the parsed JSON reply."""
    payload = {'spec': {'job_type': 'cursor_task', 'prompt': prompt}}
    async with session.post(
        'https://api.knify.io/jobs',
        headers={'Authorization': f'Bearer {API_KEY}'},
        json=payload,
    ) as response:
        return await response.json()
async def run_parallel_jobs():
    """Fan out several independent analysis jobs and collect their responses."""
    prompts = [
        "Analyze the frontend codebase",
        "Analyze the backend codebase",
        "Analyze the database schema",
        "Review CI/CD configuration",
    ]
    async with aiohttp.ClientSession() as session:
        # gather preserves input order, so results line up with prompts
        return await asyncio.gather(
            *(create_job(session, prompt) for prompt in prompts)
        )
# Run all jobs in parallel
results = asyncio.run(run_parallel_jobs())
# NOTE(review): assumes every response carries a 'job_id' key; a failed
# request would surface here as a KeyError — verify upstream error handling.
job_ids = [r['job_id'] for r in results]
Event Streaming
Monitor job progress in real-time with Server-Sent Events:
// Streams job progress events (SSE) and dispatches them to handleEvent.
//
// NOTE: the browser-native EventSource constructor does not accept custom
// headers, so the original `new EventSource(url, { headers: ... })` silently
// dropped the Authorization header. This version streams the response body
// via fetch(), which supports headers in browsers and modern Node.
class KnifyJobMonitor {
  constructor(apiKey, jobId) {
    this.apiKey = apiKey;
    this.jobId = jobId;
    this.abortController = null; // lets stop() cancel the in-flight stream
  }

  async start() {
    this.abortController = new AbortController();
    try {
      const response = await fetch(
        `https://api.knify.io/jobs/${this.jobId}/cursor/stream`,
        {
          headers: {
            'Authorization': `Bearer ${this.apiKey}`,
            'Accept': 'text/event-stream'
          },
          signal: this.abortController.signal
        }
      );
      if (!response.ok) {
        throw new Error(`Stream request failed: ${response.status}`);
      }
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // SSE frames are newline-delimited; keep any trailing partial line
        // in the buffer until the next chunk completes it.
        const lines = buffer.split('\n');
        buffer = lines.pop();
        for (const line of lines) {
          if (line.startsWith('data:')) {
            this.handleEvent(JSON.parse(line.slice(5).trim()));
          }
        }
      }
    } catch (error) {
      if (error.name !== 'AbortError') {
        console.error('Stream error:', error);
      }
    }
  }

  handleEvent(event) {
    switch (event.type) {
      case 'agent_message':
        console.log(`Agent: ${event.message}`);
        break;
      case 'tool_use':
        console.log(`Using tool: ${event.tool}`);
        break;
      case 'file_created':
        console.log(`Created file: ${event.path}`);
        break;
      case 'completed':
        console.log('Job completed!');
        this.stop(); // close the stream once the job is done
        break;
    }
  }

  stop() {
    if (this.abortController) {
      this.abortController.abort();
    }
  }
}
// Usage
const monitor = new KnifyJobMonitor(API_KEY, 'job_abc123');
monitor.start();
Workspace Templating
Create reusable workspace templates for your team:
# Directory structure for a testing workspace (runbooks, scripts, config, tests)
mkdir -p knify-testing-workspace/{runbooks,scripts,config,tests}
# Add runbooks (quoted 'EOF' heredoc: content is written verbatim, no expansion)
cat > knify-testing-workspace/runbooks/setup.md <<'EOF'
# Test Setup Runbook
1. Install dependencies
2. Configure test database
3. Seed test data
4. Verify setup
EOF
# Add configuration consumed by jobs running in this workspace
cat > knify-testing-workspace/config/test-config.json <<'EOF'
{
"timeout": 300,
"retries": 3,
"parallel": true
}
EOF
# Add environment template — example values only; keep real secrets out of the archive
cat > knify-testing-workspace/.env.example <<'EOF'
DATABASE_URL=postgresql://localhost/test_db
API_KEY=your_api_key_here
EOF
# Upload to Knify: pack the directory and register it as a named workspace template
tar -czf testing-workspace.tar.gz knify-testing-workspace/
curl -X POST https://api.knify.io/workspaces \
-H "Authorization: Bearer $KNIFY_API_KEY" \
-F "workspace=@testing-workspace.tar.gz" \
-F "name=testing-template-v1"
Dynamic Job Configuration
Build job specs programmatically:
from typing import Dict, Any, Optional
class KnifyJobBuilder:
    """Builds Knify job-spec payloads programmatically."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.knify.io"

    def build_code_review_job(
        self,
        repo_path: str,
        focus_areas: list[str],
        model: str = "sonnet-4.5",
    ) -> Dict[str, Any]:
        """Return a cursor_task spec that reviews the code at *repo_path*.

        Args:
            repo_path: Workspace path the review should target; also used
                as the job's workspace_path.
            focus_areas: Review themes joined into the prompt (e.g. "security").
            model: Model identifier to run the job with.
        """
        focus = ", ".join(focus_areas)
        # Build the prompt line-by-line so it stays flush-left regardless of
        # the surrounding code indentation (a plain triple-quoted f-string
        # would embed the method's indentation in every line).
        prompt = "\n".join([
            f"Review the codebase at {repo_path}.",
            f"Focus on: {focus}",
            "Provide:",
            "1. Security vulnerabilities",
            "2. Performance issues",
            "3. Code quality concerns",
            "4. Best practice violations",
        ])
        return {
            "spec": {
                "job_type": "cursor_task",
                "prompt": prompt,
                "model": model,
                "workspace_path": repo_path,
                "metadata": {
                    "type": "code_review",
                    "focus_areas": focus_areas,
                },
            }
        }

    def build_e2e_test_job(
        self,
        test_scenario: str,
        environment: str = "staging",
    ) -> Dict[str, Any]:
        """Return an fh_e2e_tools spec running *test_scenario* in *environment*."""
        return {
            "spec": {
                "job_type": "fh_e2e_tools",
                "prompt": f"Run E2E test: {test_scenario} in {environment}",
                "workspace_path": "/workspaces/e2e-tests",
                "metadata": {
                    "type": "e2e_test",
                    "scenario": test_scenario,
                    "environment": environment,
                },
            }
        }
# Usage
# NOTE(review): this snippet uses `requests` but the example never imports
# it — add `import requests` at the top of the file.
builder = KnifyJobBuilder(API_KEY)
job_spec = builder.build_code_review_job(
repo_path="/workspaces/my-app",
focus_areas=["security", "performance", "maintainability"]
)
response = requests.post(
f"{builder.base_url}/jobs",
headers={"Authorization": f"Bearer {builder.api_key}"},
json=job_spec
)
Error Handling and Retries
Implement robust error handling:
import time
from typing import Optional
class KnifyClient:
    """Minimal Knify API client with retry/backoff around job creation.

    NOTE(review): this example relies on the `requests` package being
    imported at module level (`import requests`), which the snippet omits.
    """

    def __init__(self, api_key: str, max_retries: int = 3):
        self.api_key = api_key
        self.base_url = "https://api.knify.io"
        self.max_retries = max_retries  # total attempts, not extra retries

    def create_job_with_retry(self, job_spec: dict) -> Optional[dict]:
        """Create a job, retrying on rate limits, 5xx errors, and timeouts.

        Args:
            job_spec: Job payload to POST to /jobs.

        Returns:
            The parsed JSON response on success, or None when the request
            ultimately fails (non-retryable error or retries exhausted).
        """
        for attempt in range(self.max_retries):
            try:
                response = requests.post(
                    f"{self.base_url}/jobs",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json",
                    },
                    json=job_spec,
                    timeout=30,
                )
            except requests.exceptions.Timeout:
                print(f"Timeout on attempt {attempt + 1}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)
                continue
            except requests.exceptions.RequestException as e:
                # Connection-level failure: not worth retrying here.
                print(f"Request failed: {e}")
                return None

            # Any 2xx counts as success; the original accepted only 200,
            # which mis-classified 201/202 create responses as errors.
            if 200 <= response.status_code < 300:
                return response.json()
            if response.status_code == 429:
                # Rate limited: honor Retry-After, defaulting to 60s.
                retry_after = int(response.headers.get('Retry-After', 60))
                print(f"Rate limited, waiting {retry_after}s...")
                time.sleep(retry_after)
            elif response.status_code >= 500:
                # Transient server error: exponential backoff then retry.
                wait_time = 2 ** attempt
                print(f"Server error, retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                # Other 4xx: the request itself is bad, retrying won't help.
                print(f"Error: {response.status_code} - {response.text}")
                return None
        print("Max retries exceeded")
        return None
# Usage
client = KnifyClient(API_KEY, max_retries=3)
result = client.create_job_with_retry({
"spec": {
"job_type": "cursor_task",
"prompt": "Your task"
}
})
Batch Processing
Process multiple items with a single job:
# Generate batch processing prompt data
# NOTE(review): the job reads batch_items.json from its workspace — make sure
# this file is present in /workspaces/batch-processor before the job runs.
cat > batch_items.json <<'EOF'
[
{"id": "item_1", "data": "..."},
{"id": "item_2", "data": "..."},
{"id": "item_3", "data": "..."}
]
EOF
# Create a single job that iterates over every item in the batch file
curl -X POST https://api.knify.io/jobs \
-H "Authorization: Bearer $KNIFY_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"spec": {
"job_type": "cursor_task",
"prompt": "Process each item in batch_items.json: 1) Validate data, 2) Transform, 3) Save results to output/{id}.json",
"workspace_path": "/workspaces/batch-processor"
}
}'
Job Orchestration
Build a job orchestration system:
class JobOrchestrator:
# NOTE(review): every client call below is awaited (create_job, continue_job,
# wait_for_completion, get_artifacts), but the KnifyClient shown earlier in
# this guide is synchronous and defines none of these methods — confirm
# against the real async client implementation before copying this example.
def __init__(self, api_key: str):
self.client = KnifyClient(api_key)
async def run_pipeline(self, stages: list[dict]) -> dict:
"""
Run a pipeline of dependent job stages.
Each stage runs after the previous completes.
"""
results = []
job_id = None
for i, stage in enumerate(stages):
print(f"Running stage {i+1}/{len(stages)}: {stage['name']}")
if job_id:
# Continue from previous job (same sandbox, so state carries over)
response = await self.client.continue_job(job_id, stage)
else:
# Create initial job
response = await self.client.create_job(stage)
job_id = response['job_id']
# Wait for completion before starting the next dependent stage
final_status = await self.client.wait_for_completion(job_id)
results.append({
"stage": stage['name'],
"status": final_status,
"artifacts": await self.client.get_artifacts(job_id)
})
if final_status != 'completed':
print(f"Stage {stage['name']} failed, stopping pipeline")
break
return {
# NOTE(review): an empty `stages` list reports "completed" (all() over
# an empty sequence is True) — decide whether that is the intended contract.
"pipeline_status": "completed" if all(r['status'] == 'completed' for r in results) else "failed",
"stages": results
}
# Usage
orchestrator = JobOrchestrator(API_KEY)
pipeline = [
{
"name": "Setup",
"spec": {
"job_type": "cursor_task",
"prompt": "Set up the project environment"
}
},
{
"name": "Build",
"spec": {
"job_type": "cursor_task",
"prompt": "Build the application"
}
},
{
"name": "Test",
"spec": {
"job_type": "cursor_task",
"prompt": "Run all tests"
}
},
{
"name": "Deploy",
"spec": {
"job_type": "cursor_task",
"prompt": "Deploy to staging"
}
}
]
# NOTE(review): top-level `await` is only valid inside an async function or an
# async-aware REPL/notebook; in a plain script use asyncio.run(...) instead.
result = await orchestrator.run_pipeline(pipeline)
Webhooks for Async Updates
Configure webhooks to get notified when jobs complete:
from flask import Flask, request, jsonify
import hmac
import hashlib
app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret"
@app.route('/webhooks/knify', methods=['POST'])
def handle_knify_webhook():
# Verify the signature before trusting the payload; reject anything
# unsigned or signed with the wrong shared secret.
signature = request.headers.get('X-Knify-Signature')
if not verify_signature(request.data, signature):
return jsonify({"error": "Invalid signature"}), 401
event = request.json
if event['type'] == 'job.completed':
job_id = event['job_id']
status = event['status']
print(f"Job {job_id} completed with status: {status}")
# Fetch artifacts
# NOTE(review): fetch_artifacts and process_job_results are placeholders —
# this guide does not define them; supply your own implementations.
artifacts = fetch_artifacts(job_id)
# Process results
process_job_results(job_id, artifacts)
return jsonify({"status": "ok"})
def verify_signature(payload, signature, secret=None):
    """Check a Knify webhook signature.

    Args:
        payload: Raw request body bytes the signature was computed over.
        signature: Value of the X-Knify-Signature header, expected in the
            form "sha256=<hexdigest>"; may be None when the header is absent.
        secret: Optional override of the shared webhook secret; defaults to
            the module-level WEBHOOK_SECRET (backward compatible).

    Returns:
        True only when the signature matches; False when it is missing,
        empty, or does not match.
    """
    # The original raised TypeError (an HTTP 500) for unsigned requests,
    # because hmac.compare_digest rejects None; treat missing as invalid.
    if not signature:
        return False
    key = (WEBHOOK_SECRET if secret is None else secret).encode()
    computed = hmac.new(
        key,
        payload,
        hashlib.sha256
    ).hexdigest()
    # Constant-time comparison avoids leaking digest bytes via timing.
    return hmac.compare_digest(f"sha256={computed}", signature)
if __name__ == '__main__':
app.run(port=5000)
Register webhook:
# Register the receiver URL; 'secret' must match the WEBHOOK_SECRET the
# receiver uses to verify X-Knify-Signature headers.
curl -X POST https://api.knify.io/webhooks \
-H "Authorization: Bearer $KNIFY_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"url": "https://your-domain.com/webhooks/knify",
"events": ["job.completed", "job.failed"],
"secret": "your_webhook_secret"
}'
Performance Optimization
1. Reuse Sandboxes
Instead of cleanup, keep sandboxes alive for related jobs:
# Job 1 — creates the sandbox
JOB_ID=$(curl -X POST .../jobs -d '{"spec": {...}}' | jq -r '.job_id')
# Job 2 (reuses sandbox — state from Job 1 is still present)
curl -X POST .../jobs/$JOB_ID/continue -d '{"spec": {...}}'
# Job 3 (reuses sandbox)
curl -X POST .../jobs/$JOB_ID/continue -d '{"spec": {...}}'
# Only cleanup when truly done — cleanup discards the sandbox and its state
curl -X POST .../jobs/$JOB_ID/cleanup
2. Pre-warm Workspaces
Upload workspaces once, reference many times:
# Upload once — the archive is registered as a reusable workspace
curl -X POST .../workspaces -F "workspace=@my-workspace.tar.gz"
# Use in many jobs — each job references the uploaded workspace by path,
# avoiding a fresh upload per job
for i in {1..10}; do
curl -X POST .../jobs \
-d '{"spec": {"workspace_path": "/workspaces/my-workspace", ...}}'
done
3. Choose Appropriate Models
Match model to task complexity:
| Task Complexity | Model | Speed |
|---|---|---|
| Simple scripts | auto | Fast |
| Code refactoring | sonnet-4.5 | Medium |
| Complex architecture | opus-4.1 | Slower |
Return to the introduction.