`
yadsun
  • 浏览: 180229 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Java多线程--让主线程等待所有子线程执行完毕

阅读更多
    朋友让我帮忙写个程序从文本文档中导入数据到oracle数据库中,技术上没有什么难度,文档的格式都是固定的只要对应数据库中的字段解析就行了,关键在于性能。
    数据量很大百万条记录,因此考虑到要用多线程并发执行,在写的过程中又遇到问题,我想统计所有子进程执行完毕总共的耗时,在第一个子进程创建前记录当前时间用System.currentTimeMillis()在最后一个子进程结束后记录当前时间,两次一减得到的时间差即为总共的用时,代码如下
   
    long tStart = System.currentTimeMillis();
    System.out.println(Thread.currentThread().getName() + "开始");//打印开始标记
    for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程
    Runnable r = new Runnable(){
    @Override
    public void run(){
    System.out.println(Thread.currentThread().getName() + "开始");
    //做一些事情... ...
    System.out.println(Thread.currentThread().getName() + "结束.");
    }
    }
    Thread t = new Thread(r);
    t.start();
    }
    System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记
    long tEnd = System.currentTimeMillis();
    System.out.println("总共用时:"+ (tEnd - tStart) + "millions");
    

    结果是几乎在for循环结束的瞬间就执行了主线程打印总共用时的语句,原因是所有的子线程是并发执行的,它们运行时主线程也在运行,这就引出了一个问题即本文标题如何"让主线程等待所有子线程执行完毕"。试过在每个子线程开始后加上t.join(),结果是所有线程都顺序执行,这就失去了并发的意义了,显然不是我想要的。
    网上Google了很久也没有找到解决方案,难道就没有人遇到过这种需求吗?还是这个问题太简单了?无耐只得自己想办法了...
    最后我的解决办法是,自定义一个ImportThread类继承自java.lang.Thread,重载run()方法,用一个List属性保存所有产生的线程,这样只要判断这个List是否为空就知道还有没有子线程没有执行完了,类代码如下:
   
    public class ImportThread extends Thread {
    private static List<Thread> runningThreads = new ArrayList<Thread>();
    public ImportThread() {
    }
    @Override
    public void run() {
    regist(this);//线程开始时注册
    System.out.println(Thread.currentThread().getName() + "开始...");//打印开始标记
    //做一些事情... ...
    unRegist(this);//线程结束时取消注册
    System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记
    }
    public void regist(Thread t){
        synchronized(runningThreads){ 
            runningThreads.add(t);
        }
    }
    public void unRegist(Thread t){
        synchronized(runningThreads){ 
            runningThreads.remove(t);
        }
    }
    public static boolean hasThreadRunning() {
    return (runningThreads.size() > 0);//通过判断runningThreads是否为空就能知道是否还有线程未执行完
    }
    }
    

    主线程中代码:
   
    long tStart = System.currentTimeMillis();
    System.out.println(Thread.currentThread().getName() + "开始");//打印开始标记
    for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程
    Thread t = new ImportThread();
    t.start();
    }
    while(true){//等待所有子线程执行完
    if(!ImportThread.hasThreadRunning()){
    break;
    }
    Thread.sleep(500);
    }
    System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记
    long tEnd = System.currentTimeMillis();
    System.out.println("总共用时:"+ (tEnd - tStart) + "millions");
    

    打印的结果是:
            main开始
            Thread-1开始...
            Thread-5开始...
            Thread-0开始...
            Thread-2开始...
            Thread-3开始...
            Thread-4开始...
            Thread-5结束.
            Thread-4结束.
            Thread-2结束.
            Thread-0结束.
            Thread-3结束.
            Thread-1结束.
            main结束.
            总共用时:20860millions
    可以看到main线程是等所有子线程全部执行完后才开始执行的。
    ==================================================以下为第二次编辑===============================================
    上面的方法有一个隐患:如果线程1开始并且结束了,而其他线程还没有开始此时runningThreads的size也为0,主线程会以为所有线程都执行完了。解决办法是用一个非简单类型的计数器来取代List型的runningThreads,并且在线程创建之前就应该设定好计数器的值。
    MyCountDown类
   
    public class MyCountDown {
    private int count;
    public MyCountDown(int count){
    this.count = count;
    }
    public synchronized void countDown(){
    count--;
    }
    public synchronized boolean hasNext(){
    return (count > 0);
    }
    public int getCount() {
    return count;
    }
    public void setCount(int count) {
    this.count = count;
    }
    }
    

    ImportThread类
   
    public class ImportThread extends Thread {
    private MyCountDown c;
    public ImportThread(MyCountDown c) {
    this.c = c;
    }
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + "开始...");//打印开始标记
    //Do something
    c.countDown();//计时器减1
    System.out.println(Thread.currentThread().getName() + "结束. 还有" + c.getCount() + " 个线程");//打印结束标记
    }
    }
    

    主线程中
   
    System.out.println(Thread.currentThread().getName() + "开始");//打印开始标记
    MyCountDown c = new MyCountDown(threadNum);//初始化countDown
    for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程
    Thread t = new ImportThread(c);
    t.start();
    }
    while(true){//等待所有子线程执行完
    if(!c.hasNext()) break;
    }
    System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记
    

    打印结果:
            main开始
            Thread-2开始...
            Thread-1开始...
            Thread-0开始...
            Thread-3开始...
            Thread-5开始...
            Thread-4开始...
            Thread-5结束. 还有5 个线程
            Thread-1结束. 还有4 个线程
            Thread-4结束. 还有3 个线程
            Thread-2结束. 还有2 个线程
            Thread-3结束. 还有1 个线程
            Thread-0结束. 还有0 个线程
            main结束.
    更简单的方法:使用java.util.concurrent.CountDownLatch代替MyCountDown,用await()方法代替while(true){...}
    ImportThread类
   
    public class ImportThread extends Thread {
    private CountDownLatch threadsSignal;
    public ImportThread(CountDownLatch threadsSignal) {
    this.threadsSignal = threadsSignal;
    }
    @Override
    public void run() {
    System.out.println(Thread.currentThread().getName() + "开始...");
    //Do somethings
    threadsSignal.countDown();//线程结束时计数器减1
    System.out.println(Thread.currentThread().getName() + "结束. 还有" + threadsSignal.getCount() + " 个线程");
    }
    }
    

    主线程中
   
    CountDownLatch threadSignal = new CountDownLatch(threadNum);//初始化countDown
    for (int ii = 0; ii < threadNum; ii++) {//开threadNum个线程
    final Iterator<String> itt = it.get(ii);
    Thread t = new ImportThread(itt,sql,threadSignal);
    t.start();
    }
    threadSignal.await();//等待所有子线程执行完
    System.out.println(Thread.currentThread().getName() + "结束.");//打印结束标记
    

    打印结果:
            main开始
            Thread-1开始...
            Thread-0开始...
            Thread-2开始...
            Thread-3开始...
            Thread-4开始...
            Thread-5开始...
            Thread-0结束. 还有5 个线程
            Thread-1结束. 还有4 个线程
            Thread-4结束. 还有3 个线程
            Thread-2结束. 还有2 个线程
            Thread-5结束. 还有1 个线程
            Thread-3结束. 还有0 个线程
            main结束.
分享到:
评论
72 楼 kyson 2014-01-17  
lz活生生地弄了一个线程池啊。。
71 楼 xining 2013-07-03  
其实可以使用 ThreadGroup    调用ThreadGroup的activeCount方法 ==0 就是子线程全部结束, 主线程可以继续往下走
70 楼 guowusmiile 2013-05-13  
何必这么麻烦,其实只需要一个for循环,就可以解决LZ标题问题
List<Thread> tList = new ArrayList<Thread>();
for (int i = 0; i < tList.size(); i++) {
try {
tList.get(i).join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

只要在子线程结束的时候增加这些代码即可,谢谢!
69 楼 tonghaoqi__ 2013-05-10  
楼主写的挺好的 也挺清晰的 我不明白别人为啥看不明白,什么样的才算是明白呢? 唉不说那些了。
感觉能做下试验验证一下,呵呵 收走了,,
68 楼 avi9111 2012-06-19  
好像用

join就能做到了

t.join()

写了几年c#的线程,很多东西都很简单就做到了,可以说我连概念都没搞清楚就能用了,所见即所得,想到就能做到

不知道为什么现在用java的线程那么难用

不知道为什么现在看用java线程的程序员的文章都复杂的要命
67 楼 foart 2012-06-16  
看上去使用java.util.concurrent.CountDownLatch,最后一个办法挺简单清晰的,有空试试看。
66 楼 yunnysunny 2011-09-21  
yiyidog125 写道
干吗不用join?没理解错的话就是主线程等着子线程们结束吧

yiyidog125 写道
干吗不用join?没理解错的话就是主线程等着子线程们结束吧

貌似用join之后,线程之间就变成串行的了,不是并行的了。这样会严重影响效率。
65 楼 yiyidog125 2010-10-29  
yadsun 写道
nishizhutoua 写道
说句老实话,你确定你的这个需求要用多线程么?
读的方面,瓶颈在IO,用单线程,全速的读就是最快的了.除非你在读取一行数据后要对行数据进行格式化,并且这个格式化时间消耗要大于理论上的连续读两行文本时的CPU时间. 不过恐怕不容易吧,这两个时间差好几个级.
写的部分,瓶颈还是在IO,因为你要写到数据库中.
这么算两个线程就够了,生产者消费者模式即可.

假设有100万条记录,顺序执行的话光是遍历完这100万条记录就要很长时间了,现在开10个线程,每个线程只要插入10万条,是不是会快很多?


看你几块disk,只有一块的话单线顺序程读写就是最快了 磁盘是sequantial access才快的 多线程access pattern被打乱了会慢因为这相当于random access了
64 楼 yiyidog125 2010-10-29  
干吗不用join?没理解错的话就是主线程等着子线程们结束吧
63 楼 yadsun 2010-10-29  
cjmcn-sh 写道
import java.util.concurrent.*

然后做该做的事情
oracle导数据还要写程序?

我前面说了因为有xmltype类型的字段,所以想用java先解析一下再插入
62 楼 cjmcn-sh 2010-10-28  
import java.util.concurrent.*

然后做该做的事情
oracle导数据还要写程序?
61 楼 luobin23628 2010-10-27  
循环栅栏比计数器跟适合楼主的这个应用场景

public static void main(String[] args) throws InterruptedException {
		int threadNumber = 10;
		final long start = System.currentTimeMillis();
		CyclicBarrier barrier = new CyclicBarrier(threadNumber,new Runnable(){
			@Override
			public void run() {
				long end = System.currentTimeMillis();
				System.out.println("total time : "+(end-start)+"ms");
			}
		});
		
		 for (int i = 0; i < threadNumber; i++) {   
	            new ImportThread(barrier,i).start();   
	        }  
           
        System.out.println("main thread finished!!");   
    }  
	
	private static class ImportThread extends Thread{
		private final CyclicBarrier barrier;
		private final int threadID;
		public ImportThread(CyclicBarrier barrier,int threadID){
			this.barrier = barrier;	
			this.threadID = threadID;
		}
		
		public void run() { 
            try {
                Thread.sleep((long) (Math.random() * 10000));   
            } catch (InterruptedException e) {   
                e.printStackTrace();   
            }   
            System.out.println(String.format("threadID:[%s] finished!!", threadID));   
            try {
				barrier.await();
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			} catch (BrokenBarrierException e1) {
				e1.printStackTrace();
			}
        }   
	}
60 楼 babecue 2010-10-27  
把所有子线程启动了之后再逐一调用join()不行吗?
59 楼 thorlst 2010-02-01  
ReadWriteLock rwl = new ReentrantReadWriteLock();

//执行子线程
for(....){
rwl.readLock().lock();
new Thread(new Runnable(){
  public void run(){
    try{
    ... 
   }finally{
    rwl.readLock().unlock();
   }
  }
}).start();
}

//等待子线程
try {
 rwl.writeLock().lock();
} finally {
 rwl.writeLock().unlock();
}
58 楼 yadsun 2010-01-31  
蓝皮鼠 写道
这个主题,如果是导入数据到Oracle,用SQL Loader最简单最快,如果在借题发挥到并发多任务,就是一个很大的话题了。

好像这篇帖子的内容相关
http://www.iteye.com/topic/405492

fuermos 写道
大数据量,用oralce sqlLoad就行了,都不需要java

因为表中有个字段是xmltype型的,插入时需调用oracle的sys.xmltype.createXml()来创建xmltype对象然后插入,这样服务器端压力会很大,所以想用java在客户端创建好XMLType对象,这样服务器端就少了创建xmltype对象的压力了
57 楼 zcq100 2010-01-30  
在main里面循环判断线程池里面线程是否运行完毕即可
56 楼 lyy3323 2010-01-29  
楼主你的问题其实很简单。。。
countdown 就行了。。。

如果你确定了你开启的线程数。
那么在主程序运行前。
CountDownLatch countdown = new CountDownLatch(10);//这里的10就是你的子线程数。


在每个子线程结束后,调用 countdown.countDown();

在主线程里启动子线程的方法后面添加。
countdown.await();//这里进行同步等待

等所有子线程结束后,执行 countdown.await()后面的代码
55 楼 fuermos 2010-01-29  
大数据量,用oralce sqlLoad就行了,都不需要java
54 楼 蓝皮鼠 2010-01-29  
这个主题,如果是导入数据到Oracle,用SQL Loader最简单最快,如果在借题发挥到并发多任务,就是一个很大的话题了。

好像这篇帖子的内容相关
http://www.iteye.com/topic/405492
53 楼 xuyan2680 2010-01-29  
多线程---批量线程同步模型
在一批线程处理程序中,有时必须等到所有线程全部运行完后,才能进行下一步任务处理,
可以采用如下方法解决,创建一个锁对象 ,该锁对象提供一个当前线程等待其他线程的方法。见代码:

/**  
 *   
 * 此类主要用来处理线程的同步屏蔽模型,比如,一批线程运行,必须在最后一个线程运行  
 * 完后,才能进行下一步的操作,那么就可以创建一个锁对象,锁对象提供一个线程等待其他线程  
 * 的方法,如果当前线程运行时,还有未运行的线程,则此线程wait,否则,此线程唤醒其他阻塞的  
 * 线程,进而最终完成线程的运行  
 * */  
public class LockObject {   
  
    private int totalThread = 0;   
    private int currentThread = 0;   
  
    public LockObject(int totalThread) {   
        this.totalThread = totalThread;   
        this.currentThread = 1;   
    }   
  
    public synchronized void waitForOtherThread() {   
        if (this.currentThread < this.totalThread) {   
            this.currentThread++;   
            try {   
                this.wait();   
            } catch (InterruptedException e) {   
                // TODO Auto-generated catch block   
                e.printStackTrace();   
            }   
        } else {   
            this.currentThread = 1;   
            notifyAll();   
        }   
    }   
  
    public int getTotalThread() {   
        return totalThread;   
    }   
  
    public void setTotalThread(int totalThread) {   
        this.totalThread = totalThread;   
    }   
  
    public int getCurrentThread() {   
        return currentThread;   
    }   
  
    public void setCurrentThread(int currentThread) {   
        this.currentThread = currentThread;   
    }   
}  

/**
 * 
 * 此类主要用来处理线程的同步屏蔽模型,比如,一批线程运行,必须在最后一个线程运行
 * 完后,才能进行下一步的操作,那么就可以创建一个锁对象,锁对象提供一个线程等待其他线程
 * 的方法,如果当前线程运行时,还有未运行的线程,则此线程wait,否则,此线程唤醒其他阻塞的
 * 线程,进而最终完成线程的运行
 * */
public class LockObject {

	private int totalThread = 0;
	private int currentThread = 0;

	public LockObject(int totalThread) {
		this.totalThread = totalThread;
		this.currentThread = 1;
	}

	public synchronized void waitForOtherThread() {
		if (this.currentThread < this.totalThread) {
			this.currentThread++;
			try {
				this.wait();
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		} else {
			this.currentThread = 1;
			notifyAll();
		}
	}

	public int getTotalThread() {
		return totalThread;
	}

	public void setTotalThread(int totalThread) {
		this.totalThread = totalThread;
	}

	public int getCurrentThread() {
		return currentThread;
	}

	public void setCurrentThread(int currentThread) {
		this.currentThread = currentThread;
	}
}

此对象提供 二个私有变量,totalThread 的初始值为所运行的线程的总数,currentThread 为当前正在运行的线程数。 

此对象提供 二个私有变量,totalThread 的初始值为所运行的线程的总数,currentThread 为当前正在运行的线程数。Java代码
线程运行时处理完自己的任务后调用方法waitForOtherThread 等待其他线程结束,即当前运行线程数与线程总数的比较 

线程运行时处理完自己的任务后调用方法waitForOtherThread 等待其他线程结束,即当前运行线程数与线程总数的比较Java代码
如果运行线程数小于线程总数,则当前运行线程数+1 后,当前线程进入等待状态,否则,唤醒其他等待线程。 

如果运行线程数小于线程总数,则当前运行线程数+1 后,当前线程进入等待状态,否则,唤醒其他等待线程。
见测试程序

public class MyThread extends Thread {   
    public static LockObject lo = new LockObject(1000);   
  
    public MyThread(String threadName) {   
        super(threadName);   
    }   
  
    public void run() {   
            System.out.println(Thread.currentThread().getName() + " ----开始运行");   
            lo.waitForOtherThread();   
            System.out.println(Thread.currentThread().getName() + " ----结束运行");   
    }   
  
    public static void main(String[] args) {   
        for (int i = 1; i <= 1000; i++) {   
            Thread thread = new MyThread("第" + i + "个线程");   
            thread.setPriority(NORM_PRIORITY);   
            thread.start();   
        }   
    }   
  
}  

http://xuyan2680.iteye.com/admin/blogs/467701

相关推荐

Global site tag (gtag.js) - Google Analytics