Java 和 Python 是當(dāng)今最流行的兩種計算機(jī)語言。兩者都非常成熟,并提供了工具和技術(shù)生態(tài)系統(tǒng),幫助我們解決數(shù)據(jù)科學(xué)領(lǐng)域出現(xiàn)的挑戰(zhàn)性問題。每種語言都各有優(yōu)勢,我們要知道什么時候應(yīng)該使用哪種工具,或者什么時候它們應(yīng)該協(xié)同工作相互補(bǔ)充。
Python 是一種動態(tài)類型語言,使用起來非常簡單,如果我們不想接觸復(fù)雜的程序,它肯定是進(jìn)行復(fù)雜計算的首選語言。Python 提供了優(yōu)秀的庫(Pandas、NumPy、Matplotlib、ScyPy、PyTorch、TensorFlow 等)來支持對數(shù)據(jù)結(jié)構(gòu)或數(shù)組的邏輯、數(shù)學(xué)和科學(xué)操作。
Java 是一種非常健壯的語言,具有強(qiáng)類型,因此有更嚴(yán)格的語法規(guī)則,所以不易出現(xiàn)程序錯誤。與Python一樣,它也提供了大量的庫來處理數(shù)據(jù)結(jié)構(gòu)、線性代數(shù)、機(jī)器學(xué)習(xí)和數(shù)據(jù)處理(ND4J、Mahout、Spark、Deeplearning4J 等)。
本文將介紹如何對大量表格數(shù)據(jù)進(jìn)行簡單的數(shù)據(jù)分析,并使用 Java 和 Python 計算一些統(tǒng)計數(shù)據(jù)。我們可以看到使用各個平臺進(jìn)行數(shù)據(jù)分析的不同技術(shù),對比它們的擴(kuò)展方式,以及應(yīng)用并行計算來提高其性能的可行性。
提出問題
我們要對不同州的一大批城市的價格做一個簡單的分析,這里假設(shè)有一個包含此信息的 CSV 文件。閱讀文件并繼續(xù)過濾掉一些州,并將剩余的州按城市-州分組以進(jìn)行一些基本統(tǒng)計。希望能夠找到有效執(zhí)行的解決方案,并且能夠隨著輸入數(shù)據(jù)規(guī)模的增長而有良好的擴(kuò)展。
數(shù)據(jù)樣本是:
城市 | 州 | 基本價格 | 實(shí)際價格 |
La Jose | PA | 34.17 | 33.19 |
Preachers Slough | WA | 27,46 | 90.17 |
Doonan Corners | NY | 92.0 | 162.46 |
Doonan Corners | NY | 97.45 | 159.46 |
Castle Rock | WA | 162.16 | 943.21 |
Marble Rock | IA | 97.13 | 391.49 |
Mineral | CA | 99.13 | 289.37 |
Blountville | IN | 92.50 | 557.66 |
Blountsville | IN | 122.50 | 557.66 |
Coe | IN | 187.85 | 943.98 |
Cecilia | KY | 92.85 | 273.61 |
目的是展示如何使用 Java 和 Python 解決這些類型的問題。該示例非常簡單且范圍有限,但很容易拓展到更具挑戰(zhàn)性的問題。
Java 的方法
首先定義一個封裝數(shù)據(jù)元素的 Java 記錄:
record InputEntry(String city, String state, double basePrice, double actualPrice) {}
記錄(record)是 JDK 14 中引入的一種新型類型聲明。它是定義提供構(gòu)造函數(shù)、訪問器、equals 和哈希實(shí)現(xiàn)的不可變類的一種簡捷方式。
接下來,讀取 CVS 文件并將它們增加到一個列表中:
List inputEntries = readRecordEntriesFromCSVFile(recordEntries.csv);
為了按城市和州對輸入的元素進(jìn)行分組,將其定義:
record CityState(String city, String state) {};
使用以下類來封裝屬于一個組的所有元素的統(tǒng)計信息:
record StatsAggregation(StatsAccumulator basePrice, StatsAccumulator actualPrice) {}
StatsAccumulator是Guava 庫的一部分??梢詫㈦p精度值集合添加到類中,它會計算基本統(tǒng)計數(shù)據(jù),例如計數(shù)、平均值、方差或標(biāo)準(zhǔn)差。可以使用StatsAccumulator來獲取InputEntry的basePrice和actualPrice的統(tǒng)計數(shù)據(jù)。
現(xiàn)在我們已經(jīng)擁有了解決問題的所有材料。Java Streams提供了一個強(qiáng)大的框架來實(shí)現(xiàn)數(shù)據(jù)操作和分析。它的聲明式編程風(fēng)格,對選擇、過濾、分組和聚合的支持,簡化了數(shù)據(jù)操作和統(tǒng)計分析。它的框架還提供了一個強(qiáng)大的實(shí)現(xiàn),可以處理大量的(甚至是無限的流),并通過使用并行性、懶惰性和短路操作來高效處理。所有這些特性使Java Streams成為解決這類問題的絕佳選擇。實(shí)現(xiàn)非常簡單:
Map stats = inputEntries.stream(). filter(i -> !(i.state().equals(“MN”) || i.state().equals(“CA”))).collect( groupingBy(entry -> new CityState(entry.city(), entry.state()), collectingAndThen(Collectors.toList(), list -> {StatsAccumulator sac = new StatsAccumulator(); sac.addAll(list.stream().mapToDouble(InputEntry::basePrice)); StatsAccumulator sas = new StatsAccumulator(); sas.addAll(list.stream().mapToDouble(InputEntry::actualPrice)); return new StatsAggregation(sac, sas);} )));
在代碼的第 2 行,我們使用Stream::filter. 這是一個布爾值函數(shù),用于過濾列表中的元素??梢詫?shí)現(xiàn)一個 lambda 表達(dá)式來刪除任何包含“MN”或“CA”狀態(tài)的元素。
然后繼續(xù)收集列表的元素并調(diào)用Collectors::groupingBy()(第 3 行),它接受兩個參數(shù):
- 一個分類功能,使用CityState記錄來做城市和州的分組(第3行)。
- 下游的收集器,包含屬于同一的元素。使用Collectors::collectingAndThen(第 4 行),它采用兩個參數(shù)分兩步進(jìn)行歸約:
·我們使用Collectors::toList(第 4 行),它返回一個收集器,它將屬于同一的所有元素放到一個列表中。
·隨后對這個列表進(jìn)行了整理轉(zhuǎn)換。使用一個lambda函數(shù)(第5行至第9行)來定義兩個StatsAccumulator(s),在這里分別計算前一個列表中的basePrice和actualPrice元素的統(tǒng)計數(shù)據(jù)。最后,返回到新創(chuàng)建的包含這些元素的StatsAggregation記錄。
正如前文所述,使用Java Streams的優(yōu)勢之一是,它提供了一種簡單的機(jī)制,可以使用多線程進(jìn)行并行處理。這允許利用CPU的多核資源,同時執(zhí)行多個線程。只要在流中添加一個 “parallel”:
Map stats = inputEntries.stream().parallel().
這導(dǎo)致流框架將元素列表細(xì)分為多個部分,并同時在單獨(dú)的線程中運(yùn)行它們。隨著所有不同的線程完成它們的計算,框架將它們串行添加到生成的 Map 中。
在第4行中使用Collectors::groupingByConcurrent而不是Collectors:groupingBy。在這種情況下,框架使用并發(fā)映射,允許將來自不同線程的元素直接插入到此映射中,而不必串行組合。
有了這三種可能性,可以檢查它們?nèi)绾螆?zhí)行之前的統(tǒng)計計算(不包括從 CSV 文件加載數(shù)據(jù)的時間),因?yàn)榧虞d量從500萬條翻倍到2000萬條:
串行 | 平行 | 并行 & GroupByConcurrent | |
五百萬個 元素 | 3.045 秒 | 1.941 秒 | 1.436 秒 |
一千萬個 元素 | 6.405 秒 | 2.876 秒 | 2.785 秒 |
兩千萬個 元素 | 8.507 秒 | 4.956 秒 | 4.537 秒 |
可以看到并行運(yùn)行大大提高了性能;隨著負(fù)載的增加,時間幾乎減半。使用 GroupByConcurrent 還可額外獲得 10% 的收益。
最后,得到結(jié)果是微不足道的;例如,要獲得印第安納州 Blountsville 的統(tǒng)計數(shù)據(jù),我們只需要:
StatsAggregation aggreg = stateAggr.get(new CityState(“Blountsville “, “IN”));System.out.println(“Blountsville, IN”);System.out.println(“basePrice.mean: ” + aggreg.basePrice().mean());System.out.println(“basePrice.populationVariance: ” + aggreg.basePrice().populationVariance());System.out.println(“basePrice.populationStandardDeviation: ” + aggreg.basePrice().populationStandardDeviation());System.out.println(“actualPrice.mean: ” + aggreg.basePrice().mean());System.out.println(“actualPrice.populationVariance: ” + aggreg.actualPrice().populationVariance());System.out.println(“actualPrice.populationStandardDeviation: ” + aggreg.actualPrice().populationStandardDeviation());
得到的結(jié)果:
Blountsville : INbasePrice.mean: 50.302588996763795basePrice.sampleVariance: 830.7527439246837basePrice.sampleStandardDeviation: 28.822781682632293basePrice.count: 309basePrice.min: 0.56basePrice.max: 99.59actualPrice.mean: 508.8927831715211actualPrice.sampleVariance: 78883.35878833274actualPrice.sampleStandardDeviation: 280.86181440048546actualPrice.count: 309actualPrice.min: 0.49actualPrice.max: 999.33
Python的方法
在 Python 中,有幾個庫可以處理數(shù)據(jù)統(tǒng)計和分析。其中,Pandas 庫非常適合處理大量表格數(shù)據(jù),它提供了非常有效的過濾、分組和統(tǒng)計分析方法。
使用 Python 分析以前的數(shù)據(jù):
import pandas as pddef group_aggregations(df_group_by): df_result = df_group_by.agg( {‘basePrice’: [‘count’, ‘min’, ‘max’, ‘mean’, ‘std’, ‘var’], ‘actualPrice’: [‘count’, ‘min’, ‘max’, ‘mean’, ‘std’, ‘var’]} ) return df_result if __name__ == ‘__main__’: df = pd.read_csv(“recordEntries.csv”) excluded_states = [‘MN’, ‘CA’] df_st = df.loc[~ df[‘state’].isin(excluded_states)] group_by = df_st.groupby([‘city’, ‘state’], sort=False) aggregated_results = group_aggregations(group_by)
在主要部分,先調(diào)用pandas.read_csv()(第 11 行)將文件中用逗號分隔的值加載到 PandasDataFrame中。
在第13行,使用~df[‘state’].isin(excluded_states)來得到一個Pandas系列的布爾值,使用pandas.loc()來過濾其中不包括的州(MN和CA)。
接下來,在第14行使用DataFrame.groupby()來按城市和州進(jìn)行分組。結(jié)果由group_aggregations()處理,保存每個組的basePrice和actualPrice的統(tǒng)計數(shù)據(jù)。
在Python中打印結(jié)果是非常直接的。IN和Blountsville的結(jié)果:
print(aggregated_results.loc[‘Blountsville’, ‘IN’][‘basePrice’]) print(aggregated_results.loc[‘Blountsville’, ‘IN’][‘actualPrice’])
統(tǒng)計數(shù)據(jù):
base_price:Name: (Blountsville, IN), dtype: float64count 309.000000min 0.560000max 99.590000mean 50.302589std 28.822782var 830.752744actual_price:Name: (Blountsville, IN), dtype: float64count 309.000000min 0.490000max 999.330000mean 508.892783std 280.861814var 78883.358788
為了并行運(yùn)行前面的代碼,我們必須記住,Python并不像Java那樣支持細(xì)粒度的鎖機(jī)制。必須解決好與全局解釋器鎖(GIL)的問題,無論你有多少個CPU多核或線程,一次只允許一個線程執(zhí)行。
為了支持并發(fā),我們必須考慮到有一個CPU 密集型進(jìn)程,因此,最好的方法是使用multiprocessing。所以需要修改代碼:
from multiprocessing import Poolimport pandas as pddef aggreg_basePrice(df_group): ct_st, grp = df_group return ct_st, grp.basePrice.agg([‘count’, ‘min’, ‘max’, ‘mean’, ‘std’, ‘var’]) if __name__ == ‘__main__’: df = pd.read_csv(“recordEntries.csv”) start = time.perf_counter() excluded_states = [‘MN’, ‘CA’] filtr = ~ df[‘state’].isin(excluded_states) df_st = df.loc[filtr] grouped_by_ct_st = df_st.groupby([‘city’, ‘state’], sort=False) with Pool() as p: list_parallel = p.map(aggreg_basePrice, [(ct_st, grouped) for ct_st, grouped in grouped_by_ct_st]) print(f’Time elapsed parallel: {round(finish – start, 2)} sec’)
和之前一樣,使用Pandas groupby()來獲得按城市和州分組的數(shù)據(jù)(第14行)。在下一行,使用多進(jìn)程庫提供的Pool()來映射分組的數(shù)據(jù),使用aggreg_basePrice來計算每組的統(tǒng)計數(shù)據(jù)。Pool()會對數(shù)據(jù)進(jìn)行分割,并在幾個平行的獨(dú)立進(jìn)程中進(jìn)行統(tǒng)計計算。
正如下面的表格中所示,多進(jìn)程比串行運(yùn)行進(jìn)程慢得多。因此,對于這些類型的問題,不值得使用這種方法。
可以使用另一種并發(fā)運(yùn)行代碼 – Modin。Modin提供了一種無縫的方式來并行化你的代碼,當(dāng)你必須處理大量的數(shù)據(jù)時是非常有用的。將導(dǎo)入語句從import pandas as pd改為import modin.pandas as pd,可以并行運(yùn)行代碼,并利用環(huán)境中可能存在的內(nèi)核集群來加速代碼的執(zhí)行。
下面的表格是剛剛涉及的不同場景的運(yùn)行時間(和以前一樣,不包括從CSV文件中讀取數(shù)據(jù)的時間):
串行 | 多進(jìn)程 | Modin 過程 | |
五百萬個 元素 | 1.94 秒 | 20.25 秒 | 6.99 秒 |
一千萬個 元素 | 4.07 秒 | 25.1 秒 | 12.88 秒 |
兩千萬個 元素 | 7.62 秒 | 36.2 秒 | 25.94 秒 |
根據(jù)表格顯示,在Python中串行運(yùn)行代碼甚至比在Java中更快。然而,使用多進(jìn)程會大大降低性能。使用Moding可以改善結(jié)果,使串行運(yùn)行進(jìn)程更有利。值得一提的是,和以前一樣,我們在計算時間時不包括從CSV文件中讀取數(shù)據(jù)的時間。
可以發(fā)現(xiàn),對于 Pandas 中的 CPU 密集型進(jìn)程來說,并行化代碼是沒有優(yōu)勢的。從某種意義上說,這反映了 Pandas 最初的架構(gòu)方式。Pandas 在串行模式下的運(yùn)行速度令人印象深刻,而且即使處理大量數(shù)據(jù)也具有很好的擴(kuò)展性。
需要指出的是,Python中統(tǒng)計數(shù)字的計算速度取決于它的執(zhí)行方式。為了得到快速的計算,需要應(yīng)用到統(tǒng)計函數(shù)。例如,做一個簡單的pandas.DataFrame.describe()來獲得統(tǒng)計信息,運(yùn)行速度會非常慢。
Java 的 Streams 或 Python 的 Pandas 是對大量數(shù)據(jù)進(jìn)行分析和統(tǒng)計的兩個絕佳選擇。兩者都有非??煽康目蚣埽约白銐虻闹С?,能夠?qū)崿F(xiàn)出色的性能和可擴(kuò)展性。
Java 提供了非常強(qiáng)大的基礎(chǔ)架構(gòu),非常適合處理復(fù)雜的程序流。它非常高效,可以有效地并行運(yùn)行進(jìn)程。適用于快速獲得結(jié)果。
Python 非常適合做數(shù)學(xué)和統(tǒng)計。它非常簡單,相當(dāng)快,非常適合進(jìn)行復(fù)雜的計算。