diff --git a/Civi/DataProcessor/DataFlow/AbstractDataFlow.php b/Civi/DataProcessor/DataFlow/AbstractDataFlow.php
index a860bbffffc8fa87f3157ba95fcbda6f4d4410bb..4fa85a1f990458cd969d9e2b9a7f98c955b550ce 100644
--- a/Civi/DataProcessor/DataFlow/AbstractDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/AbstractDataFlow.php
@@ -153,10 +153,7 @@ abstract class AbstractDataFlow {
       } catch (EndOfFlowException $e) {
         // Do nothing
       }
-      if (count($this->aggregateFields)) {
-        $aggregator = new Aggregator($_allRecords, $this->aggregateFields, $this->dataSpecification);
-        $_allRecords = $aggregator->aggregateRecords($fieldNameprefix);
-      }
+      $_allRecords = $this->aggregate($_allRecords, $fieldNameprefix);
       foreach($_allRecords as $record) {
         $this->_allRecords[] = $this->formatRecordOutput($record);
       }
@@ -279,5 +276,19 @@ abstract class AbstractDataFlow {
     return $compareValue;
   }
 
+  /**
+   * @param $records
+   * @param string $fieldNameprefix
+   *
+   * @return array();
+   */
+  protected function aggregate($records, $fieldNameprefix="") {
+    if (count($this->aggregateFields)) {
+      $aggregator = new Aggregator($records, $this->aggregateFields, $this->dataSpecification);
+      $records = $aggregator->aggregateRecords($fieldNameprefix);
+    }
+    return $records;
+  }
+
 
 }
diff --git a/Civi/DataProcessor/DataFlow/SqlDataFlow.php b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
index 16a503c7a887d82cb89ceed279dc34215105a542..4f518ba7cc7027ab4164da4a0378e1bf17110c9d 100644
--- a/Civi/DataProcessor/DataFlow/SqlDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
@@ -8,6 +8,7 @@ namespace Civi\DataProcessor\DataFlow;
 
 use Civi\DataProcessor\DataFlow\Sort\SortSpecification;
 use Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface;
+use Civi\DataProcessor\DataFlow\Utils\Aggregator;
 use \Civi\DataProcessor\DataSpecification\DataSpecification;
 
 abstract class SqlDataFlow extends AbstractDataFlow {
@@ -290,4 +291,15 @@ abstract class SqlDataFlow extends AbstractDataFlow {
     return $this->dao;
   }
 
+  /**
+   * @param $records
+   * @param string $fieldNameprefix
+   *
+   * @return array();
+   */
+  protected function aggregate($records, $fieldNameprefix="") {
+    // Aggregation is done in the database.
+    return $records;
+  }
+
 }