Commit d4764c8c authored by jaapjansma's avatar jaapjansma
Browse files

Refactor aggregation #3 aggegation on new sql data flows

parent 5d5f9372
......@@ -13,7 +13,7 @@
* Added filter for searching contacts with a certain type.
* Added filter to respect the ACL. So that a user only sees the contacts he is allowed to see.
* Removed the title attribute from the outputs as those don't make sense.
* Added fields with aggregation.
* Refactored aggregation functionality and added aggregation function field.
* Fixed issue with updating navigation after editing an output.
# Version 1.0.7
......
......@@ -7,6 +7,7 @@
namespace Civi\DataProcessor\DataFlow;
use \Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription;
use Civi\DataProcessor\DataFlow\Utils\Aggregator;
use \Civi\DataProcessor\DataSpecification\DataSpecification;
use Civi\DataProcessor\DataSpecification\FieldSpecification;
use \Civi\DataProcessor\FieldOutputHandler\AbstractFieldOutputHandler;
......@@ -144,13 +145,21 @@ abstract class AbstractDataFlow {
public function allRecords($fieldNameprefix = '') {
if (!is_array($this->_allRecords)) {
$this->_allRecords = [];
$_allRecords = [];
try {
while ($record = $this->retrieveNextRecord($fieldNameprefix)) {
$this->_allRecords[] = $this->formatRecordOutput($record);
$_allRecords[] = $record;
}
} catch (EndOfFlowException $e) {
// Do nothing
}
if (count($this->aggregateFields)) {
$aggregator = new Aggregator($_allRecords, $this->aggregateFields, $this->dataSpecification);
$_allRecords = $aggregator->aggregateRecords($fieldNameprefix);
}
foreach($_allRecords as $record) {
$this->_allRecords[] = $this->formatRecordOutput($record);
}
usort($this->_allRecords, array($this, 'sort'));
}
......@@ -271,4 +280,4 @@ abstract class AbstractDataFlow {
}
}
\ No newline at end of file
}
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataFlow\Utils;
use Civi\DataProcessor\DataSpecification\Aggregatable;
use Civi\DataProcessor\DataSpecification\FieldSpecification;
class Aggregator {
/**
* @var array
*/
protected $records;
/**
* @var FieldSpecification[]
*/
protected $aggregateFields = array();
/**
* @var \Civi\DataProcessor\DataSpecification\DataSpecification
*/
protected $dataSpecification = array();
public function __construct($records, $aggregateFields, $dataSpecification) {
$this->records = $records;
$this->aggregateFields = $aggregateFields;
$this->dataSpecification = $dataSpecification;
}
public function aggregateRecords($fieldNameprefix="") {
$aggregatedRecrodSets = array();
foreach($this->records as $record) {
$key = $this->getAggregationKeyFromRecord($record, $fieldNameprefix);
$aggregatedRecrodSets[$key][] = $record;
}
$newRecordSet = array();
foreach($aggregatedRecrodSets as $aggregatedSet) {
$firstRecord = reset($aggregatedSet);
$newRecord = array();
foreach($this->dataSpecification->getFields() as $fieldSpecification) {
if ($fieldSpecification instanceof Aggregatable) {
$newRecord[$fieldNameprefix.$fieldSpecification->alias] = $fieldSpecification->aggregateRecords($aggregatedSet, $fieldNameprefix.$fieldSpecification->alias);
} elseif (isset($firstRecord[$fieldNameprefix.$fieldSpecification->alias])) {
$newRecord[$fieldNameprefix.$fieldSpecification->alias] = $firstRecord[$fieldNameprefix.$fieldSpecification->alias];
}
}
$newRecordSet[] = $newRecord;
}
return $newRecordSet;
}
protected function getAggregationKeyFromRecord($record, $fieldNameprefix="") {
$key = '';
foreach($this->aggregateFields as $field) {
$alias = $field->alias;
if (isset($record[$fieldNameprefix.$alias])) {
$key .= $record[$fieldNameprefix.$alias].'_';
} else {
$key .= 'null_';
}
}
return $key;
}
}
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataSpecification;
interface Aggregatable {
/**
* Aggregate the field in all the records and return the aggregated value.
*
* @param $records
* @param string $fieldName
*
* @return mixed
*/
public function aggregateRecords($records, $fieldName="");
}
......@@ -14,10 +14,73 @@ use CRM_Dataprocessor_ExtensionUtil as E;
*
* @package Civi\DataProcessor\DataSpecification
*/
class AggregateFunctionFieldSpecification extends FieldSpecification {
class AggregateFunctionFieldSpecification extends FieldSpecification implements Aggregatable {
protected $function;
/**
* Aggregate the field in all the records and return the aggregated value.
*
* @param $records
* @param string $fieldName
*
* @return mixed
*/
public function aggregateRecords($records, $fieldName="") {
$values = array();
$value = 0;
$processedValues = array();
foreach($records as $record) {
if (isset($record[$fieldName])) {
switch ($this->function) {
case 'SUM':
$value += $record[$fieldName];
break;
case 'COUNT':
$value ++;
break;
case 'COUNT_DISTINCT':
if (!in_array($record[$fieldName], $processedValues)) {
$value ++;
$processedValues[] = $record[$fieldName];
}
break;
case 'MIN':
if ($record[$fieldName] < $value) {
$value = $record[$fieldName];
}
break;
case 'MAX':
if ($record[$fieldName] > $value) {
$value = $record[$fieldName];
}
break;
case 'AVG':
case 'STDDEV_POP':
case 'STDDEV_SAMP':
$values[] = $record[$fieldName];
break;
};
}
}
switch ($this->function) {
case 'AVG':
$value = array_sum($values) / count($values);
break;
case 'STDDEV_POP':
$value = $this->stats_standard_deviation($values, false);
break;
case 'VAR_POP':
$avg = array_sum($values) / count($values);
$value = array_sum(array_map(function ($x) use ($avg) {
return pow($x - $avg, 2);
}, $values)) / count($values);
break;
}
return $value;
}
public function setAggregateFunction($function) {
$functions = self::functionList();
if (!isset($functions[$function])) {
......@@ -69,12 +132,44 @@ class AggregateFunctionFieldSpecification extends FieldSpecification {
'MIN' => E::ts('Minimum'),
'MAX' => E::ts('Maximum'),
'STDDEV_POP' => E::ts('Standard deviation'),
'STDDEV_SAMP' => E::ts('Sample standard deviation'),
'VAR_POP' => E::ts('Standard variance'),
'VAR_SAMP' => E::ts('Sample variance'),
'COUNT' => E::ts('Count'),
'COUNT_DISTINCT' => E::ts('Distinct count')
];
}
/**
* Taken from https://www.php.net/manual/en/function.stats-standard-deviation.php#114473
*
* This user-land implementation follows the implementation quite strictly;
* it does not attempt to improve the code or algorithm in any way. It will
* raise a warning if you have fewer than 2 values in your array, just like
* the extension does (although as an E_USER_WARNING, not E_WARNING).
*
* @param array $a
* @param bool $sample [optional] Defaults to false
* @return float|bool The standard deviation or false on error.
*/
private function stats_standard_deviation(array $a, $sample = false) {
$n = count($a);
if ($n === 0) {
trigger_error("The array has zero elements", E_USER_WARNING);
return false;
}
if ($sample && $n === 1) {
trigger_error("The array has only 1 element", E_USER_WARNING);
return false;
}
$mean = array_sum($a) / $n;
$carry = 0.0;
foreach ($a as $val) {
$d = ((double) $val) - $mean;
$carry += $d * $d;
};
if ($sample) {
--$n;
}
return sqrt($carry / $n);
}
}
......@@ -215,6 +215,7 @@ class AggregateFunctionFieldOutputHandler extends AbstractSimpleFieldOutputHandl
case 'Integer':
case 'Float':
case 'Money':
case 'String':
return true;
break;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment