mirror of
https://github.com/makeplane/plane.git
synced 2025-12-18 04:47:53 +01:00
849 lines
28 KiB
Python
849 lines
28 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Plane API Testing Framework
|
||
|
||
A composable API for testing Plane's customer APIs with chainable operations.
|
||
Build any test flow with method calls across different API domains.
|
||
|
||
Examples:
|
||
# Simple customer operations
|
||
api.customers().create("Acme Corp").update({"description": "Updated"}).run()
|
||
|
||
# Cross-domain composition
|
||
api.customers().create("Acme").properties().create("Status", "OPTION", ["Active", "Inactive"]).run()
|
||
|
||
# With context variables and mock mode
|
||
api.mock().customers().create("Test Corp").as_("main_customer").properties().create("Tier").run()
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import sys
|
||
import urllib.request
|
||
import urllib.error
|
||
from typing import Dict, List, Optional, Tuple, Any
|
||
|
||
# ====================================
|
||
# CORE INFRASTRUCTURE
|
||
# ====================================
|
||
|
||
|
||
class Logger:
|
||
"""Colored console output for test results"""
|
||
|
||
COLORS = {
|
||
"header": "\033[95m",
|
||
"blue": "\033[94m",
|
||
"cyan": "\033[96m",
|
||
"green": "\033[92m",
|
||
"yellow": "\033[93m",
|
||
"red": "\033[91m",
|
||
"bold": "\033[1m",
|
||
"underline": "\033[4m",
|
||
"reset": "\033[0m",
|
||
}
|
||
|
||
@classmethod
|
||
def _colored(cls, text: str, color: str) -> str:
|
||
return f"{cls.COLORS.get(color, '')}{text}{cls.COLORS['reset']}"
|
||
|
||
@classmethod
|
||
def header(cls, text: str):
|
||
print("\n" + "=" * 50)
|
||
print(cls._colored(text, "header"))
|
||
print("=" * 50)
|
||
|
||
@classmethod
|
||
def subheader(cls, text: str):
|
||
print(f"\n--- {cls._colored(text, 'cyan')} ---")
|
||
|
||
@classmethod
|
||
def info(cls, text: str):
|
||
print(f"ℹ️ Info: {text}")
|
||
|
||
@classmethod
|
||
def success(cls, text: str):
|
||
print(f"✅ Success: {cls._colored(text, 'green')}")
|
||
|
||
@classmethod
|
||
def error(cls, text: str):
|
||
print(f"❌ Error: {cls._colored(text, 'red')}")
|
||
|
||
@classmethod
|
||
def warning(cls, text: str):
|
||
print(f"⚠️ Warning: {cls._colored(text, 'yellow')}")
|
||
|
||
@classmethod
|
||
def debug(cls, text: str):
|
||
print(f"🐛 DEBUG: {cls._colored(text, 'blue')}")
|
||
|
||
|
||
class APIClient:
|
||
"""HTTP client for making API requests"""
|
||
|
||
def __init__(self, config: Dict[str, str]):
|
||
self.config = config
|
||
self.base_url = config["base_url"]
|
||
self.workspace_slug = config["workspace_slug"]
|
||
self.api_key = config["api_key"]
|
||
self.mock_mode = config.get("mock_mode", False)
|
||
|
||
def _make_request(
|
||
self, method: str, endpoint: str, data: Optional[Dict] = None
|
||
) -> Tuple[int, Dict]:
|
||
"""Make HTTP request and return status code and response data"""
|
||
url = f"{self.base_url}/workspaces/{self.workspace_slug}{endpoint}"
|
||
|
||
# Prepare request
|
||
if data:
|
||
data_bytes = json.dumps(data).encode("utf-8")
|
||
else:
|
||
data_bytes = None
|
||
|
||
req = urllib.request.Request(url, data=data_bytes, method=method)
|
||
req.add_header("x-api-key", self.api_key)
|
||
if data:
|
||
req.add_header("Content-Type", "application/json")
|
||
|
||
try:
|
||
with urllib.request.urlopen(req) as response:
|
||
response_body = response.read().decode("utf-8")
|
||
response_data = (
|
||
json.loads(response_body) if response_body.strip() else {}
|
||
)
|
||
return response.status, response_data
|
||
except urllib.error.HTTPError as e:
|
||
error_body = e.read().decode("utf-8")
|
||
try:
|
||
error_data = json.loads(error_body) if error_body.strip() else {}
|
||
except json.JSONDecodeError:
|
||
# Handle cases where server returns HTML (like 404 pages)
|
||
error_data = {"error": f"HTTP {e.code}: {error_body[:200]}..."}
|
||
return e.code, error_data
|
||
except Exception as e:
|
||
return 500, {"error": str(e)}
|
||
|
||
def get(self, endpoint: str) -> Tuple[int, Dict]:
|
||
return self._make_request("GET", endpoint)
|
||
|
||
def post(self, endpoint: str, data: Dict) -> Tuple[int, Dict]:
|
||
return self._make_request("POST", endpoint, data)
|
||
|
||
def patch(self, endpoint: str, data: Dict) -> Tuple[int, Dict]:
|
||
return self._make_request("PATCH", endpoint, data)
|
||
|
||
def delete(self, endpoint: str) -> Tuple[int, Dict]:
|
||
return self._make_request("DELETE", endpoint)
|
||
|
||
|
||
class FlowResults:
|
||
"""Container for API execution results"""
|
||
|
||
def __init__(self):
|
||
self.operations = []
|
||
self.context = {}
|
||
self.success_count = 0
|
||
self.total_count = 0
|
||
|
||
def add_operation(self, name: str, success: bool, result: Any = None):
|
||
self.operations.append({"name": name, "success": success, "result": result})
|
||
self.total_count += 1
|
||
if success:
|
||
self.success_count += 1
|
||
|
||
def was_successful(self) -> bool:
|
||
return self.success_count == self.total_count and self.total_count > 0
|
||
|
||
def get_summary(self) -> Dict:
|
||
success_rate = (
|
||
(self.success_count / self.total_count * 100) if self.total_count > 0 else 0
|
||
)
|
||
return {
|
||
"total": self.total_count,
|
||
"successful": self.success_count,
|
||
"failed": self.total_count - self.success_count,
|
||
"success_rate": success_rate,
|
||
}
|
||
|
||
|
||
# ====================================
|
||
# MAIN API
|
||
# ====================================
|
||
|
||
|
||
class PlaneAPI:
|
||
"""Main entry point for Plane API testing - provides access to all domain APIs"""
|
||
|
||
def __init__(self, config: Dict[str, str]):
|
||
self.client = APIClient(config)
|
||
self.context = {} # Shared context for variables
|
||
self.operations = [] # Queue of operations
|
||
self.cleanup_items = [] # Items to cleanup
|
||
self.results = FlowResults()
|
||
self._mock_mode = config.get("mock_mode", False)
|
||
|
||
def mock(self, enabled: bool = True) -> "PlaneAPI":
|
||
"""Enable/disable mock mode (preserves test data)"""
|
||
self._mock_mode = enabled
|
||
self.client.mock_mode = enabled
|
||
if enabled:
|
||
Logger.header("🔸 MOCK MODE ENABLED 🔸")
|
||
Logger.info("All test data will be preserved (no deletions will occur)")
|
||
return self
|
||
|
||
def customers(self) -> "CustomersAPI":
|
||
"""Access customer operations"""
|
||
return CustomersAPI(self)
|
||
|
||
def properties(self) -> "PropertiesAPI":
|
||
"""Access customer properties operations"""
|
||
return PropertiesAPI(self)
|
||
|
||
def issues(self) -> "IssuesAPI":
|
||
"""Access issues operations (placeholder for future)"""
|
||
return IssuesAPI(self)
|
||
|
||
def debug(self, message: str = "Debug checkpoint") -> "PlaneAPI":
|
||
"""Add debug checkpoint"""
|
||
|
||
def debug_op():
|
||
Logger.subheader(f"🐛 DEBUG: {message}")
|
||
Logger.info(f"Context: {self.context}")
|
||
return True, {"debug": message, "context": self.context.copy()}
|
||
|
||
self._add_operation(f"debug({message})", debug_op)
|
||
return self
|
||
|
||
def set(self, key: str, value: str) -> "PlaneAPI":
|
||
"""Set context variable manually"""
|
||
self.context[key] = value
|
||
Logger.info(f"Context: {key} = {value}")
|
||
return self
|
||
|
||
def run(self) -> FlowResults:
|
||
"""Execute all queued operations"""
|
||
if not self.operations:
|
||
Logger.warning(
|
||
"No operations queued. Use API methods to build your test flow."
|
||
)
|
||
return self.results
|
||
|
||
Logger.header("🚀 Running Test Flow")
|
||
Logger.info(f"Executing {len(self.operations)} operations...")
|
||
|
||
for i, (name, operation_func) in enumerate(self.operations, 1):
|
||
Logger.info(f"[{i}/{len(self.operations)}] {name}")
|
||
|
||
try:
|
||
success, result = operation_func()
|
||
self.results.add_operation(name, success, result)
|
||
|
||
if success:
|
||
Logger.success(f"{name} completed")
|
||
else:
|
||
Logger.error(f"{name} failed: {result}")
|
||
|
||
except Exception as e:
|
||
Logger.error(f"{name} crashed: {str(e)}")
|
||
self.results.add_operation(name, False, {"error": str(e)})
|
||
|
||
# Cleanup
|
||
self._cleanup()
|
||
|
||
# Results summary
|
||
summary = self.results.get_summary()
|
||
Logger.header("📊 Flow Results")
|
||
Logger.info(
|
||
f"Operations: {summary['successful']}/{summary['total']} successful ({summary['success_rate']:.1f}%)"
|
||
)
|
||
|
||
if self.results.was_successful():
|
||
Logger.success("All operations completed successfully! 🎉")
|
||
else:
|
||
Logger.error(f"{summary['failed']} operations failed")
|
||
|
||
return self.results
|
||
|
||
def _add_operation(self, name: str, operation_func):
|
||
"""Add operation to execution queue"""
|
||
self.operations.append((name, operation_func))
|
||
|
||
def _add_cleanup(self, item_id: str, endpoint: str):
|
||
"""Add item for cleanup"""
|
||
self.cleanup_items.append({"id": item_id, "endpoint": endpoint})
|
||
|
||
def _cleanup(self):
|
||
"""Clean up created resources"""
|
||
if not self.cleanup_items:
|
||
return
|
||
|
||
if self._mock_mode:
|
||
Logger.subheader("🔸 Mock Mode: Preserving test data")
|
||
for item in self.cleanup_items:
|
||
Logger.info(f"Preserving: {item['id']} ({item['endpoint']})")
|
||
else:
|
||
Logger.subheader("Cleaning up test data")
|
||
for item in self.cleanup_items:
|
||
status, _ = self.client.delete(item["endpoint"])
|
||
if status == 204:
|
||
Logger.info(f"Cleaned up: {item['id']}")
|
||
else:
|
||
Logger.error(f"Failed to cleanup: {item['id']}")
|
||
|
||
def _substitute_context(self, text: str) -> str:
|
||
"""Replace {variable} placeholders with context values"""
|
||
if isinstance(text, str):
|
||
for key, value in self.context.items():
|
||
text = text.replace(f"{{{key}}}", str(value))
|
||
return text
|
||
|
||
def _generate_unique_name(self, base_name: str) -> str:
|
||
"""Generate unique name with timestamp"""
|
||
import time
|
||
|
||
timestamp = int(time.time())
|
||
return f"{base_name} {timestamp}"
|
||
|
||
|
||
# ====================================
|
||
# DOMAIN APIS
|
||
# ====================================
|
||
|
||
|
||
class BaseDomainAPI:
|
||
"""Base class for all domain APIs"""
|
||
|
||
def __init__(self, api: PlaneAPI):
|
||
self.api = api
|
||
self.client = api.client
|
||
self.context = api.context
|
||
|
||
def back(self) -> PlaneAPI:
|
||
"""Return to main API for cross-domain composition"""
|
||
return self.api
|
||
|
||
|
||
class CustomersAPI(BaseDomainAPI):
|
||
"""API for customer operations"""
|
||
|
||
def create(self, name: str, description: str = None, **kwargs) -> "CustomersAPI":
|
||
"""Create a customer"""
|
||
|
||
def create_op():
|
||
actual_name = self.api._substitute_context(name)
|
||
if "{" not in name: # Add timestamp if no context variables
|
||
actual_name = self.api._generate_unique_name(actual_name)
|
||
|
||
data = {
|
||
"name": actual_name,
|
||
"description": description or "Customer created via API",
|
||
**kwargs,
|
||
}
|
||
|
||
status, response = self.client.post("/customers/", data)
|
||
if status == 201:
|
||
customer_id = response.get("id")
|
||
self.context["customer_id"] = customer_id
|
||
self.context["last_customer_id"] = customer_id
|
||
self.api._add_cleanup(customer_id, f"/customers/{customer_id}/")
|
||
Logger.info(f"Created customer: {actual_name} (ID: {customer_id})")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation(f"create_customer({name})", create_op)
|
||
return self
|
||
|
||
def get(self, customer_id: str = None) -> "CustomersAPI":
|
||
"""Get customer details"""
|
||
|
||
def get_op():
|
||
target_id = (
|
||
customer_id
|
||
or self.context.get("customer_id")
|
||
or self.context.get("last_customer_id")
|
||
)
|
||
if not target_id:
|
||
return False, {"error": "No customer ID available"}
|
||
|
||
status, response = self.client.get(f"/customers/{target_id}/")
|
||
if status == 200:
|
||
Logger.info(f"Retrieved customer: {target_id}")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("get_customer", get_op)
|
||
return self
|
||
|
||
def update(self, updates: Dict, customer_id: str = None) -> "CustomersAPI":
|
||
"""Update a customer"""
|
||
|
||
def update_op():
|
||
target_id = (
|
||
customer_id
|
||
or self.context.get("customer_id")
|
||
or self.context.get("last_customer_id")
|
||
)
|
||
if not target_id:
|
||
return False, {"error": "No customer ID available"}
|
||
|
||
# Process context variables in updates
|
||
processed_updates = {}
|
||
for key, value in updates.items():
|
||
processed_updates[key] = (
|
||
self.api._substitute_context(str(value))
|
||
if isinstance(value, str)
|
||
else value
|
||
)
|
||
|
||
status, response = self.client.patch(
|
||
f"/customers/{target_id}/", processed_updates
|
||
)
|
||
if status == 200:
|
||
Logger.info(f"Updated customer: {target_id}")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("update_customer", update_op)
|
||
return self
|
||
|
||
def list(self) -> "CustomersAPI":
|
||
"""List all customers"""
|
||
|
||
def list_op():
|
||
status, response = self.client.get("/customers/")
|
||
if status == 200:
|
||
count = response.get("total_count", len(response.get("results", [])))
|
||
Logger.info(f"Listed {count} customers")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("list_customers", list_op)
|
||
return self
|
||
|
||
def search(self, query: str) -> "CustomersAPI":
|
||
"""Search customers"""
|
||
|
||
def search_op():
|
||
actual_query = self.api._substitute_context(query)
|
||
status, response = self.client.get(f"/customers/?search={actual_query}")
|
||
if status == 200:
|
||
count = response.get("total_count", len(response.get("results", [])))
|
||
Logger.info(f"Search '{actual_query}': {count} customers found")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation(f"search_customers({query})", search_op)
|
||
return self
|
||
|
||
def delete(self, customer_id: str = None) -> "CustomersAPI":
|
||
"""Delete a customer"""
|
||
|
||
def delete_op():
|
||
if self.api._mock_mode:
|
||
Logger.info("Mock: Skipping customer deletion")
|
||
return True, {"message": "Skipped in mock mode"}
|
||
|
||
target_id = (
|
||
customer_id
|
||
or self.context.get("customer_id")
|
||
or self.context.get("last_customer_id")
|
||
)
|
||
if not target_id:
|
||
return False, {"error": "No customer ID available"}
|
||
|
||
status, response = self.client.delete(f"/customers/{target_id}/")
|
||
if status == 204:
|
||
Logger.info(f"Deleted customer: {target_id}")
|
||
# Remove from cleanup since deleted explicitly
|
||
self.api.cleanup_items = [
|
||
item for item in self.api.cleanup_items if item["id"] != target_id
|
||
]
|
||
return True, {"deleted": target_id}
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("delete_customer", delete_op)
|
||
return self
|
||
|
||
def as_(self, context_name: str) -> "CustomersAPI":
|
||
"""Store current customer ID with a custom name in context"""
|
||
customer_id = self.context.get("customer_id") or self.context.get(
|
||
"last_customer_id"
|
||
)
|
||
if customer_id:
|
||
self.context[context_name] = customer_id
|
||
Logger.info(f"Saved customer ID as '{context_name}': {customer_id}")
|
||
return self
|
||
|
||
|
||
class PropertiesAPI(BaseDomainAPI):
|
||
"""API for customer properties operations"""
|
||
|
||
def create(
|
||
self,
|
||
display_name: str,
|
||
property_type: str = "TEXT",
|
||
options: List[str] = None,
|
||
**kwargs,
|
||
) -> "PropertiesAPI":
|
||
"""Create a customer property"""
|
||
|
||
def create_op():
|
||
actual_name = self.api._substitute_context(display_name)
|
||
if "{" not in display_name:
|
||
actual_name = self.api._generate_unique_name(actual_name)
|
||
|
||
data = {
|
||
"name": actual_name,
|
||
"display_name": actual_name,
|
||
"property_type": property_type,
|
||
"description": kwargs.get("description", "Property created via API"),
|
||
**kwargs,
|
||
}
|
||
|
||
# Add options for OPTION type properties
|
||
if property_type == "OPTION" and options:
|
||
data["options"] = [
|
||
{
|
||
"name": option,
|
||
"description": f"Option: {option}",
|
||
"is_default": i == 0,
|
||
}
|
||
for i, option in enumerate(options)
|
||
]
|
||
|
||
status, response = self.client.post("/customer-properties/", data)
|
||
if status == 201:
|
||
property_id = response.get("id")
|
||
self.context["property_id"] = property_id
|
||
self.context["last_property_id"] = property_id
|
||
self.api._add_cleanup(
|
||
property_id, f"/customer-properties/{property_id}/"
|
||
)
|
||
Logger.info(
|
||
f"Created property: {actual_name} ({property_type}) (ID: {property_id})"
|
||
)
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation(
|
||
f"create_property({display_name}, {property_type})", create_op
|
||
)
|
||
return self
|
||
|
||
def get(self, property_id: str = None) -> "PropertiesAPI":
|
||
"""Get property details"""
|
||
|
||
def get_op():
|
||
target_id = (
|
||
property_id
|
||
or self.context.get("property_id")
|
||
or self.context.get("last_property_id")
|
||
)
|
||
if not target_id:
|
||
return False, {"error": "No property ID available"}
|
||
|
||
status, response = self.client.get(f"/customer-properties/{target_id}/")
|
||
if status == 200:
|
||
Logger.info(f"Retrieved property: {target_id}")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("get_property", get_op)
|
||
return self
|
||
|
||
def update(self, updates: Dict, property_id: str = None) -> "PropertiesAPI":
|
||
"""Update a customer property"""
|
||
|
||
def update_op():
|
||
target_id = (
|
||
property_id
|
||
or self.context.get("property_id")
|
||
or self.context.get("last_property_id")
|
||
)
|
||
if not target_id:
|
||
return False, {"error": "No property ID available"}
|
||
|
||
# Process context variables
|
||
processed_updates = {}
|
||
for key, value in updates.items():
|
||
processed_updates[key] = (
|
||
self.api._substitute_context(str(value))
|
||
if isinstance(value, str)
|
||
else value
|
||
)
|
||
|
||
status, response = self.client.patch(
|
||
f"/customer-properties/{target_id}/", processed_updates
|
||
)
|
||
if status == 200:
|
||
Logger.info(f"Updated property: {target_id}")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("update_property", update_op)
|
||
return self
|
||
|
||
def list(self) -> "PropertiesAPI":
|
||
"""List all customer properties"""
|
||
|
||
def list_op():
|
||
status, response = self.client.get("/customer-properties/")
|
||
if status == 200:
|
||
count = response.get("total_count", len(response.get("results", [])))
|
||
Logger.info(f"Listed {count} properties")
|
||
return True, response
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("list_properties", list_op)
|
||
return self
|
||
|
||
def delete(self, property_id: str = None) -> "PropertiesAPI":
|
||
"""Delete a customer property"""
|
||
|
||
def delete_op():
|
||
if self.api._mock_mode:
|
||
Logger.info("Mock: Skipping property deletion")
|
||
return True, {"message": "Skipped in mock mode"}
|
||
|
||
target_id = (
|
||
property_id
|
||
or self.context.get("property_id")
|
||
or self.context.get("last_property_id")
|
||
)
|
||
if not target_id:
|
||
return False, {"error": "No property ID available"}
|
||
|
||
status, response = self.client.delete(f"/customer-properties/{target_id}/")
|
||
if status == 204:
|
||
Logger.info(f"Deleted property: {target_id}")
|
||
# Remove from cleanup since deleted explicitly
|
||
self.api.cleanup_items = [
|
||
item for item in self.api.cleanup_items if item["id"] != target_id
|
||
]
|
||
return True, {"deleted": target_id}
|
||
else:
|
||
return False, response
|
||
|
||
self.api._add_operation("delete_property", delete_op)
|
||
return self
|
||
|
||
def as_(self, context_name: str) -> "PropertiesAPI":
|
||
"""Store current property ID with a custom name in context"""
|
||
property_id = self.context.get("property_id") or self.context.get(
|
||
"last_property_id"
|
||
)
|
||
if property_id:
|
||
self.context[context_name] = property_id
|
||
Logger.info(f"Saved property ID as '{context_name}': {property_id}")
|
||
return self
|
||
|
||
|
||
class IssuesAPI(BaseDomainAPI):
|
||
"""API for issues operations (placeholder for future expansion)"""
|
||
|
||
def create(self, title: str, **kwargs) -> "IssuesAPI":
|
||
"""Create an issue (placeholder)"""
|
||
Logger.info("IssuesAPI.create() - Coming soon!")
|
||
return self
|
||
|
||
def link_to_customer(self, customer_id: str, issue_ids: List[str]) -> "IssuesAPI":
|
||
"""Link issues to customer (placeholder)"""
|
||
Logger.info("IssuesAPI.link_to_customer() - Coming soon!")
|
||
return self
|
||
|
||
|
||
# ====================================
|
||
# PREDEFINED FLOWS
|
||
# ====================================
|
||
|
||
|
||
def customer_onboarding_flow(api: PlaneAPI) -> FlowResults:
|
||
"""Complete customer onboarding with properties setup"""
|
||
return (
|
||
api.debug("Customer onboarding process")
|
||
.customers()
|
||
.create("Acme Corporation", description="Enterprise client")
|
||
.as_("main_customer")
|
||
.back()
|
||
.properties()
|
||
.create(
|
||
"Account Tier", "OPTION", options=["Bronze", "Silver", "Gold", "Platinum"]
|
||
)
|
||
.create("Industry", "TEXT", description="Customer's industry sector")
|
||
.create("Annual Revenue", "TEXT", description="Company annual revenue")
|
||
.create("Primary Contact", "TEXT", description="Main point of contact")
|
||
.back()
|
||
.debug("Onboarding complete - customer and properties ready")
|
||
.run()
|
||
)
|
||
|
||
|
||
def validation_testing_flow(api: PlaneAPI) -> FlowResults:
|
||
"""Testing validation and error conditions"""
|
||
return (
|
||
api.mock() # Preserve data for inspection
|
||
.debug("Validation testing flow")
|
||
.customers()
|
||
.create("Validation Test Corp")
|
||
.update({"description": "Testing updates"})
|
||
.search("Validation")
|
||
.back()
|
||
.properties()
|
||
.create("Test Property", "TEXT")
|
||
.update({"description": "Updated description"})
|
||
.list()
|
||
.back()
|
||
.debug("Validation tests completed")
|
||
.run()
|
||
)
|
||
|
||
|
||
def demo_flow(api: PlaneAPI) -> FlowResults:
|
||
"""Demonstration of API capabilities"""
|
||
return (
|
||
api.debug("Starting demo")
|
||
.customers()
|
||
.list()
|
||
.create("Demo Corp")
|
||
.get()
|
||
.update({"description": "Updated via API"})
|
||
.back()
|
||
.properties()
|
||
.list()
|
||
.create("Priority", "OPTION", options=["High", "Medium", "Low"])
|
||
.create("Department", "TEXT")
|
||
.back()
|
||
.debug("Demo completed")
|
||
.run()
|
||
)
|
||
|
||
|
||
# ====================================
|
||
# MAIN CLI
|
||
# ====================================
|
||
|
||
|
||
def main():
|
||
"""Main CLI interface"""
|
||
|
||
# Configuration
|
||
CONFIG = {
|
||
"base_url": "http://localhost:8000/api/v1",
|
||
"workspace_slug": "slack-shah",
|
||
"api_key": "plane_api_b113773f13354d78a0c2d9bb361a0dee",
|
||
"mock_mode": False,
|
||
}
|
||
|
||
parser = argparse.ArgumentParser(
|
||
description="Plane API Testing Framework",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
Examples:
|
||
python test_api.py demo # Run demonstration flow
|
||
python test_api.py onboarding # Customer onboarding flow
|
||
python test_api.py validation # Validation testing flow
|
||
python test_api.py demo --mock # Demo in mock mode (preserve data)
|
||
python test_api.py custom # Interactive custom flow
|
||
|
||
Available flows:
|
||
demo, onboarding, validation, custom
|
||
""",
|
||
)
|
||
|
||
parser.add_argument(
|
||
"flow",
|
||
choices=["demo", "onboarding", "validation", "custom"],
|
||
help="Test flow to run",
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--base-url",
|
||
default=CONFIG["base_url"],
|
||
help=f"Base API URL (default: {CONFIG['base_url']})",
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--workspace",
|
||
default=CONFIG["workspace_slug"],
|
||
help=f"Workspace slug (default: {CONFIG['workspace_slug']})",
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--api-key", default=CONFIG["api_key"], help="API key for authentication"
|
||
)
|
||
|
||
parser.add_argument(
|
||
"--mock",
|
||
action="store_true",
|
||
help="Mock mode: Skip all deletions and preserve test data",
|
||
)
|
||
|
||
args = parser.parse_args()
|
||
|
||
# Update config
|
||
test_config = CONFIG.copy()
|
||
test_config["base_url"] = args.base_url
|
||
test_config["workspace_slug"] = args.workspace
|
||
test_config["api_key"] = args.api_key
|
||
test_config["mock_mode"] = args.mock
|
||
|
||
# Show mock mode info
|
||
if args.mock:
|
||
Logger.header("🔸 MOCK MODE ENABLED 🔸")
|
||
Logger.info("All test data will be preserved (no deletions will occur)")
|
||
print()
|
||
|
||
# Create API instance
|
||
api = PlaneAPI(test_config)
|
||
|
||
# Run selected flow
|
||
if args.flow == "demo":
|
||
results = demo_flow(api)
|
||
elif args.flow == "onboarding":
|
||
results = customer_onboarding_flow(api)
|
||
elif args.flow == "validation":
|
||
results = validation_testing_flow(api)
|
||
elif args.flow == "custom":
|
||
results = run_custom_flow(api)
|
||
else:
|
||
Logger.error(f"Unknown flow: {args.flow}")
|
||
sys.exit(1)
|
||
|
||
# Exit with appropriate code
|
||
sys.exit(0 if results.was_successful() else 1)
|
||
|
||
|
||
def run_custom_flow(api: PlaneAPI) -> FlowResults:
|
||
"""Interactive custom flow builder"""
|
||
Logger.header("🛠 Custom Flow Builder")
|
||
Logger.info("Build your own test flow interactively!")
|
||
Logger.info("For now, running a sample custom flow...")
|
||
|
||
# Sample custom flow - in the future this could be interactive
|
||
return (
|
||
api.debug("Custom flow example")
|
||
.customers()
|
||
.create("Custom Customer")
|
||
.as_("my_customer")
|
||
.back()
|
||
.properties()
|
||
.create("Custom Property", "TEXT")
|
||
.back()
|
||
.debug("Custom flow completed with customer '{my_customer}'")
|
||
.run()
|
||
)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|