Python Threading vs Async/Await for AV Control: Real-Time System Architecture Guide
Professional AV control systems demand responsive, concurrent operation to manage multiple devices simultaneously. Whether you're controlling projectors, audio processors, video switchers, and lighting systems, understanding Python's concurrency models is crucial for building robust, real-time control applications. This comprehensive guide explores threading versus async/await patterns, GIL workarounds, and practical implementation strategies for professional AV installations.
Table of Contents
- Concurrency in AV Control Systems
- Threading vs Async/Await Fundamentals
- Real-Time Control Requirements
- Python GIL and AV Control Implications
- Multiprocessing for AV Systems
- Queue-Based Communication Patterns
- Threading Implementation Examples
- Async/Await Implementation Examples
- Hybrid Approaches
- Performance Comparison and Benchmarking
- Best Practices and Architecture Decisions
Concurrency in AV Control Systems
Modern AV installations require simultaneous control of numerous devices with varying response times and communication protocols. A typical corporate boardroom might include:
- Projectors: Power control, input switching (2-5 second response)
- Audio DSP: Volume adjustment, preset changes (100-500ms response)
- Video Switchers: Input routing, scaling adjustments (instant to 1 second)
- Lighting Systems: Scene changes, dimming (100ms to 2 seconds)
- Environmental Controls: Screen/shade positioning (5-30 seconds)
Each device operates independently with different timing requirements, making concurrency essential for responsive user experiences.
[object Object],
,[object Object], time
,[object Object],
,[object Object], ,[object Object],():
,[object Object],(,[object Object],)
time.sleep(,[object Object],) ,[object Object],
,[object Object],(,[object Object],)
time.sleep(,[object Object],) ,[object Object],
,[object Object],(,[object Object],)
time.sleep(,[object Object],) ,[object Object],
,[object Object],(,[object Object],) ,[object Object],
,[object Object],
,[object Object], threading
,[object Object], ,[object Object],():
,[object Object], ,[object Object],():
,[object Object],(,[object Object],)
time.sleep(,[object Object],)
,[object Object],(,[object Object],)
,[object Object], ,[object Object],():
,[object Object],(,[object Object],)
time.sleep(,[object Object],)
,[object Object],(,[object Object],)
,[object Object], ,[object Object],():
,[object Object],(,[object Object],)
time.sleep(,[object Object],)
,[object Object],(,[object Object],)
,[object Object],
threads = [
threading.Thread(target=start_projector),
threading.Thread(target=configure_audio),
threading.Thread(target=set_lighting)
]
,[object Object], thread ,[object Object], threads:
thread.start()
,[object Object], thread ,[object Object], threads:
thread.join()
,[object Object],(,[object Object],) ,[object Object],
Threading vs Async/Await Fundamentals
Threading Model
Threading creates multiple execution contexts within a single process, allowing concurrent execution of I/O-bound operations despite Python's Global Interpreter Lock (GIL).
Advantages for AV Control:
- Familiar programming model
- Good for blocking I/O operations (serial, TCP/IP)
- Built-in synchronization primitives
- Works well with existing libraries
Disadvantages:
- GIL limits CPU-bound parallelism
- Higher memory overhead per thread
- Context switching overhead
- Race condition complexity
Async/Await Model
Async/await provides cooperative concurrency using a single-threaded event loop, yielding control during I/O operations.
Advantages for AV Control:
- Lower memory overhead
- No race conditions on shared state
- Excellent for I/O-bound network operations
- Precise control over execution flow
Disadvantages:
- Learning curve for traditional programmers
- All operations must be async-compatible
- Blocking calls can freeze entire system
- Limited third-party library support
Real-Time Control Requirements
AV control systems must satisfy specific timing requirements:
Latency Requirements
- User Interface Response: < 100ms for button press acknowledgment
- Audio Control: < 50ms for volume changes to prevent user confusion
- Video Switching: < 200ms for seamless transitions
- Emergency Shutoff: < 1 second for safety systems
Throughput Requirements
- Status Polling: 1-10 Hz per device depending on criticality
- Command Processing: 10-100 commands/second system-wide
- Event Handling: Real-time response to device state changes
[object Object], asyncio
,[object Object], time
,[object Object], dataclasses ,[object Object], dataclass
,[object Object], typing ,[object Object], ,[object Object],, ,[object Object],
,[object Object],
,[object Object], ,[object Object],:
command_latency: ,[object Object],
response_time: ,[object Object],
throughput: ,[object Object],
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.metrics: ,[object Object],[,[object Object],, PerformanceMetrics] = {}
,[object Object],.command_queue = asyncio.Queue()
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
start_time = time.perf_counter()
,[object Object],
,[object Object], ,[object Object],.execute_command(device_id, command)
end_time = time.perf_counter()
latency = (end_time - start_time) * ,[object Object], ,[object Object],
,[object Object],
,[object Object], device_id ,[object Object], ,[object Object], ,[object Object],.metrics:
,[object Object],.metrics[device_id] = PerformanceMetrics(,[object Object],, ,[object Object],, ,[object Object],)
,[object Object],.metrics[device_id].command_latency = latency
,[object Object],
,[object Object], latency > ,[object Object],: ,[object Object],
,[object Object],(,[object Object],)
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], asyncio.sleep(,[object Object],) ,[object Object],
Python GIL and AV Control Implications
The Global Interpreter Lock (GIL) prevents true parallelism in CPU-bound Python operations but has different implications for AV control:
GIL Impact Analysis
[object Object], threading
,[object Object], time
,[object Object], serial
,[object Object], asyncio
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.serial_ports = {}
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
start = time.time()
,[object Object], time.time() - start < duration:
,[object Object],
,[object Object],(,[object Object],(,[object Object],))
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
time.sleep(duration) ,[object Object],
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],
,[object Object],(,[object Object],)
start_time = time.time()
cpu_threads = [
threading.Thread(target=,[object Object],.cpu_intensive_task, args=(,[object Object],,))
,[object Object], _ ,[object Object], ,[object Object],(,[object Object],)
]
,[object Object], thread ,[object Object], cpu_threads:
thread.start()
,[object Object], thread ,[object Object], cpu_threads:
thread.join()
cpu_time = time.time() - start_time
,[object Object],(,[object Object],)
,[object Object],
,[object Object],(,[object Object],)
start_time = time.time()
io_threads = [
threading.Thread(target=,[object Object],.io_bound_task, args=(,[object Object],,))
,[object Object], _ ,[object Object], ,[object Object],(,[object Object],)
]
,[object Object], thread ,[object Object], io_threads:
thread.start()
,[object Object], thread ,[object Object], io_threads:
thread.join()
io_time = time.time() - start_time
,[object Object],(,[object Object],)
GIL Workarounds for AV Control
- Use I/O-bound operations (serial, TCP/IP communication)
- Multiprocessing for CPU-intensive tasks
- Async/await for cooperative concurrency
- C extensions for performance-critical code
Multiprocessing for AV Systems
When GIL limitations become problematic, multiprocessing provides true parallelism:
[object Object], multiprocessing ,[object Object], mp
,[object Object], queue
,[object Object], time
,[object Object], typing ,[object Object], ,[object Object],, ,[object Object],
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.num_processes = num_processes
,[object Object],.command_queue = mp.Queue()
,[object Object],.result_queue = mp.Queue()
,[object Object],.processes = []
,[object Object],.running = ,[object Object],
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],(,[object Object],)
,[object Object], ,[object Object],:
,[object Object],:
,[object Object],
command = ,[object Object],.command_queue.get(timeout=,[object Object],)
,[object Object], command ,[object Object], ,[object Object],: ,[object Object],
,[object Object],
,[object Object],
result = ,[object Object],.process_device_command(command)
,[object Object],.result_queue.put(result)
,[object Object], queue.Empty:
,[object Object],
,[object Object], Exception ,[object Object], e:
error_result = {
,[object Object],: worker_id,
,[object Object],: ,[object Object],(e),
,[object Object],: command
}
,[object Object],.result_queue.put(error_result)
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
device_id = command.get(,[object Object],)
action = command.get(,[object Object],)
,[object Object],
time.sleep(,[object Object],)
,[object Object], {
,[object Object],: device_id,
,[object Object],: action,
,[object Object],: ,[object Object],,
,[object Object],: time.time()
}
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.running = ,[object Object],
,[object Object], i ,[object Object], ,[object Object],(,[object Object],.num_processes):
process = mp.Process(target=,[object Object],.worker_process, args=(i,))
process.start()
,[object Object],.processes.append(process)
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.running = ,[object Object],
,[object Object],
,[object Object], _ ,[object Object], ,[object Object],(,[object Object],.num_processes):
,[object Object],.command_queue.put(,[object Object],)
,[object Object],
,[object Object], process ,[object Object], ,[object Object],.processes:
process.join()
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
command = {
,[object Object],: device_id,
,[object Object],: action,
**kwargs
}
,[object Object],.command_queue.put(command)
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],[,[object Object],, ,[object Object],]]:
,[object Object],
results = []
start_time = time.time()
,[object Object], time.time() - start_time < timeout:
,[object Object],:
result = ,[object Object],.result_queue.get_nowait()
results.append(result)
,[object Object], queue.Empty:
,[object Object],
,[object Object], results
,[object Object],
,[object Object], __name__ == ,[object Object],:
controller = MultiProcessAVController(num_processes=,[object Object],)
controller.start()
,[object Object],
devices = [,[object Object],, ,[object Object],, ,[object Object],, ,[object Object],]
,[object Object], device ,[object Object], devices:
controller.send_command(device, ,[object Object],)
controller.send_command(device, ,[object Object],)
,[object Object],
time.sleep(,[object Object],)
results = controller.get_results()
,[object Object], result ,[object Object], results:
,[object Object],(,[object Object],)
controller.stop()
Queue-Based Communication Patterns
Queues provide thread-safe communication between concurrent operations:
[object Object], threading
,[object Object], queue
,[object Object], time
,[object Object], enum ,[object Object], Enum
,[object Object], dataclasses ,[object Object], dataclass
,[object Object], typing ,[object Object], ,[object Object],, ,[object Object],
,[object Object], ,[object Object],(,[object Object],):
LOW = ,[object Object],
NORMAL = ,[object Object],
HIGH = ,[object Object],
EMERGENCY = ,[object Object],
,[object Object],
,[object Object], ,[object Object],:
device_id: ,[object Object],
action: ,[object Object],
parameters: ,[object Object],
priority: Priority
callback: ,[object Object],[,[object Object],] = ,[object Object],
timeout: ,[object Object], = ,[object Object],
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.command_queue = queue.PriorityQueue()
,[object Object],.workers = []
,[object Object],.running = ,[object Object],
,[object Object],.max_workers = max_workers
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.running = ,[object Object],
,[object Object], i ,[object Object], ,[object Object],(,[object Object],.max_workers):
worker = threading.Thread(
target=,[object Object],._worker_thread,
args=(,[object Object],,),
daemon=,[object Object],
)
worker.start()
,[object Object],.workers.append(worker)
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.running = ,[object Object],
,[object Object],
,[object Object], _ ,[object Object], ,[object Object],(,[object Object],.max_workers):
,[object Object],.command_queue.put((Priority.EMERGENCY.value, ,[object Object],))
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],(,[object Object],)
,[object Object], ,[object Object],.running:
,[object Object],:
,[object Object],
priority, command = ,[object Object],.command_queue.get(timeout=,[object Object],)
,[object Object], command ,[object Object], ,[object Object],: ,[object Object],
,[object Object],
,[object Object],(,[object Object],)
,[object Object],
result = ,[object Object],._execute_command(command)
,[object Object],
,[object Object], command.callback:
command.callback(result)
,[object Object],.command_queue.task_done()
,[object Object], queue.Empty:
,[object Object],
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],:
,[object Object],
,[object Object],
time.sleep(,[object Object], + (command.priority.value * ,[object Object],))
,[object Object], {
,[object Object],: command.device_id,
,[object Object],: command.action,
,[object Object],: ,[object Object],,
,[object Object],: command.parameters
}
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
command = AVCommand(
device_id=device_id,
action=action,
parameters=parameters,
priority=priority,
callback=callback
)
,[object Object],
,[object Object],.command_queue.put((priority.value, command))
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
devices = [,[object Object],, ,[object Object],, ,[object Object],]
,[object Object], device ,[object Object], devices:
,[object Object],.send_command(
device_id=device,
action=,[object Object],,
priority=Priority.EMERGENCY
)
,[object Object],
,[object Object], ,[object Object],(,[object Object],):
,[object Object],(,[object Object],)
controller = PriorityAVController(max_workers=,[object Object],)
controller.start()
,[object Object],
controller.send_command(,[object Object],, ,[object Object],, Priority.LOW, result_callback)
controller.send_command(,[object Object],, ,[object Object],, Priority.HIGH, result_callback)
controller.send_command(,[object Object],, ,[object Object],, Priority.NORMAL, result_callback, volume=,[object Object],)
,[object Object],
controller.send_command(,[object Object],, ,[object Object],, Priority.EMERGENCY, result_callback)
time.sleep(,[object Object],)
controller.stop()
Threading Implementation Examples
Thread-Safe Device Manager
[object Object], threading
,[object Object], time
,[object Object], serial
,[object Object], typing ,[object Object], ,[object Object],, ,[object Object],, ,[object Object],
,[object Object], dataclasses ,[object Object], dataclass, field
,[object Object],
,[object Object], ,[object Object],:
power: ,[object Object], = ,[object Object],
input_source: ,[object Object], = ,[object Object],
volume: ,[object Object], = ,[object Object],
last_update: ,[object Object], = field(default_factory=time.time)
lock: threading.RLock = field(default_factory=threading.RLock)
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.device_id = device_id
,[object Object],.port = port
,[object Object],.baudrate = baudrate
,[object Object],.state = DeviceState()
,[object Object],.serial_connection: ,[object Object],[serial.Serial] = ,[object Object],
,[object Object],.monitoring_thread: ,[object Object],[threading.Thread] = ,[object Object],
,[object Object],.running = ,[object Object],
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],:
,[object Object],
,[object Object],:
,[object Object],.serial_connection = serial.Serial(
port=,[object Object],.port,
baudrate=,[object Object],.baudrate,
timeout=,[object Object],
)
,[object Object], ,[object Object],
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], ,[object Object],
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], ,[object Object],.monitoring_thread ,[object Object], ,[object Object],.monitoring_thread.is_alive():
,[object Object],
,[object Object],.running = ,[object Object],
,[object Object],.monitoring_thread = threading.Thread(
target=,[object Object],._monitoring_loop,
daemon=,[object Object],
)
,[object Object],.monitoring_thread.start()
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.running = ,[object Object],
,[object Object], ,[object Object],.monitoring_thread:
,[object Object],.monitoring_thread.join(timeout=,[object Object],)
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], ,[object Object],.running:
,[object Object],:
,[object Object],
status = ,[object Object],._query_device_status()
,[object Object],
,[object Object], ,[object Object],.state.lock:
,[object Object],.state.power = status.get(,[object Object],, ,[object Object],)
,[object Object],.state.input_source = status.get(,[object Object],, ,[object Object],)
,[object Object],.state.volume = status.get(,[object Object],, ,[object Object],)
,[object Object],.state.last_update = time.time()
time.sleep(,[object Object],) ,[object Object],
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
time.sleep(,[object Object],) ,[object Object],
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
,[object Object], ,[object Object], ,[object Object],.serial_connection:
,[object Object], {}
,[object Object],:
,[object Object],
,[object Object],.serial_connection.write(,[object Object],)
response = ,[object Object],.serial_connection.readline().decode(,[object Object],).strip()
,[object Object],
,[object Object], ,[object Object],._parse_status_response(response)
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], {}
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
,[object Object],
,[object Object], {
,[object Object],: ,[object Object], ,[object Object], response,
,[object Object],: ,[object Object],,
,[object Object],: ,[object Object],
}
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],]:
,[object Object],
,[object Object], ,[object Object], ,[object Object],.serial_connection:
,[object Object], ,[object Object],
,[object Object],
,[object Object], threading.Lock():
,[object Object],:
,[object Object],.serial_connection.write(,[object Object],.encode(,[object Object],))
,[object Object], wait_for_response:
response = ,[object Object],.serial_connection.readline().decode(,[object Object],).strip()
,[object Object], response
,[object Object], ,[object Object],
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], ,[object Object],
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
,[object Object], ,[object Object],.state.lock:
,[object Object], {
,[object Object],: ,[object Object],.state.power,
,[object Object],: ,[object Object],.state.input_source,
,[object Object],: ,[object Object],.state.volume,
,[object Object],: ,[object Object],.state.last_update
}
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.devices: ,[object Object],[,[object Object],, ThreadedAVDevice] = {}
,[object Object],.system_lock = threading.RLock()
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], ,[object Object],.system_lock:
device = ThreadedAVDevice(device_id, port, baudrate)
,[object Object], device.connect():
device.start_monitoring()
,[object Object],.devices[device_id] = device
,[object Object], ,[object Object],
,[object Object], ,[object Object],
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], ,[object Object],.system_lock:
,[object Object], device_id ,[object Object], ,[object Object],.devices:
,[object Object],.devices[device_id].stop_monitoring()
,[object Object], ,[object Object],.devices[device_id]
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],[,[object Object],, ,[object Object],]]:
,[object Object],
,[object Object], ,[object Object],.system_lock:
,[object Object], {
device_id: device.get_state()
,[object Object], device_id, device ,[object Object], ,[object Object],.devices.items()
}
Async/Await Implementation Examples
Async Device Controller
[object Object], asyncio
,[object Object], aiofiles
,[object Object], aiohttp
,[object Object], typing ,[object Object], ,[object Object],, ,[object Object],, ,[object Object],, ,[object Object],
,[object Object], dataclasses ,[object Object], dataclass
,[object Object], json
,[object Object], time
,[object Object],
,[object Object], ,[object Object],:
device_id: ,[object Object],
endpoint: ,[object Object],
poll_interval: ,[object Object], = ,[object Object],
timeout: ,[object Object], = ,[object Object],
max_retries: ,[object Object], = ,[object Object],
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.config = config
,[object Object],.state = {}
,[object Object],.session: ,[object Object],[aiohttp.ClientSession] = ,[object Object],
,[object Object],.monitoring_task: ,[object Object],[asyncio.Task] = ,[object Object],
,[object Object],.callbacks: ,[object Object],[,[object Object],] = []
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=,[object Object],.config.timeout)
)
,[object Object],
,[object Object],.monitoring_task = asyncio.create_task(,[object Object],._monitoring_loop())
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], ,[object Object],.monitoring_task:
,[object Object],.monitoring_task.cancel()
,[object Object],:
,[object Object], ,[object Object],.monitoring_task
,[object Object], asyncio.CancelledError:
,[object Object],
,[object Object], ,[object Object],.session:
,[object Object], ,[object Object],.session.close()
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], ,[object Object],:
,[object Object],:
,[object Object],
new_state = ,[object Object], ,[object Object],._fetch_device_status()
,[object Object],
,[object Object], new_state != ,[object Object],.state:
old_state = ,[object Object],.state.copy()
,[object Object],.state = new_state
,[object Object],
,[object Object], ,[object Object],._notify_state_change(old_state, new_state)
,[object Object], asyncio.sleep(,[object Object],.config.poll_interval)
,[object Object], asyncio.CancelledError:
,[object Object],
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], asyncio.sleep(,[object Object],)
,[object Object], ,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
,[object Object], ,[object Object], ,[object Object],.session:
,[object Object], {}
,[object Object],:
,[object Object], ,[object Object], ,[object Object],.session.get(,[object Object],) ,[object Object], response:
,[object Object], response.status == ,[object Object],:
,[object Object], ,[object Object], response.json()
,[object Object],:
,[object Object],(,[object Object],)
,[object Object], {}
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], {}
,[object Object], ,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
,[object Object], ,[object Object], ,[object Object],.session:
,[object Object], {,[object Object],: ,[object Object],}
payload = {
,[object Object],: command,
,[object Object],: parameters ,[object Object], {}
}
,[object Object], attempt ,[object Object], ,[object Object],(,[object Object],.config.max_retries):
,[object Object],:
,[object Object], ,[object Object], ,[object Object],.session.post(
,[object Object],,
json=payload
) ,[object Object], response:
,[object Object], response.status == ,[object Object],:
,[object Object], ,[object Object], response.json()
,[object Object],:
error_msg = ,[object Object],
,[object Object], attempt == ,[object Object],.config.max_retries - ,[object Object],:
,[object Object], {,[object Object],: error_msg}
,[object Object], asyncio.sleep(,[object Object], * (attempt + ,[object Object],))
,[object Object], Exception ,[object Object], e:
,[object Object], attempt == ,[object Object],.config.max_retries - ,[object Object],:
,[object Object], {,[object Object],: ,[object Object],(e)}
,[object Object], asyncio.sleep(,[object Object], * (attempt + ,[object Object],))
,[object Object], {,[object Object],: ,[object Object],}
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.callbacks.append(callback)
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], callback ,[object Object], ,[object Object],.callbacks:
,[object Object],:
,[object Object], asyncio.iscoroutinefunction(callback):
,[object Object], callback(,[object Object],.config.device_id, old_state, new_state)
,[object Object],:
callback(,[object Object],.config.device_id, old_state, new_state)
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.devices: ,[object Object],[,[object Object],, AsyncAVDevice] = {}
,[object Object],.command_queue = asyncio.Queue()
,[object Object],.processor_task: ,[object Object],[asyncio.Task] = ,[object Object],
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],
,[object Object],.processor_task = asyncio.create_task(,[object Object],._command_processor())
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],
,[object Object], ,[object Object],.processor_task:
,[object Object],.processor_task.cancel()
,[object Object],:
,[object Object], ,[object Object],.processor_task
,[object Object], asyncio.CancelledError:
,[object Object],
,[object Object],
cleanup_tasks = [device.cleanup() ,[object Object], device ,[object Object], ,[object Object],.devices.values()]
,[object Object], asyncio.gather(*cleanup_tasks, return_exceptions=,[object Object],)
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
device = AsyncAVDevice(config)
,[object Object], device.initialize()
,[object Object],
device.add_callback(,[object Object],._device_state_changed)
,[object Object],.devices[config.device_id] = device
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],(,[object Object],)
,[object Object],
,[object Object],
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], ,[object Object],:
,[object Object],:
command_data = ,[object Object], ,[object Object],.command_queue.get()
device_id = command_data[,[object Object],]
command = command_data[,[object Object],]
parameters = command_data.get(,[object Object],, {})
,[object Object], device_id ,[object Object], ,[object Object],.devices:
result = ,[object Object], ,[object Object],.devices[device_id].send_command(command, parameters)
,[object Object],
,[object Object], ,[object Object], ,[object Object], result:
,[object Object],(,[object Object],)
,[object Object],.command_queue.task_done()
,[object Object], asyncio.CancelledError:
,[object Object],
,[object Object], Exception ,[object Object], e:
,[object Object],(,[object Object],)
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
command_data = {
,[object Object],: device_id,
,[object Object],: command,
,[object Object],: parameters ,[object Object], {}
}
,[object Object], ,[object Object],.command_queue.put(command_data)
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
tasks = [
,[object Object],.send_command(device_id, command, parameters)
,[object Object], device_id ,[object Object], ,[object Object],.devices.keys()
]
,[object Object], asyncio.gather(*tasks)
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],[,[object Object],, ,[object Object],]]:
,[object Object],
,[object Object], {
device_id: device.state
,[object Object], device_id, device ,[object Object], ,[object Object],.devices.items()
}
,[object Object],
,[object Object], ,[object Object], ,[object Object],():
system = AsyncAVSystem()
,[object Object], system.initialize()
,[object Object],
devices = [
AsyncDeviceConfig(,[object Object],, ,[object Object],),
AsyncDeviceConfig(,[object Object],, ,[object Object],),
AsyncDeviceConfig(,[object Object],, ,[object Object],)
]
,[object Object], config ,[object Object], devices:
,[object Object], system.add_device(config)
,[object Object],
,[object Object], system.send_command(,[object Object],, ,[object Object],)
,[object Object], system.broadcast_command(,[object Object],)
,[object Object],
,[object Object], asyncio.sleep(,[object Object],)
,[object Object],
,[object Object], system.shutdown()
,[object Object],
,[object Object],
Hybrid Approaches
Combining threading and async/await for optimal performance:
[object Object], asyncio
,[object Object], threading
,[object Object], queue
,[object Object], concurrent.futures ,[object Object], ThreadPoolExecutor
,[object Object], typing ,[object Object], ,[object Object],, ,[object Object],, ,[object Object],
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.max_threads = max_threads
,[object Object],.thread_pool = ThreadPoolExecutor(max_workers=max_threads)
,[object Object],.blocking_operations_queue = queue.Queue()
,[object Object], ,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
,[object Object],
,[object Object], asyncio.sleep(,[object Object],)
,[object Object], {,[object Object],: device_id, ,[object Object],: command, ,[object Object],: ,[object Object],}
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
,[object Object], time
,[object Object],
time.sleep(,[object Object],)
,[object Object], {,[object Object],: device_id, ,[object Object],: command, ,[object Object],: ,[object Object],}
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], use_async:
,[object Object],
,[object Object], ,[object Object], ,[object Object],.async_operation(device_id, command)
,[object Object],:
,[object Object],
loop = asyncio.get_event_loop()
,[object Object], ,[object Object], loop.run_in_executor(
,[object Object],.thread_pool,
,[object Object],.blocking_operation,
device_id,
command
)
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
tasks = [
,[object Object],
,[object Object],.execute_hybrid_command(,[object Object],, ,[object Object],, use_async=,[object Object],),
,[object Object],.execute_hybrid_command(,[object Object],, ,[object Object],, use_async=,[object Object],),
,[object Object],
,[object Object],.execute_hybrid_command(,[object Object],, ,[object Object],, use_async=,[object Object],),
,[object Object],.execute_hybrid_command(,[object Object],, ,[object Object],, use_async=,[object Object],),
]
results = ,[object Object], asyncio.gather(*tasks)
,[object Object], results
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object],.thread_pool.shutdown(wait=,[object Object],)
,[object Object],
,[object Object], ,[object Object], ,[object Object],():
controller = HybridAVController()
,[object Object],:
results = ,[object Object], controller.mixed_system_control()
,[object Object], result ,[object Object], results:
,[object Object],(,[object Object],)
,[object Object],:
controller.cleanup()
,[object Object],
Performance Comparison and Benchmarking
Understanding performance characteristics helps choose the right approach:
[object Object], asyncio
,[object Object], threading
,[object Object], time
,[object Object], statistics
,[object Object], concurrent.futures ,[object Object], ThreadPoolExecutor
,[object Object], typing ,[object Object], ,[object Object],, ,[object Object],
,[object Object], ,[object Object],:
,[object Object], ,[object Object],(,[object Object],):
,[object Object],.results = {}
,[object Object], ,[object Object],(,[object Object],):
,[object Object],
time.sleep(duration)
,[object Object], ,[object Object],
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
,[object Object], asyncio.sleep(duration)
,[object Object], ,[object Object],
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
start_time = time.time()
threads = []
results = []
,[object Object], ,[object Object],():
result = ,[object Object],.simulate_device_io()
results.append(result)
,[object Object],
,[object Object], _ ,[object Object], ,[object Object],(num_operations):
thread = threading.Thread(target=worker)
threads.append(thread)
thread.start()
,[object Object],
,[object Object], thread ,[object Object], threads:
thread.join()
end_time = time.time()
,[object Object], {
,[object Object],: end_time - start_time,
,[object Object],: num_operations,
,[object Object],: num_operations / (end_time - start_time),
,[object Object],: ,[object Object],(results)
}
,[object Object], ,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
start_time = time.time()
,[object Object],
tasks = [
,[object Object],.async_device_io()
,[object Object], _ ,[object Object], ,[object Object],(num_operations)
]
,[object Object],
results = ,[object Object], asyncio.gather(*tasks)
end_time = time.time()
,[object Object], {
,[object Object],: end_time - start_time,
,[object Object],: num_operations,
,[object Object],: num_operations / (end_time - start_time),
,[object Object],: ,[object Object],(results)
}
,[object Object], ,[object Object],(,[object Object],) -> ,[object Object],[,[object Object],, ,[object Object],]:
,[object Object],
start_time = time.time()
,[object Object], ThreadPoolExecutor(max_workers=max_workers) ,[object Object], executor:
futures = [
executor.submit(,[object Object],.simulate_device_io)
,[object Object], _ ,[object Object], ,[object Object],(num_operations)
]
results = [future.result() ,[object Object], future ,[object Object], futures]
end_time = time.time()
,[object Object], {
,[object Object],: end_time - start_time,
,[object Object],: num_operations,
,[object Object],: num_operations / (end_time - start_time),
,[object Object],: ,[object Object],(results)
}
,[object Object], ,[object Object], ,[object Object],(,[object Object],):
,[object Object],
test_sizes = [,[object Object],, ,[object Object],, ,[object Object],, ,[object Object],]
,[object Object], size ,[object Object], test_sizes:
,[object Object],(,[object Object],)
,[object Object],
threading_result = ,[object Object],.benchmark_threading(size)
,[object Object],(,[object Object],)
,[object Object],
async_result = ,[object Object], ,[object Object],.benchmark_async(size)
,[object Object],(,[object Object],)
,[object Object],
pool_result = ,[object Object],.benchmark_thread_pool(size)
,[object Object],(,[object Object],)
,[object Object],
max_ops = ,[object Object],(
threading_result[,[object Object],],
async_result[,[object Object],],
pool_result[,[object Object],]
)
,[object Object],(,[object Object],)
,[object Object],
,[object Object],
,[object Object],
Best Practices and Architecture Decisions
Decision Matrix for Concurrency Approach
Scenario | Threading | Async/Await | Multiprocessing | Hybrid |
---|---|---|---|---|
Serial Communication | ✓ | ✗ | ✓ | ✓ |
Network/HTTP APIs | ✓ | ✓✓ | ✓ | ✓✓ |
CPU-Intensive Processing | ✗ | ✗ | ✓✓ | ✓ |
Real-Time Response | ✓ | ✓✓ | ✓ | ✓✓ |
Memory Efficiency | ✓ | ✓✓ | ✗ | ✓ |
Complexity | ✓ | ✗ | ✗ | ✗ |
Architecture Recommendations
-
Use Async/Await When:
- Primarily network-based device communication
- Need maximum I/O concurrency
- Memory usage is critical
- Team has async experience
-
Use Threading When:
- Mixed blocking/non-blocking operations
- Integrating with synchronous libraries
- Simpler debugging requirements
- Legacy code integration
-
Use Multiprocessing When:
- CPU-intensive tasks (video processing, complex calculations)
- True parallelism required
- GIL limitations are problematic
- Fault isolation needed
-
Use Hybrid Approach When:
- Diverse device types (serial + network)
- Performance optimization needed
- Gradual migration from synchronous code
- Maximum flexibility required
This comprehensive guide provides the foundation for implementing robust, concurrent AV control systems in Python. Choose the approach that best matches your specific requirements, device types, and team expertise. Remember that the best solution often combines multiple concurrency models to leverage the strengths of each approach while mitigating their individual limitations.