    <?php
    /**
     * @author Jaap Jansma <jaap.jansma@civicoop.org>
     * @license AGPL-3.0
     */
    
    namespace Civi\DataProcessor\DataFlow\CombinedDataFlow;
    
    use \Civi\DataProcessor\DataFlow\EndOfFlowException;
    use Civi\DataProcessor\DataFlow\InvalidFlowException;
    use Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription;
    use Civi\DataProcessor\DataFlow\MultipleDataFlows\JoinInterface;
    use Civi\DataProcessor\DataFlow\MultipleDataFlows\MultipleSourceDataFlows;
    use Civi\DataProcessor\DataFlow\MultipleDataFlows\SqlJoinInterface;
    use Civi\DataProcessor\DataFlow\SqlDataFlow;
    use Civi\DataProcessor\DataFlow\SqlTableDataFlow;
    use \Civi\DataProcessor\DataSpecification\DataSpecification;
    
    
    class CombinedSqlDataFlow extends SqlDataFlow implements MultipleSourceDataFlows {
    
      /**
       * @var \Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription[]
       */
      protected $sourceDataFlowDescriptions = array();
    
    
      /**
       * Primary table handed to the constructor; null when none was given.
       *
       * @var null|string
       */
      protected $primary_table;
    
      /**
       * @var null|String
       */
      protected $primary_table_alias;
    
      /**
       * Name of this data flow, as returned by getName().
       *
       * @var string
       */
      protected $name;
    
      /**
       * Creates a combined SQL data flow.
       *
       * @param string $name
       *   Name of this data flow; returned by getName().
       * @param null|string $primary_table
       *   Stored as-is; returned by getPrimaryTable().
       * @param null|string $primary_table_alias
       *   Stored as-is; returned by getPrimaryTableAlias() and used to qualify
       *   the aggregate fields in getFieldsForGroupByStatement().
       */
      public function __construct($name = 'combined_sql_data_flow', $primary_table=null, $primary_table_alias=null) {
        parent::__construct();
        $this->name = $name;
        $this->primary_table = $primary_table;
        $this->primary_table_alias = $primary_table_alias;
      }
    
      /**
       * Registers another source data flow.
       *
       * Only descriptions that wrap an SqlDataFlow are accepted; they are
       * appended in call order, so the first one added is the one used by
       * getTableStatement() and getFromStatement().
       *
       * @param \Civi\DataProcessor\DataFlow\MultipleDataFlows\DataFlowDescription $dataFlowDescription
       * @return void
       * @throws \Civi\DataProcessor\DataFlow\InvalidFlowException
       *   When the wrapped data flow is not an SqlDataFlow.
       */
      public function addSourceDataFlow(DataFlowDescription $dataFlowDescription) {
        $dataFlow = $dataFlowDescription->getDataFlow();
        $isSqlDataFlow = $dataFlow instanceof SqlDataFlow;
        if (!$isSqlDataFlow) {
          throw new InvalidFlowException();
        }
        $this->sourceDataFlowDescriptions[] = $dataFlowDescription;
      }
    
    
      /**
       * Returns the Table part in the from statement.
       *
       * The statement is delegated to the first source data flow that was added.
       *
       * @return string
       * @throws \Civi\DataProcessor\DataFlow\InvalidFlowException
       *   When no source data flow has been added yet.
       */
      public function getTableStatement() {
        // reset() yields false on an empty array; calling a method on that
        // would be a fatal error, so report the misconfiguration explicitly.
        $firstDescription = reset($this->sourceDataFlowDescriptions);
        if ($firstDescription === false) {
          throw new InvalidFlowException('Cannot build the table statement: no source data flow has been added.');
        }
        return $firstDescription->getDataFlow()->getTableStatement();
      }
    
    
      /**
       * Returns the From Statement.
       *
       * The result is the from statement of the first source data flow followed
       * by the join statements of the remaining sources, glued with spaces.
       *
       * @return string
       * @throws \Civi\DataProcessor\DataFlow\InvalidFlowException
       *   When no source data flow has been added yet.
       */
      public function getFromStatement() {
        // reset() yields false on an empty array; calling a method on that
        // would be a fatal error, so report the misconfiguration explicitly.
        $firstDescription = reset($this->sourceDataFlowDescriptions);
        if ($firstDescription === false) {
          throw new InvalidFlowException('Cannot build the from statement: no source data flow has been added.');
        }

        $fromStatements = array($firstDescription->getDataFlow()->getFromStatement());
        // Skip the first source: it is already the FROM table, so only the
        // remaining sources contribute join clauses.
        $fromStatements = array_merge($fromStatements, $this->getJoinStatement(1));

        return implode(" ", $fromStatements);
      }
    
      /**
       * Collects the join clauses of the source data flows.
       *
       * For every source after the first $skip ones, the clause(s) produced by
       * its join specification are appended (when it has one). When the source
       * is itself a CombinedSqlDataFlow, all of its own join clauses follow.
       *
       * @param int $skip
       *   Number of leading source data flows to leave out.
       * @return array
       *   The join clauses in source order; getFromStatement() glues them with
       *   spaces.
       */
      public function getJoinStatement($skip=0) {
        $joinStatements = array();
        $position = 0;
        foreach ($this->sourceDataFlowDescriptions as $description) {
          $position++;
          if ($position <= $skip) {
            continue;
          }

          $joinSpecification = $description->getJoinSpecification();
          if ($joinSpecification) {
            // getJoinClause() may hand back one clause or a list of clauses.
            $joinClause = $joinSpecification->getJoinClause($description);
            if (!is_array($joinClause)) {
              $joinClause = array($joinClause);
            }
            $joinStatements = array_merge($joinStatements, $joinClause);
          }

          $dataFlow = $description->getDataFlow();
          if ($dataFlow instanceof CombinedSqlDataFlow) {
            $joinStatements = array_merge($joinStatements, $dataFlow->getJoinStatement(0));
          }
        }
        return $joinStatements;
      }
    
      /**
       * Gathers the select-statement fields of every source data flow.
       *
       * @return string[]
       *   The fields of all sources merged with array_merge(), in source order.
       */
      public function getFieldsForSelectStatement() {
        $selectFields = array();
        foreach ($this->sourceDataFlowDescriptions as $description) {
          $dataFlow = $description->getDataFlow();
          $selectFields = array_merge($selectFields, $dataFlow->getFieldsForSelectStatement());
        }
        return $selectFields;
      }
    
      /**
       * Gathers the fields for the group by statement of the sql query.
       *
       * This flow's own aggregate fields come first, each qualified with the
       * primary table alias, followed by the group by fields of every source.
       *
       * NOTE(review): $this->aggregateFields is not declared in this class;
       * presumably it is inherited from SqlDataFlow - confirm.
       *
       * @return string[]
       */
      public function getFieldsForGroupByStatement() {
        $groupByFields = array();
        foreach ($this->aggregateFields as $aggregateField) {
          $groupByFields[] = "`{$this->primary_table_alias}`.`{$aggregateField->name}`";
        }

        foreach ($this->sourceDataFlowDescriptions as $description) {
          $dataFlow = $description->getDataFlow();
          $groupByFields = array_merge($groupByFields, $dataFlow->getFieldsForGroupByStatement());
        }
        return $groupByFields;
      }
    
    
      /**
       * Fetches the next row and returns it as an associative array.
       *
       * The flow is initialized on first use. The record holds one entry per
       * field of every source data flow's specification, keyed by field alias.
       *
       * @param string $fieldNamePrefix
       *   Not used by this implementation: the record keys are the plain field
       *   aliases, without any prefix.
       * @return array
       * @throws EndOfFlowException
       *   When the DAO has no further row.
       */
      public function retrieveNextRecord($fieldNamePrefix='') {
        if (!$this->isInitialized()) {
          $this->initialize();
        }

        $hasRow = $this->dao->fetch();
        if (!$hasRow) {
          throw new EndOfFlowException();
        }

        $record = array();
        foreach ($this->sourceDataFlowDescriptions as $description) {
          $fields = $description->getDataFlow()->getDataSpecification()->getFields();
          foreach ($fields as $field) {
            // The DAO exposes each selected column as a property named after its alias.
            $record[$field->alias] = $this->dao->{$field->alias};
          }
        }
        return $record;
      }
    
      /**
       * Returns the combined data specification of all source data flows.
       *
       * The specification is built once, on first call, by merging the
       * specification of every source under that source's name as prefix; later
       * calls return the same object.
       *
       * @return DataSpecification
       * @throws \Civi\DataProcessor\DataSpecification\FieldExistsException
       */
      public function getDataSpecification() {
        if (!$this->dataSpecification) {
          $this->dataSpecification = new DataSpecification();
          foreach ($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
            $dataFlow = $sourceDataFlowDescription->getDataFlow();
            $namePrefix = $dataFlow->getName();
            $this->dataSpecification->merge($dataFlow->getDataSpecification(), $namePrefix);
          }
        }
        return $this->dataSpecification;
      }
    
      /**
       * Returns the name given to this data flow in the constructor.
       *
       * @return string
       */
      public function getName() {
        return $this->name;
      }
    
    
      /**
       * Returns the where clauses of this flow and of all its source data flows.
       *
       * This flow's own clauses come first, followed by those of each source in
       * the order the sources were added.
       *
       * NOTE(review): $this->whereClauses is not declared in this class;
       * presumably it is inherited from SqlDataFlow - confirm.
       *
       * @return array
       *   A list of where clauses; empty when there are none.
       */
      public function getWhereClauses() {
        // Start from an empty list so that "no clauses" yields array() rather
        // than an undefined variable.
        $clauses = array();
        foreach($this->whereClauses as $clause) {
          $clauses[] = $clause;
        }
        foreach($this->sourceDataFlowDescriptions as $sourceDataFlowDescription) {
          $dataFlow = $sourceDataFlowDescription->getDataFlow();
          if ($dataFlow instanceof SqlDataFlow) {
            foreach($dataFlow->getWhereClauses() as $clause) {
              $clauses[] = $clause;
            }
          }
        }
        return $clauses;
      }
    
    
      /**
       * Returns the primary table that was passed to the constructor.
       *
       * @return null|String
       *   Null when the constructor was called without a primary table.
       */
      public function getPrimaryTable() {
        return $this->primary_table;
      }
    
      /**
       * Returns the primary table alias that was passed to the constructor.
       *
       * The same alias qualifies the aggregate fields in
       * getFieldsForGroupByStatement().
       *
       * @return null|String
       *   Null when the constructor was called without an alias.
       */
      public function getPrimaryTableAlias() {
        return $this->primary_table_alias;
      }
    
    
      /**
       * Removes a where clause from this flow and from all its source data flows.
       *
       * Clauses are matched on their rendered text: every own clause whose
       * getWhereClause() loosely equals (==) that of $clause is dropped. The
       * removal is then forwarded to each source that is an SqlDataFlow.
       *
       * @param \Civi\DataProcessor\DataFlow\SqlDataFlow\WhereClauseInterface $clause
       *
       * @return \Civi\DataProcessor\DataFlow\SqlDataFlow
       *   This data flow, to allow chaining.
       */
      public function removeWhereClause(SqlDataFlow\WhereClauseInterface $clause) {
        foreach (array_keys($this->whereClauses) as $index) {
          $existingClause = $this->whereClauses[$index];
          if ($existingClause->getWhereClause() == $clause->getWhereClause()) {
            unset($this->whereClauses[$index]);
          }
        }

        foreach ($this->sourceDataFlowDescriptions as $description) {
          $dataFlow = $description->getDataFlow();
          if ($dataFlow instanceof SqlDataFlow) {
            $dataFlow->removeWhereClause($clause);
          }
        }
        return $this;
      }