Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<?php
/**
* @author Jaap Jansma <jaap.jansma@civicoop.org>
* @license AGPL-3.0
*/
namespace Civi\DataProcessor\DataFlow;
use Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface;
use \Civi\DataProcessor\DataSpecification\DataSpecification;
abstract class SqlDataFlow extends AbstractDataFlow {
/**
* Result handle of the executed select query.
*
* Null until initialize() has stored the DAO returned by
* \CRM_Core_DAO::executeQuery(); set back to null by resetInitializeState().
*
* @var null|\CRM_Core_DAO
*/
protected $dao = null;
/**
* Number of rows reported by the COUNT(*) query that initialize() runs with
* the from and where statement (no limit or offset applied).
*
* Null as long as initialize() has not run.
*
* @var null|int
*/
protected $count = null;
/**
* Where clauses added through addWhereClause() and returned by
* getWhereClauses().
*
* @var \Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface[]
*/
protected $whereClauses = array();
/**
* Returns an array with the fields for the select statement of the sql query.
*
* getSelectQueryStatement() joins the returned strings with ", " and places
* the result directly after the SELECT keyword.
*
* @return string[]
* @throws \Civi\DataProcessor\DataSpecification\FieldExistsException
*/
abstract public function getFieldsForSelectStatement();
/**
* Returns the From Statement.
*
* The returned string is placed verbatim after the select list (and after
* "SELECT COUNT(*)" in the count query) without a FROM keyword being added
* by this class, so implementations presumably have to include that keyword
* themselves - verify against the concrete subclasses.
*
* @return string
*/
abstract public function getFromStatement();
/**
 * Initialize the data flow.
 *
 * Does nothing when the flow is already initialized. Otherwise it first runs
 * a COUNT(*) query over the from and where statement (without limit/offset)
 * and stores the result in $this->count, then executes the select query with
 * the where statement and the limit/offset applied and keeps the resulting
 * DAO in $this->dao.
 *
 * @return void
 * @throws \Civi\DataProcessor\DataSpecification\FieldExistsException
 */
public function initialize() {
  if ($this->isInitialized()) {
    return;
  }

  $from = $this->getFromStatement();
  $where = $this->getWhereStatement();

  // The total is counted without limit/offset. The query result is cast so
  // that $this->count really holds the documented integer.
  $countSql = "SELECT COUNT(*) {$from} {$where}";
  $this->count = (int) \CRM_Core_DAO::singleValueQuery($countSql);

  // Separate the where statement with a space; the from statement is not
  // guaranteed to end with whitespace.
  $sql = $this->getSelectQueryStatement() . " {$where}";

  // Build Limit and Offset. A value of false means "not set". Both values
  // are cast to int because they end up unquoted in the sql statement.
  $hasOffset = $this->offset !== false;
  $hasLimit = $this->limit !== false;
  $offset = $hasOffset ? (int) $this->offset : 0;
  if ($hasLimit) {
    $limit = (int) $this->limit;
    $sql .= " LIMIT {$offset}, {$limit}";
  }
  elseif ($hasOffset) {
    // Only an offset is set: use the number of remaining rows as limit.
    // Never let it drop below zero (offset beyond the last row), a negative
    // limit is not valid sql.
    $remaining = max(0, $this->count - $offset);
    $sql .= " LIMIT {$offset}, {$remaining}";
  }

  $this->dao = \CRM_Core_DAO::executeQuery($sql);
}
/**
 * Tells whether this flow has been initialized.
 *
 * The flow counts as initialized as soon as a DAO is stored in $this->dao.
 *
 * @return bool
 */
public function isInitialized() {
  return $this->dao !== null;
}
/**
* Resets the initialized state. This function is called
* when a setting has changed. E.g. when offset or limit are set.
*
* Clearing the stored DAO makes isInitialized() return false, so the next
* call to initialize() runs the queries again.
*
* @return void
*/
protected function resetInitializeState() {
$this->dao = null;
}
/**
 * Fetches the next row of the query result and returns it as a record.
 *
 * The flow is initialized first when that has not happened yet. Each field
 * of the data specification is read from the DAO through the field's alias
 * and stored under the key made of $fieldNamePrefix plus the field's name.
 *
 * @param string $fieldNamePrefix
 *   The prefix before the name of the field within the record
 * @return array
 *   The record as an associative array.
 * @throws EndOfFlowException
 *   When the DAO's fetch() does not deliver another row.
 */
protected function retrieveNextRecord($fieldNamePrefix='') {
  if (!$this->isInitialized()) {
    $this->initialize();
  }

  $hasRow = $this->dao->fetch();
  if (!$hasRow) {
    throw new EndOfFlowException();
  }

  $row = array();
  foreach ($this->dataSpecification->getFields() as $fieldSpec) {
    $key = $fieldNamePrefix . $fieldSpec->name;
    $row[$key] = $this->dao->{$fieldSpec->alias};
  }
  return $row;
}
/**
 * Returns the number of records counted by initialize().
 *
 * The flow is initialized first when that has not happened yet. The count
 * comes from the COUNT(*) query and therefore does not take limit or offset
 * into account.
 *
 * @return int
 */
public function recordCount() {
  if (!$this->isInitialized()) {
    $this->initialize();
  }
  // The count is stored as it comes back from the query; cast it so callers
  // really get the documented integer.
  return (int) $this->count;
}
/**
 * Builds the select part of the query: the SELECT keyword, the comma
 * separated fields and the from statement. No where statement is added.
 *
 * @return string
 * @throws \Civi\DataProcessor\DataSpecification\FieldExistsException
 */
public function getSelectQueryStatement() {
  $fieldList = implode(", ", $this->getFieldsForSelectStatement());
  return "SELECT " . $fieldList . " " . $this->getFromStatement();
}
/**
 * Returns the where statement for this query.
 *
 * All registered where clauses are combined with AND. The statement always
 * starts with "WHERE 1", so it is also valid when no clause has been added.
 *
 * @return string
 */
public function getWhereStatement() {
  // The constant "1" keeps the statement valid without any clause.
  $clauses = array("1");
  foreach ($this->whereClauses as $clause) {
    $clauses[] = $clause->getWhereClause();
  }
  return "WHERE ". implode(" AND ", $clauses);
}
/**
 * Registers an additional where clause on this data flow.
 *
 * @param \Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface $clause
 *   The clause to append to the list of where clauses.
 *
 * @return \Civi\DataProcessor\DataFlow\SqlDataFlow
 *   This data flow, so calls can be chained.
 */
public function addWhereClause(WhereClauseInterface $clause) {
  array_push($this->whereClauses, $clause);
  return $this;
}
/**
* Return all the where clauses
*
* These are the clauses added through addWhereClause(), in the order in
* which they were added.
*
* @return \Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface[]
*/
public function getWhereClauses() {
return $this->whereClauses;
}