Skip to content

k3portlock

Action-CI Documentation Status Package

Cross-process lock using TCP port binding.

k3portlock is a component of the pykit3 project, a Python 3 toolkit set.

Installation

pip install k3portlock

Quick Start

from k3portlock import Portlock, PortlockTimeout

# Using context manager (recommended): the lock is acquired when the
# `with` block is entered and released when it is left.
with Portlock('my-resource', timeout=10):
    # Critical section - only one process can be here at a time
    print("Got the lock!")

# Manual acquire/release
lock = Portlock('another-resource')
try:
    # Retries until the timeout (1 second by default) has elapsed,
    # then raises PortlockTimeout.
    lock.acquire()
    # Do work...
finally:
    # release() does nothing when the lock is not held.
    lock.release()

# Handle timeout
try:
    with Portlock('busy-resource', timeout=5):
        pass
except PortlockTimeout:
    print("Could not acquire lock within 5 seconds")

API Reference

k3portlock

k3portlock is a cross-process lock implemented with TCP port binding: no two processes can bind the same TCP port at the same time.

k3portlock tries to bind 3 ports on the loopback IP 127.0.0.1. If a Portlock instance succeeds in binding 2 ports out of 3, the instance is considered to have acquired the lock. On Linux, a single abstract Unix domain socket named after the key is bound instead.

Portlock

Bases: object

A lock instance. Portlock is thread safe. It is OK to create just one lock in a process for all threads.

Source code in k3portlock/portlock.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
class Portlock(object):
    """
    A lock instance.
    Portlock is thread safe.
    It is OK to create just one lock in a process for all threads.

    On Linux the lock is taken by binding one abstract unix domain socket
    named after `key`.
    On other systems it is taken by binding more than half of `PORT_N`
    consecutive TCP ports, starting at the address derived from `key`.
    """

    def __init__(self, key, timeout=1, sleep_time=None):
        """
        `Portlock` supports `with` statement.

        When entering a `with` statement of `Portlock` instance it invokes `acquire()`
        automatically.

        And when leaving `with` block, `release()` will be called to release the lock.

        :param key: is a string as lock key. `key` will be hashed to a certain port.
        :param timeout: is the max time in second to wait to acquire the lock.
        When it is exceeded, `acquire()` raises a `PortlockTimeout` exception.
        :param sleep_time: is the time in second between every two attempts to bind a port.
        A falsy value selects `DEFAULT_SLEEP_TIME`.
        """
        self.key = key
        self.addr = str_to_addr(key)
        self.timeout = timeout
        self.sleep_time = sleep_time or DEFAULT_SLEEP_TIME
        self.socks = [None] * PORT_N

        self.thread_lock = threading.RLock()

    def try_lock(self):
        """
        Make a single attempt to take the lock.

        The in-process `thread_lock` is taken first; it stays held only when
        the attempt succeeds and is given back on failure or on error.
        :return: `True` if the lock has been acquired, `False` otherwise.
        """
        self.thread_lock.acquire()

        try:
            self._lock()

            if self.has_locked():
                return True
            else:
                # Not enough ports were bound: close the ones we did get so
                # that they are freed right away instead of at garbage
                # collection time.
                self._close_socks()
                self.thread_lock.release()
                return False

        except Exception:
            self.thread_lock.release()
            raise

    def has_locked(self):
        """
        It checks if this instance has the lock.
        :return: `True` if it has the lock.
        """
        if OS == "Linux":
            return self.socks[0] is not None

        # other OS: a strict majority of the ports must be bound

        bound = sum(1 for x in self.socks if x is not None)
        return bound > len(self.socks) / 2

    def acquire(self):
        """
        It tries to acquire the lock before `timeout`.
        :return: nothing
        :raises PortlockTimeout: if the lock is not acquired in `timeout` seconds.
        """
        # A monotonic clock keeps the timeout correct even if the system
        # wall clock is adjusted while waiting.
        t0 = time.monotonic()

        while True:
            if self.try_lock():
                return

            now = time.monotonic()
            left = t0 + self.timeout - now
            if left > 0:
                # Never sleep much past the deadline.
                slp = min([self.sleep_time, left + 0.001])
                time.sleep(slp)
            else:
                raise PortlockTimeout("portlock timeout: " + repr(self.key), self.key)

    def release(self):
        """
        It releases the lock it holds, or does nothing if it does not hold the lock.
        :return: nothing
        """
        if not self.has_locked():
            return

        self._close_socks()

        self.thread_lock.release()

    @staticmethod
    def _close_quietly(sock):
        # Best-effort close: an error while closing must not prevent the
        # remaining clean-up from running.
        try:
            sock.close()
        except Exception:
            pass

    def _close_socks(self):
        # Close every socket currently held and mark all slots as empty.
        for sock in self.socks:
            if sock is not None:
                self._close_quietly(sock)

        self.socks = [None] * PORT_N

    def _bind(self, i, so, addr):
        # Bind `so` to `addr` and store it in slot `i` on success.
        # A socket that fails to bind is closed immediately so that it does
        # not leak; only "address in use" is treated as an expected failure.
        try:
            so.bind(addr)
        except socket.error as e:
            self._close_quietly(so)
            if e.errno == errno.EADDRINUSE:
                logger.debug("failure to bind: {addr}".format(addr=addr))
            else:
                raise
        except Exception:
            self._close_quietly(so)
            raise
        else:
            self.socks[i] = so
            logger.debug("success to bind: {addr}".format(addr=addr))

    def _lock(self):
        # Try to bind every address of this lock once, filling `self.socks`.
        if OS == "Linux":
            # The leading NUL byte selects the abstract socket namespace.
            addr = "\0/portlock/" + self.key
            so = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            self._bind(0, so, addr)
            return

        # other OS

        for i in range(len(self.socks)):
            addr = (self.addr[0], self.addr[1] + i)
            self._bind(i, self._socket(), addr)

    def _socket(self):
        # Create the TCP socket used for one port of the lock.
        return socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, typ, value, traceback):
        self.release()

    def __del__(self):
        # `__init__` may have failed before `socks` was created; there is
        # nothing to release in that case.
        if getattr(self, "socks", None) is None:
            return

        self.release()

__init__(key, timeout=1, sleep_time=None)

Portlock supports with statement.

When entering a with statement of Portlock instance it invokes acquire() automatically.

And when leaving the with block, release() will be called to release the lock. Parameters: key is a string used as the lock key; key will be hashed to a certain port. timeout is the max time in seconds to wait to acquire the lock; when it is exceeded, a portlock.PortlockTimeout exception is raised. sleep_time is the time in seconds between every two attempts to bind a port.

Source code in k3portlock/portlock.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(self, key, timeout=1, sleep_time=None):
    """
    Prepare a lock for `key`. No socket is bound here.

    A `Portlock` instance can be used in a `with` statement: `acquire()` is
    invoked on entry and `release()` is invoked on exit.

    :param key: is a string as lock key. `key` will be hashed to a certain port.
    :param timeout: is the max time in second to wait to acquire the lock.
    When it is exceeded, a `portlock.PortlockTimeout` exception is raised.
    :param sleep_time: is the time in second between every two attempts to bind a port.
    A falsy value selects `DEFAULT_SLEEP_TIME`.
    """
    self.key = key
    self.addr = str_to_addr(key)
    self.timeout = timeout

    if sleep_time:
        self.sleep_time = sleep_time
    else:
        self.sleep_time = DEFAULT_SLEEP_TIME

    # One slot per port; `None` marks a port that is not bound.
    self.socks = [None] * PORT_N

    self.thread_lock = threading.RLock()

acquire()

It tries to acquire the lock before timeout, and raises PortlockTimeout if the lock could not be acquired in time. Returns nothing.

Source code in k3portlock/portlock.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def acquire(self):
    """
    It tries to acquire the lock before `timeout`.
    :return: nothing
    :raises PortlockTimeout: if the lock is not acquired in `timeout` seconds.
    """
    # A monotonic clock keeps the timeout correct even if the system wall
    # clock is adjusted while waiting.
    t0 = time.monotonic()

    while True:
        if self.try_lock():
            return

        now = time.monotonic()
        left = t0 + self.timeout - now
        if left > 0:
            # Never sleep much past the deadline.
            slp = min([self.sleep_time, left + 0.001])
            time.sleep(slp)
        else:
            raise PortlockTimeout("portlock timeout: " + repr(self.key), self.key)

has_locked()

It checks if this instance has the lock. Returns True if it has the lock.

Source code in k3portlock/portlock.py
80
81
82
83
84
85
86
87
88
89
90
def has_locked(self):
    """
    Tell whether this instance currently holds the lock.

    On Linux only the first slot is used, so it alone decides.
    Elsewhere the lock is held when more than half of the slots are bound.
    :return: `True` if it has the lock.
    """
    if OS != "Linux":
        bound = sum(1 for sock in self.socks if sock is not None)
        # Strict majority: same as `bound > len(self.socks) / 2`.
        return 2 * bound > len(self.socks)

    return self.socks[0] is not None

release()

It releases the lock it holds, or does nothing if it does not hold the lock. :return: nothing

Source code in k3portlock/portlock.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def release(self):
    """
    Give the lock back if this instance holds it; otherwise do nothing.

    Every bound socket is closed (close errors are ignored), all slots are
    reset to `None`, and the in-process `thread_lock` is released.
    :return: nothing
    """
    if not self.has_locked():
        return

    bound = [s for s in self.socks if s is not None]
    for s in bound:
        try:
            s.close()
        except Exception:
            # Best effort: keep closing the remaining sockets.
            continue

    self.socks = [None] * PORT_N

    self.thread_lock.release()

PortlockError

Bases: Exception

Super class of all Portlock exceptions.

Source code in k3portlock/portlock.py
19
20
21
22
23
24
class PortlockError(Exception):
    """Super class of all Portlock exceptions."""

PortlockTimeout

Bases: PortlockError

Timeout when waiting to acquire the lock.

Source code in k3portlock/portlock.py
27
28
29
30
31
32
class PortlockTimeout(PortlockError):
    """Timeout when waiting to acquire the lock."""

License

The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)