Say I have a directory foo, with some number of subdirectories. Each of these subdirectories has between 0 and 5 files of variable length which I would like to process. My initial code looks like so:
pool.query(` SET SEARCH_PATH TO public,os_local; `).then(() =>
  fs.readdirSync(srcpath)
    .filter(file => fs.lstatSync(path.join(srcpath, file)).isDirectory())
    .map(dir => {
      fs.access(`${srcpath + dir}/${dir}_Building.shp`, fs.constants.R_OK, (err) => {
        if (!err) {
          openShapeFile(`${srcpath + dir}/${dir}_Building.shp`).then((source) => source.read()
            .then(function dbWrite (result) {
              if (result.done) {
                console.log(`done ${dir}`)
              } else {
                const query = `INSERT INTO os_local.buildings(geometry, id, featcode, version) VALUES(os_local.ST_GeomFromGeoJSON($1), $2, $3, $4) ON CONFLICT (id) DO UPDATE SET featcode=$3, geometry=os_local.ST_GeomFromGeoJSON($1), version=$4;`
                return pool.connect().then(client => {
                  client.query(query, [geoJson.split('"[[').join('[[').split(']]"').join(']]'),
                    result.value.properties.ID,
                    result.value.properties.FEATCODE,
                    version
                  ]).then((result) => {
                    return source.read().then(dbWrite)
                  }).catch((err) => {
                    console.log(err, query, geoJson.split('"[[').join('[[').split(']]"').join(']]'),
                      result.value.properties.ID,
                      result.value.properties.FEATCODE,
                      version
                    )
                    return source.read().then(dbWrite)
                  })
                  client.release()
                })
              }
            })).catch(err => console.log('No Buildings', err))
        }
      })
      fs.access(`${srcpath + dir}/${dir}__ImportantBuilding.shp`, fs.constants.R_OK, (err) => {
        //read file one line at a time
        //spin up connection in pg.pool, insert data
      })
      fs.access(`${srcpath + dir}/${dir}_Road.shp`, fs.constants.R_OK, (err) => {
        //read file one line at a time
        //spin up connection in pg.pool, insert data
      })
      fs.access(`${srcpath + dir}/${dir}_Glasshouse.shp`, fs.constants.R_OK, (err) => {
        //read file one line at a time
        //spin up connection in pg.pool, insert data
      })
      fs.access(`${srcpath + dir}/${dir}_RailwayStation.shp`, fs.constants.R_OK, (err) => {
        //read file one line at a time
        //spin up connection in pg.pool, insert data
      })
    })
)
This mostly works, but each subdirectory ends up waiting for its longest file to be fully processed, so in practice there is only ever one active connection to the database.
Is there a way I could rearchitect this to make better use of my computational resources, while limiting the number of active Postgres connections and making the code wait until connections become available? (I have set the limit to 20 in the pg poolConfig for node-postgres.)
2 Answers
Answer 1
If you need your files to be processed in turn, with control over timing, you can use streams, timers (for scheduling), and process.nextTick(). There is a great manual for understanding streams in Node.js.
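As a minimal sketch of the stream idea (not the answerer's exact code): an object-mode Readable of file paths piped into a Writable that handles one path per write() call, so backpressure naturally processes the files in turn. processFile is a hypothetical helper that returns a Promise.

// Sketch: serialize file processing with stream backpressure
const { Readable, Writable } = require('stream');

// Turn an iterable of file paths into an object-mode stream
function makeFileStream(filePaths) {
  return Readable.from(filePaths);
}

const processor = new Writable({
  objectMode: true,
  write(filePath, _encoding, callback) {
    processFile(filePath)            // hypothetical async worker returning a Promise
      .then(() => callback())        // tell the stream to send the next path
      .catch(err => callback(err));  // propagate errors through the pipeline
  }
});

makeFileStream(['a.shp', 'b.shp'])
  .pipe(processor)
  .on('finish', () => console.log('all files processed'))
  .on('error', err => console.error(err));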
Answer 2
Here is an example of getting directory contents using generators. You can start getting the first couple of files right away and then use asynchronous code to process the files in parallel.
// Dependencies
const fs = require('fs');
const path = require('path');

// The generator function (note the asterisk)
function* getFilesInDirectory(fullPath, recursive = false) {
  // Convert file names to full paths
  let contents = fs.readdirSync(fullPath).map(file => {
    return path.join(fullPath, file);
  });

  for (let i = 0; i < contents.length; i++) {
    const childPath = contents[i];
    let stats = fs.statSync(childPath);

    if (stats.isFile()) {
      yield childPath;
    } else if (stats.isDirectory() && recursive) {
      yield* getFilesInDirectory(childPath, true);
    }
  }
}
Usage:
function handleResults(results) {
  ... // Returns a promise
}

function processFile(file) {
  ... // Returns a promise
}

const files = getFilesInDirectory(__dirname, true);
let result = files.next();
const promises = [];

while (!result.done) {
  console.log(result.value);

  // Process files in parallel
  const promise = processFile(result.value).then(handleResults);
  promises.push(promise);

  result = files.next();
}

Promise.all(promises).then(() => {
  console.log('done');
});
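Since the question also asks about capping the number of active Postgres connections, here is a rough sketch of bounding parallelism by processing the files in fixed-size batches. processFile and handleResults are the same hypothetical helpers as above, and batchSize is an assumption you would match to your pool size (e.g. 20).

// Sketch: cap parallelism (and therefore pg pool usage) with fixed-size batches
async function processAllFiles(dirPath, batchSize = 20) {
  // Drain the generator up front so the list can be sliced into batches
  const allFiles = [...getFilesInDirectory(dirPath, true)];

  for (let i = 0; i < allFiles.length; i += batchSize) {
    const batch = allFiles.slice(i, i + batchSize);
    // Wait for the whole batch to settle before starting the next one
    await Promise.all(batch.map(file => processFile(file).then(handleResults)));
  }

  console.log('done');
}

The trade-off is that draining the generator up front gives up some of the "start on the first files immediately" benefit, but it keeps at most batchSize files, and therefore at most batchSize pool clients, in flight at once.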