diff --git a/CRM/Dataprocessor/BAO/DataProcessor.php b/CRM/Dataprocessor/BAO/DataProcessor.php
index 14e09256d3f667b93ebc83461ebf944b6b86db51..bb863b8a4553f8520e558cdb57f70776d537937e 100644
--- a/CRM/Dataprocessor/BAO/DataProcessor.php
+++ b/CRM/Dataprocessor/BAO/DataProcessor.php
@@ -226,22 +226,16 @@ class CRM_Dataprocessor_BAO_DataProcessor extends CRM_Dataprocessor_DAO_DataProc
     $dataProcessor = $factory->getDataProcessorTypeByName($this->type);
     $sources = CRM_Dataprocessor_BAO_Source::getValues(array('data_processor_id' => $this->id));
     foreach($sources as $sourceDao) {
-      $source = $factory->getDataSourceByName($sourceDao['type']);
-      $source->setSourceName($sourceDao['name']);
-      $source->setSourceTitle($sourceDao['title']);
-      $source->initialize($sourceDao['configuration']);
-      $join = null;
-      if ($sourceDao['join_type']) {
-        $join = $factory->getJoinByName($sourceDao['join_type']);
-        $join->initialize($sourceDao['join_configuration'], $this->id);
-      }
-      $dataProcessor->addDataSource($source, $join);
+      CRM_Dataprocessor_BAO_Source::getSourceClass($sourceDao, $dataProcessor);
     }
 
-    $aggregationFields = CRM_Dataprocessor_BAO_DataProcessor::getAvailableAggregationFields($this->dataProcessorId);
+    $aggregationFields = CRM_Dataprocessor_BAO_DataProcessor::getAvailableAggregationFields($this->id);
     if (is_string($this->aggregation)) {
       $this->aggregation = json_decode($this->aggregation, true);
     }
+    if (!is_array($this->aggregation)) {
+      $this->aggregation = array();
+    }
     foreach($this->aggregation as $alias) {
       $dataSource = $dataProcessor->getDataSourceByName($aggregationFields[$alias]->dataSource->getSourceName());
       if ($dataSource) {
@@ -268,7 +262,6 @@ class CRM_Dataprocessor_BAO_DataProcessor extends CRM_Dataprocessor_DAO_DataProc
         $dataProcessor->addOutputFieldHandlers($outputHandler);
       }
     }
-
     return $dataProcessor;
   }
 
@@ -280,16 +273,7 @@ class CRM_Dataprocessor_BAO_DataProcessor extends CRM_Dataprocessor_DAO_DataProc
     $dataProcessor = $factory->getDataProcessorTypeByName($dao->type);
     $sources = CRM_Dataprocessor_BAO_Source::getValues(array('data_processor_id' => $dao->id));
     foreach($sources as $sourceDao) {
-      $source = $factory->getDataSourceByName($sourceDao['type']);
-      $source->setSourceName($sourceDao['name']);
-      $source->setSourceTitle($sourceDao['title']);
-      $source->initialize($sourceDao['configuration']);
-      $join = null;
-      if ($sourceDao['join_type']) {
-        $join = $factory->getJoinByName($sourceDao['join_type']);
-        $join->initialize($sourceDao['join_configuration'], $dao->id);
-      }
-      $dataProcessor->addDataSource($source, $join);
+      CRM_Dataprocessor_BAO_Source::getSourceClass($sourceDao, $dataProcessor);
     }
 
     return $dataProcessor->getAvailableOutputHandlers();
@@ -303,16 +287,7 @@ class CRM_Dataprocessor_BAO_DataProcessor extends CRM_Dataprocessor_DAO_DataProc
     $dataProcessor = $factory->getDataProcessorTypeByName($dao->type);
     $sources = CRM_Dataprocessor_BAO_Source::getValues(array('data_processor_id' => $dao->id));
     foreach($sources as $sourceDao) {
-      $source = $factory->getDataSourceByName($sourceDao['type']);
-      $source->setSourceName($sourceDao['name']);
-      $source->setSourceTitle($sourceDao['title']);
-      $source->initialize($sourceDao['configuration']);
-      $join = null;
-      if ($sourceDao['join_type']) {
-        $join = $factory->getJoinByName($sourceDao['join_type']);
-        $join->initialize($sourceDao['join_configuration'], $dao->id);
-      }
-      $dataProcessor->addDataSource($source, $join);
+      CRM_Dataprocessor_BAO_Source::getSourceClass($sourceDao, $dataProcessor);
     }
 
     return $dataProcessor->getAvailableFilterHandlers();
@@ -320,13 +295,14 @@ class CRM_Dataprocessor_BAO_DataProcessor extends CRM_Dataprocessor_DAO_DataProc
 
   public static function getAvailableAggregationFields($data_processor_id) {
     $availableAggregationFields = array();
+    $dao = new CRM_Dataprocessor_BAO_DataProcessor();
+    $dao->id = $data_processor_id;
+    $dao->find(true);
     $factory = dataprocessor_get_factory();
-    $sources = CRM_Dataprocessor_BAO_Source::getValues(array('data_processor_id' => $data_processor_id));
+    $dataProcessor = $factory->getDataProcessorTypeByName($dao->type);
+    $sources = CRM_Dataprocessor_BAO_Source::getValues(array('data_processor_id' => $dao->id));
     foreach($sources as $sourceDao) {
-      $source = $factory->getDataSourceByName($sourceDao['type']);
-      $source->setSourceName($sourceDao['name']);
-      $source->setSourceTitle($sourceDao['title']);
-      $source->initialize($sourceDao['configuration']);
+      $source = CRM_Dataprocessor_BAO_Source::getSourceClass($sourceDao, $dataProcessor);
       $availableAggregationFields = array_merge($availableAggregationFields, $source->getAvailableAggregationFields());
     }
 
diff --git a/CRM/Dataprocessor/BAO/Source.php b/CRM/Dataprocessor/BAO/Source.php
index 2b0ce0214e793a021fc0426229a14970b13fd3fc..de05a21baddb6d57be41c4f818562ed276c353ed 100644
--- a/CRM/Dataprocessor/BAO/Source.php
+++ b/CRM/Dataprocessor/BAO/Source.php
@@ -180,4 +180,30 @@ class CRM_Dataprocessor_BAO_Source extends CRM_Dataprocessor_DAO_Source {
     }
   }
 
+  /**
+   * @param $source
+   * @param \Civi\DataProcessor\ProcessorType\AbstractProcessorType $dataProcessor
+   * @return \Civi\DataProcessor\Source\SourceInterface
+   */
+  public static function getSourceClass($source, \Civi\DataProcessor\ProcessorType\AbstractProcessorType $dataProcessor) {
+    $factory = dataprocessor_get_factory();
+    $sourceClass = $factory->getDataSourceByName($source['type']);
+    $sourceClass->setSourceName($source['name']);
+    $sourceClass->setSourceTitle($source['title']);
+    $sourceClass->setConfiguration($source['configuration']);
+    $sourceClass->setDataProcessor($dataProcessor);
+    $join = null;
+    if ($source['join_type']) {
+      $join = $factory->getJoinByName($source['join_type']);
+      $join->setConfiguration($source['join_configuration']);
+      $join->setDataProcessor($dataProcessor);
+    }
+    $dataProcessor->addDataSource($sourceClass, $join);
+    if ($join) {
+      $join->initialize();
+      $sourceClass->setJoin($join);
+    }
+    return $sourceClass;
+  }
+
 }
\ No newline at end of file
diff --git a/CRM/Dataprocessor/Form/Join/Simple.php b/CRM/Dataprocessor/Form/Join/Simple.php
index e4989d84e90a8a8408ae06da8ad3dda1600cacb9..7b646616ee6520a48b1f065fa01a21d6ce0294b7 100644
--- a/CRM/Dataprocessor/Form/Join/Simple.php
+++ b/CRM/Dataprocessor/Form/Join/Simple.php
@@ -110,7 +110,7 @@ class CRM_Dataprocessor_Form_Join_Simple extends CRM_Core_Form {
       $sourceClass->initialize($source['configuration'], $source['name']);
       $sourceFields = $sourceClass->getAvailableFields()->getFields();
       foreach($sourceFields as $sourceField) {
-        $fields[$source['name'].'.'.$sourceField->name] = $source['title'] . '::'.$sourceField->name;
+        $fields[$source['name'] . '.' . $sourceField->name] = $source['title'] . ' :: ' . $sourceField->title;
       }
 
       if ($source['id'] == $this->source_id) {
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php
index 12ad1c2c6f6db70c253ccf7fb204f95069f255a8..0f1bf69af3c2f301e42bd29ff3f4281215be4234 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php
@@ -16,7 +16,7 @@ class DataFlowDescription {
   /**
    * @var \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinSpecification
    */
-  protected $joinSpecification = null;
+  protected $joinSpecification = array();
 
   public function __construct($datFlow, $joinSpecification = null) {
     $this->dataFlow = $datFlow;
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php
index 26812b2b0d2694eefa941cb848bf9d8cbdb7cdf5..362352c88199b1a01be240e308723ecb312d809c 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php
@@ -6,6 +6,9 @@
 
 namespace Civi\DataProcessor\DataFlow\MultipleDataFlows;
 
+use Civi\DataProcessor\DataFlow\AbstractDataFlow;
+use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
+
 interface JoinInterface{
 
   /**
@@ -19,13 +22,34 @@ interface JoinInterface{
    */
   public function isJoinable($left_record, $right_record);
 
+  /**
+   * Returns true when this join is compatible with this data flow
+   *
+   * @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $
+   * @return bool
+   */
+  public function worksWithDataFlow(AbstractDataFlow $dataFlow);
+
+  /**
+   * Initialize the join
+   *
+   * @return void
+   */
+  public function initialize();
+
+  /**
+   * @param AbstractProcessorType $dataProcessor
+   *
+   * @return \Civi\DataProcessor\Source\SourceInterface
+   */
+  public function setDataProcessor(AbstractProcessorType $dataProcessor);
+
   /**
    * @param array $configuration
-   * @param int $data_processor_id
    *
-   * @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
+   * @return \Civi\DataProcessor\Source\SourceInterface
    */
-  public function initialize($configuration, $data_processor_id);
+  public function setConfiguration($configuration);
 
   /**
    * Returns the URL for the configuration form of the join specification
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
index 46503597567b1012a04ce53b2edff806b51efb58..bcd4c2e7dfd01cb948c419b5638f615bb587611b 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
@@ -6,11 +6,16 @@
 
 namespace Civi\DataProcessor\DataFlow\MultipleDataFlows;
 
+use Civi\DataProcessor\DataFlow\AbstractDataFlow;
 use Civi\DataProcessor\DataFlow\CombinedDataFlow\CombinedSqlDataFlow;
+use Civi\DataProcessor\DataFlow\SqlDataFlow;
 use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
+use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
 
 class SimpleJoin implements JoinInterface, SqlJoinInterface {
 
+  private $isInitialized = false;
+
   /**
    * @var string
    *   The name of the left field
@@ -35,12 +40,27 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
    */
   private $right_prefix;
 
+  /**
+   * @var String
+   */
+  private $right_table;
+
+  /**
+   * @var String
+   */
+  private $left_table;
+
   /**
    * @var String
    *   The join type, e.g. INNER, LEFT, OUT etc..
    */
   private $type = "INNER";
 
+  /**
+   * @var AbstractProcessorType
+   */
+  private $dataProcessor;
+
   public function __construct($left_prefix = null, $left_field = null, $right_prefix = null, $right_field = null, $type = "INNER") {
     $this->left_prefix = $left_prefix;
     $this->left_field = $left_field;
@@ -58,11 +78,10 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
 
   /**
    * @param array $configuration
-   * @param int $data_processor_id
    *
    * @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
    */
-  public function initialize($configuration, $data_processor_id) {
+  public function setConfiguration($configuration) {
     if (isset($configuration['left_field'])) {
       $this->left_field = $configuration['left_field'];
     }
@@ -78,6 +97,65 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
     if (isset($configuration['type'])) {
       $this->type = $configuration['type'];
     }
+    return $this;
+  }
+
+  /**
+   * @param AbstractProcessorType $dataProcessor
+   * @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
+   * @throws \Exception
+   */
+  public function setDataProcessor(AbstractProcessorType $dataProcessor) {
+    $this->dataProcessor = $dataProcessor;
+  }
+
+  /**
+   * Returns true when this join is compatible with this data flow
+   *
+   * @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $
+   * @return bool
+   */
+  public function worksWithDataFlow(AbstractDataFlow $dataFlow) {
+    if (!$dataFlow instanceof SqlDataFlow) {
+      return false;
+    }
+    $this->initialize();
+    if ($dataFlow->getTableAlias() == $this->left_table) {
+      return true;
+    }
+    if ($dataFlow->getTableAlias() == $this->right_table) {
+      return true;
+    }
+    return false;
+  }
+
+  public function initialize() {
+    if ($this->isInitialized) {
+      return $this;
+    }
+    if ($this->left_prefix && $this->left_field) {
+      $this->left_table = $this->left_prefix;
+      $left_source = $this->dataProcessor->getDataSourceByName($this->left_prefix);
+      if ($left_source) {
+        $leftTable = $left_source->ensureField($this->left_field);
+        if ($leftTable && $leftTable instanceof SqlTableDataFlow) {
+          $this->left_table = $leftTable->getTableAlias();
+        }
+      }
+    }
+    if ($this->right_prefix && $this->right_field) {
+      $this->right_table = $this->right_prefix;
+      $right_source = $this->dataProcessor->getDataSourceByName($this->right_prefix);
+      if ($right_source) {
+        $rightTable = $right_source->ensureField($this->right_field);
+        if ($rightTable && $rightTable instanceof SqlTableDataFlow) {
+          $this->right_table = $rightTable->getTableAlias();
+        }
+      }
+    }
+
+    $this->isInitialized = true;
+    return $this;
   }
 
   /**
@@ -112,9 +190,10 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
    * @return string
    */
   public function getJoinClause(DataFlowDescription $sourceDataFlowDescription) {
+    $this->initialize();
     $joinClause = "";
     if ($sourceDataFlowDescription->getJoinSpecification()) {
-      $joinClause = "ON `{$this->left_prefix}`.`{$this->left_field}` = `{$this->right_prefix}`.`{$this->right_field}`";
+      $joinClause = "ON `{$this->left_table}`.`{$this->left_field}` = `{$this->right_table}`.`{$this->right_field}`";
     }
     if ($sourceDataFlowDescription->getDataFlow() instanceof SqlTableDataFlow) {
       $table = $sourceDataFlowDescription->getDataFlow()->getTable();
diff --git a/Civi/DataProcessor/Factory.php b/Civi/DataProcessor/Factory.php
index dac494ce648a6dcc9200963783313e96136c5cb0..df910c0ea70fa4cb5ae50ce79d3b94950b597f9e 100644
--- a/Civi/DataProcessor/Factory.php
+++ b/Civi/DataProcessor/Factory.php
@@ -76,6 +76,7 @@ class Factory {
 
     $this->addDataProcessorType('default', 'Civi\DataProcessor\ProcessorType\DefaultProcessorType', E::ts('Default'));
     $this->addDataSource('contact', 'Civi\DataProcessor\Source\ContactSource', E::ts('Contact'));
+    $this->addDataSource('group', 'Civi\DataProcessor\Source\GroupSource', E::ts('Group'));
     $this->addDataSource('email', 'Civi\DataProcessor\Source\EmailSource', E::ts('E-mail'));
     $this->addDataSource('contribution', 'Civi\DataProcessor\Source\ContributionSource', E::ts('Contribution'));
     $this->addDataSource('relationship', 'Civi\DataProcessor\Source\RelationshipSource', E::ts('Relationship'));
diff --git a/Civi/DataProcessor/Output/Api.php b/Civi/DataProcessor/Output/Api.php
index a9a2039e9b4594ba41cc4dbb1eefe43635a241c0..f4af4a408fee74bd2964c7540045d33bebb2296e 100644
--- a/Civi/DataProcessor/Output/Api.php
+++ b/Civi/DataProcessor/Output/Api.php
@@ -163,7 +163,7 @@ class Api implements OutputInterface, API_ProviderInterface, EventSubscriberInte
     } else {
       $options = _civicrm_api3_get_options_from_params($apiRequest['params']);
 
-      if (isset($options['limit'])) {
+      if (isset($options['limit']) && $options['limit'] > 0) {
         $dataProcessor->getDataFlow()->setLimit($options['limit']);
       }
       if (isset($options['offset'])) {
diff --git a/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php b/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php
index dca26b3f8dbfaa2f6462e552aafcce88256110eb..4d24e9398bfd048683ee33ac36f016cac408b0e2 100644
--- a/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php
+++ b/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php
@@ -6,7 +6,7 @@
 
 namespace Civi\DataProcessor\Source;
 
-use Civi\DataProcessor\DataFlow\SqlDataFlow;
+use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface;
 use Civi\DataProcessor\DataFlow\SqlDataFlow\SimpleWhereClause;
 use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
 use Civi\DataProcessor\DataFlow\CombinedDataFlow\CombinedSqlDataFlow;
@@ -16,6 +16,7 @@ use Civi\DataProcessor\DataSpecification\AggregationField;
 use Civi\DataProcessor\DataSpecification\CustomFieldSpecification;
 use Civi\DataProcessor\DataSpecification\DataSpecification;
 use Civi\DataProcessor\DataSpecification\FieldSpecification;
+use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
 
 abstract class AbstractCivicrmEntitySource implements SourceInterface {
 
@@ -60,6 +61,21 @@ abstract class AbstractCivicrmEntitySource implements SourceInterface {
    */
   protected $customGroupDataFlowDescriptions = array();
 
+  /**
+   * @var array<\Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription>
+   */
+  protected $additionalDataFlowDescriptions = array();
+
+  /**
+   * @var AbstractProcessorType
+   */
+  protected $dataProcessor;
+
+  /**
+   * @var array
+   */
+  protected $configuration;
+
   /**
    * Returns the entity name
    *
@@ -75,33 +91,60 @@ abstract class AbstractCivicrmEntitySource implements SourceInterface {
   abstract protected function getTable();
 
   public function __construct() {
-
   }
 
   /**
-   * Initialize this data source.
+   * @param AbstractProcessorType $dataProcessor
    *
+   * @return \Civi\DataProcessor\Source\SourceInterface
+   */
+  public function setDataProcessor(AbstractProcessorType $dataProcessor) {
+    $this->dataProcessor = $dataProcessor;
+    return $this;
+  }
+
+  /**
    * @param array $configuration
-   * @param string $source_name
    *
    * @return \Civi\DataProcessor\Source\SourceInterface
-   * @throws \Exception
    */
-  public function initialize($configuration) {
-    $this->primaryDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName());
-    $this->addFilters($configuration);
+  public function setConfiguration($configuration) {
+    $this->configuration = $configuration;
+    return $this;
+  }
 
-    if (count($this->customGroupDataFlowDescriptions)) {
-      $this->dataFlow = new CombinedSqlDataFlow('', $this->primaryDataFlow->getTable(), $this->primaryDataFlow->getTableAlias());
-      $this->dataFlow->addSourceDataFlow(new DataFlowDescription($this->primaryDataFlow));
-      foreach($this->customGroupDataFlowDescriptions as $customGroupDataFlowDescription) {
-        $this->dataFlow->addSourceDataFlow($customGroupDataFlowDescription);
+  /**
+   * Initialize this data source.
+   *
+   * @throws \Exception
+   */
+  public function initialize() {
+    if (!$this->primaryDataFlow) {
+      $this->primaryDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName());
+    }
+    if (!$this->dataFlow) {
+      $this->addFilters($this->configuration);
+
+      if (count($this->customGroupDataFlowDescriptions) || count($this->additionalDataFlowDescriptions)) {
+        $this->dataFlow = new CombinedSqlDataFlow('', $this->primaryDataFlow->getTable(), $this->primaryDataFlow->getTableAlias());
+        $this->dataFlow->addSourceDataFlow(new DataFlowDescription($this->primaryDataFlow));
+        foreach ($this->customGroupDataFlowDescriptions as $customGroupDataFlowDescription) {
+          $this->dataFlow->addSourceDataFlow($customGroupDataFlowDescription);
+        }
+        foreach ($this->additionalDataFlowDescriptions as $additionalDataFlowDescription) {
+          $this->dataFlow->addSourceDataFlow($additionalDataFlowDescription);
+        }
+      }
+      else {
+        $this->dataFlow = $this->primaryDataFlow;
       }
-    } else {
-      $this->dataFlow = $this->primaryDataFlow;
     }
+  }
 
-    return $this;
+  protected function reset() {
+    $this->primaryDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName());
+    $this->dataFlow = null;
+    $this->additionalDataFlowDescriptions = array();
   }
 
   /**
@@ -218,14 +261,30 @@ abstract class AbstractCivicrmEntitySource implements SourceInterface {
     if ($this->getAvailableFilterFields()->doesFieldExist($filter_alias)) {
       $spec = $this->getAvailableFilterFields()->getFieldSpecificationByName($filter_alias);
       if ($spec instanceof CustomFieldSpecification) {
-        $customGroupDataFlowDescription = $this->ensureCustomGroup($spec->customGroupTableName, $spec->customGroupName);
-        $customGroupTableAlias = $customGroupDataFlowDescription->getDataFlow()
-          ->getTableAlias();
-        $customGroupDataFlowDescription->getDataFlow()->addWhereClause(
+        $customGroupDataFlow = $this->ensureCustomGroup($spec->customGroupTableName, $spec->customGroupName);
+        $customGroupTableAlias = $customGroupDataFlow->getTableAlias();
+        $customGroupDataFlow->addWhereClause(
           new SimpleWhereClause($customGroupTableAlias, $spec->customFieldColumnName, $filter['op'], $filter['value'])
         );
       } else {
-        $this->primaryDataFlow->addWhereClause(new SimpleWhereClause($this->primaryDataFlow->getTableAlias(), $spec->name, $filter['op'], $filter['value']));
+        $entityDataFlow = $this->ensureEntity();
+        $entityDataFlow->addWhereClause(new SimpleWhereClause($this->primaryDataFlow->getTableAlias(), $spec->name, $filter['op'], $filter['value']));
+      }
+    }
+  }
+
+  /**
+   * Ensure that filter field is accesible in the query
+   *
+   * @param String $fieldName
+   * @return DataFlowDescription|null
+   * @throws \Exception
+   */
+  public function ensureField($fieldName) {
+    if ($this->getAvailableFilterFields()->doesFieldExist($fieldName)) {
+      $spec = $this->getAvailableFilterFields()->getFieldSpecificationByName($fieldName);
+      if ($spec instanceof CustomFieldSpecification) {
+        return $this->ensureCustomGroup($spec->customGroupTableName, $spec->customGroupName);
       }
     }
   }
@@ -235,23 +294,75 @@ abstract class AbstractCivicrmEntitySource implements SourceInterface {
    *
    * @param $customGroupTableName
    * @param $customGroupName
-   * @return DataFlowDescription
+   * @return \Civi\DataProcessor\DataFlow\AbstractDataFlow
+   * @throws \Exception
    */
   protected function ensureCustomGroup($customGroupTableName, $customGroupName) {
-    if (!isset($this->customGroupDataFlowDescriptions[$customGroupName])) {
-      $customGroupTableAlias = $this->getSourceName().'_'.$customGroupName;
-      $this->customGroupDataFlowDescriptions[$customGroupName] = new DataFlowDescription(
-        new SqlTableDataFlow($customGroupTableName, $customGroupTableAlias, new DataSpecification()),
-        new SimpleJoin($this->getSourceName(), 'id', $customGroupTableAlias, 'entity_id', 'LEFT')
-      );
+    if (isset($this->customGroupDataFlowDescriptions[$customGroupName])) {
+      return $this->customGroupDataFlowDescriptions[$customGroupName]->getDataFlow();
+    } elseif ($this->primaryDataFlow && $this->primaryDataFlow->getTable() == $customGroupTableName) {
+      return $this->primaryDataFlow;
     }
-    return $this->customGroupDataFlowDescriptions[$customGroupName];
+    $customGroupTableAlias = $this->getSourceName().'_'.$customGroupName;
+    $join = new SimpleJoin($this->getSourceName(), 'id', $customGroupTableAlias, 'entity_id', 'LEFT');
+    $join->setDataProcessor($this->dataProcessor);
+    $this->customGroupDataFlowDescriptions[$customGroupName] = new DataFlowDescription(
+      new SqlTableDataFlow($customGroupTableName, $customGroupTableAlias, new DataSpecification()),
+      $join
+    );
+    return $this->customGroupDataFlowDescriptions[$customGroupName]->getDataFlow();
+  }
+
+  /**
+   * Ensure that the entity table is added the to the data flow.
+   *
+   * @return \Civi\DataProcessor\DataFlow\AbstractDataFlow
+   * @throws \Exception
+   */
+  protected function ensureEntity() {
+    if ($this->primaryDataFlow && $this->primaryDataFlow->getTable() === $this->getTable()) {
+      return $this->primaryDataFlow;
+    } elseif (empty($this->primaryDataFlow)) {
+      $this->primaryDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName());
+      return $this->primaryDataFlow;
+    }
+    foreach($this->additionalDataFlowDescriptions as $additionalDataFlowDescription) {
+      if ($additionalDataFlowDescription->getDataFlow()->getTable() == $this->getTable()) {
+        return $additionalDataFlowDescription->getDataFlow();
+      }
+    }
+    $entityDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName());
+    $join = new SimpleJoin($this->getSourceName(), 'id', $this->primaryDataFlow->getTableAlias(), 'entity_id', 'LEFT');
+    $join->setDataProcessor($this->dataProcessor);
+    $additionalDataFlowDescription = new DataFlowDescription($entityDataFlow,$join);
+    $this->additionalDataFlowDescriptions[] = $additionalDataFlowDescription;
+    return $additionalDataFlowDescription->getDataFlow();
+  }
+
+  /**
+   * Sets the join specification to connect this source to other data sources.
+   *
+   * @param \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface $join
+   *
+   * @return \Civi\DataProcessor\Source\SourceInterface
+   */
+  public function setJoin(JoinInterface $join) {
+    foreach($this->customGroupDataFlowDescriptions as $idx => $customGroupDataFlowDescription) {
+      if ($join->worksWithDataFlow($customGroupDataFlowDescription->getDataFlow())) {
+        $this->primaryDataFlow = $customGroupDataFlowDescription->getDataFlow();
+        unset($this->customGroupDataFlowDescriptions[$idx]);
+        unset($this->dataFlow);
+      }
+    }
+    return $this;
   }
 
   /**
    * @return \Civi\DataProcessor\DataFlow\AbstractDataFlow|\Civi\DataProcessor\DataFlow\AbstractDataFlow
+   * @throws \Exception
    */
   public function getDataFlow() {
+    $this->initialize();
     return $this->dataFlow;
   }
 
@@ -304,10 +415,11 @@ abstract class AbstractCivicrmEntitySource implements SourceInterface {
   public function ensureFieldInSource(FieldSpecification $fieldSpecification) {
     if ($this->getAvailableFields()->doesFieldExist($fieldSpecification->name)) {
       if ($fieldSpecification instanceof CustomFieldSpecification) {
-        $customGroupDataFlowDescription = $this->ensureCustomGroup($fieldSpecification->customGroupTableName, $fieldSpecification->customGroupName);
-        $customGroupDataFlowDescription->getDataFlow()->getDataSpecification()->addFieldSpecification($fieldSpecification->alias, $fieldSpecification);
+        $customGroupDataFlow = $this->ensureCustomGroup($fieldSpecification->customGroupTableName, $fieldSpecification->customGroupName);
+        $customGroupDataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->alias, $fieldSpecification);
       } else {
-        $this->primaryDataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->alias, $fieldSpecification);
+        $entityDataFlow = $this->ensureEntity();
+        $entityDataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->alias, $fieldSpecification);
       }
     }
   }
@@ -322,10 +434,11 @@ abstract class AbstractCivicrmEntitySource implements SourceInterface {
   public function ensureAggregationFieldInSource(FieldSpecification $fieldSpecification) {
     if ($this->getAvailableFields()->doesFieldExist($fieldSpecification->name)) {
       if ($fieldSpecification instanceof CustomFieldSpecification) {
-        $customGroupDataFlowDescription = $this->ensureCustomGroup($fieldSpecification->customGroupTableName, $fieldSpecification->customGroupName);
-        $customGroupDataFlowDescription->getDataFlow()->addAggregateField($fieldSpecification);
+        $customGroupDataFlow = $this->ensureCustomGroup($fieldSpecification->customGroupTableName, $fieldSpecification->customGroupName);
+        $customGroupDataFlow->addAggregateField($fieldSpecification);
       } else {
-        $this->primaryDataFlow->addAggregateField($fieldSpecification);
+        $entityDataFlow = $this->ensureEntity();
+        $entityDataFlow->addAggregateField($fieldSpecification);
       }
     }
   }
diff --git a/Civi/DataProcessor/Source/EmailSource.php b/Civi/DataProcessor/Source/EmailSource.php
index 4a41f1d04d436b23f7b76f0640d6a652d73428f7..7294b616764dd0799fcd0bf0e2cf710c8ed534c3 100644
--- a/Civi/DataProcessor/Source/EmailSource.php
+++ b/Civi/DataProcessor/Source/EmailSource.php
@@ -32,17 +32,4 @@ class EmailSource extends AbstractCivicrmEntitySource {
     return 'civicrm_email';
   }
 
-  /**
-   * @return \Civi\DataProcessor\DataSpecification\DataSpecification
-   * @throws \Exception
-   */
-  public function getAvailableFilterFields() {
-    if (!$this->availableFilterFields) {
-      $this->availableFilterFields = new DataSpecification();
-      $this->loadFields($this->availableFilterFields, array());
-      $this->loadCustomGroupsAndFields($this->availableFilterFields, true);
-    }
-    return $this->availableFilterFields;
-  }
-
 }
\ No newline at end of file
diff --git a/Civi/DataProcessor/Source/GroupSource.php b/Civi/DataProcessor/Source/GroupSource.php
new file mode 100644
index 0000000000000000000000000000000000000000..d5ff4a117eed24f45d3c73c8935026c3fd665bdf
--- /dev/null
+++ b/Civi/DataProcessor/Source/GroupSource.php
@@ -0,0 +1,35 @@
+<?php
+/**
+ * @author Jaap Jansma <jaap.jansma@civicoop.org>
+ * @license AGPL-3.0
+ */
+
+namespace Civi\DataProcessor\Source;
+
+use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
+use Civi\DataProcessor\DataSpecification\DataSpecification;
+use Civi\DataProcessor\DataSpecification\FieldSpecification;
+
+use CRM_Dataprocessor_ExtensionUtil as E;
+
+class GroupSource extends AbstractCivicrmEntitySource {
+
+  /**
+   * Returns the entity name
+   *
+   * @return String
+   */
+  protected function getEntity() {
+    return 'Group';
+  }
+
+  /**
+   * Returns the table name of this entity
+   *
+   * @return String
+   */
+  protected function getTable() {
+    return 'civicrm_group';
+  }
+
+}
\ No newline at end of file
diff --git a/Civi/DataProcessor/Source/SourceInterface.php b/Civi/DataProcessor/Source/SourceInterface.php
index cb3d001bc58028570267eaeaf9fc8d655b269efc..f7982b867a6d6bfa6007b40f605add2a68535866 100644
--- a/Civi/DataProcessor/Source/SourceInterface.php
+++ b/Civi/DataProcessor/Source/SourceInterface.php
@@ -7,7 +7,9 @@
 namespace Civi\DataProcessor\Source;
 
 use Civi\DataProcessor\DataFlow\AbstractDataFlow;
+use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface;
 use Civi\DataProcessor\DataSpecification\FieldSpecification;
+use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
 
 interface SourceInterface {
 
@@ -20,13 +22,34 @@ interface SourceInterface {
 
 
   /**
-   * Initialize this data source.
+   * Initialize the join
    *
+   * @return void
+   */
+  public function initialize();
+
+  /**
+   * Sets the join specification to connect this source to other data sources.
+   *
+   * @param \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface $join
+   *
+   * @return \Civi\DataProcessor\Source\SourceInterface
+   */
+  public function setJoin(JoinInterface $join);
+
+  /**
+   * @param AbstractProcessorType $dataProcessor
+   *
+   * @return \Civi\DataProcessor\Source\SourceInterface
+   */
+  public function setDataProcessor(AbstractProcessorType $dataProcessor);
+
+  /**
    * @param array $configuration
    *
    * @return \Civi\DataProcessor\Source\SourceInterface
    */
-  public function initialize($configuration);
+  public function setConfiguration($configuration);
 
   /**
    * @return \Civi\DataProcessor\DataSpecification\DataSpecification
@@ -50,6 +73,15 @@ interface SourceInterface {
    */
   public function getConfigurationUrl();
 
+  /**
+   * Ensure that filter field is accesible in the query
+   *
+   * @param String $fieldName
+   * @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription
+   * @throws \Exception
+   */
+  public function ensureField($fieldName);
+
   /**
    * Ensures a field is in the data source
    *