原文地址:Using SingleStore as a Time Series Database
原文作者:Akmal Chaudhri
通過使用Kaggle的S&P 500stock data(釋義見下面劃線部分),來探索SingleStore對時間序列數(shù)據(jù)庫的支持。我們將構(gòu)建一個快速儀表板來使candlestick charts(釋義見下面劃線部分)可視化。
S&P 500stock data:S&P是標準普爾公司的英文簡寫,標準普爾為投資者提供信用評級、獨立分析研究、投資咨詢等服務(wù)。這個數(shù)據(jù)集就是來自標準普爾500指數(shù)的500家樣本股從2013年2月到2018年2月五年的部分股票信息,分為所有公司的股票數(shù)據(jù)和分開的個體股票數(shù)據(jù),都包含7個字段,分別為data(日期),open(開盤價),high(最高),low(最低),close(收盤價),volume(成交量),name(股票名)。詳細概念可參考此鏈接:S&P 500 stock data
candlestick charts:蠟燭圖,通常叫K線圖,股市及期貨市場中的一種圖表。
摘要
SingleStore是一款多功能的數(shù)據(jù)庫系統(tǒng)。它基于相關(guān)技術(shù),支持多種模式,如鍵值、JSON、全文搜索、地理空間和時間序列。
本文將通過Kaggle中的S&P500股票數(shù)據(jù)集來探討SingleStore對時間序列數(shù)據(jù)的支持。我們還將構(gòu)建一個快速儀表板,以使用Streamlit可視化蠟燭圖。
本文中使用的SQL腳本、Python代碼和筆記本文件可在GitHub上找到。筆記本文件在DBC、HTML和iPython格式中是可用的。
介紹
自從關(guān)系型數(shù)據(jù)庫技術(shù)出現(xiàn)以來,許多管理數(shù)據(jù)的新要求不斷涌現(xiàn)。Martin Fowler等知名人士提出了Polyglot Persistence(多語言持久化)作為管理各種數(shù)據(jù)和數(shù)據(jù)處理要求的一種解決方案,如圖1所示。

圖1 Polyglot Persistence.
然而,Polyglot Persistence是有成本的,并引起了爭議,例如:
在一篇關(guān)于經(jīng)常引用polyglot persistence的文章中,Martin Fowler為一個假設(shè)的零售商設(shè)計了一個Web應(yīng)用程序,該應(yīng)用程序?qū)iak,Neo4j,MongoDB,Cassandra和RDBMS用于不同的數(shù)據(jù)集。不難想象,這造成了零售商的DevOps(Development和Operations的組合詞,這里根據(jù)語境譯為軟件開發(fā)、IT運維技術(shù)工程師)會成群結(jié)隊地辭職。
— —?Stephen Pimentel
此外:
根據(jù)歷史經(jīng)驗來看,如果你試圖采用其中六種[技術(shù)],你至少需要18名員工來操作存儲方面 - 比如六種存儲技術(shù)。這是不可估量的,而且太昂貴了。
— —?—?Dave McCrory
近年來,也有一些關(guān)于使用微服務(wù)來實現(xiàn)Polyglot Persistence(多語言持久化)架構(gòu)的建議。但是,SingleStore可以通過在單個多模型數(shù)據(jù)庫系統(tǒng)中支持不同的數(shù)據(jù)類型和處理要求來提供更簡單的解決方案。這提供了許多很多便利之處,例如,更低的TCO(Total Cost of Ownership,總擁有成本),可以減少開發(fā)人員學(xué)習(xí)多種產(chǎn)品的負擔,沒有集成難題等等。在一系列文章中,我們將更詳細地討論SingleStore的多模型功能。讓我們從時序數(shù)據(jù)開始吧。
首先,我們需要在SingleStore網(wǎng)站上創(chuàng)建一個免費的托管服務(wù)帳戶,并在Databricks網(wǎng)站上創(chuàng)建一個免費的CE社區(qū)版 帳戶。在撰寫本文時,SingleStore的托管服務(wù)帳戶附帶$500的信用值。這對于本文中描述的案例研究來說綽綽有余。對于Databricks社區(qū)版,我們需要注冊免費帳戶而不是試用版。使用Spark是因為,在之前的一篇文章中,我們注意到Spark非常適合使用SingleStore的ETL。
如果您在Kaggle沒有帳戶,請創(chuàng)建一個帳戶并下載all_stocks_5yr.csv文件。Kaggle網(wǎng)站顯示,此文件的大小為29.58MB。數(shù)據(jù)集由以下字段組成:
- date,日期:從 2013年2月8日至2018年2月7日的五年每日期間。無缺省值。
- open,開盤價:開盤價。11個缺失值。
- high,高:價格高。缺少8個值。
- low,低:價格低。缺少 8 個值。
- close,收盤價:收盤價。無缺省值。
- volume,交易量:交易的股票總數(shù)。無缺省值。
- name,名稱: 交易品種。505 個唯一值。無缺省值。
為了方便我們的初步探索,我們可以選擇日期、收盤價、名稱。
配置Databricks社區(qū)版
上一篇文章提供了有關(guān)如何配置Databricks社區(qū)版以用于SingleStore的詳細說明。我們可以將這些確切的說明用于此用例。
上傳CSV文件
我們需要將CSV文件上傳到Databricks社區(qū)版環(huán)境。上一篇文章對怎樣上傳CSV文件有詳細介紹。我們可以對這個用例進行精確說明。
創(chuàng)建數(shù)據(jù)庫表
在我們的SingleStore托管服務(wù)帳戶中,使用SQL編輯器創(chuàng)建一個新數(shù)據(jù)庫。命名為timeseries_db,創(chuàng)建語句如下:
CREATE DATABASE IF NOT EXISTS timeseries_db;
在timeseries_db數(shù)據(jù)庫中創(chuàng)建表,語句如下:
USE timeseries_db;
CREATE ROWSTORE TABLE IF NOT EXISTS tick (
ts DATETIME SERIES TIMESTAMP,
symbol VARCHAR(5),
price NUMERIC(18, 4),
KEY(ts)
);
每行都有一個名為ts的時間值屬性。因為在此示例中,我們不使用小數(shù)秒,所以用DATETIME而不是DATETIME(6)。SERIES TIMESTAMP指定表中的列作為默認時間戳。我們將在ts上創(chuàng)建一個KEY,以便能夠有效地篩選值的范圍。
在Databricks上新建Python腳本
現(xiàn)在讓我們在Databricks社區(qū)版上新建一個Python腳本。我們稱之為時序數(shù)據(jù)加載器。然后將新建腳本附加到Spark集群。
在新的代碼塊中,讓我們添加以下內(nèi)容:
from pyspark.sql.types import *
tick_schema = StructType([
StructField("ts", TimestampType(), True),
StructField("open", DoubleType(), True),
StructField("high", DoubleType(), True),
StructField("low", DoubleType(), True),
StructField("price", DoubleType(), True),
StructField("volume", IntegerType(), True),
StructField("symbol", StringType(), True)
])
此模式可確保我們擁有正確的列類型。
我們將在接下來的代碼塊中創(chuàng)建一個新的數(shù)據(jù)框架,如下:
tick_df = spark.read.csv("/FileStore/all_stocks_5yr.csv",
header = True,
? ? ? ? ? ? ? ? ? ? ? ? ?schema = tick_schema)
此段的作用是讀取CSV文件并創(chuàng)建一個名為tick_df的數(shù)據(jù)框架。還告訴Spark有一個標題行,并要求它使用以前定義的模式。
在以下代碼中,讓我們獲取行數(shù):
tick_df.count()
執(zhí)行完此操作,我們得到的返回值是619040。
基于初始分析,決定刪除這些列,如下所示:
tick_df = tick_df.drop("open", "high", "low", "volume")
并對數(shù)據(jù)進行排序:
tick_df = tick_df.sort("ts", "symbol")
使用如下語句,對Dataframe的結(jié)構(gòu)進行查看:
tick_df.show(10)
輸出如下:
+-------------------+-------+------+
| ts| price|symbol|
+-------------------+-------+------+
|2013-02-08 00:00:00| 45.08| A|
|2013-02-08 00:00:00| 14.75| AAL|
|2013-02-08 00:00:00| 78.9| AAP|
|2013-02-08 00:00:00|67.8542| AAPL|
|2013-02-08 00:00:00| 36.25| ABBV|
|2013-02-08 00:00:00| 46.89| ABC|
|2013-02-08 00:00:00| 34.41| ABT|
|2013-02-08 00:00:00| 73.31| ACN|
|2013-02-08 00:00:00| 39.12| ADBE|
|2013-02-08 00:00:00| 45.7| ADI|
+-------------------+-------+------+
only showing top 10 rows
現(xiàn)在,將Dataframe寫入SingleStore。我們可以添加以下內(nèi)容:
%run ./Setup
在Setup腳本中,我們需要確保已為SingleStore托管服務(wù)集群添加了服務(wù)器地址和密碼。
在如下代碼中,我們將為SingleStore Spark Connector設(shè)置部分參數(shù),如下所示:
spark.conf.set("spark.datasource.singlestore.ddlEndpoint", cluster)
spark.conf.set("spark.datasource.singlestore.user", "admin")
spark.conf.set("spark.datasource.singlestore.password", password)
spark.conf.set("spark.datasource.singlestore.disablePushdown", "false")
最后, 我們將使用此Spark Connector把Dataframe寫入到SingleStore中:
(tick_df.write
.format("singlestore")
.option("loadDataCompression", "LZ4")
.mode("ignore")
? ?.save("timeseries_db.tick"))
以上操作是將Dataframe寫入timeseries_db數(shù)據(jù)庫中的tick表。我們可以檢查此表是否已成功從SingleStore中填充。
示例查詢
現(xiàn)在已經(jīng)構(gòu)建了我們的系統(tǒng),可以在上面運行一些查詢。SingleStore支持一系列用于處理時序數(shù)據(jù)的有用函數(shù)。讓我們看一些例子。
Average Aggregate(平均聚合函數(shù))
下面的查詢說明了如何在表中計算簡單的平均總體時間序列值。
SELECT symbol, AVG(price)
FROM tick
GROUP BY symbol
ORDER BY symbol;
輸出如下:
+--------+---------------+
| symbol | AVG(price) |
+--------+---------------+
| A | 49.20202542 |
| AAL | 38.39325226 |
| AAP | 132.43346307 |
| AAPL | 109.06669849 |
| ABBV | 60.86444003 |
? ... ? ? ?...
Time Bucketing函數(shù)
Time Bucketing函數(shù)可以按固定的時間間隔聚合和分組不同時間序列的數(shù)據(jù)。單一存儲支持多種函數(shù):
- FIRST:聚合函數(shù),它返回一組輸入值的第一個值,該值定義為與最小時間關(guān)聯(lián)的值。。該文檔包含其他詳細信息和示例。
- LAST:聚合函數(shù),它返回一組輸入值的最后一個值,定義為與最長時間關(guān)聯(lián)的值。與最大時間戳關(guān)聯(lián)的值。該文檔包含其他詳細信息和示例。
- TIME_BUCKET:聚合函數(shù),它將時間規(guī)范化為最接近的存儲桶開始時間。該文檔包含其他詳細信息和示例。
例如,我們可以用TIME_BUCKET來查找以5天為一個間隔分組的平均時間序列值,操作如下:
SELECT symbol, TIME_BUCKET("5d", ts), AVG(price)
FROM tick
WHERE symbol = "AAPL"
GROUP BY 1, 2
ORDER BY 1, 2;
輸出如下:
+--------+-----------------------+--------------+
| symbol | TIME_BUCKET("5d", ts) | AVG(price) |
+--------+-----------------------+--------------+
| AAPL | 2013-02-08 00:00:00.0 | 67.75280000 |
| AAPL | 2013-02-13 00:00:00.0 | 66.36943333 |
| AAPL | 2013-02-18 00:00:00.0 | 64.48960000 |
| AAPL | 2013-02-23 00:00:00.0 | 63.63516667 |
| AAPL | 2013-02-28 00:00:00.0 | 61.51996667 |
? ... ? ? ?... ? ? ? ? ? ? ? ? ? ? ...
我們還可以套用這些函數(shù)來創(chuàng)建蠟燭圖,圖表中顯示股票隨時間推移的高價、最低價、開盤價和收盤價,按五天時段進行存儲,如下所示:
SELECT TIME_BUCKET("5d") AS ts,
symbol,
MIN(price) AS low,
MAX(price) AS high,
FIRST(price) AS open,
LAST(price) AS close
FROM tick
WHERE symbol = "AAPL"
GROUP BY 2, 1
ORDER BY 2, 1;
輸出如下:
+------------+--------+----------+----------+----------+----------+
| ts | symbol | low | high | open | close |
+------------+--------+----------+----------+----------+----------+
| 2013-02-08 | AAPL | 66.8428 | 68.5614 | 67.8542 | 66.8428 |
| 2013-02-13 | AAPL | 65.7371 | 66.7156 | 66.7156 | 65.7371 |
| 2013-02-18 | AAPL | 63.7228 | 65.7128 | 65.7128 | 64.4014 |
| 2013-02-23 | AAPL | 63.2571 | 64.1385 | 63.2571 | 63.5099 |
| 2013-02-28 | AAPL | 60.0071 | 63.0571 | 63.0571 | 60.0071 |
? ... ? ? ? ? ?... ? ? ?... ? ? ? ?... ? ? ? ?... ? ? ? ?...
Smoothing(平滑)
我們可以使用AVG函數(shù)將時序數(shù)據(jù)用作窗口聚合。下面是一個示例,我們查看價格和過去三個點變動的價格平均線:
SELECT symbol, ts, price, AVG(price)
OVER (ORDER BY ts ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS smoothed_price
FROM tick
WHERE symbol = "AAPL";
輸出如下:
+--------+-----------------------+----------+----------------+
| symbol | ts | price | smoothed_price |
+--------+-----------------------+----------+----------------+
| AAPL | 2013-02-08 00:00:00.0 | 67.8542 | 67.85420000 |
| AAPL | 2013-02-11 00:00:00.0 | 68.5614 | 68.20780000 |
| AAPL | 2013-02-12 00:00:00.0 | 66.8428 | 67.75280000 |
| AAPL | 2013-02-13 00:00:00.0 | 66.7156 | 67.49350000 |
| AAPL | 2013-02-14 00:00:00.0 | 66.6556 | 67.19385000 |
? ... ? ? ?... ? ? ? ? ? ? ? ? ? ? ... ? ? ? ?...
AS OF
查找當前AS OF時間點的表中的行也是常見的時序要求。這可以使用ORDER BY和LIMIT輕松實現(xiàn)。示例如下:
SELECT *
FROM tick
WHERE ts <= "2021-10-11 00:00:00"
AND symbol = "AAPL"
ORDER BY ts DESC
LIMIT 1;
輸出如下:
+-----------------------+--------+----------+
| ts | symbol | price |
+-----------------------+--------+----------+
| 2018-02-07 00:00:00.0 | AAPL | 159.5400 |
+-----------------------+--------+----------+
Interpolation(插值)
時序數(shù)據(jù)可能存在間隙。我們可以插入缺失點。SingleStore文檔提供了一個示例存儲過程,在處理價格變動數(shù)據(jù)時,可以使用該存儲過程來實現(xiàn)此目的。
Bonus: Streamlit Visualization(流光可視化)
早些時候,提到了蠟燭圖,能以圖形而不是表格格式查看這些圖表,那就太好了。我們可以通過Streamlit輕松做到這一點。上一篇文章展示了我們可以輕松地將Streamlit連接到SingleStore。
Install the Required Software(安裝所需的軟件)
streamlit
pandas
plotly
pymysql
這些可以在GitHub上的requirements.txt文件中找到。按如下所示運行該文件:
pip install -r requirements.txt
應(yīng)用示例
以下是streamlit_app.py的完整代碼:
# streamlit_app.py
import streamlit as st
import pandas as pd
import plotly.graph_objects as go
import pymysql
# Initialize connection.
def init_connection():
return pymysql.connect(**st.secrets["singlestore"])
conn = init_connection()
symbol = st.sidebar.text_input("Symbol", value = "AAPL", max_chars = None, key = None, type = "default")
num_days = st.sidebar.slider("Number of days", 2, 30, 5)
# Perform query.
data = pd.read_sql("""
SELECT TIME_BUCKET(%s) AS day,
symbol,
MIN(price) AS low,
MAX(price) AS high,
FIRST(price) AS open,
LAST(price) AS close
FROM tick
WHERE symbol = %s
GROUP BY 2, 1
ORDER BY 2, 1;
""", conn, params = (str(num_days) + "d", symbol.upper()))
st.subheader(symbol.upper())
fig = go.Figure(data = [go.Candlestick(
x = data["day"],
open = data["open"],
high = data["high"],
low = data["low"],
close = data["close"],
name = symbol,
)])
fig.update_xaxes(type = "category")
fig.update_layout(height = 700)
st.plotly_chart(fig, use_container_width = True)
st.write(data)
Create Secrets file(創(chuàng)建機密文件)
我們的本地Streamlit應(yīng)用程序?qū)膽?yīng)用程序根目錄中的讀取此文件:streamlit/secrets.toml。因此需要創(chuàng)建此文件,內(nèi)容如下:
# .streamlit/secrets.toml
[singlestore]
host = "<TO DO>"
port = 3306
database = "timeseries_db"
user = "admin"
password = "<TO DO>"
創(chuàng)建集群時,應(yīng)將TO DO的主機和密碼替換為從SingleStore托管服務(wù)獲取的值。
運行代碼
我們可以按如下方式運行Streamlit應(yīng)用程序:
streamlit run streamlit_app.py
Web瀏覽器中的輸出應(yīng)如圖2所示。

圖2. Streamlit.
在網(wǎng)頁上,我們可以在文本框中輸入新的股票代碼,然后使用滾動條更改TIME_BUCKET的天數(shù)。可以隨意試驗代碼以滿足您的需求。
小結(jié)
本文展示了 SingleStore 是處理時序數(shù)據(jù)的強大解決方案。使用SQL和內(nèi)置函數(shù),我們可以實現(xiàn)目標。SingleStore通過添加FIRST、LAST和TIME_BUCKET擴展了對時序的支持。
致謝
感謝John Pickford博士對時間序列數(shù)據(jù)集的建議和指導(dǎo)。
也萬分感激Part-Time Larry 在 Streamlit—Building Financial Dashboards with Python和GitHub上的代碼及精彩視頻,以激發(fā)本文中對Streamlit Visualization的靈感。




