diff --git a/Civi/DataProcessor/Source/Contribution/ContributionSource.php b/Civi/DataProcessor/Source/Contribution/ContributionSource.php
index 312bc9bfe91cda3ca1eb03b5b456758d0a17afa3..318398918dad92ea19160004f7295f19c4cb1a68 100644
--- a/Civi/DataProcessor/Source/Contribution/ContributionSource.php
+++ b/Civi/DataProcessor/Source/Contribution/ContributionSource.php
@@ -6,12 +6,43 @@
 
 namespace Civi\DataProcessor\Source\Contribution;
 
+use Civi\DataProcessor\DataFlow\CombinedDataFlow\CombinedSqlDataFlow;
+use Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription;
+use Civi\DataProcessor\DataFlow\MultipleDataFlows\SimpleJoin;
+use Civi\DataProcessor\DataFlow\CombinedDataFlow\SubqueryDataFlow;
+use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
+use Civi\DataProcessor\DataSpecification\DataSpecification;
 use Civi\DataProcessor\Source\AbstractCivicrmEntitySource;
+use Civi\DataProcessor\DataSpecification\Utils as DataSpecificationUtils;
 
 use CRM_Dataprocessor_ExtensionUtil as E;
 
 class ContributionSource extends AbstractCivicrmEntitySource {
 
+  /**
+   * @var SqlTableDataFlow
+   */
+  protected $contributionDataFlow;
+
+  /**
+   * @var SqlTableDataFlow
+   */
+  protected $contributionSoftDataFlow;
+
+  public function __construct() {
+    parent::__construct();
+
+    // Create the contribution data flow and data flow description
+    $this->contributionDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName().'_contribution', $this->getSourceTitle());
+    DataSpecificationUtils::addDAOFieldsToDataSpecification('CRM_Contribute_DAO_Contribution', $this->contributionDataFlow->getDataSpecification());
+
+    // Create the contribution soft data flow and data flow description
+    $this->contributionSoftDataFlow = new SqlTableDataFlow('civicrm_contribution_soft', $this->getSourceName().'_contribution_soft');
+    DataSpecificationUtils::addDAOFieldsToDataSpecification('CRM_Contribute_DAO_ContributionSoft', $this->contributionSoftDataFlow->getDataSpecification(), array('id'), '', 'contribution_soft_', E::ts('Soft :: '));
+  }
+
+
+
   /**
    * Returns the entity name
    *
@@ -29,4 +60,88 @@ class ContributionSource extends AbstractCivicrmEntitySource {
   protected function getTable() {
     return 'civicrm_contribution';
   }
+
+  /**
+   * Initialize this data source.
+   *
+   * @throws \Exception
+   */
+  public function initialize() {
+    if (!$this->primaryDataFlow) {
+      $this->primaryDataFlow = $this->getEntityDataFlow();
+    }
+    $this->addFilters($this->configuration);
+    if (count($this->customGroupDataFlowDescriptions) || count($this->additionalDataFlowDescriptions)) {
+      $this->dataFlow = new CombinedSqlDataFlow('', $this->primaryDataFlow->getPrimaryTable(), $this->contributionDataFlow->getTableAlias());
+      $this->dataFlow->addSourceDataFlow(new DataFlowDescription($this->primaryDataFlow));
+      foreach ($this->additionalDataFlowDescriptions as $additionalDataFlowDescription) {
+        $this->dataFlow->addSourceDataFlow($additionalDataFlowDescription);
+      }
+      foreach ($this->customGroupDataFlowDescriptions as $customGroupDataFlowDescription) {
+        $this->dataFlow->addSourceDataFlow($customGroupDataFlowDescription);
+      }
+    }
+    else {
+      $this->dataFlow = $this->primaryDataFlow;
+    }
+  }
+
+  /**
+   * @return \Civi\DataProcessor\DataFlow\SqlDataFlow
+   * @throws \Exception
+   */
+  protected function getEntityDataFlow() {
+    $contributionDataDescription = new DataFlowDescription($this->contributionDataFlow);
+
+    $join = new SimpleJoin($this->contributionDataFlow->getTableAlias(), 'id', $this->contributionSoftDataFlow->getTableAlias(), 'contribution_id');
+    $join->setDataProcessor($this->dataProcessor);
+    $contributionSoftDataDescription = new DataFlowDescription($this->contributionSoftDataFlow, $join);
+
+    // Create the subquery data flow
+    $entityDataFlow = new SubqueryDataFlow($this->getSourceName(), $this->getTable(), $this->getSourceName());
+    $entityDataFlow->addSourceDataFlow($contributionDataDescription);
+    $entityDataFlow->addSourceDataFlow($contributionSoftDataDescription);
+
+    return $entityDataFlow;
+  }
+
+  /**
+   * Ensure that the entity table is added the to the data flow.
+   *
+   * @return \Civi\DataProcessor\DataFlow\AbstractDataFlow
+   * @throws \Exception
+   */
+  protected function ensureEntity() {
+    if ($this->primaryDataFlow && $this->primaryDataFlow instanceof SubqueryDataFlow && $this->primaryDataFlow->getPrimaryTable() === $this->getTable()) {
+      return $this->primaryDataFlow;
+    } elseif (empty($this->primaryDataFlow)) {
+      $this->primaryDataFlow = $this->getEntityDataFlow();
+      return $this->primaryDataFlow;
+    }
+    foreach($this->additionalDataFlowDescriptions as $additionalDataFlowDescription) {
+      if ($additionalDataFlowDescription->getDataFlow()->getTable() == $this->getTable()) {
+        return $additionalDataFlowDescription->getDataFlow();
+      }
+    }
+    $entityDataFlow = $this->getEntityDataFlow();
+    $join = new SimpleJoin($this->getSourceName(), 'id', $this->getSourceName(), 'entity_id', 'LEFT');
+    $join->setDataProcessor($this->dataProcessor);
+    $additionalDataFlowDescription = new DataFlowDescription($entityDataFlow,$join);
+    $this->additionalDataFlowDescriptions[] = $additionalDataFlowDescription;
+    return $additionalDataFlowDescription->getDataFlow();
+  }
+
+  /**
+   * Load the fields from this entity.
+   *
+   * @param DataSpecification $dataSpecification
+   * @throws \Civi\DataProcessor\DataSpecification\FieldExistsException
+   */
+  protected function loadFields(DataSpecification $dataSpecification, $fieldsToSkip=array()) {
+    $daoClass = \CRM_Core_DAO_AllCoreTables::getFullName($this->getEntity());
+    $aliasPrefix = $this->getSourceName().'_';
+
+    DataSpecificationUtils::addDAOFieldsToDataSpecification($daoClass, $dataSpecification, $fieldsToSkip, '', $aliasPrefix);
+    DataSpecificationUtils::addDAOFieldsToDataSpecification('CRM_Contribute_DAO_ContributionSoft', $dataSpecification, array('id', 'contribution_id'), 'contribution_soft_', $aliasPrefix, E::ts('Soft :: '));
+  }
 }
\ No newline at end of file