Есть большой файл, около 500К строк. В каждой строке файла есть идентификатор , характеризующий одну из 50 таблиц (энтити, класс). Мне надо как-то очень оптимально пробежаться по этому файлу, достать каждую строку и кинуть её в уже написанный обрабтчик. Мне настоятельно рекомендовали использовать ForkJoinPool. Пока не получается то, что хочется.
Запуск в методе разбора файла
Java |
1
2
3
4
5
6
| public void parseFile() {
ForkJoinPool pool = new ForkJoinPool(4);
FileParserProcessor fileParser = new FileParserProcessor("MyBigFile.txt", 24);
pool.invoke(fileParser);
pool.shutdown();
} |
|
Моя неправильная реализация:
Java |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| public class FileParserProcessor extends RecursiveAction {
private long workLoad = 0;
private String path;
public FileParserProcessor(String path, long workLoad) {
this.workLoad = workLoad;
this.path = path;
}
@Override
protected void compute() {
//if work is above threshold, break tasks up into smaller tasks
if(this.workLoad > 16) {
List<FileParserProcessor> subtasks =
new ArrayList<FileParserProcessor>(createSubtasks());
for(RecursiveAction subtask : subtasks){
subtask.fork();
}
} else {
try (BufferedReader br = new BufferedReader(
// TODO: to make file upload via interface
new FileReader(path)
)) {
String line;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
} catch (FileNotFoundException e) {
throw new RuntimeException("An error occurred: file not found", e);
} catch (IOException e) {
throw new RuntimeException("An error occurred while parsing file", e);
}
}
}
private List<FileParserProcessor> createSubtasks() {
List<FileParserProcessor> subtasks =
new ArrayList<FileParserProcessor>();
FileParserProcessor subtask1 = new FileParserProcessor(path, this.workLoad / 2);
FileParserProcessor subtask2 = new FileParserProcessor(path, this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
} |
|
Добавлено через 6 минут
При этом я нашёл вариант решения через ExecutorService. Плохо только то, что нужно знать количество строк, чтобы задать capacity в BlockingQueue. Подразумевается, что впоследствии файлы будут загружаться пользователем, поэтому количество строк может быть каким угодно.
Найденное решение через ExecutorService:
Java |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
| public void parseFile() {
final int threadCount = Runtime.getRuntime().availableProcessors();
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2000000);
// create thread pool with given size
ExecutorService service = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < (threadCount - 1); i++) {
service.submit(new CPUTask(queue));
}
// Wait til FileTask completes
try {
service.submit(new FileTask(queue)).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("An error .......................................", e);
}
service.shutdownNow(); // interrupt CPUTasks
// Wait til CPUTasks terminate
try {
service.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException("An error occurred when terminate parsing process by timeout", e);
}
}
class CPUTask implements Runnable {
private final BlockingQueue<String> queue;
public CPUTask(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
String line;
while (true) {
try {
// block if the queue is empty
line = queue.take();
System.out.println(line);
// do things with line
} catch (InterruptedException ex) {
break; // FileTask has completed
}
}
}
}
class FileTask implements Runnable {
private final BlockingQueue<String> queue;
public FileTask(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try (BufferedReader br = new BufferedReader(
// TODO: to make file upload via interface
new FileReader("MyVeryBigFile.txt")
)) {
String line;
while ((line = br.readLine()) != null) {
// block if the queue is full
queue.put(line);
}
} catch (FileNotFoundException e) {
throw new RuntimeException("An error occurred: file not found", e);
} catch (IOException | InterruptedException e) {
throw new RuntimeException("An error occurred while parsing file", e);
}
}
} |
|
Добавлено через 22 минуты
А через параллельные стримы будет оптимально? вот так, например:
Java |
1
2
3
4
5
6
7
8
9
| public void parseFile() {
try {
Stream<String> lines = Files.lines(Paths.get("MyBigFile.txt"));
lines.parallel()
.forEach(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}
} |
|