First of all, you need to download the dataset. It is a little over 1 GB, so I couldn't attach it here. It contains import/export movements: 8,226,597 rows and 10 columns. I've only included the Spark DataFrame work and a few related examples.
I don't think your laptops will have much trouble with it.
Good luck.
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session and read the CSV with its header row.
ss = SparkSession.builder.config('spark.analyticHouse.config.option', 'cagraksu').getOrCreate()
data = ss.read.csv('commodity_trade_statistics_data.csv', header=True)
data.show(10)
+---------------+----+---------+--------------------+------+---------+---------+---------------+--------+---------------+
|country_or_area|year|comm_code|           commodity|  flow|trade_usd|weight_kg|  quantity_name|quantity|       category|
+---------------+----+---------+--------------------+------+---------+---------+---------------+--------+---------------+
|    Afghanistan|2016|   010410|         Sheep, live|Export|     6088|     2339|Number of items|      51|01_live_animals|
|    Afghanistan|2016|   010420|         Goats, live|Export|     3958|      984|Number of items|      53|01_live_animals|
|    Afghanistan|2008|   010210|Bovine animals, l...|Import|  1026804|      272|Number of items|    3769|01_live_animals|
|        Albania|2016|   010290|Bovine animals, l...|Import|  2414533|  1114023|Number of items|    6853|01_live_animals|
|        Albania|2016|   010392|Swine, live excep...|Import| 14265937|  9484953|Number of items|   96040|01_live_animals|
|        Albania|2016|   010511|Fowls, live domes...|Import|  2671732|   254652|Number of items| 5629138|01_live_animals|
|        Albania|2016|   010511|Fowls, live domes...|Export|    87581|     5320|Number of items|  115180|01_live_animals|
|        Albania|2016|   010519|Poultry, live exc...|Import|    26485|     2908|Number of items|   64000|01_live_animals|
|        Albania|2016|   010591|Fowls, live domes...|Import|  2421513|  1926850|Number of items| 1006990|01_live_animals|
|        Albania|2016|   010599|Poultry, live exc...|Import|   251318|   211177|Number of items|  205124|01_live_animals|
+---------------+----+---------+--------------------+------+---------+---------+---------------+--------+---------------+
only showing top 10 rows
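The session above relies on Spark's defaults. If you're running this on a laptop, it can help to pin the master and driver memory explicitly before the first session is created; a minimal sketch, where the app name and memory value are my own assumptions:

from pyspark.sql import SparkSession

# A local session tuned for a laptop (values are assumptions; adjust to your machine).
ss = (SparkSession.builder
      .master('local[*]')                   # use every local core
      .appName('commodity-trade')           # hypothetical app name
      .config('spark.driver.memory', '4g')  # headroom for an ~1 GB CSV
      .getOrCreate())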
print((data.count(), len(data.columns)))
(8226597, 10)
# Cache the DataFrame: several passes follow, and this avoids re-reading the CSV.
data.cache()
data.printSchema()
root
 |-- country_or_area: string (nullable = true)
 |-- year: string (nullable = true)
 |-- comm_code: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- flow: string (nullable = true)
 |-- trade_usd: string (nullable = true)
 |-- weight_kg: string (nullable = true)
 |-- quantity_name: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- category: string (nullable = true)
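Everything came in as string because the CSV was read without schema inference. The aggregations below still work since Spark coerces strings to numbers, but an explicit cast is safer; a minimal sketch using the column names from the schema above:

from pyspark.sql.functions import col

# Cast the numeric columns up front instead of relying on implicit coercion.
for c in ['year', 'trade_usd', 'weight_kg', 'quantity']:
    data = data.withColumn(c, col(c).cast('double'))

Alternatively, passing inferSchema=True to ss.read.csv gets the same result at the cost of an extra pass over the file.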
data.describe().toPandas().transpose()
import pyspark.sql.functions as fn

# Total trade value and quantity per country and year, plus their ratio
# (roughly a mean value per unit, labelled mrt here).
ser = data.groupBy(['country_or_area', 'year']).agg(
    fn.sum('trade_usd').alias('sumofTrade'),
    fn.sum('quantity').alias('quantity'),
    (fn.sum('trade_usd') / fn.sum('quantity')).alias('mrt'))
ser.show(10)
+---------------+----+-----------------+----------------+------------------+
|country_or_area|year|       sumofTrade|        quantity|               mrt|
+---------------+----+-----------------+----------------+------------------+
|     Azerbaijan|2007|   2.228440104E10| 1.9066760445E10|1.1687565438440164|
|        Bahamas|2008|    8.136656649E9|   6.486876441E9| 1.254325825843181|
|        Bahamas|2007|    7.840346866E9|   5.933556871E9| 1.321356993192288|
|     Costa Rica|2006|    2.60477453E10| 1.0895801066E10|2.3906223270982028|
|           Fiji|2015|    5.415131782E9|   2.510226885E9|2.1572280236334094|
|        Hungary|2014| 2.76159315736E11| 4.6196362918E10|5.9779449786164225|
|          Japan|2000|1.072186864493E12|6.95608388492E11|1.5413656336396107|
|        Lebanon|2009|  2.9440686996E10|   9.560315406E9|3.0794681708433167|
|     Madagascar|1996|    1.333358655E9|    1.39173064E9|0.9580579867092672|
|    Netherlands|2005| 8.13787850251E11| 9.3217722337E10|  8.72996925744442|
+---------------+----+-----------------+----------------+------------------+
only showing top 10 rows
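show(10) gives an arbitrary slice of the groups; to look at the biggest traders instead, the result can be sorted first. A small sketch:

# Largest country-year totals first (descending by the summed trade value).
ser.orderBy(fn.desc('sumofTrade')).show(5)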
# Tag rows with a sign: -1 for plain Export rows, +1 for everything else.
# Note that after the Re-Import filter only +1 rows survive, since like()
# without wildcards is an exact match.
data_e = data.select(
    'country_or_area', 'year', 'trade_usd',
    fn.when(data.flow.like("Export"), -1).otherwise(1).alias("times")
).filter(data.flow.like("Re-Import"))
data_e.show(10)
+---------------+----+---------+-----+
|country_or_area|year|trade_usd|times|
+---------------+----+---------+-----+
|        Albania|2006|   546949|    1|
|        Andorra|2012|     2121|    1|
|      Argentina|2016|   303439|    1|
|      Argentina|2016|   316781|    1|
|      Argentina|2015|    34361|    1|
|      Argentina|2015|    17170|    1|
|      Argentina|2015|     6970|    1|
|      Argentina|2014|    24120|    1|
|      Argentina|2014|    42885|    1|
|      Argentina|2014|    11276|    1|
+---------------+----+---------+-----+
only showing top 10 rows
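Because of the Re-Import filter, the times column is 1 on every row it keeps. If the idea behind the -1/+1 tagging was a signed net figure, a variant without the filter might look like this (an assumption about intent, not what this post computes):

# A variant (assumption): keep all flow types and let the sign flip plain
# Export rows, so the grouped sum becomes a net value.
data_net = data.select(
    'country_or_area', 'year',
    (fn.when(data.flow.like('Export'), -1).otherwise(1) * data.trade_usd).alias('signed_usd'))
data_net.groupBy('country_or_area', 'year').agg(fn.sum('signed_usd').alias('net_usd')).show(5)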
data_e.select('country_or_area').distinct().orderBy('country_or_area').show(10)
+---------------+
|country_or_area|
+---------------+
|        Albania|
|        Algeria|
|        Andorra|
|         Angola|
|      Argentina|
|        Armenia|
|          Aruba|
|      Australia|
|        Bahrain|
|     Bangladesh|
+---------------+
only showing top 10 rows
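Rather than paging through the list, the coverage can be summarized in one number; a quick check:

# How many distinct countries report Re-Import flows at all?
print(data_e.select('country_or_area').distinct().count())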
# Pivot years into columns: one row per country, one column per year, with the
# null-safe product trade_usd * times summed into each cell.
smu = data_e.groupBy('country_or_area').pivot('year').agg(
    fn.sum(
        fn.when(fn.isnull(data_e.trade_usd), 0).otherwise(data_e.trade_usd)
        * fn.when(fn.isnull(data_e.times), 0).otherwise(data_e.times)))
smu.select('country_or_area', '2011', '2012', '2013', '2014', '2015').fillna(0).show(10)
+---------------+-----------+-----------+------------+------------+-----------+
|country_or_area|       2011|       2012|        2013|        2014|       2015|
+---------------+-----------+-----------+------------+------------+-----------+
|  Côte d'Ivoire|        0.0|        0.0|         0.0|         0.0|  8272813.0|
|Rep. of Moldova|  8071152.0|  5546945.0|   6968892.0| 1.4613458E7|  8301084.0|
|       Paraguay|        0.0|        0.0|         0.0|         0.0|        0.0|
|          Yemen|        0.0|4.5168144E7|         0.0|         0.0|        0.0|
|      Fmr Sudan|        0.0|        0.0|         0.0|         0.0|        0.0|
|          Tonga|        0.0|        0.0|         0.0|         0.0|        0.0|
|       Malaysia|3.1289003E7|7.0841959E7|1.92684821E8|1.41658566E8|3.0047307E7|
|           Fiji|        0.0|        0.0|    103269.0|    125957.0|      214.0|
|     Czech Rep.|   819046.0|   801175.0|      1588.0|     20795.0|    16815.0|
|         Malawi|        0.0|  1288388.0|   4207988.0|   3235215.0|        0.0|
+---------------+-----------+-----------+------------+------------+-----------+
only showing top 10 rows
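The same pivoted table also answers quick ranking questions, for example the largest 2015 totals (a sketch; nulls are zero-filled before sorting):

# Countries with the biggest 2015 re-import value, descending.
smu.fillna(0).orderBy(fn.desc('2015')).select('country_or_area', '2015').show(5)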
# Pull out a single country and transpose it in pandas for readability.
data_france = smu.filter(smu.country_or_area.like('France'))
data_france.toPandas().T.head(10)
country_or_area         France
2000               4.01716e+09
2001               4.30344e+09
2002               4.72514e+09
2003               5.91889e+09
2004               7.05895e+09
2005               7.20075e+09
2006               7.40211e+09
2007                9.0772e+09
2008               9.72221e+09
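This transposed frame is one step away from a chart. A minimal matplotlib sketch, assuming matplotlib is installed alongside pandas:

import matplotlib.pyplot as plt

# Plot France's yearly totals: drop the country_or_area row, keep the year rows.
pdf = data_france.toPandas().T
pdf = pdf.iloc[1:].astype(float)
pdf.index = pdf.index.astype(int)
pdf.plot(legend=False, title="France, re-import trade_usd by year")
plt.show()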
# Drop rows where every column is null, then replace the remaining nulls with 0.
smu = smu.na.drop(how="all")
smu = smu.fillna(0)
smu.stat.corr('2011', '2012')
0.998124329296175
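Adjacent years are almost perfectly correlated, which is what makes the regression below work at all. The same check can be run across several year pairs; a small sketch:

# Pairwise correlation of consecutive year columns.
years = ['2010', '2011', '2012', '2013', '2014']
for a, b in zip(years, years[1:]):
    print(a, b, smu.stat.corr(a, b))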
from pyspark.ml.feature import VectorAssembler

# Pack the 2008-2014 columns into a single feature vector for Spark ML.
vectorAssembler = VectorAssembler(
    inputCols=['2008', '2009', '2010', '2011', '2012', '2013', '2014'],
    outputCol='features')
smu_df = vectorAssembler.transform(smu)
smu_df.take(2)
[Row(country_or_area="Côte d'Ivoire", 2000=0.0, 2001=0.0, 2002=0.0, 2003=0.0, 2004=0.0, 2005=0.0, 2006=0.0, 2007=0.0, 2008=0.0, 2009=0.0, 2010=0.0, 2011=0.0, 2012=0.0, 2013=0.0, 2014=0.0, 2015=8272813.0, 2016=0.0, features=SparseVector(7, {})),
 Row(country_or_area='Rep. of Moldova', 2000=0.0, 2001=0.0, 2002=0.0, 2003=4391931.0, 2004=0.0, 2005=0.0, 2006=0.0, 2007=1860474.0, 2008=9337344.0, 2009=5955397.0, 2010=6429529.0, 2011=8071152.0, 2012=5546945.0, 2013=6968892.0, 2014=14613458.0, 2015=8301084.0, 2016=13945858.0, features=DenseVector([9337344.0, 5955397.0, 6429529.0, 8071152.0, 5546945.0, 6968892.0, 14613458.0]))]
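Two representations show up here: countries with no 2008-2014 activity get a compact SparseVector(7, {}), the rest a DenseVector. They are numerically interchangeable; a quick check:

from pyspark.ml.linalg import SparseVector

# An empty sparse vector of length 7 is just seven zeros stored compactly.
print(SparseVector(7, {}).toArray())   # [0. 0. 0. 0. 0. 0. 0.]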
# Keep only the features and the 2015 label, then split 70/30.
# (A fixed seed, e.g. randomSplit([0.7, 0.3], seed=42), would make this reproducible.)
smu_df = smu_df.select(['features', '2015'])
splits = smu_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
from pyspark.ml.regression import LinearRegression

# Elastic-net regularized linear regression: predict 2015 from 2008-2014.
lr = LinearRegression(featuresCol='features', labelCol='2015',
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
lrModel = lr.fit(train_df)
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
Coefficients: [0.21527016513074196,0.2311709240015085,0.18940185142310242,0.1645641575001596,0.14379558216704935,0.13077908769652233,0.13913018226512663]
Intercept: -151708260.82481796
trainingSummary = lrModel.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
RMSE: 987874071.735827
r2: 0.997132
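An RMSE near 1e9 sounds alarming, but it only means something next to the label's own scale; a quick check:

# Compare the training RMSE against the spread of the 2015 label itself.
train_df.describe('2015').show()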
lr_predictions = lrModel.transform(test_df)
lr_predictions.select("prediction", "2015", "features").show(5)
+--------------------+------------+--------------------+
|          prediction|        2015|            features|
+--------------------+------------+--------------------+
|-1.51708260824817...|         0.0|           (7,[],[])|
|-1.51664512094916...|       214.0|[0.0,49675.0,6523...|
|-1.51708260824817...|         0.0|           (7,[],[])|
|-1.51708260824817...|         0.0|           (7,[],[])|
|2.0796724744210112E8|4.64815895E8|[0.0,0.0,3.681864...|
+--------------------+------------+--------------------+
only showing top 5 rows
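The repeated -1.517e8 predictions belong to the all-zero feature rows, which can only ever receive the bare intercept. A sanity-check sketch:

from pyspark.ml.linalg import Vectors

# Score a single all-zero row: the prediction should equal lrModel.intercept.
zero_row = ss.createDataFrame([(Vectors.sparse(7, [], []),)], ['features'])
lrModel.transform(zero_row).show()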
from pyspark.ml.evaluation import RegressionEvaluator

lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="2015", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))
R Squared (R2) on test data = 0.737201
test_result = lrModel.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)
Root Mean Squared Error (RMSE) on test data = 7.15663e+08
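The test R2 (0.74) sits well below the training R2 (0.997), a hint that regParam=0.3 and elasticNetParam=0.8 were hand-picked rather than tuned. A minimal cross-validation sketch over a small, assumed hyperparameter grid:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# 3-fold cross-validation over an assumed grid of regularization settings.
grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.1, 0.3, 1.0])
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
        .build())
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
                    evaluator=RegressionEvaluator(labelCol='2015', metricName='rmse'),
                    numFolds=3)
cvModel = cv.fit(train_df)
print(cvModel.avgMetrics)   # mean RMSE per grid point, in grid order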