java中不太常见的东西(4) - Fork/Join

news/2024/7/4 9:47:41

引言

《java中不太常见的东西》这个模块已经好久没写了,今天写一个java中自带的分布式处理方式Fork/Join。Fork/Join在JDK1.7的时候引入,它某种程度上可以实现简单的map-reduce操作。笔者目前整理的一些blog针对面试都是超高频出现的。大家可以点击链接:http://blog.csdn.net/u012403290。

技术点

1、map-reduce
处理大数据的编程模型,分为”Map(映射)”和”Reduce(归约)”两部分。应用于分布式编程的情况,可以尽可能提升运算效率和速度。通俗来说就是把一个很大的任务,拆分为很多小任务,然后有各自的线程去处理这些小任务,最后把结果统一起来。

2、产生背景
其实Fork/Join处理一定程度的数据,核心建立于目前水平发展的多核计算机技术,它表达了一种充分利用资源的概念。在如今的计算机领域多核处理器早已是主流,而且并发编程讲究多线程处理问题,对计算机资源利用达到一个新的高度。

Fork/Join结构

正确的使用Fork/Join框架,需要一定熟悉它的结构,对于一个分布式的任务,必然具备两种条件:①任务调度;②任务执行。在Fork/Join中,我们主要用它自定义的线程池来提交任务和调度任务,称之为:ForkJoinPool;同时我们有它自己的任务执行类,称之为:ForkJoinTask。

不过我们不直接使用ForkJoinTask来直接执行和分解任务,我们一般都使用它的两个子类,RecursiveActionRecursiveTask,其中,前者主要处理没有返回结果的任务,后者主要处理有返回结果的任务。总结一下,一下就是Fork/Join的基本模型:
这里写图片描述

接下来我们一部分一部分来分析一下他们各自的结构:

①ForkJoinPool:
网上很多解释ForkJoinPool的源码已经非常老了,在JDK1.8中已经不再继续维护ForkJoinTask和ForkJoinWorkerThread这两个数组了,前者是一个个任务,后者是执行任务的线程。它现在的模式是形成了一个内部类:WorkQueue,下面是它在JDK1.8中的源码:

  /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     * Performance on most platforms is very sensitive to placement of
     * instances of both WorkQueues and their arrays -- we absolutely
     * do not want multiple WorkQueue instances or multiple queue
     * arrays sharing cache lines. The @Contended annotation alerts
     * JVMs to try to keep instances apart.
     */
    @sun.misc.Contended
    static final class WorkQueue {

        // Instance fields
        volatile int scanState;    // versioned, <0: inactive; odd:scanning
        int stackPred;             // pool stack (ctl) predecessor
        int nsteals;               // number of steals
        int hint;                  // randomization and stealer index hint
        int config;                // pool index and mode
        volatile int qlock;        // 1: locked, < 0: terminate; else 0
        volatile int base;         // index of next slot for poll
        int top;                   // index of next slot for push
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
        volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
        volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer

    }

仔细阅读源码我们发现,现在的结构和原来完全不一样了。本来我们需要从ForkJoinTask数组中把任务分发给ForkJoinWorkerThread来执行。而现在,用一个内部类workQueue来完成这个任务,在workQueue中存在一个ForkJoinWorkerThread表示这个队列的执行者,同时在workQueue的成员变量中,我们发现有一个ForkJoinTask数组,这个数组是这个Thread需要执行的任务。

阅读这个内部类的描述,我们发现这个queue还支持线程的任务窃取,什么叫线程的任务窃取呢?就是说你和你的一个伙伴一起吃水果,你的那份吃完了,他那份没吃完,那你就偷偷的拿了他的一些水果吃了。存在执行2个任务的子线程,这里要讲成存在A,B两个个WorkQueue在执行任务,A的任务执行完了,B的任务没执行完,那么A的WorkQueue就从B的WorkQueue的ForkJoinTask数组中拿走了一部分尾部的任务来执行,可以合理的提高运行和计算效率。

我们不深入了解源码,这并不是这篇博文的本意。接下来我们看看ForkJoinPool中提交任务的几个方法:

a、submit

    /**
     * Submits a ForkJoinTask for execution.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

b、execute

    /**
     * Arranges for (asynchronous) execution of the given task.
     *
     * @param task the task
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }

c、invoke

    /**
     * Performs the given task, returning its result upon completion.
     * If the computation encounters an unchecked Exception or Error,
     * it is rethrown as the outcome of this invocation.  Rethrown
     * exceptions behave in the same way as regular exceptions, but,
     * when possible, contain stack traces (as displayed for example
     * using {@code ex.printStackTrace()}) of both the current thread
     * as well as the thread actually encountering the exception;
     * minimally only the latter.
     *
     * @param task the task
     * @param <T> the type of the task's result
     * @return the task's result
     * @throws NullPointerException if the task is null
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     */
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
    }

这3种任务提交方法还是有所差别的,在submit中提交了一个任务之后,会异步开始执行任务同时返回这个任务,而 execute会异步执行这个任务但是没有任何返回。而invoke会异步开始执行任务,直接返回一个结果。

②ForkJoinTask:
在ForkJoinTask中我们就简单介绍fork和join这两种操作,以下是fork方法的源码:

    // public methods

    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.  Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);//把当前线程添加到workQueue中
        else
            ForkJoinPool.common.externalPush(this);//直接执行这个任务
        return this;
    }

在fork方法中,它会先判断当前的线程是否属于ForkJoinWorkerThread线程,如果属于这个线程,那么就把线程添加到workQueue中,否则就直接执行这个任务。

以下是join方法:

    /**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)//判断任务是否正常,否则要报告异常
            reportException(s);
        return getRawResult();//返回结果
    }



 /**
     * Implementation for join, get, quietlyJoin. Directly handles
     * only cases of already-completed, external wait, and
     * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
     *
     * @return status upon completion
     */
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);//如果任务执行完了,那么就设置为NORMAL
        }
        return s;
    }

在join的操作主要是判断当前任务的执行状态和返回结果,任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。
在doJoin()方法里,首先通过查看任务的状态,通过doExec方法去判断任务是否执行完毕,如果执行完了,则直接返回任务状态,如果没有执行完,就等待继续执行。如果任务顺利执行完成了,则设置任务状态为NORMAL,如果出现异常,则需要报告异常。

用代码实现Fork/Join实现大数据计算

如果真的要很详细的去介绍Fork/join源码,貌似需要更进一步的去钻研,很多底层的的东西还涉及到了一些乐观锁。我们不继续深究了,我们尝试用fork/join来实现大数列的计算,同时我们尝试把它和一般的计算方式做比较,看看哪个效率更高。

需求:
计算1+2+3+……..+N的和

以下是我实现的用Fork/Join进行计算,主要的核心思想就是把超大的计算拆分为小的计算,通俗来说就是把一个极大的任务拆分为很多个小任务,下面是核心计算模型:
这里写图片描述

下面是代码实现:

package com.brickworkers;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class FockJoinTest extends RecursiveTask<Long>{//继承RecursiveTask来实现
    //设立一个最大计算容量
    private final int DEFAULT_CAPACITY = 10000;


    //用2个数字表示目前要计算的范围
    private int start;

    private int end;

    public FockJoinTest(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {//实现compute方法
        //分为两种情况进行出来
        long sum = 0;
        //如果任务量在最大容量之内
        if(end - start < DEFAULT_CAPACITY){
            for (int i = start; i < end; i++) {
                sum += i;
            }
        }else{//如果超过了最大容量,那么就进行拆分处理
            //计算容量中间值
            int middle = (start + end)/2;
            //进行递归
            FockJoinTest fockJoinTest1 = new FockJoinTest(start, middle);
            FockJoinTest fockJoinTest2 = new FockJoinTest(middle + 1, end);
            //执行任务
            fockJoinTest1.fork();
            fockJoinTest2.fork();
            //等待任务执行并返回结果
            sum = fockJoinTest1.join() + fockJoinTest2.join();
        }

        return sum;
    }


    public static void main(String[] args) {

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        FockJoinTest fockJoinTest = new FockJoinTest(1, 100000000);
        long fockhoinStartTime = System.currentTimeMillis();
        //前面我们说过,任务提交中invoke可以直接返回结果
        long result = forkJoinPool.invoke(fockJoinTest);
        System.out.println("fock/join计算结果耗时"+(System.currentTimeMillis() - fockhoinStartTime));

        long sum = 0;
        long normalStartTime = System.currentTimeMillis();
        for (int i = 0; i < 100000000; i++) {
            sum += i;
        }
        System.out.println("普通计算结果耗时"+(System.currentTimeMillis() - normalStartTime));
    }

}


//执行结果:
//fock/join计算结果耗时33
//普通计算结果耗时141



注意,在上面的例子中,程序的效率其实首你设置的DEFAULT_CAPACITY影响的,如果你把这个容量值设置的太小,那么它会被分解成好多好多的子任务,那么效率反而会降低。但是把容量设置的稍微大一些效率也会相对的提升,经过测试,运行时间和DEFAULT_CAPCITY的关系大致如下图:
这里写图片描述

尾记

在我们的日常开发中,很多地方可以用分布式的方式去实现它,当然了这个是要建立你在资源很富余的情况之下。比如说,定时任务,半夜执行的时候,资源富足,那么我们可以用这种方式加快运算效率。再比如说,项目报表文件的导出,我们可以把超级多行的数据一部分一部分拆开出来,也可以达到加快效率的效果。大家可以尝试。

希望对你有所帮助。


http://www.niftyadmin.cn/n/4544283.html

相关文章

java中不太常见的东西(5) - 注解

引言 在日常的开发过程中&#xff0c;其实每个人都用到了注解&#xff0c;最常见的就是重写Override。既然这么常见为什么还要放入不常见的模块中呢&#xff1f;在本篇博文中会详细介绍关于注解的概念和各个组成部分&#xff0c;同时会写出一个demo来说明自定义注解使用的一种…

python中关于操作时间的方法(二):使用datetime模块

使用datetime模块来获取当前的日期和时间 1 import datetime 2 idatetime.datetime.now() 3 print ("当前的日期和时间是%s"%i) 使用datetime模块来获取当前的年份 1 import datetime 2 idatetime.datetime.now() 3 print ("当前的年份是%s" %i.year) 使用…

为什么互联网行业需要Ping32盗版软件检测?

随着互联网的发展&#xff0c;软件作为互联网生态系统不可或缺的一环&#xff0c;被广泛应用于生产、生活、娱乐等各个领域。然而&#xff0c;很多软件公司为了保护自己的版权收益和知识产权&#xff0c;会采用授权方式进行软件销售和分发&#xff0c;但是授权方式难以避免软件…

java实现(3)-堆

引言 堆&#xff0c;我们一般作为二叉堆的一种总称&#xff0c;它是建立在二叉树之上的。在本篇博文中&#xff0c;会详细介绍堆的结构和原理&#xff0c;以至于写出堆的实现。在代码实现中我们主要是针对于插入和删除做一些操作&#xff0c;在删除中我们只考虑删除最小的&…

集体智慧编程--第3章 发现群组

监督学习和非监督学习 利用样本输入和期望输出来学习如何预测的技术称为监督学习。非监督学习的目标是采集数据&#xff0c;然后从中找出不同的群组。 单词向量 为聚类算法准备数据的常见做法是定义一组公共的数值型属性&#xff0c;我们可以利用这些属性对数据项进行比较。 对…

java实现排序(4)-堆排序

引言 在上一篇博文中&#xff0c;尝试实现了二叉堆的结构。在本篇博文中&#xff0c;将建立在堆的基础之上&#xff0c;讨论如何用堆实现排序。二叉堆的代码直接引用昨天的实现源码&#xff0c;在代码的基础上做一些修改使其变成堆排序。笔者目前整理的一些blog针对面试都是超…

【第三组】第四次冲刺例会(2017.7.14)

开发小组&#xff1a;Geomystery 冲刺经理&#xff1a;程立智&#xff08;李明伦代&#xff09; 小组成员&#xff1a;李明伦 蔡镇泽 郑昊 王涵 温志成 一、会议内容 1、 昨天都做了什么&#xff1a; 程立智&#xff1a;&#xff08;夏令营请假未归&#xff09; 李明伦&#xf…

ASP.NET Core中地址栏传入数据会影响Controller向ViewModel赋值

今日发现一个BUG&#xff0c;经过仔细调试&#xff0c;发现之前没有注意到的一个特性&#xff0c;或者说是很郁闷的一个设定。 需求大致是这样的&#xff1a;1-N个用车账单&#xff0c;可以共同选择出来生成一个报销单&#xff0c;现在要修改报销单。 上部分相关代码&#xff0…