# well-api/usage_example.py
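"""Usage example: benchmark the NumPy reference implementation of
filter_short_groups against the compiled C implementation on datasets of
increasing size."""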
import time

import numpy as np

from filter_short_groups import filter_short_groups_c
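# filter_short_groups_c is the compiled C implementation under test. This
# script assumes it takes the same (presence_list, filter_size, device_id,
# dates_str) arguments and returns the same list of floats as the NumPy
# version below; both are called identically throughout.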

def filter_short_groups_numpy(presence_list, filter_size, device_id, dates_str):
    """
    Original NumPy implementation, kept for comparison purposes.

    device_id and dates_str are unused here; they are kept so the signature
    matches filter_short_groups_c.
    """
    # Start timer (optional, for benchmarking)
    st = time.time()

    if not presence_list or filter_size <= 1:
        print(f"NumPy Optimized: Early exit/no processing time: {time.time() - st:.6f}s")
        return presence_list[:] if isinstance(presence_list, list) else list(presence_list)

    result = np.array(presence_list, dtype=float)
    n = len(result)
    previous_states = set()

    while True:
        # Cycle detection: if a state repeats, further passes would loop forever.
        current_state_tuple = tuple(result)
        if current_state_tuple in previous_states:
            print("NumPy Optimized: Cycle detected, breaking.")
            break
        previous_states.add(current_state_tuple)

        # 1. Calculate the sign of each element (-1, 0, 1)
        signs = np.sign(result)
        # 2. Find indices where the sign changes
        change_indices = np.where(np.diff(signs) != 0)[0] + 1
        # 3. Define the boundaries of all consecutive runs
        boundaries = np.concatenate(([0], change_indices, [n]))

        # If there's only one run, no further processing is needed.
        if len(boundaries) <= 2:
            break

        # 4. Vectorized extraction of run properties
        run_starts = boundaries[:-1]
        run_ends = boundaries[1:]
        run_lengths = run_ends - run_starts
        run_signs = signs[run_starts]

        # 5. Identify short runs and collect their properties
        short_runs_to_process = []
        for i in range(len(run_starts)):
            if 0 < run_lengths[i] < filter_size:
                short_runs_to_process.append({
                    'start': run_starts[i],
                    'end': run_ends[i],
                    'sign': run_signs[i],
                    'length': run_lengths[i],
                })

        # 6. If no modifiable short runs were found, filtering is complete.
        if not short_runs_to_process:
            break

        # 7. Sort the short runs: shortest first, then by start index for determinism
        short_runs_to_process.sort(key=lambda r: (r['length'], r['start']))

        # 8. Process ONLY the first (shortest) run in this pass; the next pass
        #    recomputes all runs, since a replacement can merge neighbouring runs.
        run_to_process = short_runs_to_process[0]
        start = run_to_process['start']
        end = run_to_process['end']
        run_sign = run_to_process['sign']

        # Determine the replacement value: a short run of zeros is filled with
        # 1.0; any short non-zero run is zeroed out.
        replacement_value = 1.0 if run_sign == 0 else 0.0

        # 9. Apply the replacement
        result[start:end] = replacement_value

    # End timer and print
    print(f"filter_short_groups_numpy time: {time.time() - st:.6f}s")
    return result.tolist()
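
# Worked example (hand-traced, illustrative only): with filter_size=3, the
# length-1 run of 1.0 at index 3 is the shortest short run, so the first pass
# zeroes it. The surrounding zeros then merge into a single run of length 6,
# the remaining run of three 1.0s is not short, and the loop stops:
#
#   filter_short_groups_numpy(
#       [0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0],
#       3, "demo_device", "2025-05-21")
#   -> [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 1.0]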

def benchmark_comparison(input_data, filter_size, iterations=10):
    """
    Compare performance between the NumPy and C implementations.
    """
    device_id = "test_device"
    dates_str = "2025-05-21"

    # Warm-up runs, so one-time costs don't skew the timed iterations
    _ = filter_short_groups_numpy(input_data, filter_size, device_id, dates_str)
    _ = filter_short_groups_c(input_data, filter_size, device_id, dates_str)

    # NumPy benchmark
    numpy_times = []
    for _ in range(iterations):
        start = time.time()
        numpy_result = filter_short_groups_numpy(input_data, filter_size, device_id, dates_str)
        numpy_times.append(time.time() - start)

    # C implementation benchmark
    c_times = []
    for _ in range(iterations):
        start = time.time()
        c_result = filter_short_groups_c(input_data, filter_size, device_id, dates_str)
        c_times.append(time.time() - start)

    # Check that both implementations produce identical output
    results_match = numpy_result == c_result

    # Print results
    print(f"\nResults from NumPy and C implementation match: {results_match}")
    print("\nNumPy Implementation:")
    print(f"  Average time: {np.mean(numpy_times):.6f}s")
    print(f"  Min time: {np.min(numpy_times):.6f}s")
    print(f"  Max time: {np.max(numpy_times):.6f}s")
    print("\nC Implementation:")
    print(f"  Average time: {np.mean(c_times):.6f}s")
    print(f"  Min time: {np.min(c_times):.6f}s")
    print(f"  Max time: {np.max(c_times):.6f}s")

    speedup = np.mean(numpy_times) / np.mean(c_times)
    print(f"\nSpeedup factor: {speedup:.2f}x")

    return numpy_result, c_result
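
# Benchmarking note: time.time() measures wall-clock time and can have coarse
# resolution on some platforms. A minimal alternative sketch using
# time.perf_counter() (an assumption about how one might restructure the
# timing loops above, not part of the original benchmark):
def _time_call(fn, *args, repeats=10):
    """Illustrative helper: time fn(*args) with the high-resolution counter."""
    times = []
    for _ in range(repeats):
        t0 = time.perf_counter()
        fn(*args)
        times.append(time.perf_counter() - t0)
    return times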

if __name__ == "__main__":
    # Example data: random presence lists with a mix of 0s and 1s
    np.random.seed(42)
    data_small = list(np.random.choice([0.0, 1.0], size=100))
    data_medium = list(np.random.choice([0.0, 1.0], size=1000))
    data_large = list(np.random.choice([0.0, 1.0], size=10000))

    # Example with small data
    print("\n===== Small dataset (100 elements) =====")
    numpy_result_small, c_result_small = benchmark_comparison(data_small, filter_size=3)

    # Example with medium data
    print("\n===== Medium dataset (1000 elements) =====")
    numpy_result_medium, c_result_medium = benchmark_comparison(data_medium, filter_size=5)

    # Example with large data
    print("\n===== Large dataset (10000 elements) =====")
    numpy_result_large, c_result_large = benchmark_comparison(data_large, filter_size=10)