k3portlock

Cross-process lock using TCP port binding.
k3portlock is a component of pykit3 project: a python3 toolkit set.
Installation
Quick Start
from k3portlock import Portlock, PortlockTimeout
# Using context manager (recommended)
with Portlock('my-resource', timeout=10):
# Critical section - only one process can be here at a time
print("Got the lock!")
# Manual acquire/release
lock = Portlock('another-resource')
try:
lock.acquire()
# Do work...
finally:
lock.release()
# Handle timeout
try:
with Portlock('busy-resource', timeout=5):
pass
except PortlockTimeout:
print("Could not acquire lock within 5 seconds")
API Reference
k3portlock
k3portlock is a cross-process lock that is implemented with tcp port binding.
Since no two processes could bind on a same TCP port.
k3portlock tries to bind 3 ports on loopback ip 127.0.0.1.
If a Portlock instance succeeds on binding 2 ports out of 3,
it is considered this instance has acquired the lock.
Portlock
Bases: object
A lock instance.
Portlock is thread safe.
It is OK to create just one lock in a process for all threads.
Source code in k3portlock/portlock.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174 | class Portlock(object):
"""
A lock instance.
Portlock is thread safe.
It is OK to create just one lock in a process for all threads.
"""
def __init__(self, key, timeout=1, sleep_time=None):
"""
`Portlock` supports `with` statement.
When entering a `with` statement of `Portlock` instance it invokes `acquire()`
automatically.
And when leaving `with` block, `release()` will be called to release the lock.
:param key: is a string as lock key.`key` will be hashed to a certain port
:param timeout: is the max time in second to wait to acquire the lock.
it raises an `portlock.PortlockTimeout` exception.
:param sleep_time: is the time in second between every two attempts to bind a port.
"""
self.key = key
self.addr = str_to_addr(key)
self.timeout = timeout
self.sleep_time = sleep_time or DEFAULT_SLEEP_TIME
self.socks = [None] * PORT_N
self.thread_lock = threading.RLock()
def try_lock(self):
self.thread_lock.acquire()
try:
self._lock()
if self.has_locked():
return True
else:
self.socks = [None] * PORT_N
self.thread_lock.release()
return False
except Exception:
self.thread_lock.release()
raise
def has_locked(self):
"""
It checks if this instances has the lock.
:return: `True` if it has the lock.
"""
if OS == "Linux":
return self.socks[0] is not None
# other OS
return len([x for x in self.socks if x is not None]) > len(self.socks) / 2
def acquire(self):
"""
It tries to acquire the lock before `timeout`.
:return: nothing
"""
t0 = time.time()
while True:
if self.try_lock():
return
now = time.time()
left = t0 + self.timeout - now
if left > 0:
slp = min([self.sleep_time, left + 0.001])
time.sleep(slp)
else:
raise PortlockTimeout("portlock timeout: " + repr(self.key), self.key)
def release(self):
"""
It releases the lock it holds, or does nothing if it does not hold the lock.
:return: nothing
"""
if not self.has_locked():
return
for sock in self.socks:
if sock is not None:
try:
sock.close()
except Exception:
pass
self.socks = [None] * PORT_N
self.thread_lock.release()
def _lock(self):
if OS == "Linux":
so = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
try:
addr = "\0/portlock/" + self.key
so.bind(addr)
self.socks[0] = so
logger.debug("success to bind: {addr}".format(addr=addr))
except socket.error as e:
if e.errno == errno.EADDRINUSE:
logger.debug("failure to bind: {addr}".format(addr=addr))
else:
raise
return
# other OS
for i in range(len(self.socks)):
addr = (self.addr[0], self.addr[1] + i)
so = self._socket()
try:
so.bind(addr)
self.socks[i] = so
logger.debug("success to bind: {addr}".format(addr=addr))
except socket.error as e:
if e.errno == errno.EADDRINUSE:
logger.debug("failure to bind: {addr}".format(addr=addr))
else:
raise
def _socket(self):
return socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def __enter__(self):
self.acquire()
return self
def __exit__(self, typ, value, traceback):
self.release()
def __del__(self):
self.release()
|
__init__(key, timeout=1, sleep_time=None)
Portlock supports with statement.
When entering a with statement of Portlock instance it invokes acquire()
automatically.
And when leaving with block, release() will be called to release the lock.
:param key: is a string as lock key.key will be hashed to a certain port
:param timeout: is the max time in second to wait to acquire the lock.
it raises an portlock.PortlockTimeout exception.
:param sleep_time: is the time in second between every two attempts to bind a port.
Source code in k3portlock/portlock.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 | def __init__(self, key, timeout=1, sleep_time=None):
"""
`Portlock` supports `with` statement.
When entering a `with` statement of `Portlock` instance it invokes `acquire()`
automatically.
And when leaving `with` block, `release()` will be called to release the lock.
:param key: is a string as lock key.`key` will be hashed to a certain port
:param timeout: is the max time in second to wait to acquire the lock.
it raises an `portlock.PortlockTimeout` exception.
:param sleep_time: is the time in second between every two attempts to bind a port.
"""
self.key = key
self.addr = str_to_addr(key)
self.timeout = timeout
self.sleep_time = sleep_time or DEFAULT_SLEEP_TIME
self.socks = [None] * PORT_N
self.thread_lock = threading.RLock()
|
acquire()
It tries to acquire the lock before timeout.
:return: nothing
Source code in k3portlock/portlock.py
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109 | def acquire(self):
"""
It tries to acquire the lock before `timeout`.
:return: nothing
"""
t0 = time.time()
while True:
if self.try_lock():
return
now = time.time()
left = t0 + self.timeout - now
if left > 0:
slp = min([self.sleep_time, left + 0.001])
time.sleep(slp)
else:
raise PortlockTimeout("portlock timeout: " + repr(self.key), self.key)
|
has_locked()
It checks if this instances has the lock.
:return: True if it has the lock.
Source code in k3portlock/portlock.py
80
81
82
83
84
85
86
87
88
89
90 | def has_locked(self):
"""
It checks if this instances has the lock.
:return: `True` if it has the lock.
"""
if OS == "Linux":
return self.socks[0] is not None
# other OS
return len([x for x in self.socks if x is not None]) > len(self.socks) / 2
|
release()
It releases the lock it holds, or does nothing if it does not hold the lock.
:return: nothing
Source code in k3portlock/portlock.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 | def release(self):
"""
It releases the lock it holds, or does nothing if it does not hold the lock.
:return: nothing
"""
if not self.has_locked():
return
for sock in self.socks:
if sock is not None:
try:
sock.close()
except Exception:
pass
self.socks = [None] * PORT_N
self.thread_lock.release()
|
PortlockError
Bases: Exception
Super class of all Portlock exceptions.
Source code in k3portlock/portlock.py
| class PortlockError(Exception):
"""
Super class of all Portlock exceptions.
"""
pass
|
PortlockTimeout
Bases: PortlockError
Timeout when waiting to acquire the lock.
Source code in k3portlock/portlock.py
| class PortlockTimeout(PortlockError):
"""
Timeout when waiting to acquire the lock.
"""
pass
|
License
The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)