Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 135 additions & 4 deletions tests/universal-benchmark/universal_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <random>
#include <sstream>
Expand All @@ -16,6 +17,11 @@
#include <thread>
#include <vector>

#ifdef __linux__
#include <sched.h>
#include <pthread.h>
#endif

#include <pcg/pcg_random.hpp>
#include <test_util.hh>

Expand Down Expand Up @@ -48,6 +54,7 @@ size_t g_threads = std::thread::hardware_concurrency();
// a random seed.
size_t g_seed = 0;


const char *args[] = {
"--reads", "--inserts", "--erases",
"--updates", "--upserts", "--initial-capacity",
Expand Down Expand Up @@ -88,6 +95,95 @@ const char *description =
"percentages must be 100.\nMap type is " TABLE_TYPE
"<" XSTR(KEY) ", " XSTR(VALUE) ">.";

#ifdef __linux__
#include <fstream>
#include <set>

// Read the set of HT siblings for a given CPU from sysfs.
// Returns the sibling CPU numbers (including cpu_id itself).
std::set<int> get_ht_siblings(int cpu_id) {
std::set<int> siblings;
std::string path = "/sys/devices/system/cpu/cpu" +
std::to_string(cpu_id) + "/topology/thread_siblings_list";
std::ifstream f(path);
if (!f) return {cpu_id};
std::string line;
std::getline(f, line);
// Parse comma-separated list and ranges like "0,12" or "0-3"
std::istringstream ss(line);
std::string token;
while (std::getline(ss, token, ',')) {
auto dash = token.find('-');
if (dash != std::string::npos) {
int lo = std::stoi(token.substr(0, dash));
int hi = std::stoi(token.substr(dash + 1));
for (int i = lo; i <= hi; ++i) siblings.insert(i);
} else {
siblings.insert(std::stoi(token));
}
}
return siblings;
}

// Get the process affinity mask as a vector of CPU IDs.
std::vector<int> get_affinity_cpus() {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
if (sched_getaffinity(0, sizeof(cpuset), &cpuset) != 0) return {};
std::vector<int> cpus;
for (int i = 0; i < CPU_SETSIZE; ++i)
if (CPU_ISSET(i, &cpuset)) cpus.push_back(i);
return cpus;
}

// Count total online CPUs from /sys/devices/system/cpu/online.
size_t get_online_cpu_count() {
std::ifstream f("/sys/devices/system/cpu/online");
if (!f) return 0;
std::string line;
std::getline(f, line);
size_t count = 0;
std::istringstream ss(line);
std::string token;
while (std::getline(ss, token, ',')) {
auto dash = token.find('-');
if (dash != std::string::npos) {
int lo = std::stoi(token.substr(0, dash));
int hi = std::stoi(token.substr(dash + 1));
count += hi - lo + 1;
} else {
count++;
}
}
return count;
}

// From a set of available CPUs, select one per physical core,
// avoiding HT siblings. Returns the selected CPU IDs.
std::vector<int> select_one_per_core(const std::vector<int> &cpus) {
std::set<int> used;
std::vector<int> selected;
for (int cpu : cpus) {
if (used.count(cpu)) continue;
selected.push_back(cpu);
for (int sib : get_ht_siblings(cpu)) used.insert(sib);
}
return selected;
}

void pin_thread(int cpu) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
int rc = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset);
if (rc != 0) {
std::fprintf(stderr, "ERROR: failed to pin thread to CPU %d: %s\n",
cpu, strerror(rc));
std::exit(1);
}
}
#endif

void check_percentage(size_t value, const char *name) {
if (value > 100) {
std::string msg("Percentage for `");
Expand Down Expand Up @@ -216,8 +312,8 @@ int main(int argc, char **argv) {
try {
// Parse parameters and check them.
parse_flags(argc, argv, description, args, arg_vars, arg_descriptions,
sizeof(args) / sizeof(const char *), nullptr, nullptr, nullptr,
0);
sizeof(args) / sizeof(const char *), nullptr, nullptr,
nullptr, 0);
check_percentage(g_read_percentage, "reads");
check_percentage(g_insert_percentage, "inserts");
check_percentage(g_erase_percentage, "erases");
Expand All @@ -233,6 +329,36 @@ int main(int argc, char **argv) {
g_seed = std::random_device()();
}

#ifdef __linux__
// If the process CPU affinity is restricted (via taskset, cpuset, or
// container --cpuset-cpus), pin each thread to its own physical core,
// skipping HT siblings. When affinity is unrestricted, do nothing.
std::vector<int> pin_targets;
{
auto affinity_cpus = get_affinity_cpus();
size_t online_count = get_online_cpu_count();
bool affinity_restricted = affinity_cpus.size() < online_count;
if (affinity_restricted) {
// Select one CPU per physical core, skipping HT siblings
pin_targets = select_one_per_core(affinity_cpus);
if (pin_targets.size() < g_threads) {
std::ostringstream msg;
msg << "Need " << g_threads << " physical cores but only "
<< pin_targets.size() << " available (CPUs:";
for (int c : pin_targets) msg << " " << c;
msg << ")\n";
throw std::runtime_error(msg.str());
}
std::cerr << "Pinning " << g_threads << " threads to CPUs";
for (size_t i = 0; i < g_threads; ++i)
std::cerr << " " << pin_targets[i];
std::cerr << "\n";
}
}
#else
std::vector<int> pin_targets; // empty = no pinning on non-Linux
#endif

pcg64_oneseq_once_insecure base_rng(g_seed);

const size_t initial_capacity = 1UL << g_initial_capacity;
Expand Down Expand Up @@ -315,8 +441,13 @@ int main(int argc, char **argv) {
auto start_time = std::chrono::high_resolution_clock::now();
for (size_t i = 0; i < g_threads; ++i) {
mix_threads[i] = std::thread(
mix, std::ref(tbl), num_ops_per_thread, std::ref(op_mix),
std::ref(keys[i]), prefill_elems_per_thread, std::ref(samples[i]));
[&, i]() {
if (!pin_targets.empty()) {
pin_thread(pin_targets[i]);
}
mix(tbl, num_ops_per_thread, op_mix,
keys[i], prefill_elems_per_thread, samples[i]);
});
}
for (auto &t : mix_threads) {
t.join();
Expand Down