Commit f9f7d7a3 authored by jaapjansma's avatar jaapjansma

Improved In Memory Dataflow so that joins and filters would work.

parent 82c2d601
......@@ -7,6 +7,7 @@
* Added data source for note
* Refactored API Output to an Abstract Class so that it is easy for extension developers to develop their own implementation.
* Added Markup/Html Field Value output field handler.
* Improved In Memory Dataflow so that joins and filters would work.
# Version 1.6.0
......
......@@ -56,7 +56,7 @@ class CRM_Dataprocessor_Utils_DataSourceFields {
* Returns an array with the name of the field as the key and the label of the field as the value.
*
* @oaram int $dataProcessorId
* @param callable $callback
* @param callable $filterFieldsCallback
* Function to filter certain fields.
* @return array
* @throws \Exception
......@@ -66,7 +66,7 @@ class CRM_Dataprocessor_Utils_DataSourceFields {
$dataProcessorClass = \CRM_Dataprocessor_BAO_DataProcessor::dataProcessorToClass($dataProcessor);
$fieldSelect = array();
foreach($dataProcessorClass->getDataSources() as $dataSource) {
$fieldSelect = array_merge($fieldSelect, self::getAvailableFilterFieldsInDataSource($dataSource, $dataSource->getSourceTitle().' :: ', $dataSource->getSourceName().'::', $filterFieldsCallback));
$fieldSelect = array_merge($fieldSelect, self::getAvailableFilterFieldsInDataSource($dataSource, $dataSource->getSourceTitle() . ' :: ', $dataSource->getSourceName() . '::', $filterFieldsCallback));
}
return $fieldSelect;
}
......@@ -87,7 +87,7 @@ class CRM_Dataprocessor_Utils_DataSourceFields {
foreach($dataSource->getAvailableFilterFields()->getFields() as $field) {
$isFieldValid = true;
if ($filterFieldsCallback) {
$isFieldValid = call_user_func($filterFieldsCallback, $field);
$isFieldValid = call_user_func($filterFieldsCallback, $field, $dataSource);
}
if ($isFieldValid) {
$fieldSelect[$namePrefix . $field->alias] = $titlePrefix . $field->title;
......
......@@ -6,6 +6,8 @@
namespace Civi\DataProcessor\DataFlow;
use Civi\DataProcessor\DataFlow\InMemoryDataFlow\FilterInterface;
class InMemoryDataFlow extends AbstractDataFlow {
protected $data = [];
......@@ -14,7 +16,12 @@ class InMemoryDataFlow extends AbstractDataFlow {
protected $isInitialized = FALSE;
public function __construct($data) {
/**
* @var \Civi\DataProcessor\DataFlow\InMemoryDataFlow\FilterInterface[]
*/
protected $filters = [];
public function __construct($data=null) {
parent::__construct();
$this->data = $data;
}
......@@ -42,7 +49,7 @@ class InMemoryDataFlow extends AbstractDataFlow {
return $this->isInitialized;
}
/**
/**
* Resets the initialized state. This function is called
* when a setting has changed. E.g. when offset or limit are set.
*
......@@ -62,20 +69,40 @@ class InMemoryDataFlow extends AbstractDataFlow {
* @throws EndOfFlowException
*/
public function retrieveNextRecord($fieldNameprefix='') {
if (!isset($this->data[$this->currentPointer])) {
throw new EndOfFlowException();
if (!$this->isInitialized()) {
$this->initialize();
}
$data = $this->data[$this->currentPointer];
$record = array();
foreach($this->dataSpecification->getFields() as $field) {
$alias = $field->alias;
$name = $field->name;
$record[$fieldNameprefix.$alias] = $data[$name];
}
$this->currentPointer++;
do {
if (!isset($this->data[$this->currentPointer])) {
throw new EndOfFlowException();
}
$data = $this->data[$this->currentPointer];
$record = [];
foreach ($this->dataSpecification->getFields() as $field) {
$alias = $field->alias;
$name = $field->name;
$record[$fieldNameprefix . $alias] = $data[$name];
}
$this->currentPointer++;
} while (!$this->filterRecord($record));
return $record;
}
/**
* @param $record
*
* @return bool
*/
protected function filterRecord($record) {
foreach($this->filters as $filter) {
if (!$filter->filterRecord($record)) {
return FALSE;
}
}
return TRUE;
}
/**
* Returns a name for this data flow.
*
......@@ -85,4 +112,34 @@ class InMemoryDataFlow extends AbstractDataFlow {
return 'in_memory';
}
/**
* Adds a filter to the data flow.
*
* @param \Civi\DataProcessor\DataFlow\InMemoryDataFlow\FilterInterface $filter
*/
public function addFilter(FilterInterface $filter) {
if (!in_array($filter, $this->filters)) {
$this->filters[] = $filter;
}
}
/**
* Removes a filter from the data flow.
*
* @param \Civi\DataProcessor\DataFlow\InMemoryDataFlow\FilterInterface $filter
*/
public function removeFilter(FilterInterface $filter) {
$key = array_search($filter, $this->filters);
if ($key!==false) {
unset($this->filters[$key]);
}
}
/**
* @return \Civi\DataProcessor\DataFlow\InMemoryDataFlow\FilterInterface[]
*/
public function getFilters() {
return $this->filters;
}
}
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataFlow\InMemoryDataFlow;
use Civi\DataProcessor\DataFlow\InMemoryDataFlow;
class CallbackDataFlow extends InMemoryDataFlow {
protected $callback;
public function __construct(callable $callback) {
parent::__construct();
$this->callback = $callback;
}
public function initialize() {
parent::initialize();
if (!is_array($this->data)) {
$this->data = call_user_func($this->callback, $this);
}
}
public function resetInitializeState() {
parent::resetInitializeState();
$this->data = null;
}
}
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataFlow\InMemoryDataFlow;
interface FilterInterface {
/**
* Returns true when the record is in the filter.
* Returns false when the reocrd is not in the filter.
*
* @param $record
*
* @return bool
*/
public function filterRecord($record);
}
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataFlow\InMemoryDataFlow;
class SimpleFilter implements FilterInterface {
protected $field;
protected $operator;
protected $value;
public function __construct($field, $operator, $value) {
if (is_array($value)) {
switch ($operator) {
case '=':
$operator = 'IN';
break;
case '!=':
$operator = 'NOT IN';
break;
}
}
$this->field = $field;
$this->operator = $operator;
if ($operator == 'IS NULL' || $operator == 'IS NOT NULL') {
$this->value = NULL;
} else {
$this->value = $value;
}
}
/**
* Returns true when the record is in the filter.
* Returns false when the reocrd is not in the filter.
*
* @param $record
*
* @return bool
*/
public function filterRecord($record) {
switch ($this->operator) {
case 'IS NULL':
return !(isset($record[$this->field]) && $record[$this->field]!='');
break;
case 'IS NOT NULL':
return (isset($record[$this->field]) && $record[$this->field]!='');
break;
case '=':
return (isset($record[$this->field]) && $record[$this->field]==$this->value);
break;
case '!=':
return (!isset($record[$this->field]) || $record[$this->field]!=$this->value);
break;
case '>':
return (isset($record[$this->field]) && $record[$this->field]>$this->value);
break;
case '<':
return (isset($record[$this->field]) && $record[$this->field]<$this->value);
break;
case '>=':
return (isset($record[$this->field]) && $record[$this->field]>=$this->value);
break;
case '<=':
return (isset($record[$this->field]) && $record[$this->field]<=$this->value);
break;
case 'IN':
return (isset($record[$this->field]) && in_array($record[$this->field], $this->value));
break;
case 'NOT IN':
return !(isset($record[$this->field]) && in_array($record[$this->field], $this->value));
break;
}
}
public function getField() {
return $this->field;
}
public function getOperator() {
return $this->operator;
}
public function getValue() {
return $this->value;
}
}
......@@ -10,6 +10,7 @@ use Civi\DataProcessor\DataFlow\AbstractDataFlow;
use Civi\DataProcessor\DataFlow\CombinedDataFlow\CombinedSqlDataFlow;
use Civi\DataProcessor\DataFlow\SqlDataFlow;
use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
use Civi\DataProcessor\DataSpecification\FieldExistsException;
use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
use Civi\DataProcessor\Source\SourceInterface;
......@@ -348,8 +349,8 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
foreach ($left_record_set as $left_index => $left_record) {
$is_record_present_in_right_set = FALSE;
foreach ($right_record_set as $right_index => $right_record) {
if (isset($left_record[$this->left_field_alias]) && isset($right_record[$this->right_field_alias])) {
if ($left_record[$this->left_field_alias] == $right_record[$this->right_field_alias]) {
if (isset($left_record[$this->right_field_alias]) && isset($right_record[$this->left_field_alias])) {
if ($left_record[$this->right_field_alias] == $right_record[$this->left_field_alias]) {
$joined_record_set[] = array_merge($left_record, $right_record);
$is_record_present_in_right_set = TRUE;
}
......@@ -363,8 +364,8 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
foreach ($right_record_set as $right_index => $right_record) {
$is_record_present_in_left_set = FALSE;
foreach ($left_record_set as $left_index => $left_record) {
if (isset($left_record[$this->left_field_alias]) && isset($right_record[$this->right_field_alias])) {
if ($left_record[$this->left_field_alias] == $right_record[$this->right_field_alias]) {
if (isset($left_record[$this->right_field_alias]) && isset($right_record[$this->left_field_alias])) {
if ($left_record[$this->right_field_alias] == $right_record[$this->left_field_alias]) {
$joined_record_set[] = array_merge($left_record, $right_record);
$is_record_present_in_left_set = TRUE;
}
......@@ -395,14 +396,15 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
$table = $rightDataFlow->getTableAlias();
$this->rightClause = new SqlDataFlow\OrClause();
foreach ($left_record_set as $left_record) {
if (isset($left_record[$this->left_field_alias])) {
$value = $left_record[$this->left_field_alias];
$this->rightClause->addWhereClause(new SqlDataFlow\SimpleWhereClause($table, $this->right_field, '=', $value));
if (isset($left_record[$this->right_field_alias])) {
$value = $left_record[$this->right_field_alias];
$this->rightClause->addWhereClause(new SqlDataFlow\SimpleWhereClause($table, $this->leftFieldSpec->name, '=', $value));
// Make sure the join field is also available in the select statement of the query.
if (!$rightDataFlow->getDataSpecification()->doesFieldExist($this->right_field_alias)) {
$rightDataFlow->getDataSpecification()
->addFieldSpecification($this->right_field_alias, $this->rightFieldSpec);
try {
$rightDataFlow->getDataSpecification()->addFieldSpecification($this->left_field_alias, $this->leftFieldSpec);
} catch (FieldExistsException $e) {
// Do nothing.
}
}
}
......
......@@ -33,11 +33,6 @@ class DataSpecification {
throw new FieldExistsException($name);
}
$this->fields[] = $field;
/*if (isset($this->fields[$name])) {
throw new FieldExistsException($name);
}
$this->fields[$name] = $field;*/
return $this;
}
......
......@@ -6,6 +6,7 @@
namespace Civi\DataProcessor\FilterHandler;
use Civi\DataProcessor\DataFlow\InMemoryDataFlow;
use Civi\DataProcessor\DataFlow\SqlDataFlow;
use Civi\DataProcessor\Exception\DataSourceNotFoundException;
use Civi\DataProcessor\Exception\FieldNotFoundException;
......@@ -33,6 +34,12 @@ abstract class AbstractFieldFilterHandler extends AbstractFilterHandler {
*/
protected $whereClause;
/**
* @var \Civi\DataProcessor\DataFlow\InMemoryDataFlow\SimpleFilter
*/
protected $filterClass;
/**
* @param $datasource_name
* @param $field_name
......@@ -83,6 +90,9 @@ abstract class AbstractFieldFilterHandler extends AbstractFilterHandler {
if ($dataFlow && $dataFlow instanceof SqlDataFlow && $this->whereClause) {
$dataFlow->removeWhereClause($this->whereClause);
unset($this->whereClause);
} elseif ($dataFlow && $dataFlow instanceof InMemoryDataFlow && $this->filterClass) {
$dataFlow->removeFilter($this->filterClass);
unset($this->filter);
}
}
......@@ -102,6 +112,9 @@ abstract class AbstractFieldFilterHandler extends AbstractFilterHandler {
}
$this->whereClause = new SqlDataFlow\SimpleWhereClause($dataFlow->getName(), $this->inputFieldSpecification->name, $filter['op'], $value, $this->inputFieldSpecification->type);
$dataFlow->addWhereClause($this->whereClause);
} elseif ($dataFlow && $dataFlow instanceof InMemoryDataFlow) {
$this->filterClass = new InMemoryDataFlow\SimpleFilter($this->inputFieldSpecification->name, $filter['op'], $filter['value']);
$dataFlow->addFilter($this->filterClass);
}
}
......
......@@ -6,6 +6,7 @@
namespace Civi\DataProcessor\FilterHandler;
use Civi\DataProcessor\DataFlow\InMemoryDataFlow;
use Civi\DataProcessor\DataFlow\SqlDataFlow;
use Civi\DataProcessor\DataSpecification\CustomFieldSpecification;
use Civi\DataProcessor\Exception\InvalidConfigurationException;
......@@ -42,6 +43,9 @@ class SimpleSqlFilter extends AbstractFieldFilterHandler {
$this->whereClause = new SqlDataFlow\SimpleWhereClause($dataFlow->getName(), $this->inputFieldSpecification->name, $filter['op'], $filter['value'], $this->inputFieldSpecification->type);
}
$dataFlow->addWhereClause($this->whereClause);
} elseif ($dataFlow && $dataFlow instanceof InMemoryDataFlow && !$this->isMultiValueField()) {
$this->filterClass = new InMemoryDataFlow\SimpleFilter($this->inputFieldSpecification->name, $filter['op'], $filter['value']);
$dataFlow->addFilter($this->filterClass);
}
}
......
......@@ -7,6 +7,7 @@
namespace Civi\DataProcessor\Source;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface;
use Civi\DataProcessor\DataSpecification\FieldExistsException;
use Civi\DataProcessor\DataSpecification\FieldSpecification;
use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
......@@ -134,8 +135,12 @@ abstract class AbstractSource implements SourceInterface {
public function ensureField(FieldSpecification $field) {
$field = $this->getAvailableFields()->getFieldSpecificationByAlias($field->alias);
if ($field) {
$this->dataFlow->getDataSpecification()
->addFieldSpecification($field->name, $field);
try {
$this->dataFlow->getDataSpecification()
->addFieldSpecification($field->name, $field);
} catch (FieldExistsException $e) {
// Do nothing.
}
}
return $this->dataFlow;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment