scraper

View Source
import time
import asyncio
import logging
import controller as controller
from config import get_value
from aiogram import Bot
from aiogram.types import ParseMode
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import Select


# root logger (getLogger() is called without a name)
LOGGER = logging.getLogger()

# login url
LOGIN_URL = "https://start.prenotazionevaccinicovid.regione.lombardia.it"
# search ("ricerca") page url
# NOTE(review): originally labelled "booked url" and not referenced by the code visible here; confirm its purpose
SEARCH_URL = "https://start.prenotazionevaccinicovid.regione.lombardia.it/cit/#/ricerca"
# fake user data error message; start_scraper() looks for it in the page source after the login attempt
FAKE_USER = "I codici inseriti non sono corretti o non corrispondono a persona appartenente a categoria oggetto della fase corrente del piano vaccinale."


def wait_until_present(driver, xpath=None, class_name=None, el_id=None, name=None, duration=5, frequency=0.01):
  """Block until an element is present in the page and hand it back

  Only one locator is used: the first truthy one in the order
  xpath, class_name, el_id, name.

  Args:
      driver: webdriver
      xpath (string): element xpath
      class_name (string): element class name
      el_id (string): element id
      name (string): element name attribute
      duration (int): number of seconds before timing out
      frequency (float): sleep interval between calls

  Raises:
      TimeoutException: timed out

  Returns:
      WebElement: element waited (None when no locator was given)
  """
  # choose the lookup strategy, keeping the precedence described above
  if xpath:
    locator = (By.XPATH, xpath)
  elif class_name:
    locator = (By.CLASS_NAME, class_name)
  elif el_id:
    locator = (By.ID, el_id)
  elif name:
    locator = (By.NAME, name)
  else:
    # nothing to wait for
    return None
  waiter = WebDriverWait(driver, duration, frequency)
  return waiter.until(EC.presence_of_element_located(locator))


def switch_filter(driver, mode):
  """Switch appointments sorting

  Args:
      driver: webdriver
      mode (string): visible text of the sorting option to select

  Raises:
      TimeoutException: the element with id "sortSelect" did not appear in time
  """
  # the wait already returns the element: reuse it instead of looking it up twice more
  sort_select = wait_until_present(driver, el_id="sortSelect")
  sort_select.click()
  Select(sort_select).select_by_visible_text(mode)
  # pause after changing the selection (presumably to let the list refresh)
  time.sleep(1)
  # best effort: hide the element with class "overlay" when there is one
  try:
    # find_element(By...) works on Selenium 3 and 4; the find_element_by_* helpers were removed in 4.3
    overlay = driver.find_element(By.CLASS_NAME, "overlay")
    driver.execute_script("arguments[0].style.visibility='hidden'", overlay)
  except Exception:
    # no overlay found (or it could not be hidden): nothing to do
    pass


def are_equal(old_appointment, new_appointment, short=False):
  """Tell whether two appointments have the same date and place

  Args:
      old_appointment (dict): date and place of old appointment, date as "dd/mm/YYYY HH:MM:SS"
      new_appointment (dict): date and place of new appointment
      short (boolean): when True, old_appointment is shortened before comparing (date cut to
          "dd/mm HH:MM", "CENTRO VACCINALE: " dropped from place) and matched as plain text against
          new_appointment; when False both dates are parsed as "dd/mm/YYYY HH:MM:SS" and compared

  Returns:
      True: appointments are equal
      False: appointments are not equal
  """
  if not short:
    # full form: compare the parsed timestamps first, then the places
    date_format = "%d/%m/%Y %H:%M:%S"
    old_time = time.strptime(old_appointment["date"], date_format)
    new_time = time.strptime(new_appointment["date"], date_format)
    if old_time != new_time:
      return False
    return old_appointment["place"] == new_appointment["place"]
  # short form: keep day/month and hour:minute of the old date
  shortened_date = f"{old_appointment['date'][:5]} {old_appointment['date'][11:16]}"
  if shortened_date != new_appointment["date"]:
    return False
  shortened_place = old_appointment["place"].replace("CENTRO VACCINALE: ", "")
  return shortened_place == new_appointment["place"]


def login(driver, username, password):
  """Perform the login on the website

  Args:
      driver: webdriver
      username (string): health card code
      password (string): fiscal code

  Raises:
      TimeoutException: the username or password field did not appear in time
  """
  driver.get(LOGIN_URL)
  # try pressing a spam button that appears sometimes
  try:
    time.sleep(1)
    driver.execute_script("document.getElementsByClassName('btn btn-primary btn-sm ng-star-inserted')[0].click()")
  except Exception:
    # the button is not there this time: carry on with the login
    pass
  # wait for the fields and type into the element each wait returns
  # (no second lookup, and no find_element_by_* helper, which Selenium 4.3 removed)
  wait_until_present(driver, el_id="username").send_keys(username)
  wait_until_present(driver, el_id="password").send_keys(password)
  # click privacy checkbox
  driver.execute_script("document.getElementById('privacy').click()")
  # click login button
  driver.execute_script("document.getElementsByClassName('btn btn-primary btn-icon')[0].click()")


def find(driver, region, country, postal_code, phone, date):
  """Perform booking page filling

  Args:
      driver: webdriver
      region (string): region of residence (option text of the 'provinceId' select)
      country (string): country of residence (option text of the 'cityId' select)
      postal_code (string): CAP of residence
      phone (string): phone number
      date (string): birthdate dd/mm/YYYY

  Raises:
      TimeoutException: a form field did not appear in time

  Returns:
      True: user has already booked an appointment
      False: filling done
  """
  # check if page content is different from booking page (e.g. element with bookingCode id)
  try:
    wait_until_present(driver, el_id="bookingCode", duration=3)
    return True
  except Exception:
    # no booking code within 3 seconds: treat the page as the booking form
    pass
  # wait fields and fill them
  wait_until_present(driver, el_id="birthDate").send_keys(date)
  # find_element(By...) works on Selenium 3 and 4; the find_element_by_* helpers were removed in 4.3
  driver.find_element(By.ID, "phoneNumber").send_keys(phone)
  # remove overlay that hides 'select' elements
  try:
    overlay = driver.find_element(By.CLASS_NAME, "modelOverlay")
    driver.execute_script("arguments[0].style.visibility='hidden'", overlay)
  except Exception:
    # no overlay found (or it could not be hidden): nothing to do
    pass
  # TODO 4: above overlay can appear also after clicking on a 'select'
  # fill the three 'select' elements in page order, waiting for each one to be present first
  # (useful when the website is slow)
  selections = (("provinceId", region), ("cityId", country), ("postalCode", postal_code))
  for control_name, option_text in selections:
    dropdown = wait_until_present(driver, xpath=f"//select[@formcontrolname='{control_name}']")
    dropdown.click()
    Select(dropdown).select_by_visible_text(option_text)
  # click conditions checkbox
  driver.execute_script("document.getElementById('conditions').click()")
  # click search button
  driver.execute_script("document.getElementsByClassName('btn btn-primary btn-icon')[0].click()")
  return False


def check(driver):
  """Return dict with two lists of appointments, one sorted by distance and one sorted by date, and the current time

  Args:
      driver: webdriver

  Returns:
      dict: dict with appointments_by_distance and appointments_by_date (lists of {"date", "place"} dicts)
          and last_fetch (local time formatted as dd/mm/YYYY HH:MM:SS)
  """
  # same scraping routine, run once per sorting mode
  appointments_by_distance = _scrape_appointments(driver, "Distanza")
  appointments_by_date = _scrape_appointments(driver, "Data")
  last_fetch = time.strftime("%d/%m/%Y %H:%M:%S", time.localtime())
  return {"appointments_by_distance": appointments_by_distance, "appointments_by_date": appointments_by_date, "last_fetch": last_fetch}


def _scrape_appointments(driver, mode):
  """Sort the appointments by mode and read them from the page as a list of {"date", "place"} dicts"""
  switch_filter(driver, mode)
  scraped = []
  # find_elements(By...) works on Selenium 3 and 4; the find_elements_by_* helpers were removed in 4.3
  for entry in driver.find_elements(By.CLASS_NAME, "text-wrap"):
    info = entry.find_elements(By.TAG_NAME, "span")
    place = info[2].text.upper()
    # last 10 characters of the first span and last 5 of the second
    # (presumably dd/mm/YYYY and HH:MM); seconds are fixed to 00
    date = f"{info[0].text[-10:]} {info[1].text[-5:]}:00"
    scraped.append({"date": date, "place": place})
  return scraped


def start_scraper():
  """Start scraper loop

  Runs forever. On every pass each user with is_vaccinated unset is logged in, the booking form
  is filled, the available appointments are scraped and stored, and a Telegram message is sent
  when the first appointment (by distance or by date) differs from the stored one. Users whose
  credentials are rejected are deleted; users who already hold a booking are flagged so that
  they are skipped on later passes. Any exception aborts the current pass, is logged, and the
  loop resumes after a pause. The webdriver is quit when the loop is left.
  """
  LOGGER.info("Starting scraper.")
  # A single event loop runs every coroutine. asyncio.run() must not be mixed in: it closes its
  # loop and unsets the current one, after which get_event_loop() raises RuntimeError and objects
  # bound to the closed loop can no longer be used.
  loop = asyncio.get_event_loop()
  # bot instance
  bot = Bot(get_value("token"))
  # webdriver options
  options = webdriver.firefox.options.Options()
  options.headless = True
  # webdriver
  driver = webdriver.Firefox(options=options)
  try:
    # loop
    while True:
      try:
        # get all users
        # TODO 1: should get only active users (e.g with is_vaccinated=False)
        users = controller.get_users()
        # loop through users
        for user in users:
          # skip user if is vaccinated or if its notifications are disabled
          # TODO: see TODO 1
          # TODO 2: add 'notifications' flag to user to differentiate vaccinated users and muted notifications users
          if user["is_vaccinated"]:
            # next user
            continue
          # last fetched appointments
          last_appointments_by_distance = user["appointments_by_distance"]
          last_appointments_by_date = user["appointments_by_date"]
          # perform login
          login(driver, user["health_card"], user["fiscal_code"])
          # give the website the time to check if user data are fake
          time.sleep(3)
          # if user data are fake delete all its data and send him a notification
          if FAKE_USER in driver.page_source:
            loop.run_until_complete(controller.delete_user(user["_id"]))
            loop.run_until_complete(bot.send_message(
                user["_id"], "Ho cancellato i dati che hai registrato perchè non sono corretti. Rieffettua la registrazione con /registra"))
            # the login was rejected, so there is no booking page to fill:
            # clear cookies and wait 30 seconds for next user
            driver.delete_all_cookies()
            time.sleep(30)
            continue
          # perform booking page filling
          is_vaccinated = find(driver, user["region"], user["country"], user["postal_code"], user["phone"], user["date"])
          # booking page not filled because user has already booked an appointment
          if is_vaccinated:
            # set user is_vaccinated to True to stop checking for him and send him a notification
            loop.run_until_complete(controller.update_status(user["_id"], is_vaccinated))
            loop.run_until_complete(bot.send_message(
                user["_id"], "Ho notato che hai già effettuato una prenotazione perciò non controllerò le date per te.\n\nSe dovessi annullare la prenotazione e volessi essere notificato ancora digita /reset\n\nSe vuoi cancellare i tuoi dati digita /cancella"))
            # clear cookies (so the next login does not start with this user's session)
            # and wait 30 seconds for next user
            driver.delete_all_cookies()
            time.sleep(30)
            continue
          # get new appointments
          result = check(driver)
          # update user appointments and fetch date
          # NOTE(review): called without awaiting, unlike delete_user/update_status; confirm it is synchronous
          controller.update_appointments(user["_id"], result["appointments_by_distance"], result["appointments_by_date"], result["last_fetch"])
          # notice for each sorting whose first appointment is new ("" when nothing changed)
          new_by_distance = _first_appointment_notice(
              "Nuovo appuntamento ordinato per distanza:", last_appointments_by_distance, result["appointments_by_distance"])
          new_by_date = _first_appointment_notice(
              "Nuovo appuntamento ordinato per data:", last_appointments_by_date, result["appointments_by_date"])
          # if new first appointments have been found send a notification
          if new_by_distance or new_by_date:
            loop.run_until_complete(bot.send_message(
                user["_id"], f"{new_by_distance}{new_by_date}Per tutti gli appuntamenti disponibili digita /disponibili e per prenotare digita /prenota oppure effettua la procedura manuale: {LOGIN_URL}\nUsername: <pre>{user['health_card']}</pre>\nPassword: <pre>{user['fiscal_code']}</pre>", parse_mode=ParseMode.HTML))
          # clear cookies and wait 30 seconds for next user
          driver.delete_all_cookies()
          time.sleep(30)
        # get active users count
        active_users = controller.get_active_users()
        # calculate sleep time after the loop through users is completed
        if active_users > 0:
          sleep_time = 60 * int(15 if active_users < 50 else (15 / (active_users / 50)))
          time.sleep(sleep_time)
        else:
          # nobody to check: wait a minute instead of spinning on the controller
          time.sleep(60)
      except Exception as e:
        LOGGER.exception(e)
        try:
          # clear cookies
          driver.delete_all_cookies()
          # check for IP ban
          if "Sessione scaduta" in driver.page_source:
            LOGGER.info("IP bannato")
            # wait 60 minutes but I think it's useless, need to change IP
            time.sleep(60 * 60)
        except Exception as cleanup_error:
          LOGGER.exception(cleanup_error)
        # wait 5 minutes
        time.sleep(60 * 5)
  finally:
    # release the browser when the loop is left (e.g. KeyboardInterrupt)
    driver.quit()


def _first_appointment_notice(heading, previous, current):
  """Return the notification text for the first appointment in current, or "" when it is not new"""
  # nothing found for this sorting
  if not current:
    return ""
  # same first appointment as the one stored at the previous fetch
  if previous and are_equal(previous[0], current[0]):
    return ""
  return f"{heading}\n{current[0]['date']}\n{current[0]['place']}\n\n"
#   def wait_until_present( driver, xpath=None, class_name=None, el_id=None, name=None, duration=5, frequency=0.01 ):
View Source
def wait_until_present(driver, xpath=None, class_name=None, el_id=None, name=None, duration=5, frequency=0.01):
  """Block until an element is present in the page and hand it back

  Only one locator is used: the first truthy one in the order
  xpath, class_name, el_id, name.

  Args:
      driver: webdriver
      xpath (string): element xpath
      class_name (string): element class name
      el_id (string): element id
      name (string): element name attribute
      duration (int): number of seconds before timing out
      frequency (float): sleep interval between calls

  Raises:
      TimeoutException: timed out

  Returns:
      WebElement: element waited (None when no locator was given)
  """
  # choose the lookup strategy, keeping the precedence described above
  if xpath:
    locator = (By.XPATH, xpath)
  elif class_name:
    locator = (By.CLASS_NAME, class_name)
  elif el_id:
    locator = (By.ID, el_id)
  elif name:
    locator = (By.NAME, name)
  else:
    # nothing to wait for
    return None
  waiter = WebDriverWait(driver, duration, frequency)
  return waiter.until(EC.presence_of_element_located(locator))

Wait until element is present

Args
  • driver: webdriver
  • xpath (string): element xpath
  • class_name (string): element class name
  • el_id (string): element id
  • name (string): element name attribute
  • duration (int): number of seconds before timing out
  • frequency (float): sleep interval between calls
Raises
  • TimeoutException: timed out
Returns

WebElement: element waited

#   def switch_filter(driver, mode):
View Source
def switch_filter(driver, mode):
  """Switch appointments sorting

  Args:
      driver: webdriver
      mode (string): visible text of the sorting option to select

  Raises:
      TimeoutException: the element with id "sortSelect" did not appear in time
  """
  # the wait already returns the element: reuse it instead of looking it up twice more
  sort_select = wait_until_present(driver, el_id="sortSelect")
  sort_select.click()
  Select(sort_select).select_by_visible_text(mode)
  # pause after changing the selection (presumably to let the list refresh)
  time.sleep(1)
  # best effort: hide the element with class "overlay" when there is one
  try:
    # find_element(By...) works on Selenium 3 and 4; the find_element_by_* helpers were removed in 4.3
    overlay = driver.find_element(By.CLASS_NAME, "overlay")
    driver.execute_script("arguments[0].style.visibility='hidden'", overlay)
  except Exception:
    # no overlay found (or it could not be hidden): nothing to do
    pass

Switch appointments sorting

Args
  • driver: webdriver
  • mode (string): sorting mode
#   def are_equal(old_appointment, new_appointment, short=False):
View Source
def are_equal(old_appointment, new_appointment, short=False):
  """Tell whether two appointments have the same date and place

  Args:
      old_appointment (dict): date and place of old appointment, date as "dd/mm/YYYY HH:MM:SS"
      new_appointment (dict): date and place of new appointment
      short (boolean): when True, old_appointment is shortened before comparing (date cut to
          "dd/mm HH:MM", "CENTRO VACCINALE: " dropped from place) and matched as plain text against
          new_appointment; when False both dates are parsed as "dd/mm/YYYY HH:MM:SS" and compared

  Returns:
      True: appointments are equal
      False: appointments are not equal
  """
  if not short:
    # full form: compare the parsed timestamps first, then the places
    date_format = "%d/%m/%Y %H:%M:%S"
    old_time = time.strptime(old_appointment["date"], date_format)
    new_time = time.strptime(new_appointment["date"], date_format)
    if old_time != new_time:
      return False
    return old_appointment["place"] == new_appointment["place"]
  # short form: keep day/month and hour:minute of the old date
  shortened_date = f"{old_appointment['date'][:5]} {old_appointment['date'][11:16]}"
  if shortened_date != new_appointment["date"]:
    return False
  shortened_place = old_appointment["place"].replace("CENTRO VACCINALE: ", "")
  return shortened_place == new_appointment["place"]

Check if two appointments are equal

Args
  • old_appointment (dict): date and place of old appointment
  • new_appointment (dict): date and place of new appointment
  • short (boolean): True if new_appointment is in reduced form (date as "dd/mm HH:MM", place without the "CENTRO VACCINALE: " prefix; see controller.get_appointments()), else False
Returns

True if the appointments are equal, False if they are not.

#   def login(driver, username, password):
View Source
def login(driver, username, password):
  """Perform the login on the website

  Args:
      driver: webdriver
      username (string): health card code
      password (string): fiscal code

  Raises:
      TimeoutException: the username or password field did not appear in time
  """
  driver.get(LOGIN_URL)
  # try pressing a spam button that appears sometimes
  try:
    time.sleep(1)
    driver.execute_script("document.getElementsByClassName('btn btn-primary btn-sm ng-star-inserted')[0].click()")
  except Exception:
    # the button is not there this time: carry on with the login
    pass
  # wait for the fields and type into the element each wait returns
  # (no second lookup, and no find_element_by_* helper, which Selenium 4.3 removed)
  wait_until_present(driver, el_id="username").send_keys(username)
  wait_until_present(driver, el_id="password").send_keys(password)
  # click privacy checkbox
  driver.execute_script("document.getElementById('privacy').click()")
  # click login button
  driver.execute_script("document.getElementsByClassName('btn btn-primary btn-icon')[0].click()")

Perform the login on the website

Args
  • driver: webdriver
  • username (string): health card code
  • password (string): fiscal code
#   def find(driver, region, country, postal_code, phone, date):
View Source
def find(driver, region, country, postal_code, phone, date):
  """Perform booking page filling

  Args:
      driver: webdriver
      region (string): region of residence (option text of the 'provinceId' select)
      country (string): country of residence (option text of the 'cityId' select)
      postal_code (string): CAP of residence
      phone (string): phone number
      date (string): birthdate dd/mm/YYYY

  Raises:
      TimeoutException: a form field did not appear in time

  Returns:
      True: user has already booked an appointment
      False: filling done
  """
  # check if page content is different from booking page (e.g. element with bookingCode id)
  try:
    wait_until_present(driver, el_id="bookingCode", duration=3)
    return True
  except Exception:
    # no booking code within 3 seconds: treat the page as the booking form
    pass
  # wait fields and fill them
  wait_until_present(driver, el_id="birthDate").send_keys(date)
  # find_element(By...) works on Selenium 3 and 4; the find_element_by_* helpers were removed in 4.3
  driver.find_element(By.ID, "phoneNumber").send_keys(phone)
  # remove overlay that hides 'select' elements
  try:
    overlay = driver.find_element(By.CLASS_NAME, "modelOverlay")
    driver.execute_script("arguments[0].style.visibility='hidden'", overlay)
  except Exception:
    # no overlay found (or it could not be hidden): nothing to do
    pass
  # TODO 4: above overlay can appear also after clicking on a 'select'
  # fill the three 'select' elements in page order, waiting for each one to be present first
  # (useful when the website is slow)
  selections = (("provinceId", region), ("cityId", country), ("postalCode", postal_code))
  for control_name, option_text in selections:
    dropdown = wait_until_present(driver, xpath=f"//select[@formcontrolname='{control_name}']")
    dropdown.click()
    Select(dropdown).select_by_visible_text(option_text)
  # click conditions checkbox
  driver.execute_script("document.getElementById('conditions').click()")
  # click search button
  driver.execute_script("document.getElementsByClassName('btn btn-primary btn-icon')[0].click()")
  return False

Perform booking page filling

Args
  • driver: webdriver
  • region (string): region of residence
  • country (string): country of residence
  • postal_code (string): CAP of residence
  • phone (string): phone number
  • date (string): birthdate dd/mm/YYYY
Returns

True if the user has already booked an appointment, False once the form has been filled in.

#   def check(driver):
View Source
def check(driver):
  """Return dict with two lists of appointments, one sorted by distance and one sorted by date, and the current time

  Args:
      driver: webdriver

  Returns:
      dict: dict with appointments_by_distance and appointments_by_date (lists of {"date", "place"} dicts)
          and last_fetch (local time formatted as dd/mm/YYYY HH:MM:SS)
  """
  # same scraping routine, run once per sorting mode
  appointments_by_distance = _scrape_appointments(driver, "Distanza")
  appointments_by_date = _scrape_appointments(driver, "Data")
  last_fetch = time.strftime("%d/%m/%Y %H:%M:%S", time.localtime())
  return {"appointments_by_distance": appointments_by_distance, "appointments_by_date": appointments_by_date, "last_fetch": last_fetch}


def _scrape_appointments(driver, mode):
  """Sort the appointments by mode and read them from the page as a list of {"date", "place"} dicts"""
  switch_filter(driver, mode)
  scraped = []
  # find_elements(By...) works on Selenium 3 and 4; the find_elements_by_* helpers were removed in 4.3
  for entry in driver.find_elements(By.CLASS_NAME, "text-wrap"):
    info = entry.find_elements(By.TAG_NAME, "span")
    place = info[2].text.upper()
    # last 10 characters of the first span and last 5 of the second
    # (presumably dd/mm/YYYY and HH:MM); seconds are fixed to 00
    date = f"{info[0].text[-10:]} {info[1].text[-5:]}:00"
    scraped.append({"date": date, "place": place})
  return scraped

Return dict with two lists of appointments, one sorted by distance and one sorted by date, and the current time

Args
  • driver: webdriver
Returns

dict: dict with appointments_by_distance and appointments_by_date and last_fetch

#   def start_scraper():
View Source
def start_scraper():
  """Start scraper loop

  Runs forever. On every pass each user with is_vaccinated unset is logged in, the booking form
  is filled, the available appointments are scraped and stored, and a Telegram message is sent
  when the first appointment (by distance or by date) differs from the stored one. Users whose
  credentials are rejected are deleted; users who already hold a booking are flagged so that
  they are skipped on later passes. Any exception aborts the current pass, is logged, and the
  loop resumes after a pause. The webdriver is quit when the loop is left.
  """
  LOGGER.info("Starting scraper.")
  # A single event loop runs every coroutine. asyncio.run() must not be mixed in: it closes its
  # loop and unsets the current one, after which get_event_loop() raises RuntimeError and objects
  # bound to the closed loop can no longer be used.
  loop = asyncio.get_event_loop()
  # bot instance
  bot = Bot(get_value("token"))
  # webdriver options
  options = webdriver.firefox.options.Options()
  options.headless = True
  # webdriver
  driver = webdriver.Firefox(options=options)
  try:
    # loop
    while True:
      try:
        # get all users
        # TODO 1: should get only active users (e.g with is_vaccinated=False)
        users = controller.get_users()
        # loop through users
        for user in users:
          # skip user if is vaccinated or if its notifications are disabled
          # TODO: see TODO 1
          # TODO 2: add 'notifications' flag to user to differentiate vaccinated users and muted notifications users
          if user["is_vaccinated"]:
            # next user
            continue
          # last fetched appointments
          last_appointments_by_distance = user["appointments_by_distance"]
          last_appointments_by_date = user["appointments_by_date"]
          # perform login
          login(driver, user["health_card"], user["fiscal_code"])
          # give the website the time to check if user data are fake
          time.sleep(3)
          # if user data are fake delete all its data and send him a notification
          if FAKE_USER in driver.page_source:
            loop.run_until_complete(controller.delete_user(user["_id"]))
            loop.run_until_complete(bot.send_message(
                user["_id"], "Ho cancellato i dati che hai registrato perchè non sono corretti. Rieffettua la registrazione con /registra"))
            # the login was rejected, so there is no booking page to fill:
            # clear cookies and wait 30 seconds for next user
            driver.delete_all_cookies()
            time.sleep(30)
            continue
          # perform booking page filling
          is_vaccinated = find(driver, user["region"], user["country"], user["postal_code"], user["phone"], user["date"])
          # booking page not filled because user has already booked an appointment
          if is_vaccinated:
            # set user is_vaccinated to True to stop checking for him and send him a notification
            loop.run_until_complete(controller.update_status(user["_id"], is_vaccinated))
            loop.run_until_complete(bot.send_message(
                user["_id"], "Ho notato che hai già effettuato una prenotazione perciò non controllerò le date per te.\n\nSe dovessi annullare la prenotazione e volessi essere notificato ancora digita /reset\n\nSe vuoi cancellare i tuoi dati digita /cancella"))
            # clear cookies (so the next login does not start with this user's session)
            # and wait 30 seconds for next user
            driver.delete_all_cookies()
            time.sleep(30)
            continue
          # get new appointments
          result = check(driver)
          # update user appointments and fetch date
          # NOTE(review): called without awaiting, unlike delete_user/update_status; confirm it is synchronous
          controller.update_appointments(user["_id"], result["appointments_by_distance"], result["appointments_by_date"], result["last_fetch"])
          # notice for each sorting whose first appointment is new ("" when nothing changed)
          new_by_distance = _first_appointment_notice(
              "Nuovo appuntamento ordinato per distanza:", last_appointments_by_distance, result["appointments_by_distance"])
          new_by_date = _first_appointment_notice(
              "Nuovo appuntamento ordinato per data:", last_appointments_by_date, result["appointments_by_date"])
          # if new first appointments have been found send a notification
          if new_by_distance or new_by_date:
            loop.run_until_complete(bot.send_message(
                user["_id"], f"{new_by_distance}{new_by_date}Per tutti gli appuntamenti disponibili digita /disponibili e per prenotare digita /prenota oppure effettua la procedura manuale: {LOGIN_URL}\nUsername: <pre>{user['health_card']}</pre>\nPassword: <pre>{user['fiscal_code']}</pre>", parse_mode=ParseMode.HTML))
          # clear cookies and wait 30 seconds for next user
          driver.delete_all_cookies()
          time.sleep(30)
        # get active users count
        active_users = controller.get_active_users()
        # calculate sleep time after the loop through users is completed
        if active_users > 0:
          sleep_time = 60 * int(15 if active_users < 50 else (15 / (active_users / 50)))
          time.sleep(sleep_time)
        else:
          # nobody to check: wait a minute instead of spinning on the controller
          time.sleep(60)
      except Exception as e:
        LOGGER.exception(e)
        try:
          # clear cookies
          driver.delete_all_cookies()
          # check for IP ban
          if "Sessione scaduta" in driver.page_source:
            LOGGER.info("IP bannato")
            # wait 60 minutes but I think it's useless, need to change IP
            time.sleep(60 * 60)
        except Exception as cleanup_error:
          LOGGER.exception(cleanup_error)
        # wait 5 minutes
        time.sleep(60 * 5)
  finally:
    # release the browser when the loop is left (e.g. KeyboardInterrupt)
    driver.quit()


def _first_appointment_notice(heading, previous, current):
  """Return the notification text for the first appointment in current, or "" when it is not new"""
  # nothing found for this sorting
  if not current:
    return ""
  # same first appointment as the one stored at the previous fetch
  if previous and are_equal(previous[0], current[0]):
    return ""
  return f"{heading}\n{current[0]['date']}\n{current[0]['place']}\n\n"

Start scraper loop