diff --git a/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php b/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php
index c6823614b2106ee849a5db585b370371557b32c8..0c9c20b9fbfe79be79364fe9abc31b03edc39714 100644
--- a/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php
@@ -60,6 +60,15 @@ class CombinedSqlDataFlow extends SqlDataFlow implements MultipleSourceDataFlows
     $this->sourceDataFlowDescriptions[] = $dataFlowDescription;
   }
 
+  /**
+   * Returns the Table part in the from statement.
+   *
+   * @return string
+   */
+  public function getTableStatement() {
+    return "`{$this->primary_table}` `{$this->primary_table_alias}`";
+  }
+
   /**
    * Returns the From Statement.
    *
diff --git a/Civi/DataProcessor/DataFlow/SqlDataFlow/SubqueryDataFlow.php b/Civi/DataProcessor/DataFlow/CombinedDataFlow/SubqueryDataFlow.php
similarity index 91%
rename from Civi/DataProcessor/DataFlow/SqlDataFlow/SubqueryDataFlow.php
rename to Civi/DataProcessor/DataFlow/CombinedDataFlow/SubqueryDataFlow.php
index 9a6569b23ac6e3fbbe3b8c0dd50e5b5267c4e3e9..f394d7ac78a70674e5a35e5a5d4c8127dbc9dadb 100644
--- a/Civi/DataProcessor/DataFlow/SqlDataFlow/SubqueryDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/CombinedDataFlow/SubqueryDataFlow.php
@@ -4,9 +4,8 @@
  * @license AGPL-3.0
  */
 
-namespace Civi\DataProcessor\DataFlow\SqlDataFlow;
+namespace Civi\DataProcessor\DataFlow\CombinedDataFlow;
 
-use Civi\DataProcessor\DataFlow\CombinedDataFlow\CombinedSqlDataFlow;
 use Civi\DataProcessor\DataFlow\EndOfFlowException;
 use Civi\DataProcessor\DataSpecification\DataSpecification;
 use Civi\DataProcessor\DataSpecification\SqlFieldSpecification;
@@ -19,6 +18,26 @@ class SubqueryDataFlow extends CombinedSqlDataFlow {
    * @return string
    */
   public function getFromStatement() {
+    return "FROM {$this->getTableStatement()}";
+  }
+
+  /**
+   * Returns the join Statement part.
+   *
+   * @param int $skip
+   * @return string
+   */
+  public function getJoinStatement($skip=0) {
+    $fromStatements = array();
+    return $fromStatements;
+  }
+
+  /**
+   * Returns the Table part in the from statement.
+   *
+   * @return string
+   */
+  public function getTableStatement() {
     $fields = array();
     foreach($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
       $fields = array_merge($fields, $sourceDataFlowDescription->getDataFlow()->getFieldsForSelectStatement());
@@ -47,19 +66,7 @@ class SubqueryDataFlow extends CombinedSqlDataFlow {
     $from = implode(" ", $fromStatements);
 
     $select = implode(", ", $fields);
-    return "FROM (SELECT {$select} {$from}) `{$this->getPrimaryTableAlias()}`";
-  }
-
-  /**
-   * Returns the join Statement part.
-   *
-   * @param int $skip
-   * @return string
-   */
-  public function getJoinStatement($skip=0) {
-    $fromStatements = array();
-    $i = 0;
-    return $fromStatements;
+    return "(SELECT {$select} {$from}) `{$this->getPrimaryTableAlias()}`";
   }
 
   /**
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
index 6f8e3ed1c66e5f61d67568c5449842e548b9f772..bfd5045d0f99fe4113ae1339c6ff85d3de943112 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
@@ -309,15 +309,11 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
       }
       $joinClause = "ON {$leftColumnName}  = {$rightColumnName}";
     }
-    if ($sourceDataFlowDescription->getDataFlow() instanceof SqlTableDataFlow) {
-      $table = $sourceDataFlowDescription->getDataFlow()->getTable();
-      $table_alias = $sourceDataFlowDescription->getDataFlow()->getTableAlias();
-    } elseif ($sourceDataFlowDescription->getDataFlow() instanceof CombinedSqlDataFlow) {
-      $table = $sourceDataFlowDescription->getDataFlow()->getPrimaryTable();
-      $table_alias = $sourceDataFlowDescription->getDataFlow()->getPrimaryTableAlias();
+    if ($sourceDataFlowDescription->getDataFlow() instanceof SqlDataFlow) {
+      $tablePart = $sourceDataFlowDescription->getDataFlow()->getTableStatement();
     }
 
-    return "{$this->type} JOIN `{$table}` `{$table_alias}` {$joinClause} ";
+    return "{$this->type} JOIN {$tablePart} {$joinClause} ";
   }
 
 
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleNonRequiredJoin.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleNonRequiredJoin.php
index a134944a9a67adb8798dbd31ad81c30c8478b36c..2b25ff08c7ef92442c1d7d73f1a5826297128c6b 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleNonRequiredJoin.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleNonRequiredJoin.php
@@ -80,12 +80,8 @@ class SimpleNonRequiredJoin  extends  SimpleJoin {
       }
       $joinClause = "ON {$leftColumnName}  = {$rightColumnName}";
     }
-    if ($sourceDataFlowDescription->getDataFlow() instanceof SqlTableDataFlow) {
-      $table = $sourceDataFlowDescription->getDataFlow()->getTable();
-      $table_alias = $sourceDataFlowDescription->getDataFlow()->getTableAlias();
-    } elseif ($sourceDataFlowDescription->getDataFlow() instanceof CombinedSqlDataFlow) {
-      $table = $sourceDataFlowDescription->getDataFlow()->getPrimaryTable();
-      $table_alias = $sourceDataFlowDescription->getDataFlow()->getPrimaryTableAlias();
+    if ($sourceDataFlowDescription->getDataFlow() instanceof SqlDataFlow) {
+      $tablePart = $sourceDataFlowDescription->getDataFlow()->getTableStatement();
     }
 
     $extraClause  = "";
@@ -107,7 +103,7 @@ class SimpleNonRequiredJoin  extends  SimpleJoin {
       $extraClause = " AND (".implode(" AND ", $extraClauses). ")";
     }
 
-    return "{$this->type} JOIN `{$table}` `{$table_alias}` {$joinClause} {$extraClause}";
+    return "{$this->type} JOIN {$tablePart} {$joinClause} {$extraClause}";
   }
 
 
diff --git a/Civi/DataProcessor/DataFlow/SqlDataFlow.php b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
index 63a421995aa31f279e2f2b3ced35ad19f6a81e85..d4969976fd3778aa35dc5eee57219ef3b3f06d9f 100644
--- a/Civi/DataProcessor/DataFlow/SqlDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
@@ -44,12 +44,21 @@ abstract class SqlDataFlow extends AbstractDataFlow {
    */
   abstract public function getFieldsForGroupByStatement();
 
+  /**
+   * Returns the Table part in the from statement.
+   *
+   * @return string
+   */
+  abstract public function getTableStatement();
+
   /**
    * Returns the From Statement.
    *
    * @return string
    */
-  abstract public function getFromStatement();
+  public function getFromStatement() {
+    return "FROM {$this->getTableStatement()}";
+  }
 
   /**
    * Initialize the data flow
diff --git a/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php b/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php
index de98ca5376fc500f56ca4aaa3986c231981b6ad4..c8e7bcc73b85a6592bb7dd5acd8c08f1f8ae5123 100644
--- a/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php
@@ -46,12 +46,12 @@ class SqlTableDataFlow extends SqlDataFlow {
   }
 
   /**
-   * Returns the From Statement.
+   * Returns the Table part in the from statement.
    *
    * @return string
    */
-  public function getFromStatement() {
-    return "FROM `{$this->table}` `{$this->table_alias}`";
+  public function getTableStatement() {
+    return "`{$this->table}` `{$this->table_alias}`";
   }
 
   /**
diff --git a/Civi/DataProcessor/Source/Activity/ActivitySource.php b/Civi/DataProcessor/Source/Activity/ActivitySource.php
index 0ef77bb9c227a9b509a66dd14eb3abcd5795e7bf..766524d17c79fdb8e678233ebf08a349e2767065 100644
--- a/Civi/DataProcessor/Source/Activity/ActivitySource.php
+++ b/Civi/DataProcessor/Source/Activity/ActivitySource.php
@@ -8,7 +8,7 @@ namespace Civi\DataProcessor\Source\Activity;
 
 use Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription;
 use Civi\DataProcessor\DataFlow\MultipleDataFlows\SimpleJoin;
-use Civi\DataProcessor\DataFlow\SqlDataFlow\SubqueryDataFlow;
+use Civi\DataProcessor\DataFlow\CombinedDataFlow\SubqueryDataFlow;
 use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
 use Civi\DataProcessor\DataSpecification\DataSpecification;
 use Civi\DataProcessor\Source\AbstractCivicrmEntitySource;