bpo-22367: Add tests for fcntl.lockf(). (GH-17010) · python/cpython@befa032
@@ -5,14 +5,14 @@
55import struct
66import sys
77import unittest
8+from multiprocessing import Process
89from test.support import (verbose, TESTFN, unlink, run_unittest, import_module,
910cpython_only)
10111112# Skip test if no fcntl module.
1213fcntl = import_module('fcntl')
1314141515-# TODO - Write tests for flock() and lockf().
16161717def get_lockdata():
1818try:
@@ -138,6 +138,33 @@ def test_flock(self):
138138self.assertRaises(ValueError, fcntl.flock, -1, fcntl.LOCK_SH)
139139self.assertRaises(TypeError, fcntl.flock, 'spam', fcntl.LOCK_SH)
140140141+def test_lockf_exclusive(self):
142+self.f = open(TESTFN, 'wb+')
143+cmd = fcntl.LOCK_EX | fcntl.LOCK_NB
144+def try_lockf_on_other_process():
145+self.assertRaises(BlockingIOError, fcntl.lockf, self.f, cmd)
146+147+fcntl.lockf(self.f, cmd)
148+p = Process(target=try_lockf_on_other_process)
149+p.start()
150+p.join()
151+fcntl.lockf(self.f, fcntl.LOCK_UN)
152+self.assertEqual(p.exitcode, 0)
153+154+def test_lockf_share(self):
155+self.f = open(TESTFN, 'wb+')
156+cmd = fcntl.LOCK_SH | fcntl.LOCK_NB
157+def try_lockf_on_other_process():
158+fcntl.lockf(self.f, cmd)
159+fcntl.lockf(self.f, fcntl.LOCK_UN)
160+161+fcntl.lockf(self.f, cmd)
162+p = Process(target=try_lockf_on_other_process)
163+p.start()
164+p.join()
165+fcntl.lockf(self.f, fcntl.LOCK_UN)
166+self.assertEqual(p.exitcode, 0)
167+141168@cpython_only
142169def test_flock_overflow(self):
143170import _testcapi