A reasonable third option, though a problematic one, as you also need to handle gaps in the cursor when a record in the middle was deleted.
Which brings us to our fourth, and I believe best, option for those working in Python. Introducing my best Bubble data fetch script yet:
All our datas are belong to us
The following creates a Python generator that fetches all of your records via the Data API. It works by first checking how many records you have, then incrementing through them using one of two approaches:
- If the DB has fewer than 50k records, we use the naive cursor-increment approach, which simply walks the cursor across every record in your DB up to 50k.
- If the DB has more than 50k records, we slice the database into chunks using date constraints and then walk the cursor through those chunks 45k records at a time. See the simplified sketch just below for the idea.
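In rough terms, the date-slicing approach looks something like this. This is only a simplified sketch (no pauses, no retries, and no handling of records that share the boundary timestamp; the full script below takes care of all of those), and fetch_slice / crawl_by_date_slices are just illustrative names I'm using here:

import json
import requests

def fetch_slice(url, api_key, after_date, cursor):
    # One page (up to 100 records) created strictly after after_date, oldest first
    constraints = [{'key': 'Created Date', 'constraint_type': 'greater than', 'value': after_date}]
    params = {'api_token': api_key, 'cursor': cursor, 'limit': 100,
              'constraints': json.dumps(constraints), 'sort_field': 'Created Date'}
    return requests.get(url, params=params).json()['response']

def crawl_by_date_slices(url, api_key, soft_limit=45000):
    after_date = '1970-01-01T00:00:00.000Z'    # assumes no records are older than this
    while True:
        cursor = 0                             # each slice gets a fresh cursor, dodging the 50k cap
        while cursor < soft_limit:
            page = fetch_slice(url, api_key, after_date, cursor)
            yield page['results']
            cursor += page['count']
            if page['remaining'] == 0:
                return                         # nothing newer than after_date is left: done
        # Start the next slice just after the newest date seen so far.
        # (The full script also trims records that share this boundary timestamp
        # so none of them are skipped or fetched twice.)
        after_date = page['results'][-1]['Created Date']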
Every data fetch has a built-in pause so as not to overload your Bubble app, as well as a retry mechanism that gracefully stands by until your DB comes back online (if it were momentarily unreachable for whatever reason). All of these variables are configurable, so you can, for example, increase the pause between data pulls. Here it is:
import os
import requests
import json
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import datetime
from dateutil import parser
import time

url = 'https://YOUR_BUBBLE_APP_NAME.bubbleapps.io/version-live/api/1.1/obj/DATA_TYPE_OF_INTEREST'
api_key = 'YOUR_APP_KEY'


def crawl_bubble_table(url, api_key, pause_in_seconds=0.07, start_cursor=0):
    hard_api_limit = 50000  # Bubble won't serve results past this cursor value
    soft_api_limit = 45000  # start a fresh date slice a little before the hard limit

    # Retry transient failures (rate limiting, 5xx errors) with exponential backoff
    retry_strategy = Retry(
        total=6,
        backoff_factor=4,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)

    def fetch_records(session=None, cursor=0, limit=100, constraints=None, sort_field=None, descending=False):
        payload = {
            "cursor": str(cursor),
            "limit": limit,
            "api_token": api_key,
            "constraints": json.dumps(constraints),
            "sort_field": sort_field,
            "descending": descending,
        }
        if session:
            response = session.get(url, params=payload)
        else:
            response = requests.get(url, params=payload)
        if response.status_code != 200:
            return response.status_code, {}
        elif response.json()['response']['results'] == []:
            # Treat an empty result set as "nothing found"
            return 404, {}
        else:
            return response.status_code, response.json()['response']

    def extract_python_datetime(_datetime):
        # Bubble returns ISO 8601 strings; parse and drop the timezone for easy comparisons
        _datetime = parser.parse(_datetime)
        return _datetime.replace(tzinfo=None)

    def get_start_date_and_db_size():
        status, result = fetch_records(limit=1, sort_field='Created Date')
        start_date = result['results'][0]['Created Date']
        records_remaining = result['remaining']
        return extract_python_datetime(start_date), records_remaining

    def get_end_date():
        status, result = fetch_records(limit=1, sort_field='Created Date', descending='true')
        return extract_python_datetime(result['results'][0]['Created Date'])

    def add_millisecond(_datetime):
        _datetime = _datetime + datetime.timedelta(milliseconds=1)
        return _datetime, _datetime.isoformat() + 'Z'

    def subtract_millisecond(_datetime):
        _datetime = _datetime - datetime.timedelta(milliseconds=1)
        return _datetime, _datetime.isoformat() + 'Z'

    def ensure_tail_records_not_in_next_session(result):
        # Drop every record sharing the page's last Created Date and add them back
        # into 'remaining'; the next slice starts just after the last date we keep,
        # so those records are fetched exactly once, in the next slice.
        all_records = result['results']
        last_date = all_records[-1]['Created Date']
        returnable_records_mask = [key['Created Date'] != last_date for key in all_records]
        records_count = sum(returnable_records_mask)
        ignored_records_count = result['count'] - records_count
        returnable_records = [item for keep, item in zip(returnable_records_mask, all_records) if keep]
        returnable_result = {
            'count': records_count,
            'cursor': result['cursor'],
            'remaining': result['remaining'] + ignored_records_count,
            'results': returnable_records,
        }
        return returnable_result

    status, result = fetch_records()
    if status != 200:
        yield status, result
    else:
        start_date, records_remaining = get_start_date_and_db_size()
        cursor = start_cursor
        if records_remaining < hard_api_limit:
            # Download chunks using naive cursor increment
            with requests.Session() as s:
                s.mount("https://", adapter)
                s.mount("http://", adapter)
                while records_remaining > 0:
                    time.sleep(pause_in_seconds)
                    status, result = fetch_records(session=s, cursor=cursor)
                    if status != 200:
                        yield status, result
                        return
                    records_remaining = result['remaining']
                    cursor += result['count']
                    yield status, result
        else:
            # Download chunks using date slicing increment
            end_date = get_end_date()
            end_date_plus_millisecond, end_date_plus_millisecond_string = add_millisecond(end_date)
            incremental_start_date, incremental_start_date_string = subtract_millisecond(start_date)
            cycling_sessions = True
            while cycling_sessions:
                search_constraints = [{
                    'key': 'Created Date',
                    'constraint_type': 'greater than',
                    'value': incremental_start_date_string,
                }, {
                    'key': 'Created Date',
                    'constraint_type': 'less than',
                    'value': end_date_plus_millisecond_string,
                }]
                with requests.Session() as s:
                    s.mount("https://", adapter)
                    s.mount("http://", adapter)
                    while cursor < soft_api_limit and records_remaining > 0:
                        time.sleep(pause_in_seconds)
                        status, result = fetch_records(session=s, cursor=cursor, constraints=search_constraints, sort_field='Created Date')
                        if status != 200:
                            yield status, result
                            return
                        records_remaining = result['remaining']
                        cursor += result['count']
                        if cursor >= soft_api_limit and records_remaining > 0:
                            # Last page of this slice (right before the next big crawl)
                            trimmed_result = ensure_tail_records_not_in_next_session(result)
                            yield status, trimmed_result
                        else:
                            yield status, result
                incremental_end_date = extract_python_datetime(result['results'][-1]['Created Date'])
                if end_date == incremental_end_date and result['remaining'] == 0:
                    # Reached the newest record with nothing left in this slice: all done
                    cycling_sessions = False
                else:
                    # Start the next slice just after the last date we kept, cursor back at zero
                    cursor = 0
                    incremental_start_date_string = trimmed_result['results'][-1]['Created Date']
                    time.sleep(pause_in_seconds * 3)
And here’s how we use it:
crawl = crawl_bubble_table(url=url, api_key=api_key, pause_in_seconds=0.1)
for status, result in crawl:
    print(result['remaining'], result['results'])
    # Do something with result['results'] here
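For example, here's a minimal sketch that gathers every record into one list and dumps it to a local JSON file once the crawl finishes (the bubble_export.json filename is just an example):

all_records = []
for status, result in crawl_bubble_table(url=url, api_key=api_key, pause_in_seconds=0.1):
    if status != 200:
        print('Stopped early with status', status)
        break
    all_records.extend(result['results'])

# Save the full export to disk
with open('bubble_export.json', 'w') as f:
    json.dump(all_records, f)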
This generator returns two variables, the status and the result of each call to our database. As long as the status is 200, we'll have data in the result (otherwise result={}). result is the standard Bubble response object containing the following keys, in order of usefulness:
- results: your data stored as a list of Python dictionaries.
- remaining: the number of remaining results to be fetched, not including the current results.
- cursor: the cursor value locating this group of results… not generally useful.
- count: the number of results (dictionaries) in this fetch (almost always 100)… not generally useful.
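Since remaining tells you how much is left, a simple progress readout is easy to bolt on. A minimal sketch, assuming the naive under-50k path, where remaining counts everything still left in the table (in the date-sliced path it only counts what's left in the current slice):

fetched = 0
for status, result in crawl_bubble_table(url=url, api_key=api_key):
    if status != 200:
        break
    fetched += result['count']
    print(fetched, 'fetched,', result['remaining'], 'to go')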
Cheers,