How to perform atomic updates with TrieMap and ConcurrentHashMap?

In Java, ConcurrentHashMap has an interesting property: operations such as putIfAbsent, remove, replace, computeIfAbsent, computeIfPresent, compute, and merge are atomic.

Example:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

final Map<String, Integer> map = new ConcurrentHashMap<>();
IntStream.rangeClosed(1, 10000)
         .parallel()
         .forEach(i -> map.merge("key" + (i % 5 + 1), i, Integer::sum));
final long total = map.values().stream().mapToInt(Integer::intValue).sum(); // 50005000

I did not find any similar operation in the API of TrieMap:

import scala.collection.concurrent.TrieMap

val map = TrieMap[String, Int]().withDefaultValue(0)
(1 to 10000).par.foreach(i => map(s"key${i % 5 + 1}") += i)
val total = map.values.sum // nondeterministic value: += is not an atomic operation
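
The race comes from how the compound assignment expands: map(key) += i is roughly map.update(key, map(key) + i), that is, a separate read and a separate write. A minimal single-threaded sketch of that interleaving follows. The name LostUpdateSketch and the intervening put, which stands in for another thread, are illustrative assumptions:

```scala
import scala.collection.concurrent.TrieMap

object LostUpdateSketch {
  // Replays by hand the two steps that `map("key1") += 5` performs.
  def run(): Int = {
    val map = TrieMap[String, Int]("key1" -> 0)

    val stale = map("key1")        // step 1: read the current value (0)
    map.put("key1", 100)           // stand-in for another thread writing in between
    map.update("key1", stale + 5)  // step 2: write back a result based on the stale read

    map("key1")                    // 5, not 105: the intervening write was lost
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Each call is atomic on its own; it is the gap between the read and the write that loses updates.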

It is of course possible to synchronize the operations manually, or to use constructs such as CAS, STM, etc.:

(1 to 10000).par.foreach(i => {
  val key = s"key${i % 5 + 1}".intern
  key.synchronized {
    map(key) += i
  }
})

Alternatively, it is possible to use ConcurrentHashMap from Java directly (without using asScala):

import scala.collection.JavaConverters._ // only needed for asScala in the final sum

val map = new java.util.concurrent.ConcurrentHashMap[String, Int]()
(1 to 10000).par.foreach(i => map.merge(s"key${i % 5 + 1}", i, Integer.sum _))
val total = map.values().asScala.sum

Is there a simpler, more idiomatic way to ensure atomic updates in a concurrent.Map?

1 answer

Scala 2.12.x

For those still on version 2.12.x, here is a solution based on compare-and-swap that works correctly:

val map = TrieMap[String, Int]()
(1 to 10000).par.foreach { i =>
  val key = s"key${i % 5 + 1}"
  map.putIfAbsent(key, 0)

  Iterator.continually(map(key)).find { oldValue =>
    map.replace(key, oldValue, oldValue + i)
  }
}
val total = map.values.sum
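
The retry loop above follows the usual compare-and-swap pattern: read the value, compute the new one, and replace it only if nobody changed it in the meantime. As a sketch, the loop can be factored into a helper. The names addTo and CasSketch are hypothetical, and plain threads are used instead of par only to keep the example free of extra dependencies:

```scala
import scala.annotation.tailrec
import scala.collection.concurrent
import scala.collection.concurrent.TrieMap

object CasSketch {
  // Hypothetical helper: atomically adds `delta` to the value stored under `key`.
  @tailrec
  def addTo(map: concurrent.Map[String, Int], key: String, delta: Int): Unit =
    map.putIfAbsent(key, delta) match {
      case None => () // key was absent; `delta` is now the stored value
      case Some(old) =>
        // replace succeeds only if the value is still `old`; otherwise retry
        if (!map.replace(key, old, old + delta)) addTo(map, key, delta)
    }

  def total(): Int = {
    val map = TrieMap[String, Int]()
    // Four threads, each handling the numbers congruent to t modulo 4
    val threads = (0 until 4).map { t =>
      new Thread(() =>
        (1 to 10000).filter(_ % 4 == t).foreach(i => addTo(map, s"key${i % 5 + 1}", i)))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    map.values.sum
  }

  def main(args: Array[String]): Unit = println(total())
}
```

Because the helper only relies on putIfAbsent and replace, it accepts any concurrent.Map, not just a TrieMap.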

In the example above, the TrieMap can be replaced by any other concurrent.Map without affecting the solution:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

val map = new ConcurrentHashMap[String, Int]().asScala

Scala 2.13.x

That said, three years have passed since I asked this question, and much has changed. At the time of writing, the latest version of Scala is 2.13.4. Since 2018:

  • Parallel collections have been moved to their own module: scala-parallel-collections.

    This module now needs to be included as a project dependency:

    libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.0"
    

    Using the par method also requires importing the implicit converters:

    import scala.collection.parallel.CollectionConverters._
    
    val parallelRange = (1 to 10000).par
    
  • Some operations that were not atomic have become atomic, for example TrieMap.getOrElseUpdate (see Issue 7943).

  • Alternatives to the Java 8 operations have been introduced. For example, the counterpart of Java's java.util.Map#compute is scala.collection.mutable.Map#updateWith.
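
Because TrieMap.getOrElseUpdate is atomic, another option is to keep one mutable counter per key and let the counter do the arithmetic. This is a minimal sketch, assuming AtomicInteger values are acceptable for the use case. CounterSketch is a hypothetical name, and as far as I can tell the default expression may be evaluated by more than one racing thread even though only one result is stored:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

object CounterSketch {
  def total(): Int = {
    val map = TrieMap[String, AtomicInteger]()
    // Four threads, each handling the numbers congruent to t modulo 4
    val threads = (0 until 4).map { t =>
      new Thread(() =>
        (1 to 10000).filter(_ % 4 == t).foreach { i =>
          // All threads end up with the same counter instance for a given key;
          // addAndGet then performs the atomic increment.
          map.getOrElseUpdate(s"key${i % 5 + 1}", new AtomicInteger).addAndGet(i)
        })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    map.values.map(_.get).sum
  }

  def main(args: Array[String]): Unit = println(total())
}
```

The trade-off is that the map no longer holds plain Int values, so readers have to call get on each counter.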

In Scala 2.13.x, the compare-and-swap code from the 2.12.x section can be rewritten in a more idiomatic way:

val map = TrieMap[String, Int]()
(1 to 10000).par.foreach { i =>
  map.updateWith(s"key${i % 5 + 1}") {
    case Some(c) => Some(c + i)
    case None    => Some(i)
  }
}
val total = map.values.sum
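
The Option-based contract of updateWith mirrors compute in Java: returning Some(v) stores v, while returning None removes the key (or leaves it absent). A small single-threaded sketch, assuming Scala 2.13, with UpdateWithSketch as a hypothetical name:

```scala
import scala.collection.concurrent.TrieMap

object UpdateWithSketch {
  def demo(): (Option[Int], Option[Int], Option[Int], Boolean) = {
    val map = TrieMap[String, Int]()
    // Same shape of remapping function as in the answer, with i = 1
    val bump: Option[Int] => Option[Int] = {
      case Some(c) => Some(c + 1)
      case None    => Some(1)
    }
    val first   = map.updateWith("a")(bump)      // key absent: stores 1
    val second  = map.updateWith("a")(bump)      // key present: stores 2
    val removed = map.updateWith("a")(_ => None) // None removes the mapping
    (first, second, removed, map.contains("a"))
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The documentation of concurrent.Map#updateWith notes that the remapping function is retried when another thread updates the key first, so it is safest to keep that function free of side effects.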

In the example above, the TrieMap can again be replaced by another concurrent.Map, but note that the converters have moved to a different package:

import scala.jdk.CollectionConverters._

val map = new ConcurrentHashMap[String, Int]().asScala
