Task Parallelism and Data Parallelism Thread Pools


Parallel computing distributes work across multiple processors for efficient execution. Two common parallelization paradigms are task parallelism and data parallelism.

Task parallelism distributes encapsulated tasks, which may execute the same or different code on the same or different data, across different processors. Data parallelism, on the other hand, performs the same operation on different subsets of the same data across multiple processors.
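As a minimal, hypothetical sketch of the distinction (the function and variable names here are purely illustrative, and not part of the thread pools developed below):

```python
import threading

# Task parallelism: different tasks (possibly different code) run concurrently.
results = {}

def load_config():
    results['config'] = {'debug': True}

def build_index():
    results['index'] = {'a': 0, 'b': 1}

task_threads = [threading.Thread(target=load_config),
                threading.Thread(target=build_index)]
for thread in task_threads:
    thread.start()
for thread in task_threads:
    thread.join()

# Data parallelism: the same operation runs on different subsets of the data.
data = list(range(8))
squares = [0] * len(data)

def square_chunk(lo: int, hi: int) -> None:
    for i in range(lo, hi):
        squares[i] = data[i] ** 2

chunk_threads = [threading.Thread(target=square_chunk, args=(0, 4)),
                 threading.Thread(target=square_chunk, args=(4, 8))]
for thread in chunk_threads:
    thread.start()
for thread in chunk_threads:
    thread.join()
```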

Both task parallelism and data parallelism can be implemented using thread pools. This article explores simple implementations of thread pools for task parallelism and data parallelism scenarios.

Task Parallelism Thread Pool

The following code demonstrates a simple thread pool for task parallelism. It uses a shared task queue to distribute tasks, represented as Callable[[], None] objects (callables accepting no parameters and returning None), across multiple threads. Each thread repeatedly fetches a task from the task queue and executes it. If a task raises an exception, a traceback is printed and the thread moves on to the next task in the queue.

import traceback

from collections.abc import Callable, Iterable
from queue import Queue
from threading import Thread


class TaskParallelismThreadPoolThread(Thread):
    def __init__(self, task_queue: Queue[Callable[[], None] | None]):
        super().__init__()
        self.task_queue: Queue[Callable[[], None] | None] = task_queue

    def run(self) -> None:
        while True:
            task: Callable[[], None] | None = self.task_queue.get()

            # A sentinel value of None signals this thread to stop.
            if task is None:
                break

            try:
                task()
            except Exception:
                traceback.print_exc()


def run_simple_task_parallelism_thread_pool(tasks: Iterable[Callable[[], None]], num_threads: int) -> None:
    # Create task queue.
    task_queue: Queue[Callable[[], None] | None] = Queue()

    # Create all threads, which share the task queue.
    threads: list[Thread] = []
    for _ in range(num_threads):
        thread: Thread = TaskParallelismThreadPoolThread(task_queue)
        thread.start()
        threads.append(thread)

    # Enqueue all tasks.
    for task in tasks:
        task_queue.put(task)

    # Enqueue sentinel values so all threads stop once all tasks are finished.
    for _ in range(num_threads):
        task_queue.put(None)

    # Wait for all threads to stop.
    for thread in threads:
        thread.join()

To use this task parallelism thread pool, provide it with a collection of tasks (represented as Callable[[], None] objects) and the desired number of threads. The tasks will be executed in parallel by the thread pool until all tasks have finished.
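Because tasks return None, any results must leave a task by side effect. One sketch of this, assuming the caller drains a thread-safe queue afterwards (result_queue and create_square_task are illustrative names, not part of the pool above):

```python
from queue import Queue

# Results leave a task by side effect: here, via a shared thread-safe queue.
result_queue: Queue[int] = Queue()

def create_square_task(x: int):
    def task() -> None:
        result_queue.put(x * x)  # side effect instead of a return value
    return task

tasks = [create_square_task(i) for i in range(4)]

# These tasks could be handed to run_simple_task_parallelism_thread_pool;
# invoking them directly here just demonstrates the result flow.
for task in tasks:
    task()

results = sorted(result_queue.get() for _ in range(4))
```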

Data Parallelism Thread Pool

The following code showcases a simple thread pool for data parallelism. It distributes data, in the form of argument tuples, across multiple threads via a shared argument tuple queue. Each thread is assigned an operation created by an operation factory. After executing its operation on an argument tuple, the thread passes the return value to a return value callback.

As with the task parallelism thread pool, every thread stays busy, fetching a new argument tuple as soon as its operation finishes with the previous one. If executing an operation on an argument tuple raises an exception, a traceback is printed and the thread takes the next argument tuple from the shared queue.

import traceback
import multiprocessing

from collections.abc import Callable, Iterable
from queue import Queue
from threading import Thread
from typing import Any


class DataParallelismThreadPoolThread(Thread):
    def __init__(self, operation: Callable[..., Any], argument_tuple_queue: Queue[tuple[Any, ...] | None], return_value_callback: Callable[[Any], None]):
        super().__init__()
        self.operation: Callable[..., Any] = operation
        self.argument_tuple_queue: Queue[tuple[Any, ...] | None] = argument_tuple_queue
        self.return_value_callback: Callable[[Any], None] = return_value_callback

    def run(self) -> None:
        while True:
            argument_tuple: tuple[Any, ...] | None = self.argument_tuple_queue.get()

            # A sentinel value of None signals this thread to stop.
            if argument_tuple is None:
                break

            try:
                self.return_value_callback(self.operation(*argument_tuple))
            except Exception:
                traceback.print_exc()


def run_simple_data_parallelism_thread_pool(
    operation_factory: Callable[[], Callable[..., Any]],
    argument_tuples: Iterable[tuple[Any, ...]],
    return_value_callback: Callable[[Any], None] = lambda return_value: None,
    num_threads: int = multiprocessing.cpu_count()
) -> None:
    # Create argument tuple queue.
    argument_tuple_queue: Queue[tuple[Any, ...] | None] = Queue()

    # Create all threads, which share the argument tuple queue.
    threads: list[Thread] = []
    for _ in range(num_threads):
        thread: Thread = DataParallelismThreadPoolThread(operation_factory(), argument_tuple_queue, return_value_callback)
        thread.start()
        threads.append(thread)

    # Enqueue all argument tuples.
    for argument_tuple in argument_tuples:
        argument_tuple_queue.put(argument_tuple)

    # Enqueue sentinel values so all threads stop once all argument tuples have been processed.
    for _ in range(num_threads):
        argument_tuple_queue.put(None)

    # Wait for all threads to stop.
    for thread in threads:
        thread.join()

To use the data parallelism thread pool, provide an operation factory, a collection of argument tuples, and a return value callback. Each call to the operation factory creates an operation, represented as a Callable[..., Any] object that accepts the elements of an argument tuple as arguments and returns a value. Each thread in the thread pool executes its own operation on argument tuples taken from the shared queue, and every return value is passed to the return value callback, which can be customized according to your needs. The thread pool processes the argument tuples in parallel until all tuples have been consumed.
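The factory indirection matters when operations carry per-thread state: since the pool calls the factory once per thread, no two threads ever share an operation instance. A hypothetical sketch (SummingOperation is an illustrative name, not part of the pool above):

```python
class SummingOperation:
    """Stateful operation: each instance keeps its own running sum."""

    def __init__(self) -> None:
        self.partial_sum = 0  # private to the single thread that owns this instance

    def __call__(self, x: int) -> int:
        self.partial_sum += x
        return self.partial_sum


def operation_factory() -> SummingOperation:
    # Called once per thread by the pool, so state is never shared across threads.
    return SummingOperation()
```

Passed to run_simple_data_parallelism_thread_pool, each thread would accumulate its own partial sum, which the return value callback could then combine.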

Example

Say that we want to sleep, in parallel, for \(0, 1, 2, \dots, N - 1\) seconds, printing the number of seconds slept after each sleep, where \(N\) is the number of threads in our thread pool.

We can adopt a task parallelism approach: create tasks that encapsulate how long to sleep, and add them to a task parallelism thread pool:

import time

from collections.abc import Callable


N = 8


def create_task(sleep_time: int) -> Callable[[], None]:
    def task() -> None:
        time.sleep(sleep_time)
        print(f'Slept for {sleep_time}')

    return task


run_simple_task_parallelism_thread_pool(
    (create_task(i) for i in range(N)),
    N
)

Alternatively, we can use a data parallelism approach: each operation receives the number of seconds to sleep from an argument tuple, sleeps for that duration, and returns it; the return value callback then prints the duration:

import time

from collections.abc import Callable


N = 8


def operation(sleep_time: int) -> int:
    time.sleep(sleep_time)
    return sleep_time


def operation_factory() -> Callable[[int], int]:
    return operation


def return_value_callback(sleep_time: int) -> None:
    print(f'Slept for {sleep_time}')


run_simple_data_parallelism_thread_pool(
    operation_factory,
    ((i,) for i in range(N)),
    return_value_callback,
    N
)

Running either thread pool takes roughly the same time (about \(N - 1\) seconds, dominated by the longest sleep) and produces the same output.
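For comparison, Python's standard library covers both patterns with concurrent.futures.ThreadPoolExecutor: submit expresses task parallelism, while map expresses data parallelism. A minimal sketch (square and the worker count are illustrative choices):

```python
from concurrent.futures import ThreadPoolExecutor


def square(x: int) -> int:
    return x * x


with ThreadPoolExecutor(max_workers=4) as executor:
    # Task parallelism: submit individual callables and collect their futures.
    futures = [executor.submit(square, i) for i in range(8)]
    task_results = [future.result() for future in futures]

    # Data parallelism: map one operation over the data, preserving input order.
    data_results = list(executor.map(square, range(8)))
```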

References

  • https://en.wikipedia.org/wiki/Task_parallelism
  • https://en.wikipedia.org/wiki/Data_parallelism

https://abbaswu.github.io/2023/07/11/Task-Parallelism-and-Data-Parallelism-Thread-Pools/
Author: Jifeng Wu
Posted on: July 11, 2023