Migration guide#

The Migration Guide is a resource for developers who want to port their parallel computing applications from other APIs (e.g. OpenMP, Intel Threading Building Blocks (TBB), MPI) to HPX. HPX is a C++ library that offers a versatile and high-performance platform for parallel and distributed computing, providing a wide range of features and capabilities. This guide explains the key differences between those APIs and HPX and provides step-by-step instructions for converting code to HPX effectively.

Some general steps that can be used to migrate code to HPX code are the following:

  1. Install HPX using the Quick start guide.

  2. Include the HPX header files:

    Add the necessary header files for HPX at the beginning of your code, such as:

    #include <hpx/init.hpp>
    
  3. Replace your code with HPX code using the guide that follows.

  4. Use HPX-specific features and APIs:

    HPX provides additional features and APIs that let you take advantage of the library’s capabilities. For example, you can use HPX’s asynchronous execution to express fine-grained tasks and dependencies, or utilize HPX’s distributed computing features for distributed-memory systems.

  5. Compile and run the HPX code:

    Compile the converted code with the HPX library and run it using the appropriate HPX runtime environment. A minimal program skeleton is sketched after this list.
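
To illustrate steps 2 to 5, a minimal sketch of a complete HPX program is shown below. The task body and the printed text are only placeholders; the skeleton assumes HPX is installed and the program is linked against the HPX libraries.

#include <hpx/hpx_init.hpp>
#include <hpx/future.hpp>

#include <iostream>

int hpx_main()
{
    // express a fine-grained task and wait for its result
    hpx::future<int> f = hpx::async([] { return 42; });
    std::cout << "result: " << f.get() << std::endl;

    return hpx::finalize();    // tell the HPX runtime to shut down
}

int main(int argc, char* argv[])
{
    // start the HPX runtime, run hpx_main on it, and return its result
    return hpx::init(argc, argv);
}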

OpenMP#

The OpenMP API supports multi-platform shared-memory parallel programming in C/C++. Typically it is used for loop-level parallelism, but it also supports function-level parallelism. Below are some examples on how to convert OpenMP to HPX code:

OpenMP parallel for loop#

Parallel for loop#

OpenMP code:

#pragma omp parallel for
for (int i = 0; i < n; ++i) {
    // loop body
}

HPX equivalent:

#include <hpx/algorithm.hpp>

hpx::experimental::for_loop(hpx::execution::par, 0, n, [&](int i) {
    // loop body
});

In the above code, the OpenMP #pragma omp parallel for directive is replaced with hpx::experimental::for_loop from the HPX library. The loop body within the lambda function will be executed in parallel for each iteration.

Private variables#

OpenMP code:

int x = 0;

#pragma omp parallel for private(x)
for (int i = 0; i < n; ++i) {
    // loop body
}

HPX equivalent:

#include <hpx/algorithm.hpp>

hpx::experimental::for_loop(hpx::execution::par, 0, n, [&](int i) {
    int x = 0; // Declare 'x' as a local variable inside the loop body
    // loop body
});

The variable x is declared as a local variable inside the loop body, ensuring that it is private to each thread.

Shared variables#

OpenMP code:

int x = 0;

#pragma omp parallel for shared(x)
for (int i = 0; i < n; ++i) {
    // loop body
}

HPX equivalent:

#include <hpx/algorithm.hpp>
#include <atomic>

std::atomic<int> x(0); // Declare 'x' as a shared variable outside the loop

hpx::experimental::for_loop(hpx::execution::par, 0, n, [&](int i) {
    // loop body
});

To ensure that the variable x is shared among all threads and updated without data races, declare it as a std::atomic variable outside the for_loop.
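
As an illustration, here is a minimal sketch showing the atomic being updated safely from all iterations. The summation is an assumed placeholder for a real loop body, and n is reused from the snippets above.

#include <hpx/algorithm.hpp>

#include <atomic>

std::atomic<int> x(0);    // shared across all iterations

hpx::experimental::for_loop(hpx::execution::par, 0, n, [&](int i) {
    x.fetch_add(i);       // atomic update, no data race
});

// after the loop, x holds the sum 0 + 1 + ... + (n - 1)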

Number of threads#

OpenMP code:

#pragma omp parallel for num_threads(2)
for (int i = 0; i < n; ++i) {
    // loop body
}

HPX equivalent:

#include <hpx/algorithm.hpp>
#include <hpx/execution.hpp>

hpx::execution::experimental::num_cores nc(2);

hpx::experimental::for_loop(hpx::execution::par.with(nc), 0, n, [&](int i) {
    // loop body
});

To declare the number of threads to be used for the parallel region, you can use hpx::execution::experimental::num_cores and pass the number of cores (nc) to hpx::experimental::for_loop using hpx::execution::par.with(nc). This example uses 2 threads for the parallel loop.

Reduction#

OpenMP code:

int s = 0;

#pragma omp parallel for reduction(+: s)
for (int i = 0; i < n; ++i) {
    s += i;
    // loop body
}

HPX equivalent:

#include <hpx/algorithm.hpp>
#include <hpx/execution.hpp>

int s = 0;

hpx::experimental::for_loop(hpx::execution::par, 0, n,
    hpx::experimental::reduction(s, 0, std::plus<>()),
    [&](int i, int& accum) {
        accum += i;
        // loop body
    });

The reduction clause specifies that the variable s should be reduced across iterations using the std::plus<> operation. It initializes s to 0 at the beginning of the loop and accumulates the values from each iteration using the + operator. The lambda function representing the loop body takes two parameters: i, which is the loop index, and accum, which refers to the reduction variable s. The lambda function is executed for each iteration of the loop, and the reduction ensures that accum is correctly accumulated across different iterations and threads.

Schedule#

OpenMP code:

int s = 0;

// static scheduling with chunk size 1000
#pragma omp parallel for schedule(static, 1000)
for (int i = 0; i < n; ++i) {
    // loop body
}

HPX equivalent:

#include <hpx/algorithm.hpp>
#include <hpx/execution.hpp>

hpx::execution::experimental::static_chunk_size cs(1000);

hpx::experimental::for_loop(hpx::execution::par.with(cs), 0, n, [&](int i) {
    // loop body
});

To define the scheduling type, you can use the corresponding execution parameter from hpx::execution::experimental, define the chunk size (cs, here set to 1000) and pass it to hpx::experimental::for_loop using hpx::execution::par.with(cs).

Accordingly, other types of scheduling are available and can be used in a similar manner:

#include <hpx/execution.hpp>

hpx::execution::experimental::dynamic_chunk_size cs(1000);
hpx::execution::experimental::guided_chunk_size cs(1000);
hpx::execution::experimental::auto_chunk_size cs(1000);

OpenMP single thread#

OpenMP code:

{   // parallel code
    #pragma omp single
    {
        // single-threaded code
    }
    // more parallel code
}

HPX equivalent:

#include <hpx/mutex.hpp>
#include <mutex>

hpx::mutex mtx;

{   // parallel code
    {
        std::scoped_lock l(mtx);
        // single-threaded code
    }
    // more parallel code
}

To make sure that only one thread at a time executes a specific piece of code within a parallel region, you can use hpx::mutex together with std::scoped_lock to take ownership of the given mutex mtx. For more information about mutexes, please refer to Mutex.
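
As a concrete sketch of this pattern, the mutex can be used inside a parallel loop as shown below. The printed message is an assumed placeholder for the protected work, and n is reused from the earlier snippets.

#include <hpx/algorithm.hpp>
#include <hpx/mutex.hpp>

#include <iostream>
#include <mutex>

hpx::mutex mtx;

hpx::experimental::for_loop(hpx::execution::par, 0, n, [&](int i) {
    // parallel code

    {
        std::scoped_lock l(mtx);
        // only one thread at a time executes this block
        std::cout << "iteration " << i << "\n";
    }

    // more parallel code
});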

OpenMP tasks#

Simple tasks#

OpenMP code:

// executed asynchronously by any available thread
#pragma omp task
{
    // task code
}

HPX equivalent:

#include <hpx/future.hpp>

auto future = hpx::async([](){
    // task code
});

or

#include <hpx/future.hpp>

hpx::post([](){
    // task code
}); // fire and forget

Tasks in HPX can be created simply by using the hpx::async function and passing as an argument the code you wish to run asynchronously. An alternative is hpx::post, which is a fire-and-forget method.

Tip

If you think you may want to synchronize your tasks later on, we suggest using hpx::async, which provides synchronization options, while hpx::post explicitly states that there is no return value or way to synchronize with the function execution. Synchronization options are listed below.

Task wait#

OpenMP code:

#pragma omp task
{
    // task code
}

#pragma omp taskwait
// code after completion of task

HPX equivalent:

#include <hpx/future.hpp>

hpx::async([](){
    // task code
}).get(); // wait for the task to complete

// code after completion of task

The get() function can be used to ensure that the task created with hpx::async is completed before the code continues executing beyond that point.

Multiple tasks synchronization#

OpenMP code:

#pragma omp task
{
    // task 1 code
}

#pragma omp task
{
    // task 2 code
}

#pragma omp taskwait
// code after completion of both tasks 1 and 2

HPX equivalent:

#include <hpx/future.hpp>

auto future1 = hpx::async([](){
    // task 1 code
});

auto future2 = hpx::async([](){
    // task 2 code
});

auto future = hpx::when_all(future1, future2).then([](auto&&){
    // code after completion of both tasks 1 and 2
});

If you would like to synchronize multiple tasks, you can use the hpx::when_all function to define which futures have to be ready and the then() function to declare what should be executed once these futures are ready.

Dependencies#

OpenMP code:

int a = 10;
int b = 20;
int c = 0;

#pragma omp task depend(in: a, b) depend(out: c)
{
    // task code
    c = 100;
}

HPX equivalent:

#include <hpx/future.hpp>

int a = 10;
int b = 20;
int c = 0;

// Create a future representing 'a'
auto future_a = hpx::make_ready_future(a);

// Create a future representing 'b'
auto future_b = hpx::make_ready_future(b);

// Create a task that depends on 'a' and 'b' and executes 'task_code'
auto future_c = hpx::dataflow(
    [](auto&& fa, auto&& fb) {
        // task code; the values of 'a' and 'b' are available via fa.get() and fb.get()
        return 100;
    },
    future_a, future_b);

c = future_c.get();

If one of the arguments of hpx::dataflow is a future, then dataflow waits for that future to become ready before launching the task; the futures themselves are passed through to the callable. Hence, to define the dependencies of tasks you create futures representing the variables that introduce the dependencies and pass them as arguments to hpx::dataflow. get() is used to store the result of the future in the desired variable.

Nested tasks#

OpenMP code:

#pragma omp task
{
    // Outer task code
    #pragma omp task
    {
        // Inner task code
    }
}

HPX equivalent:

#include <hpx/future.hpp>

auto future_outer = hpx::async([](){
    // Outer task code

    hpx::async([](){
        // Inner task code
    });
});

or

#include <hpx/future.hpp>

auto future_outer = hpx::post([](){ // fire and forget
    // Outer task code

    hpx::post([](){ // fire and forget
        // Inner task code
    });
});

If you have nested tasks, you can simply use nested hpx::async or hpx::post calls. The implementation is similar if you want to take care of synchronization:

OpenMP code:

#pragma omp task
{
    // Outer task code
    #pragma omp task
    {
        // Inner task code
    }
    #pragma omp taskwait    // wait for the inner task to complete
}
#pragma omp taskwait        // wait for the outer task to complete

HPX equivalent:

#include <hpx/future.hpp>

auto future_outer = hpx::async([]() {
    // Outer task code

    hpx::async([]() {
        // Inner task code
    }).get();    // Wait for the inner task to complete
});

future_outer.get();    // Wait for the outer task to complete

Task yield#

OpenMP code:

#pragma omp task
{
    // code before yielding
    #pragma omp taskyield
    // code after yielding
}

HPX equivalent:

#include <hpx/future.hpp>
#include <hpx/thread.hpp>

auto future = hpx::async([](){
    // code before yielding

    // yield execution to potentially allow other tasks to run
    hpx::this_thread::yield();

    // code after yielding
});

Within a task created using hpx::async, hpx::this_thread::yield can be used to reschedule the execution of threads, allowing other threads to run.

Task group#

OpenMP code:

#pragma omp taskgroup
{
    #pragma omp task
    {
        // task 1 code
    }

    #pragma omp task
    {
        // task 2 code
    }
}

HPX equivalent:

#include <hpx/task_group.hpp>

// Declare a task group
hpx::experimental::task_group tg;

// Run the tasks
tg.run([](){
    // task 1 code
});
tg.run([](){
    // task 2 code
});

// Wait for the task group
tg.wait();

To create task groups, you can use hpx::experimental::task_group. The function run() can be used to run each task within the task group, while wait() can be used to achieve synchronization. If you do not care about waiting for the task group to complete its execution, you can simply remove the wait() function.

OpenMP sections#

OpenMP code:

#pragma omp sections
{
    #pragma omp section
    // section 1 code
    #pragma omp section
    // section 2 code
} // implicit synchronization

HPX equivalent:

#include <hpx/future.hpp>

auto future_section1 = hpx::async([](){
    // section 1 code
});
auto future_section2 = hpx::async([](){
    // section 2 code
});

// synchronization: wait for both sections to complete
hpx::wait_all(future_section1, future_section2);

Unlike tasks, there is an implicit synchronization barrier at the end of each sections directive in OpenMP. This synchronization is achieved here using the hpx::wait_all function.

Note

If the nowait clause is used in the sections directive, then you can just remove the hpx::wait_all function while keeping the rest of the code as it is.

Intel Threading Building Blocks (TBB)#

Intel Threading Building Blocks (TBB) provides a high-level interface for parallelism and concurrent programming using standard ISO C++ code. Below are some examples on how to convert Intel Threading Building Blocks (TBB) to HPX code:

parallel_for#

Intel Threading Building Blocks (TBB) code:

auto values = std::vector<double>(10000);

tbb::parallel_for( tbb::blocked_range<int>(0,values.size()),
                    [&](tbb::blocked_range<int> r)
{
    for (int i=r.begin(); i<r.end(); ++i)
    {
        // loop body
    }
});

HPX equivalent:

#include <hpx/algorithm.hpp>

auto values = std::vector<double>(10000);

hpx::experimental::for_loop(hpx::execution::par, 0, values.size(), [&](int i) {
    // loop body
});

In the above code, tbb::parallel_for is replaced with hpx::experimental::for_loop from the HPX library. The loop body within the lambda function will be executed in parallel for each iteration.

parallel_for_each#

Intel Threading Building Blocks (TBB) code:

auto values = std::vector<double>(10000);

tbb::parallel_for_each(values.begin(), values.end(), [&](double& value){
    // loop body
});

HPX equivalent:

#include <hpx/algorithm.hpp>

auto values = std::vector<double>(10000);

hpx::for_each(hpx::execution::par, values.begin(), values.end(), [&](double& value){
    // loop body
});

By utilizing hpx::for_each and specifying a parallel execution policy with hpx::execution::par, it is possible to transform tbb::parallel_for_each into its equivalent counterpart in HPX.

parallel_invoke#

Intel Threading Building Blocks (TBB) code:

tbb::parallel_invoke(task1, task2, task3);

HPX equivalent:

#include <hpx/future.hpp>

hpx::wait_all(hpx::async(task1), hpx::async(task2), hpx::async(task3));

To convert tbb::parallel_invoke to HPX, we use hpx::async to execute each task asynchronously, which returns a future representing the result of each task. We then pass these futures to hpx::wait_all, which blocks until all of them have completed.

parallel_pipeline#

Intel Threading Building Blocks (TBB) code:

tbb::parallel_pipeline(4,
    tbb::make_filter<void, int>(tbb::filter::serial_in_order,
        [](tbb::flow_control& fc) -> int {
            // Generate numbers from 1 to 10
            static int i = 1;
            if (i <= 10) {
                return i++;
            }
            else {
                fc.stop();
                return 0;
            }
        }) &
    tbb::make_filter<int, int>(tbb::filter::parallel,
        [](int num) -> int {
            // Multiply each number by 2
            return num * 2;
        }) &
    tbb::make_filter<int, void>(tbb::filter::serial_in_order,
        [](int num) {
            // Print the results
            std::cout << num << " ";
        })
);

HPX equivalent:

#include <iostream>
#include <vector>
#include <ranges>
#include <hpx/algorithm.hpp>

// generate the values
auto range = std::views::iota(1) | std::views::take(10);

// materialize the output vector
std::vector<int> results(10);

// parallel execution of the pipeline and transformation
hpx::ranges::transform(
    hpx::execution::par, range, results.begin(), [](int i) { return 2 * i; });

// print the modified vector
for (int i : results)
{
    std::cout << i << " ";
}
std::cout << std::endl;

The line auto range = std::views::iota(1) | std::views::take(10); generates a range of values using the std::views::iota function. It starts from the value 1 and generates an infinite sequence of incrementing values. The std::views::take(10) function is then applied to limit the sequence to the first 10 values. The result is stored in the range variable.

Hint

A view is a lightweight object that represents a particular view of a sequence or range. It acts as a read-only interface to the original data, providing a way to query and traverse the elements without making any copies or modifications.

Views can be composed and chained together to form complex pipelines of operations. These operations are evaluated lazily, meaning that the actual computation is performed only when the result is needed or consumed.

Since views perform lazy evaluation, we use std::vector<int> results(10); to materialize the vector that will store the transformed values. The hpx::ranges::transform function is then used to perform a parallel transformation on the range, writing the transformed values to the results vector.

Hint

Ranges enable loop fusion by combining multiple operations into a single parallel loop, eliminating waiting time and reducing overhead. Using ranges, you can express these operations as a pipeline of transformations on a sequence of elements. This pipeline is evaluated in a single pass, performing all the desired operations in parallel without the need to wait between them.

In addition, HPX enhances the benefits of range fusion by offering parallel execution policies, which can be used to optimize the execution of the fused loop across multiple threads.
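
For instance, a minimal sketch of such a fused pipeline could look like the following. The view names, the doubling step, and the +1 offset are illustrative assumptions; the sketch assumes the composed view models a sized random-access range, so both operations are applied in a single parallel pass over the data.

#include <hpx/algorithm.hpp>

#include <ranges>
#include <vector>

// the doubling is expressed lazily as a view adaptor
auto doubled = std::views::iota(1) | std::views::take(10)
    | std::views::transform([](int i) { return 2 * i; });

// materialize the output vector
std::vector<int> results(10);

// one parallel loop applies both the doubling and the +1 offset per element
hpx::ranges::transform(hpx::execution::par, doubled, results.begin(),
    [](int i) { return i + 1; });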

parallel_reduce#

Reduction#

Intel Threading Building Blocks (TBB) code:

auto values = std::vector<double>{1,2,3,4,5,6,7,8,9};

auto total = tbb::parallel_reduce(
                tbb::blocked_range<int>(0,values.size()),
                0.0,
                [&](tbb::blocked_range<int> r, double running_total)
                {
                    for (int i=r.begin(); i<r.end(); ++i)
                    {
                        running_total += values[i];
                    }


                    return running_total;
                },
                std::plus<double>());

HPX equivalent:

#include <hpx/numeric.hpp>

auto values = std::vector<double>{1,2,3,4,5,6,7,8,9};

auto total = hpx::reduce(
    hpx::execution::par, values.begin(), values.end(), 0.0, std::plus{});

By utilizing hpx::reduce and specifying a parallel execution policy with hpx::execution::par, it is possible to transform tbb::parallel_reduce into its equivalent counterpart in HPX. As demonstrated in the previous example, the management of intermediate results is handled internally by HPX, so there is no need to deal with it explicitly.

Transformation & Reduction#

Intel Threading Building Blocks (TBB) code:

auto values = std::vector<double>{1,2,3,4,5,6,7,8,9};

auto transform_function(double current_value){
    // transformation code
}

auto total = tbb::parallel_reduce(
                tbb::blocked_range<int>(0,values.size()),
                0.0,
                [&](tbb::blocked_range<int> r, double transformed_val)
                {
                    for (int i=r.begin(); i<r.end(); ++i)
                    {
                        transformed_val += transform_function(values[i]);
                    }
                    return transformed_val;
                },
                std::plus<double>());

HPX equivalent:

#include <hpx/numeric.hpp>

auto values = std::vector<double>{1,2,3,4,5,6,7,8,9};

auto transform_function(double current_value)
{
    // transformation code
}

auto total = hpx::transform_reduce(hpx::execution::par, values.begin(),
    values.end(), 0.0, std::plus{},
    [&](double current_value) { return transform_function(current_value); });

In situations where certain values require transformation before the reduction process, HPX provides a straightforward solution through hpx::transform_reduce. The transform_function() allows for the application of the desired transformation to each value.

parallel_scan#

Intel Threading Building Blocks (TBB) code:

tbb::parallel_scan(tbb::blocked_range<size_t>(0, input.size()),
    0,
    [&input, &output](const tbb::blocked_range<size_t>& range, int& partial_sum, bool is_final_scan) {
        for (size_t i = range.begin(); i != range.end(); ++i) {
            partial_sum += input[i];
            if (is_final_scan) {
                output[i] = partial_sum;
            }
        }
        return partial_sum;
    },
    [](int left_sum, int right_sum) {
        return left_sum + right_sum;
    }
);

HPX equivalent:

#include <hpx/numeric.hpp>

hpx::inclusive_scan(hpx::execution::par, input.begin(), input.end(),
    output.begin(),
    [](const int& left, const int& right) { return left + right; });

hpx::inclusive_scan with hpx::execution::par as execution policy can be used to perform a prefix scan in parallel. The management of intermediate results is seamlessly handled internally by HPX, eliminating the need for explicit consideration. input.begin() and input.end() refer to the beginning and end of the sequence of elements the algorithm will be applied to respectively. output.begin() refers to the beginning of the destination, while the last argument specifies the function which will be invoked for each of the values of the input sequence.

Apart from hpx::inclusive_scan, HPX provides its users with hpx::exclusive_scan. The key difference between inclusive scan and exclusive scan lies in the treatment of the current element during the scan operation. In an inclusive scan, each element in the output sequence includes the contribution of the corresponding element in the input sequence, while in an exclusive scan, the current element in the input sequence does not contribute to the corresponding element in the output sequence.
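
A minimal sketch of the exclusive variant, reusing the input and output containers from the example above and assuming 0 as the initial value, could look like this:

#include <hpx/numeric.hpp>

// exclusive prefix sum: output[i] = 0 + input[0] + ... + input[i - 1]
hpx::exclusive_scan(hpx::execution::par, input.begin(), input.end(),
    output.begin(), 0,
    [](const int& left, const int& right) { return left + right; });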

parallel_sort#

Intel Threading Building Blocks (TBB) code:

std::vector<int> numbers = {9, 2, 7, 1, 5, 3};

tbb::parallel_sort(numbers.begin(), numbers.end());

HPX equivalent:

#include <hpx/algorithm.hpp>

std::vector<int> numbers = {9, 2, 7, 1, 5, 3};

hpx::sort(hpx::execution::par, numbers.begin(), numbers.end());

hpx::sort provides an equivalent functionality to tbb::parallel_sort. When given a parallel execution policy with hpx::execution::par, the algorithm employs parallel execution, allowing for efficient sorting across available threads.

task_group#

Intel Threading Building Blocks (TBB) code:

// Declare a task group
tbb::task_group tg;

// Run the tasks
tg.run(task1);
tg.run(task2);

// Wait for the task group
tg.wait();

HPX equivalent:

#include <hpx/task_group.hpp>

// Declare a task group
hpx::experimental::task_group tg;

// Run the tasks
tg.run(task1);
tg.run(task2);

// Wait for the task group
tg.wait();

HPX drew inspiration from Intel Threading Building Blocks (TBB) to introduce the hpx::experimental::task_group feature. Therefore, utilizing hpx::experimental::task_group provides an equivalent functionality to tbb::task_group.

MPI#

MPI is a standardized communication protocol and library that allows multiple processes or nodes in a parallel computing system to exchange data and coordinate their execution.

List of MPI-HPX functions#

MPI function       HPX equivalent
MPI_Allgather      hpx::collectives::all_gather
MPI_Allreduce      hpx::collectives::all_reduce
MPI_Alltoall       hpx::collectives::all_to_all
MPI_Barrier        hpx::distributed::barrier
MPI_Bcast          hpx::collectives::broadcast_to() and hpx::collectives::broadcast_from() used with get()
MPI_Comm_size      hpx::get_num_localities
MPI_Comm_rank      hpx::get_locality_id()
MPI_Exscan         hpx::collectives::exclusive_scan() used with get()
MPI_Gather         hpx::collectives::gather_here() and hpx::collectives::gather_there() used with get()
MPI_Irecv          hpx::collectives::get()
MPI_Isend          hpx::collectives::set()
MPI_Reduce         hpx::collectives::reduce_here and hpx::collectives::reduce_there used with get()
MPI_Scan           hpx::collectives::inclusive_scan() used with get()
MPI_Scatter        hpx::collectives::scatter_to() and hpx::collectives::scatter_from()
MPI_Wait           hpx::collectives::get() used with a future, e.g. setf.get()

MPI_Send & MPI_Recv#

Let’s assume we have the following simple message passing code where each process sends a message to the next process in a circular manner. The exchanged message is modified and printed to the console.

MPI code:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mpi.h>
#include <vector>

constexpr int times = 2;

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int num_localities;
    MPI_Comm_size(MPI_COMM_WORLD, &num_localities);

    int this_locality;
    MPI_Comm_rank(MPI_COMM_WORLD, &this_locality);

    int next_locality = (this_locality + 1) % num_localities;
    std::vector<int> msg_vec = {0, 1};

    int cnt = 0;
    int msg = msg_vec[this_locality];

    int recv_msg;
    MPI_Request request_send, request_recv;
    MPI_Status status;

    while (cnt < times) {
        cnt += 1;

        MPI_Isend(&msg, 1, MPI_INT, next_locality, cnt, MPI_COMM_WORLD,
                  &request_send);
        MPI_Irecv(&recv_msg, 1, MPI_INT, next_locality, cnt, MPI_COMM_WORLD,
                  &request_recv);

        MPI_Wait(&request_send, &status);
        MPI_Wait(&request_recv, &status);

        std::cout << "Time: " << cnt << ", Locality " << this_locality
                  << " received msg: " << recv_msg << "\n";

        recv_msg += 10;
        msg = recv_msg;
    }

    MPI_Finalize();
    return 0;
}

HPX equivalent:

#include <hpx/config.hpp>

#if !defined(HPX_COMPUTE_DEVICE_CODE)
#include <hpx/algorithm.hpp>
#include <hpx/hpx_init.hpp>
#include <hpx/modules/collectives.hpp>

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

using namespace hpx::collectives;

constexpr char const* channel_communicator_name =
    "/example/channel_communicator/";

// the number of times
constexpr int times = 2;

int hpx_main()
{
    std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
    std::uint32_t this_locality = hpx::get_locality_id();

    // allocate channel communicator
    auto comm = create_channel_communicator(hpx::launch::sync,
        channel_communicator_name, num_sites_arg(num_localities),
        this_site_arg(this_locality));

    std::uint32_t next_locality = (this_locality + 1) % num_localities;
    std::vector<int> msg_vec = {0, 1};

    int cnt = 0;
    int msg = msg_vec[this_locality];

    // send values to another locality
    auto setf = set(comm, that_site_arg(next_locality), msg, tag_arg(cnt));
    auto got_msg = get<int>(comm, that_site_arg(next_locality), tag_arg(cnt));

    setf.get();

    while (cnt < times)
    {
        cnt += 1;

        auto done_msg = got_msg.then([&](auto&& f) {
            int rec_msg = f.get();
            std::cout << "Time: " << cnt << ", Locality " << this_locality
                      << " received msg: " << rec_msg << "\n";

            // change msg by adding 10
            rec_msg += 10;

            // start next round
            setf =
                set(comm, that_site_arg(next_locality), rec_msg, tag_arg(cnt));
            got_msg =
                get<int>(comm, that_site_arg(next_locality), tag_arg(cnt));
            setf.get();
        });

        done_msg.get();
    }

    return hpx::finalize();
}
#endif

int main(int argc, char* argv[])
{
#if !defined(HPX_COMPUTE_DEVICE_CODE)
    hpx::init_params params;
    params.cfg = {"--hpx:run-hpx-main"};
    return hpx::init(argc, argv, params);
#else
    (void) argc;
    (void) argv;
    return 0;
#endif
}

To perform message passing between different processes in HPX we can use a channel communicator. To understand this example, let’s focus on the hpx_main() function:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • create_channel_communicator function is used to create a channel to serve the communication. This function takes several arguments, including the launch policy (hpx::launch::sync), the name of the communicator (channel_communicator_name), the number of localities, and the ID of the current locality.

  • The communication follows a ring pattern, where each process (or locality) sends a message to its neighbor in a circular manner. This means that the messages circulate around the localities, ensuring that the communication wraps around when reaching the end of the locality sequence. To achieve this, the next_locality variable is calculated as the ID of the next locality in the ring.

  • The initial values for the communication are set (msg_vec, cnt, msg).

  • The set() function is called to send the message to the next locality in the ring. The message is sent asynchronously and is associated with a tag (cnt).

  • The get() function is called to receive a message from the next locality. It is also associated with the same tag as the set() operation.

  • The setf.get() call blocks until the message sending operation is complete.

  • A continuation is set up using the function then() to handle the received message. Inside the continuation:

    • The received message value (rec_msg) is retrieved using f.get().

    • The received message is printed to the console and then modified by adding 10.

    • The set() and get() operations are repeated to send and receive the modified message to the next locality.

    • The setf.get() call blocks until the new message sending operation is complete.

  • The done_msg.get() call blocks until the continuation is complete for the current loop iteration.


MPI_Gather#

The following code gathers data from all processes to the root process and verifies the gathered data in the root process.

MPI code:

#include <iostream>
#include <mpi.h>
#include <numeric>
#include <vector>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int num_localities, this_locality;
    MPI_Comm_size(MPI_COMM_WORLD, &num_localities);
    MPI_Comm_rank(MPI_COMM_WORLD, &this_locality);

    std::vector<int> local_data; // Data to be gathered

    if (this_locality == 0) {
        local_data.resize(num_localities); // Resize the vector on the root process
    }

    // Each process calculates its local data value
    int my_data = 42 + this_locality;

    for (std::uint32_t i = 0; i != 10; ++i) {

        // Gather data from all processes to the root process (process 0)
        MPI_Gather(&my_data, 1, MPI_INT, local_data.data(), 1, MPI_INT, 0,
                MPI_COMM_WORLD);

        // Only the root process (process 0) will print the gathered data
        if (this_locality == 0) {
            std::cout << "Gathered data on the root: ";
            for (int i = 0; i < num_localities; ++i) {
                std::cout << local_data[i] << " ";
            }
            std::cout << std::endl;
        }
    }
    std::cout << std::endl;

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
std::uint32_t this_locality = hpx::get_locality_id();

// test functionality based on immediate local result value
auto gather_direct_client = create_communicator(gather_direct_basename,
    num_sites_arg(num_localities), this_site_arg(this_locality));

for (std::uint32_t i = 0; i != 10; ++i)
{
    if (this_locality == 0)
    {
        hpx::future<std::vector<std::uint32_t>> overall_result =
            gather_here(gather_direct_client, std::uint32_t(42));

        std::vector<std::uint32_t> sol = overall_result.get();
        std::cout << "Gathered data on the root:";

        for (std::size_t j = 0; j != sol.size(); ++j)
        {
            HPX_TEST(j + 42 == sol[j]);
            std::cout << " " << sol[j];
        }
        std::cout << std::endl;
    }
    else
    {
        hpx::future<void> overall_result =
            gather_there(gather_direct_client, this_locality + 42);
        overall_result.get();
    }

}

This code will print the following message 10 times:

Gathered data on the root: 42 43

HPX uses two functions to implement the functionality of MPI_Gather: gather_here and gather_there. gather_here is gathering data from all localities to the locality with ID 0 (root locality). gather_there allows non-root localities to participate in the gather operation by sending data to the root locality. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function create_communicator() is used to create a communicator called gather_direct_client.

  • If the current locality is the root (its ID is equal to 0):

    • The gather_here function is used to perform the gather operation. It collects data from all other localities into the overall_result future object. The function arguments provide the necessary information, such as the base name for the gather operation (gather_direct_basename), the value to be gathered (value), the number of localities (num_localities), the current locality ID (this_locality), and the generation number (related to the gather operation).

    • The get() member function of the overall_result future is used to retrieve the gathered data.

    • The next for loop is used to verify the correctness of the gathered data (sol). HPX_TEST is a macro provided by the HPX testing utilities to perform similar testing with the Standard C++ macro assert.

  • If the current locality is not the root:

    • The gather_there function is used to participate in the gather operation initiated by the root locality. It sends the data (in this case, the value this_locality + 42) to the root locality, indicating that it should be included in the gathering.

    • The get() member function of the overall_result future is used to wait for the gather operation to complete for this locality.

MPI_Scatter#

The following code scatters data from the root process to all other processes and verifies the received data on each process.

MPI code:

#include <iostream>
#include <mpi.h>
#include <vector>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int num_localities, this_locality;
    MPI_Comm_size(MPI_COMM_WORLD, &num_localities);
    MPI_Comm_rank(MPI_COMM_WORLD, &this_locality);

    std::vector<int> data(num_localities);

    if (this_locality == 0) {
        // Fill the data vector on the root locality (locality 0)
        for (int i = 0; i < num_localities; ++i) {
            data[i] = 42 + i;
        }
    }

    int local_data; // Variable to store the received data

    // Scatter data from the root locality to all other localities
    MPI_Scatter(&data[0], 1, MPI_INT, &local_data, 1, MPI_INT, 0, MPI_COMM_WORLD);

    // Now, each locality has its own local_data

    // Print the local_data on each locality
    std::cout << "Locality " << this_locality << " received " << local_data
                << std::endl;

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
HPX_TEST_LTE(std::uint32_t(2), num_localities);

std::uint32_t this_locality = hpx::get_locality_id();

auto scatter_direct_client =
    hpx::collectives::create_communicator(scatter_direct_basename,
        num_sites_arg(num_localities), this_site_arg(this_locality));

// test functionality based on immediate local result value
for (std::uint32_t i = 0; i != 10; ++i)
{
    if (this_locality == 0)
    {
        std::vector<std::uint32_t> data(num_localities);
        std::iota(data.begin(), data.end(), 42 + i);

        hpx::future<std::uint32_t> result =
            scatter_to(scatter_direct_client, std::move(data));

        HPX_TEST_EQ(i + 42 + this_locality, result.get());
    }
    else
    {
        hpx::future<std::uint32_t> result =
            scatter_from<std::uint32_t>(scatter_direct_client);

        HPX_TEST_EQ(i + 42 + this_locality, result.get());

        std::cout << "Locality " << this_locality << " received "
                  << i + 42 + this_locality << std::endl;
    }
}

For num_localities = 2, and since we run for 10 iterations, this code will print the following messages:

Locality 1 received 43
Locality 1 received 44
Locality 1 received 45
Locality 1 received 46
Locality 1 received 47
Locality 1 received 48
Locality 1 received 49
Locality 1 received 50
Locality 1 received 51
Locality 1 received 52

HPX uses two functions to implement the functionality of MPI_Scatter: hpx::scatter_to and hpx::scatter_from. hpx::scatter_to is distributing the data from the locality with ID 0 (root locality) to all other localities. hpx::scatter_from allows non-root localities to receive the data from the root locality. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function hpx::collectives::create_communicator() is used to create a communicator called scatter_direct_client.

  • If the current locality is the root (its ID is equal to 0):

    • The data vector is filled with values ranging from 42 + i to 42 + i + num_localities - 1.

    • The hpx::scatter_to function is used to perform the scatter operation using the communicator scatter_direct_client. This scatters the data vector to other localities and returns a future representing the result.

    • HPX_TEST_EQ is a macro provided by the HPX testing utilities to test the distributed values.

  • If the current locality is not the root:

    • The hpx::scatter_from function is used to collect the data by the root locality.

    • HPX_TEST_EQ is a macro provided by the HPX testing utilities to test the collected values.

MPI_Allgather#

The following code gathers data from all processes and sends the data to all processes.

MPI code:

#include <cstdint>
#include <iostream>
#include <mpi.h>
#include <vector>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Get the number of MPI processes
    int num_localities = size;

    // Get the MPI process rank
    int here = rank;

    std::uint32_t value = here;

    std::vector<std::uint32_t> r(num_localities);

    // Perform an all-gather operation to gather values from all processes.
    MPI_Allgather(&value, 1, MPI_UINT32_T, r.data(), 1, MPI_UINT32_T,
                    MPI_COMM_WORLD);

    // Print the result.
    std::cout << "Locality " << here << " has values:";
    for (size_t j = 0; j < r.size(); ++j) {
        std::cout << " " << r[j];
    }
    std::cout << std::endl;

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
std::uint32_t here = hpx::get_locality_id();

// test functionality based on immediate local result value
auto all_gather_direct_client =
    create_communicator(all_gather_direct_basename,
        num_sites_arg(num_localities), this_site_arg(here));

std::uint32_t value = here;

hpx::future<std::vector<std::uint32_t>> overall_result =
    all_gather(all_gather_direct_client, value);

std::vector<std::uint32_t> r = overall_result.get();

std::cout << "Locality " << here << " has values:";
for (std::size_t j = 0; j != r.size(); ++j)
{
    std::cout << " " << j;
}
std::cout << std::endl;

For num_localities = 2 this code will print the following message:

Locality 0 has values: 0 1
Locality 1 has values: 0 1

HPX uses the function all_gather to implement the functionality of MPI_Allgather. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function hpx::collectives::create_communicator() is used to create a communicator called all_gather_direct_client.

  • The values that the localities exchange with each other are equal to each locality’s ID.

  • The gather operation is performed using all_gather. The result is stored in an hpx::future object called overall_result, which represents a future result that can be retrieved later when needed.

  • The get() function waits until the result is available and then stores it in the vector called r.

MPI_Allreduce#

The following code combines values from all processes and distributes the result back to all processes.

MPI code:

#include <cstdint>
#include <iostream>
#include <mpi.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Get the number of MPI processes
    int num_localities = size;

    // Get the MPI process rank
    int here = rank;

    // Create a communicator for the all reduce operation.
    MPI_Comm all_reduce_direct_client;
    MPI_Comm_split(MPI_COMM_WORLD, 0, rank, &all_reduce_direct_client);

    // Perform the all reduce operation to calculate the sum of 'here' values.
    std::uint32_t value = here;
    std::uint32_t res = 0;
    MPI_Allreduce(&value, &res, 1, MPI_UINT32_T, MPI_SUM,
                    all_reduce_direct_client);

    std::cout << "Locality " << rank << " has value: " << res << std::endl;

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t const num_localities =
    hpx::get_num_localities(hpx::launch::sync);
std::uint32_t const here = hpx::get_locality_id();

auto const all_reduce_direct_client =
    create_communicator(all_reduce_direct_basename,
        num_sites_arg(num_localities), this_site_arg(here));

std::uint32_t value = here;

hpx::future<std::uint32_t> overall_result =
    all_reduce(all_reduce_direct_client, value, std::plus<std::uint32_t>{});

std::uint32_t res = overall_result.get();
std::cout << "Locality " << here << " has value: " << res << std::endl;

For num_localities = 2 this code will print the following message:

Locality 0 has value: 1
Locality 1 has value: 1

HPX uses the function all_reduce to implement the functionality of MPI_Allreduce. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function hpx::collectives::create_communicator() is used to create a communicator called all_reduce_direct_client.

  • The value of each locality is equal to its ID.

  • The reduce operation is performed using all_reduce. The result is stored in an hpx::future object called overall_result, which represents a future result that can be retrieved later when needed.

  • The get() function waits until the result is available and then stores it in the variable res.

MPI_Alltoall#

The following code gathers data from and scatters data to all processes.

MPI code:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <mpi.h>
#include <vector>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Get the number of MPI processes
    int num_localities = size;

    // Get the MPI process rank
    int this_locality = rank;

    // Create a communicator for all-to-all operation.
    MPI_Comm all_to_all_direct_client;
    MPI_Comm_split(MPI_COMM_WORLD, 0, rank, &all_to_all_direct_client);

    std::vector<std::uint32_t> values(num_localities);
    std::fill(values.begin(), values.end(), this_locality);

    // Create vectors to store received values.
    std::vector<std::uint32_t> r(num_localities);

    // Perform an all-to-all operation to exchange values with other localities.
    MPI_Alltoall(values.data(), 1, MPI_UINT32_T, r.data(), 1, MPI_UINT32_T,
                all_to_all_direct_client);

    // Print the results.
    std::cout << "Locality " << this_locality << " has values:";
    for (std::size_t j = 0; j != r.size(); ++j) {
        std::cout << " " << r[j];
    }
    std::cout << std::endl;

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
std::uint32_t this_locality = hpx::get_locality_id();

auto all_to_all_direct_client =
    create_communicator(all_to_all_direct_basename,
        num_sites_arg(num_localities), this_site_arg(this_locality));

std::vector<std::uint32_t> values(num_localities);
std::fill(values.begin(), values.end(), this_locality);

hpx::future<std::vector<std::uint32_t>> overall_result =
    all_to_all(all_to_all_direct_client, std::move(values));

std::vector<std::uint32_t> r = overall_result.get();
std::cout << "Locality " << this_locality << " has values:";

for (std::size_t j = 0; j != r.size(); ++j)
{
    std::cout << " " << r[j];
}
std::cout << std::endl;

For num_localities = 2 this code will print the following message:

Locality 0 has values: 0 1
Locality 1 has values: 0 1

HPX uses the function all_to_all to implement the functionality of MPI_Alltoall. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function hpx::collectives::create_communicator() is used to create a communicator called all_to_all_direct_client.

  • The value each locality sends is equal to its ID.

  • The all-to-all operation is performed using all_to_all. The result is stored in an hpx::future object called overall_result, which represents a future result that can be retrieved later when needed.

  • The get() function waits until the result is available and then stores it in the variable r.

MPI_Barrier#

The following code shows how barrier is used to synchronize multiple processes.

MPI code:

#include <cstdlib>
#include <iostream>
#include <mpi.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);

    std::size_t iterations = 5;

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    for (std::size_t i = 0; i != iterations; ++i) {
        MPI_Barrier(MPI_COMM_WORLD);
        if (rank == 0) {
        std::cout << "Iteration " << i << " completed." << std::endl;
        }
    }

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::size_t iterations = 5;
std::uint32_t this_locality = hpx::get_locality_id();

char const* const barrier_test_name = "/test/barrier/multiple";

hpx::distributed::barrier b(barrier_test_name);
for (std::size_t i = 0; i != iterations; ++i)
{
    b.wait();
    if (this_locality == 0)
    {
        std::cout << "Iteration " << i << " completed." << std::endl;
    }
}

This code will print the following message:

Iteration 0 completed.
Iteration 1 completed.
Iteration 2 completed.
Iteration 3 completed.
Iteration 4 completed.

HPX uses the function barrier to implement the functionality of MPI_Barrier. In more detail:

  • After defining the number of iterations, we use hpx::get_locality_id() to get the ID of the current locality.

  • char const* const barrier_test_name = "/test/barrier/multiple": This line defines a constant character array as the name of the barrier. This name is used to identify the barrier across different localities. All participating threads that use this name will synchronize at this barrier.

  • Using hpx::distributed::barrier b(barrier_test_name), we create an instance of the distributed barrier with the previously defined name. This barrier will be used to synchronize the execution of threads across different localities.

  • Running for all the desired iterations, we use b.wait() to synchronize the threads. Each thread waits until all other threads also reach this point before any of them can proceed further.

MPI_Bcast#

The following code broadcasts data from one process to all other processes.

MPI code:

#include <iostream>
#include <mpi.h>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int num_localities;
    MPI_Comm_size(MPI_COMM_WORLD, &num_localities);

    int here;
    MPI_Comm_rank(MPI_COMM_WORLD, &here);

    int value;

    for (int i = 0; i < 5; ++i) {
        if (here == 0) {
            value = i + 42;
        }

        // Broadcast the value from process 0 to all other processes
        MPI_Bcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD);

        if (here != 0) {
            std::cout << "Locality " << here << " received " << value << std::endl;
        }

    }

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);

std::uint32_t here = hpx::get_locality_id();

auto broadcast_direct_client =
    create_communicator(broadcast_direct_basename,
        num_sites_arg(num_localities), this_site_arg(here));

// test functionality based on immediate local result value
for (std::uint32_t i = 0; i != 5; ++i)
{
    if (here == 0)
    {
        hpx::future<std::uint32_t> result =
            broadcast_to(broadcast_direct_client, i + 42);

        result.get();
    }
    else
    {
        hpx::future<std::uint32_t> result =
            hpx::collectives::broadcast_from<std::uint32_t>(
                broadcast_direct_client);

        uint32_t r = result.get();

        std::cout << "Locality " << here << " received " << r << std::endl;
    }
}

For num_localities = 2 this code will print the following message:

Locality 1 received 42
Locality 1 received 43
Locality 1 received 44
Locality 1 received 45
Locality 1 received 46

HPX uses two functions to implement the functionality of MPI_Bcast: broadcast_to and broadcast_from. broadcast_to is broadcasting the data from the root locality to all other localities. broadcast_from allows non-root localities to collect the data sent by the root locality. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function create_communicator() is used to create a communicator called broadcast_direct_client.

  • If the current locality is the root (its ID is equal to 0):

    • The broadcast_to function is used to perform the broadcast operation using the communicator broadcast_direct_client. This sends the data to other localities and returns a future representing the result.

    • The get() member function of the result future is used to wait for and retrieve the result.

  • If the current locality is not the root:

    • The broadcast_from function is used to collect the data by the root locality.

    • The get() member function of the result future is used to wait for the result.

MPI_Exscan#

The following code computes the exclusive scan (partial reductions) of data on a collection of processes.

MPI code:

#include <iostream>
#include <mpi.h>
#include <numeric>
#include <vector>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int num_localities;
    MPI_Comm_size(MPI_COMM_WORLD, &num_localities);

    int here;
    MPI_Comm_rank(MPI_COMM_WORLD, &here);

    // Calculate the value for this locality (here)
    int value = here;

    // Perform an exclusive scan; each rank receives a single value
    int result = 0;
    MPI_Exscan(&value, &result, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

    // The result on rank 0 is undefined for MPI_Exscan
    if (here != 0) {
        std::cout << "Locality " << here << " has value " << result << std::endl;
    }

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
std::uint32_t here = hpx::get_locality_id();

auto exclusive_scan_client = create_communicator(exclusive_scan_basename,
    num_sites_arg(num_localities), this_site_arg(here));

// test functionality based on immediate local result value
std::uint32_t value = here;

hpx::future<std::uint32_t> overall_result = exclusive_scan(
    exclusive_scan_client, value, std::plus<std::uint32_t>{});

uint32_t r = overall_result.get();

if (here != 0)
{
    std::cout << "Locality " << here << " has value " << r << std::endl;
}

For num_localities = 2 this code will print the following message:

Locality 1 has value 0

HPX uses the function exclusive_scan to implement MPI_Exscan. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function create_communicator() is used to create a communicator called exclusive_scan_client.

  • The exclusive_scan function is used to perform the exclusive scan operation using the communicator exclusive_scan_client. std::plus<std::uint32_t>{} specifies the binary associative operator to use for the scan. In this case, it’s addition for summing values.

  • The get() member function of the overall_result future is used to wait for the result.

MPI_Scan#

The following code computes the inclusive scan (partial reductions) of data on a collection of processes.

MPI code:

#include <iostream>
#include <mpi.h>
#include <numeric>
#include <vector>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int num_localities;
    MPI_Comm_size(MPI_COMM_WORLD, &num_localities);

    int here;
    MPI_Comm_rank(MPI_COMM_WORLD, &here);

    // Calculate the value for this locality (here)
    int value = here;

    std::vector<int> result(num_localities);

    MPI_Scan(&value, &result[0], 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);

    std::cout << "Locality " << here << " has value " << result[0] << std::endl;

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
std::uint32_t here = hpx::get_locality_id();

auto inclusive_scan_client = create_communicator(inclusive_scan_basename,
    num_sites_arg(num_localities), this_site_arg(here));

std::uint32_t value = here;

hpx::future<std::uint32_t> overall_result = inclusive_scan(
    inclusive_scan_client, value, std::plus<std::uint32_t>{});

uint32_t r = overall_result.get();

std::cout << "Locality " << here << " has value " << r << std::endl;

For num_localities = 2 this code will print the following message:

Locality 0 has value 0
Locality 1 has value 1

HPX uses the function inclusive_scan to implement MPI_Scan. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function create_communicator() is used to create a communicator called inclusive_scan_client.

  • The inclusive_scan function is used to perform the inclusive scan operation using the communicator inclusive_scan_client. std::plus<std::uint32_t>{} specifies the binary associative operator to use for the scan. In this case, it’s addition for summing values.

  • The get() member function of the overall_result future is used to wait for the result.

MPI_Reduce#

The following code performs a global reduce operation across all processes.

MPI code:

#include <iostream>
#include <mpi.h>

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int num_processes;
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);

    int this_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &this_rank);

    int value = this_rank;

    int result = 0;

    // Perform the reduction operation
    MPI_Reduce(&value, &result, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);

    // Print the result for the root process (process 0)
    if (this_rank == 0) {
        std::cout << "Locality " << this_rank << " has value " << result
                << std::endl;
    }

    MPI_Finalize();
    return 0;
}

HPX equivalent:

std::uint32_t num_localities = hpx::get_num_localities(hpx::launch::sync);
std::uint32_t this_locality = hpx::get_locality_id();

auto reduce_direct_client = create_communicator(reduce_direct_basename,
    num_sites_arg(num_localities), this_site_arg(this_locality));

std::uint32_t value = hpx::get_locality_id();

if (this_locality == 0)
{
    hpx::future<std::uint32_t> overall_result = reduce_here(
        reduce_direct_client, value, std::plus<std::uint32_t>{});

    uint32_t r = overall_result.get();

    std::cout << "Locality " << this_locality << " has value " << r
              << std::endl;
}
else
{
    hpx::future<void> overall_result =
        reduce_there(reduce_direct_client, std::move(value));
    overall_result.get();
}

This code will print the following message:

Locality 0 has value 1

HPX uses two functions to implement the functionality of MPI_Reduce: reduce_here and reduce_there. reduce_here is gathering data from all localities to the locality with ID 0 (root locality) and then performs the defined reduction operation. reduce_there allows non-root localities to participate in the reduction operation by sending data to the root locality. In more detail:

  • hpx::get_num_localities(hpx::launch::sync) retrieves the number of localities, while hpx::get_locality_id() returns the ID of the current locality.

  • The function create_communicator() is used to create a communicator called reduce_direct_client.

  • If the current locality is the root (its ID is equal to 0):

    • The reduce_here function initiates a reduction operation with addition (std::plus) as the reduction operator. The result is stored in overall_result.

    • The get() member function of the overall_result future is used to wait for the result.

  • If the current locality is not the root:

    • The reduce_there initiates a remote reduction operation.

    • The get() member function of the overall_result future is used to wait for the remote reduction operation to complete. This is done to ensure synchronization among localities.