2/26/2024

Dominant frequency extraction.

 



Let's say we have signal data of shape channels × length, e.g. EEG (electroencephalogram) recordings or other time-series data.

We might wonder which frequencies (in Hz) are dominant in it.

The code below answers this question and returns the top 5 dominant frequencies.

.

import numpy as np
from collections import Counter
from scipy.signal import welch

def identify_dominant_frequencies(signal, fs, top_n=5):
    """Return the frequencies of the ``top_n`` highest-power Welch PSD bins.

    The power spectral density is estimated with ``scipy.signal.welch`` and
    the bins are ranked by power. The result lists the highest-power *bins*,
    so neighbouring bins belonging to one broad spectral peak may all appear.

    Args:
        signal: Samples to analyse, passed straight to ``welch``
            (assumes a 1-D, single-channel signal -- TODO confirm with caller).
        fs: Sampling frequency, passed to ``welch``.
        top_n: Number of frequency bins to return.

    Returns:
        A numpy array with at most ``top_n`` frequencies, ordered from the
        weakest to the strongest of the selected bins (strongest last).
        Empty when ``top_n`` is zero or negative.
    """
    freqs, psd = welch(signal, fs)
    if top_n <= 0:
        # A slice of ``[-0:]`` would select *every* bin rather than none,
        # so a non-positive request is answered explicitly with an empty array.
        return freqs[:0]
    # argsort is ascending, so the last ``top_n`` indices carry the most power.
    peak_indices = np.argsort(psd)[-top_n:]
    return freqs[peak_indices]

..
# Excerpt from a larger loop that is not shown here: `signal`, `fs`, `top_n`,
# `channel` and `dominant_freqs_summary` are presumably defined by the
# surrounding code -- TODO confirm.
# Compute the dominant frequencies for the current signal and add them to the
# list kept for this channel.
dominant_freqs = identify_dominant_frequencies(signal, fs, top_n)
dominant_freqs_summary[channel].extend(dominant_freqs) # Append the frequencies
..
# Median of the collected dominant frequencies for every channel;
# a channel whose list is empty is mapped to None instead.
median_dominant_freqs = {
    name: (None if not values else np.median(values))
    for name, values in dominant_freqs_summary.items()
}
..

def get_top_n_frequencies(freq_list, top_n=5, bin_width=1.0):
    """Return the ``top_n`` most frequently occurring binned frequencies.

    Every value in ``freq_list`` is snapped to the nearest multiple of
    ``bin_width``; the snapped values are then ranked by how many times they
    occur, and only the values (not their counts) are returned, most common
    first.
    """
    snapped = np.round(np.asarray(freq_list) / bin_width) * bin_width
    ranked = Counter(snapped).most_common(top_n)
    return [bin_value for bin_value, _ in ranked]

# Mapping of channel -> its 5 most common binned frequencies
top_5_freqs_all_channels = {}
bin_width = 1.0

# Compute, store and report the result one channel at a time
for channel, freqs in dominant_freqs_summary.items():
    top_5_freqs = top_5_freqs_all_channels[channel] = get_top_n_frequencies(
        freqs, 5, bin_width
    )
    print(f"{channel}: Top 5 Frequencies = {top_5_freqs}")

..


2/18/2024

GroupShuffleSplit, sklearn

 

The data contains multiple rows that share the same eeg_id. Using GroupShuffleSplit, we can split the data into train and validation sets so that all rows with the same id end up in the same subset.

Refer to code:

.



import pandas as pd
from sklearn.model_selection import GroupShuffleSplit

# Read the training table from disk
train = pd.read_csv('./train.csv')

# Report the table dimensions
print("Dataset shape:", train.shape)

# Report how many distinct eeg_id values the table holds
unique_eeg_id_count = train['eeg_id'].nunique()
print("Unique eeg_id count:", unique_eeg_id_count)

# A single, seeded, group-aware shuffle split with test_size=0.2
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)

# n_splits=1, so the splitter yields exactly one (train, validation) pair of
# positional indices; rows sharing an eeg_id are kept on the same side
train_idx, val_idx = next(gss.split(train, groups=train['eeg_id']))
train_set = train.iloc[train_idx]
val_set = train.iloc[val_idx]

# Report the size of each resulting subset
print("Training set shape:", train_set.shape)
print("Validation set shape:", val_set.shape)

..

Thank you.

🙇🏻‍♂️

2/15/2024

Interpolating a 1-D data list, e.g. [1, 2, 3, 4] -> [1. , 1.5, 2. , 2.5, 3. , 3.5, 4. ]

 

Expand and interpolate n × m data to n × (m + l).

.

    data = np.array( [[1, 2, 3, 4], [4, 3, 2, 1]] )
data_len = 7
x = np.linspace(0, 1, data.shape[-1])
x2 = np.linspace(0, 1, data_len)
f = interp1d(x, data)
data = f(x2)

..

Import the library below before running the snippet above.

Thank you!!

from scipy.interpolate import interp1d
.
this is output:

array([[1. , 1.5, 2. , 2.5, 3. , 3.5, 4. ],
                 [4. , 3.5, 3. , 2.5, 2. , 1.5, 1. ]])

2/10/2024

PyTorch Lightning: save .pth files along with the top-k .ckpt checkpoints

 


Here is a custom checkpoint callback:

.

class CustomModelCheckpoint(ModelCheckpoint):
    """``ModelCheckpoint`` that additionally writes plain ``state_dict`` files.

    Each time ``on_save_checkpoint`` runs, the module's ``state_dict`` is
    written as a ``.pth`` file next to the regular checkpoints, and only the
    ``save_top_k_pth`` most recently written ``.pth`` files are kept on disk.

    NOTE(review): pruning is by age (oldest file first), not by metric value;
    whether that matches the parent's top-k selection depends on when the
    parent triggers saves -- confirm if strict "best K" is required.
    """

    def __init__(self, save_top_k_pth=0, *args, **kwargs):
        """Create the callback.

        Args:
            save_top_k_pth: Maximum number of ``.pth`` files to keep. Zero or
                a negative value disables ``.pth`` saving entirely.
            *args: Forwarded unchanged to ``ModelCheckpoint``.
            **kwargs: Forwarded unchanged to ``ModelCheckpoint``.
        """
        super().__init__(*args, **kwargs)
        self.save_top_k_pth = save_top_k_pth
        # Paths of the .pth files written so far, oldest first
        self.saved_pth_files = []

    def on_save_checkpoint(self, trainer, pl_module, checkpoint):
        """Write a ``.pth`` file (when enabled), then defer to the parent hook.

        Args:
            trainer: Trainer providing ``current_epoch``, ``callback_metrics``
                and ``default_root_dir``.
            pl_module: Module whose ``state_dict()`` is saved.
            checkpoint: Checkpoint object, passed through to the parent hook.

        Returns:
            Whatever the parent ``on_save_checkpoint`` returns.
        """
        # Skip the .pth export when it is disabled (writing a file only to
        # delete it again is wasted I/O) or when 'val_loss' has not been
        # logged yet, instead of failing with a KeyError.
        val_loss = trainer.callback_metrics.get('val_loss')
        if self.save_top_k_pth > 0 and val_loss is not None:
            self._save_pth(trainer, pl_module, val_loss.item())

        # Ensure to call the superclass method
        return super().on_save_checkpoint(trainer, pl_module, checkpoint)

    def _save_pth(self, trainer, pl_module, val_loss):
        """Write one ``.pth`` file and prune the oldest ones beyond the limit."""
        filename = f"model-epoch={trainer.current_epoch}-val_loss={val_loss:.2f}.pth"
        dirpath = self.dirpath if self.dirpath else trainer.default_root_dir
        # The target directory may not exist yet when this hook first runs
        os.makedirs(dirpath, exist_ok=True)
        pth_path = os.path.join(dirpath, filename)

        torch.save(pl_module.state_dict(), pth_path)
        # Saving again under the same name overwrites the file; track the path
        # only once so pruning a stale duplicate cannot delete the new file.
        if pth_path in self.saved_pth_files:
            self.saved_pth_files.remove(pth_path)
        self.saved_pth_files.append(pth_path)

        # Drop the oldest files until at most save_top_k_pth remain
        while len(self.saved_pth_files) > self.save_top_k_pth:
            oldest_pth = self.saved_pth_files.pop(0)
            if os.path.exists(oldest_pth):
                os.remove(oldest_pth)

..


Use it in the training process:

.

# TensorBoard logger; its save_dir, name and version also decide where the
# checkpoints are written below
logger = loggers.TensorBoardLogger(save_dir="lightning_logs", name=config.model_version)

# Checkpoints go into the logger's own version directory
checkpoint_dir = f"{logger.save_dir}/{logger.name}/version_{logger.version}"

# Checkpoint callback monitoring val_loss (lower is better)
checkpoint_callback = CustomModelCheckpoint(
    dirpath=checkpoint_dir,
    filename='model-{epoch:02d}-{val_loss:.2f}',
    monitor='val_loss',
    mode='min',
    save_top_k=2,  # Top 2 checkpoints
    save_top_k_pth=2,  # Also save top 2 .pth files
)

# Single-GPU trainer wired to the logger and the checkpoint callback
trainer = Trainer(
    max_epochs=config.num_epochs,
    accelerator='gpu',
    devices=1,
    callbacks=[checkpoint_callback],
    logger=logger,
    log_every_n_steps=10,
)

..



The saved top-k files (.ckpt and .pth) will show up in the folder.

Thank you.

🙇🏻‍♂️

2/08/2024

Git: find large files that were committed.

 Find large file in GitHub repository

.

# List every object reachable from any ref, print its type, id, size and path,
# keep those larger than 100 MiB, and sort them by size, largest first.
git rev-list --objects --all \
  | git cat-file --batch-check='%(objecttype) %(objectname) %(objectsize) %(rest)' \
  | awk '$3 > 100*1024*1024' \
  | sort -k3nr


..

  • git rev-list --objects --all lists all objects in the repository.
  • git cat-file --batch-check='...' checks the type, size, and other details of these objects.
  • awk '$3 > 100*1024*1024' filters objects larger than 100 MB (note: 1024*1024 bytes = 1MB).
  • sort -k3nr sorts these objects by size in descending order.

🙇🏻‍♂️

2/06/2024

iOS swift dictionary example code

 .


// Ages keyed by person name
var ages: [String: Int] = ["John": 30, "Emma": 25]

// Occupations keyed by person name (String keys, String values)
var occupations: [String: String] = ["John": "Engineer", "Emma": "Doctor"]

// Insert a key-value pair for a key that is not present yet
occupations.updateValue("Teacher", forKey: "Mike")

// Replace the value stored under an existing key
occupations.updateValue("Senior Doctor", forKey: "Emma") // Emma got a promotion!

// A lookup yields an Optional, so cover both the found and the missing case
switch occupations["John"] {
case let occupation?:
    print("John's occupation is \(occupation).")
case nil:
    print("John's occupation is not available.")
}

// Combine both dictionaries into a one-line summary per person;
// people without a known occupation get the shorter sentence
for (name, age) in ages {
    guard let occupation = occupations[name] else {
        print("\(name) is \(age) years old.")
        continue
    }
    print("\(name) is \(age) years old and works as a \(occupation).")
}

// Remove a key-value pair from the occupations dictionary
occupations.removeValue(forKey: "Mike") // Mike's occupation is removed

// Walk over every remaining key-value pair
for (name, occupation) in occupations {
    print("\(name) works as a \(occupation).")
}

// Report how many entries each dictionary holds
print("There are \(ages.count) people in the ages dictionary.")
print("There are \(occupations.count) occupations listed.")

..


Refer to the code above; I hope it gives you some useful ideas.

Thank you.

🙇🏻‍♂️

2/05/2024

Download all YouTube videos in playlist (python)

pip install pytube

Replace the playlist URL in the string below.

.

from pytube import Playlist, YouTube

def download_video(url, max_attempts=3):
    """Download one video, retrying up to ``max_attempts`` times.

    Each attempt fetches the highest-resolution stream for ``url`` and
    downloads it. Any exception is reported and the attempt is repeated;
    after the last failed attempt a final failure message is printed.
    Nothing is returned either way.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            yt = YouTube(url)
            yt.streams.get_highest_resolution().download()
            print(f"Downloaded: {yt.title}")
            return
        except Exception as e:
            # Best-effort retry: report the problem and move on to the next attempt
            print(f"Error downloading video (attempt {attempt}): {url}\n{e}")
            out_of_attempts = attempt == max_attempts
            if out_of_attempts:
                print(f"Failed to download video after {max_attempts} attempts: {url}")

# Replace with your playlist URL
playlist_url = 'https://www.youtube.com/playlist?list=xxx'

# Resolve the playlist into its video URLs and download them one by one
for url in Playlist(playlist_url).video_urls:
    download_video(url)

..


Thank you.

🙇🏻‍♂️