Performance Testing
Master load testing, stress testing, and performance optimization techniques
Overview
Performance testing evaluates how applications perform under various load conditions. This module covers load testing, stress testing, volume testing, and performance monitoring to ensure your applications can handle real-world usage scenarios.
Key Metrics: Response time, throughput, resource utilization, concurrent users, error rates, and scalability limits.
- Response Time: Time taken to process requests
- Throughput: Requests processed per second
- Concurrent Users: Simultaneous active users
- Error Rate: Percentage of failed requests
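All of these metrics can be derived from the raw per-request timings a test run collects. The following minimal Python sketch (the result format and function name are illustrative assumptions, not tied to any particular tool) shows how throughput, error rate, and percentile response times fall out of that data:
import statistics

def summarize(results, duration_seconds):
    """Compute the key metrics above from raw per-request results.

    `results` is assumed to be a list of dicts such as
    {"duration": 0.182, "ok": True} collected by whatever load driver you use.
    """
    durations = [r["duration"] for r in results]
    failures = sum(1 for r in results if not r["ok"])
    return {
        "requests": len(results),
        "throughput_rps": len(results) / duration_seconds,           # requests per second
        "error_rate": failures / len(results),                       # fraction of failed requests
        "avg_ms": statistics.mean(durations) * 1000,
        "p95_ms": statistics.quantiles(durations, n=20)[18] * 1000,  # 95th percentile
    }

# Example: three requests observed over a two-second window
print(summarize(
    [{"duration": 0.12, "ok": True},
     {"duration": 0.34, "ok": True},
     {"duration": 0.95, "ok": False}],
    duration_seconds=2,
))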
Load Testing
Load testing verifies application performance under expected user loads:
Basic Load Testing Scripts
// Load testing with Artillery.js
// artillery.yml configuration
/*
config:
target: 'https://api.example.com'
phases:
- duration: 60
arrivalRate: 10
- duration: 120
arrivalRate: 50
- duration: 60
arrivalRate: 100
scenarios:
- name: "API Load Test"
weight: 100
flow:
- get:
url: "/api/users"
expect:
- statusCode: 200
- hasProperty: "users"
- think: 2
- post:
url: "/api/users"
json:
name: "Load Test User"
email: "loadtest@example.com"
expect:
- statusCode: 201
*/
// Using k6 for load testing
import http from 'k6/http';
import { check, sleep } from 'k6';
import { Rate } from 'k6/metrics';
// Custom metrics
export let errorRate = new Rate('errors');
export let options = {
stages: [
{ duration: '2m', target: 100 }, // Ramp up to 100 users
{ duration: '5m', target: 100 }, // Stay at 100 users
{ duration: '2m', target: 200 }, // Ramp up to 200 users
{ duration: '5m', target: 200 }, // Stay at 200 users
{ duration: '2m', target: 0 }, // Ramp down to 0 users
],
thresholds: {
'http_req_duration': ['p(95)<500'], // 95% of requests under 500ms
'http_req_failed': ['rate<0.1'], // Error rate under 10%
'errors': ['rate<0.1'],
},
};
export default function() {
// Test homepage
let response = http.get('https://example.com');
check(response, {
'status is 200': (r) => r.status === 200,
'response time < 500ms': (r) => r.timings.duration < 500,
}) || errorRate.add(1);
sleep(1);
// Test API endpoint
let apiResponse = http.get('https://api.example.com/users');
check(apiResponse, {
'API status is 200': (r) => r.status === 200,
'API response has users': (r) => {
try {
const body = JSON.parse(r.body);
return body.hasOwnProperty('users');
} catch (e) {
return false;
}
},
'API response time < 1s': (r) => r.timings.duration < 1000,
}) || errorRate.add(1);
sleep(2);
// Test POST request
const payload = {
name: `User ${__VU}-${__ITER}`,
email: `user${__VU}-${__ITER}@example.com`,
};
const params = {
headers: {
'Content-Type': 'application/json',
},
};
let postResponse = http.post('https://api.example.com/users',
JSON.stringify(payload), params);
check(postResponse, {
'POST status is 201': (r) => r.status === 201,
'POST response time < 2s': (r) => r.timings.duration < 2000,
}) || errorRate.add(1);
sleep(1);
}
// Teardown runs once after the test finishes; `data` is whatever setup() returned
export function teardown(data) {
console.log('Load test completed');
}
// Load testing with the JMeter Java API
import org.apache.jmeter.control.LoopController;
import org.apache.jmeter.engine.StandardJMeterEngine;
import org.apache.jmeter.protocol.http.sampler.HTTPSampler;
import org.apache.jmeter.testelement.TestPlan;
import org.apache.jmeter.threads.ThreadGroup;
import org.apache.jmeter.util.JMeterUtils;
import org.apache.jorphan.collections.HashTree;
public class LoadTestExample {
public static void main(String[] args) {
// Initialize JMeter
JMeterUtils.loadJMeterProperties("jmeter.properties");
JMeterUtils.setJMeterHome("/path/to/jmeter");
JMeterUtils.initLocale();
// Create Test Plan
TestPlan testPlan = new TestPlan("Load Test Plan");
// Create Thread Group
ThreadGroup threadGroup = new ThreadGroup();
threadGroup.setName("Load Test Thread Group");
threadGroup.setNumThreads(100);
threadGroup.setRampUp(60);
threadGroup.setSamplerController(createLoopController());
// Create HTTP Sampler
HTTPSampler httpSampler = createHttpSampler();
// Assemble the test plan tree (the engine expects a HashTree, not raw test elements)
HashTree testPlanTree = new HashTree();
testPlanTree.add(testPlan);
HashTree threadGroupTree = testPlanTree.add(testPlan, threadGroup);
threadGroupTree.add(httpSampler);
// Run the test
StandardJMeterEngine jmeter = new StandardJMeterEngine();
jmeter.configure(testPlanTree);
jmeter.run();
}
private static LoopController createLoopController() {
LoopController loopController = new LoopController();
loopController.setLoops(10);
loopController.setFirst(true);
loopController.initialize();
return loopController;
}
private static HTTPSampler createHttpSampler() {
HTTPSampler httpSampler = new HTTPSampler();
httpSampler.setDomain("api.example.com");
httpSampler.setPort(443);
httpSampler.setProtocol("https");
httpSampler.setPath("/api/users");
httpSampler.setMethod("GET");
httpSampler.setName("API Load Test");
return httpSampler;
}
}
// Using Spring Boot Test for load testing
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
@RunWith(SpringRunner.class)
// DEFINED_PORT starts the application on its configured port (8080 by default) so the hard-coded URLs below resolve
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class ApplicationLoadTest {
@Test
public void loadTestUserEndpoint() throws InterruptedException {
int numberOfThreads = 50;
int numberOfRequestsPerThread = 100;
ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger errorCount = new AtomicInteger(0);
long startTime = System.currentTimeMillis();
for (int i = 0; i < numberOfThreads; i++) {
executor.submit(() -> {
for (int j = 0; j < numberOfRequestsPerThread; j++) {
try {
// Issue a real HTTP request against the running application
RestTemplate restTemplate = new RestTemplate();
ResponseEntity<String> response = restTemplate.getForEntity(
"http://localhost:8080/api/users", String.class
);
if (response.getStatusCode() == HttpStatus.OK) {
successCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
// Small delay between requests
Thread.sleep(100);
} catch (Exception e) {
errorCount.incrementAndGet();
e.printStackTrace();
}
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.MINUTES);
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
int totalRequests = numberOfThreads * numberOfRequestsPerThread;
System.out.println("Load Test Results:");
System.out.println("Total Requests: " + totalRequests);
System.out.println("Successful Requests: " + successCount.get());
System.out.println("Failed Requests: " + errorCount.get());
System.out.println("Success Rate: " +
(successCount.get() * 100.0 / totalRequests) + "%");
System.out.println("Total Time: " + totalTime + "ms");
System.out.println("Requests per Second: " +
(totalRequests * 1000.0 / totalTime));
// Assertions
assertTrue("Success rate should be > 95%",
successCount.get() > (totalRequests * 0.95));
assertTrue("Average wall-clock time per request should be < 1000ms",
totalTime / totalRequests < 1000);
}
@Test
public void stressTestWithIncreasingLoad() throws InterruptedException {
int[] loadLevels = {10, 25, 50, 100, 200};
for (int users : loadLevels) {
System.out.println("Testing with " + users + " concurrent users");
long startTime = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(users);
AtomicInteger completedRequests = new AtomicInteger(0);
for (int i = 0; i < users; i++) {
executor.submit(() -> {
try {
RestTemplate restTemplate = new RestTemplate();
ResponseEntity response = restTemplate.getForEntity(
"http://localhost:8080/api/users", String.class
);
completedRequests.incrementAndGet();
} catch (Exception e) {
System.err.println("Request failed: " + e.getMessage());
}
});
}
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
long duration = System.currentTimeMillis() - startTime;
double throughput = completedRequests.get() * 1000.0 / duration;
System.out.println("Users: " + users +
", Completed: " + completedRequests.get() +
", Duration: " + duration + "ms" +
", Throughput: " + String.format("%.2f", throughput) + " req/s");
// Wait between load levels
Thread.sleep(5000);
}
}
}
# Load testing with Locust
from locust import HttpUser, task, between
import random
class WebsiteUser(HttpUser):
weight = 9  # 90% of simulated users; AdminUser below takes the remaining 10%
wait_time = between(1, 5)  # Wait 1-5 seconds between tasks
def on_start(self):
"""Called when a user starts"""
# Login or setup tasks
self.login()
def login(self):
"""Simulate user login"""
response = self.client.post("/api/auth/login", json={
"username": f"testuser{random.randint(1, 1000)}",
"password": "password123"
})
if response.status_code == 200:
self.auth_token = response.json().get("token")
else:
self.auth_token = None
@task(3)
def view_homepage(self):
"""Most common task - view homepage"""
self.client.get("/")
@task(2)
def view_products(self):
"""View products page"""
self.client.get("/products")
@task(1)
def view_product_details(self):
"""View specific product"""
product_id = random.randint(1, 100)
with self.client.get(f"/products/{product_id}",
catch_response=True) as response:
if response.status_code == 404:
response.success() # 404 is expected for some products
@task(1)
def api_get_users(self):
"""Test API endpoint"""
headers = {}
if self.auth_token:
headers["Authorization"] = f"Bearer {self.auth_token}"
self.client.get("/api/users", headers=headers)
@task(1)  # Locust task weights must be whole numbers
def create_user(self):
"""Less frequent task - create user"""
user_data = {
"name": f"User{random.randint(1, 10000)}",
"email": f"user{random.randint(1, 10000)}@example.com",
"age": random.randint(18, 80)
}
self.client.post("/api/users", json=user_data)
class AdminUser(HttpUser):
"""Different user behavior pattern for admin users"""
wait_time = between(2, 8)
weight = 1 # 10% of users will be admin users
@task
def admin_dashboard(self):
self.client.get("/admin/dashboard")
@task
def view_analytics(self):
self.client.get("/admin/analytics")
# Custom load test with requests
import requests
import threading
import time
from concurrent.futures import ThreadPoolExecutor
import statistics
class LoadTester:
def __init__(self, base_url, num_threads=50, duration=60):
self.base_url = base_url
self.num_threads = num_threads
self.duration = duration
self.results = []
self.start_time = None
def make_request(self, endpoint="/"):
"""Make a single HTTP request and record metrics"""
start_time = time.time()
try:
response = requests.get(f"{self.base_url}{endpoint}", timeout=10)
end_time = time.time()
result = {
'response_time': end_time - start_time,
'status_code': response.status_code,
'success': response.status_code < 400,
'timestamp': start_time
}
self.results.append(result)
return result
except requests.exceptions.RequestException as e:
end_time = time.time()
result = {
'response_time': end_time - start_time,
'status_code': 0,
'success': False,
'error': str(e),
'timestamp': start_time
}
self.results.append(result)
return result
def worker(self):
"""Worker thread that makes requests continuously"""
while time.time() - self.start_time < self.duration:
self.make_request()
time.sleep(0.1) # Small delay between requests
def run_load_test(self):
"""Run the load test"""
print(f"Starting load test with {self.num_threads} threads for {self.duration} seconds")
self.start_time = time.time()
# Start worker threads
with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
futures = [executor.submit(self.worker) for _ in range(self.num_threads)]
# Wait for all threads to complete
for future in futures:
future.result()
self.analyze_results()
def analyze_results(self):
"""Analyze and print test results"""
if not self.results:
print("No results to analyze")
return
successful_requests = [r for r in self.results if r['success']]
failed_requests = [r for r in self.results if not r['success']]
total_requests = len(self.results)
success_rate = len(successful_requests) / total_requests * 100
if successful_requests:
response_times = [r['response_time'] for r in successful_requests]
avg_response_time = statistics.mean(response_times)
median_response_time = statistics.median(response_times)
p95_response_time = statistics.quantiles(response_times, n=20)[18] # 95th percentile
min_response_time = min(response_times)
max_response_time = max(response_times)
else:
avg_response_time = median_response_time = p95_response_time = 0
min_response_time = max_response_time = 0
# Calculate throughput
actual_duration = max(r['timestamp'] for r in self.results) - min(r['timestamp'] for r in self.results)
throughput = total_requests / actual_duration if actual_duration > 0 else 0
print("\n" + "="*50)
print("LOAD TEST RESULTS")
print("="*50)
print(f"Total Requests: {total_requests}")
print(f"Successful Requests: {len(successful_requests)}")
print(f"Failed Requests: {len(failed_requests)}")
print(f"Success Rate: {success_rate:.2f}%")
print(f"Throughput: {throughput:.2f} requests/second")
print(f"Average Response Time: {avg_response_time*1000:.2f}ms")
print(f"Median Response Time: {median_response_time*1000:.2f}ms")
print(f"95th Percentile Response Time: {p95_response_time*1000:.2f}ms")
print(f"Min Response Time: {min_response_time*1000:.2f}ms")
print(f"Max Response Time: {max_response_time*1000:.2f}ms")
# Error analysis
if failed_requests:
print("\nERROR ANALYSIS:")
error_types = {}
for req in failed_requests:
error = req.get('error', f"HTTP {req['status_code']}")
error_types[error] = error_types.get(error, 0) + 1
for error, count in error_types.items():
print(f" {error}: {count} occurrences")
# Example usage
if __name__ == "__main__":
# Run load test
tester = LoadTester("https://api.example.com", num_threads=25, duration=30)
tester.run_load_test()
# Run Locust from the command line, e.g.:
# locust -f loadtest.py --host=https://api.example.com -u 100 -r 10 -t 300s --html=report.html
Stress Testing
Stress testing pushes the system beyond normal operating capacity to identify breaking points:
// Stress testing with k6
import http from 'k6/http';
import { check, sleep } from 'k6';
export let options = {
stages: [
// Warm up
{ duration: '1m', target: 50 },
// Normal load
{ duration: '2m', target: 100 },
// Stress load - gradually increase
{ duration: '2m', target: 200 },
{ duration: '2m', target: 400 },
{ duration: '2m', target: 800 },
// Spike load
{ duration: '1m', target: 1200 },
// Recovery
{ duration: '2m', target: 100 },
{ duration: '1m', target: 0 },
],
thresholds: {
// Normal load thresholds
'http_req_duration{load:normal}': ['p(95)<500'],
'http_req_failed{load:normal}': ['rate<0.01'],
// Stress load thresholds (more lenient)
'http_req_duration{load:stress}': ['p(95)<2000'],
'http_req_failed{load:stress}': ['rate<0.1'],
// Spike load thresholds (most lenient)
'http_req_duration{load:spike}': ['p(95)<5000'],
'http_req_failed{load:spike}': ['rate<0.2'],
},
};
export default function() {
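// Classify the load level from the VU id: higher VU ids are only started once the
// stress and spike stages ramp up, so the id is a rough proxy for the current stage
// and lets the tagged thresholds above be evaluated separately.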
let loadType = 'normal';
if (__VU > 800) {
loadType = 'spike';
} else if (__VU > 200) {
loadType = 'stress';
}
let response = http.get('https://api.example.com/stress-test', {
tags: { load: loadType }
});
check(response, {
'status is 200': (r) => r.status === 200,
'response time reasonable': (r) => {
if (loadType === 'normal') return r.timings.duration < 500;
if (loadType === 'stress') return r.timings.duration < 2000;
return r.timings.duration < 5000; // spike
}
});
// Varying sleep time based on load
if (loadType === 'spike') {
sleep(0.1); // Aggressive load
} else if (loadType === 'stress') {
sleep(0.5);
} else {
sleep(1); // Normal user behavior
}
}
// Stress test for specific endpoints
export function stressTestEndpoints() {
let endpoints = [
'/api/users',
'/api/products',
'/api/orders',
'/api/search'
];
endpoints.forEach(endpoint => {
let response = http.get(`https://api.example.com${endpoint}`);
check(response, {
[`${endpoint} responds`]: (r) => r.status < 500,
[`${endpoint} not too slow`]: (r) => r.timings.duration < 3000,
});
});
}
// Memory and resource stress test
export function resourceStressTest() {
// Create large payload
let largeData = 'x'.repeat(1024 * 100); // 100KB string
let response = http.post('https://api.example.com/upload', {
data: largeData,
type: 'stress-test'
});
check(response, {
'handles large payload': (r) => r.status < 500,
'processes within time limit': (r) => r.timings.duration < 10000,
});
}
// Stress testing with Java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.http.ResponseEntity;
import org.springframework.web.client.RestTemplate;
public class StressTestRunner {
private final String baseUrl;
private final AtomicInteger requestCount = new AtomicInteger(0);
private final AtomicInteger successCount = new AtomicInteger(0);
private final AtomicInteger errorCount = new AtomicInteger(0);
private final AtomicLong totalResponseTime = new AtomicLong(0);
public StressTestRunner(String baseUrl) {
this.baseUrl = baseUrl;
}
public void runStressTest() throws InterruptedException {
System.out.println("Starting stress test...");
// Test with increasing load
int[] loadLevels = {50, 100, 200, 500, 1000, 1500, 2000};
for (int users : loadLevels) {
System.out.println("\n--- Testing with " + users + " concurrent users ---");
// Reset counters
requestCount.set(0);
successCount.set(0);
errorCount.set(0);
totalResponseTime.set(0);
long startTime = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(users);
CountDownLatch latch = new CountDownLatch(users);
// Submit tasks
for (int i = 0; i < users; i++) {
executor.submit(new StressTestWorker(latch));
}
// Wait for completion or timeout
boolean completed = latch.await(60, TimeUnit.SECONDS);
executor.shutdown();
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
// Calculate metrics
int total = requestCount.get();
double successRate = total > 0 ? (successCount.get() * 100.0 / total) : 0;
double avgResponseTime = total > 0 ? (totalResponseTime.get() / (double) total) : 0;
double throughput = duration > 0 ? (total * 1000.0 / duration) : 0;
// Print results
System.out.println("Completed: " + completed);
System.out.println("Total requests: " + total);
System.out.println("Successful requests: " + successCount.get());
System.out.println("Failed requests: " + errorCount.get());
System.out.println("Success rate: " + String.format("%.2f", successRate) + "%");
System.out.println("Average response time: " + String.format("%.2f", avgResponseTime) + "ms");
System.out.println("Throughput: " + String.format("%.2f", throughput) + " req/sec");
System.out.println("Duration: " + duration + "ms");
// Determine if system is breaking under this load
if (successRate < 80 || avgResponseTime > 5000) {
System.out.println("⚠️ System showing stress at " + users + " users");
}
if (successRate < 50) {
System.out.println("❌ System breaking point reached at " + users + " users");
break;
}
// Cool down period
Thread.sleep(5000);
}
}
private class StressTestWorker implements Runnable {
private final CountDownLatch latch;
public StressTestWorker(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
try {
// Perform multiple requests per user
for (int i = 0; i < 10; i++) {
performRequest();
Thread.sleep(100); // Small delay between requests
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}
private void performRequest() {
long startTime = System.currentTimeMillis();
try {
// Issue the HTTP request (RestTemplate is used here; any HTTP client works)
RestTemplate restTemplate = new RestTemplate();
ResponseEntity response = restTemplate.getForEntity(
baseUrl + "/api/stress-test", String.class);
long responseTime = System.currentTimeMillis() - startTime;
requestCount.incrementAndGet();
totalResponseTime.addAndGet(responseTime);
if (response.getStatusCode().is2xxSuccessful()) {
successCount.incrementAndGet();
} else {
errorCount.incrementAndGet();
}
} catch (Exception e) {
long responseTime = System.currentTimeMillis() - startTime;
requestCount.incrementAndGet();
totalResponseTime.addAndGet(responseTime);
errorCount.incrementAndGet();
System.err.println("Request failed: " + e.getMessage());
}
}
}
// Memory stress test
public void runMemoryStressTest() {
System.out.println("Starting memory stress test...");
ExecutorService executor = Executors.newFixedThreadPool(20);
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
// Create large data structures
byte[] largeArray = new byte[1024 * 1024 * 10]; // 10MB
// Fill with data
for (int j = 0; j < largeArray.length; j++) {
largeArray[j] = (byte) (j % 256);
}
// Simulate processing
try {
Thread.sleep(1000);
// Send large payload
String largeString = new String(largeArray);
// ... make HTTP request with large payload
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
try {
executor.awaitTermination(300, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws InterruptedException {
StressTestRunner runner = new StressTestRunner("https://api.example.com");
runner.runStressTest();
runner.runMemoryStressTest();
}
}
# Stress testing in Python
import requests
import threading
import time
import queue
import statistics
from concurrent.futures import ThreadPoolExecutor, as_completed
import psutil
import gc
class StressTester:
def __init__(self, base_url):
self.base_url = base_url
self.results_queue = queue.Queue()
self.start_time = None
def stress_test_with_increasing_load(self):
"""Test with gradually increasing load to find breaking point"""
load_levels = [10, 25, 50, 100, 200, 400, 800, 1200, 1600, 2000]
print("Starting stress test with increasing load...")
breaking_point = None
for users in load_levels:
print(f"\n--- Testing with {users} concurrent users ---")
# Clear previous results
while not self.results_queue.empty():
try:
self.results_queue.get_nowait()
except queue.Empty:
break
# Run test with current load level
results = self.run_concurrent_test(users, duration=30)
# Analyze results
success_rate = results['success_rate']
avg_response_time = results['avg_response_time']
p95_response_time = results['p95_response_time']
throughput = results['throughput']
print(f"Success Rate: {success_rate:.2f}%")
print(f"Avg Response Time: {avg_response_time*1000:.2f}ms")
print(f"95th Percentile: {p95_response_time*1000:.2f}ms")
print(f"Throughput: {throughput:.2f} req/sec")
# Determine if system is under stress
if success_rate < 95 or avg_response_time > 2.0:
print(f"⚠️ System showing stress at {users} users")
# Check for breaking point
if success_rate < 80 or avg_response_time > 5.0:
print(f"❌ Breaking point reached at {users} users")
breaking_point = users
break
# Monitor system resources
cpu_percent = psutil.cpu_percent(interval=1)
memory_percent = psutil.virtual_memory().percent
print(f"System CPU: {cpu_percent}%, Memory: {memory_percent}%")
# Cool down period
time.sleep(5)
if breaking_point:
print(f"\n🔥 System breaking point: {breaking_point} concurrent users")
else:
print(f"\n✅ System handled up to {load_levels[-1]} users successfully")
def run_concurrent_test(self, num_users, duration=60):
"""Run concurrent test with specified number of users"""
self.start_time = time.time()
end_time = self.start_time + duration
successful_requests = 0
total_requests = 0
response_times = []
def worker():
nonlocal successful_requests, total_requests
while time.time() < end_time:
start = time.time()
try:
response = requests.get(
f"{self.base_url}/api/stress-test",
timeout=10
)
response_time = time.time() - start
total_requests += 1
response_times.append(response_time)
if response.status_code < 400:
successful_requests += 1
except Exception as e:
total_requests += 1
response_times.append(time.time() - start)
# Small random delay
time.sleep(0.1 + (hash(threading.current_thread().ident) % 100) / 1000)
# Start worker threads
with ThreadPoolExecutor(max_workers=num_users) as executor:
futures = [executor.submit(worker) for _ in range(num_users)]
# Wait for all to complete
for future in as_completed(futures):
future.result()
# Calculate results
success_rate = (successful_requests / total_requests * 100) if total_requests > 0 else 0
avg_response_time = statistics.mean(response_times) if response_times else 0
p95_response_time = statistics.quantiles(response_times, n=20)[18] if len(response_times) > 20 else 0
throughput = total_requests / duration
return {
'total_requests': total_requests,
'successful_requests': successful_requests,
'success_rate': success_rate,
'avg_response_time': avg_response_time,
'p95_response_time': p95_response_time,
'throughput': throughput,
'response_times': response_times
}
def memory_stress_test(self):
"""Test system behavior under memory pressure"""
print("Starting memory stress test...")
def memory_intensive_request():
# Create large payload
large_data = 'x' * (1024 * 1024) # 1MB string
try:
response = requests.post(
f"{self.base_url}/api/upload",
data={'large_data': large_data},
timeout=30
)
return response.status_code < 400
except Exception as e:
print(f"Memory stress request failed: {e}")
return False
# Run multiple memory-intensive requests concurrently
with ThreadPoolExecutor(max_workers=50) as executor:
futures = [executor.submit(memory_intensive_request) for _ in range(100)]
successful = sum(1 for future in as_completed(futures) if future.result())
print(f"Memory stress test: {successful}/100 requests successful")
# Force garbage collection
gc.collect()
def spike_test(self):
"""Sudden spike in traffic"""
print("Running spike test...")
# Start with normal load
normal_load_executor = ThreadPoolExecutor(max_workers=10)
normal_futures = []
def normal_request():
while time.time() < self.start_time + 120: # 2 minutes
try:
requests.get(f"{self.base_url}/api/users", timeout=5)
time.sleep(1)
except:
pass
# Start normal load
self.start_time = time.time()
for _ in range(10):
future = normal_load_executor.submit(normal_request)
normal_futures.append(future)
# Wait 30 seconds, then create sudden spike
time.sleep(30)
print("Creating traffic spike...")
spike_results = []
spike_start = time.time()
def spike_request():
try:
start = time.time()
response = requests.get(f"{self.base_url}/api/users", timeout=10)
response_time = time.time() - start
return {
'success': response.status_code < 400,
'response_time': response_time
}
except:
return {
'success': False,
'response_time': time.time() - start
}
# Create sudden spike of 500 concurrent requests
with ThreadPoolExecutor(max_workers=500) as spike_executor:
spike_futures = [spike_executor.submit(spike_request) for _ in range(500)]
for future in as_completed(spike_futures):
spike_results.append(future.result())
spike_duration = time.time() - spike_start
# Analyze spike results
successful_spikes = sum(1 for r in spike_results if r['success'])
success_rate = successful_spikes / len(spike_results) * 100
avg_spike_response = statistics.mean([r['response_time'] for r in spike_results])
print(f"Spike test results:")
print(f"Success rate during spike: {success_rate:.2f}%")
print(f"Average response time during spike: {avg_spike_response*1000:.2f}ms")
print(f"Spike duration: {spike_duration:.2f}s")
# Clean up normal load
normal_load_executor.shutdown()
# Usage example
if __name__ == "__main__":
tester = StressTester("https://api.example.com")
# Run different types of stress tests
tester.stress_test_with_increasing_load()
tester.memory_stress_test()
tester.spike_test()
Best Practices for Performance Testing
- Test Environment: Use production-like environments for accurate results
- Baseline Establishment: Establish performance baselines for comparison
- Realistic Data: Use realistic test data volumes and complexity
- Gradual Load Increase: Ramp up load gradually to identify breaking points
- Monitor Resources: Track CPU, memory, network, and database metrics
- Test Isolation: Ensure tests don't interfere with each other
- Continuous Testing: Integrate performance tests into CI/CD pipelines and fail builds on regressions (see the sketch after this list)
- Result Analysis: Analyze trends over time, not just individual test runs
- Real User Monitoring: Complement synthetic tests with real user metrics
- Regular Reviews: Review and update performance requirements regularly
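As a concrete sketch of the baseline and continuous-testing points above: persist the metrics from each run and fail the pipeline when a new run regresses beyond an agreed tolerance. The file names, metric keys, and tolerances below are illustrative assumptions, not part of any specific tool:
import json
import sys

# Tolerances are illustrative; agree on real values with your team
MAX_P95_REGRESSION = 1.20  # new p95 may be at most 20% slower than the baseline
MAX_ERROR_RATE = 0.01      # absolute cap on the error rate

def gate(baseline_path, current_path):
    """Compare the current run against a stored baseline; return a process exit code."""
    with open(baseline_path) as f:
        baseline = json.load(f)
    with open(current_path) as f:
        current = json.load(f)

    failures = []
    if current["p95_ms"] > baseline["p95_ms"] * MAX_P95_REGRESSION:
        failures.append(
            f"p95 regressed: {current['p95_ms']:.1f}ms vs baseline {baseline['p95_ms']:.1f}ms")
    if current["error_rate"] > MAX_ERROR_RATE:
        failures.append(f"error rate too high: {current['error_rate']:.2%}")

    for message in failures:
        print(f"PERF GATE FAILED: {message}")
    return 1 if failures else 0

if __name__ == "__main__":
    # e.g. python perf_gate.py baseline.json current.json, run as a pipeline step
    sys.exit(gate(sys.argv[1], sys.argv[2]))
Wired in as a step after the load test, a non-zero exit code blocks the deployment and makes performance regressions visible at review time.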