In-memory software design in 2025 - from 40GB/s to 55GB/s
Posted on Sunday, 16 February 2025
In-memory software design in 2025 - from 40GB/s to 55GB/s
In the last blog, we looked at techniques for improving performance when doing partial sums: to reduce the number of trades scanned, we used in-memory indexes built on sets.
But what if we simply want to go faster for the complete aggregation?
Up to now, we've been programming for the programmer, making the program easy to write and understand - but what if we make the program easy to run for the CPU?
How do we do that? By improving memory locality and data structures.
Memory locality
The initial implementation uses a std::vector<Trade> trades; with each trade maintaining a std::vector<DailyDelivery> dailyDeliveries; that contains the two arrays of power and value: std::array<int, 100> power; std::array<int, 100> value;.
While std::vector does a great job at keeping the memory allocated to it contiguous in cpp, when you nest vectors in vectors and allocate the sub-vectors, you create fragmented memory. This means the CPU's memory controller has to jump around a lot to get the next block, and there's a high probability that adjacent data will not be in the cache for the other cores of the CPU to use.
The best architecture for locality will always depend on the access patterns to the data. In our case here, we are going to optimize for maximum speed when doing complete aggregations.
This is done by creating a specific area into which the Power and Value data of each delivery day is allocated. Trades point to the beginning and end of their data within it. If you need to move inside this area, you can immediately index into the large array, because you know exactly the start delivery date of the trade and the size of each delivery day (100 ints).
Now, if you are dealing with a small number of trades (~50'000 with an average of 120 days of delivery ≈ 4.9 GB of RAM for 600 million quarter hours), you can get away with a single contiguous block of RAM for your delivery vectors. But see later for a more practical production discussion:
// Each daily delivery contributes 100 ints for power and 100 ints for value.
std::vector<int> flatPower(totalDailyDeliveries * 100);
std::vector<int> flatValue(totalDailyDeliveries * 100);
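For reference, the aggregation loop below indexes through a flatTrades vector. The struct itself isn't shown here, but a minimal sketch of the flattening (reusing the deliveriesOffset and numDeliveries fields used below; the conversion code itself is my assumption) could look like this:
// Flattened trade record: instead of owning its deliveries, each trade
// remembers where its contiguous block starts in the flat arrays.
struct FlatTrade
{
    int tradeId;
    std::string trader;
    std::string deliveryArea;
    std::size_t deliveriesOffset; // index of the first delivery day in the flat arrays
    std::size_t numDeliveries;    // number of consecutive delivery days
};
// One-off conversion from the nested layout (needs <algorithm> for std::copy).
std::vector<FlatTrade> flatTrades;
flatTrades.reserve(trades.size());
std::size_t offset = 0;
for (const auto& t : trades)
{
    flatTrades.push_back({t.tradeId, t.trader, t.deliveryArea, offset, t.dailyDeliveries.size()});
    for (const auto& dd : t.dailyDeliveries)
    {
        std::copy(dd.power.begin(), dd.power.end(), flatPower.begin() + offset * 100);
        std::copy(dd.value.begin(), dd.value.end(), flatValue.begin() + offset * 100);
        ++offset;
    }
}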
Now, with this information, we can directly index into this area based on the trade's data:
#ifdef _OPENMP
#pragma omp parallel for reduction(+: all_totalPower, all_totalValue, \
traderX_totalPower, traderX_totalValue, \
traderX_area1_totalPower, traderX_area1_totalValue, \
traderX_area2_totalPower, traderX_area2_totalValue)
#endif
for (std::size_t i = 0; i < flatTrades.size(); ++i)
{
const auto& ft = flatTrades[i];
// Each trade's deliveries are stored in a contiguous block.
size_t startIndex = ft.deliveriesOffset * 100;
size_t numInts = ft.numDeliveries * 100;
long long sumPower = 0;
long long sumValue = 0;
for (size_t j = 0; j < numInts; ++j)
{
sumPower += flatPower[startIndex + j];
sumValue += flatValue[startIndex + j];
}
// (a) All trades
all_totalPower += sumPower;
all_totalValue += sumValue;
// (b), (c), (d): the per-trader and per-area sums follow the same pattern as before.
}
When running with this configuration, my time to aggregate on the laptop improves from 241 ms to 178 ms - a 37% improvement in speed - which gets us to 55 GB/s on a commodity laptop (using OpenMP, of course).
Limits
But as you scale to larger and larger numbers of trades and delivery days, you will find that allocating the flat vectors as one giant contiguous block stops being practical.
At that point, we'll start running our own custom allocation, keeping our own block memory via a custom allocator. By creating blocks of 64 to 256 MB that we allocate as needed and indexing our trade deliveries into them, we can then scale to use the entire memory of your machine.
Two good references on that are Custom Allocators in C++: High Performance Memory Management and CppCon 2017: Bob Steagall “How to Write a Custom Allocator”.
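To make the idea concrete, here is a minimal sketch of such a block arena (not a full std::allocator; the block size, names and interface are my assumptions): it hands out contiguous runs of 100-int delivery days from large pre-allocated blocks, and only releases memory when the arena itself is destroyed.
// Minimal arena sketch (needs <cstddef>, <memory> and <vector>): hands out
// contiguous runs of ints (100 per delivery day) from large blocks,
// allocating a new block whenever the current one is full.
class DeliveryArena
{
public:
    explicit DeliveryArena(std::size_t blockBytes = 128ull * 1024 * 1024) // 128 MB blocks
        : blockInts_(blockBytes / sizeof(int)) {}

    // Returns a pointer to 'count' contiguous ints (count = numDeliveries * 100).
    // Assumes count <= blockInts_; a single trade is far smaller than a block.
    int* allocate(std::size_t count)
    {
        if (blocks_.empty() || used_ + count > blockInts_)
        {
            blocks_.emplace_back(std::make_unique<int[]>(blockInts_)); // zero-initialized
            used_ = 0;
        }
        int* p = blocks_.back().get() + used_;
        used_ += count;
        return p;
    }

private:
    std::size_t blockInts_;
    std::size_t used_ = 0;
    std::vector<std::unique_ptr<int[]>> blocks_;
};
Trades would then store a pointer (or a block index plus offset) instead of an offset into one giant vector; each trade's run stays contiguous within a block, so the inner aggregation loop is unchanged.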
Next steps?
Going from a simple data structure to a memory adapted structure allowed us to go from 40GB/s (42% of laptop bandwidth) to 55GB/s (57% of laptop's 96 GB/s bandwidth).
If you still need more performance, you must further adapt your data structures and locality to the access patterns. Then start looking in depth at where the stalls are in the execution trace. There are other approaches, such as looking at AVX instructions in more detail, loop unrolling, and so on, to find some extra performance. Get a real cpp expert to consult on it.
Or just get a machine with more bandwidth! An example of that is the M2 and M4 Max and Ultra family of chips from Apple, with memory bandwidth of over 800 GB/s - over 8x what my laptop has. Or just run on a server: as noted in the first article, Azure now has a machine with 6'900 GB/s of bandwidth.
In-memory software design 2025 - Applied to energy
Posted on Saturday, 15 February 2025
In-memory software design
In the last blog, In-memory performance in 2025, we looked at a simple design for an energy aggregation system, aggregating 100'000 trades rolled out into 9.8 GB of data, and saw that we got about 40 GB/s of performance.
I upped the numbers of trades to 500'000 trades (rolled out over quarter hours, with average trade length of 120 days in quarter hours) and validated that on a ~50 GB dataset, we are running at ~1200 ms for the complete aggregation, confirming a linear scaling of the compute time.
But this brings us to a point where, on commodity hardware, our aggregations are running for over 1 second. So how can we improve this? This time, instead of trying to brute-force it, let's bring in another technique from databases: indexes!
Simple indexes for in-memory aggregations
For our use case, we are going to look at two meta-data properties on our trades, Trader and Delivery area, but the concept scales efficiently to large collections of meta-data, because we are dealing with such a small number of Trades (500k).
Building the index
At insertion time, we can build a set for each meta-data attribute: trader and delivery area.
// =================
// 2) Build Indexes
// =================
// We'll do single-attribute indexes: trader -> set of tradeIDs, area -> set of tradeIDs
// For large data, consider using std::vector<size_t> sorted, or some other structure.
std::unordered_map<std::string, std::unordered_set<size_t>> traderIndex;
traderIndex.reserve(10); // if you know approx how many traders you have to avoid resizing continuously
std::unordered_map<std::string, std::unordered_set<size_t>> areaIndex;
areaIndex.reserve(10);
I'm using an unordered set, but an ordered set could also be used and might be more efficient.
As keys, I'm using the string meta-data on the trade - in production these would probably be integer keys, but for this use case strings are sufficient.
for (size_t i = 0; i < trades.size(); ++i)
{
const auto& t = trades[i];
traderIndex[t.trader].insert(i);
areaIndex[t.deliveryArea].insert(i);
}
When we insert new trades, we can add them to the set indexes.
As a linear pass, it's extremely efficient to build this index and cheap to keep it updated as we insert new trades.
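For example, an insert path that keeps both indexes current could look like this (a sketch; the addTrade helper is mine, not part of the original code):
// Hypothetical insert helper: appends the trade and updates both indexes.
size_t addTrade(std::vector<Trade>& trades,
                std::unordered_map<std::string, std::unordered_set<size_t>>& traderIndex,
                std::unordered_map<std::string, std::unordered_set<size_t>>& areaIndex,
                Trade t)
{
    const size_t id = trades.size(); // the position in the vector doubles as the trade's index ID
    traderIndex[t.trader].insert(id);
    areaIndex[t.deliveryArea].insert(id);
    trades.push_back(std::move(t));
    return id;
}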
Using the index
If we have a search query on a single attribute, we can now use the simple index and get the result directly.
But if we are doing a query on two (or more) attributes, we are going to use the smallest index first and match it against the largest one. The unordered_set gives acceptable performance for bigger.find(id), but you can probably do even better with a set structure that is optimized for intersections. If you are using sorted sets, you can benchmark std::set_intersection against my simple implementation (a sketch of that variant follows the code below).
// ===================================
// 3) Use the indexes for filtering
// ===================================
// For instance, let's do a query: TraderX, Area2
// We'll find the intersection of (all trades for TraderX) and (all trades for Area2).
std::string queryTrader = "TraderX";
std::string queryArea = "Area2";
// Get sets from the index
// (handle the case if the key doesn't exist -> empty set)
auto itT = traderIndex.find(queryTrader);
auto itA = areaIndex.find(queryArea);
if (itT == traderIndex.end() || itA == areaIndex.end()) {
std::cout << "No trades found for " << queryTrader << " AND " << queryArea << "\n";
return 0;
}
const auto& traderSet = itT->second;
const auto& areaSet = itA->second;
// Intersection
// We'll create a vector of trade IDs that are in both sets
// For speed, we can iterate over the smaller set and check membership in the larger set.
const auto& smaller = (traderSet.size() < areaSet.size()) ? traderSet : areaSet;
const auto& bigger = (traderSet.size() < areaSet.size()) ? areaSet : traderSet;
std::vector<size_t> intersection;
intersection.reserve(smaller.size()); // a safe upper bound if all the smaller set is selected, to avoid resizing
for (auto id : smaller)
{
if (bigger.find(id) != bigger.end())
{
intersection.push_back(id);
}
}
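For comparison, the sorted-set variant mentioned above could be as simple as this (a sketch, assuming the indexes are kept as ascending std::vector<size_t> - which they naturally are if trade IDs are appended in increasing order):
// Sorted-vector intersection (needs <algorithm> and <iterator>);
// both inputs must be sorted in ascending order.
std::vector<size_t> intersectSorted(const std::vector<size_t>& a,
                                    const std::vector<size_t>& b)
{
    std::vector<size_t> out;
    out.reserve(std::min(a.size(), b.size()));
    std::set_intersection(a.begin(), a.end(),
                          b.begin(), b.end(),
                          std::back_inserter(out));
    return out;
}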
This gives us the intersection and we iterate it to do the summation - again, here we are summing up over the entire year into a single value, but we could just as easily be doing daily, weekly or monthly sums.
// ====================================
// 4) Aggregate deliveries for matches
// ====================================
// Let's sum up total power / total value for the intersection set.
long long totalPower = 0;
long long totalValue = 0;
#ifdef _OPENMP
#pragma omp parallel for num_threads(4) reduction(+:totalPower, totalValue)
#endif
for (auto id : intersection)
{
const Trade& t = trades[id];
// sum up all deliveries
for (const auto& dd : t.dailyDeliveries)
{
for (int slot = 0; slot < 100; ++slot)
{
totalPower += dd.power[slot];
totalValue += dd.value[slot];
}
}
}
The addition here was single threaded, but you can also use OpenMP to accelerate it. This will require some tuning: you don't want to use too many threads for these smaller aggregations; omp parallel for num_threads(4) can be added to limit it to 4 threads, for example (note: I added the OpenMP pragma in the code above).
Generally you get a 10x or more acceleration on a single core, depending on how selective the indexes you are using are. In parallel, I'm getting 40-50x acceleration in multi-core with num_threads set to 4.
Why array<int,100> ?
In my last post, I used an array of 100 points per day. I'm using it because it's simplest to have all days share the same "size" for memory alignment: if I am calculating days in local time, I have one 25h day and one 23h day once a year. I would generally prefer to work in UTC and have constant 24h days - but for some reason humans prefer local time, so this aligns with expectations.
Just be clear with the developers whether you use an internal UTC or local-time representation. If using local time, make sure to:
- Properly sanitize your inputs: check that trades fill only the 96 quarter hours for all days apart from the short day (92) and the long day (100), and zero-fill the remainder (a small zero-fill sketch follows this list).
- Keep summing over all 100 slots; the 4% extra index length is not worth an if statement in the inner loop of the code - try to keep loops jump free.
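A tiny sketch of the zero-fill mentioned in the first point (the helper and its signature are my assumptions):
// Hypothetical ingestion helper: a normal local-time day carries 96 quarter
// hours, the short DST day 92 and the long DST day 100. Everything past the
// valid slots is zeroed so the branch-free 100-slot sum stays exact.
// Needs <algorithm> for std::fill.
void sanitizeDay(DailyDelivery& dd, int validSlots /* 92, 96 or 100 */)
{
    std::fill(dd.power.begin() + validSlots, dd.power.end(), 0);
    std::fill(dd.value.begin() + validSlots, dd.value.end(), 0);
}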
How to deal with Canceled / Amended / Recalled trades ?
My first answer is: don't! Let me explain: normal trades should represent 99.9% or 99.99% of your deals - unless there's something you haven't told me about the way you are trading!
We can design this by having a tradeStatus on the trade.
int TradeStatus:
- 0 : trade is valid
- 1 : trade is canceled
- 2 : trade is amended (ie. replaced by a new one)
- ... : any other status necessary
When a trade is canceled, we leave it in the trade vector, but simply set the tradeStatus to a non-zero value, and skip it with a test at the beginning of the aggregation.
#ifdef _OPENMP
#pragma omp parallel for num_threads(4) reduction(+:totalPower, totalValue)
#endif
for (auto id : intersection)
{
const Trade& t = trades[id];
if(t.tradeStatus != 0) continue; // skip canceled/amended trades
If the trade is amended, same thing: we add a new trade to our list of trades and set the previous one to amended status. Generally, this does not use up much memory. If it ever becomes a problem, we could:
- have a "trade compression" which removes all non-zero trade status from the vector.
- flush the entire trade vector and reload the whole set.
Depending on your implementation, the flushing and reloading might be just as fast - not every program needs to stay resident in memory all the time.
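The compression pass in the first option could be as small as a std::erase_if (C++20) - a sketch; note that because the indexes store positions into the trades vector, they would need to be rebuilt after compaction:
// Drop every non-active trade in one pass (C++20 std::erase_if, in <vector>).
std::erase_if(trades, [](const Trade& t) { return t.tradeStatus != 0; });
// Positions have shifted, so rebuild traderIndex / areaIndex afterwards.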
Snapshot state to disk for recovery or storage - Fork() as in Redis
If we want to take snapshots of the state, we can get inspired from Redis' famous fork() snapshotting technique.
Use this when needing to snapshot a large data structure in RAM to disk (serialize the entire state), without blocking our main process from accepting new trades for the entire duration of the write.
How fork() helps
On Linux, calling fork() creates a child process that initially shares the same physical memory pages as the parent.
Copy-on-write (CoW): If either the parent or the child writes to a page after the fork, the kernel duplicates that page so each process sees consistent data.
The child process can serialize the in-memory data (in a consistent state from the moment of forking) to disk, while the parent continues to run to accept new trades. New trades arriving in the parent process after the fork will not affect the child’s view of memory. The child effectively sees a snapshot as of the fork().
You want the data structure to be in a consistent state at the instant of fork(). Take a brief lock (or pause writes) just before the fork() is triggered, ensuring no partial updates are in flight. Immediately after fork() returns, you can unlock, letting the parent continue. Meanwhile, the child proceeds to write out the data.
We can store an atomic counter value in the program that represents the last tradeid inserted or a state version. This gives you a “version” or “stamp” number for the dataset.
I won't put the full code for that here, since the design is a little more involved, but the basics are:
static std::atomic<long> g_version{0}; //snapshot version id
static std::mutex g_tradeMutex; // protects g_trades from concurrent modification - lock on writes
// ---------------------------------------------------
// fork() to create a child that writes the snapshot
// ---------------------------------------------------
int snapshotNow(const char* filename) {
// 1) Acquire short lock to ensure no partial updates in progress
g_tradeMutex.lock();
long snapVer = g_version.load(std::memory_order_relaxed);
// 2) Fork
pid_t pid = fork();
if(pid < 0) {
// error
std::cerr << "fork() failed\n";
g_tradeMutex.unlock();
return -1;
}
if(pid == 0) {
// child
// We have a consistent view of memory as of the fork.
// release the lock in the child
g_tradeMutex.unlock();
// write the snapshot
writeSnapshotToDisk(filename, snapVer);
// exit child
_exit(0);
} else {
// parent
// release the lock and continue
g_tradeMutex.unlock();
std::cout << "[Parent] Snapshot child pid=" << pid
<< ", version=" << snapVer << "\n";
return 0;
}
}
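writeSnapshotToDisk is not shown above; a minimal sketch of what it could do - the binary format and helper are entirely my assumption, and it presumes g_trades is the global std::vector<Trade> - is to dump a version header followed by the trades themselves:
// Hypothetical child-side serializer: version header, trade count, then per
// trade: id, trader, area, delivery count and the raw DailyDelivery structs
// (DailyDelivery is trivially copyable: an int plus two std::array<int, 100>).
// Other Trade fields (startDay, endDay, tradeStatus, ...) would be added the same way.
// Needs <cstdio>, <cstdint> and <string>.
static void writeString(std::FILE* f, const std::string& s)
{
    const std::uint32_t len = static_cast<std::uint32_t>(s.size());
    std::fwrite(&len, sizeof(len), 1, f);
    std::fwrite(s.data(), 1, len, f);
}

static void writeSnapshotToDisk(const char* filename, long version)
{
    std::FILE* f = std::fopen(filename, "wb");
    if (!f) { std::perror("fopen"); return; }
    std::fwrite(&version, sizeof(version), 1, f);
    const std::uint64_t count = g_trades.size();
    std::fwrite(&count, sizeof(count), 1, f);
    for (const Trade& t : g_trades)
    {
        std::fwrite(&t.tradeId, sizeof(t.tradeId), 1, f);
        writeString(f, t.trader);
        writeString(f, t.deliveryArea);
        const std::uint64_t days = t.dailyDeliveries.size();
        std::fwrite(&days, sizeof(days), 1, f);
        std::fwrite(t.dailyDeliveries.data(), sizeof(DailyDelivery), days, f);
    }
    std::fclose(f);
}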
In essence, we have two processes continuing from the same fork() return, each taking one branch.
Data I/O
How you integrate your cpp aggregation software into the rest of your stack depends on the software running around it.
You can run the application as an on-demand aggregation: load everything into memory, do the aggregation and exit - leaving the server free to do something else. This can be worth it if you only do aggregations on-demand and can afford the load time of a second or two from your NVMe storage.
You can keep the cpp program running, either:
- exposing an HTTP REST API (https://github.com/microsoft/cpprestsdk is a good library for that, I've used it before) - a minimal listener sketch follows this list.
- having a GRPC endpoint for performance
- receiving data from a Kafka stream - I'm sure Confluent can give you a good example of that.
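For the REST API option above, a minimal cpprestsdk listener could look like this (a sketch: the /aggregate route and the runAggregation helper are my assumptions, not code from this project):
// Minimal sketch of an on-demand aggregation endpoint with cpprestsdk.
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include <cstdint>
#include <cstdio>
#include <utility>

using namespace web;
using namespace web::http;
using namespace web::http::experimental::listener;

// Hypothetical wrapper around the summation loop shown earlier.
std::pair<long long, long long> runAggregation();

int main()
{
    http_listener listener(U("http://0.0.0.0:8080/aggregate"));
    listener.support(methods::GET, [](http_request request) {
        const auto totals = runAggregation();
        json::value result;
        result[U("totalPower")] = json::value::number(static_cast<int64_t>(totals.first));
        result[U("totalValue")] = json::value::number(static_cast<int64_t>(totals.second));
        request.reply(status_codes::OK, result);
    });
    listener.open().wait();
    std::getchar(); // keep serving until a key is pressed
    listener.close().wait();
    return 0;
}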
Summary
In-memory data aggregation using cpp is relatively easy to write and maintain. Cpp is no longer the terrible monster it was - smart pointers help, and using std:: components makes everything simple. OpenMP is an easy win to add to compute- or memory-intensive sections.
Sidenotes
On Windows you can get everything you need to compile cpp by installing:
winget install LLVM.LLVM
On Linux (or WSL), you need to install the following:
sudo apt-get update
sudo apt-get install clang
sudo apt-get install libomp-dev
Then, on both OSes, you can usually run a compilation on your source file (inmem_agg_omp.cpp) using:
clang++ -O3 -march=native -flto -ffast-math -fopenmp -o inmem_agg_omp inmem_agg_omp.cpp
Performance of in-memory in 2025
Posted on Thursday, 13 February 2025
Performance of in-memory software in 2025
Yesterday, at an Energy panel in Essen, I mentioned that some heavy calculations should be done in-memory. It is something that people have a tendency to dismiss, because they are generally not aware of the capability and speed of modern CPUs and RAM. A 32 GB dataset in memory can now be processed every second by a commodity CPU in your laptop.
Living in the Future, by the numbers is a great article on the progress we have had since 2004:
- CPU Compute is 1000x faster
- Web servers are 100x faster
- RAM is 16x to 750x larger
- SSD can do 10'000x more transactions per second.
You can also see this progression on Azure with the high-compute servers: Microsoft and AMD are packing so much more memory bandwidth into modern compute.
We are going to have 7 TB/s of memory bandwidth!
You can now run super-computer level problems on Azure!
But what does that all mean? What can we do even on a commodity laptop? I have a Latitude 9440 laptop on my desk here, with a 13th gen i7-1365U with 32 GB of RAM.
Energy Trade aggregation
Let's start with a small calculation from the world of Energy. I have 100'000 trades, these trades affect one or multiple quarter hours of one year (8'784 hours => 35'136 quarter hours).
Pulling out a little C++, completely unoptimized, how long does it take to aggregate them and how much RAM is used?
// A struct to hold the daily delivery arrays (power, value).
struct DailyDelivery
{
int dayOfYear; // 1..366 (the example year has 8'784 hours, i.e. a leap year)
std::array<int, 100> power;
std::array<int, 100> value;
};
// A struct to hold the trade metadata.
struct Trade
{
int tradeId;
std::string trader; // e.g. "TraderX"
std::string deliveryArea; // e.g. "Area1", "Area2"
// You could store time points, but we'll just store day indexes for simplicity.
// Real code might store start_delivery, end_delivery as std::chrono::system_clock::time_point.
int startDay; // 1..366
int endDay; // 1..366
std::vector<DailyDelivery> dailyDeliveries;
};
The Daily Delivery structure represents a day of delivery, with 100 slots (for the 25h day => 100 quarter hours).
- I'm storing the delivery of power in kW as an int32, meaning in a single trade I can do –2'147'483'648 kW to 2'147'483'647 kW.
- Same thing for the value, we store the individual value of the MW in milli values (decimal shift 3), so each MW could be priced at -2'147'483.648 € to 2'147'483.647 €.
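As a tiny illustration of that fixed-point scheme (the helper names are mine):
// Value stored as milli-units in an int32 (decimal shift of 3).
// Needs <cmath> and <cstdint>.
std::int32_t encodeValueMilli(double value) { return static_cast<std::int32_t>(std::llround(value * 1000.0)); }
double decodeValueMilli(std::int32_t milli) { return milli / 1000.0; }
// encodeValueMilli(42.317) == 42317; decodeValueMilli(42317) == 42.317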
The Trade stores:
- metadata: Trader and DeliveryArea. We could add as many metadata elements as we need, but for simplicity in the demo, I only use this
- a dailyDeliveries vector containing the array of all days affected by the trade.
Now, if we want to see the total sum of power of all trades, the sum for TraderX, and the sum of TraderX's deals in Area1 and Area2, we can run the aggregation over all the memory. This is completely straightforward code, no optimizations whatsoever.
// --------------------------------------
// 2) Run the aggregations (measure time)
// --------------------------------------
// The aggregates we want:
// (a) All Trader total (yearly - all zones)
// (b) Trader X total
// (c) Trader X / Area1
// (d) Trader X / Area2
//
// We'll assume we only have TraderX, so "All Trader" == "TraderX" in this simple version.
//
// But let's keep it generic. If you had multiple traders, you'd do some checks:
//
// For Weighted Average Cost = total_value / total_power (where total_power != 0)
// We'll measure the time for a single pass that gathers all these sums.
using Clock = std::chrono::steady_clock;
auto startTime = Clock::now();
long long all_totalPower = 0;
long long all_totalValue = 0;
long long traderX_totalPower = 0;
long long traderX_totalValue = 0;
long long traderX_area1_totalPower = 0;
long long traderX_area1_totalValue = 0;
long long traderX_area2_totalPower = 0;
long long traderX_area2_totalValue = 0;
for(const auto& trade : trades)
{
// (a) "All Trader" sums:
// Summation for all trades, all areas, all days
// Because this example is all TraderX, you might have to adapt if you had multiple traders
for(const auto& dd : trade.dailyDeliveries)
{
for(int slot = 0; slot < 100; ++slot)
{
all_totalPower += dd.power[slot];
all_totalValue += dd.value[slot];
}
}
// (b) If trade.trader == "TraderX"
if(trade.trader == "TraderX")
{
for(const auto& dd : trade.dailyDeliveries)
{
for(int slot = 0; slot < 100; ++slot)
{
traderX_totalPower += dd.power[slot];
traderX_totalValue += dd.value[slot];
}
}
// (c) and (d) by area
if(trade.deliveryArea == "Area1")
{
for(const auto& dd : trade.dailyDeliveries)
{
for(int slot = 0; slot < 100; ++slot)
{
traderX_area1_totalPower += dd.power[slot];
traderX_area1_totalValue += dd.value[slot];
}
}
}
else if(trade.deliveryArea == "Area2")
{
for(const auto& dd : trade.dailyDeliveries)
{
for(int slot = 0; slot < 100; ++slot)
{
traderX_area2_totalPower += dd.power[slot];
traderX_area2_totalValue += dd.value[slot];
}
}
}
}
}
auto endTime = Clock::now();
auto durationMs = std::chrono::duration_cast<std::chrono::milliseconds>(endTime - startTime).count();
How long do you think that takes on a commodity laptop? It's 9.8 GB of RAM to scan and fully aggregate. This is also running inside a VM on my Windows WSL instance, with other software running at the same time.
Time for in-memory aggregation: 907 ms
--- Memory usage statistics (approx) ---
Total bytes allocated (cumulative): 9822202136 bytes
Peak bytes allocated (concurrent): 9822202136 bytes
Current bytes allocated: 9822202136 bytes
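The memory statistics above come from instrumenting allocations. One way to collect numbers like these (a sketch of a counting global operator new/delete pair - not necessarily how I measured it) is:
// Counting global allocator: tracks cumulative, current and peak bytes.
// The sized operator delete lets us decrement the current count;
// the unsized fallback cannot.
#include <atomic>
#include <cstdlib>
#include <new>

static std::atomic<std::size_t> g_totalBytes{0}, g_currentBytes{0}, g_peakBytes{0};

void* operator new(std::size_t n)
{
    void* p = std::malloc(n);
    if (!p) throw std::bad_alloc();
    g_totalBytes += n;
    const std::size_t cur = (g_currentBytes += n);
    std::size_t peak = g_peakBytes.load();
    while (cur > peak && !g_peakBytes.compare_exchange_weak(peak, cur)) {}
    return p;
}

void operator delete(void* p, std::size_t n) noexcept
{
    g_currentBytes -= n;
    std::free(p);
}

void operator delete(void* p) noexcept { std::free(p); } // unsized fallback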
Parallelization
Since we are running on a multicore CPU, we can use more than one core to do the aggregation. With OpenMP, it's extremely simple to set up some parallelization for the compute. At the beginning of the loop, we define a parallel aggregation with a reduction, meaning a final sum.
// Parallel over trades
// The 'reduction(+: variableList)' tells OpenMP to create private copies of
// these variables in each thread, accumulate them, and then combine them
// at the end.
#ifdef _OPENMP
#pragma omp parallel for reduction(+ : all_totalPower, all_totalValue, \
traderX_totalPower, traderX_totalValue, \
traderX_area1_totalPower, traderX_area1_totalValue, \
traderX_area2_totalPower, traderX_area2_totalValue)
#endif
for (std::size_t i = 0; i < trades.size(); ++i)
{
const auto& trade = trades[i];
With this, we improve the time to aggregate on the laptop to: 241ms. This means we can now do the complete aggregation on a laptop 4x per second - even on a completely unoptimized, simplistic memory structure for trades.
Time for in-memory aggregation: 241 ms
So when you are doing large numerical aggregations or calculations, ask yourself - can I do this in RAM? If so, you might be surprised at how quickly and efficiently you can do it with modern cpp.
Performance vs Memory bandwidth
This completely unoptimized implementation is running at:
Data size (GB) / time (s) = bandwidth (GB/s) => 9.8 GB / 0.241 s ≈ 40.7 GB/s
My laptop has approximately 96 GB/s of memory bandwidth. I calculate it as follows: LPDDR5 at 6000 MT/s, 8 bytes, dual channel = 6000 * 8 * 2 ≈ 96 GB/s. My laptop has less than half the bandwidth of the HC family on Azure, which uses AMD EPYC™ 7003-series CPUs released in 2021. Still impressive for my laptop, but it shows you could do much better.
If I really needed to optimize, I would re-organise the data structures to improve the memory alignment as an initial step. With that, we should get closer to the theoretical bandwidth of the machine.
Network with Tailscale
Posted on Monday, 13 January 2025
I updated my OpenVPN-based network to use Tailscale in 2023 and it is game changing. I have used Tailscale ever since - I simply did not update my blog and network diagram.
With Tailscale, all my machines appear seamlessly on a single control plane and I can reach any of them from any device.
Zone Z
Z has a single fiber connection via Swisscom to internet.
Inventory
- DeepThread is an AMD Threadripper 1920x running Windows 10.
- Minis4 is the Beelink MiniS12 N95s running Ubuntu Server 24.10.
- NAS is an older QNAP TS-269L
Zone N
N has two connections: a Starlink (v1 round) with only the power-brick router, and Sosh as a backup DSL provider (with an ADSL router), both connected to a Ubiquiti UDM-PRO-SE in failover mode.
Inventory
- Minis1 is a Beelink MiniS12 N95 running Windows 11, currently enjoying being VESA-mounted behind a screen in the office. I originally thought I would also put Ubuntu on it, but a Windows machine is useful.
- Minis2 and Minis3 are Beelink MiniS12 N95s running Ubuntu Server 24.10, currently rack-mounted with the UDM-PRO.
VPN
On the UDM-PRO, a VPN is configured with Ubiquiti and I can use the iOS application WifiMan to access the network. Having WifiMan is really a backup of a backup solution.
On Minis2 and Minis4, a cloudflared docker container is running, reaching up to Cloudflare and providing a Zero Trust tunnel to expose several dockerized websites hosted on them.
I made a Suno song on how awesome it is.