經(jīng)驗(yàn)分享:深入理解Java 并行編程
中國IT實(shí)驗(yàn)室 發(fā)表于:12年10月23日 14:42 [轉(zhuǎn)載] 中國IT實(shí)驗(yàn)室
概述
基于Thread類和Runnable接口編程很容易實(shí)現(xiàn)基本的并行編程任務(wù),但實(shí)現(xiàn)復(fù)雜的并行程序就會(huì)比較困難,因?yàn)橐敿?xì)考慮資源的同步訪問以及設(shè)計(jì)必要數(shù)據(jù)結(jié)構(gòu)支持同步訪問。從Java5后,Java平臺(tái)提供了java.util.concurrent包以及HighLevelAPI簡化并行編程模型,并提供了很多支持同步訪問數(shù)據(jù)結(jié)構(gòu)滿足編程需要。具體來講,該HighLevelAPI提供了以下內(nèi)容,在后續(xù)段落我將分別介紹。
Lock(鎖對(duì)象):相對(duì)與Thread模型的隱式的鎖對(duì)象,Lock提供了顯式的鎖操作從而簡化應(yīng)用程序。
Executors:提供了一組HighLevelAPI用來執(zhí)行和管理并行任務(wù)。
ConcurrentCollections(并行集合):包含了一組支持并行處理的數(shù)據(jù)結(jié)構(gòu),大大簡化了并行編程難度。
AtomicVariables(原子變量):減少了同步操作并且避免數(shù)據(jù)不一致問題。
Fork/Join框架:提供了進(jìn)程操作的支持。
Lock(鎖對(duì)象)
Lock在java.util.concurrent.locks包里面,是一個(gè)用來同步對(duì)共享資源的訪問的工具,其實(shí)際效果和Synchronize 比較像。Synchronize用起來很容易,但是Synchroize也有一些限制:當(dāng)Synchronize以一定順序獲得多個(gè)鎖的時(shí)候,必須以相反的順序釋放鎖,并且必須在與獲得鎖相同的代碼scope中釋放鎖。Lock則完全克服了上述限制:能夠以任意的順序獲得和釋放所,而且不需要收到代碼 scope的限制。使用Lock也很簡單,通常會(huì)使用以下代碼使用Lock:
方式一:
Lockl=…;
l.lock();
try{
//訪問共享資源
}finally{
l.unlock()
}
方式二:
Lockl=…;
if(l.tryLock()){
try{
//訪問共享資源
finally{
l.unlock();
}
else{
//沒能夠獲得鎖,做其它事情
}
ConcurrentCollections(并行集合):
java.util.concurrent包還提供了一系列的數(shù)據(jù)結(jié)構(gòu),使得并行程序的開發(fā)人員可以更少地關(guān)注同步操作。
BlockingQueue是一個(gè)支持先入先出(First-in-first-out,FIFO)的數(shù)據(jù)結(jié)構(gòu),對(duì)于隊(duì)列的push和poll等操作都采取阻塞的方式保證只有在獲得資源鎖的時(shí)候才可以執(zhí)行。
ConcurrentMap是一個(gè)跟Map類似的存儲(chǔ)key/value映射的數(shù)據(jù)結(jié)構(gòu),該數(shù)據(jù)結(jié)構(gòu)保證了Map操作的原子性。
ConcurrentNavigableMap是一個(gè)跟TreeMap類似的數(shù)據(jù)結(jié)構(gòu),允許基于key進(jìn)行樹狀遍歷。
下面給出BlockingQueue的基本用法:
BlockingQueuequeue=newSynchronousQueue();
queue.put(“helloworld”);
queue.poll();//返回helloworld
AtomicVariables(原子變量):
java.util.concurrent.atomic包提供了一些數(shù)據(jù)結(jié)構(gòu)用來支持對(duì)變量的原子操作,這些數(shù)據(jù)結(jié)構(gòu)都必須支持get和set方法從而支持對(duì)變量的讀寫。下面是一個(gè)例子來證明如何使用AtomicInteger的。
importjava.util.concurrent.atomic.AtomicInteger;
classAtomicCounter{
privateAtomicIntegerc=newAtomicInteger(0);
publicvoidincrement(){
c.incrementAndGet();
}
publicvoiddecrement(){
c.decrementAndGet();
}
publicintvalue(){
returnc.get();
}
}
Fork/Join框架
Fork/Join是自JDK7以后才有的對(duì)ExecutorService接口的一個(gè)實(shí)現(xiàn),用來支持在多核處理器上進(jìn)行多進(jìn)程操作。該框架適合于那些能夠被遞歸地分解成更小任務(wù)的任務(wù)。該框架最牛的地方在于采用了work-stealing算法,也就是空閑的worker可以從繁忙的worker那里偷(steal)任務(wù)過來執(zhí)行,從而充分利用計(jì)算資源使得任務(wù)執(zhí)行更快。
Fork/Join的核心是ForkJoinPool這個(gè)類,它是AbstractExecutorService的子類并實(shí)現(xiàn)了work-stealing算法,其被用來執(zhí)行ForkJoinTask.
ForkJoinTask實(shí)現(xiàn)類有RecursiveTask核ResursiveAction,通常實(shí)現(xiàn)的邏輯為:
if(當(dāng)前任務(wù)足夠小)
直接執(zhí)行任務(wù)
else
將當(dāng)前任務(wù)分為更小的兩個(gè)任務(wù)
執(zhí)行這兩個(gè)更小的任務(wù)并等待結(jié)果
下面的代碼示意對(duì)Fork/Join框架的使用:
public class MyListPrinter extends RecursiveAction {
private List task;
public MyListPrinter(List list)
task = list;
}
protected void printList() {
// 遍歷并打印task中的元素
}
protected void compute() {
if (task.length() <= 1000) {
printList();
} else{
List first_half_list = ……
List second_half_list = ……
MyListPrinter printer1 = new MyListPrinter(first_half_list);
MyListPrinter printer2 = new MyListPrinter(second_half_list);
invokeAll(printer1, printer2);
}
}
}
MyListPrinterprinter=newMyListPrinter(aHugeList);
ForkJoinPoolpool=newForkJoinPool();
pool.invoke(printer);
公司簡介 | 媒體優(yōu)勢 | 廣告服務(wù) | 客戶寄語 | DOIT歷程 | 誠聘英才 | 聯(lián)系我們 | 會(huì)員注冊(cè) | 訂閱中心
Copyright © 2013 DOIT Media, All rights Reserved. 北京楚科信息技術(shù)有限公司 版權(quán)所有.