Introduction
In the previous post, we covered how to analyze the network panel of a webpage to identify the RESTful API that serves the data we want to scrape. While this approach works for many websites, some employ techniques such as JavaScript-based encryption, which make it difficult to extract the desired information through RESTful APIs alone. This is where a “headless browser” comes in: it lets us simulate the actions of a real user browsing the website.
A headless browser is essentially a web browser without a graphical user interface (GUI). It allows automated web browsing and page interaction, providing a means to access and extract information from websites that employ dynamic content and JavaScript encryption. By using a headless browser, we can overcome some of the challenges posed by traditional scraping methods, as it allows us to execute JavaScript, render web pages, and access dynamically generated content.
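To make the idea concrete, here is a minimal sketch (not part of the original project) that launches headless Chrome with Puppeteer, lets the page execute its JavaScript, and reads the rendered content; the URL is just a placeholder.

const puppeteer = require('puppeteer');

(async () => {
  // Launch Chrome without a GUI
  const browser = await puppeteer.launch({ headless: true });
  const page = await browser.newPage();
  // Placeholder URL: any page that builds its content with JavaScript
  await page.goto('https://example.com', { waitUntil: 'networkidle2' });
  // By now the page's JavaScript has run, so the DOM contains the dynamic content
  const title = await page.evaluate(() => document.title);
  console.log(title);
  await browser.close();
})();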
Here I will demonstrate the process of creating a distributed crawler using a headless browser, using Google Maps as our target website.
Throughout my experience, I have explored various headless browser frameworks, such as Selenium, Puppeteer, Playwright, and Chromedp. Among them, I believe Crawlee stands out as the most powerful tool I have used for web scraping. Crawlee is a JavaScript (Node.js) library that builds on top of browser-automation frameworks such as Puppeteer and Playwright, which makes it versatile and easy to adapt to different project requirements.
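Before diving into the full scraper, here is a minimal Crawlee sketch, reduced from what follows, that shows the basic shape of a PuppeteerCrawler: you register a request handler and enqueue URLs, and Crawlee manages the browser pool, retries, and concurrency. The URL is a placeholder.

const { PuppeteerCrawler } = require('crawlee');

const crawler = new PuppeteerCrawler({
  maxConcurrency: 3,
  // Crawlee opens each request in a headless browser page and calls this handler
  requestHandler: async ({ page, request }) => {
    console.log(`Processing ${request.url}`);
    const title = await page.title();
    console.log(`Title: ${title}`);
  },
});

// The real scraper below enqueues Google Maps search URLs instead
crawler.run(['https://example.com']).then(() => console.log('Done'));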
How to list all the businesses in a country
In general, when using Google Maps to find businesses we want to visit, we typically conduct searches based on the business category type and location. For instance, we may use a keyword like “shop near Holtsville” to locate any shops in a small town in New York. However, a challenge arises when multiple towns share the same name within the same country. To overcome this, Google Maps offers a helpful feature: querying by postal code. Consequently, the initial query can be refined to “shop near 00501,” with 00501 being the postal code of a specific location in Holtsville. This approach provides greater clarity and reduces confusion compared to using town names.
With this clear path for efficient searches, our next objective is to compile a comprehensive list of all postal codes in the USA. To accomplish this, I used a free postal code database accessible here. If you happen to know of a better database, leave a comment below.
Snapshot of the postal code list of the US
Once we have downloaded the postal code list file, we can begin testing its functionality on Google Maps.
Search “shop near 00501 USA” in Google Maps
Using the keyword shop near 00501 USA in the Google Map search bar, we can observe a list of shops located in Holtsville. As our aim is to scrape all the businesses from this search, it is essential to ensure we retrieve a comprehensive list. To achieve this, we must scroll down through the search results until we reach the bottom of the list. Upon reaching the end, Google Maps will display a clear message stating You’ve reached the end of the list. This indicator serves as our cue to conclude the scrolling process and move on to the next phase of data extraction. By doing so, we can be certain that we have gathered all the relevant businesses from the specified location, enabling us to proceed with the scraping procedure accurately and comprehensively.
Scroll down until seeing the message “You’ve reached the end of the list”
Once we have compiled the list of businesses from Google Maps, we can proceed to extract the detailed information we need from each business entry. This process involves going through the list one by one and scraping relevant data, such as the business’s address, operating hours, phone number, star ratings, number of reviews, and all available reviews.
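Before implementing the scrapers, we also need to seed the Redis task queue they consume. Here is a rough sketch, not part of the original code, of how the downloaded postal code list could be turned into search keywords and pushed into googlemap:postcode:task:queue. It assumes the postal codes have been exported to a plain text file with one code per line (us_postcodes.txt is a placeholder name); adapt the parsing to whatever format your database actually uses. Keywords use “+” as the separator, matching the format the crawler loop expects (e.g. the hotel+near+t7x+ca example commented in the code below).

'use strict';
const fs = require('fs');
const { client: redisClient } = require('../redis');

const googleMapPostcodetaskQueue = "googlemap:postcode:task:queue"

async function seedQueue(file) {
  // Assumed format: one US postal code per line, e.g. "00501"
  const postcodes = fs.readFileSync(file, 'utf-8')
    .split('\n')
    .map(line => line.trim())
    .filter(Boolean);
  for (const postcode of postcodes) {
    // Same keyword pattern as the search described above: "shop near <postcode> USA"
    const keyword = `shop+near+${postcode}+USA`;
    await redisClient.lPush(googleMapPostcodetaskQueue, keyword);
  }
  console.log(`queued ${postcodes.length} postal codes`);
}

seedQueue('./us_postcodes.txt');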
Implementing the Google Maps scraper
1. Google Maps businesses scraper
'use strict';
const { RequestQueue, PuppeteerCrawler, Configuration, KeyValueStore } = require('crawlee');
const { PROXY_HOST, CRAWLEE_MIN_CONCURRENCY, CRAWLEE_MAX_CONCURRENCY } = process.env
const { logger } = require('../log');
// sleep and googleMapConsentCheck live in the shared utils module (see the detail scraper below);
// waitTillHTMLRendered is assumed to be exported from there as well (a possible sketch follows this listing)
const { sleep, googleMapConsentCheck, waitTillHTMLRendered } = require('../utils');
const { client: redisClient } = require('../redis');
const { BROWSER_POOL_OPTIONS } = require('../const');
const googleMapPostcodetaskQueue = "googlemap:postcode:task:queue"
const googleMapPostcodetaskErrorQueue = "googlemap:postcode:task:error:queue"
const googleMapAddressTaskQueue = "googlemap:address:task:queue"
const googleMapPostcodeTaskSuccessStats = "googlemap:postcode:task:success:stats"
const googleMapPostcodeTaskFailureStats = "googlemap:postcode:task:failure:stats"
var store;
var handledSignal = false;
const SEARCH_PATH = "www.google.com/maps/search/"
const PLACE_PATH = "www.google.com/maps/place/"
async function scrollPage(page, scrollContainer, limit = 50) {
let count = 0
while (true) {
if (count >= limit) {
logger.error(`reached scroll page limit ${limit} , url ${page.url()}`)
break
}
let lastHeight = await page.evaluate(`document.querySelector("${scrollContainer}").scrollHeight`);
// await sleep(400)
await page.evaluate(`document.querySelector("${scrollContainer}").scrollTo(0, document.querySelector("${scrollContainer}").scrollHeight)`);
count++
// await page.waitForNavigation({waitUntil: 'domcontentloaded'}); // consider navigation to be finished when the DOMContentLoaded event is fired.
await waitTillHTMLRendered(page)
let newHeight = await page.evaluate(`document.querySelector("${scrollContainer}").scrollHeight`);
if (await page.$("span.HlvSq")) {
const button = await page.$eval("span.HlvSq", el => el.innerText)
if (button.includes("reached the end")) {
logger.info(`button ${button}`)
break
}
}
lastHeight = newHeight;
}
await extractLinks(page)
}
const processMap = async (page, keyword) => {
if (!keyword) {
logger.error(`sleeping | google list task queue is empty!`)
await sleep(10000)
return false
}
logger.info(`processing address: ${keyword}`)
const url = `https://www.google.com/maps/search/${keyword}?hl=en`
logger.info(`visiting url: ${url}`)
await googleMapConsentCheck(page)
await page.waitForNetworkIdle()
const notfound = await page.$("div[id='QA0Szd']")
if (notfound) {
const notFoundText = await page.$eval("div[id='QA0Szd']", el => el.innerText)
if (notFoundText && notFoundText.includes("Google Maps can't find ")) {
logger.error(`${keyword} not found!`)
return false
}
}
await scrollPage(page, "div[role='feed']")
return true
}
const getFullPath = (url) => {
const u = new URL(url)
return u.protocol + "//" + u.host + u.pathname
}
const extractLinks = async (page) => {
const links = await page.$$eval("a", link => link.map(a => a.href))
logger.info(`total ${links.length} links found!`)
for (let link of links) {
if (link.includes(PLACE_PATH)) {
link = getFullPath(link)
logger.info(`link ${link} is added to queue`)
await redisClient.lPush(googleMapAddressTaskQueue, link)
}
}
}
const getKeywordFromUrl = (url) => {
return url?.split("maps/search/")[1]?.split("/")[0].split("?")[0]
}
const preNavigationHook = async (crawlingContext, gotoOptions) => {
const { request } = crawlingContext;
const url = request.url
if (url.includes(SEARCH_PATH)) {
const keyword = getKeywordFromUrl(url)
await store.setValue(request.id, keyword)
}
}
const requestHandler = async ({ page, request }) => {
const start = Date.now()
await page.waitForNetworkIdle()
const url = page.url()
logger.info(`Processing: ${request.url} | page url ${page.url()}`)
if (url?.includes(PLACE_PATH)) {
logger.info(`new url ${url} is added to queue`)
await redisClient.lPush(googleMapAddressTaskQueue, url)
return
}
const keyword = getKeywordFromUrl(request.url)
const result = await processMap(page, keyword)
if (result) {
await redisClient.incr(googleMapPostcodeTaskSuccessStats)
logger.info(`scraping address info cost time ${(Date.now() - start) / 1000} s`)
}
await store.setValue(request.id, null)
}
const errorHandler = async ({ request, error }) => {
const keyword = getKeywordFromUrl(request.url)
logger.error(`errorHandler | error found: ${request.url} | keyword ${keyword}, err: ${error}`)
}
const failedRequestHandler = async ({ request, error }) => {
await redisClient.incr(googleMapPostcodeTaskFailureStats)
const url = request.url
logger.error(`failedRequestHandler | error:${request.url}, err: ${error}`)
const errMsg = error?.message?.toLowerCase() || ""
const keyword = getKeywordFromUrl(request.url)
if (errMsg.includes("timeout") || errMsg.includes("net::err")) {
logger.error(`${url} is putting back to queue!`)
await redisClient.lPush(googleMapPostcodetaskQueue, keyword)
return
}
await redisClient.lPush(googleMapPostcodetaskErrorQueue, keyword)
}
const backupRequestQueue = async (queue, store, signal) => {
if (handledSignal) {
logger.info(`${signal} already handled`)
return
}
handledSignal = true
logger.info(`GOT ${signal}, backing up!`)
let count = 0
for (const [key, value] of Object.entries(queue?.queueHeadDict?.dictionary)) {
const qResult = await queue.getRequest(key)
const url = qResult.url
if (!url.includes(SEARCH_PATH)) {
continue
}
count += 1
const keyword = getKeywordFromUrl(url)
logger.info(`${signal} signal received, backing up key: ${key}, keyword: ${keyword} | request: ${url}`)
redisClient.lPush(googleMapPostcodetaskQueue, keyword)
}
logger.info(`tasks in queue count: ${count}`)
count = 0
await store.forEachKey(async (key, index, info) => {
count += 1
const url = await store.getValue(key)
logger.info(`running tasks: key: ${key} ,url: ${url}`)
const keyword = getKeywordFromUrl(url)
if (keyword) {
redisClient.lPush(googleMapPostcodetaskQueue, keyword)
}
});
logger.info(`Google Map Search: running tasks count: ${count}`)
}
const config = Configuration.getGlobalConfig();
config.set('disableBrowserSandbox', true);
async function index() {
store = await KeyValueStore.open('google-map-list');
const queue = await RequestQueue.open("google-map-list");
var crawler = new PuppeteerCrawler({
launchContext: {
launchOptions: {
handleSIGINT: false,
handleSIGTERM: false,
// Other Puppeteer options
},
},
requestQueue: queue,
headless: true,
browserPoolOptions: BROWSER_POOL_OPTIONS,
navigationTimeoutSecs: 2 * 60,
requestHandlerTimeoutSecs: 10 * 60,
keepAlive: true,
maxRequestRetries: 3,
minConcurrency: Number(CRAWLEE_MIN_CONCURRENCY) || 1, // env values are strings, so coerce them to numbers
maxConcurrency: Number(CRAWLEE_MAX_CONCURRENCY) || 3,
preNavigationHooks: [preNavigationHook],
requestHandler: requestHandler,
errorHandler: errorHandler,
failedRequestHandler: failedRequestHandler,
});
['SIGINT', 'SIGTERM', "uncaughtException"].forEach(signal => process.on(signal, async () => {
await backupRequestQueue(queue, store, signal)
await crawler.teardown()
await sleep(200)
process.exit(1)
}))
var keyword
var errCount = 0
crawler.run()
while (true) {
// const keyword = "hotel+near+t7x+ca"
const queueSize = (await crawler.getRequestQueue())?.queueHeadDict?.linkedList.length || 0
logger.info(`crawler status ${crawler.running} , queue size is ${queueSize}`)
if (!crawler.running || queueSize > 10) {
await sleep(1000)
if (!crawler.running) {
crawler.run()
}
continue
}
keyword = await redisClient.rPop(googleMapPostcodetaskQueue)
try {
if (!keyword) {
logger.info(`postcode task queue is empty, sleeping 100000 ms`)
await sleep(100000)
continue
}
var url
if (keyword.includes(SEARCH_PATH)) {
url = keyword
} else {
url = `https://www.google.com/maps/search/${keyword}?hl=en`
}
logger.info(`new address added ${url}`)
await crawler.addRequests([url]);
await sleep(300)
} catch (e) {
await redisClient.incr(googleMapPostcodeTaskFailureStats)
logger.error(e)
logger.error(`keyword: ${keyword}, err: ${e}, proxy: ${PROXY_HOST}`)
const msg = e.message?.toLowerCase() || ""
if (msg.includes("timeout") || msg.includes("err_connection_closed")) {
logger.error(`${keyword} is putting back to queue!`)
errCount++
await redisClient.lPush(googleMapPostcodetaskQueue, keyword)
continue
}
await redisClient.lPush(googleMapPostcodetaskErrorQueue, keyword)
await sleep(500)
}
}
}

// Entry point: this script is intended to be run directly with node
index()
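The listing above relies on a few helpers whose implementations are not shown in the post, most notably waitTillHTMLRendered. A common way to implement it, and a reasonable guess at what the project uses, is to poll the size of the rendered HTML until it stops changing for a few consecutive checks:

// Sketch of waitTillHTMLRendered: resolves once the page's HTML size has been stable
// for a few consecutive checks, or when the timeout is reached.
const waitTillHTMLRendered = async (page, timeout = 30000) => {
  const checkInterval = 1000;
  const maxChecks = timeout / checkInterval;
  let lastSize = 0;
  let stableChecks = 0;

  for (let i = 0; i < maxChecks; i++) {
    const html = await page.content();
    const currentSize = html.length;
    stableChecks = currentSize > 0 && currentSize === lastSize ? stableChecks + 1 : 0;
    if (stableChecks >= 3) {
      break; // size unchanged for 3 checks in a row, consider the page fully rendered
    }
    lastSize = currentSize;
    await new Promise(resolve => setTimeout(resolve, checkInterval));
  }
};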
The source code above mainly extracts information from Google Maps using CSS selectors, which is relatively straightforward. The more delicate part is that spot instances can be terminated at any time, so this situation has to be handled carefully.
To solve this, we listen for the SIGTERM and SIGINT signals, which indicate that the instance is about to be terminated. When they fire, we back up any pending tasks in the job queue and preserve the state of any running tasks that have not yet completed.
By listening to these signals, we can intercept the termination process and ensure that critical data and tasks are not lost. The backup mechanism enables us to store any unfinished work safely, allowing for a seamless continuation of tasks when new instances are launched in the future.
['SIGINT', 'SIGTERM', "uncaughtException"].forEach(signal => process.on(signal, async () => {
await backupRequestQueue(queue, store, signal)
await crawler.teardown()
await sleep(200)
process.exit(1)
}))
2. Google Maps business detail scraper
'use strict';
const { RequestQueue, PuppeteerCrawler, Configuration, KeyValueStore } = require('crawlee');
const { PROXY_HOST, CRAWLEE_MIN_CONCURRENCY, CRAWLEE_MAX_CONCURRENCY } = process.env
const { md5 } = require('../md5');
// waitTillHTMLRendered, parseMapId and parseCoordinateFromMapUrl are not shown in the post;
// they are assumed to be exported from the shared utils module (sketches of possible implementations follow the listings)
const { sleep, googleMapConsentCheck, waitTillHTMLRendered, parseMapId, parseCoordinateFromMapUrl } = require('../utils');
const { logger } = require('../log');
const { parseTextDuration } = require('../time');
const { client: redisClient } = require('../redis');
const { getGoogleMap, insertGoogleMap } = require('./model');
const { BROWSER_POOL_OPTIONS } = require('../const');
const googleMapAddressTaskQueue = "googlemap:address:task:queue"
const googleMapAddressTaskErrorQueue = "googlemap:address:task:error:queue"
const googleMapAddressTaskSuccess = "googlemap:address:task:success"
const googleMapAddressTaskFailure = "googlemap:address:task:failure"
var store;
var hasMore = true;
var handledSignal = false;
async function scrollPage(page, scrollContainer, limit = 30) {
let count = 0
while (true) {
if (count >= limit) {
logger.error(`reached scroll page limit ${limit} , url ${page.url()}`)
break
}
let lastHeight = await page.evaluate(`document.querySelector("${scrollContainer}").scrollHeight`);
logger.info(`scrollPage lastHeight ${lastHeight}`)
await page.evaluate(`document.querySelector("${scrollContainer}").scrollTo(0, document.querySelector("${scrollContainer}").scrollHeight)`);
count++
await page.waitForTimeout(2000);
let newHeight = await page.evaluate(`document.querySelector("${scrollContainer}").scrollHeight`);
if (await page.$("span.xRkPPb")) {
const dates = await page.evaluate(() => {
return Array.from(document.querySelectorAll("span.xRkPPb")).map((el) => {
console.log(`date ${el.innerText.trim().split("on")[0]}`)
return el.innerText.trim().split("on")[0];
});
})
const date = dates[dates.length - 1]
if (date && (date.includes("year ago") || date.includes("years ago"))) {
break
}
}
if (await page.$("span.rsqaWe")) {
const dates = await page.evaluate(() => {
return Array.from(document.querySelectorAll("span.rsqaWe")).map((el) => {
return el.innerText.trim();
});
})
const date = dates[dates.length - 1]
if (date && (date.includes("year ago") || date.includes("years ago"))) {
break
}
lastHeight = newHeight;
}
}
}
const processAbout = async (page) => {
if (! await page.$("button[aria-label*='About']")) {
return null
}
await page.click("button[aria-label*='About']")
await sleep(500)
if (!(await page.$("h2"))) {
return null
}
await page.waitForSelector("h2", { timeout: 1000 * 10 })
const list = await page.$$("div.fontBodyMedium")
var data = {}
if (list.length === 1) {
const text = await page.evaluate(() => {
return Array.from(document.querySelectorAll("div.P1LL5e")).map((el) => {
return el.innerText.trim();
});
})
const attrs = await page.evaluate(() => {
return Array.from(document.querySelectorAll("div.WKLD0c .CK16pd")).map((el) => {
return el.getAttribute("aria-label")
})
})
return text.join("\n") + "\n" + attrs.join("\n")
}
for (let item of list) {
if (! await item.$("h2")) {
continue
}
let title = await item.$eval("h2", el => el.innerText)
const texts = await item.$$("li")
let items = []
for (let t of texts) {
const text = await t.$eval("span", el => el.getAttribute("aria-label"))
items.push(text)
}
data[title] = items
}
return data
}
const processReviews = async (page) => {
if (!(await page.$("button[aria-label*='Reviews']"))) {
return null
}
await page.click("button[aria-label*='Reviews']")
await page.waitForSelector("button[aria-label*='relevant'], button[aria-label*='Sort']", { timeout: 1000 * 10 })
if (await page.$("button[aria-label*='relevant']")) {
await page.click("button[aria-label*='relevant']")
} else if (await page.$("button[aria-label*='Sort']")) {
await page.click("button[aria-label*='Sort']")
}
await page.waitForSelector("div[id='action-menu'] div[data-index='1']", { timeout: 1000 * 10 })
if (!(await page.$("div[id='action-menu'] div[data-index='1']"))) {
return null
}
await page.click("div[id='action-menu'] div[data-index='1']")
await page.waitForSelector("div.d4r55", { timeout: 1000 * 10 })
const start = Date.now()
await scrollPage(page, '.DxyBCb');
logger.info(`processReviews scrollPage cost time ${(Date.now() - start) / 1000} s`)
const reviews = await page.evaluate(() => {
return Array.from(document.querySelectorAll(".jftiEf")).map((el) => {
return {
user: {
name: el.querySelector(".d4r55")?.textContent.trim(),
link: el.querySelector(".WNxzHc a")?.getAttribute("href"),
thumbnail: el.querySelector(".NBa7we")?.getAttribute("src"),
localGuide: el.querySelector(".RfnDt span:first-child")?.style.display === "none" ? undefined : true,
reviews: parseInt(el.querySelector(".RfnDt span:last-child")?.textContent.replace("·", "")),
},
rating: parseFloat(el.querySelector(".kvMYJc")?.getAttribute("aria-label") || parseInt(el.querySelector(".fzvQIb")?.textContent.split("/")[0]) / 5),
snippet: el.querySelector(".MyEned")?.textContent.trim(),
date: el.querySelector(".rsqaWe")?.textContent.trim() || el.querySelector(".xRkPPb")?.textContent.trim().split(" on")[0],
};
});
});
return reviews.map(r => {
r.date = new Date(Date.now() - parseTextDuration(r.date) * 1000)
return r
});
}
const processMapItem = async (page) => {
const url = page.url()
const title = await page.$eval("h1", el => el.innerText)
const review_el = await page.$("div.F7nice") ? await page.$eval("div.F7nice", el => el.innerText) : ""
const star = parseFloat( review_el?.split("(")[0] || 0 )
const review_count = parseInt( review_el?.split("(").length >= 2 ? review_el?.split("(")[1].split(")")[0] : 0)
const headline = await page.$("div[aria-label*='About'] div[jslog*='metadata']") ? await page.$eval("div[aria-label*='About'] div[jslog*='metadata']", el => el.innerText) : ""
const category = await page.$("button[jsaction='pane.rating.category']") ? await page.$eval("button[jsaction='pane.rating.category']", el => el.innerText) : (await page.$("span.mgr77e") ? await page.$eval("span.mgr77e", el => el.innerText) : null)
let address = await page.$("button[data-item-id='address']") ? await page.$eval("button[data-item-id='address']", el => el.getAttribute("aria-label")) : ""
address = address.replace("Address: ", "")
const openHours = await page.$("div[aria-label*='Sunday']") ? await page.$eval("div[aria-label*='Sunday']", el => el.getAttribute("aria-label")) : null
const checkIn = await page.$("div[data-item-id='place-info-links:'] .Io6YTe") ? await page.$eval("div[data-item-id='place-info-links:'] .Io6YTe", el => el.innerText) : null
const book = await page.$("a.M77dve") ? await page.$eval("a.M77dve", el => el.getAttribute("href")) : null
const website = await page.$("a[data-item-id='authority']") ? await page.$eval("a[data-item-id='authority']", el => el.getAttribute("href")) : null
const phone = await page.$("button[aria-label*='Phone']") ? await page.$eval("button[aria-label*='Phone']", el => el.innerText) : null
const pluscode = await page.$("button[aria-label*='Plus code']") ? await page.$eval("button[aria-label*='Plus code']", el => el.innerText) : null
let start = Date.now()
const about = await processAbout(page)
logger.info(`processAbout cost time ${(Date.now() - start) / 1000} s | ${url}`)
start = Date.now()
const review = await processReviews(page)
logger.info(`processReviews cost time ${(Date.now() - start) / 1000} s | ${url}`)
const coordinate = parseCoordinateFromMapUrl(url)
const result = {
url: url,
title: title,
star: star,
review_count: review_count,
headline: headline,
category: category,
address: address,
openHours: openHours,
checkIn: checkIn,
book: book,
website: website,
phone: phone,
pluscode: pluscode,
coordinate: coordinate,
about: about,
review: review
}
return result
}
const processMap = async (page, url) => {
if (!url) {
logger.error(`sleeping | url queue is empty!`)
await sleep(10000)
return false
}
logger.info(`visiting url: ${url}`)
await googleMapConsentCheck(page)
await page.waitForSelector("h1", { timeout: 10 * 1000 })
await waitTillHTMLRendered(page)
const start = Date.now()
const result = await processMapItem(page)
logger.info(`processMapItem cost time ${(Date.now() - start) / 1000} s || ${JSON.stringify(result)}`)
result["id"] = parseMapId(url)
const ok = await insertGoogleMap(result)
if (ok) {
logger.info(`google map ${result.title} is inserted into db successfully!`)
}
return true
}
const config = Configuration.getGlobalConfig();
config.set('disableBrowserSandbox', true);
const preNavigationHook = async (crawlingContext, gotoOptions) => {
const { request } = crawlingContext;
const url = request.url
if (url.includes("www.google.com/maps/place")) {
await store.setValue(request.id, url)
}
}
const requestHandler = async ({ page, request }) => {
const url = request.url
if (!url.includes("www.google.com/maps/place")) {
await store.setValue(request.id, null)
logger.error(`illegal address ${url}`)
return
}
const start = Date.now()
await page.waitForNetworkIdle()
logger.info(`Processing: ${url}`)
const result = await processMap(page, url)
if (result) {
await redisClient.incr(googleMapAddressTaskSuccess)
logger.info(`scraping address info cost time ${(Date.now() - start) / 1000} s`)
}
await store.setValue(request.id, null)
}
const errorHandler = async ({ request, error }) => {
logger.error(`errorHandler | error found: ${request.url}, err: ${error}`)
}
const failedRequestHandler = async ({ request, error }) => {
await redisClient.incr(googleMapAddressTaskFailure)
const url = request.url
logger.error(`failedRequestHandler | error:${request.url}, err: ${error}`)
const errMsg = error?.message?.toLowerCase() || ""
if (errMsg.includes("timeout") || errMsg.includes("net::err")) {
logger.error(`${url} is putting back to queue!`)
await redisClient.lPush(googleMapAddressTaskQueue, url)
return
}
logger.error(`${url} is putting back to error queue | errMsg: ${errMsg}`)
await redisClient.lPush(googleMapAddressTaskErrorQueue, url)
}
const backupRequestQueue = async (queue, store, signal) => {
if (handledSignal) {
logger.info(`${signal} already handled`)
return
}
handledSignal = true
logger.info(`GOT ${signal}, backing up!`)
let count = 0
for (const [key, value] of Object.entries(queue?.queueHeadDict?.dictionary)) {
const qResult = await queue.getRequest(key)
if (!qResult.url.includes("www.google.com/maps/place")) {
continue
}
count += 1
logger.info(`${signal} signal received, backing up key: ${key} | request: ${qResult.url}`)
redisClient.lPush(googleMapAddressTaskQueue, qResult.url)
}
logger.info(`tasks in queue count: ${count}`)
count = 0
await store.forEachKey(async (key, index, info) => {
count += 1
const val = await store.getValue(key)
logger.info(`running tasks: key: ${key} url: ${val}`)
if (val) {
redisClient.lPush(googleMapAddressTaskQueue, val)
}
});
logger.info(`running tasks count: ${count}`)
}
async function index() {
store = await KeyValueStore.open('google-map-item');
const queue = await RequestQueue.open("google-map-item");
var crawler = new PuppeteerCrawler({
launchContext: {
launchOptions: {
handleSIGINT: false,
handleSIGTERM: false,
// Other Puppeteer options
},
},
requestQueue: queue,
headless: true,
browserPoolOptions: BROWSER_POOL_OPTIONS,
navigationTimeoutSecs: 2 * 60,
requestHandlerTimeoutSecs: 5 * 60,
keepAlive: true,
maxRequestRetries: 3,
minConcurrency: Number(CRAWLEE_MIN_CONCURRENCY) || 1, // env values are strings, so coerce them to numbers
maxConcurrency: Number(CRAWLEE_MAX_CONCURRENCY) || 3,
preNavigationHooks: [preNavigationHook],
requestHandler: requestHandler,
errorHandler: errorHandler,
failedRequestHandler: failedRequestHandler,
});
['SIGINT', 'SIGTERM', 'uncaughtException'].forEach(signal => process.on(signal, async () => {
await backupRequestQueue(queue, store, signal)
await crawler.teardown()
await sleep(200)
process.exit(1)
}))
var url
var errCount = 0
crawler.run()
while (true) {
const queueSize = queue?.queueHeadDict?.linkedList.length || 0
logger.info(`crawler status ${crawler.running} , queue size is ${queueSize}`)
if (!crawler.running || queueSize > 10) {
await sleep(1000)
if (!crawler.running) {
crawler.run()
}
continue
}
url = await redisClient.lPop(googleMapAddressTaskQueue)
const start = Date.now()
try {
if (!url) {
const s = 5 * 1000
logger.error(`url is empty | sleeping ${s} ms`)
await sleep(s)
continue
}
const mapId = parseMapId(url)
if (!mapId) {
logger.error(`mapId is empty, url ${url}`)
continue
}
if (!url.includes("www.google.com/maps/place")) {
logger.error(`illegal address ${url}`)
continue
}
const ok = await getGoogleMap(mapId)
if (ok) {
logger.info(`already processed ${url}, bypassing`)
continue
}
const param = "?authuser=0&hl=en&rclk=1"
if (!url.includes(param)) {
url += param
}
logger.info(`new url added ${url}`)
await crawler.addRequests([url]);
await sleep(500)
} catch (e) {
logger.error(e)
await redisClient.incr(googleMapAddressTaskFailure)
errCount++
logger.error(`url: ${url}, err: ${e}, proxy: ${PROXY_HOST} | scraping address info cost time ${(Date.now() - start) / 1000} s`)
const errMsg = e.message?.toLowerCase() || ""
if (errMsg.includes("timeout") || errMsg.includes("net::err")) {
logger.error(`${url} is putting back to queue!`)
await redisClient.lPush(googleMapAddressTaskQueue, url)
continue
}
await redisClient.lPush(googleMapAddressTaskErrorQueue, url)
await sleep(400)
}
}
}

// Entry point: this script is intended to be run directly with node
index()
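Two more helpers, parseMapId and parseCoordinateFromMapUrl, are referenced but not shown. The sketch below is only an assumption about how they might work: the coordinate is read from the @lat,lng segment that Google Maps puts in place URLs, and the id is derived by hashing the place path with the md5 helper the file already imports, so that the same place always maps to the same key. If the real helpers differ, the rest of the pipeline only requires that parseMapId returns a stable, unique key per place.

// Sketch only: these helpers are not part of the original post.
const { md5 } = require('../md5');

// Google Maps place URLs usually contain a "@lat,lng,zoom" segment,
// e.g. https://www.google.com/maps/place/Some+Shop/@40.8154,-73.0451,17z/...
const parseCoordinateFromMapUrl = (url) => {
  const match = url.match(/@(-?\d+\.\d+),(-?\d+\.\d+)/);
  if (!match) {
    return null;
  }
  return { lat: parseFloat(match[1]), lng: parseFloat(match[2]) };
};

// Derive a stable id from the place path so that the same business is only processed once.
const parseMapId = (url) => {
  try {
    const u = new URL(url);
    const place = u.pathname.split('/maps/place/')[1]?.split('/')[0];
    return place ? md5(place) : null;
  } catch (e) {
    return null;
  }
};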
3. Deployment file for Kubernetes
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: node-crawler-env
data:
  ENV: prod
  MONGO_URI: mongodb+srv://googlemap-crawler:[email protected]/?retryWrites=true&w=majority
  MONGO_DBNAME: googlemap
  REDIS_HOST: chatloop-redis.cache.amazonaws.com
  REDIS_PORT: "6379"
  REDIS_PASSWORD: ""
  PUPPETEER_SKIP_CHROMIUM_DOWNLOAD: "true"
  PUPPETEER_EXECUTABLE_PATH: /usr/bin/google-chrome-stable
  CRAWLEE_MIN_CONCURRENCY: "3"
  CRAWLEE_MAX_CONCURRENCY: "15"
  CRAWLEE_MEMORY_MBYTES: "4096"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-crawler-map-list
spec:
  replicas: 410
  selector:
    matchLabels:
      app: node-crawler-map-list
  strategy:
    rollingUpdate:
      maxSurge: 4
      maxUnavailable: 4
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: node-crawler-map-list
    spec:
      containers:
        - name: node-crawler-map-list
          image: node-crawler:20230720084044
          command: ["node"]
          args: ["./google/addressList.js"]
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: node-crawler-env
          resources:
            requests:
              cpu: 1000m
              memory: 1000Mi
            limits:
              cpu: 4000m
              memory: 4096Mi
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-crawler-map-item
spec:
  replicas: 40
  selector:
    matchLabels:
      app: node-crawler-map-item
  strategy:
    rollingUpdate:
      maxSurge: 8
      maxUnavailable: 8
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: node-crawler-map-item
    spec:
      containers:
        - name: node-crawler-map-item
          image: node-crawler:20230720084044
          command: ["node"]
          args: ["./google/item.js"]
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: node-crawler-env
          resources:
            requests:
              cpu: 1000m
              memory: 1000Mi
            limits:
              cpu: 4000m
              memory: 4096Mi
Monitoring and optimizing performance
As of now, everything with Crawlee appears to be functioning well, except for one critical issue. After running in the Kubernetes (k8s) cluster for approximately one hour, the performance of Crawlee experiences a significant drop, resulting in the extraction of only a few hundred items per hour, whereas initially, it was extracting at a much higher rate. Interestingly, this issue is not encountered when using a standalone container with Docker Compose on a dedicated machine.
Moreover, while monitoring the cluster, you may observe a drastic decrease in CPU utilization from around 90% to merely 10%, especially if you have metrics-server installed. This unexpected behavior is concerning and requires investigation to identify the underlying cause.
To address this performance degradation and ensure efficient resource utilization, I turned to the Kubernetes API and client-go, the official Go client library for Kubernetes. With these tools, we can monitor the CPU utilization of every node in the cluster. To mitigate the issue, I implemented a job that automatically terminates instances that show very low CPU utilization and have been running for at least 30 minutes.
By automatically terminating such instances, we avoid wasting resources and prevent underperforming instances from dragging down the overall extraction rate. This proactive approach keeps the cluster healthy and ensures that Crawlee keeps delivering consistent, reliable results even in a dynamic and challenging Kubernetes environment.
func cleanupNodesBasedOnResourceUsage() {
var metrics []Resource
nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, node := range nodes.Items {
if node.Spec.Unschedulable {
continue
}
var metric = Resource{}
cpuAvailable := node.Status.Allocatable[v1.ResourceCPU]
memoryAvailable := node.Status.Allocatable[v1.ResourceMemory]
metric.Name = node.Name
metric.Cpu = cpuAvailable.AsApproximateFloat64()
metric.Memory = memoryAvailable.AsApproximateFloat64() / (1024 * 1024 * 1024)
metric.CreationTimestamp = node.CreationTimestamp.Time
metrics = append(metrics, metric)
}
metricsClient, err := metricsv.NewForConfig(config)
if err != nil {
panic(err)
}
nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
if err != nil {
panic(err)
}
for _, metric := range nodeMetrics.Items {
index := findResourceIndexByNode(metrics, metric.Name)
if index == len(metrics) {
continue
}
resource := metrics[index]
resource.CpuUsage = metric.Usage.Cpu().AsApproximateFloat64()
resource.MemoryUsage = metric.Usage.Memory().AsApproximateFloat64() / (1024 * 1024 * 1024)
metrics[index] = resource
}
log.Info("running ec2 count %d, ", len(metrics))
metrics = lo.Filter(metrics, func(metric Resource, _ int) bool {
if metric.CpuUsage/metric.Cpu > 0.5 {
return false
}
return metric.CreationTimestamp.Add(30 * time.Minute).Before(time.Now())
})
sort.Slice(metrics, func(i, j int) bool {
return metrics[i].CpuUsage/metrics[i].Cpu < metrics[j].CpuUsage/metrics[j].Cpu
})
var result []Resource
if len(metrics) == 0 {
log.Error("Not candiate instances to be terminated!")
return
}
if len(metrics) == 1 {
result = metrics
}
if len(metrics) > 1 {
result = metrics[0 : len(metrics)/2]
}
log.Info("potential nodes to be terminated %d ", len(result))
var instances []string
for _, item := range result {
instances = append(instances, item.Name)
log.Info("Going to terminate %s, because its resource utilization %0.2f is too low", item.Name, item.CpuUsage/item.Cpu)
aws.TerminateInstancesByPrivateDnsNames([]string{item.Name})
time.Sleep(10 * time.Second)
}
log.Info("Instances %s are terminated successfully becaue of low resource utilization! ", instances)
}
The code above addresses the issue of low CPU utilization in Kubernetes nodes by using the Kubernetes metrics API to identify underperforming nodes; the corresponding instances are then terminated through the AWS Go SDK.
To ensure the successful implementation of this solution in a Kubernetes (k8s) cluster, additional steps are required. Specifically, we need to create a ServiceAccount, ClusterRole, and ClusterRoleBinding to properly assign the necessary permissions to the nodes-cleanup-cron-task. These permissions are essential for the task to effectively query the relevant Kubernetes resources and perform the required actions.
The ServiceAccount is responsible for providing an identity to the nodes-cleanup-cron-task, allowing it to authenticate with the Kubernetes API server. The ClusterRole defines a set of permissions that the task requires to interact with the necessary resources, in this case, the metrics API and other Kubernetes objects. Finally, the ClusterRoleBinding connects the ServiceAccount and ClusterRole, granting the task the permissions specified in the ClusterRole.
By establishing this set of permissions and associations, we ensure that the nodes-cleanup-cron-task can access and query the metrics API and other Kubernetes resources, effectively identifying nodes with low CPU utilization and terminating instances using the AWS Go SDK.
---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: nodes-cleanup-cron-task
  namespace: default
spec:
  schedule: "*/3 * * * *"
  concurrencyPolicy: Allow
  failedJobsHistoryLimit: 5
  successfulJobsHistoryLimit: 5
  jobTemplate:
    spec:
      activeDeadlineSeconds: 3000
      backoffLimit: 5
      template:
        metadata:
          labels:
            app: nodes-cleanup-cron-task
        spec:
          serviceAccountName: nodes-cleanup-cron-task-ca
          restartPolicy: OnFailure
          priorityClassName: system-node-critical
          containers:
            - name: nodes-cleanup-cron-task
              image: go-crawler:20230715065044
              command: ["./crawler"]
              args: ["-w", "k8s-cleanup"]
              resources:
                requests:
                  cpu: 100m
                  memory: 200Mi
                limits:
                  cpu: 300m
                  memory: 500Mi
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nodes-cleanup-cron-task-ca
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: nodes-cleanup-cron-task-ca-role
rules:
  - apiGroups: [""]
    resources:
      - pods
      - nodes
    verbs:
      - get
      - list
      - watch
      - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: nodes-cleanup-cron-task-cr
rules:
  - apiGroups:
      - "*"
    resources:
      - pods
      - nodes
    verbs:
      - get
      - patch
      - list
      - watch
      - delete
  - apiGroups:
      - "apps"
      - "extensions"
    resources:
      - pods
      - nodes
    verbs:
      - get
      - patch
      - list
      - watch
      - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: nodes-cleanup-cron-task-ca-csb
subjects:
  - kind: ServiceAccount
    name: nodes-cleanup-cron-task-ca
    namespace: default
roleRef:
  kind: ClusterRole
  name: nodes-cleanup-cron-task-cr   # must match the ClusterRole name defined above
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: nodes-cleanup-cron-task-ca-sb
subjects:
  - kind: ServiceAccount
    name: nodes-cleanup-cron-task-ca
roleRef:
  kind: Role
  name: nodes-cleanup-cron-task-ca-role
  apiGroup: rbac.authorization.k8s.io
Conclusion
At this stage, the majority of the code is complete, and you can deploy it on any cloud provider that supports Kubernetes (k8s). This flexibility allows you to scale the application effortlessly, expanding the number of instances as needed to meet your specific requirements.
One of the key advantages of the design lies in its termination tolerance. With the implemented safeguards to handle SIGTERM and SIGINT events, you can deploy spot instances without concerns about potential data loss. Even when spot instances are terminated unexpectedly, the application gracefully manages the data in the job queue and running tasks.
By leveraging this termination tolerance, the application handles spot instance terminations smoothly: pending tasks in the job queue are backed up safely and the state of running tasks that have not yet completed is preserved, so the integrity of your data and tasks is maintained throughout the operation.
Deploying the application with Kubernetes and taking advantage of termination tolerance empowers you to scale the Google Maps scraper efficiently, managing numerous instances to meet your data extraction needs effectively. The combination of Kubernetes and the termination tolerance design enhances the overall robustness and reliability of the application, allowing for seamless operation even in the dynamic and unpredictable cloud environment. If you have any questions regarding this article or any suggestions for future articles, please leave a comment below. Additionally, I am available for remote work or contracts, so please feel free to reach out to me via email.