Symulowanie-wizualne/sw_lab3.ipynb

Regularization via Lagrange multipliers. The SVM algorithm

import sys
import subprocess
import pkg_resources
import numpy as np

required = {'scikit-image', 'opencv-python'}  # cv2 used below is provided by the opencv-python package
installed = {pkg.key for pkg in pkg_resources.working_set}
missing = required - installed

if missing:
    python = sys.executable
    subprocess.check_call([python, '-m', 'pip', 'install', *missing], stdout=subprocess.DEVNULL)

def load_train_data(input_dir, newSize=(64,64)):
    import os
    from pathlib import Path

    import numpy as np
    from skimage.io import imread
    import cv2 as cv

    image_dir = Path(input_dir)
    categories_name = []
    for file in os.listdir(image_dir):
        d = os.path.join(image_dir, file)
        if os.path.isdir(d):
            categories_name.append(file)

    folders = [directory for directory in image_dir.iterdir() if directory.is_dir()]

    train_img = []
    categories_count = []
    labels = []
    for i, direc in enumerate(folders):
        count = 0
        for obj in direc.iterdir():
            if os.path.isfile(obj) and os.path.basename(os.path.normpath(obj)) != 'desktop.ini':
                labels.append(os.path.basename(os.path.normpath(direc)))
                count += 1
                img = imread(obj)  # returns an ndarray of shape (height, width, colorDepth)
                img = cv.resize(img, newSize, interpolation=cv.INTER_AREA)  # returns an ndarray
                img = img / 255  # normalize pixel values to [0, 1]
                train_img.append(img)
        categories_count.append(count)
    X = {}
    X["values"] = np.array(train_img)
    X["categories_name"] = categories_name
    X["categories_count"] = categories_count
    X["labels"] = labels
    return X

def load_test_data(input_dir, newSize=(64,64)):
    import os
    import json
    from pathlib import Path

    import numpy as np
    from skimage.io import imread
    import cv2 as cv

    image_path = Path(input_dir)

    labels_path = image_path.parents[0] / 'test_labels.json'

    jsonString = labels_path.read_text()
    objects = json.loads(jsonString)

    categories_name = []
    categories_count = []
    count = 0
    c = objects[0]['value']
    # Count consecutive label runs; assumes the JSON entries are grouped by class.
    for e in objects:
        if e['value'] != c:
            categories_count.append(count)
            c = e['value']
            count = 1
        else:
            count += 1
        if not e['value'] in categories_name:
            categories_name.append(e['value'])

    categories_count.append(count)
    
    test_img = []

    labels=[]
    for e in objects:
        p = image_path / e['filename']
        img = imread(p)  # returns an ndarray of shape (height, width, colorDepth)
        img = cv.resize(img, newSize, interpolation=cv.INTER_AREA)  # returns an ndarray
        img = img / 255  # normalize pixel values to [0, 1]
        test_img.append(img)
        labels.append(e['value'])

    X = {}
    X["values"] = np.array(test_img)
    X["categories_name"] = categories_name
    X["categories_count"] = categories_count
    X["labels"] = labels
    return X
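load_test_data expects a test_labels.json file in the parent directory of input_dir: a JSON array of objects with 'filename' and 'value' fields, grouped by class. A minimal sketch of the assumed layout (the file names and labels below are hypothetical):

[
    {"filename": "0001.png", "value": "Tomato"},
    {"filename": "0002.png", "value": "Tomato"},
    {"filename": "0003.png", "value": "Pepper"}
]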

from sklearn.preprocessing import LabelEncoder

# Data load
data_train = load_train_data("train_test_sw/train_sw", newSize=(16,16))
X_train = data_train['values']
y_train = data_train['labels']

data_test = load_test_data("train_test_sw/test_sw", newSize=(16,16))
X_test = data_test['values']
y_test = data_test['labels']

class_le = LabelEncoder()
y_train_enc = class_le.fit_transform(y_train)
y_test_enc = class_le.transform(y_test)  # reuse the encoder fitted on the training labels

X_train = X_train.reshape(X_train.shape[0], -1)
X_test = X_test.reshape(X_test.shape[0], -1)
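A quick sanity check on the flattened design matrices (a sketch; the channel count of 4 assumes RGBA source images, which the channel slicing in Exercise 2 suggests):

print(X_train.shape, X_test.shape)  # e.g. (n_train, 16*16*4) and (n_test, 16*16*4)
assert X_train.shape[1] == X_test.shape[1]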

Exercise 1 (2 pts):

Extend the logistic regression algorithm from Lab 1 by adding a regularization term to it.
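For reference, the regularized cross-entropy cost and gradient implemented below can be written as follows, where $m$ is the number of training examples and $\lambda$ corresponds to the l2 constructor argument:

$$J(\theta) = -\frac{1}{m}\sum_{i=1}^{m}\Big[y^{(i)}\log h_\theta(x^{(i)}) + \big(1-y^{(i)}\big)\log\big(1-h_\theta(x^{(i)})\big)\Big] + \frac{\lambda}{2m}\lVert\theta\rVert^{2}$$

$$\nabla J(\theta) = \frac{1}{m}X^{\top}\big(h_\theta(X)-y\big) + \frac{\lambda}{m}\theta$$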

class LogisticRegressionL2():
    def __init__(self, l2=1):
        self.l2 = l2

    def mapY(self, y, cls):
        m = len(y)
        yBi = np.matrix(np.zeros(m)).reshape(m, 1)
        yBi[y == cls] = 1.
        return yBi

    def indicatorMatrix(self, y):
        classes = np.unique(y.tolist())
        m = len(y)
        k = len(classes)
        Y = np.matrix(np.zeros((m, k)))
        for i, cls in enumerate(classes):
            Y[:, i] = self.mapY(y, cls)
        return Y
    
    # Matrix form of the softmax function, shifted by the max for numerical stability
    def softmax(self, X):
        e = np.exp(X - np.max(X))
        return e / np.sum(e)
    
    # Logistic regression hypothesis (sigmoid)
    def h(self, theta, X):
        return 1.0 / (1.0 + np.exp(-X * theta))
    
    # Cost function for logistic regression with an L2 penalty
    def J(self, h, theta, X, y):
        m = len(y)
        h_val = np.clip(h(theta, X), 1e-12, 1 - 1e-12)  # avoid log(0)
        s1 = np.multiply(y, np.log(h_val))
        s2 = np.multiply((1 - y), np.log(1 - h_val))
        s3 = np.sum(s1 + s2, axis=0) / m
        s4 = (self.l2 * np.sum(np.square(theta))) / (2 * m)
        return -s3 + s4

    # Gradient of the regularized cost: (1/m) * X^T (h - y) + (l2/m) * theta
    def dJ(self, h, theta, X, y):
        m = len(y)
        return (X.T * (h(theta, X) - y)) / m + (self.l2 / m) * theta

    # Gradient descent for logistic regression
    def GD(self, h, fJ, fdJ, theta, X, y, alpha=0.01, eps=10**-3, maxSteps=10000):
        errorCurr = fJ(h, theta, X, y)
        errors = [[errorCurr, theta]]
        while True:
            # compute the new theta
            theta = theta - alpha * fdJ(h, theta, X, y)
            # track the error level
            errorCurr, errorPrev = fJ(h, theta, X, y), errorCurr
            # stopping criteria
            if abs(errorPrev - errorCurr) <= eps:
                break
            if len(errors) > maxSteps:
                break
            errors.append([errorCurr, theta])
        return theta, errors

    def trainMaxEnt(self, X, Y):
        n = X.shape[1]
        thetas = []
        for c in range(Y.shape[1]):
            YBi = Y[:,c]
            theta = np.matrix(np.random.random(n)).reshape(n,1)
            # The theta parameter vector is computed separately for each class.
            thetaBest, errors = self.GD(self.h, self.J, self.dJ, theta, 
                                X, YBi, alpha=0.1, eps=10**-4)
            thetas.append(thetaBest)
        return thetas

    def classify(self, thetas, X):
        regs = np.array([(X*theta).item() for theta in thetas])
        probs = self.softmax(regs)
        result = np.argmax(probs)
        return result

    def accuracy(self, expected, predicted):
        return sum(1 for x, y in zip(expected, predicted) if x == y) / len(expected)
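Before training, it is worth verifying that dJ is consistent with J. A minimal finite-difference check on a tiny random problem (all names and sizes below are illustrative):

# Finite-difference gradient check (illustrative sketch)
lr_check = LogisticRegressionL2(l2=0.1)
Xs = np.matrix(np.random.rand(20, 5))
ys = np.matrix((np.random.rand(20, 1) > 0.5).astype(float))
theta0 = np.matrix(np.random.rand(5, 1))
g = lr_check.dJ(lr_check.h, theta0, Xs, ys)
for j in range(5):
    e = np.matrix(np.zeros((5, 1)))
    e[j] = 1e-6
    num = (lr_check.J(lr_check.h, theta0 + e, Xs, ys)
           - lr_check.J(lr_check.h, theta0 - e, Xs, ys)) / 2e-6
    assert abs(num.item() - g[j].item()) < 1e-4  # analytic and numeric gradients agree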
log_reg = LogisticRegressionL2(l2=0.1)

Y = log_reg.indicatorMatrix(y_train_enc)
thetas = log_reg.trainMaxEnt(X_train, Y)

predicted = [log_reg.classify(thetas, x) for x in X_test]

log_reg.accuracy(y_test_enc, np.array(predicted))
0.5714285714285714

Exercise 2 (4 pts)

Implement the SVM algorithm with a soft margin (regularization).
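The implementation below is a coordinate-ascent solver for the Crammer-Singer multiclass SVM: it works on the dual problem and projects each per-sample block of dual variables onto a scaled simplex. For context, one common way to write the soft-margin primal objective, with weight vectors $w_1,\dots,w_K$ and slack variables $\xi_i$, is:

$$\min_{W,\,\xi}\ \frac{1}{2}\sum_{k=1}^{K}\lVert w_k\rVert^{2} + C\sum_{i=1}^{n}\xi_i \quad\text{s.t.}\quad w_{y_i}^{\top}x_i - w_k^{\top}x_i \ \ge\ 1 - \delta_{y_i,k} - \xi_i \quad \forall i, k,$$

where $\delta$ is the Kronecker delta and a larger C penalizes margin violations more heavily.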

from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils import check_random_state
from sklearn.preprocessing import LabelEncoder


def projection_simplex(v, z=1):
    n_features = v.shape[0]
    u = np.sort(v)[::-1]
    cssv = np.cumsum(u) - z
    ind = np.arange(n_features) + 1
    cond = u - cssv / ind > 0
    rho = ind[cond][-1]
    theta = cssv[cond][-1] / float(rho)
    w = np.maximum(v - theta, 0)
    return w
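A quick illustrative check of projection_simplex: the result should be non-negative and sum to z (the input values below are arbitrary):

v = np.array([0.9, -0.3, 2.1, 0.4])
w = projection_simplex(v, z=1)
print(w, w.sum())  # -> [0. 0. 1. 0.] 1.0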


class MulticlassSVM(BaseEstimator, ClassifierMixin):

    def __init__(self, C=1, max_iter=50, tol=0.05,
                 random_state=None, verbose=0):
        self.C = C
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state
        self.verbose = verbose

    def _partial_gradient(self, X, y, i):
        # Partial gradient for the ith sample.
        g = np.dot(X[i], self.coef_.T) + 1
        g[y[i]] -= 1
        return g

    def _violation(self, g, y, i):
        # Optimality violation for the ith sample.
        smallest = np.inf
        for k in range(g.shape[0]):
            if k == y[i] and self.dual_coef_[k, i] >= self.C:
                continue
            elif k != y[i] and self.dual_coef_[k, i] >= 0:
                continue

            smallest = min(smallest, g[k])

        return g.max() - smallest

    def _solve_subproblem(self, g, y, norms, i):
        # Prepare inputs to the projection.
        Ci = np.zeros(g.shape[0])
        Ci[y[i]] = self.C
        beta_hat = norms[i] * (Ci - self.dual_coef_[:, i]) + g / norms[i]
        z = self.C * norms[i]

        # Compute projection onto the simplex.
        beta = projection_simplex(beta_hat, z)

        return Ci - self.dual_coef_[:, i] - beta / norms[i]

    def fit(self, X, y):
        n_samples, n_features = X.shape

        # Normalize labels.
        self._label_encoder = LabelEncoder()
        y = self._label_encoder.fit_transform(y)

        # Initialize primal and dual coefficients.
        n_classes = len(self._label_encoder.classes_)
        self.dual_coef_ = np.zeros((n_classes, n_samples), dtype=np.float64)
        self.coef_ = np.zeros((n_classes, n_features))

        # Pre-compute norms.
        norms = np.sqrt(np.sum(X ** 2, axis=1))

        # Shuffle sample indices.
        rs = check_random_state(self.random_state)
        ind = np.arange(n_samples)
        rs.shuffle(ind)

        violation_init = None
        for it in range(self.max_iter):
            violation_sum = 0

            for ii in range(n_samples):
                i = ind[ii]

                # All-zero samples can be safely ignored.
                if norms[i] == 0:
                    continue

                g = self._partial_gradient(X, y, i)
                v = self._violation(g, y, i)
                violation_sum += v

                if v < 1e-12:
                    continue

                # Solve subproblem for the ith sample.
                delta = self._solve_subproblem(g, y, norms, i)

                # Update primal and dual coefficients.
                self.coef_ += (delta * X[i][:, np.newaxis]).T
                self.dual_coef_[:, i] += delta

            if it == 0:
                violation_init = violation_sum

            # Stop once the total optimality violation has shrunk below tol.
            vratio = violation_sum / violation_init
            if self.verbose >= 1:
                print("iter", it + 1, "violation ratio", vratio)
            if vratio < self.tol:
                break

        return self

    def predict(self, X):
        decision = np.dot(X, self.coef_.T)
        pred = decision.argmax(axis=1)
        return self._label_encoder.inverse_transform(pred)

X_train = data_train['values'][:,:,:,:3]  # keep only the RGB channels (drop alpha)
X_train_flatten = np.array([x.flatten() for x in X_train])
y_train = data_train['labels']

X_test = data_test['values'][:,:,:,:3]
X_test_flatten = np.array([x.flatten() for x in X_test])
y_test = data_test['labels']
X, y = X_train_flatten, y_train

clf = MulticlassSVM(C=0.1, tol=0.01, max_iter=100, random_state=0, verbose=1)
clf.fit(X, y)
print(clf.score(X_test_flatten, y_test))
0.6988416988416989