English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
요구사항: 기능 A는 데이터를 가져오기 위해 제3자 API를 호출해야 하며, 제3자 API는 비동기 처리 방식을 사용하며, 호출 후 데이터와 상태 { data: "조회 결과", "status": "비동기 처리 중" }을 반환합니다. 이로 인해 제3자 API가 비동기 처리 중일 때 사용자가 기능 A를 사용할 때 반드시 기다려야 하므로, 사용자 요청을 타스크 큐에 추가하고 일부 데이터를 반환하여 요청을 종료합니다. 그런 다음 타스크 큐에서 타스크를 꺼내 제3자 API를 호출합니다. 상태가 "비동기 처리 중"인 경우 해당 타스크를 다시 타스크 큐에 추가하고, 상태가 "처리 완료"인 경우 데이터를 데이터베이스에 저장합니다.
위 문제를 고려하여 Node.js를 사용하는 것을 생각했습니다. + Redis sorted set를 통해 타스크 큐를 구현합니다. Node.js는 사용자 요청을 받기 위해 자신의 애플리케이션 API를 구현하고, 데이터베이스에 저장된 데이터와 API가 반환하는 부분 데이터를 결합하여 사용자에게 반환하고, 타스크를 타스크 큐에 추가합니다. Node.js child process와 cron을 사용하여 타스크 큐에서 타스크를 꺼내 실행합니다.
타스크 큐를 설계하는 과정에서 고려해야 할 몇 가지 문제
위 문제에 대한 해결책
예제 코드
// remote_api.js 세계적인 API를 모의합니다 'use strict'; const app = require('express')(); app.get('/', (req, res) => { setTimeout(() => { let arr = [200, 300]; // 200은 성공을 의미합니다.,300은 재요청이 필요하다는 것을 의미합니다. res.status(200).send({ 'status': arr[parseInt(Math.random() * 2]); }, 3000); }); app.listen('9001', () => { console.log('API 서비스 리스닝 포트:9001); }); // producer.js 자신의 애플리케이션 API, 사용자 요청을 받고 작업을 팀 큐에 추가하는 데 사용됩니다. 'use strict'; const app = require('express')(); const redisClient = require('redis').createClient(); const QUEUE_NAME = 'queue:example'; function addTaskToQueue(taskName, callback) { // 먼저 작업이 이미 존재하는지 확인합니다. 존재하면 건너뜁니다. 존재하지 않으면 작업을 팀 큐에 추가합니다. redisClient.zscore(QUEUE_NAME, taskName, (error, task) => { 아이(에러) { 콘솔 로그(에러); } if (task) { console.log('작업이 이미 존재합니다. 동일한 작업을 추가하지 않습니다.'); callback(null, task); } redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { 아이(에러) { callback(error); } callback(null, result); } }); } } }); } app.get('/', (req, res) => { let taskName = req.query['task-name']; addTaskToQueue(taskName, (error, result) => { 아이(에러) { 콘솔 로그(에러); } res.status(200).send('검색 중입니다......'); } }); }); app.listen(9002, () => { console.log('생산자 서비스 리스닝 포트:9002); }); // consumer.js 정기적으로 작업을 가져오고 실행 'use strict'; const redisClient = require('redis').createClient(); const request = require('request'); const schedule = require('node-schedule); const QUEUE_NAME = 'queue:expmple'; const PARALLEL_TASK_NUMBER = 2; // 동시 실행 작업 수 function getTasksFromQueue(callback) { // 여러 작업을 가져오기 redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => { 아이(에러) { callback(error); } // 작업 점수를 0으로 설정하여 처리 중으로 표시 아이(태스크 길이 > 0) { let tmp = []; tasks.forEach((task) => { tmp.push(0); tmp.push(task); }); redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => { 아이(에러) { callback(error); } callback(null, tasks) } }); } } }); } function addFailedTaskToQueue(taskName, callback) { redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => { 아이(에러) { callback(error); } callback(null, result); } }); } function removeSucceedTaskFromQueue(taskName, callback) { redisClient.zrem(QUEUE_NAME, taskName, (error, result) => { 아이(에러) { callback(error); } callback(null, result); } } } function execTask(taskName) { return new Promise((resolve, reject) => { let requestOptions = { 'url': 'http://127.0.0.1:9001', 'method': 'GET', 'timeout': 5000 }; 리퀘스트(리퀘스트 옵션, (에러, 응답, 바디) => { 아이(에러) { 결정('실패'); 콘솔 로그(에러); 타스크 이름에서 실패한 타스크를 큐에 추가하기(taskName, (에러) => { 아이(에러) { 콘솔 로그(에러); } } }); } 트라이 { 바디 = 타입 바디 !== '오브젝트' ? JSON 파서(바디) : 바디; } 결정('실패'); 콘솔 로그(에러); 타스크 이름에서 실패한 타스크를 큐에 추가하기(taskName, (에러, 결과) => { 아이(에러) { 콘솔 로그(에러); } } }); 리턴; } 아이(바디 스타스 !== 200) { 결정('실패'); 타스크 이름에서 실패한 타스크를 큐에 추가하기(taskName, (에러, 결과) => { 아이(에러) { 콘솔 로그(에러); } } }); } 결정('성공'); 타스크 이름에서 성공한 타스크를 큐에서 제거하기(taskName, (에러, 결과) => { 아이(에러) { 콘솔 로그(에러); } } }); } } }); }); } // 定时,每隔 5 초에 새 타스크를 실행하도록 잡 = 스케줄 스케줄져브('*/5 * * * * *', () => { 콘솔 로그('새 타스크 가져오기'); 타스크 큐에서 타스크 가져오기((에러, 타스크) => { 아이(에러) { 콘솔 로그(에러); } 아이(태스크 길이 > 0) { 콘솔 로그(태스크); 프롬스 모두(태스크 맵(에克斯ékTask)); .서브((결과) => { 콘솔 로그(결과); } .캐치((에러) => { 콘솔 로그(에러); }); } } }); });