阻塞队列

suaxi
2021-02-14 / 0 评论 / 40 阅读 / 正在检测是否收录...

阻塞队列

写:如果队列满了,就必须阻塞等待

取:如果队列是空的,必须阻塞等待生产

1.阻塞队列.png


阻塞队列的四组API

抛出异常不抛异常,有返回值等待阻塞等待超时
添加addofferputoffer(,,)
移除removepolltakepoll(,)
检测队首元素elementpeek--

测试Demo:

package com.sw.blockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @Author suaxi
 * @Date 2021/2/14 19:57
 * 阻塞队列
 */
public class BlockingQueueTest {
    public static void main(String[] args) throws InterruptedException {
        test04();
    }

    //1.抛出异常
    public static void test01(){
        //队列初始大小为3
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println(blockingQueue.add("A"));
        System.out.println(blockingQueue.add("B"));
        System.out.println(blockingQueue.add("C"));
        //java.lang.IllegalStateException: Queue full 队列已满
        //System.out.println(blockingQueue.add("D"));
        System.out.println(blockingQueue.element()); //检测队首元素
        System.out.println("-----分割线-----");

        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //java.util.NoSuchElementException 没有目标元素
        System.out.println(blockingQueue.remove());
    }

    //2.不抛出异常,有返回值
    public static void test02(){
        //队列初始大小为3
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println(blockingQueue.offer("A"));
        System.out.println(blockingQueue.offer("B"));
        System.out.println(blockingQueue.offer("C"));
        System.out.println(blockingQueue.offer("D")); //false
        System.out.println(blockingQueue.peek());
        System.out.println("-----分割线-----");

        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll()); //null
    }

    //3.等待,阻塞(一直阻塞)
    public static void test03() throws InterruptedException {
        //队列初始大小为3
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);

        blockingQueue.put("A");
        blockingQueue.put("B");
        blockingQueue.put("C");
        //blockingQueue.put("D"); //位置不够了,会一直阻塞
        System.out.println("-----分割线-----");

        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take()); //没有这个元素,会一直阻塞
    }

    //4.等待,阻塞(等待超时)
    public static void test04() throws InterruptedException {
        //队列初始大小为3
        ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(3);

        System.out.println(blockingQueue.offer("A"));
        System.out.println(blockingQueue.offer("B"));
        System.out.println(blockingQueue.offer("C"));
        System.out.println(blockingQueue.offer("D",2, TimeUnit.SECONDS)); //等待超过2秒退出
        System.out.println(blockingQueue.peek());
        System.out.println("-----分割线-----");

        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS)); //等待超过2秒退出
    }
}
SynchronousQueue 同步队列

没有容量,put一个元素之后,必须先take取出来,才能再put进去值

package com.sw.blockingQueue;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * @Author suaxi
 * @Date 2021/2/14 20:58
 * 同步队列
 */
public class SynchronousQueueTest {
    public static void main(String[] args) {
        //没有初始容量,put之后必须进行take,才能再进行put操作
        BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();

        //put
        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "进行put01操作");
                synchronousQueue.put("Hello");
                System.out.println(Thread.currentThread().getName() + "进行put02操作");
                synchronousQueue.put("nihao");
                System.out.println(Thread.currentThread().getName() + "进行put03操作");
                synchronousQueue.put("xiexie");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "A").start();


        //take
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "--->" + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "--->" + synchronousQueue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName() + "--->" + synchronousQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "B").start();
        
        /*
        输出结果:
        A进行put01操作
        B--->Hello
        A进行put02操作
        B--->nihao
        A进行put03操作
        B--->xiexie
         */
    }
}
0

评论 (0)

取消