diff --git a/Civi/DataProcessor/DataFlow/SqlDataFlow.php b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
index 6b41a363b9adbff01786078e910aba7dc225b990..f60d2ad017c310673420f65b2d4abfd6afcb8bb8 100644
--- a/Civi/DataProcessor/DataFlow/SqlDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/SqlDataFlow.php
@@ -67,9 +67,13 @@ abstract class SqlDataFlow extends AbstractDataFlow {
       $groupBy = $this->getGroupByStatement();
       $orderBy = $this->getOrderByStatement();
 
-      $countSql = "SELECT COUNT(*) {$from} {$where} {$groupBy}";
+      $countSql = "SELECT COUNT(*) AS `count` {$from} {$where} {$groupBy}";
       $this->sqlCountStatements[] = $countSql;
-      $this->count = \CRM_Core_DAO::singleValueQuery($countSql);
+      $countDao = \CRM_Core_DAO::executeQuery($countSql);
+      $this->count = 0;
+      while($countDao->fetch()) {
+        $this->count = $this->count + $countDao->count;
+      }
 
       $sql = "{$this->getSelectQueryStatement()} {$where} {$groupBy} {$orderBy}";
 
diff --git a/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php b/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php
index d73df4e5d680e05e5936594e9d0fe9ff83c8ed47..13b93fbd3b398b39f30330f6b8eb7998f10d6683 100644
--- a/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php
+++ b/Civi/DataProcessor/DataFlow/SqlTableDataFlow.php
@@ -8,6 +8,7 @@ namespace Civi\DataProcessor\DataFlow;
 
 use Civi\DataProcessor\DataSpecification\DataSpecification;
 use Civi\DataProcessor\DataSpecification\FieldSpecification;
+use Civi\DataProcessor\DataSpecification\SqlFieldSpecification;
 
 class SqlTableDataFlow extends SqlDataFlow {
 
@@ -52,7 +53,11 @@ class SqlTableDataFlow extends SqlDataFlow {
   public function getFieldsForSelectStatement() {
     $fields = array();
     foreach($this->getDataSpecification()->getFields() as $field) {
-      $fields[] = "`{$this->table_alias}`.`{$field->name}` AS `{$field->alias}`";
+      if ($field instanceof SqlFieldSpecification) {
+        $fields[] = $field->getSqlSelectStatement($this->table_alias);
+      } else {
+        $fields[] = "`{$this->table_alias}`.`{$field->name}` AS `{$field->alias}`";
+      }
     }
     return $fields;
   }
diff --git a/Civi/DataProcessor/DataSpecification/CountFieldSpecification.php b/Civi/DataProcessor/DataSpecification/CountFieldSpecification.php
new file mode 100644
index 0000000000000000000000000000000000000000..c75ed757d5c6ade369c1fcc0550fb11530e70776
--- /dev/null
+++ b/Civi/DataProcessor/DataSpecification/CountFieldSpecification.php
@@ -0,0 +1,22 @@
+<?php
+/**
+ * @author Jaap Jansma <jaap.jansma@civicoop.org>
+ * @license AGPL-3.0
+ */
+
+namespace Civi\DataProcessor\DataSpecification;
+
+class CountFieldSpecification extends FieldSpecification {
+
+  /**
+   * Returns the select statement for this field.
+   * E.g. COUNT(civicrm_contact.id) AS contact_id_count
+   *
+   * @param String $table_alias
+   * @return string
+   */
+  public function getSqlSelectStatement($table_alias) {
+    return "COUNT(`{$table_alias}`.`{$this->name}`) AS `{$this->alias}`";
+  }
+
+}
\ No newline at end of file
diff --git a/Civi/DataProcessor/DataSpecification/FieldSpecification.php b/Civi/DataProcessor/DataSpecification/FieldSpecification.php
index 225e2c2acff5fdfe6a490f7b31aa534b6180f46b..22d2eba9e07aa0d9124169790c9a23a009639c7d 100644
--- a/Civi/DataProcessor/DataSpecification/FieldSpecification.php
+++ b/Civi/DataProcessor/DataSpecification/FieldSpecification.php
@@ -6,7 +6,7 @@
 
 namespace Civi\DataProcessor\DataSpecification;
 
-class FieldSpecification {
+class FieldSpecification implements SqlFieldSpecification {
 
   /**
    * @var String
@@ -49,4 +49,15 @@ class FieldSpecification {
     return $this->options;
   }
 
+  /**
+   * Returns the select statement for this field.
+   * E.g. COUNT(civicrm_contact.id) AS contact_id_count
+   *
+   * @param String $table_alias
+   * @return string
+   */
+  public function getSqlSelectStatement($table_alias) {
+    return "`{$table_alias}`.`{$this->name}` AS `{$this->alias}`";
+  }
+
 }
\ No newline at end of file
diff --git a/Civi/DataProcessor/DataSpecification/SqlFieldSpecification.php b/Civi/DataProcessor/DataSpecification/SqlFieldSpecification.php
new file mode 100644
index 0000000000000000000000000000000000000000..b917539e28ff4a541ca7223ca9df2ce9f2817214
--- /dev/null
+++ b/Civi/DataProcessor/DataSpecification/SqlFieldSpecification.php
@@ -0,0 +1,20 @@
+<?php
+/**
+ * @author Jaap Jansma <jaap.jansma@civicoop.org>
+ * @license AGPL-3.0
+ */
+
+namespace Civi\DataProcessor\DataSpecification;
+
+interface SqlFieldSpecification {
+
+  /**
+   * Returns the select statement for this field.
+   * E.g. COUNT(civicrm_contact.id) AS contact_id_count
+   *
+   * @param String $table_alias
+   * @return String
+   */
+  public function getSqlSelectStatement($table_alias);
+
+}
\ No newline at end of file