Java - Custom blocking queue locking issue


I am trying to write a custom implementation of a blocking queue backed by a fixed-length array of byte arrays. I am not removing polled elements, so I adjusted the put method to return a byte array that can be written to directly (the producer thread uses a MappedByteBuffer to write directly into this byte array). I added a commitPut() method that simply increases the counters and sets the "lengths" array. (If multiple threads were writing, this could cause concurrency problems, but I know that only one thread is writing.)

Below is what I have. It works if I debug through it step by step, but if I just run it, it looks like it runs into locking problems. I copied, stripped down and adjusted the ArrayBlockingQueue code. Could someone with better knowledge please look at the class and tell me what I am doing wrong, or how to do it better (for example, writing directly to the buffer and setting the lengths array and counters in the same step)?

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    public class ByteArrayBlockingQueue {

        private final int[] lens;     // array of valid lengths
        private final byte[][] items; // array of byte arrays

        private int takeIndex = 0;
        private int putIndex = 0;
        private int count = 0;

        public volatile int polledLen = 0; // length of the last polled byte array

        private final ReentrantLock lock;
        private final Condition notEmpty;
        private final Condition notFull;

        // Circular increment of an index
        final int inc(int i) {
            return (++i == items.length) ? 0 : i;
        }

        public ByteArrayBlockingQueue(int capacity, int size, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new byte[capacity][size];
            this.lens = new int[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull  = lock.newCondition();
        }

        // Returns the next free slot; the caller writes into it and then calls commitPut()
        public byte[] put() throws InterruptedException {
            final byte[][] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                try {
                    while (count == items.length)
                        notFull.await();
                } catch (InterruptedException ie) {
                    notFull.signal(); // propagate to a non-interrupted thread
                    throw ie;
                }
                //insert(e, len);
                return items[putIndex];
            } finally {
                lock.unlock();
            }
        }

        // Publishes the slot returned by put() with the number of valid bytes
        public void commitPut(int lenBuf) throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                lens[putIndex] = lenBuf;
                putIndex = inc(putIndex);
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        // Returns the oldest filled slot, or null if the queue is empty
        public byte[] poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                if (count == 0)
                    return null;
                final byte[][] items = this.items;
                final int[] lens = this.lens;
                byte[] e = items[takeIndex];
                this.polledLen = lens[takeIndex];
                //items[takeIndex] = null;
                takeIndex = inc(takeIndex);
                --count;
                notFull.signal(); // the slot counts as free while the consumer may still be reading it
                return e;
            } finally {
                lock.unlock();
            }
        }
    }

If the queue wraps around, it is possible that byte arrays get reused and overwritten before they have been read by the consumers. In short, you would need a commitGet method as well, to make sure that the producer waits for the consumers before overwriting an array with new data.
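As a rough illustration of that idea, here is a minimal sketch of the queue with a symmetric get()/commitGet() pair. It assumes exactly one producer and one consumer thread; the class name CommitGetQueue and the helper currentLength() are made up for this example and are not part of the original code. A slot only counts as free again once the consumer has called commitGet().

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: single producer, single consumer.
class CommitGetQueue {
    private final byte[][] items;
    private final int[] lens;
    private int takeIndex = 0;
    private int putIndex = 0;
    private int count = 0; // slots committed by the producer and not yet released by commitGet()

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    CommitGetQueue(int capacity, int size) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new byte[capacity][size];
        lens = new int[capacity];
    }

    private int inc(int i) {
        return (++i == items.length) ? 0 : i;
    }

    // Blocks until a slot is free, then returns it for the producer to fill.
    byte[] put() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            return items[putIndex];
        } finally {
            lock.unlock();
        }
    }

    // Publishes the slot returned by put() with its valid length.
    void commitPut(int len) {
        lock.lock();
        try {
            lens[putIndex] = len;
            putIndex = inc(putIndex);
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Blocks until data is available; the slot stays reserved until commitGet().
    byte[] get() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return items[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // Valid length of the slot most recently returned by get().
    int currentLength() {
        lock.lock();
        try {
            return lens[takeIndex];
        } finally {
            lock.unlock();
        }
    }

    // Releases the slot returned by get() so the producer may overwrite it.
    void commitGet() {
        lock.lock();
        try {
            takeIndex = inc(takeIndex);
            --count;
            notFull.signal();
        } finally {
            lock.unlock();
        }
    }
}
```

The difference from the original poll() is that count is only decremented in commitGet(), so put() cannot hand out a slot the consumer is still reading.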

However, my suggestion is to rely on java.util.concurrent.BlockingQueue, with a second queue to return the buffers from the consumers to the producer, and on java.nio.ByteBuffer to keep track of the lengths. The producer would work as follows:

    ByteBuffer buffer = bufferQueue.poll(); // instead of put()
    buffer.put(source);                     // fill the buffer from the source MappedByteBuffer
    buffer.flip();                          // set the length to the amount written
    dataQueue.offer(buffer);                // instead of commitPut()

The consumer would do the following:

    ByteBuffer buffer = dataQueue.poll();   // instead of get()
    buffer.get(...);                        // use the data
    buffer.clear();                         // reset the length
    bufferQueue.offer(buffer);              // the missing commitGet()

You should initially insert capacity elements into bufferQueue. Note that this still copies the data once, from the source buffer into the temporary buffers in the queues, just as your original code did.
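Putting the two snippets together, a minimal sketch of the two-queue arrangement could look like the following. The class name BufferExchange and the methods produce() and consume() are invented for this illustration. It also assumes the blocking take()/put() calls are wanted instead of poll()/offer(), since poll() returns null when a queue is empty, and that the source may be larger than one buffer, so each call copies at most one buffer's worth.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the two-queue buffer exchange.
class BufferExchange {
    final BlockingQueue<ByteBuffer> bufferQueue; // free buffers, consumer -> producer
    final BlockingQueue<ByteBuffer> dataQueue;   // filled buffers, producer -> consumer

    BufferExchange(int capacity, int size) {
        bufferQueue = new ArrayBlockingQueue<>(capacity);
        dataQueue = new ArrayBlockingQueue<>(capacity);
        // Pre-fill the free queue with `capacity` empty buffers.
        for (int i = 0; i < capacity; i++)
            bufferQueue.add(ByteBuffer.allocate(size));
    }

    // Producer side: waits for a free buffer and copies the next chunk of source into it.
    void produce(ByteBuffer source) throws InterruptedException {
        ByteBuffer buffer = bufferQueue.take();
        int n = Math.min(buffer.remaining(), source.remaining());
        int oldLimit = source.limit();
        source.limit(source.position() + n); // expose only n bytes to avoid a BufferOverflowException
        buffer.put(source);                  // advances source.position() by n
        source.limit(oldLimit);
        buffer.flip();                       // limit now marks the amount written
        dataQueue.put(buffer);
    }

    // Consumer side: waits for data, copies it out, then returns the buffer to the producer.
    byte[] consume() throws InterruptedException {
        ByteBuffer buffer = dataQueue.take();
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        buffer.clear();          // reset position and limit for reuse
        bufferQueue.put(buffer); // plays the role of commitGet()
        return out;
    }
}
```

Because a buffer is in exactly one of the two queues (or held by one thread) at any time, the producer can never overwrite data that a consumer is still reading.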

If you really don't want to copy the data (and are sure that the source does not change until all consumers have read it!), a better option is to use a single blocking queue and insert buffers obtained with ByteBuffer.slice() from the source buffer for each chunk of data to be handed down to the consumers. These will then be garbage collected, but should take much less memory than the byte arrays themselves.
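A minimal sketch of the slicing variant, assuming fixed-size chunks: the class SliceProducer, the method enqueueSlices() and the chunkSize parameter are hypothetical names chosen for this example. Each queued buffer is a view onto the source's memory, so no bytes are copied.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: hand out views of the source instead of copies.
class SliceProducer {
    // Queues slices of at most chunkSize bytes covering the remaining content of source.
    static void enqueueSlices(ByteBuffer source, int chunkSize, BlockingQueue<ByteBuffer> queue)
            throws InterruptedException {
        ByteBuffer view = source.duplicate(); // own position/limit, shared content
        while (view.hasRemaining()) {
            int n = Math.min(chunkSize, view.remaining());
            int end = view.position() + n;
            int oldLimit = view.limit();
            view.limit(end);
            queue.put(view.slice()); // slice covers [position, limit) of the view
            view.limit(oldLimit);
            view.position(end);
        }
    }
}
```

Each slice carries its own position and limit, so a consumer can read the chunk length with remaining() and no separate lengths array is needed. Since the slices share memory with the source, any later write to the source is visible to consumers that have not finished reading.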

