One of the blogs I like to keep an eye on is Kris Wallsmith his personal blog. He is a Symfony2 contributor and also author of Assetic and Buzz. Last year he wrote about a new experimental project called Spork: a wrapper around pcntl_fork
to abstract away the complexities with spawning child processes with php. The article was very interesting, although I didn’t had any valid use case to try the library out. That was, until today.
It happens to be we were preparing a rather large data migration for a application with approximately 17,000 users. The legacy application stored the passwords in a unsafe way – plaintext – so we had to encrypt ’em al during the migration. Our weapon of choice was bcrypt
, and using the BlowfishPasswordEncoderBundle implementing was made easy. Using bcrypt
did introduce a new problem: encoding all these records would take a lot of time! That’s where Spork comes in!
Setting up the Symfony2 migration Command
If possible I wanted to fork between 8 and 15 processes to gain maximum speed. We’ll run the command on a VPS with 8 virtual cores so I want to stress the machine as much as possible ;). Unfortunately the example on GitHub as well on his blog didn’t function any more so I had to dig in just a little bit. I came up with this to get the forking working:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
<?php namespace Netvlies\AcmeMigrationBundle\Command; use Spork\Batch\Strategy\ChunkStrategy; use Spork\EventDispatcher\EventDispatcher; use Spork\ProcessManager; use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; class GeneratePasswordCommand extends ContainerAwareCommand { protected function configure() { $this ->setName('generate:password') ; } protected function execute(InputInterface $input, OutputInterface $output) { $container = $this->getContainer(); $em = $container->get('doctrine.orm.entity_manager'); $callback = function($result) use($output) { $result = current($result); $output->writeln('Greeting from '. getmypid() .' with id '.$result['id']); }; // Select all the passwords that should be encoded $query = $em->createQuery( "SELECT u.id, u.password, u.salt FROM AcmeUserBundle:User u WHERE u.isPasswordEncoded != 1 OR u.isPasswordEncoded IS NULL ORDER BY u.id" )->setMaxResults(10); $iterator = $query->iterate(); $manager = new ProcessManager(new EventDispatcher(), true); $strategy = new ChunkStrategy(); $manager->process($iterator, $callback, $strategy); } } |
The command generates the following output:
1 2 3 4 5 6 7 8 9 10 11 |
$ app/console wb:generate:password Greeting from 3743 with id 1 Greeting from 3743 with id 2 Greeting from 3743 with id 3 Greeting from 3744 with id 4 Greeting from 3744 with id 5 Greeting from 3744 with id 6 Greeting from 3745 with id 7 Greeting from 3745 with id 8 Greeting from 3745 with id 9 Greeting from 3746 with id 10 |
Make it a little bit more dynamic
To be really useful I’ve added some parameters so we can control the behavior a little more. As I mentioned before I wanted to control the amount forks so I added a option to control this. This value needs to be passed on to the constructor of the ChunkStrategy
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
<?php namespace Netvlies\AcmeMigrationBundle\Command; ... class GeneratePasswordCommand extends ContainerAwareCommand { protected function configure() { $this ->setName('generate:password') ->addOption('forks', 'f', InputOption::VALUE_REQUIRED, 'How many childs to be spawned', 4) ; } protected function execute(InputInterface $input, OutputInterface $output) { $forks = (int) $input->getOption('forks'); ... $manager = new ProcessManager(new EventDispatcher(), true); $strategy = new ChunkStrategy($forks); $manager->process($iterator, $callback, $strategy); } } |
I also added a max parameter so we can run some tests on a small set of users, instead of the whole database. When set I pass it on to the setMaxResults
method of the $query
object.
Storing the results in MySQL: Beware!
In Symfony2 projects storing and reading data from the database is pretty straight forward using Doctrine2. However when you start forking your PHP process keep in mind the following:
- all the forks share the same database connection;
- when the first fork exits, it will also close the database connection;
- database operations in running forks will yield:
General error: 2006 MySQL server has gone away
This is a known problem. In order to fix this problem I create and close a new connection in each fork:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
<?php use Doctrine\DBAL\DriverManager; class GeneratePasswordCommand extends ContainerAwareCommand { protected function configure() { $this ->setName('generate:password') ->addOption('forks', 'f', InputOption::VALUE_REQUIRED, 'How many childs to be spawned', 4) ->addOption('max', 'm', InputOption::VALUE_OPTIONAL, 'Maximum records to process') ; } protected function execute(InputInterface $input, OutputInterface $output) { $forks = (int) $input->getOption('forks'); $container = $this->getContainer(); $em = $container->get('doctrine.orm.entity_manager'); $encoder = $container->get('security.encoder.blowfish'); $params = array( 'dbname' => $container->getParameter('database_name'), 'user' => $container->getParameter('database_user'), 'password' => $container->getParameter('database_password'), 'host' => $container->getParameter('database_host'), 'driver' => $container->getParameter('database_driver'), ); $callback = function($result) use ($encoder, $output, $params) { // Create connection $conn = DriverManager::getConnection($params); $conn->connect(); $result = current($result); $encodedPassword = $encoder->encodePassword($result['password'], $result['salt']); // Store result $stmt = $conn->prepare("UPDATE User u SET u.password = ?, u.isPasswordEncoded = 1 WHERE u.id = ?"); $stmt->bindValue(1, $encodedPassword); $stmt->bindValue(2, $result['id']); $stmt->execute(); // Close connection $conn->close(); $output->writeln(sprintf('Encoded password for id: %d is %s', $result['id'], $encodedPassword)); }; $query = $em->createQuery( "SELECT u.id, u.password, u.salt FROM AcmeUserBundle:User u WHERE u.isPasswordEncoded != 1 OR u.isPasswordEncoded IS NULL ORDER BY u.id" ); if ($max = $input->getOption('max')) { $query->setMaxResults((int) $max); } $iterator = $query->iterate(); $manager = new ProcessManager(new EventDispatcher(), true); $strategy = new ChunkStrategy($forks); $manager->process($iterator, $callback, $strategy); } } |
That’s basically it. Running this command on a VPS comparable with c1.xlarge Amazone EC2 server did speed up things a lot. So if you’re also working on a import job like this which can be split up in separate tasks you know what to do… Give Spork a try! It’s really easy, I promise.
UPDATE 2013-03-19
As stated in the comments by Kris, you should close the connection just before forking. Example of how to do this:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
<?php namespace Netvlies\AcmeMigrationBundle\Command; ... use Spork\EventDispatcher\Events; class GeneratePasswordCommand extends ContainerAwareCommand { ... protected function execute(InputInterface $input, OutputInterface $output) { ... // Get connection in parent $parentConnection = $em->getConnection(); $dispatcher = new EventDispatcher(); $dispatcher->addListener(Events::PRE_FORK, function() use ($parentConnection) { $parentConnection->close(); }); $manager = new ProcessManager($dispatcher, true); $strategy = new ChunkStrategy($forks); $manager->process($iterator, $callback, $strategy); } } |