Bulk Data Ingestion into Redis Using AWS ElastiCache and Python
Redis is a highly available, in-memory data store that drastically reduces the latency of data retrieval compared with direct database calls. For highly performant applications where milliseconds matter, Redis really makes the difference.
In this article, I'll walk you through using Redis as your in-memory data store, including how to process bulk loads via the pipeline() method.
In this exercise, I use a Redis cluster implemented by AWS ElastiCache. Take note that in order to access ElastiCache, you'll need to use an EC2 instance in the same VPC. Spinning up EC2 instances or ElastiCache clusters will not be covered in this article.
The data set consists of 17 million rows of weather data, the result of pre-processing I had already done some time back. These 17 million rows were loaded into Snowflake, which serves as the source database for this task.
Install Modules
Start by installing the Python modules used in the next few sections.
# install the snowflake connector package
pip install snowflake-connector-python
# install the pandas extension for snowflake
pip install "snowflake-connector-python[pandas]"
# install redis-py
pip install redis
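Since ElastiCache is only reachable from inside the VPC (as noted earlier), it's worth confirming connectivity from your EC2 instance as soon as redis-py is installed. Here is a minimal sketch, assuming a cluster-mode ElastiCache deployment; the endpoint below mirrors the one used later in this article and should be replaced with your own cluster's configuration endpoint.
import redis
# replace with your own ElastiCache configuration endpoint
redis_cluster_conn = redis.cluster.RedisCluster(
    host="clustercfg.devredis.oif3hh.use2.cache.amazonaws.com",
    port=6379,
    ssl=True
)
# if this raises a connection error, re-check that the EC2 instance is in the
# same VPC and that the security group allows traffic on port 6379
print(redis_cluster_conn.ping())
If the call succeeds, you're ready to move on.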
Load Into Snowflake
Since I already had preprocessed weather data, I wrote a simple script to read it from CSV and load it into a new Snowflake table using the Snowflake connector module and pandas.
import pandas as pd
import snowflake.connector as sc
from snowflake.connector.pandas_tools import write_pandas
conn = sc.connect(user="myUserName",
                  password="myPassWord",
                  account="myTrialAccount",
                  warehouse="COMPUTE_WH",
                  database="DEVDB",
                  schema="PUBLIC")
#create cursor
cursor = conn.cursor()
#load dataframe
station_data = pd.read_csv(r'path_to\station_data.txt')
print('Number of records in data file loaded: ', station_data.shape[0])
#write the station data dataframe to a snowflake table (creating the table in the process)
write_pandas(conn, station_data, table_name ='STATION_DATA', auto_create_table=True)
table_count = cursor.execute("select count(*) from STATION_DATA").fetchall()[0]
print('Table record count is: ',table_count[0])
#close cursor
cursor.close()
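As an optional sanity check before closing the Snowflake connection, you can peek at a few rows of the newly created table. This is a small sketch that assumes the conn object from the script above is still open; STATION and DATE are two of the columns in my weather data set.
#optional spot check on the new table
check_cursor = conn.cursor()
sample = check_cursor.execute("select STATION, DATE from STATION_DATA limit 5").fetchall()
for row in sample:
    print(row)
check_cursor.close()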
Load Into Redis Cache in Batches
My approach to preparing batches was to apply the limit and offset keywords in the SQL select query that retrieves data from the Snowflake table. In this way, it's possible to limit query results to a certain number of records and then "flip through the pages" with offset, as sketched below.
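To make the paging idea concrete, here is the bare pattern on its own; the batch size of one million rows and the seventeen iterations mirror the full script further down, and are assumptions you would tune to your own row count.
#page through the table one batch at a time
batch_size = 1000000
for page in range(0, 17):
    query = f"select * from station_data limit {batch_size} offset {page * batch_size}"
    #each iteration retrieves the next "page" of one million rows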
The Redis pipeline() method is used to collect all the hmset commands before a pipe.execute() call transmits them to the Redis cache in bulk.
Take note that hmset() accepts a dictionary as the value for the hash key we are creating.
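Before the full script, here is the basic pipeline pattern in isolation. It's a minimal sketch that assumes an existing Redis connection r; the station ID, date and field names are made up for illustration.
#queue two commands on the pipeline, then send them in one round trip
pipe = r.pipeline()
pipe.hmset("US1MNHN0123:2020-01-01", {"STATION": "US1MNHN0123", "DATE": "2020-01-01", "PRCP": "0.0"})
pipe.expire("US1MNHN0123:2020-01-01", 3600)  #optional one-hour TTL
pipe.execute()
The full batch-loading script follows.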
import pandas as pd
import snowflake.connector as sc
from snowflake.connector.pandas_tools import write_pandas
from datetime import datetime
import redis
conn = sc.connect(user="myUserName",
                  password="myPassWord",
                  account="myTrialAccount",
                  warehouse="COMPUTE_WH",
                  database="DEVDB",
                  schema="PUBLIC")
#create cursor
cursor = conn.cursor()
#create redis cluster connection
redis_cluster_conn = redis.cluster.RedisCluster(
    host="clustercfg.devredis.oif3hh.use2.cache.amazonaws.com",
    port=6379,
    ssl=True,
    reinitialize_steps=1
)
for offset in range(0, 17):
    # I wanted to track time
    start = datetime.now()
    try:
        print('Initiating database call')
        result = cursor.execute(f"select * from station_data limit 1000000 offset {offset * 1000000}").fetchall()
        #create dataframe from results. use cursor description to fetch column names
        df = pd.DataFrame(result, columns=[col[0] for col in cursor.description])
    except Exception as e:
        print('An error occurred while fetching from database', e)
    # create redis pipeline
    pipe = redis_cluster_conn.pipeline()
    print("***********************************")
    print("Num records in batch: ", df.shape[0])
    for ind in df.index:
        try:
            #insert record into pipeline
            pipe.hmset(f"{df.iloc[ind]['STATION']}:{df.iloc[ind]['DATE']}", {k: v for k, v in df.iloc[ind].to_dict().items()})
            #set expiry time for key, if required
            pipe.expire(f"{df.iloc[ind]['STATION']}:{df.iloc[ind]['DATE']}", 3600)
        except Exception as e:
            print(e)
    print("Loaded values into pipeline. Now attempting to load pipeline into redis...")
    pipe.execute()
    time_diff = datetime.now() - start
    print(f"Inserted batch number {offset}. Time taken {time_diff.seconds}")
cursor.close()
The screenshot above from Redis Insight shows the hash keys inserted and their details. Redis Insight is a graphical interface tool that allows you to browse and monitor Redis instances.
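If you don't have Redis Insight handy, a quick spot check from Python works just as well. The key below is hypothetical; substitute any STATION:DATE combination you know was loaded.
#read back one hash to confirm the data landed in the cache
sample_key = "US1MNHN0123:2020-01-01"
print(redis_cluster_conn.hgetall(sample_key))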
From the screenshot we also see that it takes about 15 minutes to fetch and load a batch of 1,000,000 records. One optimization implemented in the code was to load an entire row with each hmset() command, as opposed to loading individual column:value pairs with separate hset() commands.
# load multiple columns in each call
hmset('key_name', {'col1': 'val1', 'col2': 'val2'})
# vs
# load 1 key:value pair at a time
hset('key_name', 'col1', 'val1')
hset('key_name', 'col2', 'val2')
Loading the entire row as a single command took about 20% less time.
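One caveat worth noting: in recent versions of redis-py, hmset() is deprecated in favor of hset() with the mapping argument, which performs the same whole-row write in a single command.
# equivalent whole-row write without the deprecated hmset (redis-py 3.5 and later)
hset('key_name', mapping={'col1': 'val1', 'col2': 'val2'})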