Impossible to fetch more than 50k records via API?

A reasonable third option :slight_smile: Though it's problematic, as one also needs to handle gaps in the cursor when a record in the middle gets deleted.

Which brings us to our 4th and, I believe, best option for those working in Python - introducing my best Bubble data fetch script yet:

All our datas are belong to us

The following creates a Python generator that fetches all of your records via the Data API. It works by first checking how many records you have, then incrementing through them using one of two approaches:

  • If the DB has fewer than 50k records, we use the naive cursor-increment approach, which simply walks the cursor across every record in your DB (up to 50k).
  • If the DB has more than 50k records, we slice the database into chunks using date constraints and then walk the cursor through those chunks 45k records at a time. See the rough sketch below for details.
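
Roughly, the date-slicing approach boils down to the sketch below. This is illustrative pseudocode only - fetch_page is a hypothetical stand-in for the constrained Data API call - and the full working script follows further down:

def crawl_in_date_slices(earliest_created_date, latest_created_date):
  # Walk the whole table in date-bounded chunks so no single search ever
  # needs the cursor to pass Bubble's 50k limit.
  # fetch_page() is a placeholder for the constrained Data API call.
  chunk_start = earliest_created_date    # in practice, nudged 1 ms earlier so 'greater than' keeps the first record
  while True:
    cursor = 0
    while cursor < 45000:                # stay below the 50k cursor cap
      page = fetch_page(created_after=chunk_start,
                        created_before=latest_created_date,
                        sort_field='Created Date',
                        cursor=cursor)
      yield page
      cursor += page['count']
      if page['remaining'] == 0:         # nothing left in the whole range
        return
    # Start the next slice just after the last record we kept. The real script
    # also trims records sharing this timestamp so they aren't fetched twice.
    chunk_start = page['results'][-1]['Created Date']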

Every data fetch has a built-in pause so as not to overload your Bubble app, as well as a retry mechanism that gracefully stands by until your DB comes back online (should it be momentarily unreachable for whatever reason). All of these variables are configurable, so you can, for example, increase the pause between data pulls. Here it is:

import requests
import json
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import datetime
from dateutil import parser
import time

url = 'https://YOUR_BUBBLE_APP_NAME.bubbleapps.io/version-live/api/1.1/obj/DATA_TYPE_OF_INTEREST'
api_key = 'YOUR_APP_KEY'

def crawl_bubble_table(url, api_key, pause_in_seconds = 0.07, start_cursor=0):
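  # Bubble's Data API cursor cannot advance past 50k records in a single
  # search, so each date-constrained "session" below stops at 45k and then
  # restarts with a fresh date window.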
  hard_api_limit = 50000
  soft_api_limit = 45000

  retry_strategy = Retry(
    total=6,
    backoff_factor=4,
    status_forcelist=[429, 500, 502, 503, 504],
  )
  adapter = HTTPAdapter(max_retries=retry_strategy)

  def fetch_records(session = None, cursor=0, limit = 100, constraints=None, sort_field=None, descending=False):
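    # A single GET against the Data API. Returns (HTTP status, response body);
    # an empty result set is reported as 404 so callers can stop cleanly.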
    payload = {"cursor": str(cursor),
                "limit": limit,
                "api_token": api_key,
                "constraints": json.dumps(constraints),
                "sort_field": sort_field,
                "descending": descending,
              }
    if session:
      response = session.get(url, params=payload)
    else:
      response = requests.get(url, params=payload)

    if response.status_code != 200:
      return response.status_code, {}
    elif response.json()['response']['results'] == []:
      return 404, {}
    else:
      return response.status_code, response.json()['response']

  def extract_python_datetime(_datetime):
    _datetime = parser.parse(_datetime)
    return _datetime.replace(tzinfo=None)

  def get_start_date_and_db_size():
    status, result = fetch_records(limit=1, sort_field= 'Created Date')
    start_date = result['results'][0]['Created Date']
    records_remaining = result['remaining']
    return extract_python_datetime(start_date), records_remaining

  def get_end_date():
    status, result = fetch_records(limit=1, sort_field= 'Created Date', descending='true')
    return extract_python_datetime(result['results'][0]['Created Date'])

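  # Pad the date window by 1 ms on each side so the 'greater than' / 'less than'
  # constraints still include the very first and very last records.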
  def add_millisecond(_datetime):
    _datetime = _datetime + datetime.timedelta(milliseconds=1)
    return _datetime, _datetime.isoformat() + 'Z'

  def subtract_millisecond(_datetime):
    _datetime = _datetime - datetime.timedelta(milliseconds=1)
    return _datetime, _datetime.isoformat() + 'Z'

  def ensure_tail_records_not_in_next_session(result):
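    # Drop the trailing records that share this batch's last 'Created Date'.
    # They will be fetched again at the start of the next date-sliced session,
    # so trimming them here prevents duplicates across sessions.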
    all_records = result['results']
    last_date = all_records[-1]['Created Date']
    returnable_records_mask = [key['Created Date']!=last_date for key in all_records]
    records_count = sum(returnable_records_mask)
    ignored_records_count = result['count'] - records_count
    returnable_records = [item for keep, item in zip(returnable_records_mask, all_records) if keep]
    returnable_result = {
      'count': records_count,
      'cursor': result['cursor'],
      'remaining': result['remaining'] + ignored_records_count,
      'results': returnable_records
    }
    return returnable_result

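  # Probe the table once so we can surface an error status immediately if the
  # app is unreachable or the API key is wrong.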
  status, result = fetch_records()
  if status != 200:
    yield status, result
  else:
    start_date, records_remaining = get_start_date_and_db_size()
    cursor = start_cursor
    if records_remaining < hard_api_limit:
      # Download chunks using naive cursor increment
      with requests.Session() as s:
        s.mount("https://", adapter)
        s.mount("http://", adapter)
        while records_remaining > 0:
          time.sleep(pause_in_seconds)
          status, result = fetch_records(cursor=cursor)
          if status != 200:
            # The retry adapter has already given up; surface the error and stop.
            yield status, result
            return
          records_remaining = result['remaining']
          cursor += result['count']
          yield status, result
    else:
      # Download chunks using date slicing increment
      end_date = get_end_date()
      end_date_plus_millisecond, end_date_plus_millisecond_string = add_millisecond(end_date)
      incremental_start_date, incremental_start_date_string = subtract_millisecond(start_date)
      # database_age_in_seconds = (end_date - start_date).seconds
      # records_per_second = (records_remaining + 1) / database_age_in_seconds
      cycling_sessions = True
      while cycling_sessions:
        search_constraints = [{
          'key': 'Created Date',
          'constraint_type': 'greater than',
          'value': incremental_start_date_string,
        }, {
          'key': 'Created Date',
          'constraint_type': 'less than',
          'value': end_date_plus_millisecond_string,
        }]
        with requests.Session() as s:
          s.mount("https://", adapter)
          s.mount("http://", adapter)
          while cursor < soft_api_limit and records_remaining > 0:
            time.sleep(pause_in_seconds)
            status, result = fetch_records(cursor=cursor, constraints=search_constraints, sort_field='Created Date')
            if status != 200:
              # Surface the error and stop; partial results have already been yielded.
              yield status, result
              return
            records_remaining = result['remaining']
            cursor += result['count']

            if cursor >= soft_api_limit and records_remaining > 0:
              # Last loop between sessions (right before next big crawl)
              trimmed_result = ensure_tail_records_not_in_next_session(result)
              # records_remaining = trimmed_result['remaining']
              # cursor += trimmed_result['count']
              yield status, trimmed_result
            else:
              yield status, result
        
        incremental_end_date = extract_python_datetime(result['results'][-1]['Created Date'])
        # records_remaining = True
        if end_date == incremental_end_date and result['remaining'] == 0:
          cycling_sessions = False
        else:
          cursor = 0
          incremental_start_date_string = trimmed_result['results'][-1]['Created Date']
          time.sleep(pause_in_seconds * 3)

And here’s how we use it:

crawl = crawl_bubble_table(url=url, api_key=api_key, pause_in_seconds=0.1)

for status, result in crawl:
  print(result['remaining'], result['results'])
  # Do something with result['results'] here

This generator yields two values per iteration: the HTTP status and the result of each call to our database. As long as the status is 200, we'll have data in the result (otherwise result is an empty dict). result is the standard Bubble response object containing the following keys, in order of usefulness (a slightly fuller usage sketch follows the list):

  • results: your data stored as a list of Python dictionaries.
  • remaining: the number of results still to be fetched, not including the current batch.
  • cursor: the cursor value locating this group of results… not generally useful.
  • count: the number of results (dictionaries) in this fetch (almost always 100)… not generally useful.
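
If you want something a bit more defensive than the loop above - say, accumulate everything and bail out on a non-200 status before dumping to disk - a sketch like this works (the output filename is just for illustration):

all_records = []
for status, result in crawl_bubble_table(url=url, api_key=api_key):
  if status != 200:
    print(f'Stopped early with HTTP status {status}')
    break
  all_records.extend(result['results'])
  print(f"{result['remaining']} records still to fetch")

# Save the full export for later processing (filename is illustrative)
with open('bubble_export.json', 'w') as f:
  json.dump(all_records, f)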

Cheers,
