"### Regularyzacja przez mnożniki Lagrange'a. Algorytm SVM"
"import sys\n",
"import subprocess\n",
"import pkg_resources\n",
"from load_data import get_dataset\n",
"import numpy as np\n",
"from tabulate import tabulate\n",
"required = { 'scikit-image'}\n",
"installed = {pkg.key for pkg in pkg_resources.working_set}\n",
"missing = required - installed\n",
"if missing: \n",
" python = sys.executable\n",
" subprocess.check_call([python, '-m', 'pip', 'install', *missing], stdout=subprocess.DEVNULL)\n",
"def load_train_data(input_dir, newSize=(64,64)):\n",
" import numpy as np\n",
" import pandas as pd\n",
" import os\n",
" from skimage.io import imread\n",
" import cv2 as cv\n",
" from pathlib import Path\n",
" import random\n",
" from shutil import copyfile, rmtree\n",
" import json\n",
" import seaborn as sns\n",
" import matplotlib.pyplot as plt\n",
" import matplotlib\n",
" image_dir = Path(input_dir)\n",
" categories_name = []\n",
" for file in os.listdir(image_dir):\n",
" d = os.path.join(image_dir, file)\n",
" if os.path.isdir(d):\n",
" categories_name.append(file)\n",
" folders = [directory for directory in image_dir.iterdir() if directory.is_dir()]\n",
" train_img = []\n",
" categories_count=[]\n",
" labels=[]\n",
" for i, direc in enumerate(folders):\n",
" count = 0\n",
" for obj in direc.iterdir():\n",
" if os.path.isfile(obj) and os.path.basename(os.path.normpath(obj)) != 'desktop.ini':\n",
" labels.append(os.path.basename(os.path.normpath(direc)))\n",
" count += 1\n",
" img = imread(obj)#zwraca ndarry postaci xSize x ySize x colorDepth\n",
" img = cv.resize(img, newSize, interpolation=cv.INTER_AREA)# zwraca ndarray\n",
" img = img / 255#normalizacja\n",
" train_img.append(img)\n",
" categories_count.append(count)\n",
" X={}\n",
" X[\"values\"] = np.array(train_img)\n",
" X[\"categories_name\"] = categories_name\n",
" X[\"categories_count\"] = categories_count\n",
" X[\"labels\"]=labels\n",
" return X\n",
"def load_test_data(input_dir, newSize=(64,64)):\n",
" import numpy as np\n",
" import pandas as pd\n",
" import os\n",
" from skimage.io import imread\n",
" import cv2 as cv\n",
" from pathlib import Path\n",
" import random\n",
" from shutil import copyfile, rmtree\n",
" import json\n",
" import seaborn as sns\n",
" import matplotlib.pyplot as plt\n",
" import matplotlib\n",
" image_path = Path(input_dir)\n",
" labels_path = image_path.parents[0] / 'test_labels.json'\n",
" jsonString = labels_path.read_text()\n",
" objects = json.loads(jsonString)\n",
" categories_name = []\n",
" categories_count=[]\n",
" count = 0\n",
" c = objects[0]['value']\n",
" for e in objects:\n",
" if e['value'] != c:\n",
" categories_count.append(count)\n",
" c = e['value']\n",
" count = 1\n",
" else:\n",
" count += 1\n",
" if not e['value'] in categories_name:\n",
" categories_name.append(e['value'])\n",
" categories_count.append(count)\n",
" test_img = []\n",
" labels=[]\n",
" for e in objects:\n",
" p = image_path / e['filename']\n",
" img = imread(p)#zwraca ndarry postaci xSize x ySize x colorDepth\n",
" img = cv.resize(img, newSize, interpolation=cv.INTER_AREA)# zwraca ndarray\n",
" img = img / 255#normalizacja\n",
" test_img.append(img)\n",
" labels.append(e['value'])\n",
" X={}\n",
" X[\"values\"] = np.array(test_img)\n",
" X[\"categories_name\"] = categories_name\n",
" X[\"categories_count\"] = categories_count\n",
" X[\"labels\"]=labels\n",
" return X\n",
"from sklearn.preprocessing import LabelEncoder\n",
"# Data load\n",
"data_train = load_train_data(\"train_test_sw/train_sw\", newSize=(16,16))\n",
"X_train = data_train['values']\n",
"y_train = data_train['labels']\n",
"data_test = load_test_data(\"train_test_sw/test_sw\", newSize=(16,16))\n",
"X_test = data_test['values']\n",
"y_test = data_test['labels']\n",
"class_le = LabelEncoder()\n",
"y_train_enc = class_le.fit_transform(y_train)\n",
"y_test_enc = class_le.fit_transform(y_test)\n",
"X_train = X_train.flatten().reshape(X_train.shape[0], int(np.prod(X_train.shape) / X_train.shape[0]))\n",
"X_test = X_test.flatten().reshape(X_test.shape[0], int(np.prod(X_test.shape) / X_test.shape[0]))"
"X_train, y_train, X_test, y_test = get_dataset(new_size=64)"
"#### 1. Zadanie 1 (2pkt): \n",
"# Zadanie 1 (2 pkt)\n",
"Rozwiń algorytm regresji logistycznej z lab. 1, wprowadzając do niego człon regularyzacyjny"
"Rozwiń algorytm regresji logistycznej z lab. 1, wprowadzając do niego człon regularyzacyjny."
"class LogisticRegressionL2():\n",
"class LogisticRegression():\n",
" def __init__(self, l2=1):\n",
" self.l2 = l2\n",
" def indicatorMatrix(self, y):\n",
" classes = np.unique(y.tolist())\n",
" m = len(y)\n",
" k = len(classes)\n",
" m, k = len(y), len(classes)\n",
" # k = len(classes)\n",
" Y = np.matrix(np.zeros((m, k)))\n",
" for i, cls in enumerate(classes):\n",
" Y[:, i] = self.mapY(y, cls)\n",
" \n",
" # Funkcja regresji logistcznej\n",
" def h(self, theta, X):\n",
" return 1.0/(1.0 + np.exp(-X * theta))\n",
" return 1.0 /(1.0 + np.exp(-X * theta))\n",
" \n",
" # Funkcja kosztu dla regresji logistycznej\n",
" def J(self, h, theta, X
" if len(errors) > maxSteps:\n",
" break\n",
" errors.append([errorCurr, theta]) \n",
" return theta, errors\n",
" # return theta, errors\n",
" return theta\n",
" def trainMaxEnt(self, X, Y):\n",
" n = X.shape[1]\n",
" YBi = Y[:,c]\n",
" theta = np.matrix(np.random.random(n)).reshape(n,1)\n",
" # Macierz parametrów theta obliczona dla każdej klasy osobno.\n",
" thetaBest, errors = self.GD(self.h, self.J, self.dJ, theta, \n",
" X, YBi, alpha=0.1, eps=10**-4)\n",
" thetas.append(thetaBest)\n",
" # thetaBest, errors = self.GD(self.h, self.J, self.dJ, theta, \n",
" # X, YBi, alpha=0.1, eps=10**-4)\n",
" # thetas.append(thetaBest)\n",
" thetas.append(self.GD(self.h, self.J, self.dJ, theta, X, YBi, alpha=0.1, eps=10**-4))\n",
" return thetas\n",
" def classify(self, thetas, X):\n",
" regs = np.array([(X*theta).item() for theta in thetas])\n",
" probs = self.softmax(regs)\n",
" result = np.argmax(probs)\n",
" return result\n",
" return np.argmax(self.softmax(regs))\n",
" # probs = self.softmax(regs)\n",
" # result = np.argmax(probs)\n",
" # return result\n",
" def class_score(self, expected, predicted):\n",
" # accuracy = TP + TN / FP + FN + TP + TN\n",
" accuracy = sum(1 for exp, pred in zip(expected, predicted) if exp == pred) / len(expected)\n",
" # precision = TP / FP + TP\n",
" precision = sum(\n",
" 1 for exp, pred in zip(expected, predicted) if exp == 1.0 and pred == 1.0) / sum(\n",
" 1 for exp, pred in zip(expected, predicted) if exp == 1.0)\n",
" # recall = TP / FN + TP\n",
" recall = sum(\n",
" 1 for exp, pred in zip(expected, predicted) if exp == 1.0 and pred == 1.0) / sum(\n",
" 1 for exp, pred in zip(expected, predicted) if pred == 1.0)\n",
" f1 = (2 * precision * recall) / (precision + recall)\n",
" return accuracy, precision, recall, f1\n",
" def fit(self, X_train, y_train):\n",
" # Y = self.indicatorMatrix(y_train)\n",
" # self.thetas = self.trainMaxEnt(X_train, Y)\n",
" self.thetas = self.trainMaxEnt(X_train, self.indicatorMatrix(y_train))\n",
" def predict(self, X_test):\n",
" return np.array([self.classify(self.thetas, x) for x in X_test])\n",
" def accuracy(self, expected, predicted):\n",
" return sum(1 for x, y in zip(expected, predicted) if x == y) / len(expected)"
"C:\\Users\\jonas\\AppData\\Local\\Temp\\ipykernel_16900\\514332287.py:33: RuntimeWarning: divide by zero encountered in log\n",
"/var/folders/7c/v61kq2b95dzbt7s47fxy0grm0000gn/T/ipykernel_25260/2085424405.py:33: RuntimeWarning: divide by zero encountered in log\n",
" s2 = np.multiply((1 - y), np.log(1 - h_val))\n",
"C:\\Users\\jonas\\AppData\\Local\\Temp\\ipykernel_16900\\514332287.py:33: RuntimeWarning: invalid value encountered in multiply\n",
"/var/folders/7c/v61kq2b95dzbt7s47fxy0grm0000gn/T/ipykernel_25260/2085424405.py:33: RuntimeWarning: invalid value encountered in multiply\n",
" s2 = np.multiply((1 - y), np.log(1 - h_val))\n",
"C:\\Users\\jonas\\AppData\\Local\\Temp\\ipykernel_16900\\514332287.py:26: RuntimeWarning: overflow encountered in exp\n",
" return 1.0/(1.0 + np.exp(-X * theta))\n",
"C:\\Users\\jonas\\AppData\\Local\\Temp\\ipykernel_16900\\514332287.py:32: RuntimeWarning: divide by zero encountered in log\n",
"/var/folders/7c/v61kq2b95dzbt7s47fxy0grm0000gn/T/ipykernel_25260/2085424405.py:26: RuntimeWarning: overflow encountered in exp\n",
" return 1.0 /(1.0 + np.exp(-X * theta))\n",
"/var/folders/7c/v61kq2b95dzbt7s47fxy0grm0000gn/T/ipykernel_25260/2085424405.py:32: RuntimeWarning: divide by zero encountered in log\n",
" s1 = np.multiply(y, np.log(h_val))\n",
"C:\\Users\\jonas\\AppData\\Local\\Temp\\ipykernel_16900\\514332287.py:32: RuntimeWarning: invalid value encountered in multiply\n",
" s1 = np.multiply(y, np.log(h_val))\n",
"C:\\Users\\jonas\\AppData\\Local\\Temp\\ipykernel_16900\\514332287.py:22: RuntimeWarning: overflow encountered in exp\n",
"/var/folders/7c/v61kq2b95dzbt7s47fxy0grm0000gn/T/ipykernel_25260/2085424405.py:32: RuntimeWarning: invalid value encountered in multiply\n",
" s1 = np.multiply(y, np.log(h_val))\n"
"# 16x16, l2=0.01 -> 5m 11.4s\n",
"# 32x32, l2=0.1 -> 20m 31.3s\n",
"# 64x64, l2=0.1 -> 219m 10.8s\n",
"logreg = LogisticRegression(l2=0.1)\n",
"logreg.fit(X_train, y_train)"
"/var/folders/7c/v61kq2b95dzbt7s47fxy0grm0000gn/T/ipykernel_25260/2085424405.py:22: RuntimeWarning: overflow encountered in exp\n",
" return np.exp(X) / np.sum(np.exp(X))\n",
"C:\\Users\\jonas\\AppData\\Local\\Temp\\ipykernel_16900\\514332287.py:22: RuntimeWarning: invalid value encountered in true_divide\n",
"/var/folders/7c/v61kq2b95dzbt7s47fxy0grm0000gn/T/ipykernel_25260/2085424405.py:22: RuntimeWarning: invalid value encountered in true_divide\n",
" return np.exp(X) / np.sum(np.exp(X))\n"
"log_reg = LogisticRegressionL2(l2 = 0.1)\n",
"Y = log_reg.indicatorMatrix(y_train_enc)\n",
"thetas = log_reg.trainMaxEnt(X_train, Y)\n",
"predicted = [log_reg.classify(thetas, x) for x in X_test]\n",
"log_reg.accuracy(y_test_enc, np.array(predicted))"
"logreg.accuracy(y_test, logreg.predict(X_test))"
"#### Zadanie 2 (4pkt)\n",
"# Zadanie 2 (4 pkt)\n",
"Zaimplementuj algorytm SVM z miękkim marginesem (regularyzacją)"
"Zaimplementuj algorytm SVM z miękkim marginesem (regularyzacją)."
"from sklearn.base import BaseEstimator, ClassifierMixin\n",
"from sklearn.utils import check_random_state\n",
"from sklearn.preprocessing import LabelEncoder\n",
"class SVM():\n",
" def __init__(self, lr=0.001, lambda_param=10**-6, n_iters=1000):\n",
" self.lr = lr\n",
" self.lambda_param = lambda_param\n",
" self.n_iters = n_iters\n",
" def _mapY(self, y, cls):\n",
" m = len(y)\n",
" yBi = np.matrix(np.zeros(m)).reshape(m, 1)\n",
" yBi[y == cls] = 1.\n",
" return yBi\n",
"def projection_simplex(v, z=1):\n",
" n_features = v.shape[0]\n",
" u = np.sort(v)[::-1]\n",
" cssv = np.cumsum(u) - z\n",
" ind = np.arange(n_features) + 1\n",
" cond = u - cssv / ind > 0\n",
" rho = ind[cond][-1]\n",
" theta = cssv[cond][-1] / float(rho)\n",
" w = np.maximum(v - theta, 0)\n",
" return w\n",
"class MulticlassSVM(BaseEstimator, ClassifierMixin):\n",
" def __init__(self, C=1, max_iter=50, tol=0.05,\n",
" random_state=None, verbose=0):\n",
" self.C = C\n",
" self.max_iter = max_iter\n",
" self.tol = tol,\n",
" self.random_state = random_state\n",
" self.verbose = verbose\n",
" def _partial_gradient(self, X, y, i):\n",
" # Partial gradient for the ith sample.\n",
" g = np.dot(X[i], self.coef_.T) + 1\n",
" g[y[i]] -= 1\n",
" return g\n",
" def _violation(self, g, y, i):\n",
" # Optimality violation for the ith sample.\n",
" smallest = np.inf\n",
" for k in range(g.shape[0]):\n",
" if k == y[i] and self.dual_coef_[k, i] >= self.C:\n",
" continue\n",
" elif k != y[i] and self.dual_coef_[k, i] >= 0:\n",
" continue\n",
" smallest = min(smallest, g[k])\n",
" return g.max() - smallest\n",
" def _solve_subproblem(self, g, y, norms, i):\n",
" # Prepare inputs to the projection.\n",
" Ci = np.zeros(g.shape[0])\n",
" Ci[y[i]] = self.C\n",
" beta_hat = norms[i] * (Ci - self.dual_coef_[:, i]) + g / norms[i]\n",
" z = self.C * norms[i]\n",
" # Compute projection onto the simplex.\n",
" beta = projection_simplex(beta_hat, z)\n",
" return Ci - self.dual_coef_[:, i] - beta / norms[i]\n",
" def _indicatorMatrix(self, y):\n",
" classes = np.unique(y.tolist())\n",
" m, k = len(y), len(classes)\n",
" Y = np.matrix(np.zeros((m, k)))\n",
" for i, cls in enumerate(classes):\n",
" Y[:, i] = self._mapY(y, cls)\n",
" return Y\n",
" def fit(self, X, y):\n",
" n_samples, n_features = X.shape\n",
" n_classes = len(np.unique(y))\n",
" y = self._indicatorMatrix(y)\n",
" y = np.where(y == 0, -1, 1)\n",
" # Normalize labels.\n",
" self._label_encoder = LabelEncoder()\n",
" y = self._label_encoder.fit_transform(y)\n",
" n_features = X.shape[1]\n",
" self.weights, self.biases = [], []\n",
" # Initialize primal and dual coefficients.\n",
" n_classes = len(self._label_encoder.classes_)\n",
" self.dual_coef_ = np.zeros((n_classes, n_samples), dtype=np.float64)\n",
" self.coef_ = np.zeros((n_classes, n_features))\n",
" for cls in range(n_classes):\n",
" y_ = y[:,cls]\n",
" y_ = np.where(y_ <= 0, -1, 1)\n",
" # Pre-compute norms.\n",
" norms = np.sqrt(np.sum(X ** 2, axis=1))\n",
" w, b = np.zeros(n_features), 0\n",
" # Shuffle sample indices.\n",
" rs = check_random_state(self.random_state)\n",
" ind = np.arange(n_samples)\n",
" rs.shuffle(ind)\n",
" for _ in range(self.n_iters):\n",
" for idx, x_i in enumerate(X):\n",
" condition = y_[idx] * (np.dot(x_i, w) - b) >= 1\n",
" if condition:\n",
" w -= self.lr * (2 * self.lambda_param * w)\n",
" else:\n",
" w -= self.lr * (2 * self.lambda_param * w - np.dot(x_i, y_[idx]))\n",
" b -= self.lr * y_[idx]\n",
" self.weights.append(w)\n",
" self.biases.append(b)\n",
" violation_init = None\n",
" for it in range(self.max_iter):\n",
" for ii in range(n_samples):\n",
" i = ind[ii]\n",
" # All-zero samples can be safely ignored.\n",
" if norms[i] == 0:\n",
" continue\n",
" g = self._partial_gradient(X, y, i)\n",
" v = self._violation(g, y, i)\n",
" if v < 1e-12:\n",
" continue\n",
" # Solve subproblem for the ith sample.\n",
" delta = self._solve_subproblem(g, y, norms, i)\n",
" # Update primal and dual coefficients.\n",
" self.coef_ += (delta * X[i][:, np.newaxis]).T\n",
" self.dual_coef_[:, i] += delta\n",
" \n",
" return self\n",
" def _classify(self, x):\n",
" cls = [np.sign(np.dot(x, self.weights[i]) - self.biases[i]) for i in range(len(self.biases))]\n",
" return cls.index(1.0) if 1.0 in cls else 0\n",
" def predict(self, X):\n",
" decision = np.dot(X, self.coef_.T)\n",
" pred = decision.argmax(axis=1)\n",
" return self._label_encoder.inverse_transform(pred)"
" return list(map(lambda x: self._classify(x), X))\n",
" def accuracy(self, expected, predicted):\n",
" return sum(1 for x, y in zip(expected, predicted) if x == y) / len(expected)"
"X_train = data_train['values'][:,:,:,:3]\n",
"X_train_flatten = np.array([x.flatten() for x in X_train])\n",
"y_train = data_train['labels']\n",
"# 16x16 -> 1m 28.8s\n",
"# 32x32 -> 2m 2.2s\n",
"# 64x64 -> 4m 55.3s\n",
"X_test = data_test['values'][:,:,:,:3]\n",
"X_test_flatten = np.array([x.flatten() for x in X_test])\n",
"y_test = data_test['labels']"
"svm = SVM()\n",
"svm.fit(X_train, y_train)\n"
"X, y = X_train_flatten, y_train\n",
"clf = MulticlassSVM(C=0.1, tol=0.01, max_iter=100, random_state=0, verbose=1)\n",
"clf.fit(X, y)\n",
"print(clf.score(X_test_flatten, y_test))"
"svm.accuracy(y_test, svm.predict(X_test))"
"display_name": "Python 3.10.5 64-bit",
