diff --git a/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php b/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php
index 155e0684b26593fa97666a72c1639b46aae20d13..d3741516588778625d68ae43c6d7f049fedd1033 100644
--- a/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php
+++ b/Civi/DataProcessor/Source/AbstractCivicrmEntitySource.php
@@ -93,7 +93,13 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
     }
     $this->addFilters($this->configuration);
     if (count($this->customGroupDataFlowDescriptions) || count($this->additionalDataFlowDescriptions)) {
-      $this->dataFlow = new CombinedSqlDataFlow('', $this->primaryDataFlow->getTable(), $this->primaryDataFlow->getTableAlias());
+      if ($this->primaryDataFlow instanceof CombinedSqlDataFlow) {
+        $this->dataFlow = new CombinedSqlDataFlow('', $this->primaryDataFlow->getPrimaryTable(), $this->primaryDataFlow->getPrimaryTableAlias());
+      } elseif ($this->primaryDataFlow instanceof SqlTableDataFlow) {
+        $this->dataFlow = new CombinedSqlDataFlow('', $this->primaryDataFlow->getTable(), $this->primaryDataFlow->getTableAlias());
+      } else {
+        throw new \Exception("Invalid primary data source in data source ".$this->getSourceName());
+      }
       $this->dataFlow->addSourceDataFlow(new DataFlowDescription($this->primaryDataFlow));
       foreach ($this->additionalDataFlowDescriptions as $additionalDataFlowDescription) {
         $this->dataFlow->addSourceDataFlow($additionalDataFlowDescription);