java - Custom blocking queue locking issue
I am trying a custom implementation of a blocking queue backed by a fixed-length array of byte arrays. I am not removing polled elements, and therefore adjusted the put method to return a byte array that the producer can write into directly (the producer thread uses a MappedByteBuffer to write straight into the byte array). I added a commitPut() method to increase the counters and set the "lengths" array. (With multiple writing threads this would cause concurrency problems, but I know only one thread is writing.)
Below is what I have. It works if I debug through it step by step, but if I just run it, it appears to run into locking problems. I copied, stripped down and adjusted the ArrayBlockingQueue code. Can someone with better knowledge please look at the class and tell me what I am doing wrong, or how to do it better (for example, write directly into the buffer and set the lengths array and counters in the same step)?
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ByteArrayBlockingQueue {
    private final int[] lens;       // array of valid lengths
    private final byte[][] items;   // array of byte arrays
    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0;
    public volatile int polledLen = 0;   // length of the last polled byte array

    private final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    final int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new byte[capacity][size];
        this.lens = new int[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    public byte[] put() throws InterruptedException {
        final byte[][] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length)
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal();   // propagate to a non-interrupted thread
                throw ie;
            }
            //insert(e, len);
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    public void commitPut(int lenBuf) throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            lens[putIndex] = lenBuf;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public byte[] poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == 0)
                return null;
            final byte[][] items = this.items;
            final int[] lens = this.lens;
            byte[] e = items[takeIndex];
            this.polledLen = lens[takeIndex];
            //items[takeIndex] = null;
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }
}
If the queue wraps around, it is possible for byte arrays to be reused and overwritten before they have been read by the consumers. In short, you would need a commitGet() method to make sure producers wait for consumers before overwriting an array with new data.
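A minimal sketch of what such a method might look like in your class is below; it assumes poll() is changed so that it no longer advances takeIndex or decrements count, leaving that to commitGet():

    // Sketch only: one possible commitGet() for the class above. The consumer
    // calls it once it has finished reading the byte array returned by poll().
    public void commitGet() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();   // the slot may now be reused by the producer
        } finally {
            lock.unlock();
        }
    }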
However, my suggestion would be to rely on java.util.concurrent.BlockingQueue, with a second queue used to return buffers from the consumers to the producer, and on java.nio.ByteBuffer to keep track of the lengths. The producer would do the following:
ByteBuffer buffer = bufferQueue.poll();  // instead of put()
buffer.put(source);                      // fill the buffer from the source MappedByteBuffer
buffer.flip();                           // set the length to the amount written
dataQueue.offer(buffer);                 // instead of commitPut()
The consumer would do:
ByteBuffer buffer = dataQueue.poll();  // instead of get()
buffer.get(...);                       // use the data
buffer.clear();                        // reset position and limit
bufferQueue.offer(buffer);             // the missing commitGet()
Initially, you should insert capacity empty buffers into bufferQueue (the free-buffer queue). Note that this still copies the data once, from the source buffer into the temporary buffers in the queues, just as your original code did.
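A setup sketch for this two-queue approach; the queue names match the snippets above, while the capacity and buffer size values are only illustrative:

    import java.nio.ByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class TwoQueueSetup {
        public static void main(String[] args) {
            int capacity = 16;    // number of reusable buffers (illustrative)
            int size = 4096;      // size of each buffer in bytes (illustrative)
            BlockingQueue<ByteBuffer> bufferQueue = new ArrayBlockingQueue<>(capacity);
            BlockingQueue<ByteBuffer> dataQueue = new ArrayBlockingQueue<>(capacity);
            for (int i = 0; i < capacity; i++) {
                bufferQueue.offer(ByteBuffer.allocate(size));  // pre-fill the free-buffer queue
            }
            // The producer takes buffers from bufferQueue and offers filled ones to dataQueue;
            // the consumer takes from dataQueue and returns cleared buffers to bufferQueue.
        }
    }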
If you do not want to copy the data at all (and you are sure the source does not change until the consumers have read it!), a better option is to use a single blocking queue and to insert buffers obtained with ByteBuffer.slice() on the source buffer, one for each chunk of data handed down to the consumers. These slices will be garbage collected and should take less memory than the byte arrays themselves.
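A rough sketch of that zero-copy variant; the class, queue name and the chunkStart/chunkLength parameters are invented here for illustration:

    import java.nio.ByteBuffer;
    import java.nio.MappedByteBuffer;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class SliceProducer {
        private final BlockingQueue<ByteBuffer> dataQueue = new ArrayBlockingQueue<>(64);

        // Hand a view of [chunkStart, chunkStart + chunkLength) down to consumers
        // without copying the bytes out of the mapped source buffer.
        public void publish(MappedByteBuffer source, int chunkStart, int chunkLength)
                throws InterruptedException {
            ByteBuffer view = source.duplicate();   // independent position and limit
            view.position(chunkStart);
            view.limit(chunkStart + chunkLength);
            dataQueue.put(view.slice());            // slice() covers exactly this chunk
        }

        // Consumer side: the returned buffer's remaining() is the chunk length.
        public ByteBuffer next() throws InterruptedException {
            return dataQueue.take();
        }
    }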