mirror of
https://github.com/zebrajr/ansible.git
synced 2025-12-06 12:19:53 +01:00
39 lines
1.0 KiB
Python
39 lines
1.0 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
import threading
|
|
import os
|
|
|
|
from concurrent.futures import ThreadPoolExecutor, wait
|
|
|
|
from ansible._internal import _locking
|
|
|
|
|
|
def test_blocking_lock(tmp_path: str) -> None:
|
|
"""Validate synchronization behavior of `named_mutex`."""
|
|
work_count = 0
|
|
running_threads: set[int] = set()
|
|
|
|
def exercise_lock() -> None:
|
|
nonlocal work_count
|
|
|
|
with _locking.named_mutex(os.path.join(tmp_path, "ansible-core")):
|
|
thread_id = threading.current_thread().native_id
|
|
|
|
try:
|
|
assert not running_threads
|
|
|
|
running_threads.add(thread_id)
|
|
work_count += 1
|
|
time.sleep(0.1)
|
|
finally:
|
|
running_threads.remove(thread_id)
|
|
|
|
with ThreadPoolExecutor(max_workers=10) as executor:
|
|
jobs = [executor.submit(exercise_lock) for _idx in range(10)]
|
|
|
|
wait(jobs, 5)
|
|
|
|
assert all(job.result() is None for job in jobs)
|
|
assert work_count == 10
|