Refactor LocalSearch and AutoDetection worker threads

This commit is contained in:
Reinhard Pointner 2016-04-08 22:59:27 +00:00
parent 3bd1655136
commit d73934f09b
6 changed files with 56 additions and 118 deletions

View File

@ -11,7 +11,6 @@ import static net.filebot.util.FileUtilities.*;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
@ -69,7 +68,7 @@ public final class WebServices {
public static final ID3Lookup MediaInfoID3 = new ID3Lookup(); public static final ID3Lookup MediaInfoID3 = new ID3Lookup();
public static EpisodeListProvider[] getEpisodeListProviders() { public static EpisodeListProvider[] getEpisodeListProviders() {
return new EpisodeListProvider[] { TheTVDB, TheMovieDB_TV, AniDB, TVmaze }; return new EpisodeListProvider[] { TheTVDB, AniDB, TheMovieDB_TV, TVmaze };
} }
public static MovieIdentificationService[] getMovieIdentificationServices() { public static MovieIdentificationService[] getMovieIdentificationServices() {
@ -108,6 +107,7 @@ public final class WebServices {
} }
public static final ExecutorService requestThreadPool = Executors.newCachedThreadPool(); public static final ExecutorService requestThreadPool = Executors.newCachedThreadPool();
public static final ExecutorService workerThreadPool = Executors.newWorkStealingPool(getPreferredThreadPoolSize());
public static class TheTVDBClientWithLocalSearch extends TheTVDBClient { public static class TheTVDBClientWithLocalSearch extends TheTVDBClient {
@ -115,29 +115,10 @@ public final class WebServices {
super(apikey); super(apikey);
} }
// index of local thetvdb data dump // local TheTVDB search index
private static LocalSearch<SearchResult> localIndex; private final Resource<LocalSearch<SearchResult>> localIndex = Resource.lazy(() -> {
return new LocalSearch<SearchResult>(releaseInfo.getTheTVDBIndex(), SearchResult::getEffectiveNames);
public synchronized LocalSearch<SearchResult> getLocalIndex() throws Exception { }).memoize();
if (localIndex == null) {
// fetch data dump
SearchResult[] data = releaseInfo.getTheTVDBIndex();
// index data dump
localIndex = new LocalSearch<SearchResult>(asList(data)) {
@Override
protected Set<String> getFields(SearchResult object) {
return set(object.getEffectiveNames());
}
};
// make local search more restrictive
localIndex.setResultMinimumSimilarity(0.7f);
}
return localIndex;
}
private SearchResult merge(SearchResult prime, List<SearchResult> group) { private SearchResult merge(SearchResult prime, List<SearchResult> group) {
int id = prime.getId(); int id = prime.getId();
@ -150,7 +131,7 @@ public final class WebServices {
public List<SearchResult> fetchSearchResult(final String query, final Locale locale) throws Exception { public List<SearchResult> fetchSearchResult(final String query, final Locale locale) throws Exception {
// run local search and API search in parallel // run local search and API search in parallel
Future<List<SearchResult>> apiSearch = requestThreadPool.submit(() -> TheTVDBClientWithLocalSearch.super.fetchSearchResult(query, locale)); Future<List<SearchResult>> apiSearch = requestThreadPool.submit(() -> TheTVDBClientWithLocalSearch.super.fetchSearchResult(query, locale));
Future<List<SearchResult>> localSearch = requestThreadPool.submit(() -> getLocalIndex().search(query)); Future<List<SearchResult>> localSearch = requestThreadPool.submit(() -> localIndex.get().search(query));
// combine alias names into a single search results, and keep API search name as primary name // combine alias names into a single search results, and keep API search name as primary name
Collection<SearchResult> result = StreamEx.of(apiSearch.get()).append(localSearch.get()).groupingBy(SearchResult::getId, collectingAndThen(toList(), group -> merge(group.get(0), group))).values(); Collection<SearchResult> result = StreamEx.of(apiSearch.get()).append(localSearch.get()).groupingBy(SearchResult::getId, collectingAndThen(toList(), group -> merge(group.get(0), group))).values();
@ -166,8 +147,8 @@ public final class WebServices {
} }
@Override @Override
public List<SearchResult> getAnimeTitles() throws Exception { public SearchResult[] getAnimeTitles() throws Exception {
return asList(releaseInfo.getAnidbIndex()); return releaseInfo.getAnidbIndex();
} }
} }
@ -177,34 +158,15 @@ public final class WebServices {
super(name, version); super(name, version);
} }
// index of local OpenSubtitles data dump // local OpenSubtitles search index
private static LocalSearch<SubtitleSearchResult> localIndex; private final Resource<LocalSearch<SubtitleSearchResult>> localIndex = Resource.lazy(() -> {
return new LocalSearch<SubtitleSearchResult>(releaseInfo.getOpenSubtitlesIndex(), SearchResult::getEffectiveNames);
public synchronized LocalSearch<SubtitleSearchResult> getLocalIndex() throws Exception { }).memoize();
if (localIndex == null) {
// fetch data dump
SubtitleSearchResult[] data = releaseInfo.getOpenSubtitlesIndex();
// index data dump
localIndex = new LocalSearch<SubtitleSearchResult>(asList(data)) {
@Override
protected Set<String> getFields(SubtitleSearchResult object) {
return set(object.getEffectiveNames());
}
};
}
return localIndex;
}
@Override @Override
public synchronized List<SubtitleSearchResult> search(final String query) throws Exception { public List<SubtitleSearchResult> search(final String query) throws Exception {
List<SubtitleSearchResult> results = getLocalIndex().search(query); return sortBySimilarity(localIndex.get().search(query), singleton(query), new MetricAvg(getSeriesMatchMetric(), getMovieMatchMetric()));
return sortBySimilarity(results, singleton(query), new MetricAvg(getSeriesMatchMetric(), getMovieMatchMetric()));
} }
} }
/** /**

View File

@ -27,8 +27,6 @@ import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -122,9 +120,7 @@ public class AutoDetection {
Map<Group, Set<File>> groups = new TreeMap<Group, Set<File>>(); Map<Group, Set<File>> groups = new TreeMap<Group, Set<File>>();
// can't use parallel stream because default fork/join pool doesn't play well with the security manager // can't use parallel stream because default fork/join pool doesn't play well with the security manager
ExecutorService executor = Executors.newWorkStealingPool(); stream(files).collect(toMap(f -> f, f -> workerThreadPool.submit(() -> detectGroup(f)))).forEach((file, group) -> {
stream(files).collect(toMap(f -> f, f -> executor.submit(() -> detectGroup(f)))).forEach((file, group) -> {
try { try {
groups.computeIfAbsent(group.get(), k -> new TreeSet<File>()).add(file); groups.computeIfAbsent(group.get(), k -> new TreeSet<File>()).add(file);
} catch (Exception e) { } catch (Exception e) {
@ -132,7 +128,6 @@ public class AutoDetection {
} }
}); });
executor.shutdown();
return groups; return groups;
} }

View File

@ -8,7 +8,6 @@ import static net.filebot.WebServices.*;
import java.awt.Component; import java.awt.Component;
import java.io.File; import java.io.File;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
@ -16,9 +15,9 @@ import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Future;
import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.stream.Stream;
import net.filebot.media.AutoDetection; import net.filebot.media.AutoDetection;
import net.filebot.media.AutoDetection.Group; import net.filebot.media.AutoDetection.Group;
@ -38,21 +37,19 @@ class AutoDetectMatcher implements AutoCompleteMatcher {
Map<Group, Set<File>> groups = new AutoDetection(files, false, locale).group(); Map<Group, Set<File>> groups = new AutoDetection(files, false, locale).group();
// can't use parallel stream because default fork/join pool doesn't play well with the security manager // can't use parallel stream because default fork/join pool doesn't play well with the security manager
ExecutorService executor = Executors.newWorkStealingPool(); Map<Group, Future<List<Match<File, ?>>>> matches = groups.entrySet().stream().collect(toMap(Entry::getKey, it -> {
List<Match<File, ?>> result = new ArrayList<Match<File, ?>>(); return workerThreadPool.submit(() -> match(it.getKey(), it.getValue(), strict, order, locale, autodetection, parent));
}));
groups.entrySet().stream().collect(toMap(Entry::getKey, it -> { // collect results
return executor.submit(() -> match(it.getKey(), it.getValue(), strict, order, locale, autodetection, parent)); return matches.entrySet().stream().flatMap(it -> {
})).forEach((group, matches) -> {
try { try {
result.addAll(matches.get()); return it.getValue().get().stream();
} catch (Exception e) { } catch (Exception e) {
log.log(Level.WARNING, "Failed to process group: " + group, e); log.log(Level.WARNING, "Failed to process group: %s" + it.getKey(), e);
} }
}); return Stream.empty();
}).collect(toList());
executor.shutdown();
return result;
} }
private List<Match<File, ?>> match(Group group, Collection<File> files, boolean strict, SortOrder order, Locale locale, boolean autodetection, Component parent) throws Exception { private List<Match<File, ?>> match(Group group, Collection<File> files, boolean strict, SortOrder order, Locale locale, boolean autodetection, Component parent) throws Exception {

View File

@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -35,6 +34,7 @@ import org.w3c.dom.Node;
import net.filebot.Cache; import net.filebot.Cache;
import net.filebot.CacheType; import net.filebot.CacheType;
import net.filebot.Resource;
import net.filebot.ResourceManager; import net.filebot.ResourceManager;
public class AnidbClient extends AbstractEpisodeListProvider { public class AnidbClient extends AbstractEpisodeListProvider {
@ -80,16 +80,14 @@ public class AnidbClient extends AbstractEpisodeListProvider {
return fetchSearchResult(query, locale); return fetchSearchResult(query, locale);
} }
// local AniDB search index
private final Resource<LocalSearch<SearchResult>> localIndex = Resource.lazy(() -> {
return new LocalSearch<SearchResult>(getAnimeTitles(), SearchResult::getEffectiveNames);
}).memoize();
@Override @Override
public List<SearchResult> fetchSearchResult(String query, Locale locale) throws Exception { public List<SearchResult> fetchSearchResult(String query, Locale locale) throws Exception {
LocalSearch<SearchResult> index = new LocalSearch<SearchResult>(getAnimeTitles()) { return localIndex.get().search(query);
@Override
protected Set<String> getFields(SearchResult it) {
return set(it.getEffectiveNames());
}
};
return new ArrayList<SearchResult>(index.search(query));
} }
@Override @Override
@ -183,7 +181,7 @@ public class AnidbClient extends AbstractEpisodeListProvider {
/** /**
* This method is overridden in {@link net.filebot.WebServices.AnidbClientWithLocalSearch} to fetch the Anime Index from our own host and not anidb.net * This method is overridden in {@link net.filebot.WebServices.AnidbClientWithLocalSearch} to fetch the Anime Index from our own host and not anidb.net
*/ */
public synchronized List<SearchResult> getAnimeTitles() throws Exception { public synchronized SearchResult[] getAnimeTitles() throws Exception {
// get data file (unzip and cache) // get data file (unzip and cache)
byte[] bytes = getCache("root").bytes("anime-titles.dat.gz", n -> new URL("http://anidb.net/api/" + n)).get(); byte[] bytes = getCache("root").bytes("anime-titles.dat.gz", n -> new URL("http://anidb.net/api/" + n)).get();
@ -230,24 +228,20 @@ public class AnidbClient extends AbstractEpisodeListProvider {
} }
// build up a list of all possible AniDB search results // build up a list of all possible AniDB search results
List<SearchResult> anime = new ArrayList<SearchResult>(entriesByAnime.size()); return entriesByAnime.entrySet().stream().map(it -> {
List<String> names = it.getValue().stream().sorted((a, b) -> {
entriesByAnime.forEach((aid, triples) -> {
List<String> names = triples.stream().sorted((a, b) -> {
for (int i = 0; i < a.length; i++) { for (int i = 0; i < a.length; i++) {
if (!a[i].equals(b[i])) { if (!a[i].equals(b[i])) {
return ((Comparable) a[i]).compareTo(b[i]); return ((Comparable) a[i]).compareTo(b[i]);
} }
} }
return 0; return 0;
}).map(it -> (String) it[2]).collect(toList()); }).map(n -> n[2].toString()).collect(toList());
String primaryTitle = names.get(0); String primaryTitle = names.get(0);
List<String> aliasNames = names.subList(1, names.size()); List<String> aliasNames = names.subList(1, names.size());
anime.add(new SearchResult(aid, primaryTitle, aliasNames)); return new SearchResult(it.getKey(), primaryTitle, aliasNames);
}); }).toArray(SearchResult[]::new);
return anime;
} }
} }

View File

@ -1,19 +1,19 @@
package net.filebot.web; package net.filebot.web;
import static java.util.Collections.*; import static java.util.Arrays.*;
import static java.util.Collections.reverseOrder;
import static java.util.Comparator.*; import static java.util.Comparator.*;
import static java.util.stream.Collectors.*; import static java.util.stream.Collectors.*;
import static net.filebot.similarity.Normalization.*; import static net.filebot.similarity.Normalization.*;
import java.util.AbstractMap.SimpleImmutableEntry; import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import com.ibm.icu.text.Transliterator; import com.ibm.icu.text.Transliterator;
@ -29,20 +29,20 @@ public class LocalSearch<T> {
private Transliterator transliterator = Transliterator.getInstance("Any-Latin;Latin-ASCII;[:Diacritic:]remove"); private Transliterator transliterator = Transliterator.getInstance("Any-Latin;Latin-ASCII;[:Diacritic:]remove");
private List<T> objects; private T[] objects;
private List<Set<String>> fields; private Set<String>[] fields;
public LocalSearch(Collection<? extends T> data) { public LocalSearch(T[] data, Function<T, Collection<String>> keywords) {
objects = new ArrayList<T>(data); objects = data.clone();
fields = objects.stream().map(this::getFields).collect(toList()); fields = stream(objects).map(keywords).map(this::normalize).toArray(Set[]::new);
} }
public List<T> search(String q) throws ExecutionException, InterruptedException { public List<T> search(String q) throws ExecutionException, InterruptedException {
String query = normalize(q); String query = normalize(q);
return IntStream.range(0, objects.size()).mapToObj(i -> { return IntStream.range(0, objects.length).mapToObj(i -> {
T object = objects.get(i); T object = objects[i];
Set<String> field = fields.get(i); Set<String> field = fields[i];
boolean match = field.stream().anyMatch(it -> it.contains(query)); boolean match = field.stream().anyMatch(it -> it.contains(query));
double similarity = field.stream().mapToDouble(it -> metric.getSimilarity(query, it)).max().orElse(0); double similarity = field.stream().mapToDouble(it -> metric.getSimilarity(query, it)).max().orElse(0);
@ -59,22 +59,12 @@ public class LocalSearch<T> {
this.resultSetSize = resultSetSize; this.resultSetSize = resultSetSize;
} }
protected Set<String> getFields(T object) { protected Set<String> normalize(Collection<String> values) {
return set(singleton(object.toString())); return values.stream().map(this::normalize).collect(toSet());
}
protected Set<String> set(Collection<String> values) {
Set<String> set = new HashSet<String>(values.size());
for (String value : values) {
if (value != null) {
set.add(normalize(value));
}
}
return set;
} }
protected String normalize(String value) { protected String normalize(String value) {
// normalize separator, normalize case and trim // normalize separator, trim and normalize case
return normalizePunctuation(transliterator.transform(value)).toLowerCase(); return normalizePunctuation(transliterator.transform(value)).toLowerCase();
} }

View File

@ -28,8 +28,8 @@ public class AnidbClientTest {
@Test @Test
public void getAnimeTitles() throws Exception { public void getAnimeTitles() throws Exception {
List<SearchResult> animeTitles = anidb.getAnimeTitles(); SearchResult[] animeTitles = anidb.getAnimeTitles();
assertTrue(animeTitles.size() > 8000); assertTrue(animeTitles.length > 8000);
} }
@Test @Test