Class ZookeeperDistributedQueue<T extends Serializable>
- All Implemented Interfaces:
Iterable<T>,Collection<T>,BlockingQueue<T>,Queue<T>,DistributedBlockingQueue<T>
Queue that is distributed (used by multiple JVMs or nodes) and managed by Zookeeper. This queue uses distributed locks, also backed by Zookeeper.
Please note that while this works quite well in certain circumstances, it is not recommended for high volume or high capacity queues, nor for large queue messages. It's a relatively slow queue. Zookeeper allows you to create queues that can be used in a distributed way, but large queues can cause performance problems in Zookeeper, and Zookeeper has a 1MB transport limit, so messages have to be smaller than that. Incidentally, initial performance tests showed queue operations (put / take) taking approximately 25-30 milliseconds, or about 30-40 queue operations per second with a small payload (about 15 bytes).
This Queue works quite well for smaller, lower capacity / throughput queues where you need to read/write in a distributed way. Try to limit the size of this queue to around 500 elements or fewer. Otherwise, consider a different queue implementation.
- Author:
- Kelly Tisdell
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.broadleafcommerce.core.util.queue.DistributedBlockingQueue
DistributedBlockingQueue.DistributedQueueException -
Field Summary
Fields -
Constructor Summary
ConstructorsConstructorDescriptionZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk) Constructs a folder structure in Zookeeper for managing a queue and queue state..ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize) Constructs a folder structure in Zookeeper for managing a queue and queue state..ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize, boolean useDefaultBasePath, List<org.apache.zookeeper.data.ACL> acls) Constructs a folder structure in Zookeeper for managing a queue and queue state.. -
Method Summary
Modifier and TypeMethodDescriptionbooleanbooleanaddAll(Collection<? extends T> c) voidclear()booleanbooleancontainsAll(Collection<?> c) protected Objectdeserialize(byte[] bytes) Mechanism to convert a byte array to an object.protected voidintdrainTo(Collection<? super T> c) intdrainTo(Collection<? super T> c, int maxElements) element()protected <R> RexecuteOperation(GenericOperation<R> operation) Allows us to execute retry-able operations.protected intprotected List<org.apache.zookeeper.data.ACL>getAcls()protected DistributedLockprotected Stringprotected Stringprotected DistributedLockprotected Stringprotected Stringprotected intprotected org.apache.zookeeper.ZooKeeperprotected DistributedLockprotected DistributedLockprotected voidCreates the appropriate folder(s) in Zookeeper if they don't already exist.booleanisEmpty()iterator()booleanbooleanpeek()poll()voidreadQueueInternal(int qty, boolean remove, long timeout) intremove()booleanbooleanremoveAll(Collection<?> c) booleanretainAll(Collection<?> c) protected voidseMaxCapacity(int size) protected byte[]serialize(Serializable obj) Mechanism to convert an object to a byte array.intsize()take()Object[]toArray()<T> T[]toArray(T[] a) protected intwriteToQueue(List<? extends T> entries, long timeout) Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface java.util.Collection
equals, hashCode, parallelStream, removeIf, spliterator, stream, toArray
-
Field Details
-
DEFAULT_BASE_FOLDER
This is the base folder that all queues will be written to in Solr. The constructors require a lock path, which will be appended to this path.- See Also:
-
QUEUE_ENTRY_FOLDER
- See Also:
-
QUEUE_LOCKS_FOLDER
- See Also:
-
QUEUE_CONFIGS_FOLDER
- See Also:
-
DEFAULT_MAX_QUEUE_SIZE
public static final int DEFAULT_MAX_QUEUE_SIZE- See Also:
-
QUEUE_MONITOR
-
-
Constructor Details
-
ZookeeperDistributedQueue
Constructs a folder structure in Zookeeper for managing a queue and queue state.. The argument, queuePath, should start with a forward slash ('/') and should not end with a slash. This argument should not contain whitespaces or other special characters.The default max queue size will be 500.
- Parameters:
queuePath-zk-
-
ZookeeperDistributedQueue
public ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize) Constructs a folder structure in Zookeeper for managing a queue and queue state.. The argument, queuePath, should start with a forward slash ('/') and should not end with a slash. This argument should not contain whitespaces or other special characters.The default max queue size will be 500.
- Parameters:
queuePath-zk-maxQueueSize-
-
ZookeeperDistributedQueue
public ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize, boolean useDefaultBasePath, List<org.apache.zookeeper.data.ACL> acls) Constructs a folder structure in Zookeeper for managing a queue and queue state.. The argument, queuePath, should start with a forward slash ('/') and should not end with a slash. This argument should be alpha-numeric, not contain whitespaces or other special characters, and can contain forward slashes ('/') to delineate folders. If useDefaultBasePath is true, then /broadleaf/app/distributed-queues will be prepended to the queuePath. Otherwise, the queuePath will be used as it is provided.The argument, maxQueueSize, will be a hint. If another thread creates the queue structure in Zookeeper, then it will persist the maxQueueSize.
- Parameters:
queuePath-zk-maxQueueSize-useDefaultBasePath-acls-
-
-
Method Details
-
remove
- Specified by:
removein interfaceQueue<T extends Serializable>
-
poll
- Specified by:
pollin interfaceQueue<T extends Serializable>
-
element
- Specified by:
elementin interfaceQueue<T extends Serializable>
-
peek
- Specified by:
peekin interfaceQueue<T extends Serializable>
-
size
public int size()- Specified by:
sizein interfaceCollection<T extends Serializable>
-
isEmpty
public boolean isEmpty()- Specified by:
isEmptyin interfaceCollection<T extends Serializable>
-
iterator
- Specified by:
iteratorin interfaceCollection<T extends Serializable>- Specified by:
iteratorin interfaceIterable<T extends Serializable>
-
toArray
- Specified by:
toArrayin interfaceCollection<T extends Serializable>
-
toArray
public <T> T[] toArray(T[] a) - Specified by:
toArrayin interfaceCollection<T extends Serializable>
-
containsAll
- Specified by:
containsAllin interfaceCollection<T extends Serializable>
-
addAll
- Specified by:
addAllin interfaceCollection<T extends Serializable>
-
removeAll
- Specified by:
removeAllin interfaceCollection<T extends Serializable>
-
retainAll
- Specified by:
retainAllin interfaceCollection<T extends Serializable>
-
clear
public void clear()- Specified by:
clearin interfaceCollection<T extends Serializable>
-
add
- Specified by:
addin interfaceBlockingQueue<T extends Serializable>- Specified by:
addin interfaceCollection<T extends Serializable>- Specified by:
addin interfaceQueue<T extends Serializable>
-
offer
- Specified by:
offerin interfaceBlockingQueue<T extends Serializable>- Specified by:
offerin interfaceQueue<T extends Serializable>
-
put
- Specified by:
putin interfaceBlockingQueue<T extends Serializable>- Throws:
InterruptedException
-
offer
- Specified by:
offerin interfaceBlockingQueue<T extends Serializable>- Throws:
InterruptedException
-
take
- Specified by:
takein interfaceBlockingQueue<T extends Serializable>- Throws:
InterruptedException
-
poll
- Specified by:
pollin interfaceBlockingQueue<T extends Serializable>- Throws:
InterruptedException
-
remainingCapacity
public int remainingCapacity()- Specified by:
remainingCapacityin interfaceBlockingQueue<T extends Serializable>
-
remove
- Specified by:
removein interfaceBlockingQueue<T extends Serializable>- Specified by:
removein interfaceCollection<T extends Serializable>
-
contains
- Specified by:
containsin interfaceBlockingQueue<T extends Serializable>- Specified by:
containsin interfaceCollection<T extends Serializable>
-
drainTo
- Specified by:
drainToin interfaceBlockingQueue<T extends Serializable>
-
drainTo
- Specified by:
drainToin interfaceBlockingQueue<T extends Serializable>
-
writeToQueue
- Throws:
InterruptedException
-
readQueueInternal
protected Map<String,T> readQueueInternal(int qty, boolean remove, long timeout) throws InterruptedException - Throws:
InterruptedException
-
intializeQueueFolders
protected void intializeQueueFolders()Creates the appropriate folder(s) in Zookeeper if they don't already exist. -
determineMaxCapacity
protected void determineMaxCapacity() -
deserialize
Mechanism to convert a byte array to an object. Default implementation usesObjectInputStream.- Parameters:
bytes-- Returns:
-
serialize
Mechanism to convert an object to a byte array. Default implementation usesObjectOutputStream.- Parameters:
obj-- Returns:
-
initializeQueueAccessLock
-
initializeConfigLock
-
getRequestedMaxQueueSize
protected int getRequestedMaxQueueSize() -
getQueueFolderPath
-
getLocksFolder
-
getConfigsFolder
-
getQueueEntryFolder
-
geMaxCapacity
protected int geMaxCapacity() -
seMaxCapacity
protected void seMaxCapacity(int size) -
getQueueAccessLock
-
getConfigLock
-
getQueueEntryName
-
getZookeeperClient
protected org.apache.zookeeper.ZooKeeper getZookeeperClient() -
getAcls
-
executeOperation
Allows us to execute retry-able operations.- Parameters:
operation-- Returns:
- Throws:
InterruptedException
-