Make sure that we can cancel worker pools if something goes wrong and an exception is thrown

This commit is contained in:
Reinhard Pointner 2016-04-08 22:59:41 +00:00
parent e3be1e1bad
commit 1a4c66d977
5 changed files with 99 additions and 74 deletions

View File

@ -107,7 +107,6 @@ public final class WebServices {
}
public static final ExecutorService requestThreadPool = Executors.newCachedThreadPool();
public static final ExecutorService workerThreadPool = Executors.newWorkStealingPool(getPreferredThreadPoolSize());
public static class TheTVDBClientWithLocalSearch extends TheTVDBClient {

View File

@ -27,6 +27,8 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@ -120,13 +122,18 @@ public class AutoDetection {
Map<Group, Set<File>> groups = new TreeMap<Group, Set<File>>();
// can't use parallel stream because default fork/join pool doesn't play well with the security manager
stream(files).collect(toMap(f -> f, f -> workerThreadPool.submit(() -> detectGroup(f)))).forEach((file, group) -> {
try {
groups.computeIfAbsent(group.get(), k -> new TreeSet<File>()).add(file);
} catch (Exception e) {
debug.log(Level.SEVERE, e.getMessage(), e);
}
});
ExecutorService workerThreadPool = Executors.newWorkStealingPool();
try {
stream(files).collect(toMap(f -> f, f -> workerThreadPool.submit(() -> detectGroup(f)))).forEach((file, group) -> {
try {
groups.computeIfAbsent(group.get(), k -> new TreeSet<File>()).add(file);
} catch (Exception e) {
debug.log(Level.SEVERE, e.getMessage(), e);
}
});
} finally {
workerThreadPool.shutdownNow();
}
return groups;
}

View File

@ -15,6 +15,8 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.stream.Stream;
@ -37,19 +39,24 @@ class AutoDetectMatcher implements AutoCompleteMatcher {
Map<Group, Set<File>> groups = new AutoDetection(files, false, locale).group();
// can't use parallel stream because default fork/join pool doesn't play well with the security manager
Map<Group, Future<List<Match<File, ?>>>> matches = groups.entrySet().stream().collect(toMap(Entry::getKey, it -> {
return workerThreadPool.submit(() -> match(it.getKey(), it.getValue(), strict, order, locale, autodetection, parent));
}));
ExecutorService workerThreadPool = Executors.newWorkStealingPool();
try {
Map<Group, Future<List<Match<File, ?>>>> matches = groups.entrySet().stream().collect(toMap(Entry::getKey, it -> {
return workerThreadPool.submit(() -> match(it.getKey(), it.getValue(), strict, order, locale, autodetection, parent));
}));
// collect results
return matches.entrySet().stream().flatMap(it -> {
try {
return it.getValue().get().stream();
} catch (Exception e) {
log.log(Level.WARNING, "Failed to process group: %s" + it.getKey(), e);
}
return Stream.empty();
}).collect(toList());
// collect results
return matches.entrySet().stream().flatMap(it -> {
try {
return it.getValue().get().stream();
} catch (Exception e) {
log.log(Level.WARNING, "Failed to process group: %s" + it.getKey(), e);
}
return Stream.empty();
}).collect(toList());
} finally {
workerThreadPool.shutdownNow();
}
}
private List<Match<File, ?>> match(Group group, Collection<File> files, boolean strict, SortOrder order, Locale locale, boolean autodetection, Component parent) throws Exception {

View File

@ -24,6 +24,8 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
@ -51,7 +53,7 @@ class EpisodeListMatcher implements AutoCompleteMatcher {
private boolean anime;
// only allow one fetch session at a time so later requests can make use of cached results
private final Object providerLock = new Object();
private Object providerLock = new Object();
public EpisodeListMatcher(EpisodeListProvider provider, boolean anime) {
this.provider = provider;
@ -141,7 +143,7 @@ class EpisodeListMatcher implements AutoCompleteMatcher {
return provider.getEpisodeList(selectedSearchResult, sortOrder, locale);
}
}
return new ArrayList<Episode>();
return (List<Episode>) EMPTY_LIST;
});
}).collect(toList());
@ -169,38 +171,44 @@ class EpisodeListMatcher implements AutoCompleteMatcher {
Map<String, SearchResult> selectionMemory = new TreeMap<String, SearchResult>(CommonSequenceMatcher.getLenientCollator(Locale.ENGLISH));
Map<String, List<String>> inputMemory = new TreeMap<String, List<String>>(CommonSequenceMatcher.getLenientCollator(Locale.ENGLISH));
// detect series names and create episode list fetch tasks
List<Future<List<Match<File, ?>>>> tasks = new ArrayList<Future<List<Match<File, ?>>>>();
if (strict) {
// in strict mode simply process file-by-file (ignoring all files that don't contain clear SxE patterns)
mediaFiles.stream().filter(f -> isEpisode(f, false)).map(f -> {
return workerThreadPool.submit(() -> {
return matchEpisodeSet(singletonList(f), detectSeriesNames(singleton(f), anime, locale), sortOrder, strict, locale, autodetection, selectionMemory, inputMemory, parent);
});
}).forEach(tasks::add);
} else {
// in non-strict mode use the complicated (more powerful but also more error prone) match-batch-by-batch logic
mapSeriesNamesByFiles(mediaFiles, locale, anime).forEach((f, n) -> {
// 1. handle series name batch set all at once -> only 1 batch set
// 2. files don't seem to belong to any series -> handle folder per folder -> multiple batch sets
Collection<List<File>> batches = n != null && n.size() > 0 ? singleton(new ArrayList<File>(f)) : mapByFolder(f).values();
batches.stream().map(b -> {
return workerThreadPool.submit(() -> {
return matchEpisodeSet(b, n, sortOrder, strict, locale, autodetection, selectionMemory, inputMemory, parent);
});
}).forEach(tasks::add);
});
}
// merge episode matches
List<Match<File, ?>> matches = new ArrayList<Match<File, ?>>();
for (Future<List<Match<File, ?>>> future : tasks) {
// make sure each episode has unique object data
for (Match<File, ?> it : future.get()) {
matches.add(new Match<File, Episode>(it.getValue(), ((Episode) it.getCandidate()).clone()));
ExecutorService workerThreadPool = Executors.newWorkStealingPool();
try {
// detect series names and create episode list fetch tasks
List<Future<List<Match<File, ?>>>> tasks = new ArrayList<Future<List<Match<File, ?>>>>();
if (strict) {
// in strict mode simply process file-by-file (ignoring all files that don't contain clear SxE patterns)
mediaFiles.stream().filter(f -> isEpisode(f, false)).map(f -> {
return workerThreadPool.submit(() -> {
return matchEpisodeSet(singletonList(f), detectSeriesNames(singleton(f), anime, locale), sortOrder, strict, locale, autodetection, selectionMemory, inputMemory, parent);
});
}).forEach(tasks::add);
} else {
// in non-strict mode use the complicated (more powerful but also more error prone) match-batch-by-batch logic
mapSeriesNamesByFiles(mediaFiles, locale, anime).forEach((f, n) -> {
// 1. handle series name batch set all at once -> only 1 batch set
// 2. files don't seem to belong to any series -> handle folder per folder -> multiple batch sets
Collection<List<File>> batches = n != null && n.size() > 0 ? singleton(new ArrayList<File>(f)) : mapByFolder(f).values();
batches.stream().map(b -> {
return workerThreadPool.submit(() -> {
return matchEpisodeSet(b, n, sortOrder, strict, locale, autodetection, selectionMemory, inputMemory, parent);
});
}).forEach(tasks::add);
});
}
for (Future<List<Match<File, ?>>> future : tasks) {
// make sure each episode has unique object data
for (Match<File, ?> it : future.get()) {
matches.add(new Match<File, Episode>(it.getValue(), ((Episode) it.getCandidate()).clone()));
}
}
} finally {
workerThreadPool.shutdownNow();
}
// handle derived files

View File

@ -5,7 +5,6 @@ import static java.util.Comparator.*;
import static java.util.stream.Collectors.*;
import static net.filebot.Logging.*;
import static net.filebot.MediaTypes.*;
import static net.filebot.WebServices.*;
import static net.filebot.media.MediaDetection.*;
import static net.filebot.similarity.CommonSequenceMatcher.*;
import static net.filebot.similarity.Normalization.*;
@ -28,6 +27,8 @@ import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
@ -145,31 +146,32 @@ class MovieMatcher implements AutoCompleteMatcher {
movieMatchFiles.addAll(filter(orphanedFiles, SUBTITLE_FILES)); // run movie detection only on orphaned subtitle files
// match remaining movies file by file in parallel
List<Future<Map<File, List<Movie>>>> tasks = movieMatchFiles.stream().filter(f -> movieByFile.get(f) == null).map(f -> {
return workerThreadPool.submit(() -> {
if (strict) {
// in strict mode, only process movies that follow the name (year) pattern
List<Integer> year = parseMovieYear(getRelativePathTail(f, 3).getPath());
if (year.isEmpty() || isEpisode(f, true)) {
return null;
ExecutorService workerThreadPool = Executors.newWorkStealingPool();
try {
List<Future<Map<File, List<Movie>>>> tasks = movieMatchFiles.stream().filter(f -> movieByFile.get(f) == null).map(f -> {
return workerThreadPool.submit(() -> {
if (strict) {
// in strict mode, only process movies that follow the name (year) pattern
List<Integer> year = parseMovieYear(getRelativePathTail(f, 3).getPath());
if (year.isEmpty() || isEpisode(f, true)) {
return (Map<File, List<Movie>>) EMPTY_MAP;
}
// allow only movie matches where the movie year matches the year pattern in the filename
return singletonMap(f, detectMovie(f, service, locale, strict).stream().filter(m -> year.contains(m.getYear())).collect(toList()));
} else {
// in non-strict mode just allow all options
return singletonMap(f, detectMovie(f, service, locale, strict));
}
});
}).collect(toList());
// allow only movie matches where the movie year matches the year pattern in the filename
return singletonMap(f, detectMovie(f, service, locale, strict).stream().filter(m -> year.contains(m.getYear())).collect(toList()));
} else {
// in non-strict mode just allow all options
return singletonMap(f, detectMovie(f, service, locale, strict));
}
});
}).collect(toList());
// remember user decisions and only bother user once
Map<String, Object> memory = new HashMap<String, Object>();
memory.put(MEMORY_INPUT, new TreeMap<String, String>(getLenientCollator(locale)));
memory.put(MEMORY_SELECTION, new TreeMap<String, String>(getLenientCollator(locale)));
// remember user decisions and only bother user once
Map<String, Object> memory = new HashMap<String, Object>();
memory.put(MEMORY_INPUT, new TreeMap<String, String>(getLenientCollator(locale)));
memory.put(MEMORY_SELECTION, new TreeMap<String, String>(getLenientCollator(locale)));
for (Future<Map<File, List<Movie>>> future : tasks) {
if (future.get() != null) {
for (Future<Map<File, List<Movie>>> future : tasks) {
for (Entry<File, List<Movie>> it : future.get().entrySet()) {
// auto-select movie or ask user
Movie movie = grabMovieName(it.getKey(), it.getValue(), strict, locale, autodetect, memory, parent);
@ -180,6 +182,8 @@ class MovieMatcher implements AutoCompleteMatcher {
}
}
}
} finally {
workerThreadPool.shutdownNow();
}
// map movies to (possibly multiple) files (in natural order)