Overview
Kapso’s deployment system is extensible. You can add custom strategies by creating a new directory under src/deployment/strategies/.
Strategy Structure
Each strategy is a directory with these files:
Copy
Ask AI
src/deployment/strategies/my_strategy/
├── __init__.py
├── runner.py # Runner implementation
├── config.yaml # Strategy configuration
├── selector_instruction.txt # Instructions for auto-selection
└── adapter_instruction.txt # Instructions for code adaptation
Step 1: Create Directory
Copy
Ask AI
mkdir -p src/deployment/strategies/my_strategy
Step 2: Implement Runner
Create runner.py with a Runner class:
Copy
Ask AI
from kapso.deployment.strategies.base import Runner
from typing import Any, Dict
class MyStrategyRunner(Runner):
    """Runner for my custom strategy.

    Template implementation: the lifecycle is tracked with a plain
    ``_running`` flag and the actual work is delegated to ``_execute``,
    which is the method to fill in.
    """

    def __init__(
        self,
        code_path: str,
        module: str = "main",
        callable: str = "predict",
        **kwargs,
    ):
        """Store the run-interface settings; nothing is started here.

        Args:
            code_path: Path to the code, stored as given.
            module: Module name of the entry point.
            callable: Name of the entry-point callable. The name shadows
                the builtin but is part of the public signature.
            **kwargs: Extra options; accepted and ignored.
        """
        self.code_path = code_path
        self.module = module
        self.callable = callable
        self._running = False

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute with inputs.

        Starts the runner first if it is not running yet. Any exception
        raised by ``_execute`` is reported in the returned dict instead
        of propagating.

        Returns:
            ``{"status": "success", "output": ...}`` on success, or
            ``{"status": "error", "error": "<message>"}`` on failure.
        """
        if not self._running:
            self.start()
        try:
            # Your execution logic here
            result = self._execute(inputs)
            return {"status": "success", "output": result}
        except Exception as e:
            return {"status": "error", "error": str(e)}

    def start(self) -> None:
        """Start the service."""
        # Initialize resources
        self._running = True

    def stop(self) -> None:
        """Stop and cleanup."""
        # Cleanup resources
        self._running = False

    def is_healthy(self) -> bool:
        """Check if service is running."""
        return self._running

    def get_logs(self) -> str:
        """Return execution logs."""
        return ""

    def _execute(self, inputs: Dict) -> Any:
        """Internal execution logic (placeholder; currently returns None)."""
        # Implement your execution here
        pass
Step 3: Create Configuration
Create config.yaml:
Copy
Ask AI
name: my_strategy
description: "My custom deployment strategy"
# Default run interface
run_interface:
  type: "function"
  module: "main"
  callable: "predict"
# Resource requirements
resources:
  cpu: "1"
  memory: "1Gi"
  gpu: false
# Strategy-specific settings
settings:
  timeout: 300
  retries: 3
Step 4: Selector Instruction
Create selector_instruction.txt for auto-selection:
Copy
Ask AI
## My Strategy
### Summary
A custom deployment strategy for [specific use case].
### When to Use
- Condition 1
- Condition 2
- Condition 3
### When NOT to Use
- Anti-condition 1
- Anti-condition 2
### Requirements
- Requirement 1
- Requirement 2
Step 5: Adapter Instruction
Create adapter_instruction.txt for code adaptation:
Copy
Ask AI
## Adaptation Instructions for My Strategy
### Required Files
1. `run.py` - Entry point script
2. `requirements.txt` - Dependencies
### Entry Point Format
```python
def predict(inputs: dict) -> dict:
    """Main prediction function."""
    # Implementation
    return {"result": ...}
```
### Environment Variables
- `MY_STRATEGY_CONFIG` - Configuration path
### Notes
- Keep dependencies minimal
- Handle errors gracefully
Step 6: Register Strategy
Create __init__.py:
Copy
Ask AI
from .runner import MyStrategyRunner
__all__ = ["MyStrategyRunner"]
Runner Base Class
Copy
Ask AI
from abc import ABC, abstractmethod
from typing import Any, Dict
class Runner(ABC):
    """Base class for deployment runners.

    Subclasses must implement ``run``, ``stop``, ``start`` and
    ``is_healthy``; ``get_logs`` has a default that returns an empty
    string.
    """

    @abstractmethod
    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute with inputs. Returns {"status": ..., "output": ...}."""
        pass

    @abstractmethod
    def stop(self) -> None:
        """Stop and cleanup resources."""
        pass

    @abstractmethod
    def start(self) -> None:
        """Start or restart the service."""
        pass

    @abstractmethod
    def is_healthy(self) -> bool:
        """Check if service is running and ready."""
        pass

    def get_logs(self) -> str:
        """Return execution logs."""
        return ""
Strategy Registry
Strategies are auto-registered when discovered:
Copy
Ask AI
from kapso.deployment.strategies import StrategyRegistry
# Obtain the registry object through StrategyRegistry.get()
registry = StrategyRegistry.get()
# List all strategies
strategies = registry.list_strategies()
print(strategies) # ["local", "docker", "modal", "bentoml", "my_strategy"]
# Check if strategy exists
exists = registry.strategy_exists("my_strategy")
# Get runner class
runner_class = registry.get_runner_class("my_strategy")
# Get instructions
# NOTE(review): presumably the contents of selector_instruction.txt and
# adapter_instruction.txt for the strategy — confirm against the registry API.
selector_inst = registry.get_selector_instruction("my_strategy")
adapter_inst = registry.get_adapter_instruction("my_strategy")
Using Your Strategy
Copy
Ask AI
from kapso.kapso import Kapso
from kapso.deployment import DeployStrategy
# NOTE(review): `kapso` and `solution` are not defined in this snippet —
# presumably a Kapso instance and a previously built solution; confirm
# against the Kapso API before copying this verbatim.
# Add to DeployStrategy enum (or use string)
deployed_program = kapso.deploy(solution, strategy="my_strategy")
# Or via factory
from kapso.deployment import DeploymentFactory, DeployConfig
config = DeployConfig(solution=solution)
deployed_program = DeploymentFactory.create("my_strategy", config)
Testing Your Strategy
Copy
Ask AI
import pytest
from kapso.deployment.strategies.my_strategy.runner import MyStrategyRunner
def test_runner_lifecycle():
    """Walk a runner through start -> run -> stop and check each state."""
    runner = MyStrategyRunner(
        code_path="/path/to/code",
        module="main",
        callable="predict",
    )
    # Test start
    runner.start()
    assert runner.is_healthy()
    # Test run
    result = runner.run({"input": "test"})
    assert result["status"] == "success"
    # Test stop
    runner.stop()
    assert not runner.is_healthy()
Example: HTTP Strategy
A strategy that deploys as an HTTP server:
Copy
Ask AI
import subprocess
import requests
from kapso.deployment.strategies.base import Runner
class HttpRunner(Runner):
    """Runner that serves the code as an HTTP app through uvicorn.

    ``start`` launches ``python -m uvicorn main:app`` in ``code_path`` and
    waits until ``GET /health`` answers 200; ``run`` forwards inputs to
    ``POST /predict`` and returns the decoded JSON body.
    """

    def __init__(self, code_path: str, port: int = 8000, **kwargs):
        """Record where the code lives and which local port to serve on.

        Args:
            code_path: Working directory for the server process.
            port: Port passed to uvicorn and used to build the endpoint URL.
            **kwargs: Extra options; accepted and ignored.
        """
        self.code_path = code_path
        self.port = port
        self.process = None
        self.endpoint = f"http://localhost:{port}"

    def start(self) -> None:
        """Start (or restart) the server and block until it is ready.

        Raises:
            RuntimeError: The server process exited during startup.
            TimeoutError: The health endpoint never became ready.
        """
        # Restart semantics: never leave a previous server process orphaned.
        if self.process:
            self.stop()
        self.process = subprocess.Popen(
            ["python", "-m", "uvicorn", "main:app", "--port", str(self.port)],
            cwd=self.code_path,
        )
        # Wait for server to be ready
        self._wait_for_ready()

    def run(self, inputs: dict) -> dict:
        """POST ``inputs`` as JSON to ``/predict`` and return the JSON reply."""
        response = requests.post(f"{self.endpoint}/predict", json=inputs)
        return response.json()

    def stop(self) -> None:
        """Terminate the server process, if any, and forget it."""
        if self.process:
            self.process.terminate()
            try:
                self.process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                # The process ignored SIGTERM; force it down so stop() returns.
                self.process.kill()
                self.process.wait()
            finally:
                self.process = None

    def is_healthy(self) -> bool:
        """Return True when ``GET /health`` answers with status 200."""
        try:
            # A timeout keeps a hung server from blocking the health probe.
            response = requests.get(f"{self.endpoint}/health", timeout=5)
        except requests.RequestException:
            # Connection refused, timeout, etc. all mean "not healthy".
            return False
        return response.status_code == 200

    def _wait_for_ready(self, timeout: float = 30.0, interval: float = 0.5) -> None:
        """Poll the health endpoint until it is ready or ``timeout`` elapses.

        Args:
            timeout: Maximum number of seconds to wait.
            interval: Seconds to sleep between health probes.

        Raises:
            RuntimeError: The server process exited before becoming ready.
            TimeoutError: The server did not become ready in time; the
                process is stopped before raising.
        """
        import time

        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.process is not None and self.process.poll() is not None:
                exit_code = self.process.returncode
                self.process = None
                raise RuntimeError(
                    f"HTTP server exited during startup with code {exit_code}"
                )
            if self.is_healthy():
                return
            time.sleep(interval)
        self.stop()
        raise TimeoutError(
            f"HTTP server at {self.endpoint} not ready after {timeout} seconds"
        )