Wednesday, April 13, 2016

Zend Framework 2 / Doctrine 2 / Bulk operations and event triggering


For a large project with many entities, I wrote a common save() method.

This method lives in an abstract service and is used throughout the project to save entity state.

AbstractService::save() looks like this:

public function save($entity)
{
    // true only if this call opened the transaction
    $transactionStarted = $this->beginTransaction();

    try
    {
        $action = $entity->getId() ? self::UPDATE : self::CREATION;

        $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]);

        $this->getEntityManager()->persist($entity);
        $this->getEntityManager()->flush();

        $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]);

        if ($transactionStarted)
        {
            $this->commitTransaction();
        }
    } catch (\Exception $e)
    {
        if ($transactionStarted)
        {
            $this->rollbackTransaction();
        }

        // Assuming Exception follows the \Exception constructor
        // ($message, $code, $previous), the cause goes in third position
        throw new Exception('Unable to save entity', 0, $e);
    }

    return true;
}

public function beginTransaction()
{
    if (!$this->getEntityManager()->getConnection()->isTransactionActive())
    {
        $this->getEntityManager()->getConnection()->beginTransaction();

        return true;
    }

    return false;
}

public function commitTransaction()
{
    $this->getEntityManager()->getConnection()->commit();

    return $this;
}

public function rollbackTransaction()
{
    $this->getEntityManager()->getConnection()->rollBack();

    return $this;
}

In my case, when a new Member entity is inserted through the Member service (which extends AbstractService), an email is sent via the save.post event, for example. A listener can also trigger another action in a different service that calls save() in turn.

Example of the "child" MemberService::save() method

// MemberService
public function save(Member $member)
{
    // some stuff, e.g. set a property
    $member->setFirstName('John');

    return parent::save($member);
}

Example of triggered event

$sharedEventManager->attach(MemberService::class, 'save.post', [$this, 'onMembersCreation']);

public function onMembersCreation(EventInterface $event)
{
    // send an email

    // anything else ... update another entity ... (call AnotherService::save() too)
}
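Because save.post fires for creations and updates alike, a listener that should only react to new members has to check the 'action' parameter. Here is a minimal, dependency-free sketch of that check. FakeEvent is a stand-in that mimics only getTarget() and getParam() from Zend's EventInterface; the listener class, its name and the constant values are assumptions for illustration, not code from the project.

```php
<?php
// Stand-in for Zend\EventManager\EventInterface (assumption: only the two
// accessors used below are mimicked).
final class FakeEvent
{
    private $target;
    private $params;

    public function __construct($target, array $params)
    {
        $this->target = $target;
        $this->params = $params;
    }

    public function getTarget()
    {
        return $this->target;
    }

    public function getParam($name, $default = null)
    {
        return array_key_exists($name, $this->params) ? $this->params[$name] : $default;
    }
}

// Hypothetical listener: handles creations and ignores updates.
final class MemberCreationListener
{
    const CREATION = 'creation'; // assumed values, the post does not show them
    const UPDATE = 'update';

    public $welcomed = [];

    public function onSavePost($event)
    {
        if ($event->getParam('action') !== self::CREATION) {
            return; // updates do not get a welcome email
        }

        // A real listener would send the email here.
        $this->welcomed[] = $event->getTarget();
    }
}

$listener = new MemberCreationListener();
$listener->onSavePost(new FakeEvent('alice', ['action' => MemberCreationListener::CREATION]));
$listener->onSavePost(new FakeEvent('bob', ['action' => MemberCreationListener::UPDATE]));
```

Only the first call is recorded in $listener->welcomed; the update is skipped.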

That works well for saving a single entity.

But now I want to import a large number of members at once, with both creations and updates. To do that, I read the Doctrine documentation on bulk operations (the "Batch Processing" chapter).

How should I update my code to handle both "bulk saving" and "single saving" properly, while keeping transaction safety and events?

1 Answer


Basically I suggest you implement the Doctrine\Common\Collections\Collection interface, maybe extending ArrayCollection, and create a method save that will do what the doc told you to.

<?php

use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\ORM\EntityManager;
use Zend\EventManager\EventManager;

// Simple variant: reuses AbstractService::save(), so one flush per entity.
class MyDirtyCollection extends ArrayCollection
{
    protected $service;

    public function __construct(AbstractService $abstractService)
    {
        parent::__construct();
        $this->service = $abstractService;
    }

    public function save()
    {
        foreach ($this as $entity) {
            $this->service->save($entity);
        }
    }
}

// Bulk variant: flushes every $bulkSize entities inside a single transaction.
class MyCollection extends ArrayCollection
{
    public $bulkSize = 500;

    protected $eventManager;
    protected $entityManager;

    public function __construct(EntityManager $entityManager, EventManager $eventManager)
    {
        parent::__construct();
        $this->entityManager = $entityManager;
        $this->eventManager = $eventManager;
    }

    public function getEventManager()
    {
        return $this->eventManager;
    }

    public function getEntityManager()
    {
        return $this->entityManager;
    }

    public function setBulkSize(int $bulkSize)
    {
        $this->bulkSize = $bulkSize;
    }

    public function save()
    {
        $connection = $this->getEntityManager()->getConnection();

        // Only open a transaction if the caller has not already opened one
        $transactionStarted = !$connection->isTransactionActive();
        if ($transactionStarted) {
            $connection->beginTransaction();
        }

        try {
            // Record each action before flushing: afterwards every entity has an id
            $actions = [];
            foreach ($this as $key => $entity) {
                $actions[$key] = $entity->getId() ? AbstractService::UPDATE : AbstractService::CREATION;
                $this->getEventManager()->trigger('save.pre', $entity, ['action' => $actions[$key]]);
            }

            $i = 0;
            foreach ($this as $entity) {
                $i++;

                $this->getEntityManager()->persist($entity);

                if (($i % $this->bulkSize) === 0) {
                    $this->getEntityManager()->flush();
                    $this->getEntityManager()->clear(); // detaches all managed entities
                }
            }

            // Flush whatever is left after the last full batch
            $this->getEntityManager()->flush();
            $this->getEntityManager()->clear();

            foreach ($this as $key => $entity) {
                $this->getEventManager()->trigger('save.post', $entity, ['action' => $actions[$key]]);
            }

            if ($transactionStarted) {
                $connection->commit();
            }
        } catch (\Exception $e) {
            if ($transactionStarted) {
                $connection->rollBack();
            }

            // Let the caller know the import failed
            throw $e;
        }
    }
}

Something like that ;) When you fetch your data, you hydrate your collection, then you work with your entities, and finally you call $collection->save();
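To check how many flushes a given bulk size produces and which action each event sees, the flow can be tried without Doctrine or Zend. The sketch below is only an illustration under assumptions: RecordingEntityManager and Row are hypothetical stand-ins, the 'creation' / 'update' strings are made up, and a plain callable replaces the event manager. It records each action before the first flush, because a new entity has an id afterwards and would otherwise look like an update in save.post.

```php
<?php
// Stand-in for Doctrine's EntityManager: it only records what happens.
final class RecordingEntityManager
{
    public $flushSizes = [];
    private $pending = [];
    private $nextId = 1;

    public function persist($entity)
    {
        $this->pending[] = $entity;
    }

    public function flush()
    {
        foreach ($this->pending as $entity) {
            if ($entity->id === null) {
                $entity->id = $this->nextId++; // mimic an auto-generated key
            }
        }
        $this->flushSizes[] = count($this->pending);
        $this->pending = [];
    }

    public function clear()
    {
        // The real clear() detaches entities; nothing to do in this stand-in.
    }
}

// Hypothetical entity with a nullable id.
final class Row
{
    public $id;

    public function __construct($id = null)
    {
        $this->id = $id;
    }
}

function saveInBatches(array $entities, RecordingEntityManager $em, callable $trigger, $batchSize)
{
    // Decide creation vs update while new entities still have no id.
    $actions = [];
    foreach ($entities as $key => $entity) {
        $actions[$key] = $entity->id === null ? 'creation' : 'update';
        $trigger('save.pre', $entity, $actions[$key]);
    }

    $i = 0;
    foreach ($entities as $entity) {
        $em->persist($entity);
        if (++$i % $batchSize === 0) {
            $em->flush();
            $em->clear();
        }
    }

    // Flush the remainder after the last full batch.
    $em->flush();
    $em->clear();

    foreach ($entities as $key => $entity) {
        $trigger('save.post', $entity, $actions[$key]);
    }
}

$em = new RecordingEntityManager();
$log = [];
$entities = [new Row(), new Row(7), new Row(), new Row(), new Row()];

saveInBatches($entities, $em, function ($name, $entity, $action) use (&$log) {
    $log[] = $name . ':' . $action;
}, 2);
```

With five entities and a batch size of 2, $em->flushSizes shows two full batches plus a remainder, and the second entity is reported as an update in both events.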

EDIT: added an Insert class and a use case below:

Performance with the collection approach will still be low, but better than committing entity by entity. If you are looking for high performance, you should think about using Doctrine DBAL instead of the ORM. Here is my DBAL class for bulk inserts:

<?php

namespace JTH\Doctrine\DBAL;

use Doctrine\DBAL\Query\QueryBuilder;
use Exception;
use InvalidArgumentException;
use Traversable;
use UnderflowException;

class Insert extends QueryBuilder
{
    const CALLBACK_FAILURE_SKIP = 0;
    const CALLBACK_FAILURE_BREAK = 1;

    protected $callbackFailureStrategy = self::CALLBACK_FAILURE_BREAK;

    public static $defaultBulkSize = 500;

    public $ignore = false;
    public $onDuplicate = null;

    public function values(array $values)
    {
        $this->resetQueryPart('values');
        $this->addValues($values);
    }

    public function addValues(array $values)
    {
        $this->add('values', $values, true);
    }

    public function setCallbackFailureStrategy($strategy)
    {
        if ($strategy == static::CALLBACK_FAILURE_BREAK) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_BREAK;
        } elseif ($strategy == static::CALLBACK_FAILURE_SKIP) {
            $this->callbackFailureStrategy = static::CALLBACK_FAILURE_SKIP;
        } else {
            $class = self::class;
            throw new InvalidArgumentException(
                "Invalid failure behaviour. See $class::CALLBACK_FAILURE_SKIP and $class::CALLBACK_FAILURE_BREAK"
            );
        }
    }

    public function getCallbackFailureStrategy()
    {
        return $this->callbackFailureStrategy;
    }

    public function execute()
    {
        return $this->getConnection()->executeUpdate(
            $this->getSQLForInsert(),
            $this->getParameters(),
            $this->getParameterTypes()
        );
    }

    /**
     * Converts this instance into an INSERT string in SQL.
     *
     * @return string
     * @throws \Exception
     */
    private function getSQLForInsert()
    {
        $count = sizeof($this->getQueryPart('values'));

        if ($count == 0) {
            throw new UnderflowException("No values ready for INSERT");
        }

        // Column names are taken from the keys of the first row
        $values = current($this->getQueryPart('values'));
        $ignore = $this->ignore ? 'IGNORE' : '';
        $sql = "INSERT $ignore INTO " . $this->getQueryPart('from')['table'] .
            ' (' . implode(', ', array_keys($values)) . ')' . ' VALUES ';

        foreach ($this->getQueryPart('values') as $values) {
            $sql .= '(';

            foreach ($values as $value) {
                if (is_array($value)) {
                    // ['value' => ..., 'raw' => bool, 'type' => ...]: raw values are not quoted
                    if (!empty($value['raw'])) {
                        $sql .= $value['value'] . ',';
                    } else {
                        $sql .= $this->expr()->literal($value['value'], $value['type']) . ',';
                    }
                } else {
                    $sql .= $this->expr()->literal($value) . ',';
                }
            }

            // Drop the trailing comma of the row
            $sql = substr($sql, 0, -1);
            $sql .= '),';
        }

        // Drop the trailing comma of the row list
        $sql = substr($sql, 0, -1);

        if (!is_null($this->onDuplicate)) {
            $sql .= ' ON DUPLICATE KEY UPDATE ' . $this->onDuplicate . ' ';
        }

        return $sql;
    }

    /**
     * @param array|Traversable $loopable An array or object to loop over
     * @param callable $callable A callable that will be called before actually inserting the row.
     *                           Two parameters will be passed:
     *                           - the key of the current row
     *                           - the row values (array)
     *                           An array of rows to insert must be returned
     * @param int|null $bulkSize How many rows will be inserted at once
     * @param bool $transactionnal
     * @throws \Doctrine\DBAL\ConnectionException
     * @throws \Exception
     */
    public function bulk($loopable, callable $callable, $bulkSize = null, $transactionnal = true)
    {
        if (!is_array($loopable) && !($loopable instanceof Traversable)) {
            throw new InvalidArgumentException("\$loopable must be either an array or a traversable object");
        }

        $bulkSize = $bulkSize ?? static::$defaultBulkSize;

        $this->getConnection()->getConfiguration()->setSQLLogger(null); // Avoid MonoLog memory overload

        if ($transactionnal) {
            $this->getConnection()->beginTransaction();
        }

        try {
            $this->resetQueryPart('values');

            foreach ($loopable as $key => $values) {
                try {
                    $callbackedValues = $callable($key, $values);

                    if (sizeof($callbackedValues) > 0) {
                        foreach ($callbackedValues as $callbackedValuesRow) {
                            $this->addValues($callbackedValuesRow);
                        }
                    }
                } catch (Exception $e) {
                    /*
                     * If a callback exception must break the transaction, then throw the exception to the call stack
                     * Else, skip the row insertion
                     */
                    if ($this->callbackFailureStrategy == static::CALLBACK_FAILURE_BREAK) {
                        throw $e;
                    } else {
                        continue;
                    }
                }

                $count = count($this->getQueryPart('values'));

                // Send one multi-row INSERT as soon as a full batch is buffered
                if ($count >= $bulkSize) {
                    $this->execute();
                    $this->resetQueryPart('values');
                }
            }

            // Insert the rows left over after the last full batch
            $count = count($this->getQueryPart('values'));

            if ($count > 0) {
                $this->execute();
            }

            $this->resetQueryPart('values');

            if ($transactionnal) {
                $this->getConnection()->commit();
            }
        } catch (Exception $e) {
            // Do not leave the transaction open when something fails
            if ($transactionnal) {
                $this->getConnection()->rollBack();
            }

            throw $e;
        }
    }

    /**
     * @return boolean
     */
    public function isIgnore()
    {
        return $this->ignore;
    }

    /**
     * @param boolean $ignore
     */
    public function setIgnore(bool $ignore)
    {
        $this->ignore = $ignore;
    }

    /**
     * @return null|string
     */
    public function getOnDuplicate()
    {
        // No ": string" return type: the property is null until it is set
        return $this->onDuplicate;
    }

    /**
     * @param null|string $onDuplicate
     */
    public function setOnDuplicate($onDuplicate)
    {
        $this->onDuplicate = $onDuplicate;
        $this->ignore = false;
    }
}

Use case:

// Imports for the top of the file
use Doctrine\DBAL\DBALException;
use JTH\Doctrine\DBAL\Insert;

try {
    $i = new Insert($this->getDoctrine()->getConnection('myDB'));
    $i->insert('myTable');
    $i->setOnDuplicate('col1 = VALUES(col1), updated_last = NOW()');
    $i->setCallbackFailureStrategy(Insert::CALLBACK_FAILURE_BREAK);
    $i->bulk($myArrayOfRows, function ($key, $row) {
        // Some pre-insert processing

        // The callback must return an array of rows to insert
        return [$row];
    }, 500, true);

    $this->addFlash('success', 'Yay !');
} catch (DBALException $e) {
    $this->addFlash('error', 'Damn, error : ' . $e->getMessage());
}
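For readers who want to see the batching idea on its own, here is a dependency-free sketch that turns a list of rows into one multi-row INSERT per batch. Note that it is a variation, not the class above: it uses positional placeholders with a separate parameter list instead of quoted literals, and buildBulkInserts() is a hypothetical helper name. Table and column names are assumed to be trusted identifiers, and every row is assumed to have the same keys.

```php
<?php
// Builds one multi-row INSERT per batch, using positional placeholders.
// Assumption: $table and the row keys are trusted identifiers.
function buildBulkInserts($table, array $rows, $bulkSize)
{
    $statements = [];

    foreach (array_chunk($rows, $bulkSize) as $chunk) {
        // Column names come from the keys of the first row of the batch
        $columns = array_keys($chunk[0]);

        // "(?, ?)" for one row, repeated once per row in the batch
        $tuple = '(' . implode(', ', array_fill(0, count($columns), '?')) . ')';
        $sql = sprintf(
            'INSERT INTO %s (%s) VALUES %s',
            $table,
            implode(', ', $columns),
            implode(', ', array_fill(0, count($chunk), $tuple))
        );

        // Flatten the values in the same order as the placeholders
        $params = [];
        foreach ($chunk as $row) {
            foreach ($columns as $column) {
                $params[] = $row[$column];
            }
        }

        $statements[] = ['sql' => $sql, 'params' => $params];
    }

    return $statements;
}

$rows = [
    ['col1' => 'a', 'col2' => 1],
    ['col1' => 'b', 'col2' => 2],
    ['col1' => 'c', 'col2' => 3],
];

$statements = buildBulkInserts('myTable', $rows, 2);
```

Three rows with a batch size of 2 give two statements: one with two rows and one with the remaining row. Each pair could then be handed to something like the connection's executeUpdate($sql, $params), inside a transaction as in bulk().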