亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

Java DelayQueue實現分析

2023-09-21 02:06:53
15
0

DelayQueue是一個無界阻塞隊列,隊列中的元素比較特殊,必須是實現了Delayed接口的元素。Delayed接口是一個混合接口,它繼承了Comparator接口。它也具有PriorityBlockingQueue的特征,元素中優化級最高的元素是延遲時間最長的元素。隊列頭的元素是呆在隊列時間最長的元素,它只有到時期,才能出隊。即getDelay獲取到的時間小于等于0時,否則返回null元素。
DelayQueue的并發控制同樣使用ReentrantLock和它的Condition對象來實現。因為添加元素不阻塞,所以也只有一個Condition對象來實現等待/通知模式。DelayQueue同樣不允許使用null元素。

一、DelayQueue的結構

DelayQueue的結構和PriorityBlockingQueue基本一致,它持有一個PriorityQueue的引用
各種方法實現也委托給了PriorityQueue對象來實現。另外還有一個ReentrantLock和它的Condition對象;隊列中的元素是Delayed接口類型的元素。
Delayed接口定義:

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

DelayQueue的主要成員變量:

1
2
3
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();

二、DelayQueue的主要方法實現

1、入隊操作

入隊操作因為是無界隊列,所有不會出現阻塞,put/offer都正常添加到隊列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//取出優先級最高的元素,不移除
E first = q.peek();
q.offer(e);
//如果獲取到優先級最高的元素為null,說明隊列為空
//或者當前添加的元素優先級比隊列中的最高優化級元素低,則通知等待隊列中的線程
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}

2、出隊操作
因為出隊必須是到期的元素,如果獲取不到元素,阻塞版本的take會阻塞等待到delay的時間到期,而超時版本的poll會返回null。具體的實現都大同小異,只看take方法的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//響應中斷
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//獲取隊列中優化級最高的元素
E first = q.peek();
if (first == null) {
//元素為空,則阻塞,釋放鎖,進入Condition等待隊列
available.await();
} else {
//元素不為空,則獲取元素的延遲時間
long delay = first.getDelay(TimeUnit.NANOSECONDS);
//延遲時間>0,繼續釋放鎖,進入有時間的等待
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
//否則,出隊,并通知其它出隊線程
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}

具體的實現流程如下:
1).獲取鎖,并加鎖;
2).開始自旋,利用peek方法獲取隊列的頭元素,如果獲取失敗,則釋放鎖,進入Condition的等待隊列;
3).如果能獲取到隊列頭的元素,則判斷到期時間;如果還未到期,則繼續在剩下的時間中await;
4).如果元素已經到時,則獲取成功,poll出隊,同時判斷隊列如果非空,則繼續通知阻塞的線程;
5).最后返回獲取到的元素;

三、使用DelayQueue

DelayQueue可以用于很多場景,比如緩存過期管理、會話過期管理、連接超時管理等。下面的例子是使用DelayQueue來管理緩存中過期的元素。

1、保存數據的鍵值對類:

1
2
3
4
5
6
7
8
9
public class Pair<K, V> {
public K first;
public V second;
public Pair(){}
public Pair(K first,V second){
this.first = first;
this.second = second;
}
}

2、實現Delayed接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class DelayItem<T> implements Delayed {

private static long NANO_ORIGIN = System.nanoTime();
final static long now(){
return System.nanoTime() - NANO_ORIGIN;
}
private static final AtomicLong sequencer = new AtomicLong(0);
private long sequenceNumber;
private final long time;
private final T item;

public DelayItem(T sumbmit,long timeout){
this.item = sumbmit;
this.time = now() + timeout;
this.sequenceNumber = sequencer.getAndIncrement();
}

public T getItem(){
return this.item;
}

@Override
public int compareTo(Delayed other) {
if(other == this)
return 0;
if(other instanceof DelayItem){
DelayItem x = (DelayItem)other;
long diff = time - x.time;
if(diff < 0)
return -1;
else if(diff > 0)
return 1;
else if(sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return d == 0 ? 0 : (d < 0) ? -1 : 1;
}

@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now() , unit.NANOSECONDS);
return d;
}

}

3、緩存實現和測試

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class Cache<K,V> {
private final ConcurrentMap<K,V> cache = new ConcurrentHashMap<K,V>();


private final DelayQueue<DelayItem<Pair<K,V>>> q = new DelayQueue<DelayItem<Pair<K,V>>>();

private Thread daemonThread;

public Cache(){
Runnable checkTask = new Runnable(){
@Override
public void run(){
checkTimeout();
}
};
daemonThread = new Thread(checkTask);
daemonThread.setDaemon(true);
daemonThread.start();
}

private void checkTimeout(){
for(;;){
try {
 DelayItem<Pair<K,V>> item = q.take();
if(item != null){
 Pair<K,V> pair = item.getItem();
//刪除超時的對象
 cache.remove(pair.first,pair.second);
}
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}

public void put(K key,V value,long time,TimeUnit unit){
V oldValue = cache.put(key, value);
if(oldValue != null)
cache.remove(oldValue);
long nanoTime = TimeUnit.NANOSECONDS.convert(time,unit);
q.put(new DelayItem<Pair<K,V>>(new Pair<K,V>(key,value),nanoTime));
}

public V get(K key){
return cache.get(key);
}

public static void main(String[] args) throws InterruptedException {
Cache<Integer,String> cache = new Cache<Integer,String>();
cache.put(1, "aaaa", 3, TimeUnit.SECONDS);
Thread.sleep(2000);
System.out.println(cache.get(1));
Thread.sleep(2000);
System.out.println(cache.get(1));
}

}
0條評論
作者已關閉評論
chuoo
13文章數
0粉絲數
chuoo
13 文章 | 0 粉絲
原創

Java DelayQueue實現分析

2023-09-21 02:06:53
15
0

DelayQueue是一個無界阻塞隊列,隊列中的元素比較特殊,必須是實現了Delayed接口的元素。Delayed接口是一個混合接口,它繼承了Comparator接口。它也具有PriorityBlockingQueue的特征,元素中優化級最高的元素是延遲時間最長的元素。隊列頭的元素是呆在隊列時間最長的元素,它只有到時期,才能出隊。即getDelay獲取到的時間小于等于0時,否則返回null元素。
DelayQueue的并發控制同樣使用ReentrantLock和它的Condition對象來實現。因為添加元素不阻塞,所以也只有一個Condition對象來實現等待/通知模式。DelayQueue同樣不允許使用null元素。

一、DelayQueue的結構

DelayQueue的結構和PriorityBlockingQueue基本一致,它持有一個PriorityQueue的引用
各種方法實現也委托給了PriorityQueue對象來實現。另外還有一個ReentrantLock和它的Condition對象;隊列中的元素是Delayed接口類型的元素。
Delayed接口定義:

1
2
3
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}

DelayQueue的主要成員變量:

1
2
3
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();

二、DelayQueue的主要方法實現

1、入隊操作

入隊操作因為是無界隊列,所有不會出現阻塞,put/offer都正常添加到隊列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//取出優先級最高的元素,不移除
E first = q.peek();
q.offer(e);
//如果獲取到優先級最高的元素為null,說明隊列為空
//或者當前添加的元素優先級比隊列中的最高優化級元素低,則通知等待隊列中的線程
if (first == null || e.compareTo(first) < 0)
available.signalAll();
return true;
} finally {
lock.unlock();
}
}

2、出隊操作
因為出隊必須是到期的元素,如果獲取不到元素,阻塞版本的take會阻塞等待到delay的時間到期,而超時版本的poll會返回null。具體的實現都大同小異,只看take方法的實現:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//響應中斷
lock.lockInterruptibly();
try {
//自旋
for (;;) {
//獲取隊列中優化級最高的元素
E first = q.peek();
if (first == null) {
//元素為空,則阻塞,釋放鎖,進入Condition等待隊列
available.await();
} else {
//元素不為空,則獲取元素的延遲時間
long delay = first.getDelay(TimeUnit.NANOSECONDS);
//延遲時間>0,繼續釋放鎖,進入有時間的等待
if (delay > 0) {
long tl = available.awaitNanos(delay);
} else {
//否則,出隊,并通知其它出隊線程
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // wake up other takers
return x;
}
}
}
} finally {
lock.unlock();
}
}

具體的實現流程如下:
1).獲取鎖,并加鎖;
2).開始自旋,利用peek方法獲取隊列的頭元素,如果獲取失敗,則釋放鎖,進入Condition的等待隊列;
3).如果能獲取到隊列頭的元素,則判斷到期時間;如果還未到期,則繼續在剩下的時間中await;
4).如果元素已經到時,則獲取成功,poll出隊,同時判斷隊列如果非空,則繼續通知阻塞的線程;
5).最后返回獲取到的元素;

三、使用DelayQueue

DelayQueue可以用于很多場景,比如緩存過期管理、會話過期管理、連接超時管理等。下面的例子是使用DelayQueue來管理緩存中過期的元素。

1、保存數據的鍵值對類:

1
2
3
4
5
6
7
8
9
public class Pair<K, V> {
public K first;
public V second;
public Pair(){}
public Pair(K first,V second){
this.first = first;
this.second = second;
}
}

2、實現Delayed接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class DelayItem<T> implements Delayed {

private static long NANO_ORIGIN = System.nanoTime();
final static long now(){
return System.nanoTime() - NANO_ORIGIN;
}
private static final AtomicLong sequencer = new AtomicLong(0);
private long sequenceNumber;
private final long time;
private final T item;

public DelayItem(T sumbmit,long timeout){
this.item = sumbmit;
this.time = now() + timeout;
this.sequenceNumber = sequencer.getAndIncrement();
}

public T getItem(){
return this.item;
}

@Override
public int compareTo(Delayed other) {
if(other == this)
return 0;
if(other instanceof DelayItem){
DelayItem x = (DelayItem)other;
long diff = time - x.time;
if(diff < 0)
return -1;
else if(diff > 0)
return 1;
else if(sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return d == 0 ? 0 : (d < 0) ? -1 : 1;
}

@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(time - now() , unit.NANOSECONDS);
return d;
}

}

3、緩存實現和測試

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class Cache<K,V> {
private final ConcurrentMap<K,V> cache = new ConcurrentHashMap<K,V>();


private final DelayQueue<DelayItem<Pair<K,V>>> q = new DelayQueue<DelayItem<Pair<K,V>>>();

private Thread daemonThread;

public Cache(){
Runnable checkTask = new Runnable(){
@Override
public void run(){
checkTimeout();
}
};
daemonThread = new Thread(checkTask);
daemonThread.setDaemon(true);
daemonThread.start();
}

private void checkTimeout(){
for(;;){
try {
 DelayItem<Pair<K,V>> item = q.take();
if(item != null){
 Pair<K,V> pair = item.getItem();
//刪除超時的對象
 cache.remove(pair.first,pair.second);
}
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}

public void put(K key,V value,long time,TimeUnit unit){
V oldValue = cache.put(key, value);
if(oldValue != null)
cache.remove(oldValue);
long nanoTime = TimeUnit.NANOSECONDS.convert(time,unit);
q.put(new DelayItem<Pair<K,V>>(new Pair<K,V>(key,value),nanoTime));
}

public V get(K key){
return cache.get(key);
}

public static void main(String[] args) throws InterruptedException {
Cache<Integer,String> cache = new Cache<Integer,String>();
cache.put(1, "aaaa", 3, TimeUnit.SECONDS);
Thread.sleep(2000);
System.out.println(cache.get(1));
Thread.sleep(2000);
System.out.println(cache.get(1));
}

}
文章來自個人專欄
文章 | 訂閱
0條評論
作者已關閉評論
作者已關閉評論
1
0