java.util.concurrent package
在執行緒的章節中, 用synchronized來鎖定資源以避免其他執行緒相互競爭. 但有著效能問題及死結的風險. 所以Java SE 7 提供了concurrent這套件, 讓開發者可以輕易的寫出穩固的多執行緒程式.
java.util.concurrent.atomic package
atomic是原子, 也就是不可分割的意思. atomic package包含lock-free thread-safe的方法
AtomicInteger 為atomic實作的一個物件, 可確保裏面的動作在完成前, 不會被中斷. 常見的動作有
compareAndSet(5, 42);//若aotmicInteger的數字是5, 則會變成42
getAndAdd(10);//先取出, 再加10
addAndGet(10)://先加10, 再取出
getAndIncrement() : 取出再加1
getAndDecrement():取出再減1
public class AtomicTest extends Thread{ AtomicInteger atomicInteger=new AtomicInteger(10); public void run(){ int a=atomicInteger.getAndAdd(10); int b=atomicInteger.addAndGet(10); int c=atomicInteger.decrementAndGet(); atomicInteger.compareAndSet(20, 50); System.out.printf("a=%d, b=%d, c=%d, atomicInteger=%d\n", a, b, c, atomicInteger.get()); } public static void main(String[] args) { new AtomicTest().start(); } }
volatile [ˋvɑlət!]
在變數前加上volatile修飾子, 如
volatile int a=10;
此即告訴JVM, 此變數是不穩定的, 所以當線程訪問時, 就會強迫從共用記憶体區重新取得該變數的值, 讓所有的線程看到的值都是同一個, 所以使用volatile會很耗費資源
只保証執行緒寫回記憶体這段是同步, 但不保証只有一個執行緒在存取這個值, 所以仍有其風險
java.util.concurrent.locks package
此為一framework, 與內建的synchronize作一個區隔. 可在執行緒間進行更為細部的互動, 更為彈性的設計.
Lock介面之下, 有ReentrantLock類別, ReentrantLock再衍生ReentrandReadWriteLock. re-entrant, 是重入的意思
如下是RetrantLock的例子
public class LockTest {
public static void main(String[] args) {
ExecutorService service=Executors.newCachedThreadPool();
service.submit(new Lotto("Thomas"));
service.submit(new Lotto("John"));
service.shutdown();
}
}
class Lotto implements Runnable{
private String userName;
private static final ReentrantLock lock=new ReentrantLock();
public Lotto(String name){userName=name;}
@Override
public void run() {
try {
lock.lock();
for (int i=0;i<5;i++){
Thread.sleep(1000);
System.out.printf("%s : 第%d組號碼 : %d\n", userName, i, (int)(Math.random()*46)+1);
}
}
catch (InterruptedException ex) {
Logger.getLogger(Lotto.class.getName()).log(Level.SEVERE, null, ex);
}
finally{
lock.unlock();
}
}
}
執行結果為
Thomas : 第0組號碼 : 43
Thomas : 第1組號碼 : 41
Thomas : 第2組號碼 : 39
Thomas : 第3組號碼 : 10
Thomas : 第4組號碼 : 2
John : 第0組號碼 : 10
John : 第1組號碼 : 36
John : 第2組號碼 : 35
John : 第3組號碼 : 20
John : 第4組號碼 : 1
特別注意 : ReentrantLock lock必需是static, 不然的話會變成物件變數, 每個物件都有自己的鎖, 就鎖不住了
另解鎖時, 需在finally區塊, 不然發生了例外, 就解不開了
ReentrantReadWriteLock 是高效的資源鎖定, 多了lock.readLock()及lock.writeLock()二個方法, 解決執行緒在大量操作與修改時的問題.
取得readLock.lock()時機 : 沒其他執行緒取得writeLock.
取得writeLock.lock()時機 : 沒其他執行緒取得readLock及writeLock
class MyRunnable implements Runnable{ private final ReentrantReadWriteLock rw1=new ReentrantReadWriteLock(); private int i=0; @Override public void run() { while (i<=100){ rw1.writeLock().lock(); i++; rw1.writeLock().unlock(); System.out.println(Thread.currentThread().getId()+" : i : "+i); } } }
Thread-Safe Collections
java.util 的collections並不是thread-safe. 習慣使用下面方式
synchronized block
封裝 synchronized , 如java.util.Collections.synchronizedList(List<T>)
使用java.util.concurrent集合
注意 : 此Collection是thread-safe, 但裏面的元素並不是thread-safe
java.util.concurrent.CyclicBarrier 回環柵欄
Cyclic [ˋsɪklɪk] :環式的 Barrier[ˋbærɪr] : 柵欄
利用await()的方式, 讓執行緒進入等待狀態, 又稱為柵欄狀態. 當進入柵欄狀態的執行緒數達到指定的數量時, 再同時開放所有的執行緒執行後續的動作
public class AtomicTest { public static void main(String[] args) { CyclicBarrier barrier=new CyclicBarrier(2); MyRunnable r=new MyRunnable(barrier); Thread t1=new Thread(r); t1.start(); Thread t2=new Thread(r); t2.start(); } } class MyRunnable implements Runnable{ CyclicBarrier barrier; public MyRunnable(CyclicBarrier barrier){ this.barrier=barrier; } @Override public void run() { try { System.out.println(Thread.currentThread().getId()+" : before wait"); barrier.await(); System.out.println(Thread.currentThread().getId()+" : after wait"); } catch (InterruptedException ex) { Logger.getLogger(MyRunnable.class.getName()).log(Level.SEVERE, null, ex); } catch (BrokenBarrierException ex) { Logger.getLogger(MyRunnable.class.getName()).log(Level.SEVERE, null, ex); } } }
上述new CyclicBarrire(2) 表示要達到2 個執行緒進入柵欄狀態後, 才會開啟柵欄一同釋放
java.util.concurrent.Executor
執行者. Executor 是一個介面, 需實作execute(Runnable)方法, 然後要作的事全交給這個執行者去作. 其實在execute()裏要作的事就是 new thread(Runnable).start()
public class ExecutorTest { public static void main(String[] args) { MyExecutor e=new MyExecutor(); e.execute(new TimeRunnable()); e.execute(new TimeRunnable()); e.execute(new TimeRunnable()); } } class MyExecutor implements Executor{ @Override public void execute(Runnable r) { new Thread(r).start(); } } class TimeRunnable implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()+" : 目前時間"+ new Date()); } }
java.util.concurrent.Callable
要建立一個新的執行緒時, 通常都會先實作Runnable, 然後再把Runnable放入Thread內
現在, 除了Runnable, 還可以實作Callable, 然後放入FutureTask內, FutureTask再放入Thread內執行, 此時就可以用FutureTask取得傳回值
二者差異 :
public interface Runnable{
public abstract void run();
} <==無傳回值
public interface Callable<V>{
V call() throws Exception;
} <==有傳回值
public class CallableTest {
public static void main(String[] args) {
FutureTask<Integer> task=new FutureTask<>(new Lotto());
new Thread(task).start();
System.out.println("產生中.....主執行緒進入Blocked");
try {
System.out.println("主執行緒得到的值為 : "+task.get());
} catch (InterruptedException ex) {
Logger.getLogger(CallableTest.class.getName()).log(Level.SEVERE, null, ex);
} catch (ExecutionException ex) {
Logger.getLogger(CallableTest.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
class Lotto implements Callable<Integer>{
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
int n=(int)(Math.random()*46)+1;
System.out.println("目前在Callable中, 數字為 : "+n);
return n;
}
}
java.util.concurrent.ExecutorService
ExecurtorService提供了管理執行緒池的功能. 此繼承了Executor介面, 本身也是個介面
ExecutorService可由Executors來建立, 分別有如下方法
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(5); 使用5個執行緒
Executors.newSingleThreadExecutor();
ExecurtorService可用submit()啟動Runnable及Callable的任務, submit後的返回值為Future
public class ExecutorServiceTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService exec=Executors.newFixedThreadPool(5); Future future=exec.submit(new Lotto()); System.out.println(future.get()); exec.shutdown();//若無此行, 就會一直在停留狀態, 等待其他的指派 } } class Lotto implements Callable{ @Override public Integer call() throws Exception { int num=(int)(Math.random()*100)+1; return num; } }
java.util.concurrent.ScheduledExecutorService
排程器, 比如多久後開始執行, 執行幾次之類的. ScheduledExecutorService也是由Executors來建立
Executors.newScheduledThreadPool(5);
Executors.newSingleThreadScheduledExecutor();
ScheduledExecurtorService可用schedule()啟動Runnable及Callable的任務, 返回值為ScheduleFuture
public class ScheduledExecutorServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ScheduledExecutorService exec=Executors.newScheduledThreadPool(2);
System.out.println("二秒後開始執行.....");
ScheduledFuture future=exec.schedule(new Lotto(), 2, TimeUnit.SECONDS);
System.out.println(future.get());
exec.shutdown();
}
}
schedule()是單次排程, 若要多次排程, 可用二種方式
exec.scheduleWithFixedDelay(new Lotto(), 500, 1000, TimeUnit.MILLISECONDS); //500ms後開始執行, 每次執行完畢, 延遲1000ms再執行
exec.scheduleAtFixedRate(new Lotto(), 500, 1000, TimeUnit.MILLISECONDS); //500ms後開始執行, 並每隔1000ms執行一次
注意 : 這二個重複性排程, 只能執行Runnable, 不能執行Callable
Fork/Join framework
ForkJoinTask利用fork將任務拆解, 再用join將所有拆解掉的小單元匯總. ForkJoinTask實作了二個子類別
RecursiveTask : 有返回值
RecursiveAction : 無返回值
繼承RecursiveTask需實作compute方法
class MyTask extends RecursiveTask<T>{ @Override protected <T> compute() { if (...<= Threshold){ 使用一般遞迴計算; } else{ 將任務拆開 再將任務合併 } } }
ForkJoinTask再交由ForkJoinPool處理, 先設定處理池的cpu數, 再用invoke將task放入
class Fibonacci{ //一般計算法
public int fibonacci(int num){
if (num<=1)return num;
else return fibonacci(num-1)+fibonacci(num-2);
}
}
class FibonacciTask extends RecursiveTask{ //拆解任務
private int num=0;
private int sum=0;
public FibonacciTask(int n){
num=n;
}
@Override
protected Integer compute() {
if(num<=8)sum=new Fibonacci().fibonacci(num);
else{
FibonacciTask task1=new FibonacciTask(num-1);
task1.fork();
FibonacciTask task2=new FibonacciTask(num-2);
task2.fork();
sum=task2.join()+task1.join();
}
return sum;
}
public int getResult(){
return sum;
}
}
public class ForkJoinTest {
public static void main(String[] args) {
int num=45;
long t1=System.nanoTime();
int cpus=Runtime.getRuntime().availableProcessors();
ForkJoinPool pool=new ForkJoinPool(cpus);
FibonacciTask task=new FibonacciTask(num);
pool.invoke(task);
long t2=System.nanoTime();
System.out.println("sum="+task.getResult());
System.out.println("Time : "+(t2-t1)/1000000000.0f+"秒");
}
}
取得cpu核心數
Runtime.getRuntime().availableProcessors()
invokeAll()/invokeAny()
ExecutorService可以直接用invokeAll/invokeAny將多個任務啟動, 這樣就不用一個一個用submit啟動了.
步驟如下 :
產生一個ArrayList : Collection<Callable<Integer>> list=new ArrayList<>();
將任務加入ArrayList : list.add(Callable務任)
啟動 : exec.invokeAll(list)
啟動後的傳回值為Future型態, 放在List中
ExecutorService exec=Executors.newFixedThreadPool(5); Collection<Callable> list=new ArrayList<>(); list.add(new Lotto()); List<Future> futureList=exec.invokeAll(list); for (Future future:futureList){ System.out.println(future.get()); } exec.shutdown();
invokeAll : 所有任務都結束, 才會有返回值
invokeAny : 只要任何一個任務結束, 就返回結果
This blog inspires me to continue my own passions, appreciate it.