Skip to main content

Overview

Kapso’s deployment system is extensible. You can add custom strategies by creating a new directory under src/deployment/strategies/.

Strategy Structure

Each strategy is a directory with these files:
src/deployment/strategies/my_strategy/
├── __init__.py
├── runner.py              # Runner implementation
├── config.yaml            # Strategy configuration
├── selector_instruction.txt   # Instructions for auto-selection
└── adapter_instruction.txt    # Instructions for code adaptation

Step 1: Create Directory

mkdir -p src/deployment/strategies/my_strategy

Step 2: Implement Runner

Create runner.py with a Runner class:
from kapso.deployment.strategies.base import Runner
from typing import Any, Dict

class MyStrategyRunner(Runner):
    """Template runner for a custom deployment strategy.

    Keeps a simple started/stopped flag and hands the real work to
    ``_execute``, which is the hook you are expected to fill in.
    """

    def __init__(
        self,
        code_path: str,
        module: str = "main",
        callable: str = "predict",
        **kwargs,
    ):
        """Remember the entry-point settings.

        Args:
            code_path: Location of the code to run.
            module: Name of the module holding the entry point.
            callable: Name of the callable to invoke in that module.
            **kwargs: Accepted for compatibility; not used here.
        """
        self._running = False
        self.callable = callable
        self.module = module
        self.code_path = code_path

    def start(self) -> None:
        """Mark the service as started."""
        # Initialize resources
        self._running = True

    def stop(self) -> None:
        """Mark the service as stopped."""
        # Cleanup resources
        self._running = False

    def is_healthy(self) -> bool:
        """Report whether the runner is currently started."""
        return self._running

    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Run the strategy once and wrap the outcome in a status dict.

        Starts the runner first if it has not been started yet. Any
        exception raised by ``_execute`` is reported as an error result
        instead of propagating.
        """
        # Lazily start on first use.
        if not self._running:
            self.start()

        try:
            # Your execution logic here
            output = self._execute(inputs)
        except Exception as exc:
            return {"status": "error", "error": str(exc)}
        else:
            return {"status": "success", "output": output}

    def get_logs(self) -> str:
        """Return the execution logs (always empty in this template)."""
        return ""

    def _execute(self, inputs: Dict) -> Any:
        """Hook for the strategy's actual execution; returns None until implemented."""
        # Implement your execution here
        return None

Step 3: Create Configuration

Create config.yaml:
name: my_strategy
description: "My custom deployment strategy"

# Default run interface
run_interface:
  type: "function"
  module: "main"
  callable: "predict"

# Resource requirements
resources:
  cpu: "1"
  memory: "1Gi"
  gpu: false

# Strategy-specific settings
settings:
  timeout: 300
  retries: 3

Step 4: Selector Instruction

Create selector_instruction.txt for auto-selection:
## My Strategy

### Summary
A custom deployment strategy for [specific use case].

### When to Use
- Condition 1
- Condition 2
- Condition 3

### When NOT to Use
- Anti-condition 1
- Anti-condition 2

### Requirements
- Requirement 1
- Requirement 2

Step 5: Adapter Instruction

Create adapter_instruction.txt for code adaptation:
## Adaptation Instructions for My Strategy

### Required Files
1. `main.py` - Entry point script (matches the default `module: "main"` in config.yaml)
2. `requirements.txt` - Dependencies

### Entry Point Format
```python
def predict(inputs: dict) -> dict:
    """Main prediction function."""
    # Implementation
    return {"result": ...}
```

### Environment Variables
- MY_STRATEGY_CONFIG - Configuration path

### Notes
- Keep dependencies minimal
- Handle errors gracefully

Step 6: Register Strategy

Create __init__.py:
from .runner import MyStrategyRunner

__all__ = ["MyStrategyRunner"]

The strategy is auto-discovered by the registry.

Runner Base Class

from abc import ABC, abstractmethod
from typing import Any, Dict

class Runner(ABC):
    """Abstract interface that every deployment runner implements.

    Subclasses must provide ``start``, ``stop``, ``run`` and
    ``is_healthy``. ``get_logs`` is optional and defaults to an empty
    string.
    """

    @abstractmethod
    def start(self) -> None:
        """Bring the service up, or restart it if it is already up."""
        ...

    @abstractmethod
    def stop(self) -> None:
        """Shut the service down and release its resources."""
        ...

    @abstractmethod
    def run(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
        """Execute once with ``inputs``; the result dict carries "status" and "output"."""
        ...

    @abstractmethod
    def is_healthy(self) -> bool:
        """Tell whether the service is running and ready to accept work."""
        ...

    def get_logs(self) -> str:
        """Return the execution logs; the base implementation has none."""
        return ""

Strategy Registry

Strategies are auto-registered when discovered:
from kapso.deployment.strategies import StrategyRegistry

# Obtain the registry through its get() accessor
# (presumably a shared instance — confirm against StrategyRegistry).
registry = StrategyRegistry.get()

# List the names of all registered strategies
strategies = registry.list_strategies()
print(strategies)  # ["local", "docker", "modal", "bentoml", "my_strategy"]

# Check whether a strategy with this name is registered
exists = registry.strategy_exists("my_strategy")

# Get the runner class registered for the strategy
runner_class = registry.get_runner_class("my_strategy")

# Get the strategy's instructions (presumably the contents of
# selector_instruction.txt and adapter_instruction.txt — confirm)
selector_inst = registry.get_selector_instruction("my_strategy")
adapter_inst = registry.get_adapter_instruction("my_strategy")

Using Your Strategy

from kapso.kapso import Kapso
from kapso.deployment import DeployStrategy

# NOTE(review): `kapso` and `solution` are not defined in this snippet.
# Presumably `kapso` is an existing Kapso instance and `solution` is a
# previously built solution — confirm before copying this example.
# Add to DeployStrategy enum (or use string)
deployed_program = kapso.deploy(solution, strategy="my_strategy")

# Or via factory: build a DeployConfig for the solution and ask the
# factory for the strategy by name.
from kapso.deployment import DeploymentFactory, DeployConfig

config = DeployConfig(solution=solution)
deployed_program = DeploymentFactory.create("my_strategy", config)

Testing Your Strategy

import pytest
from kapso.deployment.strategies.my_strategy.runner import MyStrategyRunner

def test_runner_lifecycle():
    """Walk a runner through start -> run -> stop and check each stage."""
    init_args = {
        "code_path": "/path/to/code",
        "module": "main",
        "callable": "predict",
    }
    runner = MyStrategyRunner(**init_args)

    # Once started, the runner must report healthy.
    runner.start()
    assert runner.is_healthy()

    # A run must come back with a success status.
    outcome = runner.run({"input": "test"})
    assert outcome["status"] == "success"

    # Once stopped, the runner must no longer report healthy.
    runner.stop()
    assert not runner.is_healthy()

Example: HTTP Strategy

A strategy that deploys as an HTTP server:
import subprocess
import requests
from kapso.deployment.strategies.base import Runner

class HttpRunner(Runner):
    """Runner that serves the deployed code over HTTP with uvicorn.

    ``start`` launches ``python -m uvicorn main:app`` inside ``code_path``
    and blocks until ``GET /health`` answers 200. ``run`` forwards the
    inputs to ``POST /predict`` and returns the decoded JSON body.
    """

    # Seconds a single health probe may take before it counts as failed.
    _HEALTH_TIMEOUT = 2.0
    # Seconds to sleep between health probes while the server boots.
    _POLL_INTERVAL = 0.2
    # Seconds to wait for graceful termination before killing the process.
    _STOP_TIMEOUT = 10.0

    def __init__(
        self,
        code_path: str,
        port: int = 8000,
        startup_timeout: float = 30.0,
        request_timeout=None,
        **kwargs,
    ):
        """Configure the runner; no process is started here.

        Args:
            code_path: Directory used as the server's working directory.
            port: Local port the server listens on.
            startup_timeout: Maximum seconds ``start`` waits for the
                server to become healthy.
            request_timeout: Timeout in seconds passed to the
                ``/predict`` request; ``None`` (the default) waits
                indefinitely.
            **kwargs: Accepted for compatibility; not used here.
        """
        self.code_path = code_path
        self.port = port
        self.startup_timeout = startup_timeout
        self.request_timeout = request_timeout
        self.process = None
        self.endpoint = f"http://localhost:{port}"

    def start(self) -> None:
        """Start (or restart) the server and wait until it is healthy.

        Raises:
            RuntimeError: The server process exited during startup.
            TimeoutError: The server was not healthy within
                ``startup_timeout`` seconds.
        """
        # Restart semantics: never leave a previous server process orphaned.
        self.stop()
        self.process = subprocess.Popen(
            ["python", "-m", "uvicorn", "main:app", "--port", str(self.port)],
            cwd=self.code_path,
        )
        # Wait for server to be ready
        self._wait_for_ready()

    def run(self, inputs: dict) -> dict:
        """POST ``inputs`` as JSON to ``/predict`` and return the JSON reply."""
        response = requests.post(
            f"{self.endpoint}/predict",
            json=inputs,
            timeout=self.request_timeout,
        )
        return response.json()

    def stop(self) -> None:
        """Terminate the server process, if any, and forget it."""
        if self.process is None:
            return
        # Clear the attribute first so the runner never keeps a handle to a
        # process that is being torn down.
        proc, self.process = self.process, None
        proc.terminate()
        try:
            proc.wait(timeout=self._STOP_TIMEOUT)
        except subprocess.TimeoutExpired:
            # The server ignored the termination request; force it down.
            proc.kill()
            proc.wait()

    def is_healthy(self) -> bool:
        """Return True when ``GET /health`` answers with status 200."""
        try:
            response = requests.get(
                f"{self.endpoint}/health",
                timeout=self._HEALTH_TIMEOUT,
            )
        except requests.RequestException:
            # Connection refused, timeout, etc.: the server is not ready.
            return False
        return response.status_code == 200

    def _wait_for_ready(self) -> None:
        """Poll the health endpoint until the server is ready.

        Raises:
            RuntimeError: The server process exited before becoming healthy.
            TimeoutError: ``startup_timeout`` elapsed first; the process is
                stopped before raising.
        """
        # Local import: the snippet's import block does not bring in time.
        import time

        deadline = time.monotonic() + self.startup_timeout
        while time.monotonic() < deadline:
            # A process that has already exited will never become healthy.
            if self.process is not None and self.process.poll() is not None:
                exit_code = self.process.returncode
                self.process = None
                raise RuntimeError(
                    f"HTTP server exited during startup with code {exit_code}"
                )
            if self.is_healthy():
                return
            time.sleep(self._POLL_INTERVAL)

        self.stop()
        raise TimeoutError(
            f"HTTP server at {self.endpoint} was not healthy after "
            f"{self.startup_timeout} seconds"
        )