Считывание из файла и запись в другой файл в потоке

 
 
 
Сообщения:103
А если указать просто Buffer buffer; то создается объектная переменная и с ней работаем?
 
 
Сообщения:103
Переделал так

import java.io.*;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Buffer {
    private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    public void add(String str) {
        queue.offer(str);
    }

    public String get() {
        return queue.poll();
    }

    public int getSize() {
        return queue.size();
    }
}

class Reader implements Runnable {
    Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "files" + separator;
    private String file;

    public Reader(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }

    @Override
    public void run() {
        synchronized (buffer) {
            try (BufferedReader buf = new BufferedReader(new InputStreamReader(new FileInputStream(path + file)))) {
                String tmpStr;
                while ((tmpStr = buf.readLine()) != null) {
                    buffer.add(tmpStr);
                    buffer.notify();
                }
            } catch (IOException ex) {

            }
        }
    }
}

class Writer implements Runnable {
    Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "files" + separator;
    private String file;

    public Writer(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }

    @Override
    public void run() {
        synchronized (buffer) {
            try (PrintWriter writer = new PrintWriter(new FileWriter(path + file))) {
                if (buffer.get() == null) {
                    buffer.wait();
                } else {
                    do {
                        writer.println(buffer.get());
                    } while (buffer.getSize() != 0);
                }
            } catch (IOException | InterruptedException ex) {

            }
        }
    }
}

class Test {
    public static void main(String[] args) {
        String inFile = "f1.txt";
        String outFile = "f2.txt";

        Buffer buffer = new Buffer();

        Reader reader = new Reader(buffer, inFile);
        Writer writer = new Writer(buffer, outFile);

        Thread p1 = new Thread(reader);
        Thread p2 = new Thread(writer);

        p1.start();
        p2.start();

        try {
            p1.join();
            p2.join();
        } catch (InterruptedException ex) {

        }
    }
}


В моем понимании я делаю в коде следующее. В классе Reader в методе run() создаем синхронизированный блок для объекта buffer. и выполняем считывание из файла и добавление в конец очереди, и после этого для данного объекта buffer вызываем notify(), чтобы уведомить второй поток что в конец очереди добавился элемент (строка). Затем в классе Writer в методе run() создаем синхронизированный блок для объекта buffer и проверяем, если то что берем из головы очереди равно null, то для buffer вызываем wait(), иначе берем из головы элемент и записываем в другой файл.

НО у меня в файле f1.txt 10 строк, после выполнения программы в файле f2.txt оказывается только 9 строк. Где у меня ошибка? И вообще правильно ли я организовал взаимодействие потоков через wait() и notify() ? Спасибо)
 
 
Сообщения:1403
Quote:
А если указать просто Buffer buffer; то создается объектная переменная и с ней работаем?

Нет, вы передаете объект в конструкторе.

 synchronized (buffer) {
            try (BufferedReader buf = new BufferedReader(new InputStreamReader(new FileInputStream(path + file)))) {
                String tmpStr;
                while ((tmpStr = buf.readLine()) != null) {
                    buffer.add(tmpStr);
                    buffer.notify();
                }
            } catch (IOException ex) {
 
            }
        }

Неверно, я же писал
Quote:
после того как положили в очередь вызываете метод в котором делаете notify на какой-нибудь объект

А вы блокируете buffer от начала и до конца, и тут и не пахнет многопоточностью. Сделайте отдельные методы как я писал

    synchronized (buffer) {
            try (PrintWriter writer = new PrintWriter(new FileWriter(path + file))) {
                if (buffer.get() == null) {
                    buffer.wait();
                } else {
                    do {
                        writer.println(buffer.get());
                    } while (buffer.getSize() != 0);
                }
            } catch (IOException | InterruptedException ex) {
 
            }
        }

Это тоже не верно, если у вас условие окончания записи это когда очередь опустеет.
поток1(п1) кладет 1 запись в файл
п1 кладет ещё 1 запись
п2 забирает запись
п2 забирает запись, тут очередь уже пуста
п2 - прекращает работу
п1 - кладет запись, все она никуда не попадет. Вам нужен какой-то флаг, который говорит что больше считывать не надо. Создайте его в буффере и ставьте в true в ридере. И сделайте отдельный метод как я писал, я же не зря отдельно выделил эти моменты
 
 
Сообщения:103
Создал методы

package filesreaderinthreads;

import java.io.*;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Buffer {
    private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    boolean flag = false;

    public void add(String str) {
        queue.offer(str);
    }

    public String get() {
        return queue.poll();
    }

    public int getSize() {
        return queue.size();
    }
}

class Reader implements Runnable {
    Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "files" + separator;
    private String file;

    public Reader(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }

    public void mNotify() {
        notify();
    }

    @Override
    public void run() {
        try (BufferedReader buf = new BufferedReader(new InputStreamReader(new FileInputStream(path + file)))) {
            String tmpStr;
            while ((tmpStr = buf.readLine()) != null) {
                buffer.add(tmpStr);
                mNotify();
            }
        } catch (IOException ex) {

        }
    }
}

class Writer implements Runnable {
    Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "files" + separator;
    private String file;

    public Writer(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }

    public void mWait() {
        try {
            wait();
        } catch (InterruptedException ex) {

        }
    }

    @Override
    public void run() {
        try (PrintWriter writer = new PrintWriter(new FileWriter(path + file))) {
            if (buffer.get() == null) {
                mWait();
            } else {
                do {
                    writer.println(buffer.get());
                } while (buffer.getSize() != 0);
            }
        } catch (IOException ex) {

        }
    }
}

class Test {
    public static void main(String[] args) {
        String inFile = "f1.txt";
        String outFile = "f2.txt";

        Buffer buffer = new Buffer();

        Reader reader = new Reader(buffer, inFile);
        Writer writer = new Writer(buffer, outFile);

        Thread p1 = new Thread(reader);
        Thread p2 = new Thread(writer);

        p1.start();
        p2.start();

        try {
            p1.join();
            p2.join();
        } catch (InterruptedException ex) {

        }
    }
}


Но при запуске программы ошибка

Exception in thread "Thread-0" java.lang.IllegalMonitorStateException
	at java.lang.Object.notify(Native Method)
	at filesreaderinthreads.Reader.mNotify(Buffer.java:35)
	at filesreaderinthreads.Reader.run(Buffer.java:44)
	at java.lang.Thread.run(Thread.java:745)


Можете мой пример переделать как будет правильнее? Я уже совсем запутался, не могу понять работу с wait notify и флагом. и что должно быть в том самом отдельном методе в котором вызывается notify
Изменен:16 ноя 2016 07:42
 
 
Сообщения:1403
Что-то вроде такого
public class Buffer {
    private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
    private boolean queueEnded = false;
    public void add(String str) {
        synchronized (queue) {
            queue.offer(str);
            queue.notifyAll();
        }
    }

    public String get() {
        synchronized (queue) {
            String result = queue.poll();
            if(result == null && !queueEnded){
                try {
                    queue.wait();
                } catch (InterruptedException e) {}
            }
            return queue.poll(); 
        }
    }

    public void queueEnded(){
        synchronized (queue) {
            queueEnded = true;
            queue.notifyAll();
        }
    }
}

class Reader implements Runnable {
    private Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "files" + separator;
    private String file;

    public Reader(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }
    
    @Override
    public void run() {
        try (BufferedReader buf = new BufferedReader(new InputStreamReader(new FileInputStream(path + file)))) {
            String tmpStr;
            while ((tmpStr = buf.readLine()) != null) {
                buffer.add(tmpStr);
            }
        } catch (IOException ex) {}
        buffer.queueEnded();
    }
}

class Writer implements Runnable {
    Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "files" + separator;
    private String file;

    public Writer(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }

    @Override
    public void run() {
        String str;
        try (PrintWriter writer = new PrintWriter(new FileWriter(path + file))) {
            while ((str = buffer.get()) != null){
                writer.println(str);
            }
        } catch (IOException ex) {}
    }
}

и почитайте что-нибудь о синхронизации и ошибках которые можно получить при них, вы же лезете в этому тему неподготовленным
 
 
Сообщения:103
Я запустил ваш пример. В файле f2.txt У меня оказалось только 5 строк из первого файла и то не подряд, которые идут. Я думал добавить в методы add и get синхронизированные блоки, но что-то отошел потом от этого.

В первом файле у меня

1
2
3
4
5
6
7
8
9
10

А во второй записалось только

2
4
6
8
10
Изменен:16 ноя 2016 10:26
 
 
Сообщения:1403
package ru.lanit.osp.antifraud.model;

import java.io.*;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Buffer {
    private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
    private boolean queueEnded = false;

    public void add(String str) {
        synchronized (queue) {
            queue.offer(str);
            System.out.println(String.format("%s Put %s in queue", Thread.currentThread().getName(), str));
            queue.notifyAll();
        }
    }

    public String get() {
        synchronized (queue) {
            String result = queue.poll();
            if (result == null && !queueEnded) {
                try {
                    System.out.println(String.format("%s Empty result, wait", Thread.currentThread().getName()));
                    queue.wait();
                    System.out.println(String.format("%s Get notify", Thread.currentThread().getName()));
                    return queue.poll();
                } catch (InterruptedException e) {
                }
            }
            return result;
        }
    }

    public void queueEnded() {
        synchronized (queue) {
            System.out.println(String.format("%s End file, notify thread", Thread.currentThread().getName()));
            queueEnded = true;
            queue.notifyAll();
        }
    }
}

class Reader implements Runnable {
    private Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "tmp" + separator;
    private String file;

    public Reader(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }

    @Override
    public void run() {

        try {
            BufferedReader buf = new BufferedReader(new InputStreamReader(new FileInputStream(path + file)));
            String tmpStr;
            while ((tmpStr = buf.readLine()) != null) {
                buffer.add(tmpStr);
                Thread.sleep(500);
            }
            buffer.queueEnded();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Writer implements Runnable {
    Buffer buffer;
    private String separator = File.separator;
    private String path = "C:" + separator + "tmp" + separator;
    private String file;

    public Writer(Buffer buffer, String file) {
        this.buffer = buffer;
        this.file = file;
    }

    @Override
    public void run() {
        try {
            String str;
            PrintWriter writer = new PrintWriter(new FileWriter(path + file));
            while ((str = buffer.get()) != null) {
                System.out.println(String.format("%s Write %s to file", Thread.currentThread().getName(), str));
                writer.println(str);
            }
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

class Test {
    public static void main(String[] args) {
        String inFile = "1.txt";
        String outFile = "2.txt";

        Buffer buffer = new Buffer();

        Reader reader = new Reader(buffer, inFile);
        Writer writer = new Writer(buffer, outFile);

        Thread p1 = new Thread(reader);
        p1.setName("Reader");
        Thread p2 = new Thread(writer);
        p2.setName("Writer");
        p1.start();
        p2.start();

        try {
            p1.join();
            p2.join();
        } catch (InterruptedException ex) {

        }
    }
}
 
 
Сообщения:103
Понял почему не все записывало. Спасибо.
 
 
Сообщения:103
В вашем примере алгоритм такой получается?

1. Поток 1 запускается, и считывает строки из файла пока не вернет null
2. Записывает в конец очереди и засыпает на 500 мс.
3. Уведомляет второй поток, о том что тот может забрать элемент из головы
4. Второй поток записывает в rezult то что считали из головы очереди
5. Если то что считали равно null и флаг равен true, то ожидаем и возвращаем элемент из головы очереди
6. Иначе сразу возвращаем то что извлекли из головы очереди
7. Второй поток записывает извлеченный из головы очереди результат в файл.
8. В конце если флаг становится равным true, то программа останавливается

правильно понял?
 
 
Сообщения:1403
Да, засыпание 500мс добавил только чтобы было понятно как это все работает
 
 
Сообщения:103
понял.. спасибо.
 
Модераторы:Нет
Сейчас эту тему просматривают:Нет