Run DPAT Application on Unified Server#
About this tutorial#
The Unified Server can function as a replication server for the Container Hub registry, allowing us to run the DPAT application on it without the need to create an Edge Server.
Purpose#
In this tutorial, you will learn how to:
Run DPAT app on Unified Server by app descriptor
Run SMT8 test program to get test limits from the DPAT app
Compatibility and Prerequisites#
System Compatibility#
SmarTest 8 / Nexus 3.1.0 / Edge 3.4.0-prod/ Unified Server 2.3.0-prod
Prerequisites#
Two Container Hub accounts are required:
Two organization(org) accounts configured with the App Descriptor project are required: one org administrator (admin) account and one standard user account.
A project in the ACS Container Hub is also required.
Environment Preparation#
Note: The ‘adv-dpat’ project we are using below is for demonstration purposes. You will need to replace it with your own project accordingly.
Create Essential VMs for RTDI#
Click the “Add” button to add VMs:
Create Host Controller and Edge Server#
Create Unified Server#
Configure Unified Server#
Configure Unified Server by the following.
Replicate projects from Container Hub to Unified Server on Harbor
Create new endpoint
Click “NEW ENDPOINT” in “Registries” page
In the popped up “New Registry Endpoint” dialog, input these entries:
Name: “dpat-container-hub”
Endpoint URL: “https://registry.advantest.com”
Access ID: org user account
Access Secret: the account’s docker secret
Then click “TEST CONNECTION”, after that you will see “Connection Tested successfully” on the top of the dialog, click “OK” to submit the new endpoint to Harbor finally.
Create new replication rule
Click “NEW REPLICATION RULE” in “Replications” page
In the popped up “New Replication Rule” dialog, input or select these entries:
Name: input “container-hub-repl”
Replication Mode: click “Pull-based”
Source registry: select “dpat-container-hub”
Flattening: select “Flatten All Levels”
Trigger Mode: select “Scheduled”
Cron String: input “0 0 */2 * * *” (This means that the replication will be executed every 2 hours automatically)
Then click “SAVE” to submit the new replication rule
Replicate Container hub manually
Wait about 20 minutes for completing replication
You can confirm in “Executions” section of page, the “status” is Succeeded, the “Success Rate” is 100%
Now the DPAT app image is replicated to Unified Server.
Note that: If you have modified DPAT app code and push new docker image to Container hub, you need to manually trigger replication again from the “Replications” page or wait for the automatic replication done.
Application Deployment#
Note: The ‘adv-dpat’ project we are using below is for demonstration purposes. You will need to replace it with your own project accordingly.
Transfer demo program to the Host Controller#
Download the test program
Click to download the application-dpat-v3.1.0-RHEL74.tar.gz archive(a simple DPAT algorithm in Python) to your computer.
Note that: If you are using “RHEL79_ST8.7” VM image for running SMT8, you can download the application-dpat-v3.1.0-RHEL79.tar.gz archive, it contains code enhanced SMT8 test method codes.
Transfer the file to the ~/apps directory on the Host Controller VM
Please refer to the “Transferring files” section of VM Management page.
In the VNC GUI, extract files in the bash console.
For RHEL74:
cd ~/apps/ tar -zxf application-dpat-v3.1.0-RHEL74.tar.gz
For RHEL79:
cd ~/apps/ tar -zxf application-dpat-v3.1.0-RHEL79.tar.gz
Create Docker image for DPAT app#
Load the DPAT base image.
cd ~ wget -N http://10.44.5.139/docker/python39-basic20.tar.zip unzip python39-basic20.tar.zip sudo docker load -i python39-basic20.tar
Log in with the org user account in order to push DPAT docker image
sudo docker login registry.advantest.com --username ChangeToUserName --password ChangeToSecret
Navigate to the DPAT app directory(~/apps/application-dpat-v3.1.0/rd-app_dpat_py), you can find these files:
Dockerfile: this is used for building DPAT app docker image
Click to expand!
FROM registry.advantest.com/adv-dpat/python39-basic:2.0 RUN yum install -y openssh-server RUN mkdir /var/run/sshd RUN echo 'root:root123' | chpasswd RUN sed -i 's/PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config RUN sed 's@session\s*required\s*pam_loginuid.so@session optional pam_loginuid.so@g' -i /etc/pam.d/sshd EXPOSE 22 # set nexus env variables WORKDIR / RUN python3.9 -m pip install pandas jsonschema # copy run files and directories RUN mkdir -p /dpat-app;chmod a+rwx /dpat-app RUN mkdir -p /dpat-app/data;chmod a+rwx /dpat-app/data RUN touch dpat-app/__init__.py COPY workdir /dpat-app/workdir COPY conf /dpat-app/conf # RUN yum -y install libunwind-devel RUN echo 'export $(cat /proc/1/environ |tr "\0" "\n" | xargs)' >> /etc/profile ENV LOG_FILE_PATH "/tmp/app.log" ENV DFF_FLAG="False" WORKDIR /dpat-app/workdir RUN /usr/bin/ssh-keygen -A RUN echo "python3.9 -u /dpat-app/workdir/run_dpat.py&" >> /start.sh RUN echo "/usr/sbin/sshd -D" >> /start.sh CMD sh /start.sh
workdir/run_dpat.py: this is the main entry point for the DPAT app
Click to expand!
""" Copyright 2023 ADVANTEST CORPORATION. All rights reserved Dynamic Part Average Testing This module allows the user to calculate new limits based on dynamic part average testing. It is assumed that this is being run on the docker container with the Nexus connection. """ import json from AdvantestLogging import logger from dpat import DPAT from oneapi import OneAPI, send_command def run(nexus_data, args, save_result_fn): """Callback function that will be called upon receiving data from OneAPI Args: nexus_data: Datalog coming from nexus args: Arguments set by user """ base_limits = args["baseLimits"] dpat = args["DPAT"] # persistent class # compute dpat to get new high/low limits dpat.compute_once(nexus_data, base_limits, logger, args) results_df, new_limits = dpat.datalog() # logger.info(new_limits) if args["VariableControl"] == True: # Send dpat computed data back to Host Controller Nexus, the Smartest will receive the data from Nexus send_command(new_limits, "VariableControl") save_result_fn(results_df) def main(): """Set logger and call OneAPI""" logger.info("Starting DPAT datacolection and computation") logger.info("Copyright 2022 - Advantest America Inc") with open("data/base_limits.json", encoding="utf-8") as json_file: base_limits = json.load(json_file) args = { "DPAT": DPAT(), "baseLimits": base_limits, "config_path": "../conf/test_suites.ini", "setPathStorage": "data", "setPrefixStorage": "Demo_Dpat_123456", "saveStat": True, "VariableControl": False, # Note: smt version is not visible from run_dpat, so it is set in sample.py in consumeLotStart } logger.info("Starting OneAPI") """ Create a OneAPI instance to receive Smartest data from Host Controller Nexus, the callback function "run" will be called when data received with the parameter in the "args" """ oneapi = OneAPI(callback_fn=run, callback_args=args) oneapi.start() if __name__ == "__main__": main()
workdir/dpat.py: this is the core algorithm for DPAT calculation
Click to expand!
""" Copyright 2023 ADVANTEST CORPORATION. All rights reserved This module contains a class and modules that allow the user to calculate new limits based on dynamic part average testing. It is assumed that this is being run on the docker container with the Nexus connection, being called by run_dpat.py. """ import time from typing import Dict, List import numpy as np import pandas as pd class DPAT: """ This class allows the user to calculate new limits based on dynamic part average testing. It is assumed that this is being run on the docker container with the Nexus connection. Contains the following methods: * save_stat(results, df_cumulative_raw, args) * check_new_limits(df_new_limits) * save_stat(results, df_cumulative_raw, args) * stdev_compute(df_cumulative_raw, return_columns) * stdev_full_compute(df_stdev_full, return_columns) * iqr_full_compute(df_iqr_full, return_columns) * iqr_window_compute(df_iqr_full, return_columns) * stdev_compute(df_cumulative_raw, return_columns) * compute(df_base_limits, df_cumulative_raw, args) * datalog(nexus_data, df_base_limits, df_cumulative_raw, args) * run(data, args) * main - the main function of the script """ def __init__(self, ): self.counter = 0 def compute_once(self, nexus_data: str, base_limits: pd.DataFrame, logger, args): """Preprocess data, setting base values and columns. Should run only once. Args: nexus_data: string with pin values coming from nexus base_limits: DataFrame with base limits and parameters args: Dictionary with arguments set on main by user """ self.counter += 1 self.nexus_data = nexus_data self.args = args self.logger = logger self.df_base_limits, self.df_cumulative_raw = self.preprocess_data(base_limits) def save_stat( self, results: pd.DataFrame, df_cumulative_raw: pd.DataFrame, args: Dict ): """Save new limits and raw file into csv. Args: df_base_limits: DataFrame with base limits and parameters df_cumulative_raw: DataFrame with base values and columns args: Dictionary with argsurations parameters set from user """ self.logger.info("=> Start save_stat") set_path_storage = args.get("setPathStorage") set_prefix_storage = args.get("setPrefixStorage") file_to_save_raw = set_path_storage + "/" + set_prefix_storage + "_raw.csv" file_to_save = set_path_storage + "/" + set_prefix_storage + "_stdev.csv" df_cumulative_raw.to_csv(file_to_save_raw, mode="a", index=False) results.to_csv(file_to_save, mode="a", index=False) self.logger.info("=> End save_stat") def check_new_limits(self, df_new_limits: pd.DataFrame) -> List: """Check new limits against existing thresholds and return ids that are off limits. The ids that don't meet the criteria are resetted to base limits. Args: df_new_limits: DataFrame calculated limits Return: test_ids: List with test_ids that are off the base limits """ # New limits that are below user defined threshold below_threshold = df_new_limits[ (df_new_limits["limit_Usl"] - df_new_limits["limit_Lsl"]) < (df_new_limits["PassRangeUsl"] - df_new_limits["PassRangeLsl"]) * df_new_limits["DPAT_Threshold"] ] # New limits that are below base lower limit below_lsl = df_new_limits[ (df_new_limits["N"] < df_new_limits["DPAT_Samples"]) | (df_new_limits["limit_Lsl"] < df_new_limits["PassRangeLsl"]) * df_new_limits["DPAT_Threshold"] ] # New limits that are above base upper limit above_usl = df_new_limits[ (df_new_limits["N"] < df_new_limits["DPAT_Samples"]) | (df_new_limits["limit_Usl"] > df_new_limits["PassRangeUsl"]) * df_new_limits["DPAT_Threshold"] ] test_ids = [] if not above_usl.empty: test_ids = above_usl.TestId.unique().tolist() if not below_lsl.empty: test_ids = test_ids + below_lsl.TestId.unique().tolist() if not below_threshold.empty: test_ids = test_ids + below_threshold.TestId.unique().tolist() return list(set(test_ids)) # return unique testd_ids def stdev_full_compute( self, df_stdev_full: pd.DataFrame, return_columns: List ) -> pd.DataFrame: """Compute stdev method for specific test_ids. It updates the lower and upper limits following this formula: LPL = Mean - <dpat_sigma> * Sigma UPL = Mean + <dpat_sigma> * Sigma, Where <dpat_sigma> is an user defined parameter. Args: df_stdev_full: DataFrame with base values and columns return_columns: Columns to filter returned dataframe on Return: df_result: DataFrame with calculated stdev full and specific return columns """ df_stdev_full.loc[:, 'N'] = \ df_stdev_full.groupby('TestId')['TestId'].transform('count').values df_n = df_stdev_full[df_stdev_full["N"] > 1] df_one = df_stdev_full[df_stdev_full["N"] <= 1] mean = df_n.groupby("TestId")["PinValue"].transform(np.mean) stddev = df_n.groupby("TestId")["PinValue"].transform(lambda x: np.std(x, ddof=1)) # Calculate new limits according to stdev sample df_n.loc[:, "stdev_all"] = stddev.values df_n.loc[:, "limit_Lsl"] = (mean - (stddev * df_n["DPAT_Sigma"])).values df_n.loc[:, "limit_Usl"] = (mean + (stddev * df_n["DPAT_Sigma"])).values # Assign base limits to test_ids with a single instance df_one.loc[:, "limit_Lsl"] = df_one["PassRangeLsl"].values df_one.loc[:, "limit_Usl"] = df_one["PassRangeUsl"].values df_one.loc[:, "stdev_all"] = 0 df_result = pd.concat([df_one, df_n])[return_columns].drop_duplicates() return df_result def stdev_window_compute( self, df_stdev_window: pd.DataFrame, return_columns: List ) -> pd.DataFrame: """Compute stdev method for a specific window of test_ids. The mean and sigma are calculated for that window, and then it updates the lower and upper limits following this formula: LPL = Mean - <dpat_sigma> * Sigma UPL = Mean + <dpat_sigma> * Sigma, Where <dpat_sigma> is an user defined parameter. Args: df_stdev_window: DataFrame with base values and columns return_columns: Columns to filter returned dataframe on Return: df_result: DataFrame with calculated stdev window and specific return columns """ df_stdev_window.loc[:, 'N'] = \ df_stdev_window.groupby('TestId')['TestId'].transform('count').values # Separate test_ids with more than one sample df_n = df_stdev_window[df_stdev_window["N"] > 1] df_one = df_stdev_window[df_stdev_window["N"] <= 1] # Slice df according to window_size df_stdev_window.loc[:, 'N'] = df_stdev_window["DPAT_Window_Size"].values df_window_size = df_n.groupby("TestId").apply( lambda x: x.iloc[int(-x.N.values[0]):] ).reset_index(drop=True) if not df_n.empty: # Calculate new limits mean = df_window_size.groupby("TestId")["PinValue"].transform(np.mean).values stddev = df_window_size.groupby("TestId")["PinValue"].transform( lambda x: np.std(x, ddof=1) ).values df_n.loc[:, "stdev_all"] = stddev df_n.loc[:, "limit_Lsl"] = (mean - (stddev * df_n["DPAT_Sigma"])).values df_n.loc[:, "limit_Usl"] = (mean + (stddev * df_n["DPAT_Sigma"])).values # Assign base limits to test_ids with a single instance df_one.loc[:, "limit_Lsl"] = df_one["PassRangeLsl"].values df_one.loc[:, "limit_Usl"] = df_one["PassRangeUsl"].values df_one.loc[:, "stdev_all"] = 0 df_result = pd.concat([df_n, df_one])[return_columns].drop_duplicates() return df_result def iqr_full_compute( self, df_iqr_full: pd.DataFrame, return_columns: List ) -> pd.DataFrame: """ Update the lower and upper limits following this formula: IQR = Upper Quartile(Q3) - Lower Quartile(Q1) LSL = Q1 - 1.5 IQR USL = Q3 + 1.5 IQR Args: df_iqr_full: DataFrame with base values and columns return_columns: Columns to filter returned dataframe on Return: df_result: DataFrame with calculated IQR full and specific return columns """ window_size = df_iqr_full["DPAT_Window_Size"] df_iqr_full.loc[:, "N"] = window_size df_iqr_full.loc[:, 'N'] = df_iqr_full.groupby('TestId')['TestId'].transform('count') # Separate test_ids with more than one sample df_n = df_iqr_full[df_iqr_full["N"] > 1] df_one = df_iqr_full[df_iqr_full["N"] <= 1] if not df_n.empty: # Compute new limits first_quartile = df_n.groupby("TestId").PinValue.quantile(q=0.25) upper_quartile = df_n.groupby("TestId").PinValue.quantile(q=0.75) iqr = upper_quartile - first_quartile multiple = df_n.groupby("TestId")["DPAT_IQR_Multiple"].first() df_n = df_n[return_columns].drop_duplicates() df_n.loc[:, "limit_Lsl"] = (first_quartile - (multiple * iqr)).values df_n.loc[:, "limit_Usl"] = (upper_quartile + (multiple * iqr)).values df_n.loc[:, "q1_all"] = first_quartile.values df_n.loc[:, "q3_all"] = upper_quartile.values # Assign base limits to test_ids with a single instance df_one.loc[:, "limit_Lsl"] = df_one["PassRangeLsl"].values df_one.loc[:, "limit_Usl"] = df_one["PassRangeUsl"].values df_one.loc[:, "q1_all"] = "N/A" df_one.loc[:, "q3_all"] = "N/A" df_result = pd.concat([df_n, df_one])[return_columns] return df_result def iqr_window_compute( self, df_iqr_window: pd.DataFrame, return_columns: List ) -> pd.DataFrame: """Before calculating the limits, it gets slices the df according to the <window_size>. Then it updates the lower and upper limits following this formula: IQR = Upper Quartile(Q3) - Lower Quartile(Q1) LSL = Q1 - 1.5 IQR USL = Q3 + 1.5 IQR Args: df_iqr_window: DataFrame with base values and columns return_columns: Columns to filter returned dataframe on Return: df_result: DataFrame with calculated IQR window and specific return columns """ window_size = df_iqr_window["DPAT_Window_Size"].values df_iqr_window.loc[:, "window_size"] = window_size df_iqr_window.loc[:, 'N'] = \ df_iqr_window.groupby('TestId')['TestId'].transform('count').values # Separate test_ids with more than one sample df_n = df_iqr_window[df_iqr_window["N"] > 1] df_one = df_iqr_window[df_iqr_window["N"] <= 1] if not df_n.empty: # Slice df according to window_size df_window_size = df_n.groupby("TestId").apply( lambda x: x.iloc[int(-x.window_size.values[0]):] ).reset_index(drop=True) # Calculate IQR first_quartile = df_window_size.groupby(["TestId"]).PinValue.quantile(q=0.25) upper_quartile = df_window_size.groupby(["TestId"]).PinValue.quantile(q=0.75) iqr = upper_quartile - first_quartile multiple = df_n.groupby("TestId")["DPAT_IQR_Multiple"].first() df_n = df_n[return_columns].drop_duplicates() # Calculate new limits df_n.loc[:, "limit_Lsl"] = (first_quartile - (multiple * iqr)).values df_n.loc[:, "limit_Usl"] = (upper_quartile + (multiple * iqr)).values df_n.loc[:, "q1_window"] = first_quartile.values df_n.loc[:, "q3_window"] = upper_quartile.values # Assign base limits to test_ids with a single instance df_one.loc[:, "limit_Lsl"] = df_one["PassRangeLsl"].values df_one.loc[:, "limit_Usl"] = df_one["PassRangeUsl"].values df_one.loc[:, "q1_window"] = "N/A" df_one.loc[:, "q3_window"] = "N/A" df_result = pd.concat([df_n, df_one]) return df_result[return_columns] def stdev_sample_compute( self, df_cumulative_raw: pd.DataFrame, return_columns: List ) -> pd.DataFrame: """ Updates the lower and upper limits following this formula: LSL = Mean - <multiplier> * Sigma USL = Mean + <multiplier> * Sigma Args: df_cumulative_raw: DataFrame with base values and columns return_columns: Columns to filter returned dataframe on Return: df_stdev: DataFrame with calculated stdev and specific return columns """ df_stdev = df_cumulative_raw[ (df_cumulative_raw["DPAT_Type"] == "STDEV") & (df_cumulative_raw["nbr_executions"] > 1) ] df_stdev_one = df_cumulative_raw[ (df_cumulative_raw["DPAT_Type"] == "STDEV") & (df_cumulative_raw["nbr_executions"] <= 1) ] df_stdev.loc[:, "stdev"] = ( ( df_stdev["nbr_executions"] * df_stdev["sum_of_squares"] - df_stdev["sum"] * df_stdev["sum"] ) / ( df_stdev["nbr_executions"] * (df_stdev["nbr_executions"] - 1) ) ).pow(1./2).values mean = ( df_stdev["sum"] / df_stdev["nbr_executions"] ) df_stdev.loc[:, "limit_Lsl"] = (mean - (df_stdev["stdev"] * df_stdev["DPAT_Sigma"])).values df_stdev.loc[:, "limit_Usl"] = (mean + (df_stdev["stdev"] * df_stdev["DPAT_Sigma"])).values df_stdev_one.loc[:, "limit_Lsl"] = df_stdev_one["PassRangeLsl"].values df_stdev_one.loc[:, "limit_Usl"] = df_stdev_one["PassRangeUsl"].values df_stdev = pd.concat([df_stdev, df_stdev_one])[return_columns] return df_stdev def compute( self, df_base_limits: pd.DataFrame, df_cumulative_raw: pd.DataFrame, args: Dict ) -> pd.DataFrame: """Compute new limits with standard deviation and IQR methods Args: df_base_limits: DataFrame with base limits and parameters df_cumulative_raw: DataFrame with base values and columns args: Dictionary with args parameters set from user Returns: new_limits: DataFrame with new limits after compute """ self.logger.info("=>Starting Compute") # define base columns and fill with NANs df_cumulative_raw["stdev_all"] = np.nan df_cumulative_raw["stdev_window"] = np.nan df_cumulative_raw["stdev"] = np.nan df_cumulative_raw["q1_all"] = np.nan df_cumulative_raw["q3_all"] = np.nan df_cumulative_raw["q1_window"] = np.nan df_cumulative_raw["q3_window"] = np.nan df_cumulative_raw["N"] = np.nan df_cumulative_raw["limit_Lsl"] = np.nan df_cumulative_raw["limit_Usl"] = np.nan return_columns = [ "TestId", "stdev_all", "stdev_window", "stdev", "q1_all", "q3_all", "q1_window", "q3_window", "N", "limit_Lsl", "limit_Usl", "PassRangeUsl", "PassRangeLsl" ] in_time = time.time() # Updates the lower and upper limits following this formula: # LSL = Mean - <multiplier> * Sigma (sample) # USL = Mean + <multiplier> * Sigma (sample) df_stdev = self.stdev_sample_compute(df_cumulative_raw, return_columns) # Updates the lower and upper limits according to sigma and mean df_stdev_full = df_cumulative_raw[df_cumulative_raw["DPAT_Type"] == "STDEV_FULL"] df_stdev_full = self.stdev_full_compute(df_stdev_full, return_columns) # Updates limits after getting a sample of <window_size>. df_stdev_window = df_cumulative_raw[ df_cumulative_raw["DPAT_Type"] == "STDEV_RUNNING_WINDOW" ] df_stdev_window = self.stdev_window_compute(df_stdev_window, return_columns) # Calculates limits based on Interquartile range. # IQR = Upper Quartile(Q3) - Lower Quartile(Q1) # LSL = Q1 - 1.5 IQR # USL = Q3 + 1.5 IQR df_iqr_full = df_cumulative_raw[df_cumulative_raw["DPAT_Type"] == "IQR_FULL"] df_iqr_full = self.iqr_full_compute(df_iqr_full, return_columns) # Gets a sample of the dataframe of <window_size> and then uses IQR df_iqr_window = df_cumulative_raw[df_cumulative_raw["DPAT_Type"] == "IQR_RUNNING_WINDOW"] df_iqr_window = self.iqr_window_compute(df_iqr_window, return_columns) # Concatenates the 5 methods in one dataframe new_limits_df = pd.concat( [df_stdev, df_stdev_full, df_stdev_window, df_iqr_full, df_iqr_window] ).sort_values(by = "TestId").reset_index(drop=True) # Check for test_ids that don't meet the criteria and are off limits test_ids_off_limits = self.check_new_limits(new_limits_df.merge(df_base_limits)) if len(test_ids_off_limits) > 0: # Sets the off limits ids to the base values new_limits_df.loc[new_limits_df.TestId.isin(test_ids_off_limits), "limit_Usl"] = \ new_limits_df["PassRangeUsl"] new_limits_df.loc[new_limits_df.TestId.isin(test_ids_off_limits), "limit_Lsl"] = \ new_limits_df["PassRangeLsl"] new_limits_df["PassRangeLsl"] = new_limits_df["limit_Lsl"] new_limits_df["PassRangeUsl"] = new_limits_df["limit_Usl"] # Prepare dataframe with return format new_limits_df = new_limits_df.merge( df_cumulative_raw[["TestId", "TestSuiteName", "Testname", "pins"]].drop_duplicates(), on = "TestId", how="left" ) print(new_limits_df) # Pick return columns and drop duplicates new_limits_df = new_limits_df[[ "TestId", "TestSuiteName", "Testname", "pins", "PassRangeLsl", "PassRangeUsl", ]].drop_duplicates() out_time = time.time() self.logger.info("Compute Setup Test Time=%.3f} sec" % (out_time - in_time)) # Parameter set by user, persists result in a file if args.get("saveStat"): self.save_stat( new_limits_df, df_cumulative_raw, args ) self.logger.info("=> End Compute") return new_limits_df def format_result(self, result_df: pd.DataFrame) -> str: """Convert dataframe to string with variables to be used in VariableControl Follows this format: TESTID0,value TESTID1,value TESTID2,value """ new_limits_df = result_df.copy() new_limits_df.sort_values(by="TestId").reset_index(drop=True, inplace=True) new_limits_df["test_id_var"] = "TestId" + new_limits_df.index.astype(str) + "," + \ new_limits_df["TestId"].astype(str) test_ids = ' '.join(new_limits_df["test_id_var"]) new_limits_df["limits_lsl"] = "limit_lsl." + new_limits_df["TestId"].astype(str) + \ "," + new_limits_df["PassRangeLsl"].astype(str) new_limits_df["limits_usl"] = "limit_usl." + new_limits_df["TestId"].astype(str) + \ "," + new_limits_df["PassRangeUsl"].astype(str) limits_lsl = ' '.join(new_limits_df["limits_lsl"]) limits_usl = ' '.join(new_limits_df["limits_usl"]) variable_command = limits_lsl + " " + test_ids + " " + limits_lsl + " " + limits_usl return variable_command def datalog(self,) -> str: """Prepare data and compute new limits Returns: variable_command: formatted command with new_limits after compute """ nexus_data = self.nexus_data df_base_limits = self.df_base_limits df_cumulative_raw = self.df_cumulative_raw args = self.args self.logger.info("=> Starting Datalog") start = time.time() # Read string from nexus. Note that this does not read a csv file. current_result_datalog = nexus_data # Prepare base values and columns df_cumulative_raw = df_cumulative_raw[["TestId", "sum", "sum_of_squares", "nbr_executions"]] df_cumulative_raw = df_cumulative_raw.merge(current_result_datalog, on="TestId") df_cumulative_raw["nbr_executions"] = df_cumulative_raw["nbr_executions"] + 1 df_cumulative_raw["sum"] = df_cumulative_raw["sum"] + df_cumulative_raw["PinValue"] df_cumulative_raw["sum_of_squares"] = ( df_cumulative_raw["sum_of_squares"] + df_cumulative_raw["PinValue"] * df_cumulative_raw["PinValue"] ) df_cumulative_raw = df_cumulative_raw.merge( df_base_limits, on="TestId", how="left" ) # Compute new limits according to stdev and IQR methods new_limits_df = self.compute( df_base_limits, df_cumulative_raw, args ) end = time.time() self.logger.info( "Total Setup/Parallel Computation and Return result Test Time=%f" % (end-start) ) self.logger.info("=> End of Datalog:") # Format the result in Nexus Variable format return new_limits_df.copy(), self.format_result(new_limits_df) def preprocess_data(self, base_limits: pd.DataFrame) -> pd.DataFrame: """Initialize dataframes with base values before they can be computed Args: base_limits: Dict that contains base limits and parameters, extracted from json file Returns: df_base_limits: DataFrame with base_limits and parameters cumulative_statistics: DataFrame with base columns and values to be used in compute """ self.logger.info("=>Beginning of Init") df_base_limits = pd.DataFrame.from_dict(base_limits, orient="index") df_base_limits["TestId"] = df_base_limits.index.astype(int) df_base_limits.reset_index(inplace=True, drop=True) # Pick only columns of interest cumulative_statistics = df_base_limits[ ["TestId", "PassRangeUsl", "PassRangeLsl", "DPAT_Sigma"] ].copy() # Set 0 to base columns cumulative_statistics["nbr_executions"] = 0 cumulative_statistics["sum_of_squares"] = 0 cumulative_statistics["sum"] = 0 self.logger.info("=>End of Init") return df_base_limits, cumulative_statistics
workdir/oneapi.py:
By using OneAPI, this code performs the following functions:
Retrieves test data from the Host Controller and sends it to the DPAT algorithm.
Receives the DPAT results from the algorithm and sends them back to the Host Controller.
Click to expand!
from liboneAPI import Interface from liboneAPI import AppInfo from sample import SampleMonitor from sample import sendCommand import signal import sys import pandas as pd from AdvantestLogging import logger def send_command(result_str, command): # name = command; # param = "DriverEvent LotStart" # sendCommand(name, param) name = command param = "DriverEvent LotStart" sendCommand(name, param) name = command param = "Config Enabled=1 Timeout=10" sendCommand(name, param) name = command param = "Set " + result_str sendCommand(name, param) logger.info(result_str) class OneAPI: def __init__(self, callback_fn, callback_args): self.callback_args = callback_args self.get_suite_config(callback_args["config_path"]) self.monitor = SampleMonitor( callback_fn=callback_fn, callback_args=self.callback_args ) def get_suite_config(self, config_path): # conf/test_suites.ini test_suites = [] with open(config_path) as f: for line in f: li = line.strip() if not li.startswith("#"): test_suites.append(li.split(",")) self.callback_args["test_suites"] = pd.DataFrame( test_suites, columns=["testNumber", "testName"] ) def start( self, ): signal.signal(signal.SIGINT, quit) logger.info("Press 'Ctrl + C' to exit") me = AppInfo() me.name = "sample" me.vendor = "adv" me.version = "2.1.0" Interface.registerMonitor(self.monitor) NexusDataEnabled = True # whether to enable Nexus Data Streaming and Control TPServiceEnabled = ( True # whether to enable TPService for communication with NexusTPI ) res = Interface.connect(me, NexusDataEnabled, TPServiceEnabled) if res != 0: logger.info(f"Connect fail. code = {res}") sys.exit() logger.info("Connect succeed.") signal.signal(signal.SIGINT, quit) while True: signal.pause() def quit( self, ): res = Interface.disconnect() if res != 0: logger.info(f"Disconnect fail. code = {res}") sys.exit()
workdir/sample.py:
Click to expand!
from liboneAPI import Monitor from liboneAPI import DataType from liboneAPI import TestCell from liboneAPI import Command from liboneAPI import Interface from liboneAPI import toHead from liboneAPI import toSite from liboneAPI import DFFData from liboneAPI import QueryResponse from AdvantestLogging import logger import os import sys import pandas as pd import json import re import threading import time def dff_query(condition: str) -> dict: dffData = {} query_res = QueryResponse() query_res = DFFData.createRawQueryRequest(condition) if query_res.code == DFFData.SUCCEED: jobID = query_res.result if query_res.code == DFFData.COMMON_ERROR: return dffData # error handling: response.errmsg while True: query_res = DFFData.getQueryTaskStatus(jobID) if query_res.code != DFFData.SUCCEED: return dffData # error handling: response.errmsg if query_res.result == "RUNNING": pass # need sleep a while then continue if query_res.result == "COMPLETED": query_res = DFFData.getQueryResult(jobID, "json") if query_res.code != DFFData.SUCCEED: pass # error handling: response.errmsg else : dffData = query_res.result break return dffData def sendCommand(cmd_name, cmd_param): logger.info(f"Command -- send {cmd_name}") tc = TestCell() cmd = Command() cmd.param = cmd_param cmd.name = cmd_name cmd.reason = f"test {cmd.name} command" res = Interface.sendCommand(tc, cmd) if res != 0: logger.info("Send command fail. code = ") class SampleMonitor(Monitor): def __init__(self, callback_fn, callback_args): Monitor.__init__(self) self.mTouchdownCnt = 0 self.result_test_suite = [] self.results_df = pd.DataFrame() self.callback_fn = callback_fn self.callback_args = callback_args self.callback_thread = None # To keep track of the thread self.test_suites = callback_args["test_suites"] self.ecid = None self.time_nanosec = 0 self.stime_nanosec = 0 self.ratime_nanosec = 0 self.atime_nanosec = 0 self.dff_flag = os.environ.get("DFF_FLAG", "false").lower() in ('true', 't') self.dff_dict = {} self.dff_skip_ls = [] def consumeLotStart(self, data): logger.info(sys._getframe().f_code.co_name) self.mTouchdownCnt = 0 # Get SmarTest version smt_ver_info = data.get_TesterosVersion() print("VERSION:" + str(data.get_TesterosVersion())) self._smt_os_version = re.search(r"\d+", smt_ver_info).group() print("VERSION:" + self._smt_os_version) if self._smt_os_version == "8": # for smarTest8 self.callback_args["VariableControl"] = False elif self._smt_os_version == "7": self.callback_args["VariableControl"] = True self.lot_id = data.get_LotId() # logger.info(f"get_Timezone = {data.get_Timezone()}") # logger.info(f"get_SetupTime = {data.get_SetupTime()}") logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # logger.info(f"get_StationNumber = {data.get_StationNumber()}") # logger.info(f"get_ModeCode = {data.get_ModeCode()}") # logger.info(f"get_RetestCode = {data.get_RetestCode()}") # logger.info(f"get_ProtectionCode = {data.get_ProtectionCode()}") # logger.info(f"get_BurnTimeMinutes = {data.get_BurnTimeMinutes()}") # logger.info(f"get_CommandCode = {data.get_CommandCode()}") logger.info(f"get_LotId = {data.get_LotId()}") # logger.info(f"get_PartType = {data.get_PartType()}") # logger.info(f"get_NodeName = {data.get_NodeName()}") # logger.info(f"get_TesterType = {data.get_TesterType()}") # logger.info(f"get_JobName = {data.get_JobName()}") # logger.info(f"get_JobRevision = {data.get_JobRevision()}") # logger.info(f"get_SublotId = {data.get_SublotId()}") # logger.info(f"get_OperatorName = {data.get_OperatorName()}") # logger.info(f"get_TesterosType = {data.get_TesterosType()}") # logger.info(f"get_TesterosVersion = {data.get_TesterosVersion()}") # logger.info(f"get_TestType = {data.get_TestType()}") # logger.info(f"get_TestStepCode = {data.get_TestStepCode()}") # logger.info(f"get_TestTemperature = {data.get_TestTemperature()}") # logger.info(f"get_UserText = {data.get_UserText()}") # logger.info(f"get_AuxiliaryFile = {data.get_AuxiliaryFile()}") # logger.info(f"get_PackageType = {data.get_PackageType()}") # logger.info(f"get_FamilyId = {data.get_FamilyId()}") # logger.info(f"get_DateCode = {data.get_DateCode()}") # logger.info(f"get_FacilityId = {data.get_FacilityId()}") # logger.info(f"get_FloorId = {data.get_FloorId()}") # logger.info(f"get_ProcessId = {data.get_ProcessId()}") # logger.info(f"get_OperationFreq = {data.get_OperationFreq()}") # logger.info(f"get_SpecName = {data.get_SpecName()}") # logger.info(f"get_SpecVersion = {data.get_SpecVersion()}") # logger.info(f"get_FlowId = {data.get_FlowId()}") # logger.info(f"get_SetupId = {data.get_SetupId()}") # logger.info(f"get_DesignRevision = {data.get_DesignRevision()}") # logger.info(f"get_EngineeringLotId = {data.get_EngineeringLotId()}") # logger.info(f"get_RomCode = {data.get_RomCode()}") # logger.info(f"get_SerialNumber = {data.get_SerialNumber()}") # logger.info(f"get_SupervisorName = {data.get_SupervisorName()}") # logger.info(f"get_HeadNumber = {data.get_HeadNumber()}") # logger.info(f"get_SiteGroupNumber = {data.get_SiteGroupNumber()}") # headsiteList = data.get_TotalHeadSiteList() # logger.info("get_TotalHeadSiteList:", end="") # for headsite in headsiteList: # logger.info(f" {headsite}", end="") # logger.info("") # logger.info(f"get_ProberHandlerType = {data.get_ProberHandlerType()}") # logger.info(f"get_ProberHandlerId = {data.get_ProberHandlerId()}") # logger.info(f"get_ProbecardType = {data.get_ProbecardType()}") # logger.info(f"get_ProbecardId = {data.get_ProbecardId()}") # logger.info(f"get_LoadboardType = {data.get_LoadboardType()}") # logger.info(f"get_LoadboardId = {data.get_LoadboardId()}") # logger.info(f"get_DibType = {data.get_DibType()}") # logger.info(f"get_DibId = {data.get_DibId()}") # logger.info(f"get_CableType = {data.get_CableType()}") # logger.info(f"get_CableId = {data.get_CableId()}") # logger.info(f"get_ContactorType = {data.get_ContactorType()}") # logger.info(f"get_ContactorId = {data.get_ContactorId()}") # logger.info(f"get_LaserType = {data.get_LaserType()}") # logger.info(f"get_LaserId = {data.get_LaserId()}") # logger.info(f"get_ExtraEquipType = {data.get_ExtraEquipType()}") # logger.info(f"get_ExtraEquipId = {data.get_ExtraEquipId()}") def consumeLotEnd(self, data): logger.info(sys._getframe().f_code.co_name) self.mTouchdownCnt = 0 logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # logger.info(f"get_DisPositionCode = {data.get_DisPositionCode()}") # logger.info(f"get_UserDescription = {data.get_UserDescription()}") # logger.info(f"get_ExecDescription = {data.get_ExecDescription()}") def consumeWaferStart(self, data): logger.info(sys._getframe().f_code.co_name) self.mTouchdownCnt = 0 self.dff_dict = {} self.dff_skip_ls = [] logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # logger.info(f"get_WaferSize = {data.get_WaferSize()}") # logger.info(f"get_DieHeight = {data.get_DieHeight()}") # logger.info(f"get_DieWidth = {data.get_DieWidth()}") # logger.info(f"get_WaferUnits = {data.get_WaferUnits()}") # logger.info(f"get_WaferFlat = {data.get_WaferFlat()}") # logger.info(f"get_CenterX = {data.get_CenterX()}") # logger.info(f"get_CenterY = {data.get_CenterY()}") # logger.info(f"get_PositiveX = {data.get_PositiveX()}") # logger.info(f"get_PositiveY = {data.get_PositiveY()}") # logger.info(f"get_HeadNumber = {data.get_HeadNumber()}") # logger.info(f"get_SiteGroupNumber = {data.get_SiteGroupNumber()}") # logger.info(f"get_WaferId = {data.get_WaferId()}") def consumeWaferEnd(self, data): logger.info(sys._getframe().f_code.co_name) self.mTouchdownCnt = 0 logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # logger.info(f"get_HeadNumber = {data.get_HeadNumber()}") # logger.info(f"get_SiteGroupNumber = {data.get_SiteGroupNumber()}") # logger.info(f"get_TestedCount = {data.get_TestedCount()}") # logger.info(f"get_RetestedCount = {data.get_RetestedCount()}") # logger.info(f"get_GoodCount = {data.get_GoodCount()}") # logger.info(f"get_WaferId = {data.get_WaferId()}") # logger.info(f"get_UserDescription = {data.get_UserDescription()}") # logger.info(f"get_ExecDescription = {data.get_ExecDescription()}") def consumeTestStart(self, data): logger.info(sys._getframe().f_code.co_name) logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # headsiteList = data.get_HeadSiteList() # logger.info("get_HeadSiteList:", end="") # for headsite in headsiteList: # logger.info(f" {liboneAPI.toHead(headsite)}|{liboneAPI.toSite(headsite)}", end="") # logger.info("") def consumeTestEnd(self, data): logger.info(sys._getframe().f_code.co_name) self.mTouchdownCnt += 1 logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # cnt = data.get_ResultCount() # logger.info(f"get_ResultCount = {cnt}") # for index in range(0, cnt): # tempU32 = data.query_HeadSite(index) # logger.info(f"Head = {liboneAPI.toHead(tempU32)} Site = {liboneAPI.toSite(tempU32)}") # logger.info(f"query_PartFlag = {data.query_PartFlag(index)}") # logger.info(f"query_NumOfTest = {data.query_NumOfTest(index)}") # logger.info(f"query_SBinResult = {data.query_SBinResult(index)}") # logger.info(f"query_HBinResult = {data.query_HBinResult(index)}") # logger.info(f"query_XCoord = {data.query_XCoord(index)}") # logger.info(f"query_YCoord = {data.query_YCoord(index)}") # logger.info(f"query_TestTime = {data.query_TestTime(index)}") # logger.info(f"query_PartId = {data.query_PartId(index)}") # logger.info(f"query_PartText = {data.query_PartText(index)}") def consumeTestFlowStart(self, data): logger.info(sys._getframe().f_code.co_name) logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # logger.info(f"get_TestFlowName = {data.get_TestFlowName()}") def consumeTestFlowEnd(self, data): logger.info(sys._getframe().f_code.co_name) logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # logger.info(f"get_TestFlowName = {data.get_TestFlowName()}") def consumeParametricTest(self, data): logger.info(sys._getframe().f_code.co_name) cnt = data.get_ResultCount() # logger.info(f"get_ResultCount = {cnt}") for index in range(0, cnt): tempU32 = data.query_HeadSite(index) # logger.info(f"Head = {toHead(tempU32)} Site = {toSite(tempU32)}") # logger.info(f"query_TestNumber = {data.query_TestNumber(index)}") # logger.info(f"query_TestText = {data.query_TestText(index)}") # logger.info(f"query_LowLimit = {data.query_LowLimit(index)}") # logger.info(f"query_HighLimit = {data.query_HighLimit(index)}") # logger.info(f"query_Unit = {data.query_Unit(index)}") # logger.info(f"query_TestFlag = {data.query_TestFlag(index)}") # logger.info(f"query_Result = {data.query_Result(index)}") test_num = data.query_TestNumber(index) if self._smt_os_version == "8": # for smarTest8 test_name = data.query_TestSuite(index) else: test_name = data.query_TestText(index).split(":")[0] result = data.query_Result(index) row = [test_num, test_name, result] # use only specific test_names and test_numbers define in config file # if test_num in set(self.test_suites.testNumber) or test_name in set(self.test_suites.testName): self.result_test_suite.append(row) if self.result_test_suite: # set result test suite as a dataframe test_suite_df = pd.DataFrame(self.result_test_suite, columns=["TestId", "testName", "PinValue"]) # get previous lots data to calculate limit if self.dff_flag and test_num not in self.dff_skip_ls: DPAT_Samples = 5 # default value # get DPAT Sample from base limit if it exists try: if test_name in self.callback_args["baseLimits"].keys(): DPAT_Samples = int(self.callback_args["baseLimits"][test_name]["DPAT_Samples"]) except Exception as e: logger.error(f"Cannot retrieve the DPAT_Samples of {test_name}, {e}") # get previous data using DFF if n_sample is less than DPAT Sample if (test_suite_df["TestId"] == test_num).sum() < DPAT_Samples: # retrieve previous data if it hasn't been fetched before if test_num not in self.dff_dict.keys(): dff_data_ls:list = self.consumeDFFData(test_num) self.dff_dict[test_num] = dff_data_ls dff_data:list = self.dff_dict[test_num] # concatenate previous data to test_suite_df if data is not empty if dff_data: # skip the test number if dff data has already been retrieved self.dff_skip_ls.append(test_num) # append the previous data to result test suite for result in dff_data: self.result_test_suite.append([test_num, test_name, result]) # concatenate previous data df = pd.DataFrame({ "TestId": [test_num] * len(dff_data), "testName": [test_name] * len(dff_data), "PinValue": dff_data }) test_suite_df = pd.concat([test_suite_df, df], axis=0) # execute DPAT using test_suite_df self.callback_thread = self.threaded_callback( self.callback_fn, test_suite_df, self.callback_args ) def consumeFunctionalTest(self, data): logger.info(sys._getframe().f_code.co_name) # cnt = data.get_ResultCount() # logger.info(f"get_ResultCount = {cnt}") # for index in range(0, cnt): # tempU32 = data.query_HeadSite(index) # logger.info(f"Head = {liboneAPI.toHead(tempU32)} Site = {liboneAPI.toSite(tempU32)}") # logger.info(f"query_TestNumber = {data.query_TestNumber(index)}") # logger.info(f"query_TestText = {data.query_TestText(index)}") # logger.info(f"query_TestFlag = {data.query_TestFlag(index)}") # logger.info(f"query_CycleCount = {data.query_CycleCount(index)}") # logger.info(f"query_NumberFail = {data.query_NumberFail(index)}") # failpins = data.query_FailPins(index) # logger.info("query_FailPins:", end="") # for p in failpins: # logger.info(f" {p}", end="") # logger.info("") # logger.info(f"query_VectNam = {data.query_VectNam(index)}") def consumeMultiParametric(self, data): logger.info(sys._getframe().f_code.co_name) # cnt = data.get_ResultCount() # logger.info(f"get_ResultCount = {cnt}") # for index in range(0, cnt): # tempU32 = data.query_HeadSite(index) # logger.info(f"Head = {liboneAPI.toHead(tempU32)} Site = {liboneAPI.toSite(tempU32)}") # logger.info(f"query_TestNumber = {data.query_TestNumber(index)}") # logger.info(f"query_TestText = {data.query_TestText(index)}") # logger.info(f"query_LowLimit = {data.query_LowLimit(index)}") # logger.info(f"query_HighLimit = {data.query_HighLimit(index)}") # logger.info(f"query_Unit = {data.query_Unit(index)}") # logger.info(f"query_TestFlag = {data.query_TestFlag(index)}") # results = data.query_Results(index) # logger.info(f"query_Results: Cnt({len(results)})", end="") # for r in results: # logger.info(f" {r}", end="") # logger.info("") def consumeDeviceData(self, data): logger.info(sys._getframe().f_code.co_name) # logger.info(f"get_TestProgramDir = {data.get_TestProgramDir()}") # logger.info(f"get_TestProgramName = {data.get_TestProgramName()}") # logger.info(f"get_TestProgramPath = {data.get_TestProgramPath()}") # logger.info(f"get_PinConfig = {data.get_PinConfig()}") # logger.info(f"get_ChannelAttribute = {data.get_ChannelAttribute()}") # cnt = data.get_BinInfoCount() # logger.info(f"get_BinInfoCount = {cnt}") # for index in range(0, cnt): # logger.info(f"SBin({data.query_SBinNumber(index)})", end="") # logger.info(f" Name({data.query_SBinName(index)})", end="") # logger.info(f" Type({data.query_SBinType(index)})", end="") # logger.info(f" HBin({data.query_HBinNumber(index)})", end="") # logger.info(f" Name({data.query_HBinName(index)})", end="") # logger.info(f" Type({data.query_HBinType(index)})", end="") # logger.info("") def consumeUserDefinedData(self, data): logger.info(sys._getframe().f_code.co_name) # userDefined = data.get_UserDefined() # for key,value in userDefined.items(): # logger.info(f"[{key}] = {value}") def consumeFile(self, data): logger.info(sys._getframe().f_code.co_name) # logger.info(f"get_FileName = {data.get_FileName()}") def consumeDatalogText(self, data): logger.info(sys._getframe().f_code.co_name) logger.debug(f"get_TimeStamp = {data.get_TimeStamp()}") # logger.info(f"get_DataLogText = {data.get_DataLogText()}") def consumeDFFData(self, test_number: str) -> list: result:list = [] sql_query:str = f""" SELECT lot_id, x_coord, y_coord, p.result, p.low_limit, p.high_limit FROM rtdi_table t LATERAL VIEW explode(t.ptr) as p WHERE p.test_number = {test_number} AND lot_id= '{self.lot_id}' """ dff_data:dict = dff_query(sql_query) # get previous result data if it's not empty if dff_data: parsed_data:dict = json.loads(dff_data) result:list = pd.DataFrame(parsed_data)["result"].tolist() logger.info(f"Retrieve previous data from DFF, data = {result}") else: logger.info("There is no previous data available from DFF") logger.debug(f"The query is '{sql_query}'") return result def consumeData(self, tc, data): try: datatype = data.getType() if datatype == DataType.DATA_TYP_PRODUCTION_LOTSTART: self.consumeLotStart(data) elif datatype == DataType.DATA_TYP_PRODUCTION_LOTEND: if self.callback_thread: self.callback_thread.join() # Wait for the callback thread to finish self.consumeLotEnd(data) elif datatype == DataType.DATA_TYP_PRODUCTION_WAFERSTART: self.consumeWaferStart(data) elif datatype == DataType.DATA_TYP_PRODUCTION_WAFEREND: self.consumeWaferEnd(data) elif datatype == DataType.DATA_TYP_PRODUCTION_TESTSTART: self.consumeTestStart(data) elif datatype == DataType.DATA_TYP_PRODUCTION_TESTEND: self.consumeTestEnd(data) elif datatype == DataType.DATA_TYP_PRODUCTION_TESTFLOWSTART: self.consumeTestFlowStart(data) elif datatype == DataType.DATA_TYP_PRODUCTION_TESTFLOWEND: self.consumeTestFlowEnd(data) elif datatype == DataType.DATA_TYP_MEASURED_PARAMETRIC: self.consumeParametricTest(data) elif datatype == DataType.DATA_TYP_MEASURED_FUNCTIONAL: self.consumeFunctionalTest(data) elif datatype == DataType.DATA_TYP_MEASURED_MULTI_PARAM: self.consumeMultiParametric(data) elif datatype == DataType.DATA_TYP_DEVICE: self.consumeDeviceData(data) elif datatype == DataType.DATA_TYP_USERDEFINED: self.consumeUserDefinedData(data) elif datatype == DataType.DATA_TYP_FILE: self.consumeFile(data) elif datatype == DataType.DATA_TYP_DATALOGTEXT: self.consumeDatalogText(data) else: logger.error(f"Unknown data type: {datatype.name}, data number: {datatype.value}") except Exception as e: logger.error(f"Something went wrong while executing the data handler, dataType = {datatype.name}", exc_info=True) logger.error(f"Error message: {e}") def consumeTPSend(self, tc, data): self.time_nanosec = time.time_ns() logger.info(f"Received data from {tc.testerId}, length is {len(data)} ") logger.info(f"Received data from {tc.testerId}, string is {data} ") lastdata = data datastr = str(data) if "SKU_NAME" in data: jsonObj = json.loads(datastr) logger.info(datastr) for key in jsonObj: logger.debug(key, " : ", jsonObj[key]) logger.debug(f"TPRequest: dataval= {jsonObj[key]}") dataval = jsonObj[key] for subkey in dataval: logger.debug(f"subkey= {subkey}") logger.debug(f"subval= {jsonObj[key][subkey]}") self.ecid = jsonObj["SKU_NAME"]["MSFT_ECID"] logger.info(f"TPSend: {self.ecid=}") self.testname = jsonObj["SKU_NAME"]["testname"] logger.info(f"TPSend: {self.testname=}") # print("TPSend: hosttime= ",hosttime) packetstr = jsonObj["SKU_NAME"]["packetString"] logger.info(f"TPSend: {packetstr=}") rawvalue = jsonObj["SKU_NAME"]["RawValue"] logger.info(f"TPSend: {rawvalue=}") lolim = jsonObj["SKU_NAME"]["LowLim"] logger.info(f"TPSend: {lolim=}") hilim = jsonObj["SKU_NAME"]["HighLim"] logger.info(f"TPSend: {hilim=}") self.stime_nanosec = time.time_ns() return def consumeTPRequest(self, tc, request): logger.info(f"consumeTPRequest Start") logger.info(f"Received request from {tc.testerId}, command is {request}") # Parse the JSON request jsonObj = json.loads(request) action = jsonObj.get("request") # Initialize variables with example values (replace with actual values as needed) adjHBin = -1 self.ratime_nanosec = time.time_ns() if action == "health": print("TPRqst: processing health") response = "success" elif action == "CalcNewLimits" and self.ecid and "PassRangeLsl" in self.results_df: # Construct the dictionary with all the fields edge_min_value = self.results_df["PassRangeLsl"].iloc[-1] edge_max_value = self.results_df["PassRangeUsl"].iloc[-1] selfatime_nanosec = time.time_ns() response_dict = { "MSFT_ECID": self.ecid, "testname": self.testname, "AdjLoLim": str(edge_min_value), "AdjHiLim": str(edge_max_value), "AdjHBin": str(adjHBin), "SendAppTime": str(self.time_nanosec), "SendAppRetTime": str(self.stime_nanosec), "RqstAppTime": str(self.ratime_nanosec), "RqstAppRetTime": str(self.atime_nanosec), } # TODO: Clarify what each of these time requests mean. # Convert the dictionary to a JSON string using json.dumps response = json.dumps(response_dict) else: response = json.dumps({"response": "empty or unsupported action"}) logger.info(f"Response: {response}") return response def threaded_callback(self, callback_fn, dataframe, callback_args): thread = threading.Thread( target=callback_fn, args=(dataframe, callback_args, self.save_result) ) thread.start() return thread def save_result(self, df): self.results_df = df
Build image
cd ~/apps/application-dpat-v3.1.0/rd-app_dpat_py sudo docker build ./ --tag=registry.advantest.com/acsqa-dpat-gemini/dpat:ExampleTag
Push image
sudo docker push registry.advantest.com/acsqa-dpat-gemini/dpat:ExampleTag
Create App descriptor project on the Container Hub#
Login to the ACS Container Hub using the org admin account.
Open “Applications” -> “1:N Application Descriptors” page, click “New Application Descriptor” button
Replicate the new created App Descriptor to Unified Server on Harbor#
Submit feedback for app descriptor running request.#
Click “FEEDBACK” on the right top of “Virtual Machine” page.
In the popped up “Feedback” dialog
add Summary
app descriptor running on Unified Server request
add Description
Host Controller name: XXXX private IP: XXX.XXX.XXX.XXX Unified Server: name: XXXX private IP: XXX.XXX.XXX.XXX Container hub org: XXXX App descriptor name: XXXX Contact me by: <email address>
Click “Submit”
Once the request is completed, the DPAT application will run on the Unified Server.
Configure acs_nexus service#
Edit /opt/acs/nexus/conf/acs_nexus.ini file.
Make sure the Auto_Deploy option is false
[Auto_Deploy] Enabled=false
Make sure the Enabled option is true in TestFloor_Server section
[TestFloor_Server] Enabled=true Control_Port=5001 Data_Port=5002 Heartbeat_Cycle=10 ;; unit s
Restart acs_nexus service
sudo systemctl restart acs_nexus
Note, please ensure you have switched to SmarTest8
Running the SmarTest Test Program#
Start SmarTest8
Run the script that starts SmartTest8:
cd ~/apps/application-dpat-v3.1.0 sh start_smt8.sh
Nexus TPI
You can find “NexusTPI.jar” in the project build path. This dynamic library is used for bi-directional communicating between Test program and Nexus.
Modify code to communicate DPAT app on Unified Server. You can find the involking NexusTPI codes in in the test method file measuredValueFileReader.java
Search and change the code below.
before:
NexusTPI.target("dpat-app").timeout(1);
after:
NexusTPI.location("aus").target("dpat-app").timeout(1);
The completed code is as follow.
Click to expand!
/**
*
*/
package misc;
import java.time.Instant;
import java.util.List;
import java.util.Random;
import org.json.JSONObject;
import base.SOCTestBase;
import nexus.tpi.NexusTPI;
import shifter.GlobVars;
import xoc.dta.annotations.In;
import xoc.dta.datalog.IDatalog;
import xoc.dta.datatypes.MultiSiteDouble;
import xoc.dta.measurement.IMeasurement;
import xoc.dta.resultaccess.IPassFail;
import xoc.dta.testdescriptor.IFunctionalTestDescriptor;
import xoc.dta.testdescriptor.IParametricTestDescriptor;
/**
* ACS Variable control and JSON write demo using SMT8 for Washington
*/
@SuppressWarnings("unused")
public class measuredValueFileReader extends SOCTestBase {
public IMeasurement measurement;
public IParametricTestDescriptor pTD;
public IFunctionalTestDescriptor fTD;
public IFunctionalTestDescriptor testDescriptor;
class params {
private String _upper;
private String _lower;
private String _unit;
public params(String _upper, String lower, String _unit) {
super();
this._upper = _upper;
this._lower = lower;
this._unit = _unit;
}
}
@In
public String testName;
String testtext = "";
String powerResult = "";
int hb_value;
double edge_min_value = 0.0;
double edge_max_value = 0.0;
double prev_edge_min_value = 0.0;
double prev_edge_max_value = 0.0;
boolean c0Flag = false;
boolean c1Flag = false;
String[] ecid_strs = {"U6A629_03_x1_y2", "U6A629_03_x1_y3", "U6A629_03_x1_y4", "U6A629_03_x2_y1", "U6A629_03_x2_y2", "U6A629_03_x2_y6", "U6A629_03_x2_y7", "U6A629_03_x3_y10", "U6A629_03_x3_y2", "U6A629_03_x3_y3", "U6A629_03_x3_y7", "U6A629_03_x3_y8", "U6A629_03_x3_y9", "U6A629_03_x4_y11", "U6A629_03_x4_y1", "U6A629_03_x4_y2", "U6A629_03_x4_y4", "U6A629_03_x5_y0", "U6A629_03_x5_y10", "U6A629_03_x5_y11", "U6A629_03_x5_y3", "U6A629_03_x5_y7", "U6A629_03_x5_y8", "U6A629_03_x6_y3", "U6A629_03_x6_y5", "U6A629_03_x6_y6", "U6A629_03_x6_y7", "U6A629_03_x7_y4", "U6A629_03_x7_y7", "U6A629_03_x8_y2", "U6A629_03_x8_y3", "U6A629_03_x8_y5", "U6A629_03_x8_y7", "U6A629_04_x1_y2", "U6A629_04_x2_y2", "U6A629_04_x2_y3", "U6A629_04_x2_y7", "U6A629_04_x2_y8", "U6A629_04_x2_y9", "U6A629_04_x3_y10", "U6A629_04_x3_y2", "U6A629_04_x3_y3", "U6A629_04_x3_y4", "U6A629_04_x3_y5", "U6A629_04_x3_y7", "U6A629_04_x3_y9", "U6A629_04_x4_y11", "U6A629_04_x4_y2", "U6A629_04_x4_y7", "U6A629_04_x4_y8", "U6A629_04_x4_y9", "U6A629_04_x5_y4", "U6A629_04_x5_y7", "U6A629_04_x6_y5", "U6A629_04_x6_y6", "U6A629_04_x6_y8", "U6A629_04_x7_y10", "U6A629_05_x2_y2", "U6A629_05_x3_y0", "U6A629_05_x4_y1", "U6A629_05_x4_y2", "U6A629_05_x5_y1", "U6A629_05_x5_y2", "U6A629_05_x5_y3", "U6A629_05_x6_y1", "U6A629_05_x6_y3", "U6A629_05_x7_y2", "U6A629_05_x7_y3", "U6A629_05_x8_y3", "U6A633_10_x4_y9", "U6A633_10_x8_y9", "U6A633_11_x2_y2", "U6A633_11_x3_y9", "U6A633_11_x5_y2", "U6A633_11_x6_y10", "U6A633_11_x6_y5", "U6A633_11_x7_y9", "U6A633_11_x8_y8", "U6A633_11_x8_y9", "U6A633_12_x2_y3", "U6A633_12_x2_y5", "U6A633_12_x2_y7", "U6A633_12_x2_y9", "U6A633_12_x3_y1", "U6A633_12_x3_y9", "U6A633_12_x4_y0", "U6A633_12_x5_y1", "U6A633_12_x7_y4"};
int ecid_ptr = 0;
long hnanosec = 0;
List<Double> current_values;
protected IPassFail digResult;
public String pinList;
@Override
public void update() {
}
@SuppressWarnings("static-access")
@Override
public void execute() {
int resh;
int resn;
int resx;
IDatalog datalog = context.datalog(); // use in case logDTR does not work
int index = 0;
Double raw_value = 0.0;
String rvalue = "";
String low_limitStr = "";
String high_limitStr = "";
Double low_limit = 0.0;
Double high_limit = 0.0;
String unit = "";
String packetValStr = "";
String sku_name = "";
String command = "";
/**
* This portion of the code will read an external datalog file in XML and compare data in
* index 0 to the limit variables received from Nexus
*/
if (testName.contains("RX_gain_2412_C0_I[1]")) {
try {
NexusTPI.location("aus").target("dpat-app").timeout(1);
Instant hinstant = Instant.now();
GlobVars.hsnanoSeconds = (hinstant.getEpochSecond()*1000000000) + hinstant.getNano();
System.out.println("HealthCheck Start time= "+GlobVars.hsnanoSeconds);
String jsonHRequest = "{\"health\":\"DoHealthCheck\"}";
int hres = NexusTPI.request(jsonHRequest);
System.out.println("BiDir-request Response= "+hres);
String hresponse = NexusTPI.getResponse();
System.out.println("BiDir:: getRequest Health Response:"+hresponse);
Instant instant = Instant.now();
GlobVars.henanoSeconds = (instant.getEpochSecond()*1000000000) + instant.getNano();
System.out.println("HealthCheck Stop time= "+GlobVars.henanoSeconds);
hparse(hresponse);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
if ((testName.contains("RX_gain_2412_C0_I[1]")) && (!c0Flag)) {
try {
misc.XMLParserforACS.params key = GlobVars.c0_current_params.get(testName);
} catch (Exception e) {
System.out.println("Missing entry in maps for this testname : " + testName);
return;
}
try {
GlobVars.c0_current_values = new XMLParserforACS().parseXMLandReturnRawValues(testName);
} catch (NullPointerException e) {
System.out.print("variable has null value, exiting.\n");
}
try {
GlobVars.c0_current_params = new XMLParserforACS().parseXMLandReturnLimitmap(testName);
} catch (NullPointerException e) {
System.out.print("variable has null value, exiting.\n");
}
c0Flag = true;
low_limit = Double.parseDouble(GlobVars.c0_current_params.get(testName).getLower());
high_limit = Double.parseDouble(GlobVars.c0_current_params.get(testName).getUpper());
unit = GlobVars.c0_current_params.get(testName).getUnit();
GlobVars.orig_LoLim = low_limit;
GlobVars.orig_HiLim = high_limit;
}
if ((testName.contains("RX_gain_2412_C1_I[1]")) && (!c1Flag)) {
try {
misc.XMLParserforACS.params key = GlobVars.c1_current_params.get(testName);
} catch (Exception e) {
System.out.println("Missing entry in maps for this testname : " + testName);
return;
}
try {
GlobVars.c1_current_values = new XMLParserforACS().parseXMLandReturnRawValues(testName);
} catch (NullPointerException e) {
System.out.print("variable has null value, exiting.\n");
}
try {
GlobVars.c1_current_params = new XMLParserforACS().parseXMLandReturnLimitmap(testName);
} catch (NullPointerException e) {
System.out.print("variable has null value, exiting.\n");
}
c1Flag = true;
low_limit = Double.parseDouble(GlobVars.c1_current_params.get(testName).getLower());
high_limit = Double.parseDouble(GlobVars.c1_current_params.get(testName).getUpper());
unit = GlobVars.c1_current_params.get(testName).getUnit();
}
if (testName.contains("RX_gain_2412_C1_I[1]")) {
index = GlobVars.c1_index;
raw_value = GlobVars.c1_current_values.get(index);
low_limit = Double.parseDouble(GlobVars.c1_current_params.get(testName).getLower());
high_limit = Double.parseDouble(GlobVars.c1_current_params.get(testName).getUpper());
unit = GlobVars.c1_current_params.get(testName).getUnit();
}
if (testName.contains("RX_gain_2412_C0_I[1]")) {
index = GlobVars.c0_index;
raw_value = GlobVars.c0_current_values.get(index);
low_limit = Double.parseDouble(GlobVars.c0_current_params.get(testName).getLower());
high_limit = Double.parseDouble(GlobVars.c0_current_params.get(testName).getUpper());
unit = GlobVars.c0_current_params.get(testName).getUnit();
}
if (testName.contains("RX_gain_2412_C0_I[1]")) {
int res;
try {
System.out.println("ECID["+ecid_ptr+"] = "+ecid_strs[ecid_ptr]);
packetValStr = buildPacketString();
rvalue = String.valueOf(raw_value);
low_limitStr = String.valueOf(low_limit);
high_limitStr = String.valueOf(high_limit);
sku_name = "{\"SKU_NAME\":{\"MSFT_ECID\":\""+ecid_strs[ecid_ptr]+"\",\"testname\":\""+testName+"\",\"RawValue\":\""+rvalue+"\",\"LowLim\":\""+low_limitStr+"\",\"HighLim\":\""+high_limitStr+"\",\"packetString\":\""+packetValStr+"\"}}";
System.out.println("\nJson String sku_name length= "+sku_name.length());
Instant instant = Instant.now();
GlobVars.ssnanoSeconds = (instant.getEpochSecond()*1000000000) + instant.getNano();
System.out.println("TPSend Start Time= "+GlobVars.ssnanoSeconds);
res = NexusTPI.send(sku_name);
Instant pinstant = Instant.now();
GlobVars.senanoSeconds = (pinstant.getEpochSecond()*1000000000) + pinstant.getNano();
System.out.println("TPSend Return Time= "+GlobVars.senanoSeconds);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
try {
long millis = 200;
Thread.sleep(millis);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
Instant ginstant = Instant.now();
GlobVars.rsnanoSeconds = (ginstant.getEpochSecond()*1000000000) + ginstant.getNano();
System.out.println("RqstHostTime TPRqst Start= "+GlobVars.rsnanoSeconds);
String jsonRequest = "{\"request\":\"CalcNewLimits\"}";
res = NexusTPI.request(jsonRequest);
System.out.println("BiDir-request Response= "+res);
String response = NexusTPI.getResponse();
System.out.println("BiDir-getRequest Response= "+response);
Instant rinstant = Instant.now();
GlobVars.renanoSeconds = (rinstant.getEpochSecond()*1000000000) + rinstant.getNano();
System.out.println("Request Return Time= "+GlobVars.renanoSeconds);
parse(response);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
if ((testName.contains("RX_gain_2412_C0_I[1]")) && (!GlobVars.limFlag)) {
low_limit = Double.parseDouble(GlobVars.adjlolimStr);
high_limit = Double.parseDouble(GlobVars.adjhilimStr);
}
System.out.println(testName + ": TestingText= " + testtext);
System.out.println(testName + ": low_limit= " + low_limit);
System.out.println(testName + ": high_limit= " + high_limit);
System.out.println("Simulated Test Value Data ");
System.out.println(
"testname = " + testName + " lower = " + low_limit + " upper = " + high_limit
+ " units = " + unit + " raw_value = " + raw_value + " index = " + index);
MultiSiteDouble rawResult = new MultiSiteDouble();
for (int site : context.getActiveSites()) {
rawResult.set(site, raw_value);
}
/** This performs datalog limit evaluation and p/f result and EDL datalogging */
pTD.setHighLimit(high_limit);
pTD.setLowLimit(low_limit);
pTD.evaluate(rawResult);
if (testName.contains("RX_gain_2412_C0_I[1]")) {
if (GlobVars.c0_index < (GlobVars.c0_current_values.size() - 1)) {
GlobVars.c0_index++;
} else {
GlobVars.c0_index = 0;
}
}
if (testName.contains("RX_gain_2412_C1_I[1]")) {
if (GlobVars.c1_index < (GlobVars.c1_current_values.size() - 1)) {
GlobVars.c1_index++;
} else {
GlobVars.c1_index = 0;
}
}
if (pTD.getPassFail().get()) {
System.out.println(
"Sim Value Test " + testName + "\n************ PASSED **************\n");
} else {
System.out.println(testName + "\n************ FAILED *****************\n");
}
if (testName.contains("RX_gain_2412_C0_I[1]")) {
// HTTP performance data: HealthCheck, POST, GET transactions
System.out.println("************ Nexus BiDir Performance Data ******************");
System.out.println("HealthCheck: Host to App Time= "+GlobVars.perf_times.get("Health_h-a_time"));
System.out.println("HealthCheck: App to Host Time= "+GlobVars.perf_times.get("Health_a-h_time"));
System.out.println("HealthCheck: Round-Trip Time= "+GlobVars.perf_times.get("Health_rtd_time"));
System.out.println("TPSend: Host to App Time= "+GlobVars.perf_times.get("Send_h-a_time"));
System.out.println("TPSend: App to Host Time= "+GlobVars.perf_times.get("Send_a-h_time"));
System.out.println("TPSend: Round-Trip Time= "+GlobVars.perf_times.get("Send_rtd_time"));
System.out.println("TPRequest: Host to App Time= "+GlobVars.perf_times.get("Request_h-a_time"));
System.out.println("TPRequest: App to Host Time= "+GlobVars.perf_times.get("Request_a-h_time"));
System.out.println("TPRequest: Round-Trip Time= "+GlobVars.perf_times.get("Request_rtd_time"));
System.out.println("*****************************************************");
}
// Clear large strings used for http post
packetValStr = "";
sku_name = "";
ecid_ptr++;
if (ecid_ptr >= ecid_strs.length) {
ecid_ptr = 0;
}
}
public static String parse(String responseBody) {
if (responseBody.contains("MSFT_ECID")) {
JSONObject results = new JSONObject(responseBody);
String ecid = results.getString("MSFT_ECID");
System.out.println("MSFT_ECID: " + ecid);
String testName = results.getString("testname");
System.out.println("testname: " + testName);
GlobVars.adjlolimStr = results.getString("AdjLoLim");
System.out.println("AdjLoLim: " + GlobVars.adjlolimStr);
GlobVars.adjhilimStr = results.getString("AdjHiLim");
System.out.println("AdjHiLim: " + GlobVars.adjhilimStr);
String sndappTime = results.getString("SendAppTime");
System.out.println("SendAppTime: " + sndappTime);
String sretappTime = results.getString("SendAppRetTime");
System.out.println("SendAppRetTime: " + sretappTime);
String rappTime = results.getString("RqstAppTime");
System.out.println("RqstAppTime: " + rappTime);
String rretappTime = results.getString("RqstAppRetTime");
System.out.println("RqstAppRetTime: " + rretappTime);
long atime = Long.parseLong(sndappTime);
long artime = Long.parseLong(sretappTime);
long ratime = Long.parseLong(rappTime);
long rartime = Long.parseLong(rretappTime);
double deltaSndTime = (atime - GlobVars.ssnanoSeconds)/1000000.0; // Delta time in msec
GlobVars.perf_times.put("Send_h-a_time", deltaSndTime);
System.out.println("Delta TPSend h-a app-host time: "+deltaSndTime+" msec");
double deltaSndRetTime = (GlobVars.senanoSeconds - artime)/1000000.0; // Delta time in msec
GlobVars.perf_times.put("Send_a-h_time", deltaSndRetTime);
System.out.println("Delta TPSend a-h time: "+deltaSndRetTime+" msec");
double sendDeltaTime = (GlobVars.senanoSeconds - GlobVars.ssnanoSeconds)/1000000.0; // Delta time in msec
GlobVars.perf_times.put("Send_rtd_time", sendDeltaTime);
System.out.println("Delta TPSend transaction time: "+sendDeltaTime+" msec");
double deltaRqstTime = (ratime - GlobVars.rsnanoSeconds)/1000000.0; // Delta time in msec
GlobVars.perf_times.put("Request_h-a_time", deltaRqstTime);
System.out.println("Delta TPRequest h-a time: "+deltaRqstTime+" msec");
double deltaRqstRetTime = (GlobVars.renanoSeconds - rartime)/1000000.0; // Delta time in msec
GlobVars.perf_times.put("Request_a-h_time", deltaRqstRetTime);
System.out.println("Delta TPRequest a-h time: "+deltaRqstRetTime+" msec");
double gadeltaTime = (GlobVars.renanoSeconds - GlobVars.rsnanoSeconds)/1000000.0; // Delta time in msec
GlobVars.perf_times.put("Request_rtd_time", gadeltaTime);
// GlobVars.rsnanoSeconds = 0; // clear for next device
}
return responseBody;
}
public static String hparse(String responseBody) {
String hatime = "0";
if (responseBody.contains("health")) {
JSONObject results = new JSONObject(responseBody);
hatime = results.getString("health");
long hatimeval = Long.parseLong(hatime);
System.out.println("health: " + hatime);
double hsdeltaTime = (hatimeval - GlobVars.hsnanoSeconds)/1000000.0; // Delta time in msec
double hrdeltaTime = (GlobVars.henanoSeconds - hatimeval)/1000000.0; // Delta time in msec
double rhdeltaTime = (GlobVars.henanoSeconds - GlobVars.hsnanoSeconds)/1000000.0; // Delta time in msec
GlobVars.perf_times.put("Health_h-a_time", hsdeltaTime);
GlobVars.perf_times.put("Health_a-h_time", hrdeltaTime);
GlobVars.perf_times.put("Health_rtd_time", rhdeltaTime);
System.out.println("Delta health h-a time: "+hsdeltaTime+" msec");
System.out.println("Delta health a-h time: "+hrdeltaTime+" msec");
System.out.println("Delta health rtd time: "+rhdeltaTime+" msec");
}
return hatime;
}
public String buildPacketString() {
int packetSel = GlobVars.packetSelector;
int[] cnt_values = { 203, 1000, 10000, 100000, 1000000,
2000000, 3000000, 4000000, 5000000,
6000000, 7000000, 8000000, 9000000,
10000000, 20000000, 40000000, 60000000, 80000000, 100000000,
120000000, 140000000, 160000000, 180000000, 200000000, 220000000,
240000000, 260000000, 280000000, 300000000};
System.out.println("PacketID: " + packetSel);
String pktStr = "";
int chrcnt = cnt_values[packetSel] - 200; // was 146;
System.out.println("Chrcnt: " + chrcnt);
String valstr = "";
String BUILDCHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
StringBuilder packetstr = new StringBuilder();
Random rnd = new Random();
while (packetstr.length() < chrcnt) { // length of the random string.
int index = (int) (rnd.nextFloat() * BUILDCHARS.length());
packetstr.append(BUILDCHARS.charAt(index));
}
pktStr = packetstr.toString();
GlobVars.packetStr = pktStr;
return pktStr;
}
}
Visualize Results#
To witness the limit changes, open the Result perspective in SmarTest8. In the Test tab, the new limits will be displayed:
Quit SmarTest#
- About this tutorial
- Compatibility and Prerequisites
- Environment Preparation
- Application Deployment
- Transfer demo program to the Host Controller
- Create Docker image for DPAT app
- Create App descriptor project on the Container Hub
- Replicate the new created App Descriptor to Unified Server on Harbor
- Submit feedback for app descriptor running request.
- Configure acs_nexus service
- Running the SmarTest Test Program
- Visualize Results
- Quit SmarTest