Advanced Usage

Take your Knify workflows to the next level with advanced patterns and techniques.

Multi-Turn Workflows

Build complex workflows by chaining multiple job continuations:

# Step 1: Initial analysis
JOB_ID=$(curl -X POST https://api.knify.io/jobs \
  -H "Authorization: Bearer $KNIFY_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "spec": {
      "job_type": "cursor_task",
      "prompt": "Analyze this codebase and identify bottlenecks"
    }
  }' | jq -r '.job_id')

# Wait for completion...
sleep 60

# Step 2: Generate optimizations
curl -X POST https://api.knify.io/jobs/$JOB_ID/continue \
  -H "Authorization: Bearer $KNIFY_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "spec": {
      "job_type": "cursor_task",
      "prompt": "Based on your analysis, implement optimizations"
    }
  }'

# Step 3: Run tests
curl -X POST https://api.knify.io/jobs/$JOB_ID/continue \
  -H "Authorization: Bearer $KNIFY_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "spec": {
      "job_type": "cursor_task",
      "prompt": "Run tests to verify the optimizations"
    }
  }'

# Step 4: Generate report
curl -X POST https://api.knify.io/jobs/$JOB_ID/continue \
  -H "Authorization: Bearer $KNIFY_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "spec": {
      "job_type": "cursor_task",
      "prompt": "Create a markdown report summarizing all changes"
    }
  }'

Parallel Job Execution

Run multiple independent jobs in parallel:

import asyncio
import aiohttp

async def create_job(session, prompt):
    async with session.post(
        'https://api.knify.io/jobs',
        headers={'Authorization': f'Bearer {API_KEY}'},
        json={'spec': {'job_type': 'cursor_task', 'prompt': prompt}}
    ) as resp:
        return await resp.json()

async def run_parallel_jobs():
    prompts = [
        "Analyze the frontend codebase",
        "Analyze the backend codebase",
        "Analyze the database schema",
        "Review CI/CD configuration"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [create_job(session, prompt) for prompt in prompts]
        results = await asyncio.gather(*tasks)
        return results

# Run all jobs in parallel
results = asyncio.run(run_parallel_jobs())
job_ids = [r['job_id'] for r in results]

Event Streaming

Monitor job progress in real-time with Server-Sent Events:

class KnifyJobMonitor {
  constructor(apiKey, jobId) {
    this.apiKey = apiKey;
    this.jobId = jobId;
    this.eventSource = null;
  }

  start() {
    this.eventSource = new EventSource(
      `https://api.knify.io/jobs/${this.jobId}/cursor/stream`,
      {
        headers: {
          'Authorization': `Bearer ${this.apiKey}`
        }
      }
    );

    this.eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);
      this.handleEvent(data);
    };

    this.eventSource.onerror = (error) => {
      console.error('Stream error:', error);
      this.eventSource.close();
    };
  }

  handleEvent(event) {
    switch (event.type) {
      case 'agent_message':
        console.log(`Agent: ${event.message}`);
        break;
      case 'tool_use':
        console.log(`Using tool: ${event.tool}`);
        break;
      case 'file_created':
        console.log(`Created file: ${event.path}`);
        break;
      case 'completed':
        console.log('Job completed!');
        this.eventSource.close();
        break;
    }
  }

  stop() {
    if (this.eventSource) {
      this.eventSource.close();
    }
  }
}

// Usage
const monitor = new KnifyJobMonitor(API_KEY, 'job_abc123');
monitor.start();

Workspace Templating

Create reusable workspace templates for your team:

# Directory structure for a testing workspace
mkdir -p knify-testing-workspace/{runbooks,scripts,config,tests}

# Add runbooks
cat > knify-testing-workspace/runbooks/setup.md <<'EOF'
# Test Setup Runbook

1. Install dependencies
2. Configure test database
3. Seed test data
4. Verify setup
EOF

# Add configuration
cat > knify-testing-workspace/config/test-config.json <<'EOF'
{
  "timeout": 300,
  "retries": 3,
  "parallel": true
}
EOF

# Add environment template
cat > knify-testing-workspace/.env.example <<'EOF'
DATABASE_URL=postgresql://localhost/test_db
API_KEY=your_api_key_here
EOF

# Upload to Knify
tar -czf testing-workspace.tar.gz knify-testing-workspace/
curl -X POST https://api.knify.io/workspaces \
  -H "Authorization: Bearer $KNIFY_API_KEY" \
  -F "workspace=@testing-workspace.tar.gz" \
  -F "name=testing-template-v1"

Dynamic Job Configuration

Build job specs programmatically:

from typing import Dict, Any, Optional

class KnifyJobBuilder:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.knify.io"
        
    def build_code_review_job(
        self,
        repo_path: str,
        focus_areas: list[str],
        model: str = "sonnet-4.5"
    ) -> Dict[str, Any]:
        focus = ", ".join(focus_areas)
        prompt = f"""
        Review the codebase at {repo_path}.
        Focus on: {focus}
        
        Provide:
        1. Security vulnerabilities
        2. Performance issues
        3. Code quality concerns
        4. Best practice violations
        """
        
        return {
            "spec": {
                "job_type": "cursor_task",
                "prompt": prompt.strip(),
                "model": model,
                "workspace_path": repo_path,
                "metadata": {
                    "type": "code_review",
                    "focus_areas": focus_areas
                }
            }
        }
    
    def build_e2e_test_job(
        self,
        test_scenario: str,
        environment: str = "staging"
    ) -> Dict[str, Any]:
        return {
            "spec": {
                "job_type": "fh_e2e_tools",
                "prompt": f"Run E2E test: {test_scenario} in {environment}",
                "workspace_path": "/workspaces/e2e-tests",
                "metadata": {
                    "type": "e2e_test",
                    "scenario": test_scenario,
                    "environment": environment
                }
            }
        }

# Usage
builder = KnifyJobBuilder(API_KEY)

job_spec = builder.build_code_review_job(
    repo_path="/workspaces/my-app",
    focus_areas=["security", "performance", "maintainability"]
)

response = requests.post(
    f"{builder.base_url}/jobs",
    headers={"Authorization": f"Bearer {builder.api_key}"},
    json=job_spec
)

Error Handling and Retries

Implement robust error handling:

import time
from typing import Optional

class KnifyClient:
    def __init__(self, api_key: str, max_retries: int = 3):
        self.api_key = api_key
        self.base_url = "https://api.knify.io"
        self.max_retries = max_retries
        
    def create_job_with_retry(self, job_spec: dict) -> Optional[dict]:
        for attempt in range(self.max_retries):
            try:
                response = requests.post(
                    f"{self.base_url}/jobs",
                    headers={
                        "Authorization": f"Bearer {self.api_key}",
                        "Content-Type": "application/json"
                    },
                    json=job_spec,
                    timeout=30
                )
                
                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 429:
                    # Rate limited, wait and retry
                    retry_after = int(response.headers.get('Retry-After', 60))
                    print(f"Rate limited, waiting {retry_after}s...")
                    time.sleep(retry_after)
                elif response.status_code >= 500:
                    # Server error, retry with backoff
                    wait_time = 2 ** attempt
                    print(f"Server error, retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    # Client error, don't retry
                    print(f"Error: {response.status_code} - {response.text}")
                    return None
                    
            except requests.exceptions.Timeout:
                print(f"Timeout on attempt {attempt + 1}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)
            except requests.exceptions.RequestException as e:
                print(f"Request failed: {e}")
                return None
                
        print("Max retries exceeded")
        return None

# Usage
client = KnifyClient(API_KEY, max_retries=3)
result = client.create_job_with_retry({
    "spec": {
        "job_type": "cursor_task",
        "prompt": "Your task"
    }
})

Batch Processing

Process multiple items with a single job:

# Generate batch processing prompt
cat > batch_items.json <<'EOF'
[
  {"id": "item_1", "data": "..."},
  {"id": "item_2", "data": "..."},
  {"id": "item_3", "data": "..."}
]
EOF

# Create job with batch data
curl -X POST https://api.knify.io/jobs \
  -H "Authorization: Bearer $KNIFY_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "spec": {
      "job_type": "cursor_task",
      "prompt": "Process each item in batch_items.json: 1) Validate data, 2) Transform, 3) Save results to output/{id}.json",
      "workspace_path": "/workspaces/batch-processor"
    }
  }'

Job Orchestration

Build a job orchestration system:

class JobOrchestrator:
    def __init__(self, api_key: str):
        self.client = KnifyClient(api_key)
        
    async def run_pipeline(self, stages: list[dict]) -> dict:
        """
        Run a pipeline of dependent job stages.
        Each stage runs after the previous completes.
        """
        results = []
        job_id = None
        
        for i, stage in enumerate(stages):
            print(f"Running stage {i+1}/{len(stages)}: {stage['name']}")
            
            if job_id:
                # Continue from previous job
                response = await self.client.continue_job(job_id, stage)
            else:
                # Create initial job
                response = await self.client.create_job(stage)
                job_id = response['job_id']
            
            # Wait for completion
            final_status = await self.client.wait_for_completion(job_id)
            results.append({
                "stage": stage['name'],
                "status": final_status,
                "artifacts": await self.client.get_artifacts(job_id)
            })
            
            if final_status != 'completed':
                print(f"Stage {stage['name']} failed, stopping pipeline")
                break
                
        return {
            "pipeline_status": "completed" if all(r['status'] == 'completed' for r in results) else "failed",
            "stages": results
        }

# Usage
orchestrator = JobOrchestrator(API_KEY)

pipeline = [
    {
        "name": "Setup",
        "spec": {
            "job_type": "cursor_task",
            "prompt": "Set up the project environment"
        }
    },
    {
        "name": "Build",
        "spec": {
            "job_type": "cursor_task",
            "prompt": "Build the application"
        }
    },
    {
        "name": "Test",
        "spec": {
            "job_type": "cursor_task",
            "prompt": "Run all tests"
        }
    },
    {
        "name": "Deploy",
        "spec": {
            "job_type": "cursor_task",
            "prompt": "Deploy to staging"
        }
    }
]

result = await orchestrator.run_pipeline(pipeline)

Webhooks for Async Updates

Configure webhooks to get notified when jobs complete:

from flask import Flask, request, jsonify
import hmac
import hashlib

app = Flask(__name__)
WEBHOOK_SECRET = "your_webhook_secret"

@app.route('/webhooks/knify', methods=['POST'])
def handle_knify_webhook():
    # Verify signature
    signature = request.headers.get('X-Knify-Signature')
    if not verify_signature(request.data, signature):
        return jsonify({"error": "Invalid signature"}), 401
    
    event = request.json
    
    if event['type'] == 'job.completed':
        job_id = event['job_id']
        status = event['status']
        
        print(f"Job {job_id} completed with status: {status}")
        
        # Fetch artifacts
        artifacts = fetch_artifacts(job_id)
        
        # Process results
        process_job_results(job_id, artifacts)
        
    return jsonify({"status": "ok"})

def verify_signature(payload, signature):
    computed = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={computed}", signature)

if __name__ == '__main__':
    app.run(port=5000)

Register webhook:

curl -X POST https://api.knify.io/webhooks \
  -H "Authorization: Bearer $KNIFY_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://your-domain.com/webhooks/knify",
    "events": ["job.completed", "job.failed"],
    "secret": "your_webhook_secret"
  }'

Performance Optimization

1. Reuse Sandboxes

Instead of cleanup, keep sandboxes alive for related jobs:

# Job 1
JOB_ID=$(curl -X POST .../jobs -d '{"spec": {...}}' | jq -r '.job_id')

# Job 2 (reuses sandbox)
curl -X POST .../jobs/$JOB_ID/continue -d '{"spec": {...}}'

# Job 3 (reuses sandbox)
curl -X POST .../jobs/$JOB_ID/continue -d '{"spec": {...}}'

# Only cleanup when truly done
curl -X POST .../jobs/$JOB_ID/cleanup

2. Pre-warm Workspaces

Upload workspaces once, reference many times:

# Upload once
curl -X POST .../workspaces -F "workspace=@my-workspace.tar.gz"

# Use in many jobs
for i in {1..10}; do
  curl -X POST .../jobs \
    -d '{"spec": {"workspace_path": "/workspaces/my-workspace", ...}}'
done

3. Choose Appropriate Models

Match model to task complexity:

Task Complexity Model Speed
Simple scripts auto Fast
Code refactoring sonnet-4.5 Medium
Complex architecture opus-4.1 Slower
Back to Overview

Return to the introduction.