Многопоточность с зависящими друг от друга потоками.

 
 
 
Сообщения:7
Всем доброго времени суток, столкнулся с проблемой, что нужно синхронизировать несколько потоков, данные внутри которых зависят друг от друга. То есть первый поток создает массив с данными array[a], второй должен этот массив обработать и выдать, условно, array[a+b], а третий соответственно array[b+c].
Я с многопоточностью плаваю на базовом уровне и знаю оператор synchronized, но он только регулирует очередность использования объекта класса "вне" потоков, которые к нему обращаются. А мне нужно, чтобы потоки брали объект не из самого класса, а последовательно друг из друга по мере обработки объекта предыдущим потоком. Это как-то можно реализовать? Как?
Идея следующая:

public static void main(String[] args){    
        int len = 715000000;

        Thread t1 = new Thread( () -> {
            int nums[] = new int[len];
            for(int i = 0; i < len; i++) nums[i] = i + 2;
        });

        Thread t2 = new Thread( () -> {
            for(int i = 0; i < len; i++) nums[i] *= 3;  
            // на каждой итерации значение nums[i] берется из t1
        });

        Thread t3 = new Thread( () -> {
            for(int i = 0; i < len; i++) nums[i] -= 10;
            // на каждой итерации значение nums[i] берется из t2
        });

        t1.start();
        t2.start();
        t3.start();

        if (t1.isAlive()) t1.join();        
        if (t2.isAlive()) t2.join();
        if (t3.isAlive()) t2.join();

        long sum=0;
        for(int i=0; i < nums.length; i++) {
             sum += nums[i];
        }
        System.out.println(sum);    
}


Забить и выполнить последовательно не вариант. Если поможет, я могу утверждать, что по времени выполнения одного цикла в каждом потоке зависимость такая: t1 < t2 < t3.

Зачем я в это полез вообще
Изменен:22 сен 2020 22:03
 
 
Сообщения:10007
Дак ты вроде бы последовательное выполнение и описываешь. Звучит как будто хочешь запускать след поток как только предыдущий сделал свою работу.
 
 
Сообщения:1046
типичный пример для wait/notify.
нужны 3 массива и 2 переменных указывающих на последний отработанный индекс (volatile).
первый поток работает без остановки, пишет в первый массив, прибавляет к первому указателю и, каждые 5 чисел, орёт notify (или notifyAll тут не совсем уверен, пробуй где лучше.)
второй поток работает, пока не уткнётся в первый указатель. тогда включает wait. тоже орёт notify каждые 5 чисел.
третий поток работает, пока не уткнётся во второй указатель.
не забудь разбудить все потоки, когда закончишь обработку. может случиться, что следующий поток заснёт незадолго до конца, а будящий сигнал не придёт потому, что конец не кратный 5.
так-как потоки из чужих массивов только читают и только те ячейки, где есть данные, то синхронизация не нужна.
ожидание сделай в цикле. тут будят многие и если разбудили, а работы нема, то самое время спать дальше.
подумай на тему: а не проще ли будет сделать всё одним движением?
int [] arr1;
int [] arr2;
int [] arr3;

for(int i = 0; i < arr1.length && i < arr2.length && i < arr3.length; i++)
{
    int a = generateNumber();
    arr1[i] = a;
    int b = calculateSecond(a);
    arr2[i] = b;
    int c = calculateThird(b);
    arr3[i] = c;
}
 
 
Сообщения:7
Староверъ:
Дак ты вроде бы последовательное выполнение и описываешь. Звучит как будто хочешь запускать след поток как только предыдущий сделал свою работу.

Условие эту последовательность многопоточно реализовать. То есть t2 не ждёт массив от t1, а начинает работать как только t1 заполняет первую ячейку массива. А как только t2 заканчивает с первой ячейкой массива, t3 сразу начинает с ней же работать. t1 по времени может обрабатывать все ячейки от 20 минут и больше и если последовательно после него запускать t2 и t3, мне никакого чая не хватит его дождаться.
И такая продолжительность вряд ли из-за моего косяка, так как при выводе на экран промежуточных значений видно, что он работает шустро.

Зачем я в это полез вообще
Изменен:23 сен 2020 08:56
 
 
Сообщения:7
windruf:
типичный пример для wait/notify.
нужны 3 массива и 2 переменных указывающих на последний отработанный индекс (volatile).
первый поток работает без остановки, пишет в первый массив, прибавляет к первому указателю и, каждые 5 чисел, орёт notify (или notifyAll тут не совсем уверен, пробуй где лучше.)
второй поток работает, пока не уткнётся в первый указатель. тогда включает wait. тоже орёт notify каждые 5 чисел.
третий поток работает, пока не уткнётся во второй указатель.
не забудь разбудить все потоки, когда закончишь обработку. может случиться, что следующий поток заснёт незадолго до конца, а будящий сигнал не придёт потому, что конец не кратный 5.
так-как потоки из чужих массивов только читают и только те ячейки, где есть данные, то синхронизация не нужна.
ожидание сделай в цикле. тут будят многие и если разбудили, а работы нема, то самое время спать дальше.

Спасибо, попробую

windruf:
подумай на тему: а не проще ли будет сделать всё одним движением?

Сделать один цикл вместо трёх и в нем реализовать потоки? Думал конечно, но пока это менее принципиально, к тому же в концепции я этого не отражал, но по факту величина массива опрелеляется через arrayList.add в t1, а дальше в циклах идёт ссылка на размер arrayList.

Зачем я в это полез вообще
 
 
Сообщения:825
isded:
мне нужно, чтобы потоки брали объект не из самого класса, а последовательно друг из друга по мере обработки объекта предыдущим потоком. Это как-то можно реализовать? Как?

Никак.
Потоки не могут брать информацию "друг из друга", потому что потоки - это цепочка вызовов методов, а данные внутри методов другим потокам недоступны.
Поэтому нужны объекты вне потоков, с помощью которых потоки обмениваются данными, и самый полезный такой объект - ArrayBlockingQueue, покрывающий 90% потребностей по передаче данных между потоками.

    public static void main(String[] args) throws InterruptedException {
        int len = 715000000;
        ArrayBlockingQueue<Integer> nums1 = new ArrayBlockingQueue<>(1000);
        ArrayBlockingQueue<Integer> nums2 = new ArrayBlockingQueue<>(1000);
        ArrayBlockingQueue<Integer> nums3 = new ArrayBlockingQueue<>(1000);

        Thread t1 = new Thread( () -> {
            try {
                for(int i = 0; i < len; i++) {
                    nums1.put(i + 2);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread t2 = new Thread( () -> {
            try {
                for(int i = 0; i < len; i++) {
                    nums2.put(nums1.take()*3);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        Thread t3 = new Thread( () -> {
            try {
                for(int i = 0; i < len; i++) {
                    nums3.put(nums2.take()-10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t1.start();
        t2.start();
        t3.start();
        
        long sum=0;
        for(int i=0; i < len; i++) {
            sum += nums3.take();
        }
        System.out.println(sum);
    }


Как видим, философия сменилась радикально: вместо того, чтобы долбиться в одни и те же ячейки массива и думать о синхронизации доступа, используются потки данных. Потребление памяти резко снизилось - не надо держать в памяти все миллионы значений одновременно. Но увеличилось время исполнения: нужно упаковывать и распаковывать числа и синхронизироваться по записи и чтению в очереди. Чтобы снизить накладные расходы, можно обмениваться не отдельными числами, а массивами по 100 или 1000 чисел.
Изменен:23 сен 2020 13:25
 
Модераторы:frymock
Сейчас эту тему просматривают:Нет