
Performance Testing

Master load testing, stress testing, and performance optimization techniques

Overview

Performance testing evaluates how applications perform under various load conditions. This module covers load testing, stress testing, volume testing, and performance monitoring to ensure your applications can handle real-world usage scenarios.

Key Metrics: Response time, throughput, resource utilization, concurrent users, error rates, and scalability limits.
Response Time: Time taken to process requests.
Throughput: Requests processed per second.
Concurrent Users: Number of simultaneous active users.
Error Rate: Percentage of failed requests.
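All four of these metrics fall out of the same raw data: one timestamped record per request. As a minimal sketch of how they are derived (the samples list below is hypothetical; tools such as k6, JMeter, and Locust collect this data for you):

# Deriving the key metrics from raw request records (illustrative sketch)
import statistics

samples = [
    # (start_time_seconds, duration_seconds, status_code) -- hypothetical data
    (0.0, 0.12, 200), (0.4, 0.30, 200), (0.9, 0.08, 200),
    (1.3, 0.95, 500), (1.8, 0.22, 200), (2.2, 0.15, 200),
]

durations = [d for _, d, _ in samples]
failures = [s for _, _, s in samples if s >= 400]
elapsed = (samples[-1][0] + samples[-1][1]) - samples[0][0]

print(f"Average response time: {statistics.mean(durations) * 1000:.0f} ms")
print(f"Median response time: {statistics.median(durations) * 1000:.0f} ms")
print(f"Throughput: {len(samples) / elapsed:.1f} req/s")
print(f"Error rate: {len(failures) / len(samples) * 100:.1f} %")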

Load Testing

Load testing verifies application performance under expected user loads:

Basic Load Testing Scripts

// Load testing with Artillery.js
// artillery.yml configuration:
/*
config:
  target: 'https://api.example.com'
  plugins:
    expect: {}   # the expect assertions below require the expect plugin
  phases:
    - duration: 60
      arrivalRate: 10
    - duration: 120
      arrivalRate: 50
    - duration: 60
      arrivalRate: 100
scenarios:
  - name: "API Load Test"
    weight: 100
    flow:
      - get:
          url: "/api/users"
          expect:
            - statusCode: 200
            - hasProperty: "users"
      - think: 2
      - post:
          url: "/api/users"
          json:
            name: "Load Test User"
            email: "loadtest@example.com"
          expect:
            - statusCode: 201
*/

// Using k6 for load testing
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';

// Custom metrics
export let errorRate = new Rate('errors');

export let options = {
  stages: [
    { duration: '2m', target: 100 }, // Ramp up to 100 users
    { duration: '5m', target: 100 }, // Stay at 100 users
    { duration: '2m', target: 200 }, // Ramp up to 200 users
    { duration: '5m', target: 200 }, // Stay at 200 users
    { duration: '2m', target: 0 },   // Ramp down to 0 users
  ],
  thresholds: {
    'http_req_duration': ['p(95)<500'], // 95% of requests under 500ms
    'http_req_failed': ['rate<0.1'],    // Error rate under 10%
    'errors': ['rate<0.1'],
  },
};

export default function () {
  // Test homepage
  let response = http.get('https://example.com');
  check(response, {
    'status is 200': (r) => r.status === 200,
    'response time < 500ms': (r) => r.timings.duration < 500,
  }) || errorRate.add(1);
  sleep(1);

  // Test API endpoint
  let apiResponse = http.get('https://api.example.com/users');
  check(apiResponse, {
    'API status is 200': (r) => r.status === 200,
    'API response has users': (r) => {
      try {
        const body = JSON.parse(r.body);
        return body.hasOwnProperty('users');
      } catch (e) {
        return false;
      }
    },
    'API response time < 1s': (r) => r.timings.duration < 1000,
  }) || errorRate.add(1);
  sleep(2);

  // Test POST request
  const payload = {
    name: `User ${__VU}-${__ITER}`,
    email: `user${__VU}-${__ITER}@example.com`,
  };
  const params = {
    headers: { 'Content-Type': 'application/json' },
  };
  let postResponse = http.post('https://api.example.com/users', JSON.stringify(payload), params);
  check(postResponse, {
    'POST status is 201': (r) => r.status === 201,
    'POST response time < 2s': (r) => r.timings.duration < 2000,
  }) || errorRate.add(1);
  sleep(1);
}

// teardown() runs once after the test finishes; its argument is whatever
// setup() returned (undefined here, since there is no setup function)
export function teardown(data) {
  console.log('Load test completed');
}
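Assuming the k6 script above is saved as loadtest.js and the Artillery configuration as artillery.yml, both tools are launched from the command line: k6 run loadtest.js and artillery run artillery.yml. A useful property of the k6 thresholds block is that k6 exits with a non-zero status when a threshold fails, so the same script can gate a CI pipeline.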
// Load testing with the JMeter Java API
import org.apache.jmeter.control.LoopController;
import org.apache.jmeter.engine.StandardJMeterEngine;
import org.apache.jmeter.protocol.http.sampler.HTTPSampler;
import org.apache.jmeter.testelement.TestPlan;
import org.apache.jmeter.threads.ThreadGroup;
import org.apache.jmeter.util.JMeterUtils;
import org.apache.jorphan.collections.HashTree;

public class LoadTestExample {

    public static void main(String[] args) {
        // Initialize JMeter
        JMeterUtils.loadJMeterProperties("jmeter.properties");
        JMeterUtils.setJMeterHome("/path/to/jmeter");
        JMeterUtils.initLocale();

        // Create Test Plan
        TestPlan testPlan = new TestPlan("Load Test Plan");

        // Create Thread Group: 100 threads ramped up over 60 seconds
        ThreadGroup threadGroup = new ThreadGroup();
        threadGroup.setName("Load Test Thread Group");
        threadGroup.setNumThreads(100);
        threadGroup.setRampUp(60);
        threadGroup.setSamplerController(createLoopController());

        // Create HTTP Sampler
        HTTPSampler httpSampler = createHttpSampler();

        // JMeter wires elements together through a HashTree,
        // not by adding test elements to each other directly
        HashTree testPlanTree = new HashTree();
        HashTree threadGroupTree = testPlanTree.add(testPlan).add(threadGroup);
        threadGroupTree.add(httpSampler);

        // Run the test
        StandardJMeterEngine jmeter = new StandardJMeterEngine();
        jmeter.configure(testPlanTree);
        jmeter.run();
    }

    private static LoopController createLoopController() {
        LoopController loopController = new LoopController();
        loopController.setLoops(10);
        loopController.setFirst(true);
        loopController.initialize();
        return loopController;
    }

    private static HTTPSampler createHttpSampler() {
        HTTPSampler httpSampler = new HTTPSampler();
        httpSampler.setDomain("api.example.com");
        httpSampler.setPort(443);
        httpSampler.setProtocol("https");
        httpSampler.setPath("/api/users");
        httpSampler.setMethod("GET");
        httpSampler.setName("API Load Test");
        return httpSampler;
    }
}

// Using a Spring Boot test for load testing
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ApplicationLoadTest {

    // RANDOM_PORT assigns a free port; inject it instead of assuming 8080
    @LocalServerPort
    private int port;

    @Test
    public void loadTestUserEndpoint() throws InterruptedException {
        int numberOfThreads = 50;
        int numberOfRequestsPerThread = 100;

        ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
        AtomicInteger successCount = new AtomicInteger(0);
        AtomicInteger errorCount = new AtomicInteger(0);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < numberOfThreads; i++) {
            executor.submit(() -> {
                // Reuse one RestTemplate per worker instead of one per request
                RestTemplate restTemplate = new RestTemplate();
                for (int j = 0; j < numberOfRequestsPerThread; j++) {
                    try {
                        ResponseEntity<String> response = restTemplate.getForEntity(
                                "http://localhost:" + port + "/api/users", String.class);

                        if (response.getStatusCode().is2xxSuccessful()) {
                            successCount.incrementAndGet();
                        } else {
                            errorCount.incrementAndGet();
                        }

                        // Small delay between requests
                        Thread.sleep(100);
                    } catch (Exception e) {
                        errorCount.incrementAndGet();
                        e.printStackTrace();
                    }
                }
            });
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.MINUTES);

        long totalTime = System.currentTimeMillis() - startTime;
        int totalRequests = numberOfThreads * numberOfRequestsPerThread;

        System.out.println("Load Test Results:");
        System.out.println("Total Requests: " + totalRequests);
        System.out.println("Successful Requests: " + successCount.get());
        System.out.println("Failed Requests: " + errorCount.get());
        System.out.println("Success Rate: " + (successCount.get() * 100.0 / totalRequests) + "%");
        System.out.println("Total Time: " + totalTime + "ms");
        System.out.println("Requests per Second: " + (totalRequests * 1000.0 / totalTime));

        // Assertions. Note: totalTime / totalRequests is wall-clock time per
        // request across all threads, not individual request latency.
        assertTrue("Success rate should be > 95%",
                successCount.get() > totalRequests * 0.95);
        assertTrue("Wall-clock time per request should be < 1000ms",
                totalTime / totalRequests < 1000);
    }

    @Test
    public void stressTestWithIncreasingLoad() throws InterruptedException {
        int[] loadLevels = {10, 25, 50, 100, 200};

        for (int users : loadLevels) {
            System.out.println("Testing with " + users + " concurrent users");
            long startTime = System.currentTimeMillis();

            ExecutorService executor = Executors.newFixedThreadPool(users);
            AtomicInteger completedRequests = new AtomicInteger(0);

            for (int i = 0; i < users; i++) {
                executor.submit(() -> {
                    try {
                        RestTemplate restTemplate = new RestTemplate();
                        restTemplate.getForEntity(
                                "http://localhost:" + port + "/api/users", String.class);
                        completedRequests.incrementAndGet();
                    } catch (Exception e) {
                        System.err.println("Request failed: " + e.getMessage());
                    }
                });
            }

            executor.shutdown();
            executor.awaitTermination(30, TimeUnit.SECONDS);

            long duration = System.currentTimeMillis() - startTime;
            double throughput = completedRequests.get() * 1000.0 / duration;

            System.out.println("Users: " + users
                    + ", Completed: " + completedRequests.get()
                    + ", Duration: " + duration + "ms"
                    + ", Throughput: " + String.format("%.2f", throughput) + " req/s");

            // Cool down between load levels
            Thread.sleep(5000);
        }
    }
}
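The programmatic JMeter API builds the same tree of test elements that the JMeter GUI saves to a .jmx file; for repeatable runs it is common to export the plan and execute it headless with jmeter -n -t plan.jmx -l results.jtl (the file names here are placeholders). The Spring Boot variant is convenient because it exercises a freshly started instance of your own application, but client and server share one machine and one JVM, so absolute numbers will differ from production.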
# Load testing with Locust
from locust import HttpUser, task, between
import random

class WebsiteUser(HttpUser):
    wait_time = between(1, 5)  # Wait 1-5 seconds between tasks
    weight = 9  # ~90% of simulated users

    def on_start(self):
        """Called when a simulated user starts"""
        self.login()

    def login(self):
        """Simulate user login"""
        response = self.client.post("/api/auth/login", json={
            "username": f"testuser{random.randint(1, 1000)}",
            "password": "password123"
        })
        if response.status_code == 200:
            self.auth_token = response.json().get("token")
        else:
            self.auth_token = None

    @task(6)
    def view_homepage(self):
        """Most common task - view homepage"""
        self.client.get("/")

    @task(4)
    def view_products(self):
        """View products page"""
        self.client.get("/products")

    @task(2)
    def view_product_details(self):
        """View a specific product"""
        product_id = random.randint(1, 100)
        with self.client.get(f"/products/{product_id}", catch_response=True) as response:
            if response.status_code == 404:
                response.success()  # 404 is expected for some products

    @task(2)
    def api_get_users(self):
        """Test API endpoint"""
        headers = {}
        if self.auth_token:
            headers["Authorization"] = f"Bearer {self.auth_token}"
        self.client.get("/api/users", headers=headers)

    @task(1)  # Least frequent task (task weights must be integers)
    def create_user(self):
        """Create a user"""
        user_data = {
            "name": f"User{random.randint(1, 10000)}",
            "email": f"user{random.randint(1, 10000)}@example.com",
            "age": random.randint(18, 80)
        }
        self.client.post("/api/users", json=user_data)

class AdminUser(HttpUser):
    """Different behavior pattern for admin users"""
    wait_time = between(2, 8)
    weight = 1  # ~10% of simulated users

    @task
    def admin_dashboard(self):
        self.client.get("/admin/dashboard")

    @task
    def view_analytics(self):
        self.client.get("/admin/analytics")

# Custom load tester built directly on requests
import requests
import time
import statistics
from concurrent.futures import ThreadPoolExecutor

class LoadTester:
    def __init__(self, base_url, num_threads=50, duration=60):
        self.base_url = base_url
        self.num_threads = num_threads
        self.duration = duration
        self.results = []  # list.append is thread-safe under CPython's GIL
        self.start_time = None

    def make_request(self, endpoint="/"):
        """Make a single HTTP request and record metrics"""
        start_time = time.time()
        try:
            response = requests.get(f"{self.base_url}{endpoint}", timeout=10)
            result = {
                'response_time': time.time() - start_time,
                'status_code': response.status_code,
                'success': response.status_code < 400,
                'timestamp': start_time
            }
        except requests.exceptions.RequestException as e:
            result = {
                'response_time': time.time() - start_time,
                'status_code': 0,
                'success': False,
                'error': str(e),
                'timestamp': start_time
            }
        self.results.append(result)
        return result

    def worker(self):
        """Worker thread that makes requests until the duration elapses"""
        while time.time() - self.start_time < self.duration:
            self.make_request()
            time.sleep(0.1)  # Small delay between requests

    def run_load_test(self):
        """Run the load test"""
        print(f"Starting load test with {self.num_threads} threads for {self.duration} seconds")
        self.start_time = time.time()

        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            futures = [executor.submit(self.worker) for _ in range(self.num_threads)]
            for future in futures:
                future.result()

        self.analyze_results()

    def analyze_results(self):
        """Analyze and print test results"""
        if not self.results:
            print("No results to analyze")
            return

        successful_requests = [r for r in self.results if r['success']]
        failed_requests = [r for r in self.results if not r['success']]
        total_requests = len(self.results)
        success_rate = len(successful_requests) / total_requests * 100

        if successful_requests:
            response_times = [r['response_time'] for r in successful_requests]
            avg_response_time = statistics.mean(response_times)
            median_response_time = statistics.median(response_times)
            # quantiles() needs at least two data points
            p95_response_time = (statistics.quantiles(response_times, n=20)[18]
                                 if len(response_times) >= 2 else response_times[0])
            min_response_time = min(response_times)
            max_response_time = max(response_times)
        else:
            avg_response_time = median_response_time = p95_response_time = 0
            min_response_time = max_response_time = 0

        # Throughput over the span actually covered by requests
        actual_duration = (max(r['timestamp'] for r in self.results)
                           - min(r['timestamp'] for r in self.results))
        throughput = total_requests / actual_duration if actual_duration > 0 else 0

        print("\n" + "=" * 50)
        print("LOAD TEST RESULTS")
        print("=" * 50)
        print(f"Total Requests: {total_requests}")
        print(f"Successful Requests: {len(successful_requests)}")
        print(f"Failed Requests: {len(failed_requests)}")
        print(f"Success Rate: {success_rate:.2f}%")
        print(f"Throughput: {throughput:.2f} requests/second")
        print(f"Average Response Time: {avg_response_time*1000:.2f}ms")
        print(f"Median Response Time: {median_response_time*1000:.2f}ms")
        print(f"95th Percentile Response Time: {p95_response_time*1000:.2f}ms")
        print(f"Min Response Time: {min_response_time*1000:.2f}ms")
        print(f"Max Response Time: {max_response_time*1000:.2f}ms")

        # Error analysis
        if failed_requests:
            print("\nERROR ANALYSIS:")
            error_types = {}
            for req in failed_requests:
                error = req.get('error', f"HTTP {req['status_code']}")
                error_types[error] = error_types.get(error, 0) + 1
            for error, count in error_types.items():
                print(f"  {error}: {count} occurrences")

# Example usage
if __name__ == "__main__":
    tester = LoadTester("https://api.example.com", num_threads=25, duration=30)
    tester.run_load_test()

# Running the Locust scenario from the command line:
# locust -f loadtest.py --host=https://api.example.com -u 100 -r 10 -t 300s --html=report.html

Stress Testing

Stress testing pushes the system beyond normal operating capacity to identify breaking points:

// Stress testing with k6
import http from 'k6/http';
import { check, sleep } from 'k6';

export let options = {
  stages: [
    // Warm up
    { duration: '1m', target: 50 },
    // Normal load
    { duration: '2m', target: 100 },
    // Stress load - gradually increase
    { duration: '2m', target: 200 },
    { duration: '2m', target: 400 },
    { duration: '2m', target: 800 },
    // Spike load
    { duration: '1m', target: 1200 },
    // Recovery
    { duration: '2m', target: 100 },
    { duration: '1m', target: 0 },
  ],
  thresholds: {
    // Normal load thresholds
    'http_req_duration{load:normal}': ['p(95)<500'],
    'http_req_failed{load:normal}': ['rate<0.01'],
    // Stress load thresholds (more lenient)
    'http_req_duration{load:stress}': ['p(95)<2000'],
    'http_req_failed{load:stress}': ['rate<0.1'],
    // Spike load thresholds (most lenient)
    'http_req_duration{load:spike}': ['p(95)<5000'],
    'http_req_failed{load:spike}': ['rate<0.2'],
  },
};

export default function () {
  // Approximate the current load phase from the VU id:
  // high VU ids only exist once the ramp has passed that level
  let loadType = 'normal';
  if (__VU > 800) {
    loadType = 'spike';
  } else if (__VU > 200) {
    loadType = 'stress';
  }

  let response = http.get('https://api.example.com/stress-test', {
    tags: { load: loadType }
  });

  check(response, {
    'status is 200': (r) => r.status === 200,
    'response time reasonable': (r) => {
      if (loadType === 'normal') return r.timings.duration < 500;
      if (loadType === 'stress') return r.timings.duration < 2000;
      return r.timings.duration < 5000; // spike
    }
  });

  // Vary sleep time based on load
  if (loadType === 'spike') {
    sleep(0.1); // Aggressive load
  } else if (loadType === 'stress') {
    sleep(0.5);
  } else {
    sleep(1); // Normal user behavior
  }
}

// Stress test for specific endpoints (can be run as a separate
// scenario via the exec option)
export function stressTestEndpoints() {
  let endpoints = [
    '/api/users',
    '/api/products',
    '/api/orders',
    '/api/search'
  ];

  endpoints.forEach(endpoint => {
    let response = http.get(`https://api.example.com${endpoint}`);
    check(response, {
      [`${endpoint} responds`]: (r) => r.status < 500,
      [`${endpoint} not too slow`]: (r) => r.timings.duration < 3000,
    });
  });
}

// Memory and resource stress test
export function resourceStressTest() {
  // Create a large payload (~100KB); sent as a form-encoded body
  let largeData = 'x'.repeat(1024 * 100);

  let response = http.post('https://api.example.com/upload', {
    data: largeData,
    type: 'stress-test'
  });

  check(response, {
    'handles large payload': (r) => r.status < 500,
    'processes within time limit': (r) => r.timings.duration < 10000,
  });
}
// Stress testing with Java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;

public class StressTestRunner {
    private final String baseUrl;
    private final AtomicInteger requestCount = new AtomicInteger(0);
    private final AtomicInteger successCount = new AtomicInteger(0);
    private final AtomicInteger errorCount = new AtomicInteger(0);
    private final AtomicLong totalResponseTime = new AtomicLong(0);

    public StressTestRunner(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    public void runStressTest() throws InterruptedException {
        System.out.println("Starting stress test...");

        // Test with increasing load
        int[] loadLevels = {50, 100, 200, 500, 1000, 1500, 2000};

        for (int users : loadLevels) {
            System.out.println("\n--- Testing with " + users + " concurrent users ---");

            // Reset counters
            requestCount.set(0);
            successCount.set(0);
            errorCount.set(0);
            totalResponseTime.set(0);

            long startTime = System.currentTimeMillis();
            ExecutorService executor = Executors.newFixedThreadPool(users);
            CountDownLatch latch = new CountDownLatch(users);

            // Submit tasks
            for (int i = 0; i < users; i++) {
                executor.submit(new StressTestWorker(latch));
            }

            // Wait for completion or timeout
            boolean completed = latch.await(60, TimeUnit.SECONDS);
            executor.shutdownNow(); // stop any workers still running after timeout

            long duration = System.currentTimeMillis() - startTime;

            // Calculate metrics
            int total = requestCount.get();
            double successRate = total > 0 ? (successCount.get() * 100.0 / total) : 0;
            double avgResponseTime = total > 0 ? (totalResponseTime.get() / (double) total) : 0;
            double throughput = duration > 0 ? (total * 1000.0 / duration) : 0;

            // Print results
            System.out.println("Completed: " + completed);
            System.out.println("Total requests: " + total);
            System.out.println("Successful requests: " + successCount.get());
            System.out.println("Failed requests: " + errorCount.get());
            System.out.println("Success rate: " + String.format("%.2f", successRate) + "%");
            System.out.println("Average response time: " + String.format("%.2f", avgResponseTime) + "ms");
            System.out.println("Throughput: " + String.format("%.2f", throughput) + " req/sec");
            System.out.println("Duration: " + duration + "ms");

            // Determine whether the system is degrading under this load
            if (successRate < 80 || avgResponseTime > 5000) {
                System.out.println("⚠️ System showing stress at " + users + " users");
            }
            if (successRate < 50) {
                System.out.println("❌ System breaking point reached at " + users + " users");
                break;
            }

            // Cool down period
            Thread.sleep(5000);
        }
    }

    private class StressTestWorker implements Runnable {
        private final CountDownLatch latch;

        public StressTestWorker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                // Perform multiple requests per user
                for (int i = 0; i < 10; i++) {
                    performRequest();
                    Thread.sleep(100); // Small delay between requests
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                latch.countDown();
            }
        }

        private void performRequest() {
            long startTime = System.currentTimeMillis();
            try {
                RestTemplate restTemplate = new RestTemplate();
                ResponseEntity<String> response = restTemplate.getForEntity(
                        baseUrl + "/api/stress-test", String.class);

                long responseTime = System.currentTimeMillis() - startTime;
                requestCount.incrementAndGet();
                totalResponseTime.addAndGet(responseTime);

                if (response.getStatusCode().is2xxSuccessful()) {
                    successCount.incrementAndGet();
                } else {
                    errorCount.incrementAndGet();
                }
            } catch (Exception e) {
                long responseTime = System.currentTimeMillis() - startTime;
                requestCount.incrementAndGet();
                totalResponseTime.addAndGet(responseTime);
                errorCount.incrementAndGet();
                System.err.println("Request failed: " + e.getMessage());
            }
        }
    }

    // Memory stress test
    public void runMemoryStressTest() {
        System.out.println("Starting memory stress test...");

        ExecutorService executor = Executors.newFixedThreadPool(20);

        for (int i = 0; i < 100; i++) {
            executor.submit(() -> {
                // Allocate a large buffer (10MB) and fill it with data
                byte[] largeArray = new byte[1024 * 1024 * 10];
                for (int j = 0; j < largeArray.length; j++) {
                    largeArray[j] = (byte) (j % 256);
                }

                // Simulate processing
                try {
                    Thread.sleep(1000);
                    String largeString = new String(largeArray);
                    // ... make HTTP request with large payload
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(300, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StressTestRunner runner = new StressTestRunner("https://api.example.com");
        runner.runStressTest();
        runner.runMemoryStressTest();
    }
}
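A small tip for the memory stress test above, assuming you control how the test JVM is launched: run it with a deliberately constrained heap (for example, java -Xmx256m ...) so that allocation pressure, GC pauses, and potential OutOfMemoryError conditions show up quickly instead of being absorbed by a large default heap.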
# Stress testing in Python
import gc
import random
import statistics
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import psutil
import requests

class StressTester:
    def __init__(self, base_url):
        self.base_url = base_url
        self.start_time = None

    def stress_test_with_increasing_load(self):
        """Gradually increase load to find the breaking point"""
        load_levels = [10, 25, 50, 100, 200, 400, 800, 1200, 1600, 2000]

        print("Starting stress test with increasing load...")
        breaking_point = None

        for users in load_levels:
            print(f"\n--- Testing with {users} concurrent users ---")

            # Run test with the current load level
            results = self.run_concurrent_test(users, duration=30)

            # Analyze results
            success_rate = results['success_rate']
            avg_response_time = results['avg_response_time']
            p95_response_time = results['p95_response_time']
            throughput = results['throughput']

            print(f"Success Rate: {success_rate:.2f}%")
            print(f"Avg Response Time: {avg_response_time*1000:.2f}ms")
            print(f"95th Percentile: {p95_response_time*1000:.2f}ms")
            print(f"Throughput: {throughput:.2f} req/sec")

            # Determine whether the system is under stress
            if success_rate < 95 or avg_response_time > 2.0:
                print(f"⚠️ System showing stress at {users} users")

            # Check for the breaking point
            if success_rate < 80 or avg_response_time > 5.0:
                print(f"❌ Breaking point reached at {users} users")
                breaking_point = users
                break

            # Monitor resources on the load-generating machine
            cpu_percent = psutil.cpu_percent(interval=1)
            memory_percent = psutil.virtual_memory().percent
            print(f"System CPU: {cpu_percent}%, Memory: {memory_percent}%")

            # Cool down period
            time.sleep(5)

        if breaking_point:
            print(f"\n🔥 System breaking point: {breaking_point} concurrent users")
        else:
            print(f"\n✅ System handled up to {load_levels[-1]} users successfully")

    def run_concurrent_test(self, num_users, duration=60):
        """Run a concurrent test with the given number of users"""
        self.start_time = time.time()
        end_time = self.start_time + duration

        lock = threading.Lock()  # guards the shared counters below
        counters = {'success': 0, 'total': 0}
        response_times = []

        def worker():
            while time.time() < end_time:
                start = time.time()
                try:
                    response = requests.get(
                        f"{self.base_url}/api/stress-test", timeout=10)
                    ok = response.status_code < 400
                except Exception:
                    ok = False
                elapsed = time.time() - start
                with lock:
                    counters['total'] += 1
                    if ok:
                        counters['success'] += 1
                    response_times.append(elapsed)
                # Small jittered delay between requests
                time.sleep(0.1 + random.random() * 0.1)

        with ThreadPoolExecutor(max_workers=num_users) as executor:
            futures = [executor.submit(worker) for _ in range(num_users)]
            for future in as_completed(futures):
                future.result()

        total_requests = counters['total']
        successful_requests = counters['success']
        success_rate = (successful_requests / total_requests * 100) if total_requests > 0 else 0
        avg_response_time = statistics.mean(response_times) if response_times else 0
        p95_response_time = (statistics.quantiles(response_times, n=20)[18]
                             if len(response_times) > 20 else 0)
        throughput = total_requests / duration

        return {
            'total_requests': total_requests,
            'successful_requests': successful_requests,
            'success_rate': success_rate,
            'avg_response_time': avg_response_time,
            'p95_response_time': p95_response_time,
            'throughput': throughput,
            'response_times': response_times
        }

    def memory_stress_test(self):
        """Test system behavior under memory pressure"""
        print("Starting memory stress test...")

        def memory_intensive_request():
            large_data = 'x' * (1024 * 1024)  # 1MB payload
            try:
                response = requests.post(
                    f"{self.base_url}/api/upload",
                    data={'large_data': large_data},
                    timeout=30
                )
                return response.status_code < 400
            except Exception as e:
                print(f"Memory stress request failed: {e}")
                return False

        # Run many memory-intensive requests concurrently
        with ThreadPoolExecutor(max_workers=50) as executor:
            futures = [executor.submit(memory_intensive_request) for _ in range(100)]
            successful = sum(1 for future in as_completed(futures) if future.result())

        print(f"Memory stress test: {successful}/100 requests successful")
        gc.collect()  # reclaim the large payloads on the client side

    def spike_test(self):
        """Sudden spike in traffic on top of a steady baseline"""
        print("Running spike test...")

        self.start_time = time.time()

        def normal_request():
            # Steady baseline traffic for 2 minutes
            while time.time() < self.start_time + 120:
                try:
                    requests.get(f"{self.base_url}/api/users", timeout=5)
                except Exception:
                    pass
                time.sleep(1)

        # Start the baseline load
        normal_load_executor = ThreadPoolExecutor(max_workers=10)
        for _ in range(10):
            normal_load_executor.submit(normal_request)

        # Wait 30 seconds, then create a sudden spike
        time.sleep(30)
        print("Creating traffic spike...")

        def spike_request():
            start = time.time()
            try:
                response = requests.get(f"{self.base_url}/api/users", timeout=10)
                return {
                    'success': response.status_code < 400,
                    'response_time': time.time() - start
                }
            except Exception:
                return {
                    'success': False,
                    'response_time': time.time() - start
                }

        spike_results = []
        spike_start = time.time()

        # Sudden spike of 500 concurrent requests
        with ThreadPoolExecutor(max_workers=500) as spike_executor:
            spike_futures = [spike_executor.submit(spike_request) for _ in range(500)]
            for future in as_completed(spike_futures):
                spike_results.append(future.result())

        spike_duration = time.time() - spike_start

        # Analyze spike results
        successful_spikes = sum(1 for r in spike_results if r['success'])
        success_rate = successful_spikes / len(spike_results) * 100
        avg_spike_response = statistics.mean(r['response_time'] for r in spike_results)

        print("Spike test results:")
        print(f"Success rate during spike: {success_rate:.2f}%")
        print(f"Average response time during spike: {avg_spike_response*1000:.2f}ms")
        print(f"Spike duration: {spike_duration:.2f}s")

        # Wait for the baseline traffic to finish
        normal_load_executor.shutdown(wait=True)

# Usage example
if __name__ == "__main__":
    tester = StressTester("https://api.example.com")

    # Run the different types of stress tests
    tester.stress_test_with_increasing_load()
    tester.memory_stress_test()
    tester.spike_test()

Best Practices for Performance Testing