# src/deployment/strategies/mycloud/runner.py
"""
MyCloud Runner
Executes software by making HTTP requests to MyCloud endpoints.
"""
from typing import Any, Dict, List, Optional, Union

import requests

from src.deployment.strategies.base import Runner
class MyCloudRunner(Runner):
    """
    Runner for MyCloud deployments.

    Makes HTTP requests to the deployed MyCloud endpoint and keeps an
    in-memory list of log lines that can be read back with ``get_logs``.
    """

    def __init__(
        self,
        endpoint: Optional[str] = None,
        predict_path: str = "/predict",
        code_path: Optional[str] = None,
        timeout: int = 300,
        **kwargs,  # Accept extra params from run_interface
    ):
        """
        Initialize the MyCloud runner.

        Args:
            endpoint: The deployed endpoint URL. When missing, ``run`` returns
                an error payload and ``is_healthy`` returns False.
            predict_path: Path appended to the endpoint for prediction calls.
            code_path: Path to code (stored for reference only).
            timeout: Timeout in seconds passed to the prediction request.
            **kwargs: Additional parameters (ignored).
        """
        self.endpoint = endpoint
        self.predict_path = predict_path
        self.code_path = code_path
        self.timeout = timeout
        self._logs: List[str] = []
        if endpoint:
            self._logs.append(f"Initialized with endpoint: {endpoint}")
        else:
            self._logs.append("Warning: No endpoint provided")

    def _build_url(self, path: Optional[str]) -> str:
        """Join the configured endpoint and *path* with exactly one slash."""
        suffix = path or ""
        # A path given without a leading slash would otherwise be glued
        # straight onto the host name.
        if suffix and not suffix.startswith("/"):
            suffix = f"/{suffix}"
        return f"{self.endpoint.rstrip('/')}{suffix}"

    def run(self, inputs: Union[Dict, str, bytes]) -> Any:
        """
        Execute by making HTTP POST to the endpoint.

        ``bytes`` inputs are decoded as UTF-8 and ``str`` inputs are wrapped
        as ``{"input": <text>}`` before being sent as the JSON body.

        Args:
            inputs: Input data for the prediction.

        Returns:
            The decoded JSON response on success; otherwise a dict with an
            ``"error"`` key describing what went wrong.
        """
        if not self.endpoint:
            return {
                "error": "No endpoint configured",
                "instructions": [
                    "1. Deploy to MyCloud first",
                    "2. Run: mycloud deploy --app-name <name>",
                    "3. Use the returned endpoint URL",
                ],
            }

        # Convert inputs to a JSON-serializable format before any network
        # activity, so that bad input is reported like every other failure.
        if isinstance(inputs, bytes):
            try:
                inputs = inputs.decode("utf-8")
            except UnicodeDecodeError as e:
                self._logs.append(f"Input decode error: {e}")
                return {"error": f"Input bytes are not valid UTF-8: {e}"}
        if isinstance(inputs, str):
            inputs = {"input": inputs}

        url = self._build_url(self.predict_path)
        self._logs.append(f"POST {url}")

        try:
            response = requests.post(
                url,
                json=inputs,
                timeout=self.timeout,
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
            result = response.json()
            self._logs.append(f"Response: {response.status_code}")
            return result
        except requests.exceptions.Timeout:
            self._logs.append(f"Timeout after {self.timeout}s")
            return {"error": f"Request timed out after {self.timeout}s"}
        except requests.exceptions.RequestException as e:
            self._logs.append(f"Request error: {e}")
            return {"error": str(e)}
        except ValueError as e:
            # Older requests releases raise a plain ValueError (not a
            # RequestException) when the response body is not valid JSON.
            self._logs.append(f"Invalid JSON response: {e}")
            return {"error": f"Invalid JSON response: {e}"}

    def stop(self) -> None:
        """
        Stop the runner.

        For HTTP-based runners, this is usually a no-op; only a log entry is
        recorded. Override if your cloud provider supports stopping instances.
        """
        self._logs.append("Stopped")
        # Optionally: call mycloud API to stop the instance

    def is_healthy(self) -> bool:
        """
        Check if the endpoint is healthy.

        Sends a GET to ``<endpoint>/health`` with a 5 second timeout.

        Returns:
            True if the endpoint answers with HTTP 200, False otherwise
            (including when no endpoint is configured or the request fails).
        """
        if not self.endpoint:
            return False
        try:
            response = requests.get(self._build_url("/health"), timeout=5)
        except requests.exceptions.RequestException as e:
            # Only request failures mean "unhealthy"; KeyboardInterrupt and
            # SystemExit must not be swallowed here.
            self._logs.append(f"Health check failed: {e}")
            return False
        return response.status_code == 200

    def get_logs(self) -> str:
        """Get runner logs as a single newline-separated string."""
        return "\n".join(self._logs)