You have probably run into the problem of wanting to update a plot while acquiring a signal, only to find that Python is busy during the acquisition. This happens, for example, when functions or methods take a long time to execute and you can't regain control until they are done. Python has at least two different ways of solving this issue: the threading module and the multiprocessing module. They look the same but are fundamentally different, and therefore you need to understand their differences in order to decide when to use one or the other.
A Simple Measurement Class
First, let's build a simple measurement class to simulate what you would normally find when performing an experiment. Imagine that you would like to measure Ohm's law, i.e. the current that flows through a circuit as a function of the voltage applied to it. We can create a class for the measurement and within it, we can define a method to gather results. Later it will become clear why we use a class instead of just a function:
```python
import numpy as np
from time import sleep

class OhmLaw:
    def make_measurement(self, start, stop, num_points, delay):
        x_axis = np.linspace(start, stop, num_points)
        data = []
        for i in x_axis:
            # Acquire fake data
            data.append(np.random.random())
            sleep(delay)
        return data
```
This is a toy example, in which the data is randomly generated and we are not really using the input parameters at all, but it shouldn't be too hard to relate this to what you normally do in your lab. If you want to run the code, you can do the following:
```python
ohm = OhmLaw()
result = ohm.make_measurement(0, 1, 11, 1)
print(result)
```
As you can see, as soon as the code reaches the line that calls make_measurement, it will hang until all the data is generated. This is a very common scenario when the measurement is actively controlled by the computer. In the example above you wait for about 11 seconds (11 points with a 1-second delay each), which is not too bad, but normally you would like to see the progress of your experiment in order to decide how to continue.
Running the measurement in a non-blocking way
The first way of solving the issue is to run the measurement in the background. This will allow you to continue with the execution of the code after you have called make_measurement.
Note
When you start dealing with threads, you will find yourself in the situation of having a program that is running an infinite loop and therefore it will never finish. Ctrl + C is your best friend to stop the execution.
The easiest way to achieve this behavior in Python is by using the threading module. Let's first see how to implement our solution and then we can go into the details.
```python
import threading

ohm = OhmLaw()
t = threading.Thread(target=ohm.make_measurement, args=(0, 1, 11, 1))
t.start()
print('Triggered measurement')
```
If you run the code above, you will see that the print statement is executed right after the thread starts. You will also notice that the program, even though it has reached the end, waits for the thread t to complete before exiting. We can add a bit more action to see what is happening:
```python
import threading

ohm = OhmLaw()
t = threading.Thread(target=ohm.make_measurement, args=(0, 1, 11, 1))
t.start()
print('Triggered measurement')
i = 0
while t.is_alive():
    i += 1
    print('Acquiring {}\r'.format('.' * i), end=' ')
    sleep(0.5)
```
If you run the code, you will see the message 'Acquiring' on the screen, followed by a growing number of dots. If you add a print statement to the make_measurement method, you will see its output interleaved with the dots. This already shows two different tasks running at the same time. One is the make_measurement method, which takes a long time to run. The other is the loop that refreshes the screen every half a second. It is now time to learn a bit more about what the threads we have just created actually are.
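Earlier we noticed that the program waits for t before exiting. This is the default behavior: at shutdown, the interpreter waits for every non-daemon thread. If you would rather abandon the background task when the main program ends, one option is to mark the thread as a daemon. The sketch below assumes that; slow_task is a hypothetical stand-in for make_measurement:

```python
import threading
from time import sleep

def slow_task(duration):
    # Hypothetical stand-in for a long measurement
    sleep(duration)

# daemon=True: the interpreter does not wait for this thread when the program exits
t = threading.Thread(target=slow_task, args=(5,), daemon=True)
t.start()
print('Main program finished; the daemon thread is abandoned at exit')
```

Daemon threads are stopped abruptly at shutdown, so any file or device they were using may be left in an inconsistent state. For a real instrument, an explicit way of stopping the loop is usually safer.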
What are Threads
A crucial component of every computer is its processor. It is the piece of hardware that makes all the calculations and decisions. You probably know that the number of computations a processor can perform per unit of time is limited. That is why some programs take longer to open, or complex code takes longer to complete. However, you may have noticed that several programs on your computer can perform tasks simultaneously. This is thanks to the operating system, which switches rapidly between the different programs in order to keep them all responding.
Within a Python program, the same functionality can be achieved with threads. A thread is an independent sequence of instructions that runs inside the same process and the same Python interpreter as the rest of your program, and it therefore shares that program's memory. Every program has a main thread, and you can spawn child threads from it, as you have seen above. So the line where you create the threading.Thread does not start a new program. It asks the interpreter to run the method make_measurement, with the given arguments, in a separate thread alongside the main one.
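You can check that every thread lives in the same process, and that there is always a main thread, by asking the threading module which thread is executing a piece of code. The function report and the thread name 'measurement' in this small sketch are only illustrative:

```python
import os
import threading

seen = []  # (thread name, process id) pairs collected by report()

def report():
    # current_thread() returns the Thread object executing this line
    seen.append((threading.current_thread().name, os.getpid()))

t = threading.Thread(target=report, name='measurement')
t.start()
t.join()
report()  # called directly, so it runs in the main thread

for name, pid in seen:
    print(name, pid)
# Both entries show the same process id: threads share one process
```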
Plotting Results During Acquisition
So far, the only thing we have done is print to the screen that the acquisition is happening. The results of the measurement, however, are lost: we neither plot nor save them after the program finishes. This is where using a class instead of a simple function pays off. The core purpose of a class is to preserve state, which is exactly what we need. We can improve OhmLaw like this:
```python
class OhmLaw:
    def __init__(self):
        self.data = np.zeros(0)  # To store the data of the measurement
        self.step = 0  # To keep track of the step

    def make_measurement(self, start, stop, num_points, delay):
        x_axis = np.linspace(start, stop, num_points)
        self.data = np.zeros(num_points)
        self.step = 0
        for i in x_axis:
            # Acquire fake data
            self.data[self.step] = np.random.random()
            self.step += 1
            sleep(delay)
        return self.data
```
What we have done now is to define attributes of OhmLaw (i.e., self.data and self.step) that will keep track of the acquisition. The data is immediately available after it has been generated, and therefore we can change how we trigger the measurement, for example:
```python
import threading

ohm = OhmLaw()
t = threading.Thread(target=ohm.make_measurement, args=(0, 1, 11, 1))
t.start()
print('Triggered measurement')
i = ohm.step
while t.is_alive():
    if i != ohm.step:
        print('Latest data value: {}'.format(ohm.data[ohm.step - 1]))
        i = ohm.step
```
The first few lines are the same; what changes is the while loop. First, we check whether the current step differs from the last step we saw. If it does, we fetch the latest data point. Because the step is incremented right after each acquisition, we must retrieve ohm.data[ohm.step-1]. Otherwise we would be reading one point ahead, at a position that has not been filled yet.
As you can see, the while loop has no delay at all. As soon as a new data point is detected, it is fetched, but the loop also keeps one core busy while it spins. If you change the delay passed to make_measurement, the printing to the screen changes accordingly. This may not be the behavior you want. In our case, poking ohm.step is fast. With real hardware, however, you may have to communicate with a device to find out whether there are new data points, and you may prefer to do that at a fixed interval rather than as fast as possible. The code would become:
```python
import threading

ohm = OhmLaw()
t = threading.Thread(target=ohm.make_measurement, args=(0, 1, 11, 1))
t.start()
print('Triggered measurement')
i = ohm.step
while t.is_alive():
    if i != ohm.step:
        # ohm.step already counts the points acquired so far
        print('Number of points acquired: {}'.format(ohm.step))
        i = ohm.step
    sleep(2)
```
It is as simple as that: now you check the ohm.step attribute only once every two seconds. If you start playing around, you will see many different behaviors. For example, you may miss the last few steps of the measurement if the refresh rate is not fast enough. Considerations like these come naturally when you start dealing with threads and actions happening simultaneously.
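One way to avoid missing the last points is to keep the slow polling loop for progress updates and then do one final read after join(). The sketch below assumes you only need the complete data once the measurement is over. The class repeats the OhmLaw version above, and the delay is shortened so the sketch runs quickly:

```python
import threading
from time import sleep
import numpy as np

class OhmLaw:
    def __init__(self):
        self.data = np.zeros(0)
        self.step = 0

    def make_measurement(self, start, stop, num_points, delay):
        x_axis = np.linspace(start, stop, num_points)
        self.data = np.zeros(num_points)
        self.step = 0
        for _ in x_axis:
            self.data[self.step] = np.random.random()  # fake data point
            self.step += 1
            sleep(delay)
        return self.data

ohm = OhmLaw()
t = threading.Thread(target=ohm.make_measurement, args=(0, 1, 11, 0.05))
t.start()
while t.is_alive():
    print('Number of points acquired: {}'.format(ohm.step))
    sleep(0.2)  # slow refresh: some intermediate steps are skipped
t.join()  # make sure the thread has completely finished
# Final read: nothing is lost, because the data lives in the object
print('Final number of points: {}'.format(ohm.step))
```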
Multiple Threads
If you are curious, you are probably wondering whether you can start as many threads as you like. In principle the answer is yes; you are not limited to one. In fact, once you start a thread, two are technically running, because the main thread, the one executing your script, is a thread too. Imagine that you want to start a second measurement. You can do:
```python
meas_1 = threading.Thread(target=ohm.make_measurement, args=(0, 1, 11, 1))
meas_1.start()
meas_2 = threading.Thread(target=ohm.make_measurement, args=(0, 1, 20, 1))
meas_2.start()
```
If you run the code above, you will have two threads, meas_1 and meas_2, but they share the same data and step attributes of the object ohm. Every time a data point is generated, it overwrites the value acquired by the other thread. With a real device this would be much worse, because you would be trying to set two different output voltages on the same device at the same time.
There are different ways around this. The first is to change the make_measurement method so that only one execution is allowed at a time. You can do this by checking whether an attribute called running is set to True. For example:
```python
class OhmLaw:
    def __init__(self):
        self.data = np.zeros(0)  # To store the data of the measurement
        self.step = 0  # To keep track of the step
        self.running = False

    def make_measurement(self, start, stop, num_points, delay):
        if self.running:
            raise Exception("Can't trigger two measurements at the same time")
        x_axis = np.linspace(start, stop, num_points)
        self.data = np.zeros(num_points)
        self.step = 0
        self.running = True
        for i in x_axis:
            # Acquire fake data
            self.data[self.step] = np.random.random()
            self.step += 1
            sleep(delay)
        self.running = False
        return self.data
```
The main change is that we set the attribute running to False when we instantiate the class. Then, each time make_measurement is triggered, we check whether running is set. If it is, we raise an error that prevents the method from running again. If it is not, we continue as usual. Note that we set self.running to True before entering the time-consuming loop and set it back to False when the loop finishes. Go ahead and try to run the measurement twice: you won't be allowed to.
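Checking self.running and then setting it are two separate steps. Two threads that call make_measurement at almost the same moment could therefore both see False before either one sets it to True. An alternative is to replace the boolean with a threading.Lock acquired with blocking=False, which turns the check and the set into a single atomic operation. Below is a sketch of that swapped-in technique; the class name GuardedOhmLaw is hypothetical:

```python
import threading
from time import sleep
import numpy as np

class GuardedOhmLaw:
    def __init__(self):
        self.data = np.zeros(0)
        self.step = 0
        self._busy = threading.Lock()  # held while a measurement runs

    def make_measurement(self, start, stop, num_points, delay):
        # acquire(blocking=False) returns False immediately if the lock is taken
        if not self._busy.acquire(blocking=False):
            raise RuntimeError("Can't trigger two measurements at the same time")
        try:
            x_axis = np.linspace(start, stop, num_points)
            self.data = np.zeros(num_points)
            self.step = 0
            for _ in x_axis:
                self.data[self.step] = np.random.random()  # fake data
                self.step += 1
                sleep(delay)
            return self.data
        finally:
            self._busy.release()  # released even if the loop raises

ohm = GuardedOhmLaw()
t = threading.Thread(target=ohm.make_measurement, args=(0, 1, 11, 0.05))
t.start()
sleep(0.1)  # give the thread time to start and take the lock
refused = False
try:
    ohm.make_measurement(0, 1, 11, 0.05)  # second request while the first runs
except RuntimeError as err:
    refused = True
    print('Refused:', err)
t.join()
```

The try/finally has a side benefit: the lock is released even if the acquisition fails, so an error does not leave the object permanently marked as busy.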
It may seem a bit far-fetched, but trying to run the measurement twice is a very common mistake when you have a graphical user interface. Sometimes you don't realize that a measurement is already going on and you try to start a new one. Now we know how to avoid triggering the same measurement twice, but one big piece of functionality is still missing: how to stop a measurement.
Stopping a Thread
When you are running a long task, such as acquiring from a device, you may need to stop it. For example, you may notice that something is wrong with your data, or you may already have enough information to move on, so waiting until the end makes no sense. Python doesn't allow you to kill threads, so we have to find a way around it. As the examples above show, we normally exchange information with the thread through attributes of a class. We can use the same strategy to stop a thread, by breaking out of the loop. The OhmLaw class will look like this:
```python
class OhmLaw:
    def __init__(self):
        self.data = np.zeros(0)  # To store the data of the measurement
        self.step = 0  # To keep track of the step
        self.running = False
        self.stop = False

    def make_measurement(self, start, stop, num_points, delay):
        if self.running:
            raise Exception("Can't trigger two measurements at the same time")
        x_axis = np.linspace(start, stop, num_points)
        self.data = np.zeros(num_points)
        self.step = 0
        self.stop = False
        self.running = True
        for i in x_axis:
            if self.stop:
                print('Stopping')
                break
            # Acquire fake data
            self.data[self.step] = np.random.random()
            self.step += 1
            sleep(delay)
        self.running = False
        return self.data
```
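There are two changes that let us stop the loop. The first is the new self.stop attribute, which is reset to False at the start of every measurement. The second is the check at the beginning of each iteration, which breaks out of the loop. Whenever you want to stop the acquisition, the only thing you need to do is the following: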
```python
ohm.stop = True
```
As soon as the iteration in progress finishes (including its delay), the loop exits without errors. Since you have access to ohm.step, you will know exactly how many data points were acquired, and those points are available in ohm.data. By now, something should be bugging you: we are polluting the OhmLaw class with attributes and considerations that only matter when working with threads. If you used the class in a non-threaded application, self.stop, self.running, and so on would serve no purpose and would only make the code more complicated.
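The standard library also offers threading.Event, a thread-safe flag that could replace the self.stop boolean. The sketch below assumes the same fake acquisition. It calls stop_event.wait(delay) instead of sleep(delay), so the loop reacts to a stop request immediately rather than after the current delay has elapsed. The names StoppableOhmLaw and stop_event are illustrative:

```python
import threading
from time import sleep
import numpy as np

class StoppableOhmLaw:
    def __init__(self):
        self.data = np.zeros(0)
        self.step = 0
        self.stop_event = threading.Event()  # call set() to request a stop

    def make_measurement(self, start, stop, num_points, delay):
        x_axis = np.linspace(start, stop, num_points)
        self.data = np.zeros(num_points)
        self.step = 0
        self.stop_event.clear()
        for _ in x_axis:
            self.data[self.step] = np.random.random()  # fake data point
            self.step += 1
            # wait() returns True as soon as the event is set, or False after delay
            if self.stop_event.wait(delay):
                print('Stopping')
                break
        return self.data

ohm = StoppableOhmLaw()
t = threading.Thread(target=ohm.make_measurement, args=(0, 1, 20, 1))
t.start()
sleep(0.5)
ohm.stop_event.set()  # the thread wakes up right away instead of after 1 s
t.join()
print('Points acquired:', ohm.step)
```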
Subclassing a Thread
One of the many advantages of Python's syntax is that it makes it very easy to extend the functionality of any module. In this case, we want to extend how a Thread works so that it can interact directly with the OhmLaw class. Let's first see how to subclass a Thread so we can start customizing it. In the examples above, we constructed a thread and called its start method. When you subclass a thread, you don't override start. Instead, you override the run method, which start() executes in the new thread. The official documentation is quite clear on this:
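```python
from threading import Thread

class Worker(Thread):
    def __init__(self, target, args=None):
        super().__init__()
        self.target = target
        # Default to an empty tuple so that self.target(*self.args) always works
        self.args = args if args is not None else ()

    def run(self):
        # run() is what start() executes in the new thread
        self.target(*self.args)
```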
The Worker class works exactly like a Thread. You can replace the code to run a measurement like this:
```python
meas_1 = Worker(target=ohm.make_measurement, args=(0, 1, 11, 1))
meas_1.start()
```
And it will behave in the same way as running a normal Thread.
Remember that the call to super().__init__() is essential for inheriting all the functionality of the base class. The main question is why you would want a custom thread instead of the default one. Suppose that instead of raising an error when a second measurement is triggered, you want to build a queue of commands to execute. That way you won't run into conflicts, neither in our simple example nor with real devices.
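```python
from threading import Thread
from time import sleep

class Worker(Thread):
    def __init__(self):
        super().__init__()
        self.queue = []  # Pending (function, arguments) pairs
        self.keep_running = True

    def add_to_queue(self, target, args=()):
        print('Adding to queue')
        self.queue.append((target, args))

    def stop_thread(self):
        self.keep_running = False

    def run(self):
        while self.keep_running:
            if self.queue:
                func, args = self.queue.pop(0)  # Oldest task first
                func(*args)
            else:
                sleep(0.01)  # Avoid keeping a core busy while idle
```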
The Worker class has now become a useful tool for running several functions one after the other. All you need to do is call add_to_queue with the appropriate arguments. Let's go through it step by step. First, we removed the arguments from __init__ because we don't need them. We also created two attributes: queue, which holds the pending tasks, and keep_running, which will be used to stop the thread. You would use it like this:
```python
worker = Worker()
worker.start()
worker.add_to_queue(ohm.make_measurement, args=(0, 1, 11, .1))
worker.add_to_queue(ohm.make_measurement, args=(0, 1, 11, .1))
worker.add_to_queue(ohm.make_measurement, args=(0, 1, 11, .1))
while worker.queue:
    print('Queue length: {}'.format(len(worker.queue)))
    sleep(1)
worker.stop_thread()
```
We begin by creating the worker and starting it, which launches a separate thread. That is why we have to call start() after instantiating it. The run method is an infinite loop that looks for elements in queue. When it finds one, it takes it and executes it. The pop(0) call is very useful here because it retrieves the element in the first position and removes it from the list. Any element you add to the queue is executed once the tasks before it are done. You could add, for example, a method for generating data, a method for saving the data, and so on. Remember that if you don't stop the worker with stop_thread(), the program will never finish, because the worker is stuck in an infinite loop.
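The list-based worker has to check self.queue over and over, even when there is nothing to do. One alternative is to build the worker on the standard library's queue.Queue, which is designed for passing items between threads. Its get() blocks until something arrives, and a sentinel value tells the worker to finish. The sentinel here is None, an arbitrary choice. This is a sketch of that swapped-in technique, and the class name QueueWorker is hypothetical:

```python
import queue
import threading
from time import sleep

class QueueWorker(threading.Thread):
    def __init__(self):
        super().__init__()
        self.tasks = queue.Queue()  # thread-safe FIFO

    def add_to_queue(self, target, args=()):
        self.tasks.put((target, args))

    def stop_thread(self):
        self.tasks.put(None)  # sentinel: handled after all pending tasks

    def run(self):
        while True:
            item = self.tasks.get()  # blocks without using CPU
            if item is None:
                break
            func, args = item
            func(*args)

results = []

def fake_measurement(label, delay):
    sleep(delay)
    results.append(label)

worker = QueueWorker()
worker.start()
for label in ('first', 'second', 'third'):
    worker.add_to_queue(fake_measurement, args=(label, 0.1))
worker.stop_thread()
worker.join()  # returns once every queued task has run
print(results)
```

Since the sentinel travels through the same queue, stop_thread() lets the already queued tasks finish first. Setting keep_running to False, by contrast, takes effect as soon as the current task ends.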
You can try different things, for example reimplementing the is_alive method. There are no real limits to how much you can bend and improve built-ins by subclassing them. A very useful method for making sure a thread has finished running is join. If you call worker.join(), the program will block there until the thread has effectively finished.
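join() also accepts a timeout in seconds. It always returns None, so the way to find out whether the thread actually finished is to call is_alive() afterwards. A short sketch:

```python
import threading
from time import sleep

t = threading.Thread(target=sleep, args=(1,))
t.start()

t.join(timeout=0.1)  # wait at most 0.1 s; join() always returns None
alive_after_timeout = t.is_alive()
print('Still running after the timeout:', alive_after_timeout)

t.join()  # wait for as long as it takes
alive_after_join = t.is_alive()
print('Still running after join():', alive_after_join)
```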
Using Locks
The example above is already more complicated than what you normally need in the lab. After all, you are in complete control of your experiment, so you know not to trigger two measurements at the same time. Still, the threading module offers several tools that may become useful at some point, so it is worth knowing at least that they exist. One such tool is the lock. A lock lets you prevent code from executing while another thread is doing something. Let's see how it works, starting with the simple version of the worker:
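```python
from threading import Thread, Lock

lock = Lock()  # shared by all Worker instances

class Worker(Thread):
    def __init__(self, target, args=None):
        super().__init__()
        self.target = target
        self.args = args if args is not None else ()

    def run(self):
        lock.acquire()  # waits here if another Worker holds the lock
        self.target(*self.args)
        lock.release()
```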
We define the lock outside of the Worker because it needs to be shared between different instances. A lock is open (unlocked) by default. When you call lock.acquire(), you close it. If it is already closed, the code halts there and waits until lock.release() is executed by whoever holds it. We acquire the lock just before running the function, that is, when run() starts executing in the new thread after start() is called, and we release it right afterwards. If you try to run two measurements, the second one will wait until the first has finished. The code:
```python
meas_1 = Worker(target=ohm.make_measurement, args=(0, 1, 11, 1))
meas_1.start()
meas_2 = Worker(target=ohm.make_measurement, args=(0, 1, 11, 1))
meas_2.start()
```
Even though this code does not block the main program, because everything was delegated to threads, it will run only one measurement at a time. This neat trick, if implemented correctly, can save you a lot of time checking whether a specific task is already running. A common mistake, however, appears when something goes wrong: if the target function raises an error, the lock.release() line is never executed and the subsequent threads never run.
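A safer pattern is to use the lock as a context manager. The statement with lock: acquires the lock on entry and releases it on exit, even if the target raises an exception. Here is a sketch of the same Worker written that way; faulty_measurement is only a hypothetical function that simulates a failure:

```python
from threading import Thread, Lock

lock = Lock()  # shared by all Worker instances

class Worker(Thread):
    def __init__(self, target, args=None):
        super().__init__()
        self.target = target
        self.args = args if args is not None else ()

    def run(self):
        with lock:  # released on exit, even after an exception
            self.target(*self.args)

def faulty_measurement():
    raise ValueError('Device not responding')  # simulated failure

w1 = Worker(target=faulty_measurement)
w1.start()
w1.join()  # the traceback is printed, but the lock has been released

done = []
w2 = Worker(target=done.append, args=('second measurement',))
w2.start()
w2.join()  # runs normally because the lock is free again
print('Lock is free:', not lock.locked(), done)
```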
Advantages and Limitations of Threads
Right now, especially if this is your first encounter with threads in Python, they may look like the solution to all your problems. There is no denying that they are an amazing tool and relatively easy to implement. One of their main advantages is that the memory space is shared, so the information stored in the OhmLaw object is available from any thread, including the main thread. This lets you monitor progress, update a plot, or even alter the execution of a method while it is running.
However, we haven't yet discussed what happens when the tasks running in threads are computationally expensive. So far, the methods we ran inside threads spent more time in a sleep statement than anything else. That is normal for slow experiments. As soon as you start analyzing data while you acquire, or you generate a lot of data, things get more complicated. Let's start with an easy example. Generating random numbers and computing their Fourier transforms is relatively expensive (computationally expensive, that is). We can define the following function:
```python
import numpy as np

def calculate_random(number_points):
    for i in range(10, number_points):
        data = np.random.random(i)
        fft = np.fft.fft(data)
    return fft
```
This is an expensive function. We calculate random arrays of variable size and compute their Fourier transform.
```python
from time import time

t0 = time()
d = calculate_random(5000)
print('Total time: {:2.2f} seconds'.format(time() - t0))
```
If you run the code above, the time you get will depend strongly on your computer. If it finishes too quickly to observe, increase the argument of calculate_random. You are most likely working on a multi-core computer, which means several processors are available at the same time. If you watch their usage while the code runs, you will probably notice that only one core is at 100% while the others are mostly idle.
If you were to run the code more than once, for example:
```python
d = calculate_random(5000)
d2 = calculate_random(5000)
...
```
You will notice that the total time is multiplied by the number of calls to calculate_random. This is expected: while the first call is running, the program waits, and only when it is done does it execute the next one. Let's see what happens if we run the code in two different threads:
```python
from time import time
from threading import Thread

t0 = time()
t1 = Thread(target=calculate_random, args=(5000,))
t2 = Thread(target=calculate_random, args=(5000,))
t1.start()
t2.start()
t1.join()
t2.join()
print('Total time: {:2.2f} seconds'.format(time() - t0))
```
Most likely you will see that the total time is about the same as running the two calls one after the other, even though they run in two different threads. If you monitor the processors, you will still see essentially only one of them being used. This happens because CPython, the standard Python interpreter, uses something called the Global Interpreter Lock, or GIL.
The GIL
The Global Interpreter Lock is a lock inside the interpreter that allows only one thread at a time to execute Python bytecode. As we saw earlier, a lock is a tool that lets you wait for another thread to finish what it is doing before you start something new. In Python, this means that even if you start several threads, only one of them is actually running Python code at any given moment, while the others wait their turn.
Basically, the GIL behaves much like the operating system on a single-core CPU. It runs a task for a short time, switches to another task, runs that one for a while, switches again, and so on. This has two consequences. Switching from one task to another has a computational cost, and the result is not equivalent to two tasks running simultaneously. When a task spends most of its time waiting rather than computing (as with sleep, or when waiting for a device), the GIL is released during the wait, so you will see an increase in efficiency. In more complex scenarios, however, where you need to analyze data or save to disk, you may start hitting bottlenecks that are hard to debug, and you will see that your computer is far from using all of its processing power.
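The difference between waiting and computing is easy to measure. In the sketch below, two threads that only sleep finish in roughly the time of one, because time.sleep releases the GIL while it waits. The one-second duration is arbitrary:

```python
from threading import Thread
from time import sleep, perf_counter

t0 = perf_counter()
threads = [Thread(target=sleep, args=(1,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = perf_counter() - t0
print('Total time: {:2.2f} seconds'.format(elapsed))  # close to 1, not 2
```

Replacing sleep with calculate_random would bring the total back to roughly the sum of both calls, as in the previous timing.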
The GIL also protects the interpreter's internal data structures from being accessed by two threads at the same time. It does not, however, protect the logic of your own program. Imagine updating a value in one thread while a different thread deletes it. You may face several kinds of corruption if you are not very careful about how you implement your threads.
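An update such as counter += 1 is a read-modify-write: Python reads the value, adds one, and writes it back, and a thread switch can happen between those steps. Concurrent updates can therefore be lost. Here is a minimal sketch of protecting such an update with a Lock; the counter and the iteration counts are arbitrary:

```python
from threading import Thread, Lock

counter = 0
counter_lock = Lock()

def increment(times):
    global counter
    for _ in range(times):
        with counter_lock:  # only one thread performs the read-modify-write at a time
            counter += 1

threads = [Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000
```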
The main message, therefore, is that threading doesn't let you run Python code in parallel, i.e., on different cores, but it does let you run tasks in a non-blocking way. One benefit of threads is that memory is shared, so exchanging simple values between threads, such as ohm.step, is straightforward. Especially for typical lab experiments, threads will be more than enough to improve the behavior of your programs.
The Multiprocessing Module
It would be somewhat naïve to settle for the threading module and limit ourselves to one core per computer. Python provides another module called multiprocessing, described in detail in the official documentation. Fortunately, working with this module is very similar to working with threads. Let's build on the previous example:
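```python
from multiprocessing import Process
from time import time

# calculate_random must be defined at the top level of the module (as above)
# so that the child processes can find it.

if __name__ == '__main__':
    # The guard is required where processes are started with "spawn"
    # (Windows, macOS): each child re-imports this file.
    t0 = time()
    t1 = Process(target=calculate_random, args=(5000,))
    t2 = Process(target=calculate_random, args=(5000,))
    t3 = Process(target=calculate_random, args=(5000,))
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()
    print('Total time: {:2.2f} seconds'.format(time() - t0))
```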
When you run the code above, you will see several processors in your computer engaged at the same time. You can spawn more processes than you have cores. Normally, though, you shouldn't expect any further gain in performance once the number of processes matches the number of cores on your computer.
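If you have more tasks than cores, creating one Process per task by hand quickly becomes tedious. The multiprocessing module also provides Pool, which keeps a fixed number of worker processes (by default, one per CPU that Python reports) and distributes the tasks among them. The sketch below uses the calculate_random function defined above, repeated so the block is self-contained and called with a smaller size so it finishes quickly:

```python
from multiprocessing import Pool
from time import time
import numpy as np

def calculate_random(number_points):
    for i in range(10, number_points):
        data = np.random.random(i)
        fft = np.fft.fft(data)
    return fft

if __name__ == '__main__':
    t0 = time()
    with Pool() as pool:  # one worker process per CPU by default
        # map() blocks until all six tasks are done and keeps their order
        results = pool.map(calculate_random, [1000] * 6)
    print('Total time: {:2.2f} seconds'.format(time() - t0))
    print('Length of each result:', [len(r) for r in results])
```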
Multiprocessing has, however, a limitation that has to be handled carefully: state is not shared. Each process works on its own copy of the data, so you can't simply exchange information between processes through attributes. Suppose, for example, that you run make_measurement on the same ohm object in two different processes. Each process has its own copy of ohm, so setting self.running to True in one has no effect on the other, and nothing will stop the second measurement from starting.
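A quick way to convince yourself that processes do not share state is to modify an object in a child process and then inspect it in the parent. A sketch, where the Counter class is hypothetical:

```python
from multiprocessing import Process

class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        print('Value inside the child process:', self.value)

if __name__ == '__main__':
    counter = Counter()
    p = Process(target=counter.increment)  # the child works on a copy of counter
    p.start()
    p.join()
    print('Value in the parent process:', counter.value)  # still 0
```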
Sharing Information with Queues
The proper way of exchanging information between processes is to use queues. When we developed the worker earlier, we used the word queue precisely in preparation for this topic. A queue holds information that any process (or thread) can access on a first-in, first-out basis. Let's see a simple example:
```python
from multiprocessing import Process, Queue

def move_from_in_to_out(q_in, q_out):
    while not q_in.empty():
        data = q_in.get()
        q_out.put(data)

if __name__ == '__main__':
    q_in = Queue()
    q_out = Queue()
    for i in range(1000):
        q_in.put(i)
    p = Process(target=move_from_in_to_out, args=(q_in, q_out))
    p.start()
    p.join()
    print('Q_in is empty: {}'.format(q_in.empty()))
    while not q_out.empty():
        print(q_out.get())
```
First, we define a function that works with two queues, q_in and q_out. In the example, we just grab elements from one and place them in the other. To take an element from a queue you use get(), and to add one you use put(). We populate q_in with some initial values and then start a process. Once it has finished, we check that the queue is empty and print all the elements.
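The multiprocessing documentation warns about two things in this example. First, empty() and qsize() are not reliable, because another process may add or remove items at any moment. Second, joining a process that has put items on a queue before those items have been read can deadlock. The sketch below is a more robust version of the same example. Assuming None never appears as real data, it uses None as a sentinel marking the end of the input and reads all results before calling join():

```python
from multiprocessing import Process, Queue

SENTINEL = None  # marks the end of the data

def move_from_in_to_out(q_in, q_out):
    while True:
        data = q_in.get()  # blocks until an item is available
        if data is SENTINEL:
            break
        q_out.put(data)
    q_out.put(SENTINEL)  # tell the reader that no more data is coming

if __name__ == '__main__':
    q_in = Queue()
    q_out = Queue()
    for i in range(1000):
        q_in.put(i)
    q_in.put(SENTINEL)
    p = Process(target=move_from_in_to_out, args=(q_in, q_out))
    p.start()
    results = []
    while True:  # read everything before joining
        item = q_out.get()
        if item is SENTINEL:
            break
        results.append(item)
    p.join()
    print('Received {} items, last one: {}'.format(len(results), results[-1]))
```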
There is nothing really fancy about this example, but it is enough to get you started. Of course, different processes can access the same queue. For example, you could add a second process that does the opposite, moving elements from q_out to q_in:
```python
p = Process(target=move_from_in_to_out, args=(q_in, q_out))
p2 = Process(target=move_from_in_to_out, args=(q_out, q_in))
```
Since p2 stops immediately if q_out is empty, we should populate it together with q_in. We can also add a new process to monitor which of the other two is winning.
```python
def print_len_queue(q_in, q_out):
    while not q_in.empty() or not q_out.empty():
        size_in, size_out = q_in.qsize(), q_out.qsize()
        total = size_in + size_out
        if total == 0:  # both queues emptied between the check and qsize()
            continue
        space = int(size_in / total * 50)
        output = str(size_in) + '||' + space * ' ' + '|' + (50 - space) * ' ' + '||' + str(size_out) + '\r'
        print(output, end=' ')

p3 = Process(target=print_len_queue, args=(q_in, q_out))
```
If you start all the processes, you will see on screen a vertical bar that moves left or right depending on which queue is filling up. This is just a toy example, but it already shows how powerful queues are. Note that Queue.qsize() may raise NotImplementedError on platforms such as macOS, so the monitoring process will not work there as written.
Limitation of Queues
Before you get too enthusiastic about queues, be aware of a fundamental limitation that you may encounter if you use them intensively, especially when acquiring large volumes of data. I wanted to use queues to acquire images from a CCD and stream them to the hard drive, so that I could acquire for longer before running out of memory. The idea was to have one process continuously fetch images from the camera and put them into a queue. A second process would take them from the queue and save them to a file.
However, it is impossible to know in advance how big a queue can grow in Python. Allocating memory is not trivial, since the queue can hold any type of data. If you monitor the available memory, you will notice that the largest amount you can store varies from execution to execution. You therefore won't be able to predict exactly when you are running out of memory. If you find a solution to this problem, please leave a comment, because I am more than intrigued by it.
The only solution I came up with was to manually limit the amount of memory the queue can take up, based on previous experience. Once a threshold was surpassed, the program would stop acquiring images until the queue had been drained. It is not very elegant, but at least it doesn't crash, and therefore the data is saved.
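Both queue.Queue and multiprocessing.Queue accept a maxsize argument. When the queue is full, put() blocks until there is room (or raises queue.Full if a timeout expires first). This turns a manual threshold into an automatic one: the producer simply waits for the consumer. The sketch below uses threads and assumes every frame has the same known size, so that maxsize translates into a memory bound. The camera and the disk are simulated with NumPy arrays and a list of sums, and the resolution and limit are hypothetical:

```python
import queue
import threading
import numpy as np

FRAME_SHAPE = (512, 512)  # hypothetical camera resolution (2 MiB per float64 frame)
MAX_FRAMES = 20           # at most 20 frames (about 40 MiB) waiting in the queue

frames = queue.Queue(maxsize=MAX_FRAMES)
saved = []

def acquire(num_frames):
    for _ in range(num_frames):
        frame = np.random.random(FRAME_SHAPE)  # stand-in for a camera read
        frames.put(frame)  # blocks while the queue is full
    frames.put(None)  # sentinel: acquisition finished

def save():
    while True:
        frame = frames.get()
        if frame is None:
            break
        saved.append(float(frame.sum()))  # stand-in for writing to disk

producer = threading.Thread(target=acquire, args=(100,))
consumer = threading.Thread(target=save)
producer.start()
consumer.start()
producer.join()
consumer.join()
print('Frames saved:', len(saved))
```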
Threads and Jupyter
If you are a Jupyter notebook user, you will be happy to know that threads work there too. Imagine that you are analyzing a large dataset, or running a measurement from within a cell. Ideally, you would be able to run other cells at the same time. If you start a Thread in one cell, you can keep using your notebook without any problems. Processes are more delicate. With the spawn start method (the default on Windows and macOS), the target function generally has to be importable from a module rather than defined in the notebook itself.
This is very handy if, for example, you are running a simulation and would like to check the intermediate results. The steps we followed at the beginning, with the simulated data acquisition, can also be performed from within Jupyter. I won't cover the details in this article because they deserve a separate entry, but please play around and share your experience in the forum.
Conclusions
Being able to run code in non-blocking ways is fundamental in many applications, not only in the lab but also when you are analyzing or simulating data. When you are running tasks that are not computationally expensive but that take longer to complete, you can easily implement threads. In this article, we have covered some of the strategies that you can implement in order to be able to stop the execution of a thread and how to define your custom workers.
When you are trying to increase the efficiency of a computationally expensive program, threading is not going to help you because of the Global Interpreter Lock (GIL). You should, therefore, use the multiprocessing module, which implements a very similar API to the threading module. This makes your code easy to adapt. The main limitation is that the memory between different processes is not shared, and therefore you need to implement extra strategies in order to exchange data. We have covered Queues, but they are not the only ones.
When the complexity of your program increases, you should always check whether the modules you are using are thread-safe or not. Many developers take into account this factor and develop code that can be run also within threads. However, many developers may not have taken into account that their module could be used in this context and therefore you should test it yourself.
Threading is a very exciting way of programming, and it also works with older Python versions. I find the threading and multiprocessing syntax very clear and very handy for tasks such as controlling a setup or analyzing data. Since Python 3.4 there is a library called asyncio that allows running code asynchronously. It looks like the future for this kind of programming, but I found its syntax much harder to understand and to build solutions with.
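For comparison with the asyncio library referred to above, here is a minimal sketch of the fake measurement and a progress monitor written with asyncio. It assumes Python 3.7 or newer (for asyncio.run and asyncio.create_task), and the function names are illustrative. Instead of threads, a single event loop switches between the two coroutines whenever one of them reaches an await:

```python
import asyncio
import random

async def make_measurement(num_points, delay, data):
    for _ in range(num_points):
        data.append(random.random())  # fake data point
        await asyncio.sleep(delay)    # hands control back to the event loop

async def monitor(data, task):
    while not task.done():
        print('Points acquired:', len(data))
        await asyncio.sleep(0.25)

async def main():
    data = []
    task = asyncio.create_task(make_measurement(11, 0.1, data))
    await monitor(data, task)
    await task  # re-raises any exception from the measurement
    return data

data = asyncio.run(main())
print('Total points:', len(data))
```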