One of the blogs I like to keep an eye on is Kris Wallsmith his personal blog. He is a Symfony2 contributor and also author of Assetic and Buzz. Last year he wrote about a new experimental project called Spork: a wrapper around pcntl_fork to abstract away the complexities with spawning child processes with php. The article was very interesting, although I didn’t had any valid use case to try the library out. That was, until today.
It happens to be we were preparing a rather large data migration for a application with approximately 17,000 users. The legacy application stored the passwords in a unsafe way – plaintext – so we had to encrypt ’em al during the migration. Our weapon of choice was bcrypt, and using the BlowfishPasswordEncoderBundle implementing was made easy. Using bcrypt did introduce a new problem: encoding all these records would take a lot of time! That’s where Spork comes in!
Setting up the Symfony2 migration Command
If possible I wanted to fork between 8 and 15 processes to gain maximum speed. We’ll run the command on a VPS with 8 virtual cores so I want to stress the machine as much as possible ;). Unfortunately the example on GitHub as well on his blog didn’t function any more so I had to dig in just a little bit. I came up with this to get the forking working:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42  | 
						<?php namespace Netvlies\AcmeMigrationBundle\Command; use Spork\Batch\Strategy\ChunkStrategy; use Spork\EventDispatcher\EventDispatcher; use Spork\ProcessManager; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; class GeneratePasswordCommand extends ContainerAwareCommand {     protected function configure()     {         $this             ->setName('generate:password')         ;     }     protected function execute(InputInterface $input, OutputInterface $output)     {         $container = $this->getContainer();         $em = $container->get('doctrine.orm.entity_manager');         $callback = function($result) use($output) {             $result = current($result);             $output->writeln('Greeting from '. getmypid() .' with id '.$result['id']);         };         // Select all the passwords that should be encoded         $query = $em->createQuery(             "SELECT u.id, u.password, u.salt FROM AcmeUserBundle:User u WHERE u.isPasswordEncoded != 1 OR u.isPasswordEncoded IS NULL ORDER BY u.id"         )->setMaxResults(10);         $iterator = $query->iterate();         $manager = new ProcessManager(new EventDispatcher(), true);         $strategy = new ChunkStrategy();         $manager->process($iterator, $callback, $strategy);     } }  | 
					
The command generates the following output:
| 
					 1 2 3 4 5 6 7 8 9 10 11  | 
						$ app/console wb:generate:password Greeting from 3743 with id 1 Greeting from 3743 with id 2 Greeting from 3743 with id 3 Greeting from 3744 with id 4 Greeting from 3744 with id 5 Greeting from 3744 with id 6 Greeting from 3745 with id 7 Greeting from 3745 with id 8 Greeting from 3745 with id 9 Greeting from 3746 with id 10  | 
					
Make it a little bit more dynamic
To be really useful I’ve added some parameters so we can control the behavior a little more. As I mentioned before I wanted to control the amount forks so I added a option to control this. This value needs to be passed on to the constructor of the ChunkStrategy:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27  | 
						<?php namespace Netvlies\AcmeMigrationBundle\Command; ... class GeneratePasswordCommand extends ContainerAwareCommand {     protected function configure()     {         $this             ->setName('generate:password')             ->addOption('forks', 'f', InputOption::VALUE_REQUIRED, 'How many childs to be spawned', 4)         ;     }     protected function execute(InputInterface $input, OutputInterface $output)     {         $forks = (int) $input->getOption('forks');         ...         $manager = new ProcessManager(new EventDispatcher(), true);         $strategy = new ChunkStrategy($forks);         $manager->process($iterator, $callback, $strategy);     } }  | 
					
I also added a max parameter so we can run some tests on a small set of users, instead of the whole database. When set I pass it on to the setMaxResults method of the $query object.
Storing the results in MySQL: Beware!
In Symfony2 projects storing and reading data from the database is pretty straight forward using Doctrine2. However when you start forking your PHP process keep in mind the following:
- all the forks share the same database connection;
 - when the first fork exits, it will also close the database connection;
 - database operations in running forks will yield: 
General error: 2006 MySQL server has gone away 
This is a known problem. In order to fix this problem I create and close a new connection in each fork:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58  | 
						<?php use Doctrine\DBAL\DriverManager; class GeneratePasswordCommand extends ContainerAwareCommand {     protected function configure()     {         $this             ->setName('generate:password')             ->addOption('forks', 'f', InputOption::VALUE_REQUIRED, 'How many childs to be spawned', 4)             ->addOption('max', 'm', InputOption::VALUE_OPTIONAL, 'Maximum records to process')         ;     }     protected function execute(InputInterface $input, OutputInterface $output)     {         $forks = (int) $input->getOption('forks');         $container = $this->getContainer();         $em = $container->get('doctrine.orm.entity_manager');         $encoder = $container->get('security.encoder.blowfish');         $params = array(             'dbname'    => $container->getParameter('database_name'),             'user'      => $container->getParameter('database_user'),             'password'  => $container->getParameter('database_password'),             'host'      => $container->getParameter('database_host'),             'driver'    => $container->getParameter('database_driver'),         );         $callback = function($result) use ($encoder, $output, $params) {             // Create connection             $conn = DriverManager::getConnection($params);             $conn->connect();             $result = current($result);             $encodedPassword = $encoder->encodePassword($result['password'], $result['salt']);             // Store result             $stmt = $conn->prepare("UPDATE User u SET u.password = ?, u.isPasswordEncoded = 1 WHERE u.id = ?");             $stmt->bindValue(1, $encodedPassword);             $stmt->bindValue(2, $result['id']);             $stmt->execute();             // Close connection             $conn->close();             $output->writeln(sprintf('Encoded password for id: %d is %s', $result['id'], $encodedPassword));         };         $query = $em->createQuery(             "SELECT u.id, u.password, u.salt FROM AcmeUserBundle:User u WHERE u.isPasswordEncoded != 1 OR u.isPasswordEncoded IS NULL ORDER BY u.id"         );         if ($max = $input->getOption('max')) {             $query->setMaxResults((int) $max);         }         $iterator = $query->iterate();         $manager = new ProcessManager(new EventDispatcher(), true);         $strategy = new ChunkStrategy($forks);         $manager->process($iterator, $callback, $strategy);     } }  | 
					
That’s basically it. Running this command on a VPS comparable with c1.xlarge Amazone EC2 server did speed up things a lot. So if you’re also working on a import job like this which can be split up in separate tasks you know what to do… Give Spork a try! It’s really easy, I promise.
UPDATE 2013-03-19
As stated in the comments by Kris, you should close the connection just before forking. Example of how to do this:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25  | 
						<?php namespace Netvlies\AcmeMigrationBundle\Command; ... use Spork\EventDispatcher\Events; class GeneratePasswordCommand extends ContainerAwareCommand {     ...     protected function execute(InputInterface $input, OutputInterface $output)     {         ...         // Get connection in parent         $parentConnection = $em->getConnection();         $dispatcher = new EventDispatcher();         $dispatcher->addListener(Events::PRE_FORK, function() use ($parentConnection) {             $parentConnection->close();         });         $manager = new ProcessManager($dispatcher, true);         $strategy = new ChunkStrategy($forks);         $manager->process($iterator, $callback, $strategy);     } }  |