diff --git a/CRM/Dataprocessor/Form/Source/BaseForm.php b/CRM/Dataprocessor/Form/Source/BaseForm.php
new file mode 100644
index 0000000000000000000000000000000000000000..ebe1104c22b2230c510de5d5ba46536f0ded0e08
--- /dev/null
+++ b/CRM/Dataprocessor/Form/Source/BaseForm.php
@@ -0,0 +1,80 @@
+<?php
+/**
+ * @author Jaap Jansma <jaap.jansma@civicoop.org>
+ * @license AGPL-3.0
+ */
+
+use CRM_Dataprocessor_ExtensionUtil as E;
+
+class CRM_Dataprocessor_Form_Source_BaseForm extends CRM_Core_Form {
+
+  protected $dataProcessorId;
+
+  protected $source_id;
+
+  /**
+   * @var \Civi\DataProcessor\Source\SourceInterface
+   */
+  protected $sourceClass;
+
+  /**
+   * @var array
+   *   The source object
+   */
+  protected $source;
+
+  /**
+   * Function to perform processing before displaying form (overrides parent function)
+   *
+   * @access public
+   */
+  function preProcess() {
+    $session = CRM_Core_Session::singleton();
+    $this->dataProcessorId = CRM_Utils_Request::retrieve('data_processor_id', 'Integer');
+    $this->assign('data_processor_id', $this->dataProcessorId);
+
+    $this->source_id = CRM_Utils_Request::retrieve('source_id', 'Integer', CRM_Core_DAO::$_nullObject, TRUE);
+    $this->assign('source_id', $this->source_id);
+
+    $source = CRM_Dataprocessor_BAO_Source::getValues(array('id' => $this->source_id));
+    $this->source = $source[$this->source_id];
+    $this->assign('source', $this->source);
+
+    $factory = dataprocessor_get_factory();
+    $this->sourceClass = $factory->getDataSourceByName($this->source['type']);
+    $this->sourceClass->setSourceName($this->source['name']);
+    $this->sourceClass->setSourceTitle($this->source['title']);
+
+    $title = E::ts('Data Processor Source Configuration');
+    CRM_Utils_System::setTitle($title);
+
+    $url = CRM_Utils_System::url('civicrm/dataprocessor/form/edit', array('id' => $this->dataProcessorId, 'action' => 'update', 'reset' => 1));
+    $session->pushUserContext($url);
+  }
+
+  public function buildQuickForm() {
+    $this->add('hidden', 'data_processor_id');
+    $this->add('hidden', 'source_id');
+
+    $this->addButtons(array(
+      array('type' => 'next', 'name' => E::ts('Save'), 'isDefault' => TRUE,),
+      array('type' => 'cancel', 'name' => E::ts('Cancel'))
+    ));
+    parent::buildQuickForm();
+  }
+
+  function setDefaultValues() {
+    $defaults = [];
+    $defaults['data_processor_id'] = $this->dataProcessorId;
+    $defaults['source_id'] = $this->source_id;
+
+    return $defaults;
+  }
+
+  public function postProcess() {
+    $session = CRM_Core_Session::singleton();
+    CRM_Utils_System::redirect($session->readUserContext());
+    parent::postProcess();
+  }
+
+}
\ No newline at end of file
diff --git a/CRM/Dataprocessor/Form/Source/Configuration.php b/CRM/Dataprocessor/Form/Source/Configuration.php
index 6aa2decc773bc3af61796aba4e5567d85719ac8d..b50ac6e18e3575b1b5c9a320df76214dfba652f6 100644
--- a/CRM/Dataprocessor/Form/Source/Configuration.php
+++ b/CRM/Dataprocessor/Form/Source/Configuration.php
@@ -6,70 +6,16 @@
 
 use CRM_Dataprocessor_ExtensionUtil as E;
 
-class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Core_Form {
-
-  protected $dataProcessorId;
-
-  protected $source_id;
-
-  /**
-   * @var \Civi\DataProcessor\Source\SourceInterface
-   */
-  protected $sourceClass;
-
-  /**
-   * @var array
-   *   The source object
-   */
-  protected $source;
-
-  /**
-   * Function to perform processing before displaying form (overrides parent function)
-   *
-   * @access public
-   */
-  function preProcess() {
-    $session = CRM_Core_Session::singleton();
-    $this->dataProcessorId = CRM_Utils_Request::retrieve('data_processor_id', 'Integer');
-    $this->assign('data_processor_id', $this->dataProcessorId);
-
-    $this->source_id = CRM_Utils_Request::retrieve('source_id', 'Integer', CRM_Core_DAO::$_nullObject, TRUE);
-    $this->assign('source_id', $this->source_id);
-
-    $source = CRM_Dataprocessor_BAO_Source::getValues(array('id' => $this->source_id));
-    $this->source = $source[$this->source_id];
-    $this->assign('source', $this->source);
-
-    $factory = dataprocessor_get_factory();
-    $this->sourceClass = $factory->getDataSourceByName($this->source['type']);
-    $this->sourceClass->setSourceName($this->source['name']);
-    $this->sourceClass->setSourceTitle($this->source['title']);
-
-    $title = E::ts('Data Processor Source Configuration');
-    CRM_Utils_System::setTitle($title);
-
-    $url = CRM_Utils_System::url('civicrm/dataprocessor/form/edit', array('id' => $this->dataProcessorId, 'action' => 'update', 'reset' => 1));
-    $session->pushUserContext($url);
-  }
+class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Dataprocessor_Form_Source_BaseForm {
 
   public function buildQuickForm() {
-    $this->add('hidden', 'data_processor_id');
-    $this->add('hidden', 'source_id');
+    parent::buildQuickForm();
 
     $this->addFieldsForFiltering();
-
-    $this->addButtons(array(
-      array('type' => 'next', 'name' => E::ts('Save'), 'isDefault' => TRUE,),
-      array('type' => 'cancel', 'name' => E::ts('Cancel'))
-    ));
-    parent::buildQuickForm();
   }
 
   function setDefaultValues() {
-    $defaults = [];
-    $defaults['data_processor_id'] = $this->dataProcessorId;
-    $defaults['source_id'] = $this->source_id;
-
+    $defaults = parent::setDefaultValues();
     if (isset($this->source['configuration']['filter'])) {
       foreach($this->source['configuration']['filter'] as $alias => $filter) {
         $defaults[$alias.'_op'] = $filter['op'];
@@ -81,9 +27,6 @@ class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Core_Form {
   }
 
   public function postProcess() {
-    $session = CRM_Core_Session::singleton();
-
-    $values = $this->exportValues();
     if ($this->dataProcessorId) {
       $params['data_processor_id'] = $this->dataProcessorId;
     }
@@ -93,7 +36,6 @@ class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Core_Form {
 
     $params['configuration']['filter'] = $this->postProcessFieldsForFiltering();
     CRM_Dataprocessor_BAO_Source::add($params);
-    CRM_Utils_System::redirect($session->readUserContext());
     parent::postProcess();
   }
 
diff --git a/CRM/Dataprocessor/Form/Source/Csv.php b/CRM/Dataprocessor/Form/Source/Csv.php
new file mode 100644
index 0000000000000000000000000000000000000000..1b0ca38f301c27043474c4255b33815092857294
--- /dev/null
+++ b/CRM/Dataprocessor/Form/Source/Csv.php
@@ -0,0 +1,60 @@
+<?php
+/**
+ * @author Jaap Jansma <jaap.jansma@civicoop.org>
+ * @license AGPL-3.0
+ */
+
+use CRM_Dataprocessor_ExtensionUtil as E;
+
+/**
+ * Form controller class
+ *
+ * @see https://wiki.civicrm.org/confluence/display/CRMDOC/QuickForm+Reference
+ */
+class CRM_Dataprocessor_Form_Source_Csv extends CRM_Dataprocessor_Form_Source_BaseForm {
+
+  public function buildQuickForm() {
+    parent::buildQuickForm();
+    $this->add('text', 'uri', E::ts('URI'), true);
+    $this->add('text', 'delimiter', E::ts('Field delimiter'), true);
+    $this->add('text', 'enclosure', E::ts('Field enclosure character'), true);
+    $this->add('text', 'escape', E::ts('Escape character'), true);
+    $this->add('checkbox', 'first_row_as_header', E::ts('First row contains column names'));
+  }
+
+  function setDefaultValues() {
+    $defaults = parent::setDefaultValues();
+    foreach($this->source['configuration'] as $field => $value) {
+      $defaults[$field] = $value;
+    }
+    if (!isset($defaults['delimiter'])) {
+      $defaults['delimiter'] = ',';
+    }
+    if (!isset($defaults['enclosure'])) {
+      $defaults['enclosure'] = '"';
+    }
+    if (!isset($defaults['escape'])) {
+      $defaults['escape'] = '\\';
+    }
+    return $defaults;
+  }
+
+  public function postProcess() {
+    $values = $this->exportValues();
+    if ($this->dataProcessorId) {
+      $params['data_processor_id'] = $this->dataProcessorId;
+    }
+    if ($this->source_id) {
+      $params['id'] = $this->source_id;
+    }
+
+    $params['configuration']['uri'] = $values['uri'];
+    $params['configuration']['delimiter'] = $values['delimiter'];
+    $params['configuration']['enclosure'] = $values['enclosure'];
+    $params['configuration']['escape'] = $values['escape'];
+    $params['configuration']['first_row_as_header'] = $values['first_row_as_header'];
+    CRM_Dataprocessor_BAO_Source::add($params);
+    parent::postProcess();
+  }
+
+}
\ No newline at end of file
diff --git a/Civi/DataProcessor/DataFlow/AbstractDataFlow.php b/Civi/DataProcessor/DataFlow/AbstractDataFlow.php
index bbf1310e0a157946106a505e8fdba9da34b8a80e..2987c65162f1214db7f327fb87a52b6c1768b6bd 100644
--- a/Civi/DataProcessor/DataFlow/AbstractDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/AbstractDataFlow.php
@@ -84,7 +84,7 @@ abstract class AbstractDataFlow {
    * @return array
    * @throws EndOfFlowException
    */
-  abstract protected function retrieveNextRecord($fieldNameprefix='');
+  abstract public function retrieveNextRecord($fieldNameprefix='');
 
   /**
    * Returns a name for this data flow.
@@ -228,10 +228,10 @@ abstract class AbstractDataFlow {
   /**
    * Returns debug information
    *
-   * @return string
+   * @return array
    */
   public function getDebugInformation() {
-    return "";
+    return array();
   }
 
   public function addAggregateField(FieldSpecification $aggregateField) {
diff --git a/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedDataFlow.php b/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedDataFlow.php
index 0072b8fee2449828ffd55184daf9a6c4c2263a61..56af659a7213a63d2394911f16ba0c28d55fe327 100644
--- a/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedDataFlow.php
@@ -11,6 +11,7 @@ use \Civi\DataProcessor\DataFlow\EndOfFlowException;
 use Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription;
 use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface;
 use Civi\DataProcessor\DataFlow\MultipleDataFlows\MultipleSourceDataFlows;
+use Civi\DataProcessor\DataFlow\SqlDataFlow;
 use \Civi\DataProcessor\DataSpecification\DataSpecification;
 
 
@@ -46,6 +47,11 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
    */
   protected $dataSpecification;
 
+  /**
+   * @var int
+   */
+  protected $batchSize = 100;
+
   public function __construct() {
     $this->dataSpecification = new DataSpecification();
   }
@@ -60,6 +66,16 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
     $this->sourceDataFlowDescriptions[] = $dataFlowDescription;
   }
 
+  /**
+   * @param \Civi\DataProcessor\FieldOutputHandler\AbstractFieldOutputHandler $outputFieldHandler[]
+   */
+  public function setOutputFieldHandlers($handlers) {
+    parent::setOutputFieldHandlers($handlers);
+    foreach($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
+      $sourceDataFlowDescription->getDataFlow()->setOutputFieldHandlers($handlers);
+    }
+  }
+
   /**
    * Initialize the data flow
    *
@@ -79,9 +95,16 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
     }
 
     $allRecords = array();
-    foreach($this->sourceDataFlowDescriptions as $dataFlowDescription) {
-      $records = $dataFlowDescription->getDataFlow()->allRecords($dataFlowDescription->getDataFlow()->getName());
-      $allRecords = $this->joinArray($allRecords, $records, $dataFlowDescription->getJoinSpecification());
+    for($i=0; $i<count($this->sourceDataFlowDescriptions); $i++) {
+      do {
+        $batch = $this->getAllRecordsFromDataFlowAsArray($this->sourceDataFlowDescriptions[$i]->getDataFlow(), $this->batchSize);
+        for($j=$i+1; $j<count($this->sourceDataFlowDescriptions); $j++) {
+          $this->sourceDataFlowDescriptions[$j]->getJoinSpecification()->prepareRightDataFlow($batch, $this->sourceDataFlowDescriptions[$j]->getDataFlow());
+          $rightRecords = $this->getAllRecordsFromDataFlowAsArray($this->sourceDataFlowDescriptions[$j]->getDataFlow());
+          $batch = $this->sourceDataFlowDescriptions[$j]->getJoinSpecification()->join($batch, $rightRecords);
+        }
+        $allRecords = array_merge($allRecords, $batch);
+      } while(count($batch) >= $this->batchSize || $this->batchSize == 0);
     }
     $this->recordCount = count($allRecords);
 
@@ -101,35 +124,25 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
   }
 
   /**
-   * Join two arrays together based on the combine specification
-   * This functions like an INNER JOIN in sql.
-   *
-   * @param $left
-   * @param $right
-   * @param \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface|null
+   * Return all records for a given data flow.
    *
+   * @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $dataFlow
+   * @param int $batchSize 0 for unlimited
    * @return array
+   * @throws \Civi\DataProcessor\DataFlow\EndOfFlowException
    */
-  protected function joinArray($left, $right, JoinInterface $combineSpecification=null) {
-    $out = array();
-
-    if ($combineSpecification === null && empty($left)) {
-      return $right;
-    } elseif ($combineSpecification === null && empty($right)) {
-      return $left;
-    }
-
-    foreach($left as $left_index => $left_record) {
-      foreach($right as $right_index => $right_record) {
-        if ($combineSpecification === null || $combineSpecification->isJoinable($left_record, $right_record)) {
-          $out[] = array_merge($left_record, $right_record);
-          unset($left[$left_index]);
-          unset($right[$right_index]);
-        }
+  protected function getAllRecordsFromDataFlowAsArray(AbstractDataFlow $dataFlow, $batchSize=0) {
+    $records = array();
+    try {
+      $i = 0;
+      while(($record = $dataFlow->retrieveNextRecord()) && ($i < $batchSize || $batchSize == 0)) {
+        $records[] = $record;
+        $i++;
       }
+    } catch (EndOfFlowException $e) {
+      // Do nothing
     }
-
-    return $out;
+    return $records;
   }
 
   /**
@@ -159,7 +172,7 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
    * @return array
    * @throws EndOfFlowException
    */
-  protected function retrieveNextRecord($fieldNamePrefix='') {
+  public function retrieveNextRecord($fieldNamePrefix='') {
     if (!$this->isInitialized()) {
       $this->initialize();
     }
@@ -206,4 +219,13 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
     return 'combined_data_flow';
   }
 
+  public function getDebugInformation() {
+    $debug = array();
+    foreach($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
+      $debug[$sourceDataFlowDescription->getDataFlow()->getName()] = $sourceDataFlowDescription->getDataFlow()->getDebugInformation();
+    }
+    return $debug;
+
+  }
+
 }
\ No newline at end of file
diff --git a/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php b/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php
index 184619abeee74e3a47e73f0ec17dcb737a5081c9..eb29e3d6c4b9930bc72563d345e7ba77becbbbff 100644
--- a/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/CombinedDataFlow/CombinedSqlDataFlow.php
@@ -141,7 +141,7 @@ class CombinedSqlDataFlow extends SqlDataFlow implements MultipleSourceDataFlows
    * @return array
    * @throws EndOfFlowException
    */
-  protected function retrieveNextRecord($fieldNamePrefix='') {
+  public function retrieveNextRecord($fieldNamePrefix='') {
     if (!$this->isInitialized()) {
       $this->initialize();
     }
diff --git a/Civi/DataProcessor/DataFlow/CsvDataFlow.php b/Civi/DataProcessor/DataFlow/CsvDataFlow.php
new file mode 100644
index 0000000000000000000000000000000000000000..d61f01d6926d7f3caace3c54984a1f5f71bf9cb7
--- /dev/null
+++ b/Civi/DataProcessor/DataFlow/CsvDataFlow.php
@@ -0,0 +1,141 @@
+<?php
+/**
+ * @author Jaap Jansma <jaap.jansma@civicoop.org>
+ * @license AGPL-3.0
+ */
+
+namespace Civi\DataProcessor\DataFlow;
+
+use CRM_Dataprocessor_ExtensionUtil as E;
+
+class CsvDataFlow extends AbstractDataFlow {
+
+  protected $data = [];
+
+  protected $currentPointer = 0;
+
+  protected $isInitialized = FALSE;
+
+  protected $uri;
+
+  protected $delimiter = ',';
+
+  protected $enclosure = '"';
+
+  protected $escape = '\\';
+
+  protected $skipRows = 1;
+
+  private $uriHandle;
+
+  public function __construct($uri, $skipRows = 1, $delimiter=',', $enclosure='"', $escape='\\') {
+    parent::__construct();
+    $this->uri = $uri;
+    $this->skipRows = $skipRows;
+    $this->delimiter = $delimiter;
+    $this->enclosure = $enclosure;
+    $this->escape = $escape;
+  }
+
+  /**
+   * Returns the header row
+   *
+   * @param $headerRowNumber
+   * @return array
+   */
+  public function getHeaderRow($headerRowNumber=0) {
+    $header = array();
+    $handle = fopen($this->uri, 'r');
+    for($i=1; $i<$headerRowNumber; $i++) {
+      $skipRow = fgetcsv($handle, 0, $this->delimiter, $this->enclosure, $this->escape);
+      if ($i == 0) {
+        // This is the first row, initialize the header with at least as many columns as this row
+        foreach($skipRow as $col_idx => $col) {
+          $header[$col_idx] = E::ts('Column %1', array(1=>$col_idx));
+        }
+      }
+    }
+    if ($headerRowNumber) {
+      $headerRow = fgetcsv($handle, 0, $this->delimiter, $this->enclosure, $this->escape);
+      if ($headerRow) {
+        foreach ($headerRow as $col_idx => $col) {
+          $header[$col_idx] = $col;
+        }
+      }
+    }
+    fclose($handle);
+    return $header;
+  }
+
+
+  /**
+   * Initialize the data flow
+   *
+   * @return void
+   */
+  public function initialize() {
+    if ($this->isInitialized()) {
+      return;
+    }
+
+    $this->uriHandle = fopen($this->uri, 'r');
+    for($i=0; $i<$this->skipRows; $i++) {
+      $skipRow = fgetcsv($this->uriHandle, 0, $this->delimiter, $this->enclosure, $this->escape);
+    }
+
+    $this->isInitialized = TRUE;
+  }
+
+  /**
+   * Returns whether this flow has been initialized or not
+   *
+   * @return bool
+   */
+  public function isInitialized() {
+    return $this->isInitialized;
+  }
+
+/**
+   * Resets the initialized state. This function is called
+   * when a setting has changed. E.g. when offset or limit are set.
+   *
+   * @return void
+   */
+  protected function resetInitializeState() {
+    $this->isInitialized = FALSE;
+  }
+
+  /**
+   * Returns the next record in an associative array
+   *
+   * @param string $fieldNameprefix
+   *   The prefix before the name of the field within the record.
+   * @return array
+   * @throws EndOfFlowException
+   */
+  public function retrieveNextRecord($fieldNameprefix='') {
+    $this->initialize();
+    $row = fgetcsv($this->uriHandle, 0, $this->delimiter, $this->enclosure, $this->escape);
+    if (!$row) {
+      throw new EndOfFlowException();
+    }
+
+    $record = array();
+    foreach($this->dataSpecification->getFields() as $field) {
+      $alias = $field->alias;
+      $col_index = str_replace("col_", "", $field->name);
+      $record[$fieldNameprefix.$alias] = $row[$col_index];
+    }
+    return $record;
+  }
+
+  /**
+   * Returns a name for this data flow.
+   *
+   * @return string
+   */
+  public function getName() {
+    return 'csv';
+  }
+
+}
\ No newline at end of file
diff --git a/Civi/DataProcessor/DataFlow/InMemoryDataFlow.php b/Civi/DataProcessor/DataFlow/InMemoryDataFlow.php
index d1ff6054b9f5bcb63ddea9c12fa88020aa883b6c..d9388af06757adb6fe1503ada3bde6e43ab82f63 100644
--- a/Civi/DataProcessor/DataFlow/InMemoryDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/InMemoryDataFlow.php
@@ -60,7 +60,7 @@ class InMemoryDataFlow extends AbstractDataFlow {
    * @return array
    * @throws EndOfFlowException
    */
-  protected function retrieveNextRecord($fieldNameprefix='') {
+  public function retrieveNextRecord($fieldNameprefix='') {
     if (!isset($this->data[$this->currentPointer])) {
       throw new EndOfFlowException();
     }
@@ -68,7 +68,8 @@ class InMemoryDataFlow extends AbstractDataFlow {
     $record = array();
     foreach($this->dataSpecification->getFields() as $field) {
       $alias = $field->alias;
-      $record[$fieldNameprefix.$field->alias] = $data[$alias];
+      $name = $field->name;
+      $record[$fieldNameprefix.$alias] = $data[$name];
     }
     $this->currentPointer++;
     return $record;
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php
index 0f1bf69af3c2f301e42bd29ff3f4281215be4234..ce79d5ec13759355d133075d626520549d2d4e96 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/DataFlowDescription.php
@@ -14,14 +14,14 @@ class DataFlowDescription {
   protected $dataFlow;
 
   /**
-   * @var \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinSpecification
+   * @var \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
    */
   protected $joinSpecification = array();
 
   public function __construct($datFlow, $joinSpecification = null) {
     $this->dataFlow = $datFlow;
     $this->joinSpecification = $joinSpecification;
-    $this->dataFlow->setOffset($this);
+    $this->dataFlow->setDataFlowDescription($this);
   }
 
   /**
@@ -32,7 +32,7 @@ class DataFlowDescription {
   }
 
   /**
-   * @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinSpecification
+   * @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
    */
   public function getJoinSpecification() {
     return $this->joinSpecification;
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php
index 362352c88199b1a01be240e308723ecb312d809c..7c0de65a46205652e0985668589730d2b7eb4bc8 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/JoinInterface.php
@@ -12,15 +12,14 @@ use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
 interface JoinInterface{
 
   /**
-   * Validates the right record against the left record and returns true when the right record
-   * has a successfull join with the left record. Otherwise false.
+   * Joins the records sets and return the new created set.
    *
-   * @param $left_record
-   * @param $right_record
+   * @param $left_record_set
+   * @param $right_record_set
    *
-   * @return mixed
+   * @return array
    */
-  public function isJoinable($left_record, $right_record);
+  public function join($left_record_set, $right_record_set);
 
   /**
    * Returns true when this join is compatible with this data flow
@@ -58,4 +57,14 @@ interface JoinInterface{
    */
   public function getConfigurationUrl();
 
+  /**
+   * Prepares the right data flow based on the data in the left record set.
+   *
+   * @param $left_record_set
+   * @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $rightDataFlow
+   *
+   * @return \Civi\DataProcessor\DataFlow\AbstractDataFlow
+   */
+  public function prepareRightDataFlow($left_record_set, AbstractDataFlow $rightDataFlow);
+
 }
\ No newline at end of file
diff --git a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
index ba9ece8f3e0a64428db8fd92696e2150920cd469..a6f47dc2ec9b81a546b721fc4c23294d4cd30c31 100644
--- a/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
+++ b/Civi/DataProcessor/DataFlow/MultipleDataFlows/SimpleJoin.php
@@ -28,6 +28,30 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
    */
   protected $right_field;
 
+  /**
+   * @var string
+   *   The alias of the left field
+   */
+  protected $left_field_alias;
+
+  /**
+   * @var string
+   *   The alias of the right field
+   */
+  protected $right_field_alias;
+
+  /**
+   * @var \Civi\DataProcessor\DataSpecification\FieldSpecification
+   *   The alias of the left field
+   */
+  protected $leftFieldSpec;
+
+  /**
+   * @var \Civi\DataProcessor\DataSpecification\FieldSpecification
+   *   The alias of the right field
+   */
+  protected $rightFieldSpec;
+
   /**
    * @var string
    *   The prefix for the left field, or in SQL join mode the left table
@@ -50,6 +74,16 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
    */
   protected $left_table;
 
+  /**
+   * @var \Civi\DataProcessor\Source\SourceInterface
+   */
+  protected $left_source;
+
+  /**
+   * @var \Civi\DataProcessor\Source\SourceInterface
+   */
+  protected $right_source;
+
   /**
    * @var String
    *   The join type, e.g. INNER, LEFT, OUT etc..
@@ -61,6 +95,11 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
    */
   private $dataProcessor;
 
+  /**
+   * @var \Civi\DataProcessor\DataFlow\SqlDataFlow\OrClause
+   */
+  private $rightClause = null;
+
   public function __construct($left_prefix = null, $left_field = null, $right_prefix = null, $right_field = null, $type = "INNER") {
     $this->left_prefix = $left_prefix;
     $this->left_field = $left_field;
@@ -135,22 +174,30 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
     }
     if ($this->left_prefix && $this->left_field) {
       $this->left_table = $this->left_prefix;
-      $left_source = $this->dataProcessor->getDataSourceByName($this->left_prefix);
-      if ($left_source) {
-        $leftTable = $left_source->ensureField($this->left_field);
+      $this->left_source = $this->dataProcessor->getDataSourceByName($this->left_prefix);
+      if ($this->left_source) {
+        $leftTable = $this->left_source->ensureField($this->left_field);
         if ($leftTable && $leftTable instanceof SqlTableDataFlow) {
           $this->left_table = $leftTable->getTableAlias();
         }
+        $this->leftFieldSpec = $this->left_source->getAvailableFields()->getFieldSpecificationByName($this->left_field);
+        if ($this->leftFieldSpec) {
+          $this->left_field_alias = $this->leftFieldSpec->alias;
+        }
       }
     }
     if ($this->right_prefix && $this->right_field) {
       $this->right_table = $this->right_prefix;
-      $right_source = $this->dataProcessor->getDataSourceByName($this->right_prefix);
-      if ($right_source) {
-        $rightTable = $right_source->ensureField($this->right_field);
+      $this->right_source = $this->dataProcessor->getDataSourceByName($this->right_prefix);
+      if ($this->right_source) {
+        $rightTable = $this->right_source->ensureField($this->right_field);
         if ($rightTable && $rightTable instanceof SqlTableDataFlow) {
           $this->right_table = $rightTable->getTableAlias();
         }
+        $this->rightFieldSpec = $this->right_source->getAvailableFields()->getFieldSpecificationByName($this->right_field);
+        if ($this->rightFieldSpec) {
+          $this->right_field_alias = $this->rightFieldSpec->alias;
+        }
       }
     }
 
@@ -159,30 +206,80 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
   }
 
   /**
-   * Validates the right record against the left record and returns true when the right record
-   * has a successfull join with the left record. Otherwise false.
+   * Joins the records sets and return the new created set.
    *
-   * @param $left_record
-   * @param $right_record
+   * @param $left_record_set
+   * @param $right_record_set
    *
-   * @return mixed
+   * @return array
    */
-  public function isJoinable($left_record, $right_record) {
-    if (isset($left_record[$this->left_prefix.$this->left_field]) && isset($right_record[$this->right_prefix.$this->right_field])) {
-      if ($left_record[$this->left_prefix.$this->left_field] == $right_record[$this->right_prefix.$this->right_field]) {
-        return TRUE;
-      }
-    } elseif ($this->type == 'LEFT') {
-      if (isset($left_record[$this->left_prefix.$this->left_field]) && !isset($right_record[$this->right_prefix.$this->right_field])) {
-        return true;
+  public function join($left_record_set, $right_record_set) {
+    $joined_record_set = array();
+    if ($this->type == 'INNER' || $this->type == 'LEFT') {
+      foreach ($left_record_set as $left_index => $left_record) {
+        $is_record_present_in_right_set = FALSE;
+        foreach ($right_record_set as $right_index => $right_record) {
+          if (isset($left_record[$this->left_field_alias]) && isset($right_record[$this->right_field_alias])) {
+            if ($left_record[$this->left_field_alias] == $right_record[$this->right_field_alias]) {
+              $joined_record_set[] = array_merge($left_record, $right_record);
+              $is_record_present_in_right_set = TRUE;
+            }
+          }
+        }
+        if (!$is_record_present_in_right_set && $this->type == 'LEFT') {
+          $joined_record_set[] = $left_record;
+        }
       }
     } elseif ($this->type == 'RIGHT') {
-      if (!isset($left_record[$this->left_prefix.$this->left_field]) && isset($right_record[$this->right_prefix.$this->right_field])) {
-        return true;
+      foreach ($right_record_set as $right_index => $right_record) {
+        $is_record_present_in_left_set = FALSE;
+        foreach ($left_record_set as $left_index => $left_record) {
+          if (isset($left_record[$this->left_field_alias]) && isset($right_record[$this->right_field_alias])) {
+            if ($left_record[$this->left_field_alias] == $right_record[$this->right_field_alias]) {
+              $joined_record_set[] = array_merge($left_record, $right_record);
+              $is_record_present_in_left_set = TRUE;
+            }
+          }
+        }
+        if (!$is_record_present_in_left_set && $this->type == 'RIGHT') {
+          $joined_record_set[] = $right_record;
+        }
       }
     }
+    return $joined_record_set;
+  }
 
-    return false;
+  /**
+   * Prepares the right data flow based on the data in the left record set.
+   *
+   * @param $left_record_set
+   * @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $rightDataFlow
+   *
+   * @return AbstractDataFlow
+   * @throws \Exception
+   */
+  public function prepareRightDataFlow($left_record_set, AbstractDataFlow $rightDataFlow) {
+    if ($rightDataFlow instanceof SqlTableDataFlow) {
+      if ($this->rightClause) {
+        $rightDataFlow->removeWhereClause($this->rightClause);
+      }
+      $table = $rightDataFlow->getTableAlias();
+      $this->rightClause = new SqlDataFlow\OrClause();
+      foreach ($left_record_set as $left_record) {
+        if (isset($left_record[$this->left_field_alias])) {
+          $value = $left_record[$this->left_field_alias];
+          $this->rightClause->addWhereClause(new SqlDataFlow\SimpleWhereClause($table, $this->right_field, '=', $value));
+
+          // Make sure the join field is also available in the select statement of the query.
+          if (!$rightDataFlow->getDataSpecification()->doesFieldExist($this->right_field_alias)) {
+            $rightDataFlow->getDataSpecification()
+              ->addFieldSpecification($this->right_field_alias, $this->rightFieldSpec);
+          }
+        }
+      }
+      $rightDataFlow->addWhereClause($this->rightClause);
+    }
+    return $rightDataFlow;
   }
 
   /**
diff --git a/Civi/DataProcessor/DataFlow/SqlDataFlow.php b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
index d3999e371e1e4854d627a9c94d7e4c63005dbc41..9845d64bf546416c3d8b6f64350dacfcfcd3cc71 100644
--- a/Civi/DataProcessor/DataFlow/SqlDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
@@ -24,9 +24,9 @@ abstract class SqlDataFlow extends AbstractDataFlow {
 
   protected $whereClauses = array();
 
-  protected $sqlStatement;
+  protected $sqlStatements = array();
 
-  protected $sqlCountStatement;
+  protected $sqlCountStatements = array();
 
   /**
    * Returns an array with the fields for in the select statement in the sql query.
@@ -68,7 +68,7 @@ abstract class SqlDataFlow extends AbstractDataFlow {
       $orderBy = $this->getOrderByStatement();
 
       $countSql = "SELECT COUNT(*) {$from} {$where} {$groupBy}";
-      $this->sqlCountStatement = $countSql;
+      $this->sqlCountStatements[] = $countSql;
       $this->count = \CRM_Core_DAO::singleValueQuery($countSql);
 
       $sql = "{$this->getSelectQueryStatement()} {$where} {$groupBy} {$orderBy}";
@@ -82,17 +82,21 @@ abstract class SqlDataFlow extends AbstractDataFlow {
         $limitStatement = "LIMIT 0, {$this->limit}";
       }
       elseif ($this->offset !== FALSE && $this->limit === FALSE) {
+        echo $this->getName();
+        var_dump($this->offset);
+
         $calculatedLimit = $this->count - $this->offset;
         $limitStatement = "LIMIT {$this->offset}, {$calculatedLimit}";
       }
       $sql .= " {$limitStatement}";
-      $this->sqlStatement = $sql;
+      $this->sqlStatements[] = $sql;
       $this->dao = \CRM_Core_DAO::executeQuery($sql);
     } catch (\Exception $e) {
       throw new \Exception(
-        "Error in query. 
-        \r\nCount query: {$this->sqlCountStatement}
-        \r\nQuery: $this->sqlStatement", 0, $e);
+        "Error in DataFlow query.
+        \r\nData flow: {$this->getName()}
+        \r\nCount query: {$countSql}
+        \r\nQuery: $sql", 0, $e);
     }
   }
 
@@ -126,7 +130,7 @@ abstract class SqlDataFlow extends AbstractDataFlow {
    * @return array
    * @throws EndOfFlowException
    */
-  protected function retrieveNextRecord($fieldNamePrefix='') {
+  public function retrieveNextRecord($fieldNamePrefix='') {
     if (!$this->isInitialized()) {
       $this->initialize();
     }
@@ -137,7 +141,7 @@ abstract class SqlDataFlow extends AbstractDataFlow {
     $record = array();
     foreach($this->dataSpecification->getFields() as $field) {
       $alias = $field->alias;
-      $record[$fieldNamePrefix.$field->alias] = $this->dao->$alias;
+      $record[$fieldNamePrefix.$alias] = $this->dao->$alias;
     }
     return $record;
   }
@@ -250,7 +254,10 @@ abstract class SqlDataFlow extends AbstractDataFlow {
    * @return string
    */
   public function getDebugInformation() {
-    return $this->sqlStatement;
+    return array(
+      'query' => $this->sqlStatements,
+      'count query' => $this->sqlCountStatements,
+    );
   }
 
 }
\ No newline at end of file
diff --git a/Civi/DataProcessor/DataFlow/SqlDataFlow/OrClause.php b/Civi/DataProcessor/DataFlow/SqlDataFlow/OrClause.php
new file mode 100644
index 0000000000000000000000000000000000000000..903d7d9e660d649290bdc432406c7a5ddb6ea7cc
--- /dev/null
+++ b/Civi/DataProcessor/DataFlow/SqlDataFlow/OrClause.php
@@ -0,0 +1,53 @@
+<?php
+/**
+ * @author Jaap Jansma <jaap.jansma@civicoop.org>
+ * @license AGPL-3.0
+ */
+
+namespace Civi\DataProcessor\DataFlow\SqlDataFlow;
+
+use Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface;
+
+class OrClause implements WhereClauseInterface {
+
+  /**
+   * @var WhereClauseInterface[]
+   */
+  protected $clauses = array();
+
+  /**
+   * OrClause constructor.
+   *
+   * @param WhereClauseInterface[] $clauses
+   */
+  public function __construct($clauses=array()) {
+    $this->clauses = $clauses;
+  }
+
+  /**
+   * Add a where clause to this clause
+   *
+   * @param \Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface $clause
+   */
+  public function addWhereClause(WhereClauseInterface $clause) {
+    $this->clauses[] = $clause;
+  }
+
+  /**
+   * Returns the where clause
+   * E.g. contact_type = 'Individual'
+   *
+   * @return string
+   */
+  public function getWhereClause() {
+    if (count($this->clauses)) {
+      $clauses = array();
+      foreach($this->clauses as $clause) {
+        $clauses[] = "(". $clause->getWhereClause() . ")";
+      }
+      return "(" . implode(" OR ", $clauses) . ")";
+    }
+    return "1";
+  }
+
+}
\ No newline at end of file
diff --git a/Civi/DataProcessor/FieldOutputHandler/AbstractFieldOutputHandler.php b/Civi/DataProcessor/FieldOutputHandler/AbstractFieldOutputHandler.php
index f9d6c0daba3339414bc618f936d900e7bbd89f1d..37442dcbe6023a57623528157de5d169ab139d4f 100644
--- a/Civi/DataProcessor/FieldOutputHandler/AbstractFieldOutputHandler.php
+++ b/Civi/DataProcessor/FieldOutputHandler/AbstractFieldOutputHandler.php
@@ -15,6 +15,11 @@ abstract class AbstractFieldOutputHandler {
    */
   protected $outputFieldSpecification;
 
+  /**
+   * @var \Civi\DataProcessor\Source\SourceInterface
+   */
+  protected $dataSource;
+
   /**
    * Returns the name of the handler type.
    *
@@ -46,8 +51,20 @@ abstract class AbstractFieldOutputHandler {
    */
   abstract public function formatField($rawRecord, $formattedRecord);
 
-  public function __construct() {
-    $this->outputFieldSpecification = new FieldSpecification($this->getName(), $this->getType(), $this->getName());
+  /**
+   * AbstractFieldOutputHandler constructor.
+   *
+   * @param \Civi\DataProcessor\Source\SourceInterface $dataSource
+   */
+  public function __construct(\Civi\DataProcessor\Source\SourceInterface $dataSource) {
+    $this->dataSource = $dataSource;
+  }
+
+  /**
+   * @return \Civi\DataProcessor\Source\SourceInterface
+   */
+  public function getDataSource() {
+    return $this->dataSource;
   }
 
   /**
diff --git a/Civi/DataProcessor/FieldOutputHandler/RawFieldOutputHandler.php b/Civi/DataProcessor/FieldOutputHandler/RawFieldOutputHandler.php
index 28d8c8cb30a0079b55d5197109d4fc28ff9676f0..9735e834a7766ccab97bbe348f181297794fe9df 100644
--- a/Civi/DataProcessor/FieldOutputHandler/RawFieldOutputHandler.php
+++ b/Civi/DataProcessor/FieldOutputHandler/RawFieldOutputHandler.php
@@ -17,13 +17,8 @@ class RawFieldOutputHandler extends AbstractFieldOutputHandler implements Output
    */
   protected $inputFieldSpec;
 
-  /**
-   * @var \Civi\DataProcessor\Source\SourceInterface
-   */
-  protected $dataSource;
-
   public function __construct(FieldSpecification $inputFieldSpec, SourceInterface $dataSource) {
-    $this->dataSource = $dataSource;
+    parent::__construct($dataSource);
     $this->inputFieldSpec = $inputFieldSpec;
     $this->outputFieldSpecification = clone $inputFieldSpec;
     $this->outputFieldSpecification->alias = $this->getName();
diff --git a/Civi/DataProcessor/Source/CSV.php b/Civi/DataProcessor/Source/CSV.php
index bd5deb41c450445731d30550b9b18680eb621fe6..226795df26844f3589c1d45413f5ad762afd4238 100644
--- a/Civi/DataProcessor/Source/CSV.php
+++ b/Civi/DataProcessor/Source/CSV.php
@@ -7,6 +7,7 @@
 namespace Civi\DataProcessor\Source;
 
 
+use Civi\DataProcessor\DataFlow\CsvDataFlow;
 use Civi\DataProcessor\DataFlow\InMemoryDataFlow;
 use Civi\DataProcessor\DataSpecification\DataSpecification;
 use Civi\DataProcessor\DataSpecification\FieldSpecification;
@@ -33,31 +34,25 @@ class CSV extends AbstractSource {
     }
 
     $this->availableFields = new DataSpecification();
-
-    // Read the header row
-    $handle = fopen('/var/www/html/test.csv', 'r');
-    if (!$handle) {
-      return;
+    $uri = $this->configuration['uri'];
+    $skipRows = 0;
+    $headerRowNumber = 0;
+    if (isset($this->configuration['first_row_as_header']) && $this->configuration['first_row_as_header']) {
+      $skipRows = 1;
+      $headerRowNumber = 1;
     }
+    $delimiter = $this->configuration['delimiter'];
+    $enclosure = $this->configuration['enclosure'];
+    $escape = $this->configuration['escape'];
+    $this->dataFlow = new CsvDataFlow($uri, $skipRows, $delimiter, $enclosure, $escape);
+    $this->headerRow = $this->dataFlow->getHeaderRow($headerRowNumber);
 
-    $this->headerRow = fgetcsv($handle);
     foreach($this->headerRow as  $idx => $colName) {
-      $field = new FieldSpecification('col_'.$idx, 'String', $colName);
-      $this->availableFields->addFieldSpecification('col_'.$idx, $field);
+      $name = 'col_'.$idx;
+      $alias = $this->getSourceName().$name;
+      $field = new FieldSpecification($name, 'String', $colName, null, $alias);
+      $this->availableFields->addFieldSpecification($name, $field);
     }
-    $this->rows = array();
-    while ($row = fgetcsv($handle)) {
-      $record = array();
-      foreach ($row as $idx => $value) {
-        $record['col_'.$idx] = $value;
-      }
-
-      $this->rows[] = $record;
-    }
-    fclose($handle);
-
-    $this->dataFlow = new InMemoryDataFlow($this->rows);
-
   }
 
 
@@ -72,7 +67,10 @@ class CSV extends AbstractSource {
    */
   public function ensureField($fieldName) {
     $field = $this->getAvailableFields()->getFieldSpecificationByName($fieldName);
-    $this->dataFlow->getDataSpecification()->addFieldSpecification($fieldName, $field);
+    if ($field) {
+      $this->dataFlow->getDataSpecification()
+        ->addFieldSpecification($fieldName, $field);
+    }
     return $this->dataFlow;
   }
 
@@ -122,7 +120,7 @@ class CSV extends AbstractSource {
    * @return \Civi\DataProcessor\DataSpecification\AggregationField[]
    */
   public function getAvailableAggregationFields() {
-    return $this->availableFields;
+    return array();
   }
 
   /**
@@ -131,7 +129,7 @@ class CSV extends AbstractSource {
    * @return false|string
    */
   public function getConfigurationUrl() {
-    return false;
+    return 'civicrm/dataprocessor/form/source/csv';
   }
 
 }
\ No newline at end of file
diff --git a/templates/CRM/Dataprocessor/Form/Source/Csv.tpl b/templates/CRM/Dataprocessor/Form/Source/Csv.tpl
new file mode 100644
index 0000000000000000000000000000000000000000..b1f0120819939a138dde51df26a912fab2b1e739
--- /dev/null
+++ b/templates/CRM/Dataprocessor/Form/Source/Csv.tpl
@@ -0,0 +1,39 @@
+{crmScope extensionKey='dataprocessor'}
+    <div class="crm-submit-buttons">
+        {include file="CRM/common/formButtons.tpl" location="top"}
+    </div>
+
+    {* block for rule data *}
+    <h3>{ts}Data Processor Sources configuration{/ts}</h3>
+    <div class="crm-block crm-form-block crm-data-processor_source_configuration-block">
+        <div class="crm-section">
+            <div class="label">{$form.uri.label}</div>
+            <div class="content">{$form.uri.html}</div>
+            <div class="clear"></div>
+        </div>
+        <div class="crm-section">
+            <div class="label">&nbsp;</div>
+            <div class="content">{$form.first_row_as_header.html} &nbsp; {$form.first_row_as_header.label}</div>
+            <div class="clear"></div>
+        </div>
+        <div class="crm-section">
+            <div class="label">{$form.delimiter.label}</div>
+            <div class="content">{$form.delimiter.html}</div>
+            <div class="clear"></div>
+        </div>
+        <div class="crm-section">
+            <div class="label">{$form.enclosure.label}</div>
+            <div class="content">{$form.enclosure.html}</div>
+            <div class="clear"></div>
+        </div>
+        <div class="crm-section">
+            <div class="label">{$form.escape.label}</div>
+            <div class="content">{$form.escape.html}</div>
+            <div class="clear"></div>
+        </div>
+    </div>
+
+    <div class="crm-submit-buttons">
+        {include file="CRM/common/formButtons.tpl" location="bottom"}
+    </div>
+{/crmScope}
\ No newline at end of file
diff --git a/xml/Menu/dataprocessor.xml b/xml/Menu/dataprocessor.xml
index 80dc1fcbf45215f478d6e2200a8cc2119256615f..89761078ab4063223c0f81a4be4fa22fbdcd7b46 100644
--- a/xml/Menu/dataprocessor.xml
+++ b/xml/Menu/dataprocessor.xml
@@ -63,6 +63,13 @@
     <access_arguments>access CiviCRM</access_arguments>
     <access_arguments>administer CiviCRM</access_arguments>
   </item>
+  <item>
+    <path>civicrm/dataprocessor/form/source/csv</path>
+    <page_callback>CRM_Dataprocessor_Form_Source_Csv</page_callback>
+    <title>DataProcessor</title>
+    <access_arguments>access CiviCRM</access_arguments>
+    <access_arguments>administer CiviCRM</access_arguments>
+  </item>
   <item>
     <path>civicrm/dataprocessor/form/output/api</path>
     <page_callback>CRM_Dataprocessor_Form_Output_API</page_callback>