Skip to content

Commit

Permalink
ProfileCollectorManager to support child profiler collector
Browse files Browse the repository at this point in the history
ProfileCollectorManager is our collector manager used for profiling
which supports parallel collection. We have integrated it in the DFS
phase which has a very simple collection hierarcy: a top docs collector
only.

In the query phase, we may have a top-level collector that holds two sub
collectors where each one is profiled separately.

This commit adds support for children profilers to
ProfileCollectorManager so that we can use it in the query phase as
well.
  • Loading branch information
javanna committed Jul 5, 2023
1 parent b862707 commit 528f322
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(getName());
out.writeString(getReason());
out.writeLong(getTime());
out.writeList(getCollectorResults());
out.writeList(getChildrenResults());
}

public List<CollectorResult> getCollectorResults() {
/**
* Exposes a list of children collector results. Same as {@link ProfilerCollectorResult#getProfiledChildren()} with each
* item in the list being cast to a {@link CollectorResult}
*/
public List<CollectorResult> getChildrenResults() {
return super.getProfiledChildren().stream().map(profilerCollectorResult -> (CollectorResult) profilerCollectorResult).toList();
}

Expand All @@ -80,12 +84,12 @@ public boolean equals(Object obj) {
return getName().equals(other.getName())
&& getReason().equals(other.getReason())
&& getTime() == other.getTime()
&& getCollectorResults().equals(other.getCollectorResults());
&& getChildrenResults().equals(other.getChildrenResults());
}

@Override
public int hashCode() {
return Objects.hash(getName(), getReason(), getTime(), getCollectorResults());
return Objects.hash(getName(), getReason(), getTime(), getChildrenResults());
}

@Override
Expand All @@ -105,7 +109,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par

if (getProfiledChildren().isEmpty() == false) {
builder = builder.startArray(CHILDREN.getPreferredName());
for (CollectorResult child : getCollectorResults()) {
for (CollectorResult child : getChildrenResults()) {
builder = child.toXContent(builder, params);
}
builder = builder.endArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,112 @@

package org.elasticsearch.search.profile.query;

import org.apache.lucene.sandbox.search.ProfilerCollector;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
* A {@link CollectorManager} that takes another CollectorManager as input and wraps all Collectors generated by it
* in an {@link InternalProfileCollector}. It delegates all the profiling to the generated collectors via {@link #getCollectorTree()}
* and joins them up when its {@link #reduce} method is called. The profile result can
* and joins the different collector trees together when its {@link #reduce} method is called.
* Supports optionally providing sub-collector managers for top docs as well as aggs collection, so that each
* {@link InternalProfileCollector} created is provided with the corresponding sub-collectors that are children of the top-level collector.
* @param <T> the return type of the wrapped collector manager, which the reduce method returns.
*/
public final class ProfileCollectorManager<T> implements CollectorManager<InternalProfileCollector, T> {

private final CollectorManager<Collector, T> collectorManager;
private final String reason;
private final ProfileCollectorManager<?> topDocsSubCollectorManager;
private final ProfileCollectorManager<?> aggsSubCollectorManager;
// this is a bit of a hack: it allows us to retrieve the last collector that newCollector has returned for sub collector managers,
// so that we can provide them to InternalProfileCollector's constructor as children.
// We rely on the fact that newCollector is called by the coordinating thread before parallelizing execution across slices.
private InternalProfileCollector profileCollector;

private CollectorResult collectorTree;

@SuppressWarnings("unchecked")
public ProfileCollectorManager(CollectorManager<? extends Collector, T> collectorManager, String reason) {
this(collectorManager, reason, null, null);
}

@SuppressWarnings("unchecked")
public ProfileCollectorManager(
CollectorManager<? extends Collector, T> collectorManager,
String reason,
ProfileCollectorManager<?> topDocsSubCollectorManager,
ProfileCollectorManager<?> aggsSubCollectorManager
) {
this.collectorManager = (CollectorManager<Collector, T>) collectorManager;
this.reason = reason;
this.topDocsSubCollectorManager = topDocsSubCollectorManager;
this.aggsSubCollectorManager = aggsSubCollectorManager;
}

@Override
public InternalProfileCollector newCollector() throws IOException {
return new InternalProfileCollector(collectorManager.newCollector(), reason);
Collector collector = collectorManager.newCollector();
if (aggsSubCollectorManager == null && topDocsSubCollectorManager == null) {
profileCollector = new InternalProfileCollector(collector, reason);
} else if (aggsSubCollectorManager == null) {
assert topDocsSubCollectorManager.profileCollector != null;
profileCollector = new InternalProfileCollector(collector, reason, topDocsSubCollectorManager.profileCollector);
} else {
assert topDocsSubCollectorManager.profileCollector != null && aggsSubCollectorManager.profileCollector != null;
profileCollector = new InternalProfileCollector(
collector,
reason,
topDocsSubCollectorManager.profileCollector,
aggsSubCollectorManager.profileCollector
);
}
return profileCollector;
}

@Override
public T reduce(Collection<InternalProfileCollector> profileCollectors) throws IOException {
assert profileCollectors.size() > 0 : "at least one collector expected";
List<Collector> unwrapped = profileCollectors.stream()
.map(InternalProfileCollector::getWrappedCollector)
.collect(Collectors.toList());
List<Collector> unwrapped = profileCollectors.stream().map(InternalProfileCollector::getWrappedCollector).toList();
T returnValue = collectorManager.reduce(unwrapped);

List<CollectorResult> resultsPerProfiler = profileCollectors.stream()
.map(ipc -> ipc.getCollectorTree())
.collect(Collectors.toList());

List<CollectorResult> resultsPerProfiler = profileCollectors.stream().map(InternalProfileCollector::getCollectorTree).toList();
long totalTime = resultsPerProfiler.stream().map(CollectorResult::getTime).reduce(0L, Long::sum);
String collectorName = resultsPerProfiler.get(0).getName();
this.collectorTree = new CollectorResult(collectorName, reason, totalTime, Collections.emptyList());
assert profileCollectors.stream().map(ProfilerCollector::getReason).allMatch(reason::equals);
assert profileCollectors.stream().map(ProfilerCollector::getName).allMatch(collectorName::equals);
assert assertChildrenSize(resultsPerProfiler);

List<CollectorResult> childrenResults = new ArrayList<>();
if (topDocsSubCollectorManager != null) {
childrenResults.add(topDocsSubCollectorManager.getCollectorTree());
}
if (aggsSubCollectorManager != null) {
childrenResults.add(aggsSubCollectorManager.getCollectorTree());
}
this.collectorTree = new CollectorResult(collectorName, reason, totalTime, childrenResults);

return returnValue;
}

private boolean assertChildrenSize(List<CollectorResult> resultsPerProfiler) {
int expectedSize = 0;
if (topDocsSubCollectorManager != null) {
expectedSize++;
}
if (aggsSubCollectorManager != null) {
expectedSize++;
}
final int expectedChildrenSize = expectedSize;
return resultsPerProfiler.stream()
.map(collectorResult -> collectorResult.getChildrenResults().size())
.allMatch(integer -> integer == expectedChildrenSize);
}

public CollectorResult getCollectorTree() {
if (this.collectorTree == null) {
throw new IllegalStateException("A collectorTree hasn't been set yet. Call reduce() before attempting to retrieve it");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
assertEquals("SimpleTopScoreDocCollector", (collectorResult.getName()));
assertEquals("search_top_hits", (collectorResult.getReason()));
assertTrue(collectorResult.getTime() > 0);
List<CollectorResult> children = collectorResult.getCollectorResults();
List<CollectorResult> children = collectorResult.getChildrenResults();
if (children.size() > 0) {
long totalTime = 0L;
for (CollectorResult child : children) {
Expand Down
Loading

0 comments on commit 528f322

Please sign in to comment.