Запуск большого количества потоков с возвращением результ

 
 
 
Сообщения:29
Здравствуйте!
Есть следующая проблема: в своей программе вызываю одну и ту же процедуру с разными параметрами в разных потоках, количество различных вызовов может превышать 60000, в результате чего возникают ошибки связанные с памятью и обрабатывается нормально только первая тысяча. Я разбил потоки по группам по 254 потока, они выполняются, выводится результат и заново создаются 254 потока и так далее, но проблема не исчезла. Я новичёк в Java, прошу подсказать как исправить ситуацию.

Вот код:

public class ResultScan implements Callable<String> {
    private String id, name;
    private int version;
    private long timeout;
    private String[] oid =
        { "1.3.6.1.2.1.1.1", "1.3.6.1.2.1.17.4.3.1.1","1.3.6.1.2.1.17.4.3.1.2" };
    public ResultScan(String id, String name, int version, long timeout){
        this.id = id;
        this.name = name;
        this.version = version;
        this.timeout = timeout;
    }
    public String call() throws IOException {
        Packadg pack = new Packadg(id, name, version, timeout);
        return pack.snmpGetTable(oid);
        //return pack.snmpGetNext("1.3.6.1.2.1.1.1");
    }
    
}


       for(i = 0;i < jList1.getModel().getSize();i++){
           if(f[i] % 254 == 0)
               num = f[i] / 254;
           else
               num = (f[i] / 254) + 1;
           s = f[i];
           for(int p = 0;p < num;p++){
               if(s - 254 >= 0)
                numf = 254;
               else
                   numf = s;

               for(int k = 0; k < lRead.length;k++){

                   DefaultListModel lm = (DefaultListModel)jList4.getModel();

                   ExecutorService exec = Executors.newCachedThreadPool();

                   ArrayList<Future<String>> results = new ArrayList<Future<String>>();

                   for(int j = 0;j < numf;j++){
                        results.add(exec.submit(new ResultScan(lip[i][(254*p)+j], lRead[k], 1, 1000)));
                   
                    }


                    for(Future<String> fs : results)
                    try{

                    lm.addElement(v + " " + fs.get());
                    v = v + 1;
                
            }catch(InterruptedException e){
                System.out.println(e);
            }catch(ExecutionException e){
                System.out.println(e);
            }finally{
                exec.shutdown();
            }

            }

               s = s - 254;
           }
       }
 
 
Сообщения:923
если у вас несколько потоков используют одни и те же данные - могут возникнуть проблемы с синхронизацией.

ЯроллеR
 
 
Сообщения:29
alon4ik У меня все потоки вызывают одну и ту же процедуру с разными параметрами. Как можно избежать проблемы?
 
 
Сообщения:923
то, что они вызывают одну процедуру (а лучше говорить метод, ибо не принято говорить как в делфи) - то ладно, но если в этом методе вы что-то делаете с одной и той же переменной - то результат может быть непредсказуем.
Ну например. Вы вызываете метод:

int b;
private void someMethod(int a){
  b >  10 ? b+=a : b-=a;
}


в нескольких потоках. Если два потока одновременно прочитают значение b, потом первый увеличит его так, что тот будет превышать 10, тогда второй об этом не узнает и тоже увеличит b.

Следует синхронизировать доступ к b, чтобы такого не произошло.

ЯроллеR
 
 
Сообщения:29
В методе выполняется сборка SNMP пакета, отсылка его на адрес и получение результата. Меняется только адрес. Грубо говоря я пытаюсь реализовать широковещательную отсылку пакета. Все переменные инициализируются по новой, результаты с предыдущего потока не используются.
 
 
Сообщения:656
Каждый поток съедает минимум 2Мб памяти для стека. Указывается при старте JVM.

Или менять архитектуру:

Крутятся N потоков (пару десятков, может больше, меньше, подбирать эксперементально) и 1 управляющий.
В каждом потоке главный объект имеет синхронизированный метод получения "задания". Внутри коллекция, которая опрашивается после выполнения очередного задания.
Управляющий поток равномерно раздает задания потокам, а можно и не равномерно, опрашивая "как успехи?"

Или, динамически - управляющий создает поток для нового задания, пока первый созданный не сообщит что выполнил все задания, тогда управляющий дает ему, и т.д.,
 
 
Сообщения:29
Skynin В принципе идея понятна, не могли бы вы привести простой пример в виде кода? Не совсем понимаю как реализовать.
 
 
Сообщения:656
Charodey:
Не совсем понимаю как реализовать.

Что конкретно?

Цикл управляющего:
Формируем(или откуда то берем) Задание.
Если задания кончились, увольняем работников, рассылая уведомления об увольнении каждому
Проверяем есть ли выполнившие работу работники, если нет - нанимаем нового (если установили порог количеству работников - выбираем случайного и даем ему)
Передаем ему Задание

Цикл работника:
Смотрим есть ли задание
Если есть выполняем
Если нет,
- смотрим - а не уволили уже, тогда все, выходим из цикла
- шлем весточку управляющему - Я свободен, и отдыхаем пару-тройку секунд


В каком месте трудности с реализацией?

Кстати, такая организация создаст простенькое но самомасштабируемоеся под конкретные ресурсы решение.
Можно будет даже посмотреть, сколько реально работников-потоков нужно для работы.
 
 
Сообщения:29
Skynin То есть у нас есть один главный поток, в нём выполняются n потоков, каждый из которых возвращает результат. Главный поток отслеживает работу потоков и как только один из потоков выполнил работы его закрываем и на его место присылаем новый. Я правильно понял?
Если так, я не совсем понимаю как поток будет сигнализировать главному о том что работу выполнил и результат вернул.
 
 
Сообщения:656
Charodey:
Главный поток отслеживает работу потоков и как только один из потоков выполнил работы его закрываем и на его место присылаем новый. Я правильно понял?

Неправильно поняли. Если подчиненный выполнил работу ему дается еще, а НЕ создается новый поток. И главный поток - НЕ отслеживает работу - он формирует задания и ищет кому отдать.
Иначе мы приходим к тому же - для каждого задания новый поток. Так смысл тогда огород городить? так же как и с пулами у Вас - зачем Вам пулы если все равно идеология та же - для каждого задания новый поток?

Charodey:
Если так, я не совсем понимаю как поток будет сигнализировать главному о том что работу выполнил и результат вернул.

Очень просто, главный объект когда создает подчиненного передает ему своего слушателя (Паттерн Наблюдатель (Observer design pattern)), и возможно ID (это уже вариант реализации определения - а кто это уже свободен)
Понятно что обмены должны быть synchronized

Подумалось, может у Вас сложность что путаетесь между "потоками" и "объектами"?

Тогда напишите код вначале без потоков, как будто оно само собой уже по потокам распределяется. Потом останется только запустить метод Run в отдельном потоке. Ну и synchronized расставить.
Изменен:12 ноя 2009 13:56
 
 
Сообщения:29
Skynin Ясно. Попробую реализовать, если что обращусь к вам. Спасибо большое!
 
 
Сообщения:656
Charodey:
Попробую реализовать, если что обращусь к вам.

Утрировано - у главного объекта будет коллекция работников.
Есть коллекция заявок - я свободен.
Слушатель главного объекта помещает в эту коллекцию заявку. (поток РабочН, см. откуда будет вызван ниже)
Главный метод заглядывает в эту коллекцию и по ID(или по прямой ссылке, тогда работник должен будет передавать ссылку на себя в слушателе) находит этого работника. (поток Глав)

У каждого работника есть своя коллекция заданий.
Есть метод, который ложит задание - этот метод вызывается главным объектом (поток Глав)
Главный метод работника после успешной работы - заглядывает в свою коллекцию, может уже прислали что. Если не прислали, дергает слушателя Главного Объекта (поток РабочН)

Обращение к коллекциям "я свободен" и "коллекции заданий работника" должны быть синхронизированы.
Я часто пользуюсь таким фокусом:
synchronized MyTask AddGetTask(MyTask addTask)
если передали что-то - добавляет
если передали null - возвращает
или
synchronized MyTask AddGetTask(MyTask addTask, boolean flDelete)
если передали null и true - возвращает и удаляет из коллекции.
если передали null и false - возвращает и оставляет в коллекции.
Если коллекция пустая - возвращаем null

И понятно, как заработает - нарастить детали - обработку ошибок, определение "подвисшего" работника и т.д.
 
 
Сообщения:610
http://lib.juga.ru/article/view/166/1/68/

Вот здесь есть пример чего-то подобного. Но он немного громоздок и сложен, однако если разберётесь... Это ThreadPool Pattern.

Стоимость устранения одной ошибки, обнаруженной после выхода продукта, превышает затраты на исправление одной ошибки, обнаруженной во время проектирования, в 4-5 раз, а если баг был выявлен через техническую поддержку, в 100 раз.
 
 
Сообщения:656
Charodey:
Попробую реализовать

import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class GeneralMan {

        private int maxJob;
        public static Random cuRand;
        public GeneralMan(int pMaxJob) {
                maxJob = pMaxJob;
                cuRand = new Random();
        }

        public void MainProccess() throws InterruptedException {
                TaskTicket cuTT = getNewTT();
                while (cuTT != null) {
                        JoberMan cuJober = getJober();

                        cuJober.addGetTaskTicket(cuTT);

                        if (!allJobers.contains(cuJober)) {
                                allJobers.add(cuJober);
                                Thread cuThe = new Thread(cuJober);
                                allThe.add(cuThe);
                                cuThe.start();
                        }
                        else
                                synchronized(cuJober) { cuJober.notify(); }

                        cuTT = getNewTT();
                }
                System.out.println("Max free jober: " + maxFree);
                System.out.println("countJob: " + JoberMan.countJob);

                // проверочная часть. Здесь нужно отлавливать сбойные задания и убивать подвисших jober'ов
                Thread.sleep(Main.PauseMaxJob * 10);
                for(JoberMan aJober : allJobers) {
                        aJober.isStop = true;
                        synchronized(aJober) { aJober.notify(); }
                }
                Thread.sleep(Main.PauseMaxJob * 10);

                for(Thread aThe : allThe) {
                        if (aThe.isAlive())
                                System.out.println("Кто-то до сих пор работает!");
                }
        }

        private JoberMan getJober() {
                JoberMan cuJober = addGetFreeJober(null);

                if (cuJober == null) { // свободных нет, создаем или добавляем задания
                        if (allJobers.size() < maxJob) {
                                cuJober = new JoberMan(genJoberListener);
                        }
                        else {
                                synchronized(cuRand) {
                                        // выбираем случайным образом потому что неизвестно кто хорошо справляется
                                        cuJober = allJobers.get(cuRand.nextInt(allJobers.size()));
                                        // подумать а если другой поток захватит allJobers и потом cuRand ;)
                                }
                        }
                }

                return cuJober;
        }

        private static int maxTT;
        private TaskTicket getNewTT() {

                if (maxTT > maxJob*2) return null; // искуственный признак окончания

                TaskTicket cuTT = new TaskTicket(); // стоит сохранять в списке, чтобы потом проверить - успешно ли выполнено. см StateJobTicket

                cuTT.dataTask = String.valueOf(maxTT++); // просто строка с номером задания, а не реальные данные
                try {
                        Thread.sleep(Main.PauseNewTask); // затраты времени на формирование задания
                } catch (InterruptedException ex) {
                        //Logger.getLogger(GeneralMan.class.getName()).log(Level.SEVERE, null, ex);
                }

                return cuTT;
        }

        ConcurrentLinkedQueue<Thread> allThe = new ConcurrentLinkedQueue<Thread>(); // список всех потоков
        CopyOnWriteArrayList<JoberMan> allJobers = new CopyOnWriteArrayList<JoberMan>(); // список всех джоберов
        ConcurrentLinkedQueue<JoberMan> freeJobers = new ConcurrentLinkedQueue<JoberMan>(); // список простаивающих

        public int maxFree; // для сбора статистики

        JoberMan addGetFreeJober(JoberMan pFree) {
                JoberMan resultJ = pFree;

                        if (pFree == null) {
                                Thread.yield(); // даем возможность кому-то из джоберов закончить выполнение задания 
                                resultJ = freeJobers.poll();
                        }
                        else {
                                if (!freeJobers.contains(pFree))
                                        freeJobers.add(pFree);

                                if (maxFree < freeJobers.size()) // для сбора максимального количества простаивающих
                  // и в этот момент вызвался freeJobers.poll() - что будет? и в каких еще местах возможна такая ситуация? ;)
                                        maxFree = freeJobers.size();
                }
                return resultJ;
        }

        void delJober(JoberMan pFree) {
                        allJobers.remove(pFree);
        }

        private ReceiveMess genJoberListener = new ReceiveMess();
        private class ReceiveMess implements ListenerJob {

                public void Ifree(JoberMan pObj) {
                        GeneralMan.this.addGetFreeJober(pObj);
                }

                public void Idead(JoberMan pObj) {
                        GeneralMan.this.delJober(pObj);
                }

        }
}


import java.util.concurrent.ConcurrentLinkedQueue;

public class JoberMan implements Runnable {

        public static int countJob;
        public int IDjober;

        private ListenerJob cuListe;
        public JoberMan (ListenerJob pListener) {
                cuListe = pListener;
                IDjober = countJob++;
        }

        ConcurrentLinkedQueue<TaskTicket> lstTT = new ConcurrentLinkedQueue<TaskTicket>();
        public TaskTicket addGetTaskTicket(TaskTicket pTT) {

                TaskTicket resultOp = pTT;
                
                if (pTT == null)
                                resultOp = lstTT.poll();
                else
                        lstTT.add(pTT);

                return resultOp;
        }

        public Exception isEx; // ошибка выполения, для анализа в GeneralMan

        public volatile boolean isStop;
        public void run() {
                TaskTicket cuTT = addGetTaskTicket(null);
                
                ENDRUN:
                while (cuTT != null) {
                        try {
                                procTT(cuTT);

                                cuTT = addGetTaskTicket(null);
                                while (cuTT == null) { // Always wait inside a loop that checks the condition being 
        // waited on – this addresses the timing issue if another 
        // thread satisfes the condition before the wait begins.  
        // Also, it protects your code from spurious wake-ups that 
        // can (and do) occur.
                                        if (isStop) break ENDRUN;

                                        synchronized(this) {
                                                cuListe.Ifree(this);
                                                this.wait();
                                        }
                                        
                                        cuTT = addGetTaskTicket(null);
                                }

                        } catch (InterruptedException ex) {
                                isEx = ex;
                                //Logger.getLogger(JoberMan.class.getName()).log(Level.SEVERE, null, ex);
                        }
                }
                isStop = true;
                cuListe.Idead(this);
        }

        private boolean procTT(TaskTicket cuTT) throws InterruptedException {

                cuTT.stampTask = StateJobTicket.Working;

                int rndInt = 0;
                synchronized(GeneralMan.cuRand) {
                        rndInt = GeneralMan.cuRand.nextInt(Main.PauseMaxJob);
                }

                if (rndInt < 5) { // генерируем ошибку выполения задания шоб жизнь малиной не казалась
                        System.out.println(String.valueOf(IDjober) + " Error " + cuTT.dataTask);
                        cuTT.stampTask = StateJobTicket.Error;
                        cuTT.errorInfo = String.valueOf(IDjober) + String.valueOf(rndInt);
                }
                else {
                        System.out.println(String.valueOf(IDjober) + " task: " + cuTT.dataTask);

                        Thread.sleep(rndInt); // типа работаем

                        cuTT.stampTask = StateJobTicket.Sucess;
                }

                return cuTT.stampTask == StateJobTicket.Sucess;
        }

}


public enum StateJobTicket {
        Start,
        Working,
        Sucess,
        Error
}

public interface ListenerJob {
        void Ifree(JoberMan pObj);
        void Idead(JoberMan pObj);
}

public class Main {

        // для тестов
        public static int MaxTheard = 500; // максимально допустимое количество потоков
        public static int PauseNewTask = 1; // миллисекунд для формирования задания
        public static int PauseMaxJob = 100; // максимум миллисекунд для выполнения задания

        /**
         * @param args the command line arguments
         */
        public static void main(String[] args) {
                GeneralMan cuGMan = new GeneralMan(MaxTheard);
                try {
                        cuGMan.MainProccess();
                } catch (InterruptedException ex) {
                        //Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
                }
        }
}
 
 
Сообщения:29
Skynin Спасибо большое! Буду разбирать)

Вадим Спасибо за информацию!
 
Модераторы:Нет
Сейчас эту тему просматривают:Нет