前言

Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行环境,使用了一个事件驱动、非阻塞式 I/O 的模型,使其轻量又高效。nodejs是基于异步的写法,有时一个函数需要上一个函数的返回值做参数,对于一些复杂的异步流程操作,就会使得代码变得支离破碎,难以看懂或维护。

怎样才能写出优雅的异步流程代码呢,市面上已经有很多的解决方案,例如Promise以及co等,今天我们一起来学习一下async.js这款框架。

Async.js介绍

Async是一个提供了直接高效操作异步Js的工具模块,可以同时运行在Nodejs和浏览器端,它提供了将近70多个函数,包括常见的mapreducefiltereach等。所有的函数都需要你遵守Nodejs的约定也就是异步方法的最后一个参数返回一个函数,函数的第一个参数是Error,并且只执行一次。

Async.js安装

使用NPM进行安装:

$ npm install --save async

使用bower进行安装:

$ bower install async

然后就能使用了:

var async = require("async");

或者使用其中的一个独立模块:

var waterfall = require("async/waterfall");
var map = require("async/map");

在浏览器中,引入即可:

<script type="text/javascript" src="async.js"></script>

如果想要安装ES2015版本的话:

$ npm install --save async-es

使用:

import waterfall from 'async-es/waterfall';
import async from 'async-es';

Async.js集合类操作方法

concat(coll, iteratee, callback(err)opt)

并行执行迭代器,返回处理结果的合并后的集合,但是不能保证集合顺序。

async.concat(['dir1','dir2','dir3'], fs.readdir, function(err, files) {
    // files为数组,数组中元素为三个文件夹下面的所有文件名集合
});

实例:

async.concat(['dir1','dir2','dir3'], fs.readdir, function(err, files) {
    // 返回一个数组,该数组的元素为文件夹中所有的文件名的集合
});

concatLimit(coll, limit, iteratee, callbackopt)

同concat,只是限制了并行运行的数量

concatSeries(coll, iteratee, callback(err)opt)

concat的串行版本

detect(coll, iteratee, callbackopt)

并行执行迭代器,返回第一个(不能保证顺序)执行为真的值。

实例:

async.detect(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(null, !err)
    });
}, function(err, result) {
    // result等于第一个存在着的文件的文件名
});

detectLimit(coll, limit, iteratee, callbackopt)

detect,限制了并行运行的数量

detectSeries(coll, iteratee, callbackopt)

detect串行版本

each(coll, iteratee, callbackopt)

并行地给每个元素执行回调函数

实例:

async.each(openFiles, saveFile, function(err){
  // 如果其中一个运行出错,err会等于那个错误
});

eachLimit(coll, limit, iteratee, callbackopt)

each但是限制了并行运行的数量

eachSeries(coll, iteratee, callbackopt)

each的串行版本

eachOf(coll, iteratee, callbackopt)

each,但是比之多了一个序列

实例:

var obj = {dev: "/dev.json", test: "/test.json", prod: "/prod.json"};
var configs = {};
async.forEachOf(obj, function (value, key, callback) {
    fs.readFile(__dirname + value, "utf8", function (err, data) {
        if (err) return callback(err);
        try {
            configs[key] = JSON.parse(data);
        } catch (e) {
            return callback(e);
        }
        callback();
    });
}, function (err) {
    if (err) console.error(err.message);
    // configs is now a map of JSON data
    doSomethingWith(configs);
});

eachOfLimit(coll, limit, iteratee, callbackopt)

eachof但是限制了并发数

eachOfSeries(coll, iteratee, callbackopt)

eachof的串行版本

every(coll, iteratee, callbackopt)

所有的测试都满足才返回true,只要一个不满足就返回false

实例:

async.every(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(null, !err)
    });
}, function(err, result) {
    // if result is true then every file exists
});

everyLimit(coll, limit, iteratee, callbackopt)

every的限制并发数版

everySeries(coll, iteratee, callbackopt)

every的串行版

filter(coll, iteratee, callbackopt)

并行地选出符合条件的新数组,顺序不变。

实例:

async.filter(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(null, !err)
    });
}, function(err, results) {
    // results now equals an array of the existing files
});

filterLimit(coll, limit, iteratee, callbackopt)

filter限制并行数版

filterSeries(coll, iteratee, callbackopt)

filter串行版

groupBy(coll, iteratee, callbackopt)

使用coll中某一个key进行分组,返回一个新的对象,key是用来分组的对应的值,value是一个数组,是刚才coll中符合要求的值集合,并行执行,不能保证顺序

实例:

async.groupBy(['userId1', 'userId2', 'userId3'], function(userId, callback) {
    db.findById(userId, function(err, user) {
        if (err) return callback(err);
        return callback(null, user.age);
    });
}, function(err, result) {
    // result is object containing the userIds grouped by age
    // e.g. { 30: ['userId1', 'userId3'], 42: ['userId2']};
});

groupByLimit(coll, limit, iteratee, callbackopt)

group限制并发数版

groupBySeries(coll, limit, iteratee, callbackopt)

group串行版

map(coll, iteratee, callbackopt)

并行地进行映射成新的集合,虽然不能保证完成的顺序,但是能保证结果的顺序不变

实例:

async.map(['file1','file2','file3'], fs.stat, function(err, results) {
    // results is now an array of stats for each file
});

mapLimit(coll, limit, iteratee, callbackopt)

map限制并发数版

mapSeries(coll, iteratee, callbackopt)

map串行版

mapValues(obj, iteratee, callbackopt)

为对象设计的map

实例:

async.mapValues({
    f1: 'file1',
    f2: 'file2',
    f3: 'file3'
}, function (file, key, callback) {
  fs.stat(file, callback);
}, function(err, result) {
    // result is now a map of stats for each file, e.g.
    // {
    //     f1: [stats for file1],
    //     f2: [stats for file2],
    //     f3: [stats for file3]
    // }
});

mapValuesLimit(obj, limit, iteratee, callbackopt)

map限制并发数版

mapValuesSeries(obj, iteratee, callbackopt)

map串行版

reduce(coll, memo, iteratee, callbackopt)

串行递归计算出值

实例:

async.reduce([1,2,3], 0, function(memo, item, callback) {
    // 无意义地异步
    process.nextTick(function() {
        callback(null, memo + item)
    });
}, function(err, result) {
    // result is now equal to the last value of memo, which is 6
});

reduceRight(array, memo, iteratee, callbackopt)

从右边开始进行递归

reject(coll, iteratee, callbackopt)

filter相反,移除符合条件的值

实例:

async.reject(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(null, !err)
    });
}, function(err, results) {
    // results now equals an array of missing files
    createFiles(results);
});

rejectLimit(coll, limit, iteratee, callbackopt)

reject限制并发数版

rejectSeries(coll, iteratee, callbackopt)

reject串行版

some(coll, iteratee, callbackopt)

只要有一个符合条件就返回true

实例:

async.some(['file1','file2','file3'], function(filePath, callback) {
    fs.access(filePath, function(err) {
        callback(null, !err)
    });
}, function(err, result) {
    // if result is true then at least one of the files exists
});

someLimit(coll, limit, iteratee, callbackopt)

some的限制并发数版

someSeries(coll, iteratee, callbackopt)

some的串行版

sortBy(coll, iteratee, callback)

对集合进行排序

实例:

//升序
async.sortBy([1,9,3,5], function(x, callback) {
    callback(null, x);
}, function(err,result) {
    // result callback
});
// 降序
async.sortBy([1,9,3,5], function(x, callback) {
    callback(null, x*-1);    //<- x*-1 instead of x, turns the order around
}, function(err,result) {
    // result callback
});

transform(coll, accumulatoropt, iteratee, callbackopt)

不断改变叠加器,最后输出

实例:

async.transform([1,2,3], function(acc, item, index, callback) {
    // pointless async:
    process.nextTick(function() {
        acc.push(item * 2)
        callback(null)
    });
}, function(err, result) {
    // result is now equal to [2, 4, 6]
});

Async.js流程控制方法

applyEach(fns, …argsopt, callbackopt)

为数组中的每个函数应用提供的参数,如果只提供了函数数组,将会返回一个函数允许你继续传入参数。

实例:

async.applyEach([enableSearch, updateSchema], 'bucket', callback);

applyEachSeries(fns, …argsopt, callbackopt)

applyEach串行版本

auto(tasks, concurrencyopt, callbackopt)

根据依赖,决定任务的执行先后顺序

实例:

async.auto({
    // this function will just be passed a callback
    readData: async.apply(fs.readFile, 'data.txt', 'utf-8'),
    showData: ['readData', function(results, cb) {
        // results.readData is the file's contents
        // ...
    }]
}, callback);
async.auto({
    get_data: function(callback) {
        console.log('in get_data');
        // async code to get some data
        callback(null, 'data', 'converted to array');
    },
    make_folder: function(callback) {
        console.log('in make_folder');
        // async code to create a directory to store a file in
        // this is run at the same time as getting the data
        callback(null, 'folder');
    },
    write_file: ['get_data', 'make_folder', function(results, callback) {
        console.log('in write_file', JSON.stringify(results));
        // once there is some data and the directory exists,
        // write the data to a file in the directory
        callback(null, 'filename');
    }],
    email_link: ['write_file', function(results, callback) {
        console.log('in email_link', JSON.stringify(results));
        // once the file is written let's email a link to it...
        // results.write_file contains the filename returned by write_file.
        callback(null, {'file':results.write_file, 'email':'user@example.com'});
    }]
}, function(err, results) {
    console.log('err = ', err);
    console.log('results = ', results);
});

autoInject(tasks, callbackopt)

auto的依赖注入版本

实例:

//  The example from `auto` can be rewritten as follows:
async.autoInject({
    get_data: function(callback) {
        // async code to get some data
        callback(null, 'data', 'converted to array');
    },
    make_folder: function(callback) {
        // async code to create a directory to store a file in
        // this is run at the same time as getting the data
        callback(null, 'folder');
    },
    write_file: function(get_data, make_folder, callback) {
        // once there is some data and the directory exists,
        // write the data to a file in the directory
        callback(null, 'filename');
    },
    email_link: function(write_file, callback) {
        // once the file is written let's email a link to it...
        // write_file contains the filename returned by write_file.
        callback(null, {'file':write_file, 'email':'user@example.com'});
    }
}, function(err, results) {
    console.log('err = ', err);
    console.log('email_link = ', results.email_link);
});
// If you are using a JS minifier that mangles parameter names, `autoInject`
// will not work with plain functions, since the parameter names will be
// collapsed to a single letter identifier.  To work around this, you can
// explicitly specify the names of the parameters your task function needs
// in an array, similar to Angular.js dependency injection.
// This still has an advantage over plain `auto`, since the results a task
// depends on are still spread into arguments.
async.autoInject({
    //...
    write_file: ['get_data', 'make_folder', function(get_data, make_folder, callback) {
        callback(null, 'filename');
    }],
    email_link: ['write_file', function(write_file, callback) {
        callback(null, {'file':write_file, 'email':'user@example.com'});
    }]
    //...
}, function(err, results) {
    console.log('err = ', err);
    console.log('email_link = ', results.email_link);
});

cargo(worker, payloadopt)

使用负载创建一个负载对象,添加到负载的任务会被同时执行

实例:

// create a cargo object with payload 2
var cargo = async.cargo(function(tasks, callback) {
    for (var i=0; i<tasks.length; i++) {
        console.log('hello ' + tasks[i].name);
    }
    callback();
}, 2);
// add some items
cargo.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});
cargo.push({name: 'bar'}, function(err) {
    console.log('finished processing bar');
});
cargo.push({name: 'baz'}, function(err) {
    console.log('finished processing baz');
});

compose(…functions)

使用多个函数组成一个函数,每个函数会把下一个函数的返回值当做参数传入

实例:

function add1(n, callback) {
    setTimeout(function () {
        callback(null, n + 1);
    }, 10);
}
function mul3(n, callback) {
    setTimeout(function () {
        callback(null, n * 3);
    }, 10);
}
var add1mul3 = async.compose(mul3, add1);
add1mul3(4, function (err, result) {
    // result now equals 15
});

doDuring(fn, test, callbackopt)

先执行再进行检查的during版本,调换了fntest的顺序

doUntil(iteratee, test, callbackopt)

until的先执行在检查版本,调换了fntest的顺序

doWhilst(iteratee, test, callbackopt)

whilst的先执行后检查版本,,调换了fntest的顺序

during(test, fn, callbackopt)

类似whilst,除了测试函数是一个异步函数通过function (err, truth)的形式

实例:

var count = 0;
async.during(
    function (callback) {
        return callback(null, count < 5);
    },
    function (callback) {
        count++;
        setTimeout(callback, 1000);
    },
    function (err) {
        // 5秒后才执行
    }
);

forever(fn, errbackopt)

无限执行

parallel(tasks, callbackopt)

并行执行

实例:

async.parallel([
    function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    function(callback) {
        setTimeout(function() {
            callback(null, 'two');
        }, 100);
    }
],
// optional callback
function(err, results) {
    // the results array will equal ['one','two'] even though
    // the second function had a shorter timeout.
});
// an example using an object instead of an array
async.parallel({
    one: function(callback) {
        setTimeout(function() {
            callback(null, 1);
        }, 200);
    },
    two: function(callback) {
        setTimeout(function() {
            callback(null, 2);
        }, 100);
    }
}, function(err, results) {
    // results is now equals to: {one: 1, two: 2}
});

parallelLimit(tasks, limit, callbackopt)

限制并发数的parallel版本

priorityQueue(worker, concurrency)

类似queue,不过会有一个权重机制,任务按照权重的升序顺序执行,与queue不同地方在于:
push(task, priority, [callback])多了一个priority,并且没有unshift方法

queue(worker, concurrencyopt)

使用并发数创建一个队列对象,添加的任务会并行执行,只有上一个任务完成了才会执行下一个任务

实例:

// 创建并发数为2的队列
var q = async.queue(function(task, callback) {
    console.log('hello ' + task.name);
    callback();
}, 2);
// 创建完成回调
q.drain = function() {
    console.log('all items have been processed');
};
// 添加任务
q.push({name: 'foo'}, function(err) {
    console.log('finished processing foo');
});
q.push({name: 'bar'}, function (err) {
    console.log('finished processing bar');
});
// 添加多个任务
q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
    console.log('finished processing item');
});
// 在头部添加任务
q.unshift({name: 'bar'}, function (err) {
    console.log('finished processing bar');
});

race(tasks, callback)

并行执行,只要有一个执行完成就算完成

实例:

async.race([
    function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    function(callback) {
        setTimeout(function() {
            callback(null, 'two');
        }, 100);
    }
],
// main callback
function(err, result) {
    // the result will be equal to 'two' as it finishes earlier
});

retry(optsopt, task, callbackopt)

尝试执行任务不超过规定次数

实例:

// The `retry` function can be used as a stand-alone control flow by passing
// a callback, as shown below:

// try calling apiMethod 3 times
async.retry(3, apiMethod, function(err, result) {
    // do something with the result
});

// try calling apiMethod 3 times, waiting 200 ms between each retry
async.retry({times: 3, interval: 200}, apiMethod, function(err, result) {
    // do something with the result
});

// try calling apiMethod 10 times with exponential backoff
// (i.e. intervals of 100, 200, 400, 800, 1600, ... milliseconds)
async.retry({
  times: 10,
  interval: function(retryCount) {
    return 50 * Math.pow(2, retryCount);
  }
}, apiMethod, function(err, result) {
    // do something with the result
});

// try calling apiMethod the default 5 times no delay between each retry
async.retry(apiMethod, function(err, result) {
    // do something with the result
});

// try calling apiMethod only when error condition satisfies, all other
// errors will abort the retry control flow and return to final callback
async.retry({
  errorFilter: function(err) {
    return err.message === 'Temporary error'; // only retry on a specific error
  }
}, apiMethod, function(err, result) {
    // do something with the result
});

// to retry individual methods that are not as reliable within other
// control flow functions, use the `retryable` wrapper:
async.auto({
    users: api.getUsers.bind(api),
    payments: async.retryable(3, api.getPayments.bind(api))
}, function(err, results) {
    // do something with the results
});

retryable(optsopt, task)

使任务可被尝试多次执行的

实例:

async.auto({
    dep1: async.retryable(3, getFromFlakyService),
    process: ["dep1", async.retryable(3, function (results, cb) {
        maybeProcessData(results.dep1, cb);
    })]
}, callback);

seq(…functions)

compose的倒过来版,上一个函数的返回值是下一个函数的参数。

实例:

// Requires lodash (or underscore), express3 and dresende's orm2.
// Part of an app, that fetches cats of the logged user.
// This example uses `seq` function to avoid overnesting and error
// handling clutter.
app.get('/cats', function(request, response) {
    var User = request.models.User;
    async.seq(
        _.bind(User.get, User),  // 'User.get' has signature (id, callback(err, data))
        function(user, fn) {
            user.getCats(fn);      // 'getCats' has signature (callback(err, data))
        }
    )(req.session.user_id, function (err, cats) {
        if (err) {
            console.error(err);
            response.json({ status: 'error', message: err.message });
        } else {
            response.json({ status: 'ok', message: 'Cats found', data: cats });
        }
    });
});

series(tasks, callbackopt)

串行执行函数

实例:

async.series([
    function(callback) {
        // do some stuff ...
        callback(null, 'one');
    },
    function(callback) {
        // do some more stuff ...
        callback(null, 'two');
    }
],
// optional callback
function(err, results) {
    // results is now equal to ['one', 'two']
});
async.series({
    one: function(callback) {
        setTimeout(function() {
            callback(null, 1);
        }, 200);
    },
    two: function(callback){
        setTimeout(function() {
            callback(null, 2);
        }, 100);
    }
}, function(err, results) {
    // results is now equal to: {one: 1, two: 2}
});

times(n, iteratee, callback)

并行执行指定次数

实例:

// Pretend this is some complicated async factory
var createUser = function(id, callback) {
    callback(null, {
        id: 'user' + id
    });
};
// generate 5 users
async.times(5, function(n, next) {
    createUser(n, function(err, user) {
        next(err, user);
    });
}, function(err, users) {
    // we should now have 5 users
});

timesLimit(count, limit, iteratee, callback)

times的限制并发数版

timesSeries(n, iteratee, callback)

times的串行版

tryEach(tasks, callbackopt)

串行执行,只要有一个成功则停止

实例:

async.tryEach([
    function getDataFromFirstWebsite(callback) {
        // Try getting the data from the first website
        callback(err, data);
    },
    function getDataFromSecondWebsite(callback) {
        // First website failed,
        // Try getting the data from the backup website
        callback(err, data);
    }
],
// optional callback
function(err, results) {
    Now do something with the data.
});

until(test, iteratee, callbackopt)

直到检查函数返回为真才停止执行iteratee

waterfall(tasks, callbackopt)

串行执行,将执行结果传递给接下来的函数

实例:

async.waterfall([
    function(callback) {
        callback(null, 'one', 'two');
    },
    function(arg1, arg2, callback) {
        // arg1 now equals 'one' and arg2 now equals 'two'
        callback(null, 'three');
    },
    function(arg1, callback) {
        // arg1 now equals 'three'
        callback(null, 'done');
    }
], function (err, result) {
    // result now equals 'done'
});
// Or, with named functions:
async.waterfall([
    myFirstFunction,
    mySecondFunction,
    myLastFunction,
], function (err, result) {
    // result now equals 'done'
});
function myFirstFunction(callback) {
    callback(null, 'one', 'two');
}
function mySecondFunction(arg1, arg2, callback) {
    // arg1 now equals 'one' and arg2 now equals 'two'
    callback(null, 'three');
}
function myLastFunction(arg1, callback) {
    // arg1 now equals 'three'
    callback(null, 'done');
}

whilst(test, iteratee, callbackopt)

只要检查通过就执行

实例:

var count = 0;
async.whilst(
    function() { return count < 5; },
    function(callback) {
        count++;
        setTimeout(function() {
            callback(null, count);
        }, 1000);
    },
    function (err, n) {
        // 5 seconds have passed, n = 5
    }
);

Async.js常用工具类方法

apply(fn)

创建一个函数,并传入参数或者在随后持续传入参数

实例:

// using apply
async.parallel([
    async.apply(fs.writeFile, 'testfile1', 'test1'),
    async.apply(fs.writeFile, 'testfile2', 'test2')
]);
// the same process without using apply
async.parallel([
    function(callback) {
        fs.writeFile('testfile1', 'test1', callback);
    },
    function(callback) {
        fs.writeFile('testfile2', 'test2', callback);
    }
]);
// It's possible to pass any number of additional arguments when calling the
// continuation:

node> var fn = async.apply(sys.puts, 'one');
node> fn('two', 'three');
one
two
three

asyncify(func)

将同步函数变为异步函数

实例:

// passing a regular synchronous function
async.waterfall([
    async.apply(fs.readFile, filename, "utf8"),
    async.asyncify(JSON.parse),
    function (data, next) {
        // data is the result of parsing the text.
        // If there was a parsing error, it would have been caught.
    }
], callback);
// passing a function returning a promise
async.waterfall([
    async.apply(fs.readFile, filename, "utf8"),
    async.asyncify(function (contents) {
        return db.model.create(contents);
    }),
    function (model, next) {
        // `model` is the instantiated model object.
        // If there was an error, this function would be skipped.
    }
], callback);
// es2017 example, though `asyncify` is not needed if your JS environment
// supports async functions out of the box
var q = async.queue(async.asyncify(async function(file) {
    var intermediateStep = await processFile(file);
    return await somePromise(intermediateStep)
}));
q.push(files);

constant()

返回一个函数,当调用时会返回给定的值

实例:

async.waterfall([
    async.constant(42),
    function (value, next) {
        // value === 42
    },
    //...
], callback);
async.waterfall([
    async.constant(filename, "utf8"),
    fs.readFile,
    function (fileData, next) {
        //...
    }
    //...
], callback);
async.auto({
    hostname: async.constant("https://server.net/"),
    port: findFreePort,
    launchServer: ["hostname", "port", function (options, cb) {
        startServer(options, cb);
    }],
    //...
}, callback);

dir(function)

使用console.dir打印出异步的函数的执行结果

实例:

// in a module
var hello = function(name, callback) {
    setTimeout(function() {
        callback(null, {hello: name});
    }, 1000);
};
// in the node repl
node> async.dir(hello, 'world');
{hello: 'world'}

ensureAsync(fn)

包裹一个异步函数,使得它的回调函数在事件循环机制的下一个tick执行,通常是为了防止堆栈溢出

实例:

function sometimesAsync(arg, callback) {
    if (cache[arg]) {
        return callback(null, cache[arg]); // this would be synchronous!!
    } else {
        doSomeIO(arg, callback); // this IO would be asynchronous
    }
}
// this has a risk of stack overflows if many results are cached in a row
async.mapSeries(args, sometimesAsync, done);
// this will defer sometimesAsync's callback if necessary,
// preventing stack overflows
async.mapSeries(args, async.ensureAsync(sometimesAsync), done);

log(function)

使用console.log打印出异步函数的执行结果

实例:

// in a module
var hello = function(name, callback) {
    setTimeout(function() {
        callback(null, 'hello ' + name);
    }, 1000);
};
// in the node repl
node> async.log(hello, 'world');
'hello world'

memoize(fn, hasher)

缓存异步函数的执行结果

实例:

var slow_fn = function(name, callback) {
    // do something
    callback(null, result);
};
var fn = async.memoize(slow_fn);
// fn can now be used as if it were slow_fn
fn('some name', function() {
    // callback
});

nextTick(callback)

在事件循环之后的循环之中调用

实例:

var call_order = [];
async.nextTick(function() {
    call_order.push('two');
    // call_order now equals ['one','two']
});
call_order.push('one');
async.setImmediate(function (a, b, c) {
    // a, b, and c equal 1, 2, and 3
}, 1, 2, 3);

reflect(fn)

包裹一个异步函数,不论是否执行成功总返回一个结果对象

实例:

async.parallel([
    async.reflect(function(callback) {
        // do some stuff ...
        callback(null, 'one');
    }),
    async.reflect(function(callback) {
        // do some more stuff but error ...
        callback('bad stuff happened');
    }),
    async.reflect(function(callback) {
        // do some more stuff ...
        callback(null, 'two');
    })
],
// optional callback
function(err, results) {
    // values
    // results[0].value = 'one'
    // results[1].error = 'bad stuff happened'
    // results[2].value = 'two'
});

reflectAll(tasks)

reflect的包裹数组或多个函数版

实例:

let tasks = [
    function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    function(callback) {
        // do some more stuff but error ...
        callback(new Error('bad stuff happened'));
    },
    function(callback) {
        setTimeout(function() {
            callback(null, 'two');
        }, 100);
    }
];

async.parallel(async.reflectAll(tasks),
// optional callback
function(err, results) {
    // values
    // results[0].value = 'one'
    // results[1].error = Error('bad stuff happened')
    // results[2].value = 'two'
});

// an example using an object instead of an array
let tasks = {
    one: function(callback) {
        setTimeout(function() {
            callback(null, 'one');
        }, 200);
    },
    two: function(callback) {
        callback('two');
    },
    three: function(callback) {
        setTimeout(function() {
            callback(null, 'three');
        }, 100);
    }
};

async.parallel(async.reflectAll(tasks),
// optional callback
function(err, results) {
    // values
    // results.one.value = 'one'
    // results.two.error = 'two'
    // results.three.value = 'three'
});

setImmediate(callback)

在事件循环之后的循环之中调用

实例:

var call_order = [];
async.nextTick(function() {
    call_order.push('two');
    // call_order now equals ['one','two']
});
call_order.push('one');
async.setImmediate(function (a, b, c) {
    // a, b, and c equal 1, 2, and 3
}, 1, 2, 3);

timeout(asyncFn, milliseconds, infoopt)

如果异步函数在规定时间内还没有调用它的回调函数,则返回超时错误

实例:

function myFunction(foo, callback) {
    doAsyncTask(foo, function(err, data) {
        // handle errors
        if (err) return callback(err);

        // do some stuff ...

        // return processed data
        return callback(null, data);
    });
}
var wrapped = async.timeout(myFunction, 1000);
// call `wrapped` as you would `myFunction`
wrapped({ bar: 'bar' }, function(err, data) {
    // if `myFunction` takes < 1000 ms to execute, `err`
    // and `data` will have their expected values
    // else `err` will be an Error with the code 'ETIMEDOUT'
});

unmemoize(fn)

取消memoize缓存,将之转换为原值

常见问题

1. 同步迭代器

当你使用同步迭代器的时候,很有可能会遇到这个错误RangeError: Maximum call stack size exceeded.,只需使用async.setImmediate或者使用async.ensureAsync即可。

原代码:

async.eachSeries(hugeArray, function iteratee(item, callback) {
    if (inCache(item)) {
        callback(null, cache[item]); //如果缓存很多,则会内存溢出
    } else {
        doSomeIO(item, callback);
    }
}, function done() {
    //...
});

将之改为:

async.eachSeries(hugeArray, function iteratee(item, callback) {
    if (inCache(item)) {
        async.setImmediate(function() {
            callback(null, cache[item]);
        });
    } else {
        doSomeIO(item, callback);
        //...
    }
});

2. 多个回调

确保在提前调用回调的时候加上return,例如:return callback(err, result),否则将会导致回调函数多次被调用:

async.waterfall([
    function(callback) {
        getSomething(options, function (err, result) {
            if (err) {
                callback(new Error("failed getting something:" + err.message));
                // 这里应该return
            }
            // 由于我们没有使用return,该回调还是会被调用,`processData`会被调用2次
            callback(null, result);
        });
    },
    processData
], done)

3. 使用ES2017async函数

Async.js中使用ES2017的async函数的时候,不会传递回调函数,并且只能接受原生的async函数而不是经过babel转化过得,你也可以使用async.asyncify()进行包裹,而且只会使用返回的值或者处理Promise的rejections或者errors。

async.mapLimit(files, async file => { // 没有回调函数
    const text = await util.promisify(fs.readFile)(dir + file, 'utf8')
    const body = JSON.parse(text) // <- 解析错误将会自动捕获
    if (!(await checkValidity(body))) {
        throw new Error(`${file} has invalid contents`) // <- 这个错误也会自动捕获
    }
    return body // <返回值
}, (err, contents) => {
    if (err) throw err
    console.log(contents)
})

4. 为迭代器绑定上下文

Async.js迭代器执行的环境this其实指向的是global对象,想要改变迭代器的this指针,可以使用bind进行改变:

// Here is a simple object with an (unnecessarily roundabout) squaring method
var AsyncSquaringLibrary = {
    squareExponent: 2,
    square: function(number, callback){
        var result = Math.pow(number, this.squareExponent);
        setTimeout(function(){
            callback(null, result);
        }, 200);
    }
};

async.map([1, 2, 3], AsyncSquaringLibrary.square, function(err, result) {
    // result is [NaN, NaN, NaN]
});

async.map([1, 2, 3], AsyncSquaringLibrary.square.bind(AsyncSquaringLibrary), function(err, result) {
    // result is [1, 4, 9]
});
Down

参考资料