Thread API

      在〈Thread API〉中有 1 則留言

java.util.concurrent package

在執行緒的章節中, 用synchronized來鎖定資源以避免其他執行緒相互競爭. 但有著效能問題及死結的風險. 所以Java SE 7 提供了concurrent這套件, 讓開發者可以輕易的寫出穩固的多執行緒程式.

java.util.concurrent.atomic package

atomic是原子, 也就是不可分割的意思. atomic package包含lock-free thread-safe的方法

AtomicInteger 為atomic實作的一個物件, 可確保裏面的動作在完成前, 不會被中斷. 常見的動作有
compareAndSet(5, 42);//若aotmicInteger的數字是5, 則會變成42
getAndAdd(10);//先取出, 再加10
addAndGet(10)://先加10, 再取出
getAndIncrement() : 取出再加1
getAndDecrement():取出再減1

public class AtomicTest extends Thread{
    AtomicInteger atomicInteger=new AtomicInteger(10);
    public void run(){
        int a=atomicInteger.getAndAdd(10);
        int b=atomicInteger.addAndGet(10);
        int c=atomicInteger.decrementAndGet();
        atomicInteger.compareAndSet(20, 50);
        System.out.printf("a=%d, b=%d, c=%d, atomicInteger=%d\n", a, b, c, atomicInteger.get());
    }
    public static void main(String[] args) {
        new AtomicTest().start();
    }
}

volatile [ˋvɑlət!]

在變數前加上volatile修飾子, 如
volatile int a=10;

此即告訴JVM, 此變數是不穩定的, 所以當線程訪問時, 就會強迫從共用記憶体區重新取得該變數的值, 讓所有的線程看到的值都是同一個, 所以使用volatile會很耗費資源

只保証執行緒寫回記憶体這段是同步, 但不保証只有一個執行緒在存取這個值, 所以仍有其風險

java.util.concurrent.locks package

此為一framework, 與內建的synchronize作一個區隔. 可在執行緒間進行更為細部的互動, 更為彈性的設計.

Lock介面之下, 有ReentrantLock類別, ReentrantLock再衍生ReentrandReadWriteLock. re-entrant, 是重入的意思
如下是RetrantLock的例子

public class LockTest {
    public static void main(String[] args) {
        ExecutorService service=Executors.newCachedThreadPool();
        service.submit(new Lotto("Thomas"));
        service.submit(new Lotto("John"));
        service.shutdown();
    }
}
class Lotto implements Runnable{
    private String userName;
    private static final ReentrantLock lock=new ReentrantLock();
    public Lotto(String name){userName=name;}
    @Override
    public void run() {
        try {
            lock.lock();
            for (int i=0;i<5;i++){
                Thread.sleep(1000);
                System.out.printf("%s : 第%d組號碼 : %d\n", userName, i, (int)(Math.random()*46)+1);
            }
        } 
        catch (InterruptedException ex) {
            Logger.getLogger(Lotto.class.getName()).log(Level.SEVERE, null, ex);
        }
        finally{
            lock.unlock();
        }
    }
}

執行結果為
Thomas : 第0組號碼 : 43
Thomas : 第1組號碼 : 41
Thomas : 第2組號碼 : 39
Thomas : 第3組號碼 : 10
Thomas : 第4組號碼 : 2
John : 第0組號碼 : 10
John : 第1組號碼 : 36
John : 第2組號碼 : 35
John : 第3組號碼 : 20
John : 第4組號碼 : 1

特別注意 : ReentrantLock lock必需是static, 不然的話會變成物件變數, 每個物件都有自己的鎖, 就鎖不住了
另解鎖時, 需在finally區塊, 不然發生了例外, 就解不開了

ReentrantReadWriteLock 是高效的資源鎖定, 多了lock.readLock()及lock.writeLock()二個方法,  解決執行緒在大量操作與修改時的問題.
取得readLock.lock()時機 : 沒其他執行緒取得writeLock.
取得writeLock.lock()時機 : 沒其他執行緒取得readLock及writeLock

class MyRunnable implements Runnable{
    private final ReentrantReadWriteLock rw1=new ReentrantReadWriteLock();
    private int i=0;
    @Override
    public void run() {
        while (i<=100){
            rw1.writeLock().lock();
            i++;
            rw1.writeLock().unlock();                    
            System.out.println(Thread.currentThread().getId()+" : i : "+i);
        }
    }
}

Thread-Safe Collections

java.util 的collections並不是thread-safe. 習慣使用下面方式
synchronized block
封裝 synchronized , 如java.util.Collections.synchronizedList(List<T>)
使用java.util.concurrent集合
注意 : 此Collection是thread-safe, 但裏面的元素並不是thread-safe

java.util.concurrent.CyclicBarrier 回環柵欄

Cyclic [ˋsɪklɪk] :環式的  Barrier[ˋbærɪr] : 柵欄
利用await()的方式, 讓執行緒進入等待狀態, 又稱為柵欄狀態. 當進入柵欄狀態的執行緒數達到指定的數量時, 再同時開放所有的執行緒執行後續的動作

public class AtomicTest {
    public static void main(String[] args) {
        CyclicBarrier barrier=new CyclicBarrier(2);
        MyRunnable r=new MyRunnable(barrier);
        Thread t1=new Thread(r);
        t1.start();
        Thread t2=new Thread(r);
        t2.start();
    }
}
class MyRunnable implements Runnable{
    CyclicBarrier barrier;
    public MyRunnable(CyclicBarrier barrier){
        this.barrier=barrier;
    }
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getId()+" : before wait");            
            barrier.await();
            System.out.println(Thread.currentThread().getId()+" : after wait");
        } catch (InterruptedException ex) {
            Logger.getLogger(MyRunnable.class.getName()).log(Level.SEVERE, null, ex);
        } catch (BrokenBarrierException ex) {
            Logger.getLogger(MyRunnable.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

上述new CyclicBarrire(2) 表示要達到2 個執行緒進入柵欄狀態後, 才會開啟柵欄一同釋放

java.util.concurrent.Executor

執行者. Executor 是一個介面, 需實作execute(Runnable)方法, 然後要作的事全交給這個執行者去作. 其實在execute()裏要作的事就是 new thread(Runnable).start()

public class ExecutorTest {
    public static void main(String[] args) {
        MyExecutor e=new MyExecutor();
        e.execute(new TimeRunnable());
        e.execute(new TimeRunnable());
        e.execute(new TimeRunnable());
    }
}
class MyExecutor implements Executor{
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
class TimeRunnable implements Runnable{
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+" : 目前時間"+ new Date());
    }
}

java.util.concurrent.Callable

要建立一個新的執行緒時, 通常都會先實作Runnable, 然後再把Runnable放入Thread內
現在, 除了Runnable, 還可以實作Callable, 然後放入FutureTask內, FutureTask再放入Thread內執行, 此時就可以用FutureTask取得傳回值

二者差異 :
public interface Runnable{
public abstract void run();
} <==無傳回值

public interface Callable<V>{
V call() throws Exception;
} <==有傳回值

public class CallableTest {
    public static void main(String[] args) {
        FutureTask<Integer> task=new FutureTask<>(new Lotto());
        new Thread(task).start();
        System.out.println("產生中.....主執行緒進入Blocked");
        try {
            System.out.println("主執行緒得到的值為 : "+task.get());
        } catch (InterruptedException ex) {
            Logger.getLogger(CallableTest.class.getName()).log(Level.SEVERE, null, ex);
        } catch (ExecutionException ex) {
            Logger.getLogger(CallableTest.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
class Lotto implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        int n=(int)(Math.random()*46)+1;
        System.out.println("目前在Callable中, 數字為 : "+n);
        return n;
    }
}

java.util.concurrent.ExecutorService

ExecurtorService提供了管理執行緒池的功能. 此繼承了Executor介面, 本身也是個介面

ExecutorService可由Executors來建立, 分別有如下方法
Executors.newCachedThreadPool();
Executors.newFixedThreadPool(5); 使用5個執行緒
Executors.newSingleThreadExecutor();

ExecurtorService可用submit()啟動Runnable及Callable的任務, submit後的返回值為Future

public class ExecutorServiceTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService exec=Executors.newFixedThreadPool(5);
        Future future=exec.submit(new Lotto());
        System.out.println(future.get());
        exec.shutdown();//若無此行, 就會一直在停留狀態, 等待其他的指派
    }
} 
class Lotto implements Callable{ 
    @Override public Integer call() throws Exception { 
        int num=(int)(Math.random()*100)+1; 
        return num; 
    } 
}

java.util.concurrent.ScheduledExecutorService

排程器, 比如多久後開始執行, 執行幾次之類的. ScheduledExecutorService也是由Executors來建立
Executors.newScheduledThreadPool(5);
Executors.newSingleThreadScheduledExecutor();

ScheduledExecurtorService可用schedule()啟動Runnable及Callable的任務, 返回值為ScheduleFuture

public class ScheduledExecutorServiceTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ScheduledExecutorService exec=Executors.newScheduledThreadPool(2);
        System.out.println("二秒後開始執行.....");
        ScheduledFuture future=exec.schedule(new Lotto(), 2, TimeUnit.SECONDS);
        System.out.println(future.get());
        exec.shutdown();
    }
}

schedule()是單次排程, 若要多次排程, 可用二種方式
exec.scheduleWithFixedDelay(new Lotto(), 500, 1000, TimeUnit.MILLISECONDS); //500ms後開始執行, 每次執行完畢, 延遲1000ms再執行
exec.scheduleAtFixedRate(new Lotto(), 500, 1000, TimeUnit.MILLISECONDS); //500ms後開始執行, 並每隔1000ms執行一次

注意 : 這二個重複性排程, 只能執行Runnable, 不能執行Callable

Fork/Join framework

ForkJoinTask利用fork將任務拆解, 再用join將所有拆解掉的小單元匯總. ForkJoinTask實作了二個子類別
RecursiveTask : 有返回值
RecursiveAction : 無返回值

繼承RecursiveTask需實作compute方法

class MyTask extends RecursiveTask<T>{
    @Override
    protected <T> compute() {
        if (...<= Threshold){
            使用一般遞迴計算;
        }
        else{
            將任務拆開
            再將任務合併
        }
    }
}

ForkJoinTask再交由ForkJoinPool處理, 先設定處理池的cpu數, 再用invoke將task放入

class Fibonacci{ //一般計算法
    public int fibonacci(int num){
        if (num<=1)return num;
        else return fibonacci(num-1)+fibonacci(num-2);
    }
}
class FibonacciTask extends RecursiveTask{ //拆解任務
    private int num=0;
    private int sum=0;
    public FibonacciTask(int n){
        num=n;
    }
    @Override
    protected Integer compute() {
        if(num<=8)sum=new Fibonacci().fibonacci(num);
        else{
            FibonacciTask task1=new FibonacciTask(num-1);
            task1.fork();
            FibonacciTask task2=new FibonacciTask(num-2);
            task2.fork();
            sum=task2.join()+task1.join();
        }
        return sum;
    }
    public int getResult(){
        return sum;
    }
}
public class ForkJoinTest {
    public static void main(String[] args) {
        int num=45;
        long t1=System.nanoTime();
        int cpus=Runtime.getRuntime().availableProcessors();
        ForkJoinPool pool=new ForkJoinPool(cpus);
        FibonacciTask task=new FibonacciTask(num);
        pool.invoke(task);
        long t2=System.nanoTime();
        System.out.println("sum="+task.getResult());
        System.out.println("Time : "+(t2-t1)/1000000000.0f+"秒");
    }
}

取得cpu核心數

Runtime.getRuntime().availableProcessors()

invokeAll()/invokeAny()

ExecutorService可以直接用invokeAll/invokeAny將多個任務啟動, 這樣就不用一個一個用submit啟動了.
步驟如下 :
產生一個ArrayList : Collection<Callable<Integer>> list=new ArrayList<>();
將任務加入ArrayList : list.add(Callable務任)
啟動 : exec.invokeAll(list)

啟動後的傳回值為Future型態, 放在List中

    ExecutorService exec=Executors.newFixedThreadPool(5);
    Collection<Callable> list=new ArrayList<>();
    list.add(new Lotto());
    List<Future> futureList=exec.invokeAll(list);
    for (Future future:futureList){
        System.out.println(future.get());
    }
    exec.shutdown();

invokeAll : 所有任務都結束, 才會有返回值
invokeAny : 只要任何一個任務結束, 就返回結果

1 thought on “Thread API

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *