My Hazelcast-based program can work in two modes: submitter and worker.
The submitter puts a POJO into the distributed map under some key, e.g.: hazelcastInstance.getMap(MAP_NAME).put(key, value);
The worker runs an infinite loop (with Thread.sleep(1000L); inside as a pause) which must process entries from the map. For now I'm just printing the map size in this loop.
Now here's the problem. I start the worker app. Then I start four submitters simultaneously (each adds an entry to the map and terminates its work). But after all submitter apps are done, the worker app prints an arbitrary size: sometimes it detects that only one entry was added, sometimes two, sometimes three (it has never actually seen all four entries).
What is the problem with this simple flow? I've read in the Hazelcast docs that the put() method is synchronous, so it guarantees that after it returns, the entry is placed into the distributed map and replicated. But that doesn't seem to be the case in my experiment.
UPD (code)
Submitter:
public void submit(String key) {
    Object mySerializableObject = ...
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS);
}
Worker:
public void process() {
    while (true) {
        IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME);
        System.out.println(map.size());
        // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess();
        // objectToProcess.ifPresent(objectToProcess -> processObject(id, objectToProcess));
        try {
            Thread.sleep(PAUSE);
        } catch (InterruptedException e) {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
I commented out the "processing" part itself, because for now I'm just trying to get a consistent view of the map. The code above prints different results each time, e.g. "4, 3, 1, 1, 1, 1, 1...": it can even see all 4 submitted tasks for a moment, but then they... disappear.
UPD (log)
Worker:
...
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 0
tasksMap.size() = 1
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
tasksMap.size() = 2
...
Submitter 1:
Before: tasksMap.size() = 0 After: tasksMap.size() = 1
Submitter 2:
Before: tasksMap.size() = 1 After: tasksMap.size() = 4
Submitter 3:
Before: tasksMap.size() = 1 After: tasksMap.size() = 2
Submitter 4:
Before: tasksMap.size() = 3 After: tasksMap.size() = 4
2 Answers
Answer 1
Well, I guess I've figured out the problem. As far as I understand, the distributed IMap returned by hazelcastInstance.getMap doesn't guarantee that the data is replicated to all existing nodes in the cluster: the data is partitioned, so some entries are stored on some nodes and other entries on other nodes. That's why in my example some of the submitted tasks were stored not on the worker node (which runs perpetually), but on other submitter nodes, which terminate their execution right after submission. Those entries were lost when the submitters exited.
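The distribution described above can be illustrated with a plain-JVM sketch (no Hazelcast required). The class name, the member-assignment rule, and the key names below are illustrative assumptions, not Hazelcast's real implementation; the only borrowed detail is that Hazelcast hashes the serialized form of the key into one of 271 partitions (its default partition count), and each partition has exactly one owning member:

```java
import java.nio.charset.StandardCharsets;
import java.util.*;

// Toy model of key -> partition -> owning member assignment.
// Not Hazelcast's actual algorithm; just demonstrates why entries
// for different keys can live on different cluster members.
public class PartitionSketch {
    public static final int PARTITION_COUNT = 271; // Hazelcast's default

    public static int partitionId(String key) {
        // Stand-in for hashing the serialized form of the key
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.abs(Arrays.hashCode(bytes) % PARTITION_COUNT);
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("worker", "submitter-1", "submitter-2", "submitter-3");
        Map<String, String> ownerByKey = new LinkedHashMap<>();
        for (String key : Arrays.asList("task-1", "task-2", "task-3", "task-4")) {
            // Toy ownership rule: spread partitions round-robin over members
            String owner = members.get(partitionId(key) % members.size());
            ownerByKey.put(key, owner);
        }
        System.out.println(ownerByKey);
        // Entries owned by a submitter member disappear with that member;
        // only entries that happened to land on the worker survive.
        long survivors = ownerByKey.values().stream().filter("worker"::equals).count();
        System.out.println("entries left after submitters exit: " + survivors);
    }
}
```

Under this model the "Before/After" sizes in the logs make sense: each process only sees a consistent total while the members owning the other entries are still alive.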
I solved this issue by replacing hazelcastInstance.getMap with hazelcastInstance.getReplicatedMap. This method returns a ReplicatedMap, which, AFAIK, guarantees that entries placed into it are replicated to all nodes of the cluster. So now everything works fine in my system.
Answer 2
public interface IMap<K, V> extends ConcurrentMap<K, V>, BaseMap<K, V>

Concurrent, distributed, observable and queryable map. This class is not a general-purpose ConcurrentMap implementation! While this class implements the Map interface, it intentionally violates Map's general contract, which mandates the use of the equals method when comparing objects. Instead of the equals method, this implementation compares the serialized byte version of the objects.
Gotchas:
Methods, including but not limited to get, containsKey, containsValue, evict, remove, put, putIfAbsent, replace, lock, unlock, do not use the hashCode and equals implementations of the keys. Instead, they use hashCode and equals of the binary (serialized) forms of the objects.
The get method returns a clone of the original values, so modifying the returned value does not change the actual value in the map. You should put the modified value back to make changes visible to all nodes. For additional info, see get(Object).
Methods, including but not limited to keySet, values, entrySet, return a collection clone of the values. The collection is NOT backed by the map, so changes to the map are NOT reflected in the collection, and vice-versa.
This class does not allow null to be used as a key or value.
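The first gotcha (binary rather than equals-based key comparison) is easy to trip over. Here is a plain-JVM demo of the underlying pitfall; the TaskKey class and its fields are hypothetical examples, not from the question. Two keys that are equal() but serialize to different bytes would be treated by IMap as two distinct keys:

```java
import java.io.*;
import java.util.Arrays;

// Demonstrates that equals() and serialized-byte equality can disagree.
// IMap compares keys by serialized bytes, so 'a' and 'b' below would be
// two different entries even though a.equals(b) is true.
public class SerializedEqualityDemo {
    public static class TaskKey implements Serializable {
        final String id;
        final long createdAt; // ignored by equals(), but still serialized

        public TaskKey(String id, long createdAt) {
            this.id = id;
            this.createdAt = createdAt;
        }

        @Override public boolean equals(Object o) {
            return o instanceof TaskKey && ((TaskKey) o).id.equals(id);
        }
        @Override public int hashCode() { return id.hashCode(); }
    }

    public static byte[] serialize(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        TaskKey a = new TaskKey("42", 1000L);
        TaskKey b = new TaskKey("42", 2000L);
        System.out.println(a.equals(b));                                   // true
        System.out.println(Arrays.equals(serialize(a), serialize(b)));     // false
    }
}
```

The practical takeaway: every field that takes part in serialization effectively takes part in key identity, regardless of what equals() says.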