locks: llnl.util.lock now only writes host info when in debug mode
- write locks previously wrote information about the lock holder (host and pid), and read locks woudl read this in. - This is really only for debugging, so only enable it then - add some tests that target debug info, and improve multiproc lock test output
This commit is contained in:
parent
b9af52a888
commit
ab794fa741
@ -51,7 +51,7 @@ class Lock(object):
|
|||||||
is enabled) and recent NFS versions.
|
is enabled) and recent NFS versions.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, path, start=0, length=0):
|
def __init__(self, path, start=0, length=0, debug=False):
|
||||||
"""Construct a new lock on the file at ``path``.
|
"""Construct a new lock on the file at ``path``.
|
||||||
|
|
||||||
By default, the lock applies to the whole file. Optionally,
|
By default, the lock applies to the whole file. Optionally,
|
||||||
@ -72,7 +72,10 @@ def __init__(self, path, start=0, length=0):
|
|||||||
self._start = start
|
self._start = start
|
||||||
self._length = length
|
self._length = length
|
||||||
|
|
||||||
# PID and host of lock holder
|
# enable debug mode
|
||||||
|
self.debug = debug
|
||||||
|
|
||||||
|
# PID and host of lock holder (only used in debug mode)
|
||||||
self.pid = self.old_pid = None
|
self.pid = self.old_pid = None
|
||||||
self.host = self.old_host = None
|
self.host = self.old_host = None
|
||||||
|
|
||||||
@ -118,12 +121,14 @@ def _lock(self, op, timeout=_default_timeout):
|
|||||||
fcntl.lockf(self._file, op | fcntl.LOCK_NB,
|
fcntl.lockf(self._file, op | fcntl.LOCK_NB,
|
||||||
self._length, self._start, os.SEEK_SET)
|
self._length, self._start, os.SEEK_SET)
|
||||||
|
|
||||||
|
# help for debugging distributed locking
|
||||||
|
if self.debug:
|
||||||
# All locks read the owner PID and host
|
# All locks read the owner PID and host
|
||||||
self._read_lock_data()
|
self._read_debug_data()
|
||||||
|
|
||||||
# Exclusive locks write their PID/host
|
# Exclusive locks write their PID/host
|
||||||
if op == fcntl.LOCK_EX:
|
if op == fcntl.LOCK_EX:
|
||||||
self._write_lock_data()
|
self._write_debug_data()
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -148,15 +153,19 @@ def _ensure_parent_directory(self):
|
|||||||
e.errno == errno.EISDIR):
|
e.errno == errno.EISDIR):
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _read_lock_data(self):
|
def _read_debug_data(self):
|
||||||
"""Read PID and host data out of the file if it is there."""
|
"""Read PID and host data out of the file if it is there."""
|
||||||
|
self.old_pid = self.pid
|
||||||
|
self.old_host = self.host
|
||||||
|
|
||||||
line = self._file.read()
|
line = self._file.read()
|
||||||
if line:
|
if line:
|
||||||
pid, host = line.strip().split(',')
|
pid, host = line.strip().split(',')
|
||||||
_, _, self.pid = pid.rpartition('=')
|
_, _, self.pid = pid.rpartition('=')
|
||||||
_, _, self.host = host.rpartition('=')
|
_, _, self.host = host.rpartition('=')
|
||||||
|
self.pid = int(self.pid)
|
||||||
|
|
||||||
def _write_lock_data(self):
|
def _write_debug_data(self):
|
||||||
"""Write PID and host data to the file, recording old values."""
|
"""Write PID and host data to the file, recording old values."""
|
||||||
self.old_pid = self.pid
|
self.old_pid = self.pid
|
||||||
self.old_host = self.host
|
self.old_host = self.host
|
||||||
|
@ -62,13 +62,14 @@
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
import os
|
import os
|
||||||
|
import socket
|
||||||
import shutil
|
import shutil
|
||||||
import tempfile
|
import tempfile
|
||||||
import traceback
|
import traceback
|
||||||
import glob
|
import glob
|
||||||
import getpass
|
import getpass
|
||||||
from contextlib import contextmanager
|
from contextlib import contextmanager
|
||||||
from multiprocessing import Process
|
from multiprocessing import Process, Queue
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@ -201,17 +202,21 @@ def lock_path(lock_dir):
|
|||||||
os.unlink(lock_file)
|
os.unlink(lock_file)
|
||||||
|
|
||||||
|
|
||||||
def local_multiproc_test(*functions):
|
def local_multiproc_test(*functions, **kwargs):
|
||||||
"""Order some processes using simple barrier synchronization."""
|
"""Order some processes using simple barrier synchronization."""
|
||||||
b = mp.Barrier(len(functions), timeout=barrier_timeout)
|
b = mp.Barrier(len(functions), timeout=barrier_timeout)
|
||||||
procs = [Process(target=f, args=(b,)) for f in functions]
|
|
||||||
|
args = (b,) + tuple(kwargs.get('extra_args', ()))
|
||||||
|
procs = [Process(target=f, args=args, name=f.__name__)
|
||||||
|
for f in functions]
|
||||||
|
|
||||||
for p in procs:
|
for p in procs:
|
||||||
p.start()
|
p.start()
|
||||||
|
|
||||||
for p in procs:
|
for p in procs:
|
||||||
p.join()
|
p.join()
|
||||||
assert p.exitcode == 0
|
|
||||||
|
assert all(p.exitcode == 0 for p in procs)
|
||||||
|
|
||||||
|
|
||||||
def mpi_multiproc_test(*functions):
|
def mpi_multiproc_test(*functions):
|
||||||
@ -907,3 +912,74 @@ def do_write_with_exception(exit_fn):
|
|||||||
assert vals['exception']
|
assert vals['exception']
|
||||||
assert not vals['exited_fn']
|
assert not vals['exited_fn']
|
||||||
assert not vals['exception_fn']
|
assert not vals['exception_fn']
|
||||||
|
|
||||||
|
|
||||||
|
def test_lock_debug_output(lock_path):
|
||||||
|
host = socket.getfqdn()
|
||||||
|
|
||||||
|
def p1(barrier, q1, q2):
|
||||||
|
# exchange pids
|
||||||
|
p1_pid = os.getpid()
|
||||||
|
q1.put(p1_pid)
|
||||||
|
p2_pid = q2.get()
|
||||||
|
|
||||||
|
# set up lock
|
||||||
|
lock = lk.Lock(lock_path, debug=True)
|
||||||
|
|
||||||
|
with lk.WriteTransaction(lock):
|
||||||
|
# p1 takes write lock and writes pid/host to file
|
||||||
|
barrier.wait() # ------------------------------------ 1
|
||||||
|
|
||||||
|
assert lock.pid == p1_pid
|
||||||
|
assert lock.host == host
|
||||||
|
|
||||||
|
# wait for p2 to verify contents of file
|
||||||
|
barrier.wait() # ---------------------------------------- 2
|
||||||
|
|
||||||
|
# wait for p2 to take a write lock
|
||||||
|
barrier.wait() # ---------------------------------------- 3
|
||||||
|
|
||||||
|
# verify pid/host info again
|
||||||
|
with lk.ReadTransaction(lock):
|
||||||
|
assert lock.old_pid == p1_pid
|
||||||
|
assert lock.old_host == host
|
||||||
|
|
||||||
|
assert lock.pid == p2_pid
|
||||||
|
assert lock.host == host
|
||||||
|
|
||||||
|
barrier.wait() # ---------------------------------------- 4
|
||||||
|
|
||||||
|
def p2(barrier, q1, q2):
|
||||||
|
# exchange pids
|
||||||
|
p2_pid = os.getpid()
|
||||||
|
p1_pid = q1.get()
|
||||||
|
q2.put(p2_pid)
|
||||||
|
|
||||||
|
# set up lock
|
||||||
|
lock = lk.Lock(lock_path, debug=True)
|
||||||
|
|
||||||
|
# p1 takes write lock and writes pid/host to file
|
||||||
|
barrier.wait() # ---------------------------------------- 1
|
||||||
|
|
||||||
|
# verify that p1 wrote information to lock file
|
||||||
|
with lk.ReadTransaction(lock):
|
||||||
|
assert lock.pid == p1_pid
|
||||||
|
assert lock.host == host
|
||||||
|
|
||||||
|
barrier.wait() # ---------------------------------------- 2
|
||||||
|
|
||||||
|
# take a write lock on the file and verify pid/host info
|
||||||
|
with lk.WriteTransaction(lock):
|
||||||
|
assert lock.old_pid == p1_pid
|
||||||
|
assert lock.old_host == host
|
||||||
|
|
||||||
|
assert lock.pid == p2_pid
|
||||||
|
assert lock.host == host
|
||||||
|
|
||||||
|
barrier.wait() # ------------------------------------ 3
|
||||||
|
|
||||||
|
# wait for p1 to verify pid/host info
|
||||||
|
barrier.wait() # ---------------------------------------- 4
|
||||||
|
|
||||||
|
q1, q2 = Queue(), Queue()
|
||||||
|
local_multiproc_test(p2, p1, extra_args=(q1, q2))
|
||||||
|
Loading…
Reference in New Issue
Block a user