Overview
Stop conditions determine when Kapso should stop the experimentation loop. They check after each experiment whether to continue or return the best result.
Available Conditions
| Condition | Description |
|---|
never | Never stop early (default) |
threshold | Stop when score reaches threshold |
max_iterations | Stop after N iterations |
plateau | Stop if no improvement for N iterations |
cost_limit | Stop when cost limit reached |
time_limit | Stop when time limit reached |
consecutive_errors | Stop after N consecutive errors |
composite | Combine multiple conditions |
Usage
Via Kapso API
solution = kapso.evolve(
goal="Build a classifier",
stop_condition="threshold",
stop_condition_params={"threshold": 0.95},
)
Direct Usage
from src.environment.stop_conditions import StopConditionFactory
condition = StopConditionFactory.create(
"threshold",
threshold=0.95,
)
decision = condition.check(
best_score=0.92,
current_score=0.88,
iteration=5,
)
if decision.should_stop:
print(f"Stopping: {decision.reason}")
never
Never stop early. Run until budget exhausted.
condition = StopConditionFactory.create("never")
This is the default stop condition.
threshold
Stop when score reaches a threshold.
condition = StopConditionFactory.create(
"threshold",
threshold=0.95,
maximize=True, # Stop when score >= threshold
)
Parameters
| Parameter | Default | Description |
|---|
threshold | - | Score to reach |
maximize | True | If True, stop when >=; else when <= |
Example
# Stop when accuracy >= 0.95
condition = StopConditionFactory.create("threshold", threshold=0.95)
# Stop when loss <= 0.1
condition = StopConditionFactory.create("threshold", threshold=0.1, maximize=False)
max_iterations
Stop after maximum iterations.
condition = StopConditionFactory.create(
"max_iterations",
max_iter=50,
)
Parameters
| Parameter | Default | Description |
|---|
max_iter | - | Maximum iterations |
plateau
Stop if no improvement for N iterations.
condition = StopConditionFactory.create(
"plateau",
patience=5,
min_delta=0.001,
)
Parameters
| Parameter | Default | Description |
|---|
patience | 5 | Iterations without improvement |
min_delta | 0.001 | Minimum improvement to count |
How It Works
def check(self, best_score, current_score, iteration, **context):
if best_score > self._best_seen + self.min_delta:
self._best_seen = best_score
self._no_improve_count = 0
else:
self._no_improve_count += 1
return StopDecision(
should_stop=self._no_improve_count >= self.patience,
reason=f"No improvement for {self._no_improve_count}/{self.patience}",
)
cost_limit
Stop when cost exceeds limit.
condition = StopConditionFactory.create(
"cost_limit",
max_cost=50.0, # dollars
)
Parameters
| Parameter | Default | Description |
|---|
max_cost | - | Maximum cost in dollars |
Requires cost in context when calling check().
time_limit
Stop when time limit exceeded.
condition = StopConditionFactory.create(
"time_limit",
max_seconds=3600, # 1 hour
)
Parameters
| Parameter | Default | Description |
|---|
max_seconds | - | Maximum time in seconds |
Requires elapsed_time in context when calling check().
consecutive_errors
Stop after N consecutive errors.
condition = StopConditionFactory.create(
"consecutive_errors",
max_errors=5,
)
Parameters
| Parameter | Default | Description |
|---|
max_errors | 5 | Max consecutive errors |
Requires had_error in context when calling check().
composite
Combine multiple stop conditions.
condition = StopConditionFactory.create(
"composite",
conditions=[
("threshold", {"threshold": 0.95}),
("max_iterations", {"max_iter": 50}),
("plateau", {"patience": 10}),
],
mode="any", # Stop if ANY condition met
)
Parameters
| Parameter | Default | Description |
|---|
conditions | - | List of (type, params) tuples |
mode | "any" | "any" or "all" |
Modes
any: Stop if ANY condition is met
all: Stop only if ALL conditions are met
Examples
# Stop if score >= 0.95 OR after 50 iterations
condition = StopConditionFactory.create(
"composite",
conditions=[
("threshold", {"threshold": 0.95}),
("max_iterations", {"max_iter": 50}),
],
mode="any",
)
# Stop only if score >= 0.9 AND no improvement for 5 iterations
condition = StopConditionFactory.create(
"composite",
conditions=[
("threshold", {"threshold": 0.9}),
("plateau", {"patience": 5}),
],
mode="all",
)
StopDecision
@dataclass
class StopDecision:
should_stop: bool # Whether to stop
reason: str # Human-readable reason
details: Dict = {} # Additional data
Creating Custom Conditions
from src.environment.stop_conditions.base import StopCondition, StopDecision
from src.environment.stop_conditions.factory import register_stop_condition
@register_stop_condition("my_custom_condition")
class MyCustomCondition(StopCondition):
description = "My custom stopping logic"
def __init__(self, my_param: float, **params):
super().__init__(**params)
self.my_param = my_param
def check(self, best_score, current_score, iteration, **context):
# Custom stopping logic
should_stop = compute_stop(best_score, self.my_param)
return StopDecision(
should_stop=should_stop,
reason=f"Custom condition with {self.my_param}",
details={"best_score": best_score},
)
Configuration
stop_condition:
type: "composite"
params:
conditions:
- ["threshold", {"threshold": 0.95}]
- ["max_iterations", {"max_iter": 50}]
mode: "any"
Listing Conditions
PYTHONPATH=. python -m src.cli --list-stop-conditions
Best Practices
Use composite to combine threshold with a fallback like max_iterations.
Plateau is useful when you don’t know the achievable score in advance.
never is risky for expensive experiments. Always set a budget fallback.
Common Patterns
Production Setup
# Stop at 0.95 OR after 100 iterations OR $50 cost
solution = kapso.evolve(
goal="...",
stop_condition="composite",
stop_condition_params={
"conditions": [
("threshold", {"threshold": 0.95}),
("max_iterations", {"max_iter": 100}),
("cost_limit", {"max_cost": 50.0}),
],
"mode": "any",
},
)
Quick Testing
# Stop after 3 iterations (for testing)
solution = kapso.evolve(
goal="...",
stop_condition="max_iterations",
stop_condition_params={"max_iter": 3},
)
Adaptive Stopping
# Stop when converged (no improvement for 10 iterations)
solution = kapso.evolve(
goal="...",
stop_condition="plateau",
stop_condition_params={"patience": 10, "min_delta": 0.001},
)