How to efficiently scrape millions of Google Businesses on a large scale using a distributed crawler

July 31, 2023

Support me on Patreon to write more tutorials like this!

Tony Wang

Introduction

In the previous post, we covered how to analyze the network panel of a webpage to identify the relevant RESTful API for scraping the desired data. While this approach works for many websites, some employ techniques such as JavaScript-based encryption that make it difficult to extract valuable information through RESTful APIs alone. This is where a “headless browser” comes in: it lets us simulate the actions of a real user browsing the website.

A headless browser is essentially a web browser without a graphical user interface (GUI). It allows automated web browsing and page interaction, providing a means to access and extract information from websites that employ dynamic content and JavaScript encryption. By using a headless browser, we can overcome some of the challenges posed by traditional scraping methods, as it allows us to execute JavaScript, render web pages, and access dynamically generated content.
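For readers who have not used one before, here is a minimal sketch of the idea, assuming Puppeteer is installed; the URL is only a placeholder. The browser runs without a window, executes the page’s JavaScript, and lets us read the fully rendered DOM.

const puppeteer = require('puppeteer');

(async () => {
    // Launch Chromium without a visible window.
    const browser = await puppeteer.launch({ headless: true });
    const page = await browser.newPage();
    // Wait for network activity to settle so dynamically rendered content is present.
    await page.goto('https://example.com', { waitUntil: 'networkidle2' });
    // Read data from the rendered DOM, exactly as a real user would see it.
    console.log(await page.evaluate(() => document.title));
    await browser.close();
})();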

Here I will demonstrate how to build a distributed crawler with a headless browser, using Google Maps as the target website.

Throughout my experience, I have explored various browser-automation frameworks, such as Selenium, Puppeteer, Playwright, and Chromedp. Among them, I believe Crawlee stands out as the most powerful tool I have used for web scraping. Crawlee is a Node.js library that builds on top of headless-browser drivers such as Puppeteer and Playwright, so it is easy to adapt to the framework of your choice, making it highly versatile and flexible for different project requirements.
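Before diving into the full scraper, here is a minimal, self-contained Crawlee sketch (the URL is only a placeholder) showing the shape of the API: a PuppeteerCrawler manages the browser pool, concurrency, and retries, and we only supply a requestHandler that receives a live Puppeteer page.

const { PuppeteerCrawler } = require('crawlee');

const crawler = new PuppeteerCrawler({
    headless: true,
    maxConcurrency: 3,
    // Crawlee opens a Puppeteer page for every queued request and hands it to us here.
    async requestHandler({ page, request, log }) {
        log.info(`${request.url} rendered title: ${await page.title()}`);
    },
});

// Seed the queue and start crawling.
crawler.run(['https://www.google.com/maps?hl=en']);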

How to list all the businesses in a country

In general, when using Google Maps to find businesses we want to visit, we typically conduct searches based on the business category type and location. For instance, we may use a keyword like “shop near Holtsville” to locate any shops in a small town in New York. However, a challenge arises when multiple towns share the same name within the same country. To overcome this, Google Maps offers a helpful feature: querying by postal code. Consequently, the initial query can be refined to “shop near 00501,” with 00501 being the postal code of a specific location in Holtsville. This approach provides greater clarity and reduces confusion compared to using town names.

With this clear path for efficient searches, our next objective is to compile a comprehensive list of all postal codes in the USA. To accomplish this, I used a free postal code database accessible here. If you happen to know of a better database, leave a comment below.

Snapshot of the postal code list of the US
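To show how these postal codes feed the crawler, here is a rough sketch that reads the downloaded CSV and pushes one search keyword per code into the googlemap:postcode:task:queue Redis list consumed by the scraper later in this post. The file name, the column position of the postal code, and the use of the node-redis client are assumptions; adjust them to whatever database you download.

const fs = require('fs');
const readline = require('readline');
const { createClient } = require('redis');

const googleMapPostcodetaskQueue = 'googlemap:postcode:task:queue';

async function seedPostcodes(csvPath = './us_postal_codes.csv') {
    const redisClient = createClient({ url: process.env.REDIS_URL });
    await redisClient.connect();

    const rl = readline.createInterface({ input: fs.createReadStream(csvPath) });
    for await (const line of rl) {
        // Assumption: the postal code is the first column of the CSV.
        const postcode = line.split(',')[0]?.trim();
        if (!postcode || !/^\d{5}$/.test(postcode)) continue;

        // One task per postal code, e.g. "shop near 00501 USA".
        await redisClient.lPush(googleMapPostcodetaskQueue, `shop near ${postcode} USA`);
    }
    await redisClient.quit();
}

seedPostcodes().catch(console.error);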

Once we have downloaded the postal code list file, we can begin testing its functionality on Google Maps.

Search shop near 00501 USA in Google Maps

Searching for the keyword shop near 00501 USA in the Google Maps search bar returns a list of shops located in Holtsville. Since our aim is to scrape every business from this search, we need the complete result set, which means scrolling through the results until we reach the bottom of the list. At that point Google Maps displays the message You’ve reached the end of the list, which is our cue to stop scrolling and move on to data extraction. Only then can we be confident that we have gathered every relevant business for the specified location.

Scroll down until seeing the message “You’ve reached the end of the list”
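The end-of-list check boils down to a few lines of Puppeteer. Below is a simplified sketch of the detection used in the full scrollPage implementation later in this post; note that span.HlvSq is simply the class the message currently renders with, so it may change without notice.

// Returns true once Google Maps has rendered the end-of-results message.
const reachedEndOfList = async (page) => {
    const marker = await page.$('span.HlvSq');
    if (!marker) return false;
    const text = await page.evaluate(el => el.innerText, marker);
    return text.includes("reached the end");
};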

Once we have compiled the list of businesses from Google Maps, we can proceed to extract the detailed information we need from each business entry. This process involves going through the list one by one and scraping relevant data, such as the business’s address, operating hours, phone number, star ratings, number of reviews, and all available reviews.

Google business review

Implementing the Google Maps scraper

1. Google Maps business list scraper


'use strict';
const { RequestQueue, PuppeteerCrawler, Configuration, KeyValueStore } = require('crawlee');

const { PROXY_HOST, CRAWLEE_MIN_CONCURRENCY, CRAWLEE_MAX_CONCURRENCY } = process.env

const { logger } = require('../log');
// sleep, googleMapConsentCheck and waitTillHTMLRendered are assumed to be small helpers exported from ../utils
const { sleep, googleMapConsentCheck, waitTillHTMLRendered } = require('../utils');
const { client: redisClient } = require('../redis');
const { PUPPETEER_MINIMAL_ARGS, BROWSER_POOL_OPTIONS } = require('../const');

const googleMapPostcodetaskQueue = "googlemap:postcode:task:queue"
const googleMapPostcodetaskErrorQueue = "googlemap:postcode:task:error:queue"
const googleMapAddressTaskQueue = "googlemap:address:task:queue"
const googleMapPostcodeTaskSuccessStats = "googlemap:postcode:task:success:stats"
const googleMapPostcodeTaskFailureStats = "googlemap:postcode:task:failure:stats"


var store;
var handledSignal = false;
const SEARCH_PATH = "www.google.com/maps/search/"
const PLACE_PATH = "www.google.com/maps/place/"

// Scroll the results feed until Google Maps shows the "You've reached the end of the list"
// marker, with a scroll limit as a safety net against endless loops.
async function scrollPage(page, scrollContainer, limit = 50) {
    let count = 0
    while (true) {
        if (count >= limit) {
            logger.error(`reached scroll page limit ${limit} , url ${page.url()}`)
            break
        }
        // Scroll the results container to its bottom to trigger lazy loading of more results.
        await page.evaluate(`document.querySelector("${scrollContainer}").scrollTo(0, document.querySelector("${scrollContainer}").scrollHeight)`);
        count++
        // Wait until the DOM stops changing before checking for the end-of-list marker.
        await waitTillHTMLRendered(page)
        // span.HlvSq holds the "You've reached the end of the list." message.
        if (await page.$("span.HlvSq")) {
            const button = await page.$eval("span.HlvSq", el => el.innerText)
            if (button.includes("reached the end")) {
                logger.info(`button ${button}`)
                break
            }
        }
    }
    await extractLinks(page)
}

const processMap = async (page, keyword) => {
    if (!keyword) {
        logger.error(`sleeping | google list task queue is empty!`)
        await sleep(10000)
        return false
    }
    logger.info(`processing address: ${keyword}`)
    const url = `https://www.google.com/maps/search/${keyword}?hl=en`
    logger.info(`visiting url: ${url}`)
    await googleMapConsentCheck(page)
    await page.waitForNetworkIdle()
    const notfound = await page.$("div[id='QA0Szd']")
    if (notfound) {
        const notFoundText = await page.$eval("div[id='QA0Szd']", el => el.innerText)
        if (notFoundText && notFoundText.includes("Google Maps can't find ")) {
            logger.error(`${keyword} not found!`)
            return false
        }
    }
    await scrollPage(page, "div[role='feed']")
    return true
}


const getFullPath = (url) => {
    const u = new URL(url)
    return u.protocol + "//" + u.host + u.pathname
}

const extractLinks = async (page) => {
    const links = await page.$$eval("a", link => link.map(a => a.href))
    logger.info(`total ${links.length} links found!`)
    for (let link of links) {
        if (link.includes(PLACE_PATH)) {
            link = getFullPath(link)
            logger.info(`link ${link} is added to queue`)
            await redisClient.lPush(googleMapAddressTaskQueue, link)
        }

    }
}

const getKeywordFromUrl = (url) => {
    return url?.split("maps/search/")[1]?.split("/")[0].split("?")[0]
}


const preNavigationHook = async (crawlingContext, gotoOptions) => {
    const { request } = crawlingContext;
    const url = request.url
    if (url.includes(SEARCH_PATH)) {
        const keyword = getKeywordFromUrl(url)
        await store.setValue(request.id, keyword)
    }

}

const requestHandler = async ({ page, request }) => {
    const start = Date.now()
    await page.waitForNetworkIdle()
    const url = page.url()
    logger.info(`Processing: ${request.url} | page url ${page.url()}`)
    if (url?.includes(PLACE_PATH)) {
        logger.info(`new url ${url} is added to queue`)
        await redisClient.lPush(googleMapAddressTaskQueue, url)
        return
    }
    const keyword = getKeywordFromUrl(request.url)
    const result = await processMap(page, keyword)
    if (result) {
        await redisClient.incr(googleMapPostcodeTaskSuccessStats)
        logger.info(`scraping address info cost time ${(Date.now() - start) / 1000} s`)
    }
    await store.setValue(request.id, null)
}

const errorHandler = async ({ request, error }) => {
    const keyword = getKeywordFromUrl(request.url)
    logger.error(`errorHandler | error found: ${request.url} | keyword ${keyword}, err: ${error}`)
}

const failedRequestHandler = async ({ request, error }) => {
    await redisClient.incr(googleMapPostcodeTaskFailureStats)
    const url = request.url
    logger.error(`failedRequestHandler | error:${request.url}, err: ${error}`)
    const errMsg = error?.message?.toLowerCase() || ""
    const keyword = getKeywordFromUrl(request.url)
    if (errMsg.includes("timeout") || errMsg.includes("net::err")) {
        logger.error(`${url} is putting back to queue!`)
        await redisClient.lPush(googleMapPostcodetaskQueue, keyword)
        return
    }
    await redisClient.lPush(googleMapPostcodetaskErrorQueue, keyword)
}

const backupRequestQueue = async (queue, store, signal) => {
    if (handledSignal) {
        logger.info(`${signal} already handled`)
        return
    }
    handledSignal = true
    logger.info(`GOT ${signal}, backing up!`)
    let count = 0
    for (const [key, value] of Object.entries(queue?.queueHeadDict?.dictionary)) {
        const qResult = await queue.getRequest(key)
        const url = qResult.url
        if (!url.includes(SEARCH_PATH)) {
            continue
        }
        count += 1
        const keyword = getKeywordFromUrl(url)
        logger.info(`${signal} signal received, backing up key: ${key}, keyword: ${keyword} | request: ${url}`)
        redisClient.lPush(googleMapPostcodetaskQueue, keyword)
    }
    logger.info(`tasks in queue count: ${count}`)
    count = 0
    await store.forEachKey(async (key, index, info) => {
        count += 1
        const url = await store.getValue(key)
        logger.info(`running tasks: key: ${key} ,url:  ${url}`)
        const keyword = getKeywordFromUrl(url)
        if (keyword) {
            redisClient.lPush(googleMapPostcodetaskQueue, keyword)
        }

    });
    logger.info(`Google Map Search: running tasks count: ${count}`)

}

const config = Configuration.getGlobalConfig();
config.set('disableBrowserSandbox', true);


async function index() {
    store = await KeyValueStore.open('google-map-list');
    const queue = await RequestQueue.open("google-map-list");
    var crawler = new PuppeteerCrawler({
        launchContext: {
            launchOptions: {
                handleSIGINT: false,
                handleSIGTERM: false,
                // Other Puppeteer options
            },
        },
        requestQueue: queue,
        headless: true,
        browserPoolOptions: BROWSER_POOL_OPTIONS,
        navigationTimeoutSecs: 2 * 60,
        requestHandlerTimeoutSecs: 10 * 60,
        keepAlive: true,
        maxRequestRetries: 3,
        // env vars are strings; coerce them and fall back to sane defaults
        minConcurrency: Number(CRAWLEE_MIN_CONCURRENCY) || 1,
        maxConcurrency: Number(CRAWLEE_MAX_CONCURRENCY) || 3,
        preNavigationHooks: [preNavigationHook],
        requestHandler: requestHandler,
        errorHandler: errorHandler,
        failedRequestHandler: failedRequestHandler,
    });

    ['SIGINT', 'SIGTERM', "uncaughtException"].forEach(signal => process.on(signal, async () => {
        await backupRequestQueue(queue, store, signal)
        await crawler.teardown()
        await sleep(200)
        process.exit(1)
    }))

    var keyword
    var errCount = 0
    crawler.run()
    while (true) {
        // const keyword  = "hotel+near+t7x+ca"
        const queueSize = (await crawler.getRequestQueue())?.queueHeadDict?.linkedList.length || 0
        logger.info(`crawler status ${crawler.running} , queue size is ${queueSize}`)
        if (!crawler.running || queueSize > 10) {
            await sleep(1000)
            if (!crawler.running) {
                crawler.run()
            }
            continue
        }
        keyword = await redisClient.rPop(googleMapPostcodetaskQueue)
        try {
            if (!keyword) {
                logger.info(`postcode task queue is empty, sleeping 100000 ms`)
                await sleep(100000)
                continue
            }
            var url
            if (keyword.includes(SEARCH_PATH)) {
                url = keyword
            } else {
                url = `https://www.google.com/maps/search/${keyword}?hl=en`
            }
            logger.info(`new address added ${url}`)
            await crawler.addRequests([url]);
            await sleep(300)
        } catch (e) {
            await redisClient.incr(googleMapPostcodeTaskFailureStats)
            logger.error(e)
            logger.error(`keyword: ${keyword}, err: ${e}, proxy: ${PROXY_HOST}`)
            const errMsg = e.message?.toLowerCase() || ""
            if (errMsg.includes("timeout") || errMsg.includes("err_connection_closed")) {
                logger.error(`${keyword} is putting back to queue!`)
                errCount++
                await redisClient.lPush(googleMapPostcodetaskQueue, keyword)
                continue
            }
            await redisClient.lPush(googleMapPostcodetaskErrorQueue, keyword)
            await sleep(500)
        }


    }
}

// Assumed entry point: the Kubernetes Deployment below runs this file directly with node.
index()

The source code above mainly extracts information from Google Maps with CSS selectors, which is relatively straightforward. The trickier part is that we run the crawler on spot instances, which can be terminated at any time, so termination has to be handled carefully.

To handle this, we listen for the SIGTERM and SIGINT signals, which indicate that the instance is about to be terminated. When they fire, we back up any pending tasks in the job queue and preserve the state of running tasks that have not completed yet.

By listening to these signals, we can intercept the termination process and ensure that critical data and tasks are not lost. The backup mechanism enables us to store any unfinished work safely, allowing for a seamless continuation of tasks when new instances are launched in the future.

['SIGINT', 'SIGTERM', 'uncaughtException'].forEach(signal => process.on(signal, async () => {
    await backupRequestQueue(queue, store, signal)
    await crawler.teardown()
    await sleep(200)
    process.exit(1)
}))

2. Google Maps business detail scraper

'use strict';
const { RequestQueue, PuppeteerCrawler, Configuration, KeyValueStore } = require('crawlee');
const { PROXY_HOST, CRAWLEE_MIN_CONCURRENCY, CRAWLEE_MAX_CONCURRENCY } = process.env

// waitTillHTMLRendered, parseMapId and parseCoordinateFromMapUrl are assumed to be small helpers exported from ../utils
const { sleep, googleMapConsentCheck, waitTillHTMLRendered, parseMapId, parseCoordinateFromMapUrl } = require('../utils');
const { logger } = require('../log');
const { parseTextDuration } = require('../time');
const { client: redisClient } = require('../redis');
const { getGoogleMap, insertGoogleMap } = require('./model');
const { PUPPETEER_MINIMAL_ARGS, BROWSER_POOL_OPTIONS } = require('../const');

const googleMapAddressTaskQueue = "googlemap:address:task:queue"
const googleMapAddressTaskErrorQueue = "googlemap:address:task:error:queue"

const googleMapAddressTaskSuccess = "googlemap:address:task:success"
const googleMapAddressTaskFailure = "googlemap:address:task:failure"
var store;
var handledSignal = false;

// Scroll the reviews panel, stopping once the scroll limit is hit or the oldest visible
// review is more than a year old (the crawler only keeps roughly the last year of reviews).
async function scrollPage(page, scrollContainer, limit = 30) {
    let count = 0
    while (true) {
        if (count >= limit) {
            logger.error(`reached scroll page limit ${limit} , url ${page.url()}`)
            break
        }
        // Scroll the container to its bottom to trigger lazy loading of more reviews.
        await page.evaluate(`document.querySelector("${scrollContainer}").scrollTo(0, document.querySelector("${scrollContainer}").scrollHeight)`);
        count++
        await page.waitForTimeout(2000);
        // Review dates are rendered either as "span.xRkPPb" ("... on Google") or "span.rsqaWe".
        if (await page.$("span.xRkPPb")) {
            const dates = await page.evaluate(() => {
                return Array.from(document.querySelectorAll("span.xRkPPb")).map((el) => {
                    return el.innerText.trim().split("on")[0];
                });
            })
            const date = dates[dates.length - 1]
            if (date && (date.includes("year ago") || date.includes("years ago"))) {
                break
            }
        }
        if (await page.$("span.rsqaWe")) {
            const dates = await page.evaluate(() => {
                return Array.from(document.querySelectorAll("span.rsqaWe")).map((el) => {
                    return el.innerText.trim();
                });
            })
            const date = dates[dates.length - 1]
            if (date && (date.includes("year ago") || date.includes("years ago"))) {
                break
            }
        }
    }
}

const processAbout = async (page) => {
    if (! await page.$("button[aria-label*='About']")) {
        return null
    }
    await page.click("button[aria-label*='About']")
    await sleep(500)
    if (!(await page.$("h2"))) {
        return null
    }
    await page.waitForSelector("h2", { timeout: 1000 * 10 })
    const list = await page.$$("div.fontBodyMedium")
    var data = {}
    if (list.length === 1) {
        const text = await page.evaluate(() => {
            return Array.from(document.querySelectorAll("div.P1LL5e")).map((el) => {
                return el.innerText.trim();
            });
        })
        const attrs = await page.evaluate(() => {
            return Array.from(document.querySelectorAll("div.WKLD0c .CK16pd")).map((el) => {
                return el.getAttribute("aria-label")
            })
        })
        return text.join("\n") + "\n" + attrs.join("\n")
    }
    for (let item of list) {
        if (! await item.$("h2")) {
            continue
        }
        let title = await item.$eval("h2", el => el.innerText)
        const texts = await item.$$("li")
        let items = []
        for (let t of texts) {
            const text = await t.$eval("span", el => el.getAttribute("aria-label"))
            items.push(text)
        }
        data[title] = items
    }
    return data
}

const processReviews = async (page) => {
    if (!(await page.$("button[aria-label*='Reviews']"))) {
        return null
    }
    await page.click("button[aria-label*='Reviews']")

    await page.waitForSelector("button[aria-label*='relevant'], button[aria-label*='Sort']", { timeout: 1000 * 10 })
    if (await page.$("button[aria-label*='relevant']")) {
        await page.click("button[aria-label*='relevant']")
    } else if (await page.$("button[aria-label*='Sort']")) {
        await page.click("button[aria-label*='Sort']")
    }

    await page.waitForSelector("div[id='action-menu'] div[data-index='1']", { timeout: 1000 * 10 })

    if (!(await page.$("div[id='action-menu'] div[data-index='1']"))) {
        return null
    }
    await page.click("div[id='action-menu'] div[data-index='1']")
    await page.waitForSelector("div.d4r55", { timeout: 1000 * 10 })
    const start = Date.now()
    await scrollPage(page, '.DxyBCb');
    logger.info(`processReviews scrollPage cost time ${(Date.now() - start) / 1000} s`)
    const reviews = await page.evaluate(() => {
        return Array.from(document.querySelectorAll(".jftiEf")).map((el) => {
            return {
                user: {
                    name: el.querySelector(".d4r55")?.textContent.trim(),
                    link: el.querySelector(".WNxzHc a")?.getAttribute("href"),
                    thumbnail: el.querySelector(".NBa7we")?.getAttribute("src"),
                    localGuide: el.querySelector(".RfnDt span:first-child")?.style.display === "none" ? undefined : true,
                    reviews: parseInt(el.querySelector(".RfnDt span:last-child")?.textContent.replace("·", "")),
                },
                rating: parseFloat(el.querySelector(".kvMYJc")?.getAttribute("aria-label") || parseInt(el.querySelector(".fzvQIb")?.textContent.split("/")[0]) / 5),
                snippet: el.querySelector(".MyEned")?.textContent.trim(),
                date: el.querySelector(".rsqaWe")?.textContent.trim() || el.querySelector(".xRkPPb")?.textContent.trim().split(" on")[0],
            };
        });
    });
    return reviews.map(r => {
        r.date = new Date(Date.now() - parseTextDuration(r.date) * 1000)
        return r
    });
}


const processMapItem = async (page) => {
    const url = page.url()
    const title = await page.$eval("h1", el => el.innerText)
    const review_el = await page.$("div.F7nice") ? await page.$eval("div.F7nice", el => el.innerText) : ""
    const star = parseFloat(review_el?.split("(")[0] || 0)
    // review counts render like "(1,234)", so strip thousands separators before parsing
    const review_count = parseInt(review_el?.split("(").length >= 2 ? review_el?.split("(")[1].split(")")[0].replace(/,/g, "") : 0)
    const headline = await page.$("div[aria-label*='About'] div[jslog*='metadata']") ? await page.$eval("div[aria-label*='About'] div[jslog*='metadata']", el => el.innerText) : ""
    const category = await page.$("button[jsaction='pane.rating.category']") ? await page.$eval("button[jsaction='pane.rating.category']", el => el.innerText) : (await page.$("span.mgr77e") ? await page.$eval("span.mgr77e", el => el.innerText) : null)
    let address = await page.$("button[data-item-id='address']") ? await page.$eval("button[data-item-id='address']", el => el.getAttribute("aria-label")) : ""
    address = address.replace("Address: ", "")
    const openHours = await page.$("div[aria-label*='Sunday']") ? await page.$eval("div[aria-label*='Sunday']", el => el.getAttribute("aria-label")) : null
    const checkIn = await page.$("div[data-item-id='place-info-links:'] .Io6YTe") ? await page.$eval("div[data-item-id='place-info-links:'] .Io6YTe", el => el.innerText) : null
    const book = await page.$("a.M77dve") ? await page.$eval("a.M77dve", el => el.getAttribute("href")) : null

    const website = await page.$("a[data-item-id='authority']") ? await page.$eval("a[data-item-id='authority']", el => el.getAttribute("href")) : null
    const phone = await page.$("button[aria-label*='Phone']") ? await page.$eval("button[aria-label*='Phone']", el => el.innerText) : null
    const pluscode = await page.$("button[aria-label*='Plus code']") ? await page.$eval("button[aria-label*='Plus code']", el => el.innerText) : null
    let start = Date.now()
    const about = await processAbout(page)
    logger.info(`processAbout cost time ${(Date.now() - start) / 1000} s | ${url}`)
    start = Date.now()
    const review = await processReviews(page)
    logger.info(`processReviews cost time ${(Date.now() - start) / 1000} s | ${url}`)
    const coordinate = parseCoordinateFromMapUrl(url)
    const result = {
        url: url,
        title: title,
        star: star,
        review_count: review_count,
        headline: headline,
        category: category,
        address: address,
        openHours: openHours,
        checkIn: checkIn,
        book: book,
        website: website,
        phone: phone,
        pluscode: pluscode,
        coordinate: coordinate,
        about: about,
        review: review
    }
    return result
}


const processMap = async (page, url) => {
    if (!url) {
        logger.error(`sleeping | url queue is empty!`)
        await sleep(10000)
        return false
    }
    logger.info(`visiting url: ${url}`)
    await googleMapConsentCheck(page)
    await page.waitForSelector("h1", { timeout: 10 * 1000 })
    await waitTillHTMLRendered(page)

    const start = Date.now()
    const result = await processMapItem(page)
    logger.info(`processMapItem cost time ${(Date.now() - start) / 1000} s || ${JSON.stringify(result)}`)

    result["id"] = parseMapId(url)
    const ok = await insertGoogleMap(result)
    if (ok) {
        logger.info(`google map ${result.title} is inserted into db successfully!`)
    }
    return true

}

const config = Configuration.getGlobalConfig();
config.set('disableBrowserSandbox', true);

const preNavigationHook = async (crawlingContext, gotoOptions) => {
    const { request } = crawlingContext;
    const url = request.url
    if (url.includes("www.google.com/maps/place")) {
        await store.setValue(request.id, url)
    }

}

const requestHandler = async ({ page, request }) => {
    const url = request.url

    if (!url.includes("www.google.com/maps/place")) {
        await store.setValue(request.id, null)
        logger.error(`illegal address ${url}`)
        return
    }
    const start = Date.now()
    await page.waitForNetworkIdle()
    logger.info(`Processing: ${url}`)
    const result = await processMap(page, url)
    if (result) {
        await redisClient.incr(googleMapAddressTaskSuccess)
        logger.info(`scraping address info cost time ${(Date.now() - start) / 1000} s`)
    }
    await store.setValue(request.id, null)
}



const errorHandler = async ({ request, error }) => {
    logger.error(`errorHandler | error found: ${request.url}, err: ${error}`)
}


const failedRequestHandler = async ({ request, error }) => {
    await redisClient.incr(googleMapAddressTaskFailure)
    const url = request.url
    logger.error(`failedRequestHandler | error:${request.url}, err: ${error}`)
    const errMsg = error?.message?.toLowerCase() || ""
    if (errMsg.includes("timeout") || errMsg.includes("net::err")) {
        logger.error(`${url} is putting back to queue!`)
        await redisClient.lPush(googleMapAddressTaskQueue, url)
        return
    }
    logger.error(`${url} is putting back to error queue | errMsg: ${errMsg}`)
    await redisClient.lPush(googleMapAddressTaskErrorQueue, url)
}


const backupRequestQueue = async (queue, store, signal) => {
    if (handledSignal) {
        logger.info(`${signal} already handled`)
        return
    }
    handledSignal = true
    logger.info(`GOT ${signal}, backing up!`)
    let count = 0
    for (const [key, value] of Object.entries(queue?.queueHeadDict?.dictionary)) {
        const qResult = await queue.getRequest(key)
        if (!qResult.url.includes("www.google.com/maps/place")) {
            continue
        }
        count += 1
        logger.info(`${signal} signal received, backing up key: ${key} | request: ${qResult.url}`)
        redisClient.lPush(googleMapAddressTaskQueue, qResult.url)
    }
    logger.info(`tasks in queue count: ${count}`)
    count = 0
    await store.forEachKey(async (key, index, info) => {
        count += 1
        const val = await store.getValue(key)
        logger.info(`running tasks: key: ${key} url:  ${val}`)
        if (val) {
            redisClient.lPush(googleMapAddressTaskQueue, val)
        }
    });
    logger.info(`running tasks count: ${count}`)

}


async function index() {
    store = await KeyValueStore.open('google-map-item');
    const queue = await RequestQueue.open("google-map-item");
    var crawler = new PuppeteerCrawler({
        launchContext: {
            launchOptions: {
                handleSIGINT: false,
                handleSIGTERM: false,
                // Other Puppeteer options
            },
        },
        requestQueue: queue,
        headless: true,
        browserPoolOptions: BROWSER_POOL_OPTIONS,
        navigationTimeoutSecs: 2 * 60,
        requestHandlerTimeoutSecs: 5 * 60,
        keepAlive: true,
        maxRequestRetries: 3,
        // env vars are strings; coerce them and fall back to sane defaults
        minConcurrency: Number(CRAWLEE_MIN_CONCURRENCY) || 1,
        maxConcurrency: Number(CRAWLEE_MAX_CONCURRENCY) || 3,
        preNavigationHooks: [preNavigationHook],
        requestHandler: requestHandler,
        errorHandler: errorHandler,
        failedRequestHandler: failedRequestHandler,
    });
    ['SIGINT', 'SIGTERM', 'uncaughtException'].forEach(signal => process.on(signal, async () => {
        await backupRequestQueue(queue, store, signal)
        await crawler.teardown()
        await sleep(200)
        process.exit(1)
    }))

    var url
    var errCount = 0
    crawler.run()
    while (true) {
        const queueSize = queue?.queueHeadDict?.linkedList.length || 0
        logger.info(`crawler status ${crawler.running} , queue size is ${queueSize}`)
        if (!crawler.running || queueSize > 10) {
            await sleep(1000)
            if (!crawler.running) {
                crawler.run()
            }
            continue
        }

        url = await redisClient.lPop(googleMapAddressTaskQueue)
        const start = Date.now()
        try {
            if (!url) {
                const s = 5 * 1000
                logger.error(`url is empty | sleeping ${s} ms`)
                await sleep(s)
                continue
            }
            const mapId = parseMapId(url)
            if (!mapId) {
                logger.error(`mapId is empty, url ${url}`)
                continue
            }
            if (!url.includes("www.google.com/maps/place")) {
                logger.error(`illegal address ${url}`)
                continue
            }
            const ok = await getGoogleMap(mapId)
            if (ok) {
                logger.info(`already processed ${url}, bypassing`)
                continue
            }
            const param = "?authuser=0&hl=en&rclk=1"
            if (!url.includes(param)) {
                url += param
            }
            logger.info(`new url added ${url}`)
            await crawler.addRequests([url]);
            await sleep(500)

        } catch (e) {
            logger.error(e)
            await redisClient.incr(googleMapAddressTaskFailure)
            errCount++
            logger.error(`url: ${url}, err: ${e}, proxy: ${PROXY_HOST} | scraping address info cost time ${(Date.now() - start) / 1000} s`)
            const errMsg = e.message?.toLowerCase() || ""
            if (errMsg.includes("timeout") || errMsg.includes("net::err")) {
                logger.error(`${url} is putting back to queue!`)
                await redisClient.lPush(googleMapAddressTaskQueue, url)
                continue
            }
            await redisClient.lPush(googleMapAddressTaskErrorQueue, url)
            await sleep(400)
        }


    }
}

// Assumed entry point: the Kubernetes Deployment below runs this file directly with node.
index()

3. Deployment files for Kubernetes

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: node-crawler-env
data:
  ENV: prod
  MONGO_URI: mongodb+srv://googlemap-crawler:[email protected]/?retryWrites=true&w=majority
  MONGO_DBNAME: googlemap
  REDIS_HOST: chatloop-redis.cache.amazonaws.com
  REDIS_PORT: "6379"
  REDIS_PASSWORD: ""
  PUPPETEER_SKIP_CHROMIUM_DOWNLOAD: "true"
  PUPPETEER_EXECUTABLE_PATH: /usr/bin/google-chrome-stable
  CRAWLEE_MIN_CONCURRENCY: "3"
  CRAWLEE_MAX_CONCURRENCY: "15"
  CRAWLEE_MEMORY_MBYTES: "4096"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-crawler-map-list
spec:
  replicas: 410
  selector:
    matchLabels:
      app: node-crawler-map-list
  strategy:
    rollingUpdate:
      maxSurge: 4
      maxUnavailable: 4
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: node-crawler-map-list
    spec:
      containers:
        - name: node-crawler-map-list
          image: node-crawler:20230720084044
          command: ["node"]
          args: ["./google/addressList.js"]
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: node-crawler-env
          resources:
            requests:
              cpu: 1000m
              memory: 1000Mi
            limits:
              cpu: 4000m
              memory: 4096Mi

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-crawler-map-item
spec:
  replicas: 40
  selector:
    matchLabels:
      app: node-crawler-map-item
  strategy:
    rollingUpdate:
      maxSurge: 8
      maxUnavailable: 8
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: node-crawler-map-item
    spec:
      containers:
        - name: node-crawler-map-item
          image: node-crawler:20230720084044
          command: ["node"]
          args: ["./google/item.js"]
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: node-crawler-env
          resources:
            requests:
              cpu: 1000m
              memory: 1000Mi
            limits:
              cpu: 4000m
              memory: 4096Mi

Monitoring and optimizing performance

So far everything with Crawlee appears to work well, except for one critical issue: after running in the Kubernetes (k8s) cluster for roughly an hour, Crawlee's throughput drops significantly, to only a few hundred items per hour, far below its initial rate. Interestingly, this issue does not appear when running a standalone container with Docker Compose on a dedicated machine.

Moreover, while monitoring the cluster (with metrics-server installed), I also observed a drastic drop in CPU utilization, from around 90% to merely 10%. This unexpected behavior is concerning and needs investigation to identify the underlying cause.

To address this performance degradation and keep resource utilization efficient, I turned to the Kubernetes API and client-go, the Golang SDK for Kubernetes. With these tools I monitor the CPU utilization of every instance in the cluster and automatically terminate instances that exhibit very low CPU utilization and have been active for at least 30 minutes.

Automatically terminating such instances avoids wasting resources and prevents underperforming instances from dragging down the overall extraction rate. This proactive approach keeps the cluster performing well and ensures that Crawlee delivers consistent, reliable results even in the dynamic and challenging Kubernetes environment.

func cleanupNodesBasedOnResourceUsage() {

	var metrics []Resource
	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	for _, node := range nodes.Items {
		if node.Spec.Unschedulable {
			continue
		}
		var metric = Resource{}
		cpuAvailable := node.Status.Allocatable[v1.ResourceCPU]
		memoryAvailable := node.Status.Allocatable[v1.ResourceMemory]

		metric.Name = node.Name
		metric.Cpu = cpuAvailable.AsApproximateFloat64()
		metric.Memory = memoryAvailable.AsApproximateFloat64() / (1024 * 1024 * 1024)
		metric.CreationTimestamp = node.CreationTimestamp.Time
		metrics = append(metrics, metric)
	}

	metricsClient, err := metricsv.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	for _, metric := range nodeMetrics.Items {
		index := findResourceIndexByNode(metrics, metric.Name)
		if index == len(metrics) {
			continue
		}
		resource := metrics[index]
		resource.CpuUsage = metric.Usage.Cpu().AsApproximateFloat64()
		resource.MemoryUsage = metric.Usage.Memory().AsApproximateFloat64() / (1024 * 1024 * 1024)
		metrics[index] = resource
	}

	log.Info("running ec2 count %d, ", len(metrics))
	metrics = lo.Filter(metrics, func(metric Resource, _ int) bool {
		if metric.CpuUsage/metric.Cpu > 0.5 {
			return false
		}
		return metric.CreationTimestamp.Add(30 * time.Minute).Before(time.Now())
	})

	sort.Slice(metrics, func(i, j int) bool {
		return metrics[i].CpuUsage/metrics[i].Cpu < metrics[j].CpuUsage/metrics[j].Cpu
	})
	var result []Resource
	if len(metrics) == 0 {
		log.Error("Not candiate instances to be terminated!")
		return
	}
	if len(metrics) == 1 {
		result = metrics
	}
	if len(metrics) > 1 {
		result = metrics[0 : len(metrics)/2]
	}

	log.Info("potential nodes to be terminated %d ", len(result))
	var instances []string
	for _, item := range result {
		instances = append(instances, item.Name)
		log.Info("Going to terminate %s, because its resource utilization %0.2f is too low", item.Name, item.CpuUsage/item.Cpu)
		aws.TerminateInstancesByPrivateDnsNames([]string{item.Name})
		time.Sleep(10 * time.Second)
	}
	log.Info("Instances %s are terminated successfully becaue of low resource utilization! ", instances)

}

The provided code addresses the issue of low CPU utilization in Kubernetes nodes by using the Kubernetes metrics API to identify underperforming nodes and then terminating the corresponding instances through the AWS Go SDK.

To ensure the successful implementation of this solution in a Kubernetes (k8s) cluster, additional steps are required. Specifically, we need to create a ServiceAccount, ClusterRole, and ClusterRoleBinding to properly assign the necessary permissions to the nodes-cleanup-cron-task. These permissions are essential for the task to effectively query the relevant Kubernetes resources and perform the required actions.

The ServiceAccount is responsible for providing an identity to the nodes-cleanup-cron-task, allowing it to authenticate with the Kubernetes API server. The ClusterRole defines a set of permissions that the task requires to interact with the necessary resources, in this case, the metrics API and other Kubernetes objects. Finally, the ClusterRoleBinding connects the ServiceAccount and ClusterRole, granting the task the permissions specified in the ClusterRole.

By establishing this set of permissions and associations, we ensure that the nodes-cleanup-cron-task can access and query the metrics API and other Kubernetes resources, effectively identifying nodes with low CPU utilization and terminating instances using the AWS Go SDK.

---
apiVersion: batch/v1
kind: CronJob
metadata:
  name: nodes-cleanup-cron-task
  namespace: default
spec:
  schedule: "*/3 * * * *"
  concurrencyPolicy: Allow
  failedJobsHistoryLimit: 5
  successfulJobsHistoryLimit: 5
  jobTemplate:
      spec:
        activeDeadlineSeconds: 3000
        backoffLimit: 5
        template:
          metadata:
            labels:
              app: nodes-cleanup-cron-task
          spec:
            serviceAccountName: nodes-cleanup-cron-task-ca
            restartPolicy: OnFailure
            priorityClassName: system-node-critical
            containers:
              - name: nodes-cleanup-cron-task
                image: go-crawler:20230715065044
                command: ["./crawler"]
                args: ["-w", "k8s-cleanup"]
                resources:
                  requests:
                    cpu: 100m
                    memory: 200Mi
                  limits:
                    cpu: 300m
                    memory: 500Mi

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nodes-cleanup-cron-task-ca

---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: nodes-cleanup-cron-task-ca-role
rules:
  - apiGroups: [""]
    resources:
      - pods
      - nodes
    verbs:
      - get
      - list
      - watch
      - delete
      
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: nodes-cleanup-cron-task-cr
rules:
- apiGroups: 
    - "*"
  resources:
    - pods
    - nodes
  verbs:
    - get
    - patch
    - list
    - watch
    - delete
- apiGroups: 
    - "apps"
    - "extensions"
  resources:
    - pods
    - nodes
  verbs:
    - get
    - patch
    - list
    - watch
    - delete
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: nodes-cleanup-cron-task-ca-csb
subjects:
  - kind: ServiceAccount
    name: nodes-cleanup-cron-task-ca
    namespace: default
roleRef:
  kind: ClusterRole
  name: nodes-cleanup-cron-task-cr
  apiGroup: rbac.authorization.k8s.io 

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: nodes-cleanup-cron-task-ca-sb
subjects:
  - kind: ServiceAccount
    name: nodes-cleanup-cron-task-ca
roleRef:
  kind: Role
  name: nodes-cleanup-cron-task-ca-role
  apiGroup: rbac.authorization.k8s.io 

Conclusion

At this stage, the majority of the code is complete, and you have the capability to deploy it on any cloud server with Kubernetes (k8s). This flexibility allows you to scale the application effortlessly, expanding the number of instances as needed to meet your specific requirements.

One of the key advantages of the design lies in its termination tolerance. With the implemented safeguards to handle SIGTERM and SIGINT events, you can deploy spot instances without concerns about potential data loss. Even when spot instances are terminated unexpectedly, the application gracefully manages the data in the job queue and running tasks.

By leveraging this termination tolerance feature, the application can handle spot instance terminations smoothly. This ensures that any pending tasks in the job queue are backed up safely and that the state of running tasks, which haven’t completed yet, is preserved. Consequently, you can rest assured that the integrity of your data and tasks will be maintained throughout the operation.

Deploying the application with Kubernetes and taking advantage of termination tolerance empowers you to scale the Google Maps scraper efficiently, managing numerous instances to meet your data extraction needs effectively. The combination of Kubernetes and the termination tolerance design enhances the overall robustness and reliability of the application, allowing for seamless operation even in the dynamic and unpredictable cloud environment. If you have any questions regarding this article or any suggestions for future articles, please leave a comment below. Additionally, I am available for remote work or contracts, so please feel free to reach out to me via email.