Запуск большого количества потоков с возвращением результ

 
 
 
Сообщения:29
Здравствуйте!
Есть следующая проблема: в своей программе вызываю одну и ту же процедуру с разными параметрами в разных потоках, количество различных вызовов может превышать 60000, в результате чего возникают ошибки связанные с памятью и обрабатывается нормально только первая тысяча. Я разбил потоки по группам по 254 потока, они выполняются, выводится результат и заново создаются 254 потока и так далее, но проблема не исчезла. Я новичёк в Java, прошу подсказать как исправить ситуацию.

Вот код:

public class ResultScan implements Callable<String> {
    private String id, name;
    private int version;
    private long timeout;
    private String[] oid =
        { "1.3.6.1.2.1.1.1", "1.3.6.1.2.1.17.4.3.1.1","1.3.6.1.2.1.17.4.3.1.2" };
    public ResultScan(String id, String name, int version, long timeout){
        this.id = id;
        this.name = name;
        this.version = version;
        this.timeout = timeout;
    }
    public String call() throws IOException {
        Packadg pack = new Packadg(id, name, version, timeout);
        return pack.snmpGetTable(oid);
        //return pack.snmpGetNext("1.3.6.1.2.1.1.1");
    }
    
}


       for(i = 0;i < jList1.getModel().getSize();i++){
           if(f[i] % 254 == 0)
               num = f[i] / 254;
           else
               num = (f[i] / 254) + 1;
           s = f[i];
           for(int p = 0;p < num;p++){
               if(s - 254 >= 0)
                numf = 254;
               else
                   numf = s;

               for(int k = 0; k < lRead.length;k++){

                   DefaultListModel lm = (DefaultListModel)jList4.getModel();

                   ExecutorService exec = Executors.newCachedThreadPool();

                   ArrayList<Future<String>> results = new ArrayList<Future<String>>();

                   for(int j = 0;j < numf;j++){
                        results.add(exec.submit(new ResultScan(lip[i][(254*p)+j], lRead[k], 1, 1000)));
                   
                    }


                    for(Future<String> fs : results)
                    try{

                    lm.addElement(v + " " + fs.get());
                    v = v + 1;
                
            }catch(InterruptedException e){
                System.out.println(e);
            }catch(ExecutionException e){
                System.out.println(e);
            }finally{
                exec.shutdown();
            }

            }

               s = s - 254;
           }
       }
 
 
Сообщения:923
если у вас несколько потоков используют одни и те же данные - могут возникнуть проблемы с синхронизацией.

ЯроллеR
 
 
Сообщения:29
alon4ik У меня все потоки вызывают одну и ту же процедуру с разными параметрами. Как можно избежать проблемы?
 
 
Сообщения:923
то, что они вызывают одну процедуру (а лучше говорить метод, ибо не принято говорить как в делфи) - то ладно, но если в этом методе вы что-то делаете с одной и той же переменной - то результат может быть непредсказуем.
Ну например. Вы вызываете метод:

int b;
private void someMethod(int a){
  b >  10 ? b+=a : b-=a;
}


в нескольких потоках. Если два потока одновременно прочитают значение b, потом первый увеличит его так, что тот будет превышать 10, тогда второй об этом не узнает и тоже увеличит b.

Следует синхронизировать доступ к b, чтобы такого не произошло.

ЯроллеR
 
 
Сообщения:29
В методе выполняется сборка SNMP пакета, отсылка его на адрес и получение результата. Меняется только адрес. Грубо говоря я пытаюсь реализовать широковещательную отсылку пакета. Все переменные инициализируются по новой, результаты с предыдущего потока не используются.
 
 
Сообщения:656
Каждый поток съедает минимум 2Мб памяти для стека. Указывается при старте JVM.

Или менять архитектуру:

Крутятся N потоков (пару десятков, может больше, меньше, подбирать эксперементально) и 1 управляющий.
В каждом потоке главный объект имеет синхронизированный метод получения "задания". Внутри коллекция, которая опрашивается после выполнения очередного задания.
Управляющий поток равномерно раздает задания потокам, а можно и не равномерно, опрашивая "как успехи?"

Или, динамически - управляющий создает поток для нового задания, пока первый созданный не сообщит что выполнил все задания, тогда управляющий дает ему, и т.д.,
 
 
Сообщения:29
Skynin В принципе идея понятна, не могли бы вы привести простой пример в виде кода? Не совсем понимаю как реализовать.
 
 
Сообщения:656
Charodey:
Не совсем понимаю как реализовать.

Что конкретно?

Цикл управляющего:
Формируем(или откуда то берем) Задание.
Если задания кончились, увольняем работников, рассылая уведомления об увольнении каждому
Проверяем есть ли выполнившие работу работники, если нет - нанимаем нового (если установили порог количеству работников - выбираем случайного и даем ему)
Передаем ему Задание

Цикл работника:
Смотрим есть ли задание
Если есть выполняем
Если нет,
- смотрим - а не уволили уже, тогда все, выходим из цикла
- шлем весточку управляющему - Я свободен, и отдыхаем пару-тройку секунд


В каком месте трудности с реализацией?

Кстати, такая организация создаст простенькое но самомасштабируемоеся под конкретные ресурсы решение.
Можно будет даже посмотреть, сколько реально работников-потоков нужно для работы.
 
 
Сообщения:29
Skynin То есть у нас есть один главный поток, в нём выполняются n потоков, каждый из которых возвращает результат. Главный поток отслеживает работу потоков и как только один из потоков выполнил работы его закрываем и на его место присылаем новый. Я правильно понял?
Если так, я не совсем понимаю как поток будет сигнализировать главному о том что работу выполнил и результат вернул.
 
 
Сообщения:656
Charodey:
Главный поток отслеживает работу потоков и как только один из потоков выполнил работы его закрываем и на его место присылаем новый. Я правильно понял?

Неправильно поняли. Если подчиненный выполнил работу ему дается еще, а НЕ создается новый поток. И главный поток - НЕ отслеживает работу - он формирует задания и ищет кому отдать.
Иначе мы приходим к тому же - для каждого задания новый поток. Так смысл тогда огород городить? так же как и с пулами у Вас - зачем Вам пулы если все равно идеология та же - для каждого задания новый поток?

Charodey:
Если так, я не совсем понимаю как поток будет сигнализировать главному о том что работу выполнил и результат вернул.

Очень просто, главный объект когда создает подчиненного передает ему своего слушателя (Паттерн Наблюдатель (Observer design pattern)), и возможно ID (это уже вариант реализации определения - а кто это уже свободен)
Понятно что обмены должны быть synchronized

Подумалось, может у Вас сложность что путаетесь между "потоками" и "объектами"?

Тогда напишите код вначале без потоков, как будто оно само собой уже по потокам распределяется. Потом останется только запустить метод Run в отдельном потоке. Ну и synchronized расставить.
Изменен:12 ноя 2009 13:56
 
 
Сообщения:29
Skynin Ясно. Попробую реализовать, если что обращусь к вам. Спасибо большое!
 
 
Сообщения:656
Charodey:
Попробую реализовать, если что обращусь к вам.

Утрировано - у главного объекта будет коллекция работников.
Есть коллекция заявок - я свободен.
Слушатель главного объекта помещает в эту коллекцию заявку. (поток РабочН, см. откуда будет вызван ниже)
Главный метод заглядывает в эту коллекцию и по ID(или по прямой ссылке, тогда работник должен будет передавать ссылку на себя в слушателе) находит этого работника. (поток Глав)

У каждого работника есть своя коллекция заданий.
Есть метод, который ложит задание - этот метод вызывается главным объектом (поток Глав)
Главный метод работника после успешной работы - заглядывает в свою коллекцию, может уже прислали что. Если не прислали, дергает слушателя Главного Объекта (поток РабочН)

Обращение к коллекциям "я свободен" и "коллекции заданий работника" должны быть синхронизированы.
Я часто пользуюсь таким фокусом:
synchronized MyTask AddGetTask(MyTask addTask)
если передали что-то - добавляет
если передали null - возвращает
или
synchronized MyTask AddGetTask(MyTask addTask, boolean flDelete)
если передали null и true - возвращает и удаляет из коллекции.
если передали null и false - возвращает и оставляет в коллекции.
Если коллекция пустая - возвращаем null

И понятно, как заработает - нарастить детали - обработку ошибок, определение "подвисшего" работника и т.д.
 
 
Сообщения:610
http://lib.juga.ru/article/view/166/1/68/

Вот здесь есть пример чего-то подобного. Но он немного громоздок и сложен, однако если разберётесь... Это ThreadPool Pattern.

Стоимость устранения одной ошибки, обнаруженной после выхода продукта, превышает затраты на исправление одной ошибки, обнаруженной во время проектирования, в 4-5 раз, а если баг был выявлен через техническую поддержку, в 100 раз.
 
 
Сообщения:656
Charodey:
Попробую реализовать

import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

public class GeneralMan {

	private int maxJob;
	public static Random cuRand;
	public GeneralMan(int pMaxJob) {
		maxJob = pMaxJob;
		cuRand = new Random();
	}

	public void MainProccess() throws InterruptedException {
		TaskTicket cuTT = getNewTT();
		while (cuTT != null) {
			JoberMan cuJober = getJober();

			cuJober.addGetTaskTicket(cuTT);

			if (!allJobers.contains(cuJober)) {
				allJobers.add(cuJober);
				Thread cuThe = new Thread(cuJober);
				allThe.add(cuThe);
				cuThe.start();
			}
			else
				synchronized(cuJober) { cuJober.notify(); }

			cuTT = getNewTT();
		}
		System.out.println("Max free jober: " + maxFree);
		System.out.println("countJob: " + JoberMan.countJob);

		// проверочная часть. Здесь нужно отлавливать сбойные задания и убивать подвисших jober'ов
		Thread.sleep(Main.PauseMaxJob * 10);
		for(JoberMan aJober : allJobers) {
			aJober.isStop = true;
			synchronized(aJober) { aJober.notify(); }
		}
		Thread.sleep(Main.PauseMaxJob * 10);

		for(Thread aThe : allThe) {
			if (aThe.isAlive())
				System.out.println("Кто-то до сих пор работает!");
		}
	}

	private JoberMan getJober() {
		JoberMan cuJober = addGetFreeJober(null);

		if (cuJober == null) { // свободных нет, создаем или добавляем задания
			if (allJobers.size() < maxJob) {
				cuJober = new JoberMan(genJoberListener);
			}
			else {
				synchronized(cuRand) {
					// выбираем случайным образом потому что неизвестно кто хорошо справляется
					cuJober = allJobers.get(cuRand.nextInt(allJobers.size()));
					// подумать а если другой поток захватит allJobers и потом cuRand ;)
				}
			}
		}

		return cuJober;
	}

	private static int maxTT;
	private TaskTicket getNewTT() {

		if (maxTT > maxJob*2) return null; // искуственный признак окончания

		TaskTicket cuTT = new TaskTicket(); // стоит сохранять в списке, чтобы потом проверить - успешно ли выполнено. см StateJobTicket

		cuTT.dataTask = String.valueOf(maxTT++); // просто строка с номером задания, а не реальные данные
		try {
			Thread.sleep(Main.PauseNewTask); // затраты времени на формирование задания
		} catch (InterruptedException ex) {
			//Logger.getLogger(GeneralMan.class.getName()).log(Level.SEVERE, null, ex);
		}

		return cuTT;
	}

	ConcurrentLinkedQueue<Thread> allThe = new ConcurrentLinkedQueue<Thread>(); // список всех потоков
	CopyOnWriteArrayList<JoberMan> allJobers = new CopyOnWriteArrayList<JoberMan>(); // список всех джоберов
	ConcurrentLinkedQueue<JoberMan> freeJobers = new ConcurrentLinkedQueue<JoberMan>(); // список простаивающих

	public int maxFree; // для сбора статистики

	JoberMan addGetFreeJober(JoberMan pFree) {
		JoberMan resultJ = pFree;

			if (pFree == null) {
				Thread.yield(); // даем возможность кому-то из джоберов закончить выполнение задания 
				resultJ = freeJobers.poll();
			}
			else {
				if (!freeJobers.contains(pFree))
					freeJobers.add(pFree);

				if (maxFree < freeJobers.size()) // для сбора максимального количества простаивающих
                  // и в этот момент вызвался freeJobers.poll() - что будет? и в каких еще местах возможна такая ситуация? ;)
					maxFree = freeJobers.size();
		}
		return resultJ;
	}

	void delJober(JoberMan pFree) {
			allJobers.remove(pFree);
	}

	private ReceiveMess genJoberListener = new ReceiveMess();
	private class ReceiveMess implements ListenerJob {

		public void Ifree(JoberMan pObj) {
			GeneralMan.this.addGetFreeJober(pObj);
		}

		public void Idead(JoberMan pObj) {
			GeneralMan.this.delJober(pObj);
		}

	}
}


import java.util.concurrent.ConcurrentLinkedQueue;

public class JoberMan implements Runnable {

	public static int countJob;
	public int IDjober;

	private ListenerJob cuListe;
	public JoberMan (ListenerJob pListener) {
		cuListe = pListener;
		IDjober = countJob++;
	}

	ConcurrentLinkedQueue<TaskTicket> lstTT = new ConcurrentLinkedQueue<TaskTicket>();
	public TaskTicket addGetTaskTicket(TaskTicket pTT) {

		TaskTicket resultOp = pTT;
		
		if (pTT == null)
				resultOp = lstTT.poll();
		else
			lstTT.add(pTT);

		return resultOp;
	}

	public Exception isEx; // ошибка выполения, для анализа в GeneralMan

	public volatile boolean isStop;
	public void run() {
		TaskTicket cuTT = addGetTaskTicket(null);
		
		ENDRUN:
		while (cuTT != null) {
			try {
				procTT(cuTT);

				cuTT = addGetTaskTicket(null);
				while (cuTT == null) { // Always wait inside a loop that checks the condition being 
        // waited on – this addresses the timing issue if another 
        // thread satisfes the condition before the wait begins.  
        // Also, it protects your code from spurious wake-ups that 
        // can (and do) occur.
					if (isStop) break ENDRUN;

					synchronized(this) {
						cuListe.Ifree(this);
						this.wait();
					}
					
					cuTT = addGetTaskTicket(null);
				}

			} catch (InterruptedException ex) {
				isEx = ex;
				//Logger.getLogger(JoberMan.class.getName()).log(Level.SEVERE, null, ex);
			}
		}
		isStop = true;
		cuListe.Idead(this);
	}

	private boolean procTT(TaskTicket cuTT) throws InterruptedException {

		cuTT.stampTask = StateJobTicket.Working;

		int rndInt = 0;
		synchronized(GeneralMan.cuRand) {
			rndInt = GeneralMan.cuRand.nextInt(Main.PauseMaxJob);
		}

		if (rndInt < 5) { // генерируем ошибку выполения задания шоб жизнь малиной не казалась
			System.out.println(String.valueOf(IDjober) + " Error " + cuTT.dataTask);
			cuTT.stampTask = StateJobTicket.Error;
			cuTT.errorInfo = String.valueOf(IDjober) + String.valueOf(rndInt);
		}
		else {
			System.out.println(String.valueOf(IDjober) + " task: " + cuTT.dataTask);

			Thread.sleep(rndInt); // типа работаем

			cuTT.stampTask = StateJobTicket.Sucess;
		}

		return cuTT.stampTask == StateJobTicket.Sucess;
	}

}


public enum StateJobTicket {
	Start,
	Working,
	Sucess,
	Error
}

public interface ListenerJob {
	void Ifree(JoberMan pObj);
	void Idead(JoberMan pObj);
}

public class Main {

	// для тестов
	public static int MaxTheard = 500; // максимально допустимое количество потоков
	public static int PauseNewTask = 1; // миллисекунд для формирования задания
	public static int PauseMaxJob = 100; // максимум миллисекунд для выполнения задания

	/**
	 * @param args the command line arguments
	 */
	public static void main(String[] args) {
		GeneralMan cuGMan = new GeneralMan(MaxTheard);
		try {
			cuGMan.MainProccess();
		} catch (InterruptedException ex) {
			//Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
		}
	}
}
 
 
Сообщения:29
Skynin Спасибо большое! Буду разбирать)

Вадим Спасибо за информацию!
 
Модераторы:Нет
Сейчас эту тему просматривают:Нет