Форум программистов, компьютерный форум, киберфорум
Наши страницы
Ovederax
Войти
Регистрация
Восстановить пароль
Оценить эту запись

Остановка потоков обработки данных при помощи "Отравления очереди"

Запись от Ovederax размещена 09.04.2019 в 17:07
Метки java

Недавно я узнал о интересном способе остановки потоков, занятых чтением данных из очереди, в которую периодически могут поступать данные.
Преподаватель назвал этот способ "Отравлением очереди".
Его идея заключается в том, чтобы определить некоторый объект как недопустимый для очереди,
чтобы он никак не мог там оказаться случайно. Этот объект будет ядом.
После завершения потоков записи этот яд добавляется в очередь.
Потоки чтения заканчивают обрабатывать очередь допустимых данных и натыкаются на отраву.
Обнаружив ее они завершают работу с очередью.

Этот метод был применен для следующей учебной задачи:
Доступные средства: Java, классы пакета и подпакетов java.util.concurent.*
Нужно реализовать очередь данных, данные представлены классом Data с полем int[] data внутри.
Имеются потоки чтения и потоки записи работающие с этой очередью.
Потоки записи соответственно ставят некоторое конечное число объектов Data в очередь.
Потоки чтения берут данные из очереди и распечатывают их, при этом не знают сколько всего поступит объектов в очередь.
В некоторый момент данные перестают поступать в очередь и
необходимо прекратить попытки считать из нее данные и последовательно завершить все потоки.

В целях синхронизации и обеспечения безопасного совместного доступа к данным была применена очередь ArrayBlockingQueue.
У этой очереди имеются методы take() и put() блокирующие вставку объектов если очередь пуста и получение объектов при пустой очереди.
Подробней про блокирующие очереди.

Код, решающий эту задачу и использующий описанную идею
Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package com.company;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
 
class Data {
    private int[] data;
 
    public Data(int[] data) {
        this.data = data;
    }
 
    int[] get() {
        return data;
    }
}
 
class PoisonArrayBlockingQueue<T> {
    private ArrayBlockingQueue<T> queue;
    private T poison;
 
    public PoisonArrayBlockingQueue(T poison) {
        if(poison == null) {
            throw new NullPointerException();
        }
        this.poison = poison;
        this.queue = new ArrayBlockingQueue<>(8);
    }
 
    T get() throws InterruptedException {
        return queue.take();
    }
 
    void put(T item) throws InterruptedException {
        queue.put(item);
    }
 
    boolean isPoison(T testObj) {
        return testObj == poison;
    }
 
    void addPoison() throws InterruptedException {
        put(poison);
    }
}
 
class ReadTask extends Thread {
    private PoisonArrayBlockingQueue<Data> store;
    private int id;
 
    public ReadTask(int id, PoisonArrayBlockingQueue<Data> store) {
        this.store = store;
        this.id = id;
    }
 
    @Override
    public void run() {
        System.out.println("ReadTask with id="+id+" is run");
        try {
            Data data = store.get();
            while( !store.isPoison(data) ) {
                System.out.println("Reader #"+id+" "+Arrays.toString(data.get()));
                Thread.sleep(1000);
                data = store.get();
            }
            store.addPoison();      // ADD POISON BACK to other thread
        } catch (InterruptedException e) {
            System.out.println("Thread Reader interrupt");
        }
        System.out.println("Read task #"+id+" finish");
    }
}
 
class WriteTask extends Thread {
    private PoisonArrayBlockingQueue<Data> store;
    private int id;
    private int countData;
 
    public WriteTask(int id, PoisonArrayBlockingQueue<Data> store, int countData) {
        this.store = store;
        this.id = id;
        this.countData = countData;
    }
 
    @Override
    public void run() {
        try {
            System.out.println("WriteTask with id="+id+" is run");
            for( int i=0; i<countData; ++i ) {
                int count = 3;      //1+(int)(Math.random()*5);
                int[] data = new int[count];
                for (int j=0; j<count; ++j) {
                    data[j] = (int)(Math.random()*10);
                }
                System.out.println("Writer #"+id+" "+Arrays.toString(data));
                store.put(new Data(data));
                Thread.sleep(1000);
            }
        } catch (InterruptedException e) {
            System.out.println("Thread Writer interrupt");
        }
        System.out.println("Writer task #"+id+" finish");
    }
}
 
public class Main {
    public static void main(String[] args) {
        final int READ_THREAD_COUNT = 2;
        final int WRITE_THREAD_COUNT = 3;
 
        Data poison = new Data(null);
        PoisonArrayBlockingQueue<Data> store = new PoisonArrayBlockingQueue<>(poison);
 
        List<Thread> readWaitList = new ArrayList<>(READ_THREAD_COUNT);
        List<Thread> writeWaitList = new ArrayList<>(WRITE_THREAD_COUNT);
        try {
            for (int i = 0; i < WRITE_THREAD_COUNT; ++i) {
                Thread t = new WriteTask(i, store, (int) (Math.random() * 4 + 2));
                writeWaitList.add(t);
                t.start();
            }
            for (int i = 0; i < READ_THREAD_COUNT; ++i) {
                Thread t = new ReadTask(i, store);
                readWaitList.add(t);
                t.start();
            }
 
            // WAIT OTHER THREADS
            for (int i = 0; i < WRITE_THREAD_COUNT; ++i) {
                writeWaitList.get(i).join();
                store.addPoison();   // <----ADD POISON-----
            }
            for (int i = 0; i < READ_THREAD_COUNT; ++i) {
                readWaitList.get(i).join();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main() final execution");
    }
}
Размещено в Без категории
Просмотров 165 Комментарии 0
Всего комментариев 0
Комментарии
 
КиберФорум - форум программистов, компьютерный форум, программирование
Powered by vBulletin® Version 3.8.9
Copyright ©2000 - 2019, vBulletin Solutions, Inc.
Рейтинг@Mail.ru