Kush V
This writeup documents the transactional maps I’ve implemented over the last few weeks. It mainly focuses on design, lifecycle, and implementation details. No benchmarks or perf numbers included.
NOTE: The original writeup can be found here
This is mainly inspired by the approach described in the transactional collections classes paper.
Core Data Structures:
- ConcurrentMap<K, V> -> The underlying map
- KeyToLockers<K> -> Maps each key to operation-specific GuardedTxSets
- GuardedTxSet contains a ReentrantReadWriteLock (split into read/write locks)

tx.get(key) // Acquires READ lock immediately
tx.put(key, val) // Does NOT acquire any lock yet
For reads (GET/CONTAINS/SIZE):
- Acquire the GuardedTxSet for that key + operation

For writes (PUT/REMOVE):

For reads:

For writes:
- Locks are acquired in identityHashCode() order (prevents deadlock via normal ordering)
- PUT on new key -> acquire SIZE write lock (size will increase)
- PUT on existing key -> release CONTAINS write lock if held (size unchanged, CONTAINS always true)
- REMOVE on existing key -> acquire SIZE write lock (size will decrease)
- REMOVE on non-existent key -> release CONTAINS write lock if held (size unchanged, CONTAINS always false)
- Held GuardedTxSet locks are released when the transaction completes
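The write-time rules above boil down to a small decision: does this operation change the map's size or not? Here is a sketch of that rule as a standalone function. The class and enum names (AuxLockRules, Decision) are mine for illustration, not the writeup's actual API.

```java
// Hypothetical condensation of the auxiliary-lock rules: given a write
// operation and whether the key already exists, decide whether to take the
// SIZE write lock (size changes) or drop the CONTAINS write lock (size and
// CONTAINS result are unaffected).
public class AuxLockRules {
    public enum Op { PUT, REMOVE }
    public enum Decision { ACQUIRE_SIZE_WRITE, RELEASE_CONTAINS_WRITE }

    public static Decision decide(Op op, boolean keyExists) {
        if (op == Op.PUT) {
            // New key: size will increase, so the SIZE write lock is needed.
            // Existing key: size is unchanged and CONTAINS stays true.
            return keyExists ? Decision.RELEASE_CONTAINS_WRITE : Decision.ACQUIRE_SIZE_WRITE;
        }
        // REMOVE: an existing key shrinks the map; a missing key changes nothing.
        return keyExists ? Decision.ACQUIRE_SIZE_WRITE : Decision.RELEASE_CONTAINS_WRITE;
    }
}
```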
This is mainly inspired by the approach described in the transactional collections classes paper.
Core Data Structures:
- ConcurrentMap<K, V> -> The underlying map
- KeyToLockers<K> -> Maps each key to operation-specific GuardedTxSets
- GuardedTxSet contains:
  - ReentrantLock (write lock)
  - AtomicInteger reader count
  - Latch with status (FREE/HELD), parent transaction, and CountDownLatch
tx.get(key) // Does NOT acquire lock, just registers
tx.put(key, val) // Does NOT acquire lock, just records
For reads:
- GuardedTxSet

For writes:
- Locks are sorted by identityHashCode() for deadlock prevention
- PUT on new key -> acquire SIZE lock
- REMOVE on existing key -> acquire SIZE lock
- A writer waits while readerCount > 0

For reads:
- If a writer holds the latch, block on its CountDownLatch (blocks until writer releases)
- Increment readerCount before reading
- Decrement readerCount after reading
- Release the GuardedTxSet when done; once readerCount drains, a waiting writer can proceed via the CountDownLatch
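The latch interaction described above (FREE/HELD status, reader count, CountDownLatch handoff) can be sketched as follows. This is a minimal single-latch toy with assumed names; the real GuardedTxSet also tracks the parent transaction and drains readers before the writer proceeds, which is omitted here for brevity.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Toy sketch: a writer flips FREE -> HELD; readers arriving afterwards block
// on a CountDownLatch until the writer releases, then bump readerCount.
public class WriterLatch {
    enum Status { FREE, HELD }

    final AtomicReference<Status> status = new AtomicReference<>(Status.FREE);
    final AtomicInteger readerCount = new AtomicInteger();
    volatile CountDownLatch released = new CountDownLatch(0); // initially open

    boolean tryHold() {
        if (status.compareAndSet(Status.FREE, Status.HELD)) {
            released = new CountDownLatch(1); // new readers will wait on this
            return true;
        }
        return false; // another writer already holds the latch
    }

    void release() {
        status.set(Status.FREE);
        released.countDown(); // wake any blocked readers
    }

    void read(Runnable body) {
        try {
            released.await(); // blocks while a writer holds the latch
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        readerCount.incrementAndGet();
        try { body.run(); } finally { readerCount.decrementAndGet(); }
    }
}
```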
Core Data Structures:
- ConcurrentMap<K, V> -> The underlying map
- LockHolder<K, V> -> Per-key ReentrantLocks for writes
- Map<K, V> -> Local cache of writes

For writes:
- Locks are acquired in identityHashCode() order
For reads:
Before committing any operations:
// Snapshot current map size
tx.size = txMap.map.size();
// Pre-populate store buffer with current values for each operation with a key
storeBuf.put(key, txMap.map.get(key));
Then for each operation:
For writes (PUT/REMOVE):
- PUT on new key: delta++
- REMOVE on existing key: delta--

For reads (GET):

For CONTAINS:

For SIZE:
- Return snapshotSize + delta
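Putting the SIZE bookkeeping above together: the transaction snapshots the map size once, then tracks a delta as its own PUTs and REMOVEs would grow or shrink the map. A sketch (method and parameter names are mine):

```java
import java.util.Map;

// Illustrative: compute the size a transaction would report, given its
// pending puts and removes against a snapshot of the underlying map.
public class SizeDelta {
    public static int sizeAfter(Map<String, String> map,
                                Map<String, String> puts,
                                java.util.Set<String> removes) {
        int snapshotSize = map.size(); // snapshot current map size
        int delta = 0;
        for (String k : puts.keySet()) {
            if (!map.containsKey(k)) delta++; // PUT on new key: delta++
        }
        for (String k : removes) {
            // REMOVE on existing key (not shadowed by a pending put): delta--
            if (map.containsKey(k) && !puts.containsKey(k)) delta--;
        }
        return snapshotSize + delta;
    }
}
```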
Core Data Structures:
- AtomicReference<ConcurrentMap<K, V>> -> Ref to the current map snapshot

Retry loop:
A fairly simple retry loop does the work:
do {
prev = txMap.map.get(); // Get current snapshot
underlyingMap = new HashMap<>(prev); // COPY entire map
// Apply all operations to local copy
txs.forEach(child -> child.tryValidate());
} while (hasWrite && !txMap.map.compareAndSet(prev, new ConcurrentHashMap<>(underlyingMap)));
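The same copy-then-CAS loop, as a self-contained runnable sketch for a single put. The standalone class and method names are mine, not the writeup's; the real version batches a whole transaction's operations before the swap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Copy-on-write map sketch: readers never lock, writers copy the snapshot,
// mutate the copy, and publish it with compareAndSet, retrying on races.
public class CowMap<K, V> {
    final AtomicReference<ConcurrentMap<K, V>> ref =
            new AtomicReference<>(new ConcurrentHashMap<>());

    public void put(K key, V val) {
        ConcurrentMap<K, V> prev, next;
        do {
            prev = ref.get();                     // current snapshot
            next = new ConcurrentHashMap<>(prev); // COPY entire map
            next.put(key, val);                   // apply write to local copy
        } while (!ref.compareAndSet(prev, next)); // retry if someone raced us
    }

    public V get(K key) {
        return ref.get().get(key); // lock-free read of the current snapshot
    }
}
```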
tryValidate():
If CAS fails:
Instead of threads fighting for locks, they:
- Post their operation to a shared publication list
- One thread takes a lock, becomes the combiner, and applies everyone's operations
- The rest spin until the combiner hands back their result
Uses one of four combiner types:
This is based on the approach described in this paper.
Data Structures:
- Node<E, R> for each thread
- Node head
- StatefulAction with:
  - Action<E, R> action -> This is a volatile barrier
  - R result
  - AtomicInteger status (ACTIVE/INACTIVE)

Implementation:
Enqueue:
if (node.isInactive()) {
node.setActive();
prevHead = head.getAndSet(node); // Atomic swap
node.setNext(prevHead);
}
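The enqueue above is just a lock-free stack push: the head is swapped in atomically, then the node is linked to the previous head (the combiner tolerates the momentarily unset next pointer). A runnable toy version with simplified names:

```java
import java.util.concurrent.atomic.AtomicReference;

// Publication-list push sketch: getAndSet makes the node the new head, so
// concurrent pushes never block each other. Names are illustrative.
public class PubList {
    public static class Node {
        final String id;
        volatile Node next;
        volatile boolean active;
        public Node(String id) { this.id = id; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();

    public void enqueueIfInactive(Node node) {
        if (!node.active) {
            node.active = true;
            Node prevHead = head.getAndSet(node); // atomic swap: node is now the head
            node.next = prevHead;                 // then link to the rest of the list
        }
    }
}
```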
Combine:
while (action != null) { // Spin until result ready
if (lock.tryLock()) {
scanCombineApply(); // Process all nodes
return result;
}
idle();
enqueueIfInactive(node); // Re-enqueue if removed
}
scanCombineApply:
- node.statefulAction.apply(e)
- node.action = null (signals completion)
- Every threshold passes, scan and dequeue aged nodes:
  - (currentCount - node.age) >= threshold: unlink, set INACTIVE

A Livelock Bug I Fixed:
Threads could get stuck when the combiner removed their node before they noticed their action was applied. Fixed by forcing combiners to always re-enqueue and apply their own action rather than trusting others.
Key Difference: Reuses nodes instead of cleanup.
Data Structures:
- Node<E, R> with:
  - AtomicInteger status (NOT_COMBINER/IS_COMBINER)
- AtomicReference<Node> tail

Implementation:
// Reset current node
Node newTail = local.get();
newTail.status.set(NOT_COMBINER);
// Swap, my node becomes tail, get previous tail
curNode = tail.getAndSet(newTail);
local.set(curNode); // Previous tail becomes my node
curNode.action = action;
curNode.setNext(newTail);
// Wait to become combiner
while (curNode.status.get() == NOT_COMBINER) {
idle();
}
// Given 3 threads (T1, T2, T3) plus the initial node T0, all waiting for
// their result to be applied, the node chain should look like this
// (assuming natural order):
// T0 -> T1 -> T2 -> T3
// T3's node will be marked as the combiner when T1 finishes applying
// Now I'm the combiner: traverse and apply
int i = 0;
for (Node node = curNode; i < threshold && node.next != null; i++) {
    Node next = node.next;            // save before unlinking
    node.apply(e);
    node.action = null;               // signal the waiter that its result is ready
    node.next = null;                 // unlink the applied node
    node.status.lazySet(IS_COMBINER); // release the waiter; the last node released becomes the next combiner
    node = next;
}
Unique Property: Nodes cycle between threads, no cleanup needed.
Data Structures:
AtomicReferenceArray<Node> of size capacity
- AtomicLong cellNum -> monotonically increasing cell assignment
- ThreadLocal<Node>
Implementation:
Enqueue:
cell = (cellNum.getAndIncrement() % capacity);
// CAS into array slot
while (!cells.compareAndSet(cell, null, node)) {
Thread.yield(); // Slot occupied, retry
}
Combine:
while (!node.statefulAction.isApplied) {
if (lock.tryLock()) {
scanCombineApply(); // Scan entire array
return result;
}
idle();
}
scanCombineApply:
for (i = 0; i < capacity; i++) {
Node curr = cells.get(i);
if (curr != null) {
curr.statefulAction.apply(e);
cells.setOpaque(i, null); // Clear and apply every slot to prevent a situation where waiters spin on an unapplied node forever
//Best when working with a fixed capacity of threads
}
}
Unique Property: Fixed memory, no pointer chasing, but potential false sharing.
lock.lock();
try {
return action.apply(e);
} finally {
lock.unlock();
}
A plain lock, used as a baseline to measure whether flat combining actually helps.
Key Difference: Instead of one combiner for the entire map, one combiner per key + one for SIZE.
Data Structures:
- ConcurrentMap<K, Combiner<Map<K, V>>> -> Maps each key to its own combiner
- sizeCombiner for SIZE operations

Implementation:
// Group operations by key
for (key, operations) in keyToFuture:
combiner = getCombiner(key); // Get or create combiner for this key
combiner.combine(_ -> {
for (operation in operations) {
result = operation.apply(map);
operation.complete(result);
}
});
// Separately handle SIZE operations
sizeCombiner.combine(_ -> { /* apply size ops */ });
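The dispatch above can be rendered in Java roughly as follows. To keep the sketch self-contained I stand in a trivial "combiner" (a per-key lock object held while the batch runs) for the real flat-combining variants described earlier; all names here are illustrative, not the writeup's API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Per-key dispatch sketch: operations are grouped by key, and each key's
// batch runs under that key's own combiner, so distinct keys never contend.
public class SegmentedDispatch {
    final ConcurrentMap<String, Object> combiners = new ConcurrentHashMap<>();
    final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

    public void dispatch(Map<String, List<Function<ConcurrentMap<String, Integer>, Integer>>> byKey) {
        byKey.forEach((key, operations) -> {
            // Get or create the combiner (here: a plain lock object) for this key.
            Object combiner = combiners.computeIfAbsent(key, k -> new Object());
            synchronized (combiner) {
                for (Function<ConcurrentMap<String, Integer>, Integer> op : operations) {
                    op.apply(map); // apply each batched operation for this key
                }
            }
        });
    }
}
```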
Unique Properties:
Rather than blocking readers with locks, each key maintains a version chain, an ordered list of all historical values written to that key. Each version has a beginTs and endTs defining the epoch range in which it is visible. Readers find the version that overlaps their snapshot epoch without acquiring any locks. Writers append new versions and conflict only with other concurrent writers on the same key.
This is based on the approach described in the VLDB paper.
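The visibility rule described above (a version with range [beginTs, endTs) is visible to any epoch inside that range) can be sketched in a few lines. The record and method names here are mine; the real chains are concurrent structures, covered below.

```java
// Minimal version-visibility sketch: a version is visible to epoch t
// exactly when beginTs <= t < endTs.
public class Versions {
    public record Version(long beginTs, long endTs, String value) {
        public static final long INF = Long.MAX_VALUE;

        boolean visibleAt(long epoch) {
            return beginTs <= epoch && epoch < endTs;
        }
    }

    // Descending scan over a chain (newest version last) for the version
    // overlapping epoch t; null means the key did not exist yet at t.
    public static Version findOverlap(java.util.List<Version> chain, long t) {
        for (int i = chain.size() - 1; i >= 0; i--) {
            if (chain.get(i).visibleAt(t)) return chain.get(i);
        }
        return null;
    }
}
```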
- ConcurrentMap<K, VersionChain<V>> -> Maps each key to its version chain
- ConcurrentMap<K, KeyStatus> -> Per-key write lock (CAS-based, not a real lock)
- EpochTracker -> Global epoch counter; tracks active transaction begin epochs for GC
- GCThread -> Background thread that prunes unreachable versions
- AtomicInteger(size) -> Dirty global size counter

tBegin = epochTracker.currentEpoch(); // Snapshot epoch
The transaction records the current global epoch as its tBegin. This is the epoch from which it reads: any version visible at tBegin is part of its snapshot. The epoch tracker also registers this tBegin so the GC knows the oldest epoch still in use.
For writes (PUT/REMOVE):
- Acquire the KeyStatus write lock for that key via CAS
- Check that tBegin still overlaps the latest version on the chain (late-arriving transaction check):
if (!(tBegin >= latest.beginTs() && tBegin < latest.endTs())) abort();
For reads (GET/CONTAINS):
- Check whether the KeyStatus is held by another transaction; if so, abort
- Otherwise, read the version overlapping tBegin immediately:
seen = versionChain(key).findOverlap(tBegin);
At commit time, a tCommit epoch is assigned via epochTracker.newEpoch(). Then, sequentially, each read operation re-checks whether the version it saw at tBegin is still the overlapping version at tCommit:
Version overlapAtCommit = versionChain(key).findOverlap(tCommit);
if (seen != overlapAtCommit) abort(); // Someone wrote to this key between tBegin and tCommit
//A quick example
//tBegin = 100, tCommit = 105
//key = "A", versions = [(0,101,"old"), (102,INF,"new")] //INF = INFINITY
// tBegin should see version "old", while tCommit should see version "new"
This catches read-write conflicts: if any key you read was written by a concurrent transaction between your begin and commit, you abort.
- New versions are installed with beginTs = tCommit
- The previous latest version gets endTs = tCommit (closing its visibility window)
- Release all KeyStatus write locks
- epochTracker.leaveEpoch(tBegin) so GC knows this epoch might no longer be visible

Each key's history is stored in a VersionChain<V>. Two independent implementations exist.
ConcurrentLinkedDeque-based:
- findOverlap() does a descending linear scan, starting from the tail of the deque, so newer versions are found quickly and traversal time is cut
- An AtomicLong size counter avoids O(N) size() calls on the deque

ConcurrentSkipListMap keyed by beginTs:
- findOverlap() uses floorEntry(tBegin), O(log N)

Both implementations maintain a MinVisibleEpoch cache to short-circuit GC scans when no prunable versions exist, a minimal optimization that avoids a full traversal when nothing has changed.
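The skip-list lookup in isolation: with versions keyed by beginTs, floorEntry(tBegin) lands on the newest version that began at or before the read epoch. This sketch elides the endTs bookkeeping and uses a plain String value and invented names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Skip-list version chain sketch: keyed by beginTs, so the overlap lookup is
// a single O(log N) floorEntry call instead of a linear scan.
public class SkipListChain {
    final ConcurrentSkipListMap<Long, String> byBeginTs = new ConcurrentSkipListMap<>();

    void append(long beginTs, String value) {
        byBeginTs.put(beginTs, value);
    }

    String findOverlap(long tBegin) {
        // Newest version with beginTs <= tBegin, or null if none exists yet.
        Map.Entry<Long, String> e = byBeginTs.floorEntry(tBegin);
        return e == null ? null : e.getValue();
    }
}
```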
The epoch tracker serves two purposes: assigning monotonically increasing commit epochs, and tracking the minimum tBegin of all active transactions so the GC knows which versions are safe to delete.
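That contract can be sketched as a toy tracker (assumed names, roughly along the lines of the first map-based variant listed below): a monotonic counter for commit epochs plus a registry of active begin epochs for the minimum.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Toy epoch tracker: epoch -> count of transactions that began there.
// minVisibleEpoch() is the oldest begin epoch still in use.
public class SimpleEpochTracker {
    final AtomicLong globalEpoch = new AtomicLong(1);
    final ConcurrentMap<Long, AtomicLong> active = new ConcurrentHashMap<>();

    public long currentEpoch() {
        long e = globalEpoch.get();
        active.computeIfAbsent(e, k -> new AtomicLong()).incrementAndGet();
        return e; // registered as a live begin epoch
    }

    public long newEpoch() {
        return globalEpoch.incrementAndGet(); // commit epochs increase monotonically
    }

    public void leaveEpoch(long e) {
        AtomicLong c = active.get(e);
        if (c != null && c.decrementAndGet() == 0) active.remove(e, c);
    }

    public long minVisibleEpoch() {
        // No active transactions: everything up to the current epoch is visible.
        return active.keySet().stream().mapToLong(Long::longValue)
                .min().orElse(globalEpoch.get());
    }
}
```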
Three implementations exist:
DefaultEpochTracker:
- ConcurrentHashMap<Long, AtomicLong> mapping epoch -> active transaction count
- currentEpoch() registers the transaction in the map and increments the counter
- leaveEpoch() decrements; removes the entry when the count hits zero
- minVisibleEpoch() streams over the key set to find the minimum

Long2LongEpochTracker:
- Long2LongOpenHashMap (fastutil) maps thread ID -> current epoch
- leaveEpoch() writes a sentinel value (-1) rather than removing the entry
- minVisibleEpoch() scans values, skipping sentinels

LongToArrayEpochTracker:
- ConcurrentHashMap<Long, long[]> mapping thread ID to a single-element long[]
- Same idea as Long2LongEpochTracker but avoids boxing via the long[] trick, without fastutil's hash map

Version chains grow unboundedly without cleanup. The GC thread handles pruning old versions that no transaction can ever see again.
Design:
- A queue (LinkedBlockingQueue) of cleanup requests
- A ScheduledExecutorService (virtual thread) refreshes a cached minVisibleEpoch from an EpochTracker every 100ms
- A chain is enqueued for cleanup when versionChain.size() % threshold == 0
Why cached epoch reads:
Reading minVisibleEpoch() on every write-transaction commit was a hotspot: it involves scanning the epoch tracker under contention. Decoupling this into a scheduled read trades precision for significantly lower write-path overhead, though versions may survive slightly longer than necessary.
Pruning logic:
// A version is prunable if:
version.endTs < minVisibleEpoch && version != latest
The latest version is always preserved regardless of its timestamps, since new transactions may still need it. The MinVisibleEpoch cache per version chain short-circuits the scan entirely if no version has an endTs below the current GC epoch.
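The pruning rule above, applied to a plain list for illustration (the real chains are the concurrent structures described earlier; names here are mine):

```java
import java.util.ArrayList;
import java.util.List;

// Prune sketch: keep a version unless it ended before the oldest active
// transaction began AND it is not the latest version on the chain.
public class Pruner {
    public record Version(long beginTs, long endTs) {}

    public static List<Version> prune(List<Version> chain, long minVisibleEpoch) {
        List<Version> kept = new ArrayList<>();
        Version latest = chain.isEmpty() ? null : chain.get(chain.size() - 1);
        for (Version v : chain) {
            // A version is prunable if: endTs < minVisibleEpoch && it is not latest.
            boolean prunable = v.endTs() < minVisibleEpoch && v != latest;
            if (!prunable) kept.add(v);
        }
        return kept;
    }
}
```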
DefaultEpochTracker works correctly with any thread model but contends on computeIfAbsent() calls, which could kill perf if frequent. The thread-keyed trackers (LongToArray, Long2Long) eliminate that contention entirely since each key is owned by exactly one thread.

| Implementation | Isolation Level | When Locks Acquired | Readers Block Writers? | Writers Block Readers? | Probably Best For |
|---|---|---|---|---|---|
| Optimistic | READ COMMITTED (per-key SERIALIZABLE) | Reads: eager, Writes: lazy | Yes | Yes | Balanced workloads with strong consistency needs |
| Pessimistic | READ COMMITTED (per-key SERIALIZABLE) | Both lazy | Yes | Yes | Similar to optimistic |
| Read Uncommitted | READ UNCOMMITTED | Writes only at validation | No | Yes | Write-heavy and weak consistency is OK |
| Copy-on-Write | READ COMMITTED | Never (uses CAS) | No | No | Read heavy or small map size |
| Flat Combined | SERIALIZABLE | N/A (combiners) | N/A | N/A | High contention, batch benefits |
| Segmented Flat Combined | SERIALIZABLE | N/A (per-key combiners) | N/A | N/A | Outperformed by a single flat combined map |
| MVCC | SNAPSHOT | Reads: eager Writes: eager | N/A | N/A | Best for read heavy scenarios |