Skip to content
Snippets Groups Projects
Commit 047f2993 authored by jaapjansma's avatar jaapjansma
Browse files

Merge branch 'csv'

parents 72a269aa e3d39cb9
No related branches found
No related tags found
No related merge requests found
Showing
with 796 additions and 145 deletions
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
use CRM_Dataprocessor_ExtensionUtil as E;
/**
 * Shared base class for the configuration forms of a data processor source.
 *
 * It loads the source identified by the `source_id` request parameter, resolves
 * the matching source class through the factory, renders the hidden ID fields
 * and the Save/Cancel buttons, and redirects back to the data processor edit
 * screen after submission. Subclasses add their own fields and persistence.
 */
class CRM_Dataprocessor_Form_Source_BaseForm extends CRM_Core_Form {

  /**
   * @var int|null
   *   ID of the data processor, read from the `data_processor_id` request parameter.
   */
  protected $dataProcessorId;

  /**
   * @var int
   *   ID of the source, read from the (mandatory) `source_id` request parameter.
   */
  protected $source_id;

  /**
   * @var \Civi\DataProcessor\Source\SourceInterface
   */
  protected $sourceClass;

  /**
   * @var array
   * The source object
   */
  protected $source;

  /**
   * Function to perform processing before displaying form (overrides parent function)
   *
   * Loads the source record, instantiates its source class, sets the page
   * title and pushes the data processor edit screen as user context.
   *
   * @access public
   * @throws \Exception
   *   When no source exists for the requested source_id.
   */
  public function preProcess() {
    $session = CRM_Core_Session::singleton();

    $this->dataProcessorId = CRM_Utils_Request::retrieve('data_processor_id', 'Integer');
    $this->assign('data_processor_id', $this->dataProcessorId);

    $this->source_id = CRM_Utils_Request::retrieve('source_id', 'Integer', CRM_Core_DAO::$_nullObject, TRUE);
    $this->assign('source_id', $this->source_id);

    $source = CRM_Dataprocessor_BAO_Source::getValues(array('id' => $this->source_id));
    // Fail with a clear message instead of an undefined index followed by a
    // fatal error further down when the requested source does not exist.
    if (!isset($source[$this->source_id])) {
      throw new \Exception(E::ts('Could not find the data source'));
    }
    $this->source = $source[$this->source_id];
    $this->assign('source', $this->source);

    $factory = dataprocessor_get_factory();
    $this->sourceClass = $factory->getDataSourceByName($this->source['type']);
    $this->sourceClass->setSourceName($this->source['name']);
    $this->sourceClass->setSourceTitle($this->source['title']);

    $title = E::ts('Data Processor Source Configuration');
    CRM_Utils_System::setTitle($title);

    $url = CRM_Utils_System::url('civicrm/dataprocessor/form/edit', array('id' => $this->dataProcessorId, 'action' => 'update', 'reset' => 1));
    $session->pushUserContext($url);
  }

  /**
   * Adds the hidden ID fields and the Save/Cancel buttons.
   */
  public function buildQuickForm() {
    $this->add('hidden', 'data_processor_id');
    $this->add('hidden', 'source_id');
    $this->addButtons(array(
      array('type' => 'next', 'name' => E::ts('Save'), 'isDefault' => TRUE,),
      array('type' => 'cancel', 'name' => E::ts('Cancel'))
    ));
    parent::buildQuickForm();
  }

  /**
   * Returns the default values for the hidden ID fields.
   *
   * @return array
   */
  public function setDefaultValues() {
    $defaults = [];
    $defaults['data_processor_id'] = $this->dataProcessorId;
    $defaults['source_id'] = $this->source_id;
    return $defaults;
  }

  /**
   * Runs the parent post processing and then redirects to the user context
   * pushed in preProcess().
   */
  public function postProcess() {
    // The parent hook has to run first: the redirect normally ends the
    // request, so anything placed after it would never be executed.
    parent::postProcess();
    $session = CRM_Core_Session::singleton();
    CRM_Utils_System::redirect($session->readUserContext());
  }

}
\ No newline at end of file
......@@ -6,70 +6,16 @@
use CRM_Dataprocessor_ExtensionUtil as E;
class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Core_Form {
protected $dataProcessorId;
protected $source_id;
/**
* @var \Civi\DataProcessor\Source\SourceInterface
*/
protected $sourceClass;
/**
* @var array
* The source object
*/
protected $source;
/**
* Function to perform processing before displaying form (overrides parent function)
*
* @access public
*/
function preProcess() {
$session = CRM_Core_Session::singleton();
$this->dataProcessorId = CRM_Utils_Request::retrieve('data_processor_id', 'Integer');
$this->assign('data_processor_id', $this->dataProcessorId);
$this->source_id = CRM_Utils_Request::retrieve('source_id', 'Integer', CRM_Core_DAO::$_nullObject, TRUE);
$this->assign('source_id', $this->source_id);
$source = CRM_Dataprocessor_BAO_Source::getValues(array('id' => $this->source_id));
$this->source = $source[$this->source_id];
$this->assign('source', $this->source);
$factory = dataprocessor_get_factory();
$this->sourceClass = $factory->getDataSourceByName($this->source['type']);
$this->sourceClass->setSourceName($this->source['name']);
$this->sourceClass->setSourceTitle($this->source['title']);
$title = E::ts('Data Processor Source Configuration');
CRM_Utils_System::setTitle($title);
$url = CRM_Utils_System::url('civicrm/dataprocessor/form/edit', array('id' => $this->dataProcessorId, 'action' => 'update', 'reset' => 1));
$session->pushUserContext($url);
}
class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Dataprocessor_Form_Source_BaseForm {
public function buildQuickForm() {
$this->add('hidden', 'data_processor_id');
$this->add('hidden', 'source_id');
parent::buildQuickForm();
$this->addFieldsForFiltering();
$this->addButtons(array(
array('type' => 'next', 'name' => E::ts('Save'), 'isDefault' => TRUE,),
array('type' => 'cancel', 'name' => E::ts('Cancel'))
));
parent::buildQuickForm();
}
function setDefaultValues() {
$defaults = [];
$defaults['data_processor_id'] = $this->dataProcessorId;
$defaults['source_id'] = $this->source_id;
$defaults = parent::setDefaultValues();
if (isset($this->source['configuration']['filter'])) {
foreach($this->source['configuration']['filter'] as $alias => $filter) {
$defaults[$alias.'_op'] = $filter['op'];
......@@ -81,9 +27,6 @@ class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Core_Form {
}
public function postProcess() {
$session = CRM_Core_Session::singleton();
$values = $this->exportValues();
if ($this->dataProcessorId) {
$params['data_processor_id'] = $this->dataProcessorId;
}
......@@ -93,7 +36,6 @@ class CRM_Dataprocessor_Form_Source_Configuration extends CRM_Core_Form {
$params['configuration']['filter'] = $this->postProcessFieldsForFiltering();
CRM_Dataprocessor_BAO_Source::add($params);
CRM_Utils_System::redirect($session->readUserContext());
parent::postProcess();
}
......
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
use CRM_Dataprocessor_ExtensionUtil as E;
/**
* Form controller class
*
* @see https://wiki.civicrm.org/confluence/display/CRMDOC/QuickForm+Reference
*/
class CRM_Dataprocessor_Form_Source_Csv extends CRM_Dataprocessor_Form_Source_BaseForm {

  /**
   * Adds the CSV specific fields on top of the fields of the base form.
   */
  public function buildQuickForm() {
    parent::buildQuickForm();
    // CRM_Core_Form::add() takes the element attributes as fourth argument and
    // the "required" flag as fifth; passing TRUE in fourth position did not
    // make the field required.
    $this->add('text', 'uri', E::ts('URI'), array(), TRUE);
    $this->add('text', 'delimiter', E::ts('Field delimiter'), array(), TRUE);
    $this->add('text', 'enclosure', E::ts('Field enclosure character'), array(), TRUE);
    $this->add('text', 'escape', E::ts('Escape character'), array(), TRUE);
    $this->add('checkbox', 'first_row_as_header', E::ts('First row contains column names'));
  }

  /**
   * Returns the stored configuration as defaults and falls back to the usual
   * CSV delimiter, enclosure and escape characters when they are not stored.
   *
   * @return array
   */
  public function setDefaultValues() {
    $defaults = parent::setDefaultValues();
    // A source that has not been configured yet may have no configuration array.
    if (isset($this->source['configuration']) && is_array($this->source['configuration'])) {
      foreach ($this->source['configuration'] as $field => $value) {
        $defaults[$field] = $value;
      }
    }
    if (!isset($defaults['delimiter'])) {
      $defaults['delimiter'] = ',';
    }
    if (!isset($defaults['enclosure'])) {
      $defaults['enclosure'] = '"';
    }
    if (!isset($defaults['escape'])) {
      $defaults['escape'] = '\\';
    }
    return $defaults;
  }

  /**
   * Stores the submitted CSV settings as the configuration of the source and
   * hands over to the base form.
   */
  public function postProcess() {
    $values = $this->exportValues();
    $params = array();
    if ($this->dataProcessorId) {
      $params['data_processor_id'] = $this->dataProcessorId;
    }
    if ($this->source_id) {
      $params['id'] = $this->source_id;
    }
    $params['configuration']['uri'] = $values['uri'];
    $params['configuration']['delimiter'] = $values['delimiter'];
    $params['configuration']['enclosure'] = $values['enclosure'];
    $params['configuration']['escape'] = $values['escape'];
    // An unchecked checkbox is not part of the exported values.
    $params['configuration']['first_row_as_header'] = empty($values['first_row_as_header']) ? 0 : $values['first_row_as_header'];
    CRM_Dataprocessor_BAO_Source::add($params);
    parent::postProcess();
  }

}
\ No newline at end of file
......@@ -84,7 +84,7 @@ abstract class AbstractDataFlow {
* @return array
* @throws EndOfFlowException
*/
abstract protected function retrieveNextRecord($fieldNameprefix='');
abstract public function retrieveNextRecord($fieldNameprefix='');
/**
* Returns a name for this data flow.
......@@ -228,10 +228,10 @@ abstract class AbstractDataFlow {
/**
* Returns debug information
*
* @return string
* @return array
*/
public function getDebugInformation() {
return "";
return array();
}
public function addAggregateField(FieldSpecification $aggregateField) {
......
......@@ -9,8 +9,9 @@ namespace Civi\DataProcessor\DataFlow\CombinedDataFlow;
use \Civi\DataProcessor\DataFlow\AbstractDataFlow;
use \Civi\DataProcessor\DataFlow\EndOfFlowException;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinSpecification;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface;
use Civi\DataProcessor\DataFlow\MultipleDataFlows\MultipleSourceDataFlows;
use Civi\DataProcessor\DataFlow\SqlDataFlow;
use \Civi\DataProcessor\DataSpecification\DataSpecification;
......@@ -46,6 +47,11 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
*/
protected $dataSpecification;
/**
* @var int
*/
protected $batchSize = 100;
public function __construct() {
$this->dataSpecification = new DataSpecification();
}
......@@ -60,6 +66,16 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
$this->sourceDataFlowDescriptions[] = $dataFlowDescription;
}
/**
* @param \Civi\DataProcessor\FieldOutputHandler\AbstractFieldOutputHandler $outputFieldHandler[]
*/
public function setOutputFieldHandlers($handlers) {
  parent::setOutputFieldHandlers($handlers);
  // Hand the same handlers to the data flow of every registered source.
  foreach ($this->sourceDataFlowDescriptions as $description) {
    $sourceDataFlow = $description->getDataFlow();
    $sourceDataFlow->setOutputFieldHandlers($handlers);
  }
}
/**
* Initialize the data flow
*
......@@ -79,9 +95,16 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
}
$allRecords = array();
foreach($this->sourceDataFlowDescriptions as $dataFlowDescription) {
$records = $dataFlowDescription->getDataFlow()->allRecords($dataFlowDescription->getDataFlow()->getName());
$allRecords = $this->joinArray($allRecords, $records, $dataFlowDescription->getJoinSpecification());
for($i=0; $i<count($this->sourceDataFlowDescriptions); $i++) {
do {
$batch = $this->getAllRecordsFromDataFlowAsArray($this->sourceDataFlowDescriptions[$i]->getDataFlow(), $this->batchSize);
for($j=$i+1; $j<count($this->sourceDataFlowDescriptions); $j++) {
$this->sourceDataFlowDescriptions[$j]->getJoinSpecification()->prepareRightDataFlow($batch, $this->sourceDataFlowDescriptions[$j]->getDataFlow());
$rightRecords = $this->getAllRecordsFromDataFlowAsArray($this->sourceDataFlowDescriptions[$j]->getDataFlow());
$batch = $this->sourceDataFlowDescriptions[$j]->getJoinSpecification()->join($batch, $rightRecords);
}
$allRecords = array_merge($allRecords, $batch);
} while(count($batch) >= $this->batchSize || $this->batchSize == 0);
}
$this->recordCount = count($allRecords);
......@@ -101,35 +124,25 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
}
/**
* Join two arrays together based on the combine specification
* This functions like an INNER JOIN in sql.
*
* @param $left
* @param $right
* @param \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinSpecification|null
* Return all records for a given data flow.
*
* @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $dataFlow
* @param int $batchSize 0 for unlimited
* @return array
* @throws \Civi\DataProcessor\DataFlow\EndOfFlowException
*/
protected function joinArray($left, $right, JoinSpecification $combineSpecification=null) {
$out = array();
if ($combineSpecification === null && empty($left)) {
return $right;
} elseif ($combineSpecification === null && empty($right)) {
return $left;
}
foreach($left as $left_index => $left_record) {
foreach($right as $right_index => $right_record) {
if ($combineSpecification === null || $combineSpecification->isJoinable($left_record, $right_record)) {
$out[] = array_merge($left_record, $right_record);
unset($left[$left_index]);
unset($right[$right_index]);
}
protected function getAllRecordsFromDataFlowAsArray(AbstractDataFlow $dataFlow, $batchSize=0) {
  // Collects records from $dataFlow until $batchSize records are gathered
  // (0 means no limit), the flow returns an empty record, or the flow signals
  // its end with an EndOfFlowException.
  $records = array();
  try {
    // Test the batch limit BEFORE fetching: fetching first and testing
    // afterwards consumed one extra record per batch and silently dropped it.
    while ($batchSize == 0 || count($records) < $batchSize) {
      $record = $dataFlow->retrieveNextRecord();
      if (!$record) {
        break;
      }
      $records[] = $record;
    }
  } catch (EndOfFlowException $e) {
    // The flow is exhausted; return what has been collected so far.
  }
  return $records;
}
/**
......@@ -159,7 +172,7 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
* @return array
* @throws EndOfFlowException
*/
protected function retrieveNextRecord($fieldNamePrefix='') {
public function retrieveNextRecord($fieldNamePrefix='') {
if (!$this->isInitialized()) {
$this->initialize();
}
......@@ -206,4 +219,13 @@ class CombinedDataFlow extends AbstractDataFlow implements MultipleSourceDataFlo
return 'combined_data_flow';
}
public function getDebugInformation() {
  // Debug information of every source data flow, keyed by the flow name.
  $debugPerFlow = array();
  foreach ($this->sourceDataFlowDescriptions as $description) {
    $flowName = $description->getDataFlow()->getName();
    $debugPerFlow[$flowName] = $description->getDataFlow()->getDebugInformation();
  }
  return $debugPerFlow;
}
}
\ No newline at end of file
......@@ -141,7 +141,7 @@ class CombinedSqlDataFlow extends SqlDataFlow implements MultipleSourceDataFlows
* @return array
* @throws EndOfFlowException
*/
protected function retrieveNextRecord($fieldNamePrefix='') {
public function retrieveNextRecord($fieldNamePrefix='') {
if (!$this->isInitialized()) {
$this->initialize();
}
......
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataFlow;
use CRM_Dataprocessor_ExtensionUtil as E;
/**
 * Data flow which reads its records row by row from a CSV file (or any other
 * URI which fopen() can read).
 */
class CsvDataFlow extends AbstractDataFlow {

  protected $data = [];

  protected $currentPointer = 0;

  protected $isInitialized = FALSE;

  /**
   * @var string
   *   The URI passed to fopen().
   */
  protected $uri;

  protected $delimiter = ',';

  protected $enclosure = '"';

  protected $escape = '\\';

  /**
   * @var int
   *   Number of rows to skip before the first data row.
   */
  protected $skipRows = 1;

  /**
   * @var resource|false|null
   *   Handle used for reading the data rows.
   */
  private $uriHandle;

  public function __construct($uri, $skipRows = 1, $delimiter=',', $enclosure='"', $escape='\\') {
    parent::__construct();
    $this->uri = $uri;
    $this->skipRows = $skipRows;
    $this->delimiter = $delimiter;
    $this->enclosure = $enclosure;
    $this->escape = $escape;
  }

  /**
   * Returns the header row
   *
   * The first row of the file determines the columns; every column gets a
   * generated title ("Column <index>"). When $headerRowNumber is given, the
   * values of that row (1 = first row) replace the generated titles.
   * When the file cannot be opened an empty array is returned.
   *
   * @param int $headerRowNumber
   *   Row which holds the column names, 0 when the file has no header row.
   * @return array
   */
  public function getHeaderRow($headerRowNumber=0) {
    $header = array();
    $handle = $this->openCsvHandle();
    if (!$handle) {
      return $header;
    }

    $rowNumber = 0;
    do {
      $row = $this->readCsvRow($handle);
      $rowNumber++;
      if ($rowNumber == 1 && is_array($row)) {
        // This is the first row, initialize the header with at least as many columns as this row
        foreach ($row as $col_idx => $col) {
          $header[$col_idx] = E::ts('Column %1', array(1 => $col_idx));
        }
      }
    } while (is_array($row) && $rowNumber < $headerRowNumber);

    // $row now holds the requested header row (when it exists in the file).
    if ($headerRowNumber && is_array($row)) {
      foreach ($row as $col_idx => $col) {
        $header[$col_idx] = $col;
      }
    }

    fclose($handle);
    return $header;
  }

  /**
   * Initialize the data flow
   *
   * Opens the URI and skips the configured number of rows.
   *
   * @return void
   */
  public function initialize() {
    if ($this->isInitialized()) {
      return;
    }
    // After resetInitializeState() a handle of a previous run may still be open.
    if (is_resource($this->uriHandle)) {
      fclose($this->uriHandle);
    }
    $this->uriHandle = $this->openCsvHandle();
    if ($this->uriHandle) {
      for ($i = 0; $i < $this->skipRows; $i++) {
        $this->readCsvRow($this->uriHandle);
      }
    }
    $this->isInitialized = TRUE;
  }

  /**
   * Returns whether this flow has been initialized or not
   *
   * @return bool
   */
  public function isInitialized() {
    return $this->isInitialized;
  }

  /**
   * Resets the initialized state. This function is called
   * when a setting has changed. E.g. when offset or limit are set.
   *
   * @return void
   */
  protected function resetInitializeState() {
    $this->isInitialized = FALSE;
  }

  /**
   * Returns the next record in an associative array
   *
   * The column index of a field is taken from its name ("col_<index>"); a
   * column which is missing in the current row results in a NULL value.
   *
   * @param string $fieldNameprefix
   *   The prefix before the name of the field within the record.
   * @return array
   * @throws EndOfFlowException
   *   When there are no more rows or the URI could not be opened.
   */
  public function retrieveNextRecord($fieldNameprefix='') {
    $this->initialize();
    if (!is_resource($this->uriHandle)) {
      throw new EndOfFlowException();
    }
    $row = $this->readCsvRow($this->uriHandle);
    if (!$row) {
      throw new EndOfFlowException();
    }
    $record = array();
    foreach ($this->dataSpecification->getFields() as $field) {
      $alias = $field->alias;
      $col_index = str_replace("col_", "", $field->name);
      $record[$fieldNameprefix.$alias] = isset($row[$col_index]) ? $row[$col_index] : NULL;
    }
    return $record;
  }

  /**
   * Returns a name for this data flow.
   *
   * @return string
   */
  public function getName() {
    return 'csv';
  }

  /**
   * Opens the configured URI for reading.
   *
   * @return resource|false
   *   FALSE when no URI is set or when it could not be opened.
   */
  private function openCsvHandle() {
    if ($this->uri === NULL || $this->uri === '') {
      return FALSE;
    }
    return fopen($this->uri, 'r');
  }

  /**
   * Reads one row from the handle with the configured CSV settings.
   *
   * @param resource $handle
   * @return array|false|null
   *   The value returned by fgetcsv().
   */
  private function readCsvRow($handle) {
    return fgetcsv($handle, 0, $this->delimiter, $this->enclosure, $this->escape);
  }

}
\ No newline at end of file
......@@ -60,7 +60,7 @@ class InMemoryDataFlow extends AbstractDataFlow {
* @return array
* @throws EndOfFlowException
*/
protected function retrieveNextRecord($fieldNameprefix='') {
public function retrieveNextRecord($fieldNameprefix='') {
if (!isset($this->data[$this->currentPointer])) {
throw new EndOfFlowException();
}
......@@ -68,7 +68,8 @@ class InMemoryDataFlow extends AbstractDataFlow {
$record = array();
foreach($this->dataSpecification->getFields() as $field) {
$alias = $field->alias;
$record[$fieldNameprefix.$field->alias] = $data[$alias];
$name = $field->name;
$record[$fieldNameprefix.$alias] = $data[$name];
}
$this->currentPointer++;
return $record;
......
......@@ -14,14 +14,14 @@ class DataFlowDescription {
protected $dataFlow;
/**
* @var \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinSpecification
* @var \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
*/
protected $joinSpecification = array();
public function __construct($datFlow, $joinSpecification = null) {
$this->dataFlow = $datFlow;
$this->joinSpecification = $joinSpecification;
$this->dataFlow->setOffset($this);
$this->dataFlow->setDataFlowDescription($this);
}
/**
......@@ -32,7 +32,7 @@ class DataFlowDescription {
}
/**
* @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinSpecification
* @return \Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface
*/
public function getJoinSpecification() {
return $this->joinSpecification;
......
......@@ -12,15 +12,14 @@ use Civi\DataProcessor\ProcessorType\AbstractProcessorType;
interface JoinInterface{
/**
* Validates the right record against the left record and returns true when the right record
* has a successfull join with the left record. Otherwise false.
* Joins the records sets and return the new created set.
*
* @param $left_record
* @param $right_record
* @param $left_record_set
* @param $right_record_set
*
* @return mixed
* @return array
*/
public function isJoinable($left_record, $right_record);
public function join($left_record_set, $right_record_set);
/**
* Returns true when this join is compatible with this data flow
......@@ -58,4 +57,14 @@ interface JoinInterface{
*/
public function getConfigurationUrl();
/**
* Prepares the right data flow based on the data in the left record set.
*
* @param $left_record_set
* @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $rightDataFlow
*
* @return \Civi\DataProcessor\DataFlow\AbstractDataFlow
*/
public function prepareRightDataFlow($left_record_set, AbstractDataFlow $rightDataFlow);
}
\ No newline at end of file
......@@ -28,6 +28,30 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
*/
protected $right_field;
/**
* @var string
* The alias of the left field
*/
protected $left_field_alias;
/**
* @var string
* The alias of the right field
*/
protected $right_field_alias;
/**
* @var \Civi\DataProcessor\DataSpecification\FieldSpecification
* The alias of the left field
*/
protected $leftFieldSpec;
/**
* @var \Civi\DataProcessor\DataSpecification\FieldSpecification
* The alias of the right field
*/
protected $rightFieldSpec;
/**
* @var string
* The prefix for the left field, or in SQL join mode the left table
......@@ -50,6 +74,16 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
*/
protected $left_table;
/**
* @var \Civi\DataProcessor\Source\SourceInterface
*/
protected $left_source;
/**
* @var \Civi\DataProcessor\Source\SourceInterface
*/
protected $right_source;
/**
* @var String
* The join type, e.g. INNER, LEFT, OUT etc..
......@@ -61,6 +95,11 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
*/
private $dataProcessor;
/**
* @var \Civi\DataProcessor\DataFlow\SqlDataFlow\OrClause
*/
private $rightClause = null;
public function __construct($left_prefix = null, $left_field = null, $right_prefix = null, $right_field = null, $type = "INNER") {
$this->left_prefix = $left_prefix;
$this->left_field = $left_field;
......@@ -135,22 +174,30 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
}
if ($this->left_prefix && $this->left_field) {
$this->left_table = $this->left_prefix;
$left_source = $this->dataProcessor->getDataSourceByName($this->left_prefix);
if ($left_source) {
$leftTable = $left_source->ensureField($this->left_field);
$this->left_source = $this->dataProcessor->getDataSourceByName($this->left_prefix);
if ($this->left_source) {
$leftTable = $this->left_source->ensureField($this->left_field);
if ($leftTable && $leftTable instanceof SqlTableDataFlow) {
$this->left_table = $leftTable->getTableAlias();
}
$this->leftFieldSpec = $this->left_source->getAvailableFields()->getFieldSpecificationByName($this->left_field);
if ($this->leftFieldSpec) {
$this->left_field_alias = $this->leftFieldSpec->alias;
}
}
}
if ($this->right_prefix && $this->right_field) {
$this->right_table = $this->right_prefix;
$right_source = $this->dataProcessor->getDataSourceByName($this->right_prefix);
if ($right_source) {
$rightTable = $right_source->ensureField($this->right_field);
$this->right_source = $this->dataProcessor->getDataSourceByName($this->right_prefix);
if ($this->right_source) {
$rightTable = $this->right_source->ensureField($this->right_field);
if ($rightTable && $rightTable instanceof SqlTableDataFlow) {
$this->right_table = $rightTable->getTableAlias();
}
$this->rightFieldSpec = $this->right_source->getAvailableFields()->getFieldSpecificationByName($this->right_field);
if ($this->rightFieldSpec) {
$this->right_field_alias = $this->rightFieldSpec->alias;
}
}
}
......@@ -159,30 +206,80 @@ class SimpleJoin implements JoinInterface, SqlJoinInterface {
}
/**
* Validates the right record against the left record and returns true when the right record
* has a successfull join with the left record. Otherwise false.
* Joins the records sets and return the new created set.
*
* @param $left_record
* @param $right_record
* @param $left_record_set
* @param $right_record_set
*
* @return mixed
* @return array
*/
public function isJoinable($left_record, $right_record) {
if (isset($left_record[$this->left_prefix.$this->left_field]) && isset($right_record[$this->right_prefix.$this->right_field])) {
if ($left_record[$this->left_prefix.$this->left_field] == $right_record[$this->right_prefix.$this->right_field]) {
return TRUE;
}
} elseif ($this->type == 'LEFT') {
if (isset($left_record[$this->left_prefix.$this->left_field]) && !isset($right_record[$this->right_prefix.$this->right_field])) {
return true;
public function join($left_record_set, $right_record_set) {
  $joined = array();
  $leftKey = $this->left_field_alias;
  $rightKey = $this->right_field_alias;
  $joinType = $this->type;

  if ($joinType == 'INNER' || $joinType == 'LEFT') {
    // Walk the left set; a LEFT join keeps left records without a match.
    foreach ($left_record_set as $leftRow) {
      $hasMatch = FALSE;
      if (isset($leftRow[$leftKey])) {
        foreach ($right_record_set as $rightRow) {
          if (!isset($rightRow[$rightKey])) {
            continue;
          }
          if ($leftRow[$leftKey] == $rightRow[$rightKey]) {
            $joined[] = array_merge($leftRow, $rightRow);
            $hasMatch = TRUE;
          }
        }
      }
      if ($joinType == 'LEFT' && !$hasMatch) {
        $joined[] = $leftRow;
      }
    }
    return $joined;
  }

  if ($joinType == 'RIGHT') {
    // Walk the right set; right records without a match are kept as they are.
    foreach ($right_record_set as $rightRow) {
      $hasMatch = FALSE;
      if (isset($rightRow[$rightKey])) {
        foreach ($left_record_set as $leftRow) {
          if (!isset($leftRow[$leftKey])) {
            continue;
          }
          if ($leftRow[$leftKey] == $rightRow[$rightKey]) {
            $joined[] = array_merge($leftRow, $rightRow);
            $hasMatch = TRUE;
          }
        }
      }
      if (!$hasMatch) {
        $joined[] = $rightRow;
      }
    }
  }

  // Any other join type results in an empty set.
  return $joined;
}
return false;
/**
* Prepares the right data flow based on the data in the left record set.
*
* @param $left_record_set
* @param \Civi\DataProcessor\DataFlow\AbstractDataFlow $rightDataFlow
*
* @return AbstractDataFlow
* @throws \Exception
*/
public function prepareRightDataFlow($left_record_set, AbstractDataFlow $rightDataFlow) {
  // Only SQL table data flows can be narrowed down with a where clause.
  if (!($rightDataFlow instanceof SqlTableDataFlow)) {
    return $rightDataFlow;
  }

  // Drop the clause which was built for the previous left record set.
  if ($this->rightClause) {
    $rightDataFlow->removeWhereClause($this->rightClause);
  }

  // Collect every distinct join value once, so that a value which occurs in
  // many left records does not produce the same OR condition over and over.
  $joinValues = array();
  foreach ($left_record_set as $left_record) {
    if (isset($left_record[$this->left_field_alias])) {
      $value = $left_record[$this->left_field_alias];
      if (!in_array($value, $joinValues, TRUE)) {
        $joinValues[] = $value;
      }
    }
  }

  $table = $rightDataFlow->getTableAlias();
  $this->rightClause = new SqlDataFlow\OrClause();
  foreach ($joinValues as $value) {
    $this->rightClause->addWhereClause(new SqlDataFlow\SimpleWhereClause($table, $this->right_field, '=', $value));
  }

  // Make sure the join field is also available in the select statement of the query.
  // This check does not depend on the individual record, so it is done once.
  if (count($joinValues) && !$rightDataFlow->getDataSpecification()->doesFieldExist($this->right_field_alias)) {
    $rightDataFlow->getDataSpecification()
      ->addFieldSpecification($this->right_field_alias, $this->rightFieldSpec);
  }

  $rightDataFlow->addWhereClause($this->rightClause);
  return $rightDataFlow;
}
/**
......
......@@ -24,9 +24,9 @@ abstract class SqlDataFlow extends AbstractDataFlow {
protected $whereClauses = array();
protected $sqlStatement;
protected $sqlStatements = array();
protected $sqlCountStatement;
protected $sqlCountStatements = array();
/**
* Returns an array with the fields for in the select statement in the sql query.
......@@ -68,7 +68,7 @@ abstract class SqlDataFlow extends AbstractDataFlow {
$orderBy = $this->getOrderByStatement();
$countSql = "SELECT COUNT(*) {$from} {$where} {$groupBy}";
$this->sqlCountStatement = $countSql;
$this->sqlCountStatements[] = $countSql;
$this->count = \CRM_Core_DAO::singleValueQuery($countSql);
$sql = "{$this->getSelectQueryStatement()} {$where} {$groupBy} {$orderBy}";
......@@ -82,17 +82,21 @@ abstract class SqlDataFlow extends AbstractDataFlow {
$limitStatement = "LIMIT 0, {$this->limit}";
}
elseif ($this->offset !== FALSE && $this->limit === FALSE) {
echo $this->getName();
var_dump($this->offset);
$calculatedLimit = $this->count - $this->offset;
$limitStatement = "LIMIT {$this->offset}, {$calculatedLimit}";
}
$sql .= " {$limitStatement}";
$this->sqlStatement = $sql;
$this->sqlStatements[] = $sql;
$this->dao = \CRM_Core_DAO::executeQuery($sql);
} catch (\Exception $e) {
throw new \Exception(
"Error in query.
\r\nCount query: {$this->sqlCountStatement}
\r\nQuery: $this->sqlStatement", 0, $e);
"Error in DataFlow query.
\r\nData flow: {$this->getName()}
\r\nCount query: {$countSql}
\r\nQuery: $sql", 0, $e);
}
}
......@@ -126,7 +130,7 @@ abstract class SqlDataFlow extends AbstractDataFlow {
* @return array
* @throws EndOfFlowException
*/
protected function retrieveNextRecord($fieldNamePrefix='') {
public function retrieveNextRecord($fieldNamePrefix='') {
if (!$this->isInitialized()) {
$this->initialize();
}
......@@ -137,7 +141,7 @@ abstract class SqlDataFlow extends AbstractDataFlow {
$record = array();
foreach($this->dataSpecification->getFields() as $field) {
$alias = $field->alias;
$record[$fieldNamePrefix.$field->alias] = $this->dao->$alias;
$record[$fieldNamePrefix.$alias] = $this->dao->$alias;
}
return $record;
}
......@@ -250,7 +254,10 @@ abstract class SqlDataFlow extends AbstractDataFlow {
* @return string
*/
public function getDebugInformation() {
return $this->sqlStatement;
return array(
'query' => $this->sqlStatements,
'count query' => $this->sqlCountStatements,
);
}
}
\ No newline at end of file
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataFlow\SqlDataFlow;
use Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface;
/**
 * Where clause which combines a list of other where clauses with OR.
 */
class OrClause implements WhereClauseInterface {

  /**
   * @var WhereClauseInterface[]
   *   The clauses which are combined with OR.
   */
  protected $clauses = array();

  /**
   * Creates the clause, optionally with an initial list of clauses.
   *
   * @param WhereClauseInterface[] $clauses
   */
  public function __construct($clauses=array()) {
    $this->clauses = $clauses;
  }

  /**
   * Appends a clause to the list of clauses.
   *
   * @param \Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface $clause
   */
  public function addWhereClause(WhereClauseInterface $clause) {
    $this->clauses[] = $clause;
  }

  /**
   * Returns the SQL of this clause: every contained clause wrapped in
   * parentheses and joined with OR, or "1" when there are no clauses.
   *
   * @return string
   */
  public function getWhereClause() {
    if (!count($this->clauses)) {
      return "1";
    }
    $sql = "";
    $separator = "";
    foreach ($this->clauses as $innerClause) {
      $sql .= $separator . "(" . $innerClause->getWhereClause() . ")";
      $separator = " OR ";
    }
    return "(" . $sql . ")";
  }

}
\ No newline at end of file
......@@ -109,6 +109,7 @@ class Factory {
$this->addDataSource('mailing', 'Civi\DataProcessor\Source\MailingSource', E::ts('Mailing'));
$this->addDataSource('mailing_job', 'Civi\DataProcessor\Source\MailingJobSource', E::ts('Mailing Job'));
$this->addDataSource('mailing_group', 'Civi\DataProcessor\Source\MailingGroupSource', E::ts('Mailing Group'));
$this->addDataSource('csv', 'Civi\DataProcessor\Source\CSV', E::ts('CSV File'));
$this->addOutput('api', 'Civi\DataProcessor\Output\Api', E::ts('API'));
$this->addFilter('simple_sql_filter', 'Civi\DataProcessor\FilterHandler\SimpleSqlFilter', E::ts('Simple Filter'));
$this->addjoinType('simple_join', 'Civi\DataProcessor\DataFlow\MultipleDataFlows\SimpleJoin', E::ts('Simple Join'));
......
......@@ -15,6 +15,11 @@ abstract class AbstractFieldOutputHandler {
*/
protected $outputFieldSpecification;
/**
* @var \Civi\DataProcessor\Source\SourceInterface
*/
protected $dataSource;
/**
* Returns the name of the handler type.
*
......@@ -46,8 +51,20 @@ abstract class AbstractFieldOutputHandler {
*/
abstract public function formatField($rawRecord, $formattedRecord);
public function __construct() {
$this->outputFieldSpecification = new FieldSpecification($this->getName(), $this->getType(), $this->getName());
/**
* AbstractFieldOutputHandler constructor.
*
* @param \Civi\DataProcessor\Source\SourceInterface $dataSource
*/
public function __construct(\Civi\DataProcessor\Source\SourceInterface $dataSource) {
$this->dataSource = $dataSource;
}
/**
* @return \Civi\DataProcessor\Source\SourceInterface
*/
public function getDataSource() {
return $this->dataSource;
}
/**
......
......@@ -17,13 +17,8 @@ class RawFieldOutputHandler extends AbstractFieldOutputHandler implements Output
*/
protected $inputFieldSpec;
/**
* @var \Civi\DataProcessor\Source\SourceInterface
*/
protected $dataSource;
public function __construct(FieldSpecification $inputFieldSpec, SourceInterface $dataSource) {
$this->dataSource = $dataSource;
parent::__construct($dataSource);
$this->inputFieldSpec = $inputFieldSpec;
$this->outputFieldSpecification = clone $inputFieldSpec;
$this->outputFieldSpecification->alias = $this->getName();
......
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\Source;
use Civi\DataProcessor\DataFlow\CsvDataFlow;
use Civi\DataProcessor\DataFlow\InMemoryDataFlow;
use Civi\DataProcessor\DataSpecification\DataSpecification;
use Civi\DataProcessor\DataSpecification\FieldSpecification;
/**
 * Data source which reads its records from a CSV file.
 *
 * Every column of the file becomes a field named "col_<index>".
 */
class CSV extends AbstractSource {

  /**
   * @var array
   *   The column titles as returned by CsvDataFlow::getHeaderRow().
   */
  protected $headerRow;

  protected $rows;

  /**
   * @var \Civi\DataProcessor\DataSpecification\DataSpecification
   */
  protected $availableFields;

  /**
   * Initialize the CSV data flow and the list of available fields.
   *
   * Missing delimiter, enclosure and escape settings fall back to the same
   * defaults as the constructor of CsvDataFlow (',', '"' and '\').
   *
   * @return void
   */
  public function initialize() {
    if ($this->dataFlow) {
      return;
    }
    $this->availableFields = new DataSpecification();

    // A source which has not been configured yet has none of these keys.
    $uri = isset($this->configuration['uri']) ? $this->configuration['uri'] : '';
    $delimiter = isset($this->configuration['delimiter']) ? $this->configuration['delimiter'] : ',';
    $enclosure = isset($this->configuration['enclosure']) ? $this->configuration['enclosure'] : '"';
    $escape = isset($this->configuration['escape']) ? $this->configuration['escape'] : '\\';

    $skipRows = 0;
    $headerRowNumber = 0;
    if (isset($this->configuration['first_row_as_header']) && $this->configuration['first_row_as_header']) {
      $skipRows = 1;
      $headerRowNumber = 1;
    }

    $this->dataFlow = new CsvDataFlow($uri, $skipRows, $delimiter, $enclosure, $escape);
    $this->headerRow = $this->dataFlow->getHeaderRow($headerRowNumber);
    foreach ($this->headerRow as $idx => $colName) {
      $name = 'col_'.$idx;
      $alias = $this->getSourceName().$name;
      $field = new FieldSpecification($name, 'String', $colName, null, $alias);
      $this->availableFields->addFieldSpecification($name, $field);
    }
  }

  /**
   * Ensure that filter field is accessible in the query
   *
   * @param String $fieldName
   * @return \Civi\DataProcessor\DataFlow\AbstractDataFlow|null
   * @throws \Exception
   */
  public function ensureField($fieldName) {
    $field = $this->getAvailableFields()->getFieldSpecificationByName($fieldName);
    // Same guard as in ensureFieldInSource(): do not add a field a second time.
    // NOTE(review): presumably addFieldSpecification() throws on duplicates - confirm.
    if ($field && !$this->dataFlow->getDataSpecification()->doesFieldExist($fieldName)) {
      $this->dataFlow->getDataSpecification()
        ->addFieldSpecification($fieldName, $field);
    }
    return $this->dataFlow;
  }

  /**
   * Ensures a field is in the data source
   *
   * @param \Civi\DataProcessor\DataSpecification\FieldSpecification $fieldSpecification
   *
   * @return \Civi\DataProcessor\Source\SourceInterface
   * @throws \Exception
   */
  public function ensureFieldInSource(FieldSpecification $fieldSpecification) {
    if (!$this->dataFlow->getDataSpecification()->doesFieldExist($fieldSpecification->name)) {
      $this->dataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->name, $fieldSpecification);
    }
    return $this;
  }

  /**
   * Ensures an aggregation field in the data source
   *
   * @param \Civi\DataProcessor\DataSpecification\FieldSpecification $fieldSpecification
   *
   * @return \Civi\DataProcessor\Source\SourceInterface
   * @throws \Exception
   */
  public function ensureAggregationFieldInSource(FieldSpecification $fieldSpecification) {
    $this->dataFlow->getDataSpecification()->addFieldSpecification($fieldSpecification->name, $fieldSpecification);
    return $this;
  }

  /**
   * Returns the fields found in the CSV file; only set after initialize().
   *
   * @return \Civi\DataProcessor\DataSpecification\DataSpecification
   */
  public function getAvailableFields() {
    return $this->availableFields;
  }

  /**
   * Returns the fields which can be used for filtering; the same set as the
   * available fields.
   *
   * @return \Civi\DataProcessor\DataSpecification\DataSpecification
   */
  public function getAvailableFilterFields() {
    return $this->availableFields;
  }

  /**
   * This source offers no aggregation fields.
   *
   * @return \Civi\DataProcessor\DataSpecification\AggregationField[]
   */
  public function getAvailableAggregationFields() {
    return array();
  }

  /**
   * Returns URL to configuration screen
   *
   * @return false|string
   */
  public function getConfigurationUrl() {
    return 'civicrm/dataprocessor/form/source/csv';
  }

}
\ No newline at end of file
# Howto Develop a Custom Data Source
# Table of contents
In this tutorial we are going to develop our own data source: an importer for CSV files.
# Before we start
When an administrator adds this data source, they should be able to define where the CSV file can be found.
# The datasource class.
Begin with a new php file in `Civi\DataProcessor\Source\CSV.php`:
```php
namespace Civi\DataProcessor\Source;
class CSV extends AbstractSource {
}
```
# Giving back a DataFlow
The data source holds the configuration of the source and gives back a `DataFlow` object.
The extension provides the following `DataFlow` objects:
* `InMemoryDataFlow` which retrieves data that is stored in memory.
* `SQLTableDataFlow` which retrieves data from a Database Table
* `CombinedSqlDataFlow` which combines several SqlDataFlows
* `CombinedDataFlow` which combines several DataFlows
We are going to use the `InMemoryDataFlow` because we read the data from the csv file and then store it in memory.
{crmScope extensionKey='dataprocessor'}
{* Configuration form for the CSV data source: URI, header row option and CSV dialect settings. *}
<div class="crm-submit-buttons">
  {include file="CRM/common/formButtons.tpl" location="top"}
</div>
{* block for the CSV source configuration *}
<h3>{ts}Data Processor Sources configuration{/ts}</h3>
<div class="crm-block crm-form-block crm-data-processor_source_configuration-block">
  <div class="help-block" id="help">
    {ts}<p>On this form you can configure the CSV data source.</p>
    <p>The <strong>URI</strong> is the file location: either a path on the server or a URL from which the file can be downloaded.</p>
    <p>The <strong>Field delimiter</strong> is the character which separates each field.</p>
    <p>The <strong>Field enclosure character</strong> is the character which is wrapped around each field.</p>
    <p>The <strong>Escape character</strong> is the character which marks special characters.</p>
    {/ts}
  </div>
  <div class="crm-section">
    <div class="label">{$form.uri.label}</div>
    <div class="content">{$form.uri.html}</div>
    <div class="clear"></div>
  </div>
  {* The checkbox is rendered in front of its label, so the label column stays empty. *}
  <div class="crm-section">
    <div class="label">&nbsp;</div>
    <div class="content">{$form.first_row_as_header.html} &nbsp; {$form.first_row_as_header.label}</div>
    <div class="clear"></div>
  </div>
  <div class="crm-section">
    <div class="label">{$form.delimiter.label}</div>
    <div class="content">{$form.delimiter.html}</div>
    <div class="clear"></div>
  </div>
  <div class="crm-section">
    <div class="label">{$form.enclosure.label}</div>
    <div class="content">{$form.enclosure.html}</div>
    <div class="clear"></div>
  </div>
  <div class="crm-section">
    <div class="label">{$form.escape.label}</div>
    <div class="content">{$form.escape.html}</div>
    <div class="clear"></div>
  </div>
</div>
<div class="crm-submit-buttons">
  {include file="CRM/common/formButtons.tpl" location="bottom"}
</div>
{/crmScope}
\ No newline at end of file
......@@ -63,6 +63,13 @@
<access_arguments>access CiviCRM</access_arguments>
<access_arguments>administer CiviCRM</access_arguments>
</item>
<!-- Route of the configuration form for the CSV data source; handled by CRM_Dataprocessor_Form_Source_Csv. -->
<item>
<path>civicrm/dataprocessor/form/source/csv</path>
<page_callback>CRM_Dataprocessor_Form_Source_Csv</page_callback>
<title>DataProcessor</title>
<access_arguments>access CiviCRM</access_arguments>
<access_arguments>administer CiviCRM</access_arguments>
</item>
<item>
<path>civicrm/dataprocessor/form/output/api</path>
<page_callback>CRM_Dataprocessor_Form_Output_API</page_callback>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment