Skip to content
Snippets Groups Projects
Commit ef103602 authored by jaapjansma's avatar jaapjansma
Browse files

Added aggregation for activity data source so you can aggregate on the most...

Added aggregation for activity data source so you can aggregate on the most recent, the least recent activity.
parent ed0982ad
No related branches found
No related tags found
No related merge requests found
Showing
with 323 additions and 85 deletions
......@@ -2,6 +2,7 @@
* Fix issue filtering for multiple tags !66
* Added aggregation for membership data source so you can aggregate on the most recent, the least recent memberships.
* Added aggregation for activity data source so you can aggregate on the most recent, the least recent activity.
* Fixed issue with Field filter and operator is not one of on a multi value field.
* Contact Search Output is now also able to create smart groups and send bulk mails (with a hidden smarty group). See #32.
* Fixed issue with Field Filter on relationship type and also made relationship type a non required filter on the data source.
......
......@@ -317,5 +317,20 @@ abstract class AbstractDataFlow {
return $records;
}
/**
* When an object is cloned, PHP 5 will perform a shallow copy of all of the
* object's properties. Any properties that are references to other
* variables, will remain references. Once the cloning is complete, if a
* __clone() method is defined, then the newly created object's __clone()
* method will be called, to allow any necessary properties that need to be
* changed. NOT CALLABLE DIRECTLY.
*
* @return void
* @link https://php.net/manual/en/language.oop5.cloning.php
*/
public function __clone() {
$this->dataSpecification = clone $this->dataSpecification;
}
}
......@@ -155,7 +155,10 @@ class CombinedSqlDataFlow extends SqlDataFlow implements MultipleSourceDataFlows
$fields[] = $outputHandler->getAggregateFieldSpec()->getSqlGroupByStatement($this->getName());
}
foreach($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
$fields = array_merge($fields, $sourceDataFlowDescription->getDataFlow()->getFieldsForGroupByStatement());
if (!$sourceDataFlowDescription->getDataFlow() instanceof SubqueryDataFlow) {
$fields = array_merge($fields, $sourceDataFlowDescription->getDataFlow()
->getFieldsForGroupByStatement());
}
}
return $fields;
}
......@@ -213,8 +216,11 @@ class CombinedSqlDataFlow extends SqlDataFlow implements MultipleSourceDataFlows
$clauses[] = $clause;
}
foreach($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
if ($sourceDataFlowDescription->getDataFlow() instanceof SqlDataFlow) {
if ($sourceDataFlowDescription->getDataFlow() instanceof SqlTableDataFlow) {
foreach($sourceDataFlowDescription->getDataFlow()->getWhereClauses() as $clause) {
if ($clause instanceof SqlDataFlow\WhereClauseInterface) {
}
$clauses[] = $clause;
}
}
......@@ -255,5 +261,23 @@ class CombinedSqlDataFlow extends SqlDataFlow implements MultipleSourceDataFlows
return $this;
}
/**
* When an object is cloned, PHP 5 will perform a shallow copy of all of the
* object's properties. Any properties that are references to other
* variables, will remain references. Once the cloning is complete, if a
* __clone() method is defined, then the newly created object's __clone()
* method will be called, to allow any necessary properties that need to be
* changed. NOT CALLABLE DIRECTLY.
*
* @return void
* @link https://php.net/manual/en/language.oop5.cloning.php
*/
public function __clone() {
$sourceDataFlowDescriptions = array();
foreach($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
$sourceDataFlowDescriptions[] = clone $sourceDataFlowDescription;
}
$this->sourceDataFlowDescriptions = $sourceDataFlowDescriptions;
}
}
......@@ -7,6 +7,7 @@
namespace Civi\DataProcessor\DataFlow\CombinedDataFlow;
use Civi\DataProcessor\DataFlow\EndOfFlowException;
use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
use Civi\DataProcessor\DataSpecification\DataSpecification;
use Civi\DataProcessor\DataSpecification\SqlFieldSpecification;
......@@ -58,15 +59,16 @@ class SubqueryDataFlow extends CombinedSqlDataFlow {
$fromStatements[] = $joinStatement;
}
}
if ($sourceDataFlowDescription->getDataFlow() instanceof CombinedSqlDataFlow) {
if ($sourceDataFlowDescription->getDataFlow() instanceof SubqueryDataFlow) {
$fromStatements = array_merge($fromStatements, $sourceDataFlowDescription->getDataFlow()->getJoinStatement(0));
}
}
$from = implode(" ", $fromStatements);
$select = implode(", ", $fields);
$where = $this->getWhereStatement();
$groupBy = $this->getGroupByStatement();
return "(SELECT {$select} {$from} {$groupBy}) `{$this->getPrimaryTableAlias()}`";
return "(SELECT {$select} {$from} {$where} {$groupBy}) `{$this->getPrimaryTableAlias()}`";
}
/**
......@@ -121,4 +123,5 @@ class SubqueryDataFlow extends CombinedSqlDataFlow {
return $record;
}
}
......@@ -16,7 +16,7 @@ class DataFlowDescription {
/**
* @var \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
*/
protected $joinSpecification = array();
protected $joinSpecification = null;
public function __construct($datFlow, $joinSpecification = null) {
$this->dataFlow = $datFlow;
......@@ -38,4 +38,24 @@ class DataFlowDescription {
return $this->joinSpecification;
}
}
\ No newline at end of file
/**
* When an object is cloned, PHP 5 will perform a shallow copy of all of the
* object's properties. Any properties that are references to other
* variables, will remain references. Once the cloning is complete, if a
* __clone() method is defined, then the newly created object's __clone()
* method will be called, to allow any necessary properties that need to be
* changed. NOT CALLABLE DIRECTLY.
*
* @return void
* @link https://php.net/manual/en/language.oop5.cloning.php
*/
public function __clone() {
if ($this->dataFlow) {
$this->dataFlow = clone $this->dataFlow;
}
if ($this->joinSpecification) {
$this->joinSpecification = clone $this->joinSpecification;
}
}
}
......@@ -80,7 +80,7 @@ class SimpleNonRequiredJoin extends SimpleJoin {
if ($dataFlow instanceof SqlDataFlow) {
$whereClauses = $dataFlow->getWhereClauses();
foreach($whereClauses as $whereClause) {
if ($whereClause->isJoinClause()) {
if ($whereClause->isJoinClause() && $whereClause) {
$this->filterClauses[] = $whereClause;
$dataFlow->removeWhereClause($whereClause);
}
......
......@@ -124,8 +124,6 @@ abstract class SqlDataFlow extends AbstractDataFlow {
$countName = 'count_'.$this->getName();
$sql = "{$selectAndFrom} {$where} {$groupBy} {$orderBy}";
$countSql = "SELECT COUNT(*) AS count FROM ({$sql}) `{$countName}`";
//$countSql = "SELECT COUNT(*) AS `count` {$from} {$where} {$groupBy}";
$this->sqlCountStatements[] = $countSql;
$countDao = \CRM_Core_DAO::executeQuery($countSql);
$this->count = 0;
......@@ -243,7 +241,9 @@ abstract class SqlDataFlow extends AbstractDataFlow {
public function getWhereStatement() {
$clauses = array("1");
foreach($this->getWhereClauses() as $clause) {
$clauses[] = $clause->getWhereClause();
if (!$clause->isJoinClause()) {
$clauses[] = $clause->getWhereClause();
}
}
return "WHERE ". implode(" AND ", $clauses);
}
......
......@@ -130,4 +130,11 @@ class DataSpecification {
return $this;
}
/**
* Remove all fields from this specification.
*/
public function clear() {
$this->fields = array();
}
}
......@@ -146,6 +146,17 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
return parent::setSourceName($name);
}
protected function getEntityTableAlias() {
if ($this->entityDataFlow instanceof SqlTableDataFlow) {
$entityTableAlias = $this->entityDataFlow->getTableAlias();
} elseif ($this->entityDataFlow instanceof CombinedSqlDataFlow) {
$entityTableAlias = $this->entityDataFlow->getPrimaryTableAlias();
} else {
throw new \Exception('No entity data flow set.');
}
return $entityTableAlias;
}
/**
* @return \Civi\DataProcessor\DataFlow\SqlDataFlow
*/
......@@ -154,39 +165,11 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
$this->entityDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName());
}
if ($this->isAggregationEnabled()) {
$start_date_field_spec = clone $this->getAvailableFields()->getFieldSpecificationByName($this->getAggregateField());
$start_date_field_spec->setMySqlFunction($this->getAggregateFunction());
$start_date_field_spec->alias = $this->getAggregateField();
$groupByFields = array();
foreach($this->configuration['aggregate_by'] as $aggregate_by) {
$field = clone $this->getAvailableFields()->getFieldSpecificationByName($aggregate_by);
$field->alias = $aggregate_by;
$groupByFields[] = $field;
}
$this->getAggregationDataFlow();
if (!$this->aggregationDateFlow) {
$aggretaed_membership_table_dataflow = new SqlTableDataFlow($this->getTable(), '_aggregated_'.$this->getSourceName());
$this->aggregationDateFlow = new SubqueryDataFlow('', $this->getTable(), '_aggregated_'.$this->getSourceName());
$this->aggregationDateFlow->addSourceDataFlow(new DataFlowDescription($aggretaed_membership_table_dataflow));
$aggretaed_membership_table_dataflow->getDataSpecification()->addFieldSpecification($start_date_field_spec->name, $start_date_field_spec);
foreach($groupByFields as $groupByField) {
$aggretaed_membership_table_dataflow->getDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
$aggretaed_membership_table_dataflow->getGroupByDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
}
}
$dataFlow = new CombinedSqlDataFlow('', $aggretaed_membership_table_dataflow->getTable(), $aggretaed_membership_table_dataflow->getTableAlias());
$dataFlow = new CombinedSqlDataFlow('', $this->aggregationDateFlow->getPrimaryTable(), $this->aggregationDateFlow->getPrimaryTableAlias());
$dataFlow->addSourceDataFlow(new DataFlowDescription($this->aggregationDateFlow));
$join = new SimpleNonRequiredJoin($this->entityDataFlow->getTableAlias(), $this->getAggregateField(), $aggretaed_membership_table_dataflow->getTableAlias(), $this->getAggregateField(), 'INNER');
foreach($groupByFields as $groupByField) {
$join->addFilterClause(new PureSqlStatementClause("`{$this->entityDataFlow->getTableAlias()}`.`{$groupByField->alias}` = `{$aggretaed_membership_table_dataflow->getTableAlias()}`.`{$groupByField->alias}`", TRUE));
}
$join->setDataProcessor($this->dataProcessor);
$dataFlowDescription = new DataFlowDescription($this->entityDataFlow, $join);
$dataFlowDescription = new DataFlowDescription($this->entityDataFlow, $this->getAggregationJoin($this->getEntityTableAlias()));
$dataFlow->addSourceDataFlow($dataFlowDescription);
return $dataFlow;
} else {
......@@ -194,6 +177,59 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
}
}
protected function getAggregationDataFlow() {
$groupByFields = array();
foreach($this->configuration['aggregate_by'] as $aggregate_by) {
$field = clone $this->getAvailableFields()->getFieldSpecificationByName($aggregate_by);
$field->alias = $aggregate_by;
$groupByFields[] = $field;
}
if (!$this->aggregationDateFlow) {
$aggrgeate_field_spec = clone $this->getAvailableFields()->getFieldSpecificationByName($this->getAggregateField());
$aggrgeate_field_spec->setMySqlFunction($this->getAggregateFunction());
$aggrgeate_field_spec->alias = $this->getAggregateField();
$aggretated_table_dataflow = new SqlTableDataFlow($this->getTable(), '_aggregated_'.$this->getSourceName());
$aggretated_table_dataflow->getDataSpecification()->addFieldSpecification($aggrgeate_field_spec->name, $aggrgeate_field_spec);
foreach ($groupByFields as $groupByField) {
$aggretated_table_dataflow->getDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
$aggretated_table_dataflow->getGroupByDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
}
$this->aggregationDateFlow = new SubqueryDataFlow('', $this->getTable(), '_aggregated_' . $this->getSourceName());
$this->aggregationDateFlow->addSourceDataFlow(new DataFlowDescription($aggretated_table_dataflow));
}
return $this->aggregationDateFlow;
}
/**
* Returns JOIN specification for joining on the aggregation source.
*
* @param $entityTableAlias
*
* @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\SimpleNonRequiredJoin
* @throws \Exception
*/
protected function getAggregationJoin($entityTableAlias) {
$groupByFields = array();
foreach($this->configuration['aggregate_by'] as $aggregate_by) {
$field = clone $this->getAvailableFields()->getFieldSpecificationByName($aggregate_by);
$field->alias = $aggregate_by;
$groupByFields[] = $field;
}
$join = new SimpleNonRequiredJoin($entityTableAlias, $this->getAggregateField(), $this->aggregationDateFlow->getPrimaryTableAlias(), $this->getAggregateField(), 'INNER');
foreach($groupByFields as $groupByField) {
$join->addFilterClause(new PureSqlStatementClause("`{$entityTableAlias}`.`{$groupByField->alias}` = `{$this->aggregationDateFlow->getPrimaryTableAlias()}`.`{$groupByField->alias}`", TRUE));
}
$join->setDataProcessor($this->dataProcessor);
return $join;
}
/**
* Load the fields from this entity.
*
......@@ -269,12 +305,26 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
new SimpleWhereClause($customGroupTableAlias, $spec->customFieldColumnName, $op, $values, $spec->type, TRUE)
);
} else {
$entityDataFlow = $this->ensureEntity();
$entityDataFlow->addWhereClause(new SimpleWhereClause($this->getSourceName(), $spec->name,$op, $values, $spec->type, TRUE));
$this->ensureEntity();
$this->entityDataFlow->addWhereClause(new SimpleWhereClause($this->getSourceName(), $spec->name,$op, $values, $spec->type, TRUE));
$this->addFilterToAggregationDataFlow($spec, $op, $values);
}
}
}
/**
* Add a filter to the aggregation data flow.
*
* @param \Civi\DataProcessor\DataSpecification\FieldSpecification $filter
* @param $op
* @param $values
*/
protected function addFilterToAggregationDataFlow(FieldSpecification $filter, $op, $values) {
if ($this->aggregationDateFlow) {
$this->aggregationDateFlow->addWhereClause(new SimpleWhereClause($this->aggregationDateFlow->getPrimaryTableAlias(), $filter->name,$op, $values, $filter->type, FALSE));
}
}
/**
* Ensure that filter or aggregate field is accesible in the query
*
......@@ -367,12 +417,13 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
unset($this->dataFlow);
}
}
if ($this->isAggregationEnabled()) {
if ($join instanceof SimpleJoin && $join->getLeftTable() == $this->entityDataFlow->getTableAlias()) {
if ($join instanceof SimpleJoin && $join->getLeftTable() == $this->getEntityTableAlias()) {
$join->setLeftTable($this->aggregationDateFlow->getPrimaryTableAlias());
$join->setLeftPrefix($this->aggregationDateFlow->getPrimaryTableAlias());
}
elseif ($join instanceof SimpleJoin && $join->getRightTable() == $this->entityDataFlow->getTableAlias()) {
elseif ($join instanceof SimpleJoin && $join->getRightTable() == $this->getEntityTableAlias()) {
$join->setRightTable($this->aggregationDateFlow->getPrimaryTableAlias());
$join->setRightPrefix($this->aggregationDateFlow->getPrimaryTableAlias());
}
......@@ -427,8 +478,8 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
$dataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->alias, $fieldSpecification);
}
} elseif ($originalFieldSpecification) {
$dataFlow = $this->ensureEntity();
$dataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->alias, $fieldSpecification);
$this->ensureEntity();
$this->entityDataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->alias, $fieldSpecification);
}
} catch (FieldExistsException $e) {
// Do nothing.
......@@ -516,7 +567,9 @@ abstract class AbstractCivicrmEntitySource extends AbstractSource {
$fields[$field->getName()] = $field->title;
}
$aggregateFunctions = $this->getPossibleAggregateFunctions();
$form->assign('aggregation_available', false);
if (is_array($aggregateFunctions)) {
$form->assign('aggregation_available', true);
$form->add('select', "aggregate_function", E::ts('Aggregate function'), $aggregateFunctions, FALSE, [
'style' => 'min-width:250px',
'class' => 'crm-select2 huge',
......
......@@ -6,11 +6,16 @@
namespace Civi\DataProcessor\Source\Activity;
use Civi\DataProcessor\DataFlow\CombinedDataFlow\CombinedSqlDataFlow;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\SimpleJoin;
use Civi\DataProcessor\DataFlow\CombinedDataFlow\SubqueryDataFlow;
use Civi\DataProcessor\DataFlow\SqlDataFlow\SimpleWhereClause;
use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
use Civi\DataProcessor\DataSpecification\CustomFieldSpecification;
use Civi\DataProcessor\DataSpecification\DataSpecification;
use Civi\DataProcessor\DataSpecification\FieldSpecification;
use Civi\DataProcessor\Source\AbstractCivicrmEntitySource;
use Civi\DataProcessor\DataSpecification\Utils as DataSpecificationUtils;
......@@ -95,54 +100,158 @@ class ActivitySource extends AbstractCivicrmEntitySource {
);
}
/**
* Returns an array with possible aggregate functions.
* Return false when aggregation is not possible.
*
* @return array|false
*/
protected function getPossibleAggregateFunctions() {
return [
'max_activity_date_time' => E::ts('Last one by activity date'),
'min_activity_date_time' => E::ts('First one by activity date'),
'max_created_date' => E::ts('Last one by created date'),
'min_created_date' => E::ts('First one by created date'),
'max_modified_date' => E::ts('Last one by modified date'),
'min_modified_date' => E::ts('First one by modified date'),
'max_id' => E::ts('Last one by activity id'),
'min_id' => E::ts('Fist one by activity id'),
];
}
/**
* @return \Civi\DataProcessor\DataFlow\SqlDataFlow
* @throws \Exception
*/
protected function getEntityDataFlow() {
$activityDataDescription = new DataFlowDescription($this->activityDataFlow);
// Create the subquery data flow
if (!$this->entityDataFlow) {
$activityDataDescription = new DataFlowDescription($this->activityDataFlow);
$contactJJoin = new SimpleJoin($this->activityDataFlow->getTableAlias(), 'id', $this->activityContactDataFlow->getTableAlias(), 'activity_id');
$contactJJoin->setDataProcessor($this->dataProcessor);
$activityContactDataDescription = new DataFlowDescription($this->activityContactDataFlow, $contactJJoin);
$contactJoin = new SimpleJoin($this->activityDataFlow->getTableAlias(), 'id', $this->activityContactDataFlow->getTableAlias(), 'activity_id');
$contactJoin->setDataProcessor($this->dataProcessor);
$activityContactDataDescription = new DataFlowDescription($this->activityContactDataFlow, $contactJoin);
$caseJoin = new SimpleJoin($this->activityDataFlow->getTableAlias(), 'id', $this->activityCaseDataFlow->getTableAlias(), 'activity_id', 'LEFT');
$caseJoin->setDataProcessor($this->dataProcessor);
$activityCaseDataDescription = new DataFlowDescription($this->activityCaseDataFlow, $caseJoin);
$caseJoin = new SimpleJoin($this->activityDataFlow->getTableAlias(), 'id', $this->activityCaseDataFlow->getTableAlias(), 'activity_id', 'LEFT');
$caseJoin->setDataProcessor($this->dataProcessor);
$activityCaseDataDescription = new DataFlowDescription($this->activityCaseDataFlow, $caseJoin);
// Create the subquery data flow
$entityDataFlow = new SubqueryDataFlow($this->getSourceName(), $this->getTable(), $this->getSourceName());
$entityDataFlow->addSourceDataFlow($activityDataDescription);
$entityDataFlow->addSourceDataFlow($activityContactDataDescription);
$entityDataFlow->addSourceDataFlow($activityCaseDataDescription);
$this->entityDataFlow = new SubqueryDataFlow($this->getSourceName(), $this->getTable(), $this->getSourceName());
$this->entityDataFlow->addSourceDataFlow($activityDataDescription);
$this->entityDataFlow->addSourceDataFlow($activityContactDataDescription);
$this->entityDataFlow->addSourceDataFlow($activityCaseDataDescription);
}
if (empty($this->configuration['aggregate_function'])) {
return $this->entityDataFlow;
} else {
$this->getAggregationDataFlow();
return $entityDataFlow;
$aggregationDataFlow = new CombinedSqlDataFlow('', $this->aggregationDateFlow->getPrimaryTable(), $this->aggregationDateFlow->getPrimaryTableAlias());
$aggregationDataFlow->addSourceDataFlow(new DataFlowDescription($this->aggregationDateFlow));
$join = $this->getAggregationJoin($this->getSourceName());
$dataFlowDescription = new DataFlowDescription($this->entityDataFlow, $join);
$aggregationDataFlow->addSourceDataFlow($dataFlowDescription);
return $aggregationDataFlow;
}
}
/**
* Ensure that the entity table is added the to the data flow.
* Sets the join specification to connect this source to other data sources.
*
* @return \Civi\DataProcessor\DataFlow\AbstractDataFlow
* @throws \Exception
* @param \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface $join
*
* @return \Civi\DataProcessor\Source\SourceInterface
*/
protected function ensureEntity() {
if ($this->primaryDataFlow && $this->primaryDataFlow instanceof SubqueryDataFlow && $this->primaryDataFlow->getPrimaryTable() === $this->getTable()) {
return $this->primaryDataFlow;
} elseif (empty($this->primaryDataFlow)) {
$this->primaryDataFlow = $this->getEntityDataFlow();
return $this->primaryDataFlow;
public function setJoin(JoinInterface $join) {
parent::setJoin($join);
if (!empty($this->configuration['aggregate_function'])) {
if ($join instanceof SimpleJoin && $join->getLeftTable() == $this->getEntityTableAlias()) {
$join->setLeftTable($this->aggregationDateFlow->getPrimaryTableAlias());
$join->setLeftPrefix($this->aggregationDateFlow->getPrimaryTableAlias());
}
elseif ($join instanceof SimpleJoin && $join->getRightTable() == $this->getEntityTableAlias()) {
$join->setRightTable($this->aggregationDateFlow->getPrimaryTableAlias());
$join->setRightPrefix($this->aggregationDateFlow->getPrimaryTableAlias());
}
}
return $this;
}
protected function getAggregationDataFlow() {
$groupByFields = array();
foreach($this->configuration['aggregate_by'] as $aggregate_by) {
$field = clone $this->getAvailableFields()->getFieldSpecificationByName($aggregate_by);
$field->alias = $aggregate_by;
if (stripos($field->name, 'activity_contact_') === 0) {
$field->name = substr($field->name, 17);
}
$groupByFields[] = $field;
}
foreach($this->additionalDataFlowDescriptions as $additionalDataFlowDescription) {
if ($additionalDataFlowDescription->getDataFlow()->getTable() == $this->getTable()) {
return $additionalDataFlowDescription->getDataFlow();
if (!$this->aggregationDateFlow) {
$activityDataFlow = new SqlTableDataFlow($this->getTable(), $this->getSourceName().'_activity_aggregate_');
$activityContactDataFlow = new SqlTableDataFlow('civicrm_activity_contact', $this->getSourceName().'_activity_contact_aggregate_');
$activityDataDescription = new DataFlowDescription($activityDataFlow);
$contactJoin = new SimpleJoin($activityDataFlow->getTableAlias(), 'id', $activityContactDataFlow->getTableAlias(), 'activity_id');
$contactJoin->setDataProcessor($this->dataProcessor);
$activityContactDataDescription = new DataFlowDescription($activityContactDataFlow, $contactJoin);
$aggrgeate_field_spec = clone $this->getAvailableFields()->getFieldSpecificationByName($this->getAggregateField());
$aggrgeate_field_spec->setMySqlFunction($this->getAggregateFunction());
$aggrgeate_field_spec->alias = $this->getAggregateField();
$aggretated_table_dataflow = new CombinedSqlDataFlow('', $this->getTable(), '_aggregated_'.$this->getSourceName());
$aggretated_table_dataflow->addSourceDataFlow($activityDataDescription);
$aggretated_table_dataflow->addSourceDataFlow($activityContactDataDescription);
$activityDataFlow->getDataSpecification()->addFieldSpecification($aggrgeate_field_spec->name, $aggrgeate_field_spec);
foreach ($groupByFields as $groupByField) {
if (stripos($groupByField->alias, 'activity_contact_') === 0) {
$activityContactDataFlow->getDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
$activityContactDataFlow->getGroupByDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
} else {
$activityDataFlow->getDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
$activityDataFlow->getGroupByDataSpecification()
->addFieldSpecification($groupByField->name, $groupByField);
}
}
$this->aggregationDateFlow = new SubqueryDataFlow('', $activityDataFlow->getTable(), $activityDataFlow->getTableAlias());
$this->aggregationDateFlow->addSourceDataFlow(new DataFlowDescription($aggretated_table_dataflow));
}
return $this->aggregationDateFlow;
}
/**
* Add a filter to the aggregation data flow.
*
* @param \Civi\DataProcessor\DataSpecification\FieldSpecification $filter
* @param $op
* @param $values
*/
protected function addFilterToAggregationDataFlow(FieldSpecification $filter, $op, $values) {
if ($this->aggregationDateFlow) {
if (stripos($filter->name, 'activity_contact_') === 0) {
$name = str_replace('activity_contact_', '', $filter->name);
$this->aggregationDateFlow->addWhereClause(new SimpleWhereClause($this->getSourceName().'_activity_contact_aggregate_', $name, $op, $values, $filter->type, FALSE));
} else {
$this->aggregationDateFlow->addWhereClause(new SimpleWhereClause($this->aggregationDateFlow->getPrimaryTableAlias(), $filter->name, $op, $values, $filter->type, FALSE));
}
}
$entityDataFlow = $this->getEntityDataFlow();
$join = new SimpleJoin($this->getSourceName(), 'id', $this->getSourceName(), 'entity_id', 'LEFT');
$join->setDataProcessor($this->dataProcessor);
$additionalDataFlowDescription = new DataFlowDescription($entityDataFlow,$join);
$this->additionalDataFlowDescriptions[] = $additionalDataFlowDescription;
return $additionalDataFlowDescription->getDataFlow();
}
/**
* Returns true when aggregation is enabled for this data source.
*
* @return bool
*/
protected function isAggregationEnabled() {
return false;
}
/**
......
......@@ -54,12 +54,14 @@ class MembershipSource extends AbstractCivicrmEntitySource {
*/
protected function getPossibleAggregateFunctions() {
return [
'max_start_date' => E::ts('Most recent by start date'),
'min_start_date' => E::ts('Least recent by start date'),
'max_end_date' => E::ts('Most recent by start date'),
'min_end_date' => E::ts('Least recent by start date'),
'max_join_date' => E::ts('Most recent by start date'),
'min_join_date' => E::ts('Least recent by start date'),
'max_start_date' => E::ts('Last one by start date'),
'min_start_date' => E::ts('First one by start date'),
'max_end_date' => E::ts('Last one by start date'),
'min_end_date' => E::ts('First one by start date'),
'max_join_date' => E::ts('Last one by start date'),
'min_join_date' => E::ts('First one by start date'),
'max_id' => E::ts('Last one by membership id'),
'min_id' => E::ts('Fist one by membership id'),
];
}
......
......@@ -2,6 +2,7 @@
{include file="CRM/Dataprocessor/Form/Source/Configuration.tpl"}
{if $aggregation_available}
<div class="crm-accordion-wrapper collapsed">
<div class="crm-accordion-header">{ts}Aggregation{/ts}</div>
<div class="crm-accordion-body">
......@@ -12,10 +13,13 @@
</tr>
<tr class="report-contents crm-report crm-report-criteria-filter">
<td class="report-contents">{$form.aggregate_by.label}</td>
<td>{$form.aggregate_by.html}</td>
<td>{$form.aggregate_by.html}
<p class="description">{ts}Usually you want to aggregate by something like contact id.{/ts}</p>
</td>
</tr>
</table>
</div>
</div>
{/if}
{/crmScope}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment