Error Handling
Best practices for handling errors and implementing retry strategies with SanctionsWise API.
Error Response Format​
All errors return a JSON response:
{
"error": "Error type",
"message": "Detailed error message"
}
Error Types​
400 Bad Request​
Invalid request parameters:
{
"error": "Field \"name\" is required and must be a string"
}
Common causes:
- Missing required fields
- Invalid field types
- Value out of range (e.g., threshold < 0.50)
Solution: Validate input before sending.
401 Unauthorized​
Missing API key:
{
"message": "Unauthorized"
}
Solution: Include x-api-key header.
403 Forbidden​
Invalid or revoked API key:
{
"message": "Forbidden - Invalid API key"
}
Solution: Check API key at app.orchestraprime.ai.
429 Too Many Requests​
Rate limit exceeded:
{
"message": "Rate limit exceeded",
"retry_after_seconds": 60
}
Solution: Implement exponential backoff.
500 Internal Server Error​
Server-side error:
{
"error": "Internal server error",
"message": "An error occurred processing your request"
}
Solution: Retry with exponential backoff.
Retry Strategy​
Exponential Backoff​
import time
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1):
"""Decorator for retry with exponential backoff."""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
# Rate limited - use retry-after header
retry_after = int(e.response.headers.get("Retry-After", base_delay * (2 ** attempt)))
time.sleep(retry_after)
elif e.response.status_code >= 500:
# Server error - exponential backoff
time.sleep(base_delay * (2 ** attempt))
else:
# Client error - don't retry
raise
except requests.exceptions.RequestException:
# Network error - exponential backoff
time.sleep(base_delay * (2 ** attempt))
# Final attempt
return func(*args, **kwargs)
return wrapper
return decorator
@retry_with_backoff(max_retries=3)
def screen_entity_with_retry(name, entity_type, threshold):
response = requests.post(
f"{BASE_URL}/screen/entity",
headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
json={"name": name, "entity_type": entity_type, "threshold": threshold},
timeout=30
)
response.raise_for_status()
return response.json()
Circuit Breaker Pattern​
Prevent cascading failures:
from datetime import datetime, timedelta
class CircuitBreaker:
"""Simple circuit breaker for API calls."""
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failures = 0
self.last_failure_time = None
self.state = "closed" # closed, open, half-open
def call(self, func, *args, **kwargs):
if self.state == "open":
if datetime.now() - self.last_failure_time > timedelta(seconds=self.reset_timeout):
self.state = "half-open"
else:
raise Exception("Circuit breaker is open")
try:
result = func(*args, **kwargs)
if self.state == "half-open":
self.state = "closed"
self.failures = 0
return result
except Exception as e:
self.failures += 1
self.last_failure_time = datetime.now()
if self.failures >= self.failure_threshold:
self.state = "open"
raise e
# Usage
breaker = CircuitBreaker(failure_threshold=5, reset_timeout=60)
result = breaker.call(screen_entity_with_retry, "Vladimir Putin", "individual", 0.60)
Connection Pooling​
Improve performance with connection reuse:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session():
"""Create a session with connection pooling and retry."""
session = requests.Session()
retry_strategy = Retry(
total=3,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["POST", "GET"]
)
adapter = HTTPAdapter(
pool_connections=10,
pool_maxsize=10,
max_retries=retry_strategy
)
session.mount("https://", adapter)
session.headers.update({
"x-api-key": API_KEY,
"Content-Type": "application/json"
})
return session
# Reuse session for all requests
session = create_session()
response = session.post(f"{BASE_URL}/screen/entity", json={"name": "John Smith"})
Error Logging​
Log errors for debugging:
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def screen_with_logging(name):
try:
result = screen_entity(name)
logger.info(f"Screening complete: {result['screening_id']} - {result['status']}")
return result
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP error screening {name}: {e.response.status_code} - {e.response.text}")
raise
except requests.exceptions.RequestException as e:
logger.error(f"Network error screening {name}: {str(e)}")
raise
Timeout Handling​
Always set reasonable timeouts:
# Single entity - 30 second timeout
response = requests.post(
f"{BASE_URL}/screen/entity",
json={"name": name},
timeout=30 # seconds
)
# Batch - longer timeout
response = requests.post(
f"{BASE_URL}/screen/batch",
json={"entities": entities},
timeout=120 # 2 minutes for large batches
)