Concurrency Utilities¶
The concurrency module offers high-level utilities for running tasks in parallel, making it easier to manage multithreading and multiprocessing.
parallelize_with_processes(func, args_list, max_workers=None, title='Parallelizing with processes', order_results=False)¶
Executes a function in parallel using a process pool.
This function is best for CPU-bound tasks that can be executed independently, as it leverages multiple CPU cores to perform computations simultaneously.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[Any], Any]` | The function to execute in parallel. | *required* |
| `args_list` | `List[Any]` | A list of arguments for the function. Each element can be a single value or a tuple of arguments. | *required* |
| `max_workers` | `Optional[int]` | The maximum number of processes to use. If `None`, the executor's default is used. | `None` |
| `title` | `str` | A title for the progress bar. Defaults to "Parallelizing with processes". | `'Parallelizing with processes'` |
| `order_results` | `bool` | If `True`, results are returned in the same order as `args_list`; if `False`, they are returned in completion order. | `False` |
Returns:

| Type | Description |
|---|---|
| `List[Any]` | A list of results from the function calls. |
Example

Perform CPU-intensive calculations in parallel:

```python
def compute_square(n):
    return n * n

numbers = list(range(10))
results = parallelize_with_processes(compute_square, numbers)
print(f"Results (order may vary): {results}")
```

```
Parallelizing with processes ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00 [15 avg it/s] [0.07 avg s/it]
Results (order may vary): [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```
Process tasks with multiple arguments and ordered results:
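The snippet for this case did not survive in the rendered page; below is a hedged sketch. It assumes, per the parameter table above, that tuple elements in `args_list` are unpacked as positional arguments and that `order_results=True` preserves input order. A minimal thread-based stand-in for `parallelize_with_processes` is included only so the snippet runs on its own; the real helper uses a process pool and a progress bar.

```python
from concurrent.futures import ThreadPoolExecutor

def parallelize_with_processes(func, args_list, max_workers=None, order_results=False):
    # Simplified stand-in for illustration only: the real helper uses a
    # process pool. Collecting results from the futures list in submission
    # order preserves the order of args_list.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            pool.submit(func, *args) if isinstance(args, tuple) else pool.submit(func, args)
            for args in args_list
        ]
        return [f.result() for f in futures]

def power(base, exponent):
    return base ** exponent

# Each tuple is unpacked into the positional arguments of `power`.
args_list = [(2, 3), (3, 2), (5, 2), (10, 1)]
results = parallelize_with_processes(power, args_list, order_results=True)
print(results)  # [8, 9, 25, 10]
```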
Source code in `opencrate/core/utils/concurrency.py`
parallelize_with_threads(func, args_list, max_workers=None, title='Parallelizing with threads', order_results=False)¶
Executes a function in parallel using a thread pool.
This function is ideal for I/O-bound tasks, such as making network requests or reading from a disk, where threads can perform work while waiting for external resources.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[Any], Any]` | The function to execute in parallel. | *required* |
| `args_list` | `List[Any]` | A list of arguments to be passed to the function. Each element can be a single value or a tuple of arguments. | *required* |
| `max_workers` | `Optional[int]` | The maximum number of threads to use. If `None`, the executor's default is used. | `None` |
| `title` | `str` | A title for the progress bar. Defaults to "Parallelizing with threads". | `'Parallelizing with threads'` |
| `order_results` | `bool` | If `True`, results are returned in the same order as `args_list`; if `False`, they are returned in completion order. | `False` |
Returns:

| Type | Description |
|---|---|
| `List[Any]` | A list of results from the function calls. |
Example

Perform multiple network requests in parallel:

```python
import requests

def download_url(url):
    try:
        return requests.get(url).status_code
    except requests.RequestException:
        return None

urls = ["https://www.google.com", "https://www.github.com"] * 5
results = parallelize_with_threads(download_url, urls, max_workers=5)
print(f"Received {len(results)} results: {results}")
```

```
Parallelizing with threads ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:01 [5 avg it/s] [0.18 avg s/it]
Received 10 results: [200, 200, 200, 200, 200, 200, 200, 200, 200, 200]
```
Process tasks with ordered results:
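The ordered-results snippet is missing from the rendered page; here is a hedged sketch assuming `order_results=True` returns results in the order of `args_list` rather than completion order. A minimal stand-in for `parallelize_with_threads` is included only to make the example self-contained; the real helper also renders a progress bar.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def parallelize_with_threads(func, args_list, max_workers=None, order_results=False):
    # Simplified stand-in for illustration only. Reading results from the
    # futures list in submission order yields input-ordered results.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(func, args) for args in args_list]
        return [f.result() for f in futures]

def fetch(delay):
    # Simulate an I/O-bound call whose duration varies per task.
    time.sleep(delay)
    return delay

delays = [0.03, 0.01, 0.02]
results = parallelize_with_threads(fetch, delays, max_workers=3, order_results=True)
print(results)  # [0.03, 0.01, 0.02] — input order, not completion order
```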
Source code in `opencrate/core/utils/concurrency.py`
parallize_with_batch_processes(func, data, batch_size=None, title=None)¶
Processes data in batches using a pool of worker processes.
This function is efficient for applying a function to a large dataset, as it distributes the data in batches to worker processes. It preserves the order of the results.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[[Any], Any]` | The function to apply to each data item. | *required* |
| `data` | `List[Any]` | The list of data items to process. | *required* |
| `batch_size` | `Optional[int]` | The number of processes to use. If `None`, a default is chosen. | `None` |
| `title` | `Optional[str]` | A title for the progress bar. Defaults to "Batch processing". | `None` |
Returns:

| Type | Description |
|---|---|
| `List[Any]` | A list of results in the same order as the input data. |
Example
Process a list of numbers in batches:
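The example block is empty in the rendered page; the sketch below assumes only the behavior documented above (the function is applied to every item, and results come back in input order). A minimal stand-in for `parallize_with_batch_processes` is included so the snippet is self-contained; it batches with threads for simplicity, whereas the real helper distributes batches to worker processes.

```python
from concurrent.futures import ThreadPoolExecutor

def parallize_with_batch_processes(func, data, batch_size=None, title=None):
    # Simplified stand-in for illustration only: split the data into
    # batches, map each batch in a worker, then flatten. Order is
    # preserved because pool.map yields results in submission order.
    batch_size = batch_size or max(1, len(data) // 4)
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    with ThreadPoolExecutor() as pool:
        mapped = pool.map(lambda batch: [func(item) for item in batch], batches)
    return [result for batch in mapped for result in batch]

def cube(n):
    return n ** 3

data = list(range(8))
results = parallize_with_batch_processes(cube, data, batch_size=3)
print(results)  # [0, 1, 8, 27, 64, 125, 216, 343]
```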