RError.com

RError.com Logo RError.com Logo

RError.com Navigation

  • 主页

Mobile menu

Close
  • 主页
  • 系统&网络
    • 热门问题
    • 最新问题
    • 标签
  • Ubuntu
    • 热门问题
    • 最新问题
    • 标签
  • 帮助
主页 / 问题 / 1166735
Accepted
Имя Фамилия
Имя Фамилия
Asked:2020-08-17 01:06:28 +0000 UTC2020-08-17 01:06:28 +0000 UTC 2020-08-17 01:06:28 +0000 UTC

如何使用这个 ForkJoinPool(th)?

  • 772

创建了两种计算斐波那契数的方法:一种使用普通递归,另一种使用ForkJoinPool(ом). 问题是该方法fibonacci1比fibonacci. 请解释原因。一般如何使用ForkJoinPool(ом)?

public class MyRecursiveAction extends RecursiveTask<Long> {

    private static ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

    public static void main(String[] args) {
        long l1, l2;
        l1 = System.nanoTime();
        fibonacci1(40);
        l2 = System.nanoTime();
        System.out.println(l2 - l1);
        l1 = System.nanoTime();
        fibonacci(40);
        l2 = System.nanoTime();
        System.out.println(l2 - l1);
    }

    private long n;

    private MyRecursiveAction(long n) {
        this.n = n;
    }

    public static long fibonacci(long n) {
        return forkJoinPool.invoke(new MyRecursiveAction(n));
//        return new MyRecursiveAction(n).invoke();
    }

    public static long fibonacci1(long n) {
        if (n == 0 || n == 1)
            return n;
        if (n > 0)
            return fibonacci1(n - 1) + fibonacci1(n - 2);
        else
            return fibonacci1(n + 2) - fibonacci1(n + 1);
    }

    @Override
    protected Long compute() {
        if (n == 0 || n == 1)
            return n;
        if (n > 0) {
            MyRecursiveAction myRecursiveAction1 = new MyRecursiveAction(n - 1);
            forkJoinPool.invoke(myRecursiveAction1);
//            myRecursiveAction1.fork();
            MyRecursiveAction myRecursiveAction2 = new MyRecursiveAction(n - 2);
            forkJoinPool.invoke(myRecursiveAction2);
//            myRecursiveAction2.fork();
//            return myRecursiveAction1.join() + myRecursiveAction2.join();
            return myRecursiveAction1.join() + myRecursiveAction2.join();
        } else {
            MyRecursiveAction myRecursiveAction1 = new MyRecursiveAction(n + 1);
            forkJoinPool.invoke(myRecursiveAction1);
//            myRecursiveAction1.fork();
            MyRecursiveAction myRecursiveAction2 = new MyRecursiveAction(n + 2);
            forkJoinPool.invoke(myRecursiveAction2);
//            myRecursiveAction2.fork();
//            return myRecursiveAction2.join() - myRecursiveAction1.join();
            return myRecursiveAction2.join() - myRecursiveAction1.join();
        }
    }
}
java
  • 1 1 个回答
  • 10 Views

1 个回答

  • Voted
  1. Best Answer
    Pak Uula
    2020-08-17T03:10:07Z2020-08-17T03:10:07Z

    所有ExecutorService这些都旨在在专用线程中执行相对繁重的任务。例如,在处理照片时可以使用它们 - 将照片分割成n区域并将它们加载到n流中,然后合并结果。在多核处理器上,处理速度很可能会加快一个n因素,因为线程将在专用内核上工作。或者你可以在 Web 服务中处理请求:调度程序接收请求并将其发送到池中进行处理;当线程在池中空闲时,它将接收请求并开始计算。如果计算比产生线程、调度线程、等待线程完成的开销大得多,那么与单线程执行相比,使用池将显着提高性能。

    但是在您的情况下,加法操作是如此轻量级,以至于使用池的开销比实际计算的成本大很多倍。

    UPD

    示例:查找除数

    例如,让我们开始并行搜索整数的除数。搜索算法有两个参数——枚举间隔的最大长度和生成的子任务的数量。

    如果分配给任务的除数搜索间隔太大,即 (b-a) > intervalLength,然后该任务生成几个子任务,为每个子任务减少间隔。

    也就是说,一个任务最终会生成一整棵子任务树。为了在已经找到除数时不迭代所有内容,每个任务都会接收一个StopFlag带有布尔标志的类的实例。找到除数的子任务会引发这个标志,而所有其他子任务什么都不做。

    子任务通过调用启动task.fork()- 此方法将要执行的子任务放在用于启动父任务的同一池中。

    我必须马上说——这个例子很愚蠢,只是为了说明并行化。

    package org.example;
    
    import java.util.ArrayList;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    @SuppressWarnings("serial")
    class Divisors extends RecursiveTask<Long> {
        private long n;
        private long a;
        private long b;
        private int forkFactor ;
        private long length;
        private StopFlag stopFlag;
    
        protected static class StopFlag {
            public boolean done = false;
            public boolean isDone() {
                return done;
            }
        }
        
        public Divisors(long n, long a, long b, int forkFactor, long length, StopFlag stopFlag) {
            this.n = n;
            if (0 == a % 2) {
                a += 1;
            }
            this.a = a;
            this.b = b;
            this.forkFactor = forkFactor;
            this.length = length;
            
            if (stopFlag == null) {
                this.stopFlag = new StopFlag();
            } else {
                this.stopFlag = stopFlag;
            }
        }
    
        public Divisors(long n, long a, long b, int forkFactor, long length) {
            this(n,a,b,forkFactor, length, null);
        }
    
        protected Long compute() {
            // System.out.println("Compute started, thread " + Thread.currentThread().getId());
            if (stopFlag.isDone()) {
                return 0L;
            }
            if ((b - a) < length) {
                // Interval is short enough to do the brute force search
                return bruteForce();
            } else {
                // Split the interval and start subtasks
                return divideAndConquer();
            }
        }
    
        private Long divideAndConquer() {
            ArrayList<ForkJoinTask<Long>> results = new ArrayList<ForkJoinTask<Long>>();
            long step = (b-a)/forkFactor + 1;
            for (long i = a; i < b; i+= step) {
                ForkJoinTask<Long> task = new Divisors(n, i, i+step, forkFactor, length, stopFlag);
                results.add(task.fork());
            }
            for (ForkJoinTask<Long> result : results) {
                long divisor = result.join();
                if (divisor > 0) {
                    return divisor;
                }
            }
            return 0L;
        }
    
        private Long bruteForce() {
            for (long i = a; i <= b; i += 2) {
                long res = n % i;
                if (res == 0) {
                    System.out.println("Found");
                    stopFlag.done = true;
                    return i;
                }
            }
            return 0L;
        }
    }
    

    以下启动器附加到此任务:

    package org.example;
    
    import java.math.BigInteger;
    import java.util.Random;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    
    public class MainDivisors {
    
        /**
         * if {@code (b-a) > intervalLength} then fork the task into {@code forkFactor} subtasks
         */
        private static final long intervalLength = 10000000L;
        private static int forkFactor = 4;
        private static ForkJoinPool forkJoinPool = new ForkJoinPool(forkFactor);
    
        public static void main(String[] args) {
            long t1, t2;
            
            BigInteger p = BigInteger.probablePrime(32, new Random());
            BigInteger q = p.nextProbablePrime();
            long n = p.longValue()*q.longValue();
            
            System.out.println("Factoring " + n + " = " + p + "*" + q);
            
            System.out.println("Single thread");
            {
                t1 = System.nanoTime();
                long result = divisorSingleThread(n);
                t2 = System.nanoTime();
                System.out.println("Divisor: " + result + ", time: " + (t2 - t1));
            }
            System.out.println("Pool");
            {
                t1 = System.nanoTime();
                long result = divisorPooled(n, forkFactor, intervalLength);
                t2 = System.nanoTime();
                System.out.println("Divisor: " + result + ", time: " + (t2 - t1));
            }
        }
    
        public static Long divisorPooled(long n, int forkFactor, long intervalLength) {
            if (n %2 == 0) {
                return 2L;
            }
            long a = 3;
            long b = (long) (Math.floor(Math.sqrt(n))) + 1;
    
            ForkJoinTask<Long> task = new Divisors(n, a, b, forkFactor, intervalLength);
            ForkJoinTask<Long> result = forkJoinPool.submit(task);
            return result.join();
        }
    
        public static Long divisorSingleThread(long n) {
            if (n %2 == 0) {
                return 2L;
            }
            long a = 3;
            long b = (long) (Math.floor(Math.sqrt(n))) + 1;
            for (long i = a; i < b; i += 2) {
                long res = n % i;
                if (res == 0) {
                    return i;
                }
            }
            return 0L;
        }
    
    }
    

    如测试所示,加速度为 2-3 倍。例子从手指头上吸了出来,组织并行的开销与性能增益不相上下,但它展示了 MapReduce 的主要思想:

    • 问题空间分区(partition),
    • 将计算函数应用于生成的切片(地图),
    • 减少来自不同分区的结果(减少)。

    UPD2

    正确的例子

    尽管问题的作者已经接受了我的答案,但我将对其进行扩展和补充 - 我将展示正确的示例。上面的示例以及文档中的示例都是错误的。

    这两个示例的问题在于池化任务会产生子任务并等待它们完成。即,池线程之一被阻塞。在存在阻塞线程的情况下,池创建新线程来解决问题,这需要大量资金。

    正确的解决方案是将map和reduce分开。Map 在一个地方完成,reduce 在另一个地方完成。你不必等待任何东西。子任务产生后,父任务终止以释放资源。生成任务的工作结果必须在其他地方处理。

    为了向任务传递更少的参数,我将它们分为两个类。Interval设置迭代除数的间隔。

    package org.example;
    
    public class Interval {
        public final long a;
        public final long b;
    
        public Interval(long a, long b) {
            super();
            this.a = a;
            this.b = b;
        }
    
        public long length() {
            return b - a;
        }
    }
    

    一个完全异步的解决方案需要两件事:结果的共享存储,以及一个同步原语,表示所有任务都已完成并且解决方案已经生成。

    要将结果存储在 Java 中,有一个类CompletableFuture<Long>。该方法isDone表示结果已准备好,该方法get允许接收结果,该方法join阻塞调用线程,直到结果准备好。结果由方法设置,该方法complete将结果存储在内部,isDone启动并解除阻塞等待join。那里还有很多,但作为一个例子就足够了。

    为了计算已完成任务的数量,我使用了原语Phaser. 它包含一个活动成员的内部计数器,可以通过方法调用递增bulkRegister,并通过调用递减arriveAndDeregister。您可以等待通过该方法重置计数器awaitAdvance。移相器中有更多功能,但这足以管理查找分隔器的任务。为简单起见,我将这些函数包装在一个类中CountLock:

    package org.example;
    
    import java.util.concurrent.Phaser;
    
    public class CountLock {
        private final Phaser lock;
        private int phase;
    
        public CountLock() {
            lock = new Phaser();
            phase = lock.getPhase();
        }
        
        public void addTasks(int count) {
            lock.bulkRegister(count);
        }
        
        public void completeTask() {
            lock.arriveAndDeregister();
        }
        
        public void await() {
            lock.awaitAdvance(phase);
        }
    }
    

    求解过程的参数——最大搜索间隔、生成子任务的数量、活动任务的计数器和预期结果的存储——我删除到一个单独的类Params中。纯粹是为了美观 - 我不喜欢调用中的长链参数。

    package org.example;
    
    import java.util.concurrent.CompletableFuture;
    
    public class Params {
        public final int forkFactor;
        public final long maxInterval;
        public final CompletableFuture<Long> result;
    
        public final CountLock lock;
    
        public Params(int forkFactor, long maxInterval, CountLock lock, CompletableFuture<Long> result) {
            super();
            this.forkFactor = forkFactor;
            this.maxInterval = maxInterval;
            this.result = result;
            
            this.lock = lock;
        }
    }
    

    一切准备就绪,可以做饭了。

    由于结果现在存储在单独的位置,因此类型已从 更改RecursiveTask<V>为RecursiveAction。

    package org.example;
    
    import java.util.concurrent.RecursiveAction;
    
    @SuppressWarnings("serial")
    class Divisors extends RecursiveAction {
        private long n;
        private Interval interval;
        private Params params;
    
        public Divisors(long n, Interval i, Params params) {
            this.n = n;
            this.params = params;
            this.interval = i;
        }
    
        protected void compute() {
            try {
                if (params.result.isDone()) {
                    return;
                }
                if (interval.length() < params.maxInterval) {
                    // Interval is short enough to do the brute force search
                    bruteForce();
                } else {
                    // Split the interval and start subtasks
                    divideAndConquer();
                }
            } finally {
                params.lock.completeTask();
            }
        }
    
        private void divideAndConquer() {
            long step = interval.length() / params.forkFactor + 1;
            params.lock.addTasks(params.forkFactor);
    
            for (long i = interval.a; i < interval.b; i += step) {
                RecursiveAction task = new Divisors(n, new Interval(i, i + step), params);
                task.fork();
            }
        }
    
        private void bruteForce() {
            long a = interval.a;
            if (0 == a % 2) {
                a += 1;
            }
            long b = interval.b;
    
            for (long i = a; i <= b; i += 2) {
                long res = n % i;
                if (res == 0) {
                    System.out.println("Found: " + i);
                    params.result.complete(i);
                    return;
                }
            }
        }
    }
    

    Более чем 10-кратный прирост скорости прячется внутри новой организации методов bruteForce, compute и divideAndConquer.

    bruteForce теперь не возвращает результат, а сохраняет его в общее хранилище, заодно взводя флаг о завершении процесса счёта: params.result.complete(i).

    divideAndConquer бьет задачи на подзадачи и увеличивает счётчик активных задач: params.lock.addTasks(params.forkFactor); Обратите внимание, счётчик увеличивается до того как задачи ставятся на исполнение, иначе можно влететь в race condition - в порождённой задаче complete будет вызван до того, как в порождающей addTasks.

    compute при завершении уменьшает счетчик задач на единицу:

            try {
                // ...
            } finally {
                params.lock.completeTask();
            }
    

    Тест производительности. У меня четырехядерный процессор с восемью физическими потоками, поэтому я задал размер пула равным семи. Performance monitor показывает загрузку всех ядер, то есть пул успешно распараллелил задачу, и ядра на 100% заняты счётом - никто никого не ждёт.

    Одна задача перебирает в интервале не длиннее одного миллиона. Варьирование этого параметра может повлиять на производительность - если поставить слишком маленький интервал, накладные затраты на переключение задач будут выше затрат на счёт. Если поставить слишком большим, то максимум параллельности достигнут не будет.

    package org.example;
    
    import java.math.BigInteger;
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    
    public class MainDivisors {
    
        /**
         * if {@code (b-a) > intervalLength} then fork the task into {@code forkFactor} subtasks
         */
        private static final long intervalLength = 1000000L;
        private static int forkFactor = 7;
        private static ForkJoinPool forkJoinPool = new ForkJoinPool(forkFactor);
    
        public static void main(String[] args) {
            long t1, t2;
            
            BigInteger p = BigInteger.probablePrime(31, new Random());
            BigInteger q = p.nextProbablePrime();
            long n = p.longValue()*q.longValue();      
            System.out.println("Factoring " + n + " = " + p + "*" + q);
            
            // long n = 2130663859L*2130663869L; // 4539728501355410471 
            // System.out.println("Factoring " + n + " = 2130663859L*2130663869");
    
            System.out.println("Single thread");
            {
                long start = System.nanoTime();
                long result = divisorSingleThread(n);
                t1 = System.nanoTime() - start;
                System.out.println("Divisor: " + result + ", time: " + t1);
            }
            System.out.println("Pool");
            {
                long start = System.nanoTime();
                long result = divisorPooled(n, forkFactor, intervalLength);
                t2 = System.nanoTime() - start;
                System.out.println("Divisor: " + result + ", time: " + t2);
            }
            System.out.println("Speed up " + ((double)t1)/t2 + " times");
        }
    
        public static Long divisorPooled(long n, int forkFactor, long intervalLength) {
            if (n %2 == 0) {
                return 2L;
            }
            long a = 3;
            long b = (long) (Math.floor(Math.sqrt(n))) + 1;
            
            CountLock lock = new CountLock();
            CompletableFuture<Long> result = new CompletableFuture<Long>();
    
            Params params = new Params(forkFactor, intervalLength, lock, result);
            
            RecursiveAction task = new Divisors(n, new Interval(a, b), params);
            lock.addTasks(1);
            
            forkJoinPool.submit(task);
            lock.await();
            
            if (result.isDone()) {
                return result.join();
            } else {
                return 0L;
            }
        }
    
        public static Long divisorSingleThread(long n) {
            if (n %2 == 0) {
                return 2L;
            }
            long a = 3;
            long b = (long) (Math.floor(Math.sqrt(n))) + 1;
            for (long i = a; i < b; i += 2) {
                long res = n % i;
                if (res == 0) {
                    return i;
                }
            }
            return 0L;
        }
    
    }
    

    Тест раскладывает на множители 62-х битное число: произведение двух случайных 31-битных простых. То есть нужно перебрать порядка миллиарда вариантов.

    При разложении числа 4539728501355410471 = 2130663859*2130663869 поиск делителей в цикле находит делитель за 8-11 секунд. Не могу сказать, откуда такой разброс берётся. Какие-то фокусы планировщика Windows или JIT компилятора Java.

    Параллельный алгоритм находит делитель за 0.04 - 0.06 секунд. Ускорение в среднем в 200 раз. Это не опечатка, двести раз.

    Factoring 4539728501355410471 = 2130663859L*2130663869
    Single thread
    Divisor: 2130663859, time: 10414989800
    Pool
    Found: 2130663859
    Divisor: 2130663859, time: 48492800
    Speed up 214.77394169856143 times
    

    Обсуждение

    Объясняется такой выигрыш просто. По умолчанию ForkJoinPool использует стек для хранения задач, поступивших на счёт. То есть последние выполняются первыми, и в нашем случае счёт начинается с конца. Так как тестовое число есть произведение двух больших сомножителей, то его находят практически сразу.

    Если же в качестве теста взять произведение трёх 20-битных сомножителей, то лобовой перебор побеждает с разгромным счётом. Тоже в двести раз быстрее ))

    Если снова вернуться к произведению двух больших сомножителей, и убрать оптимизацию, прекращающую счёт в задачах, когда найден ответ, то ускорение будет в 2-2.5 раз. Разброс вызван тем, что время счёта в цикле варьируется от 8.5 до 11 секунд, а параллельный счёт всегда считает за 4.5 секунд.

    • 4

相关问题

  • wpcap 找不到指定的模块

  • 如何以编程方式从桌面应用程序打开 HTML 页面?

  • Android Studio 中的 R.java 文件在哪里?

  • HashMap 初始化

  • 如何使用 lambda 表达式通过增加与原点的距离来对点进行排序?

  • 最大化窗口时如何调整元素大小?

Sidebar

Stats

  • 问题 10021
  • Answers 30001
  • 最佳答案 8000
  • 用户 6900
  • 常问
  • 回答
  • Marko Smith

    如何从列表中打印最大元素(str 类型)的长度?

    • 2 个回答
  • Marko Smith

    如何在 PyQT5 中清除 QFrame 的内容

    • 1 个回答
  • Marko Smith

    如何将具有特定字符的字符串拆分为两个不同的列表?

    • 2 个回答
  • Marko Smith

    导航栏活动元素

    • 1 个回答
  • Marko Smith

    是否可以将文本放入数组中?[关闭]

    • 1 个回答
  • Marko Smith

    如何一次用多个分隔符拆分字符串?

    • 1 个回答
  • Marko Smith

    如何通过 ClassPath 创建 InputStream?

    • 2 个回答
  • Marko Smith

    在一个查询中连接多个表

    • 1 个回答
  • Marko Smith

    对列表列表中的所有值求和

    • 3 个回答
  • Marko Smith

    如何对齐 string.Format 中的列?

    • 1 个回答
  • Martin Hope
    Alexandr_TT 2020年新年大赛! 2020-12-20 18:20:21 +0000 UTC
  • Martin Hope
    Alexandr_TT 圣诞树动画 2020-12-23 00:38:08 +0000 UTC
  • Martin Hope
    Air 究竟是什么标识了网站访问者? 2020-11-03 15:49:20 +0000 UTC
  • Martin Hope
    Qwertiy 号码显示 9223372036854775807 2020-07-11 18:16:49 +0000 UTC
  • Martin Hope
    user216109 如何为黑客设下陷阱,或充分击退攻击? 2020-05-10 02:22:52 +0000 UTC
  • Martin Hope
    Qwertiy 并变成3个无穷大 2020-11-06 07:15:57 +0000 UTC
  • Martin Hope
    koks_rs 什么是样板代码? 2020-10-27 15:43:19 +0000 UTC
  • Martin Hope
    Sirop4ik 向 git 提交发布的正确方法是什么? 2020-10-05 00:02:00 +0000 UTC
  • Martin Hope
    faoxis 为什么在这么多示例中函数都称为 foo? 2020-08-15 04:42:49 +0000 UTC
  • Martin Hope
    Pavel Mayorov 如何从事件或回调函数中返回值?或者至少等他们完成。 2020-08-11 16:49:28 +0000 UTC

热门标签

javascript python java php c# c++ html android jquery mysql

Explore

  • 主页
  • 问题
    • 热门问题
    • 最新问题
  • 标签
  • 帮助

Footer

RError.com

关于我们

  • 关于我们
  • 联系我们

Legal Stuff

  • Privacy Policy

帮助

© 2023 RError.com All Rights Reserve   沪ICP备12040472号-5