public class ZookeeperDistributedQueue<T extends Serializable> extends Object implements DistributedBlockingQueue<T>
A queue that is distributed (shared by multiple JVMs or nodes) and managed by Zookeeper. It relies on distributed locks, which are also backed by Zookeeper.
This queue works well in certain circumstances, but it is relatively slow and is not recommended for high-volume or high-capacity queues, nor for large messages. Zookeeper lets you create queues that can be used in a distributed way, but large queues can cause performance problems in Zookeeper, and Zookeeper has a 1MB transport limit, so each message must be smaller than that. Initial performance tests showed each queue operation (put / take) taking approximately 25-30 milliseconds, or about 30-40 queue operations per second, with a small payload (about 15 bytes).
It is best suited to smaller, lower-capacity, lower-throughput queues that must be read and written in a distributed way.
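The latency figures quoted in the description can be turned into a rough capacity estimate. The sketch below is only arithmetic on those numbers; it assumes operations run one after another from a single client and that latency stays constant, neither of which the description guarantees. The class and method names are hypothetical.

```java
public class QueueThroughputEstimate {

    /** Operations per second for a given per-operation latency in milliseconds. */
    static double opsPerSecond(double latencyMillis) {
        return 1000.0 / latencyMillis;
    }

    /** Seconds needed to process the given number of elements sequentially. */
    static double secondsToProcess(int elements, double latencyMillis) {
        return elements * latencyMillis / 1000.0;
    }

    public static void main(String[] args) {
        // Latency bounds quoted in the class description: 25-30 ms per put/take.
        double fastMillis = 25.0;
        double slowMillis = 30.0;

        System.out.println("ops/sec (slow): " + opsPerSecond(slowMillis));
        System.out.println("ops/sec (fast): " + opsPerSecond(fastMillis));

        // Time to work through a queue at the suggested size of about 500 elements.
        System.out.println("500 elements, fast (s): " + secondsToProcess(500, fastMillis));
        System.out.println("500 elements, slow (s): " + secondsToProcess(500, slowMillis));
    }
}
```

Under these assumptions, a queue holding about 500 elements takes roughly 12.5 to 15 seconds to work through sequentially.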
Try to limit the size of this queue to around 500 elements or fewer. Otherwise, consider a different queue implementation.
Nested classes/interfaces inherited from interface DistributedBlockingQueue: DistributedBlockingQueue.DistributedQueueException
| Modifier and Type | Field and Description |
|---|---|
| static String | DEFAULT_BASE_FOLDER: This is the base folder that all queues will be written to in Solr. |
| static int | DEFAULT_MAX_QUEUE_SIZE |
| static String | QUEUE_CONFIGS_FOLDER |
| static String | QUEUE_ENTRY_FOLDER |
| static String | QUEUE_LOCKS_FOLDER |
| protected Object | QUEUE_MONITOR |
| Constructor and Description |
|---|
| ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk): Constructs a folder structure in Zookeeper for managing a queue and queue state. |
| ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize): Constructs a folder structure in Zookeeper for managing a queue and queue state. |
| ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize, boolean useDefaultBasePath, List<org.apache.zookeeper.data.ACL> acls): Constructs a folder structure in Zookeeper for managing a queue and queue state. |
| Modifier and Type | Method and Description |
|---|---|
| boolean | add(T e) |
| boolean | addAll(Collection<? extends T> c) |
| void | clear() |
| boolean | contains(Object o) |
| boolean | containsAll(Collection<?> c) |
| protected Object | deserialize(byte[] bytes): Mechanism to convert a byte array to an object. |
| protected void | determineMaxCapacity() |
| int | drainTo(Collection<? super T> c) |
| int | drainTo(Collection<? super T> c, int maxElements) |
| T | element() |
| protected <R> R | executeOperation(GenericOperation<R> operation): Allows us to execute retry-able operations. |
| protected int | geMaxCapacity() |
| protected List<org.apache.zookeeper.data.ACL> | getAcls() |
| protected DistributedLock | getConfigLock() |
| protected String | getConfigsFolder() |
| protected String | getLocksFolder() |
| protected DistributedLock | getQueueAccessLock() |
| protected String | getQueueEntryFolder() |
| protected String | getQueueEntryName() |
| String | getQueueFolderPath() |
| protected int | getRequestedMaxQueueSize() |
| protected org.apache.zookeeper.ZooKeeper | getZookeeperClient() |
| protected DistributedLock | initializeConfigLock() |
| protected DistributedLock | initializeQueueAccessLock() |
| protected void | intializeQueueFolders(): Creates the appropriate folder(s) in Zookeeper if they don't already exist. |
| boolean | isEmpty() |
| Iterator<T> | iterator() |
| boolean | offer(T e) |
| boolean | offer(T e, long timeout, TimeUnit unit) |
| T | peek() |
| T | poll() |
| T | poll(long timeout, TimeUnit unit) |
| void | put(T e) |
| protected Map<String,T> | readQueueInternal(int qty, boolean remove, long timeout) |
| int | remainingCapacity() |
| T | remove() |
| boolean | remove(Object o) |
| boolean | removeAll(Collection<?> c) |
| boolean | retainAll(Collection<?> c) |
| protected void | seMaxCapacity(int size) |
| protected byte[] | serialize(Serializable obj): Mechanism to convert an object to a byte array. |
| int | size() |
| T | take() |
| Object[] | toArray() |
| <T> T[] | toArray(T[] a) |
| protected int | writeToQueue(List<? extends T> entries, long timeout) |
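Because the class implements the BlockingQueue contract, callers can be written against that interface. The sketch below illustrates the timed offer and poll calls from the method summary. Constructing the real queue requires a live ZooKeeper connection, so a bounded in-memory LinkedBlockingQueue is used here purely as a stand-in; the helper names `produce` and `consume` are hypothetical and are not part of this API.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueUsageSketch {

    /** Offers each item with a timeout and returns how many the queue accepted. */
    static <T extends Serializable> int produce(BlockingQueue<T> queue, List<T> items,
                                                long timeoutMillis) throws InterruptedException {
        int accepted = 0;
        for (T item : items) {
            // offer(e, timeout, unit) returns false if no space became free in time.
            if (queue.offer(item, timeoutMillis, TimeUnit.MILLISECONDS)) {
                accepted++;
            }
        }
        return accepted;
    }

    /** Polls with a timeout until a poll comes back empty; returns items in queue order. */
    static <T extends Serializable> List<T> consume(BlockingQueue<T> queue,
                                                    long timeoutMillis) throws InterruptedException {
        List<T> received = new ArrayList<>();
        T next;
        // poll(timeout, unit) returns null if nothing arrived within the timeout.
        while ((next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
            received.add(next);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in only: a bounded in-memory queue, NOT the ZooKeeper-backed class.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);

        int accepted = produce(queue, Arrays.asList("a", "b", "c"), 10);
        System.out.println("accepted=" + accepted + " remaining=" + queue.remainingCapacity());
        System.out.println("consumed=" + consume(queue, 10));
    }
}
```

Writing callers against BlockingQueue in this way keeps them independent of the backing implementation, which helps if the queue later needs to be swapped for a faster one.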
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface Collection: equals, hashCode, parallelStream, removeIf, spliterator, stream
public static final String DEFAULT_BASE_FOLDER
public static final String QUEUE_ENTRY_FOLDER
public static final String QUEUE_LOCKS_FOLDER
public static final String QUEUE_CONFIGS_FOLDER
public static final int DEFAULT_MAX_QUEUE_SIZE
protected final Object QUEUE_MONITOR
public ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk)
Parameters: queuePath, zk
public ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize)
Parameters: queuePath, zk, maxQueueSize
public ZookeeperDistributedQueue(String queuePath, org.apache.zookeeper.ZooKeeper zk, int maxQueueSize, boolean useDefaultBasePath, List<org.apache.zookeeper.data.ACL> acls)
Parameters: queuePath, zk, maxQueueSize, useDefaultBasePath, acls
public T remove()
Specified by: remove in interface Queue<T extends Serializable>
public T poll()
Specified by: poll in interface Queue<T extends Serializable>
public T element()
Specified by: element in interface Queue<T extends Serializable>
public T peek()
Specified by: peek in interface Queue<T extends Serializable>
public int size()
Specified by: size in interface Collection<T extends Serializable>
public boolean isEmpty()
Specified by: isEmpty in interface Collection<T extends Serializable>
public Iterator<T> iterator()
Specified by: iterator in interface Iterable<T extends Serializable>, iterator in interface Collection<T extends Serializable>
public Object[] toArray()
Specified by: toArray in interface Collection<T extends Serializable>
public <T> T[] toArray(T[] a)
Specified by: toArray in interface Collection<T extends Serializable>
public boolean containsAll(Collection<?> c)
Specified by: containsAll in interface Collection<T extends Serializable>
public boolean addAll(Collection<? extends T> c)
Specified by: addAll in interface Collection<T extends Serializable>
public boolean removeAll(Collection<?> c)
Specified by: removeAll in interface Collection<T extends Serializable>
public boolean retainAll(Collection<?> c)
Specified by: retainAll in interface Collection<T extends Serializable>
public void clear()
Specified by: clear in interface Collection<T extends Serializable>
public boolean add(T e)
Specified by: add in interface Collection<T extends Serializable>, add in interface BlockingQueue<T extends Serializable>, add in interface Queue<T extends Serializable>
public boolean offer(T e)
Specified by: offer in interface BlockingQueue<T extends Serializable>, offer in interface Queue<T extends Serializable>
public void put(T e) throws InterruptedException
Specified by: put in interface BlockingQueue<T extends Serializable>
Throws: InterruptedException
public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException
Specified by: offer in interface BlockingQueue<T extends Serializable>
Throws: InterruptedException
public T take() throws InterruptedException
Specified by: take in interface BlockingQueue<T extends Serializable>
Throws: InterruptedException
public T poll(long timeout, TimeUnit unit) throws InterruptedException
Specified by: poll in interface BlockingQueue<T extends Serializable>
Throws: InterruptedException
public int remainingCapacity()
Specified by: remainingCapacity in interface BlockingQueue<T extends Serializable>
public boolean remove(Object o)
Specified by: remove in interface Collection<T extends Serializable>, remove in interface BlockingQueue<T extends Serializable>
public boolean contains(Object o)
Specified by: contains in interface Collection<T extends Serializable>, contains in interface BlockingQueue<T extends Serializable>
public int drainTo(Collection<? super T> c)
Specified by: drainTo in interface BlockingQueue<T extends Serializable>
public int drainTo(Collection<? super T> c, int maxElements)
Specified by: drainTo in interface BlockingQueue<T extends Serializable>
protected int writeToQueue(List<? extends T> entries, long timeout) throws InterruptedException
Throws: InterruptedException
protected Map<String,T> readQueueInternal(int qty, boolean remove, long timeout) throws InterruptedException
Throws: InterruptedException
protected void intializeQueueFolders()
protected void determineMaxCapacity()
protected Object deserialize(byte[] bytes)
ObjectInputStream.
Parameters: bytes
protected byte[] serialize(Serializable obj)
ObjectOutputStream.
Parameters: obj
protected DistributedLock initializeQueueAccessLock()
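The serialize and deserialize entries above point to ObjectOutputStream and ObjectInputStream. The following is a minimal sketch of what such a round trip can look like, together with a size check against the 1MB Zookeeper limit mentioned in the class description. It is not the library's actual implementation: the class name, the `fitsTransportLimit` helper, the exact byte threshold, and the choice to wrap checked exceptions are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializationSketch {

    // Assumption: treats the documented "1MB transport limit" as 1024 * 1024 bytes.
    static final int ASSUMED_LIMIT_BYTES = 1024 * 1024;

    /** Converts an object to a byte array using standard Java serialization. */
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read.
        return buffer.toByteArray();
    }

    /** Converts a byte array back to an object using standard Java serialization. */
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of serialized object not found", e);
        }
    }

    /** Hypothetical guard: true if the payload is smaller than the assumed limit. */
    static boolean fitsTransportLimit(byte[] payload) {
        return payload.length < ASSUMED_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        byte[] payload = serialize("small message");
        System.out.println("payload bytes: " + payload.length);
        System.out.println("fits limit: " + fitsTransportLimit(payload));
        System.out.println("round trip: " + deserialize(payload));
    }
}
```

Checking the serialized size before enqueueing lets a caller reject an oversized message early instead of discovering the limit when the write to Zookeeper fails.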
protected DistributedLock initializeConfigLock()
protected int getRequestedMaxQueueSize()
public String getQueueFolderPath()
protected String getLocksFolder()
protected String getConfigsFolder()
protected String getQueueEntryFolder()
protected int geMaxCapacity()
protected void seMaxCapacity(int size)
protected DistributedLock getQueueAccessLock()
protected DistributedLock getConfigLock()
protected String getQueueEntryName()
protected org.apache.zookeeper.ZooKeeper getZookeeperClient()
protected List<org.apache.zookeeper.data.ACL> getAcls()
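The method summary describes executeOperation as a way to execute retry-able operations, but this page does not show how retries are performed. The sketch below is one generic way to retry an operation with a linearly growing pause between attempts. The `RetryableOperation` interface is a hypothetical stand-in for GenericOperation (whose methods are not documented here), and the attempt count and backoff parameters are likewise assumptions.

```java
public class RetrySketch {

    /** Hypothetical stand-in for GenericOperation<R>; the real interface is not shown here. */
    @FunctionalInterface
    interface RetryableOperation<R> {
        R execute() throws Exception;
    }

    /**
     * Runs the operation until it succeeds or maxAttempts is reached.
     * Interrupts are propagated immediately and never retried.
     */
    static <R> R executeWithRetry(RetryableOperation<R> operation, int maxAttempts,
                                  long backoffMillis) throws InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return operation.execute();
            } catch (InterruptedException e) {
                throw e; // the caller asked us to stop; do not retry
            } catch (Exception e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    // Pause grows linearly: backoff, 2 * backoff, 3 * backoff, ...
                    Thread.sleep(backoffMillis * attempt);
                }
            }
        }
        throw new IllegalStateException(
                "operation failed after " + maxAttempts + " attempts", lastFailure);
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated transient failure: the first two calls fail, the third succeeds.
        String result = executeWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("transient failure");
            }
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Propagating InterruptedException rather than retrying it matches the `throws InterruptedException` clause on executeOperation, though whether the real method behaves this way is not stated on this page.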
protected <R> R executeOperation(GenericOperation<R> operation) throws InterruptedException
Parameters: operation
Throws: InterruptedException
Copyright © 2021. All rights reserved.