Scala: Introductory Study Notes

I usually write notes when I am studying or learning new skills. When I was at university, my classmates copied my notes to help with their own learning. That is why I am sharing these notes: maybe they will help you too.

Scala Basics

Scala (short for "scalable language") runs on the Java VM and interoperates with all Java libraries. You can use Scala for writing scripts or for building large systems and frameworks. Scala is a blend of object-oriented and functional programming concepts. Its functional programming constructs make it easy to build logic quickly from simple parts, while its object-oriented constructs make it easy to structure large systems and adapt them to new requirements.

Functional programming is guided by two main ideas:

  1. Functions are first-class values. A function is a value just like an integer or a string: it can be passed as an argument to another function, returned as a result from a function, or stored in a variable.
  2. Operations should map input values to output values rather than change data in place. This means methods should not have side effects, and data types and objects should be immutable wherever possible.
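To make the two ideas concrete, here is a minimal sketch. The names timesTwo, twice, and quadruple are made up for illustration and are not part of any library.

```scala
// A function stored in a value, like any other value.
val timesTwo: Int => Int = x => x * 2

// A higher-order function: it takes a function as an argument
// and returns a new function as its result.
def twice(f: Int => Int): Int => Int = x => f(f(x))

val quadruple: Int => Int = twice(timesTwo)

// map builds a new list; the original list is left unchanged.
val numbers = List(1, 2, 3)
val doubled = numbers.map(timesTwo)
```

Calling quadruple(5) applies timesTwo twice, and numbers still holds 1, 2, 3 after the call to map.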

Setting Up Scala Development Environment


Figure 1: Where to download Scala

The best way to learn is by writing some code. In this section, we will go through a quick tutorial of Scala basics. To run the code samples, you should have a standard Scala installation. You can get it from http://www.scala-lang.org. You can also use a Scala plug-in for Eclipse, IntelliJ or NetBeans.

Scala Interpreter


Figure 2: Scala Shell Interpreter

Scala comes with an interactive "shell" for writing Scala expressions and programs. The interpreter evaluates each expression you type and prints the resulting value.

scala> 567*8+20

res0: Int = 4556

Scala Variables

Scala has two kinds of variables:

  1. val: similar to a final variable in Java or a readonly field in C#. Once initialized, it can never be reassigned.

scala> val str = "This is a book about Scala"
str: String = This is a book about Scala

scala> str = "change this string"
<console>:12: error: reassignment to val
       str = "change this string"

  2. var: similar to a non-final variable in Java or a non-readonly field in C#. It can be reassigned as often as you like.

scala> var m = "This is a book about Scala"
m: String = This is a book about Scala

scala> m = "Change this string"
m: String = Change this string

Notice that in both examples I did not define the type. Scala's ability to infer types makes the code less cluttered with unnecessary explicit type annotations. An explicit type annotation can still be useful: it ensures the Scala compiler infers the type you intend, and it serves as documentation for readers of the code. In Scala, you specify a variable's type after its name, separated by a colon.

scala> val s: String = "Hello Scala Book"
s: String = Hello Scala Book

Scala Functions

scala> def max(x: Int, y: Int): Int = {
     |   if (x > y) x
     |   else y
     | }
max: (x: Int, y: Int)Int

scala> max(99, 767)
res2: Int = 767

A function definition starts with "def", followed by the function's name and a comma-separated list of parameters in parentheses. A type annotation must follow each parameter, because Scala does not infer function parameter types. After the closing parenthesis comes another type annotation that defines the result type of the function. You can leave out the result type and Scala will infer it, unless the function is recursive.
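A small sketch of both cases follows. The functions square and factorial are my own examples, not taken from the listing above.

```scala
// Result type left out: the compiler infers Int from the body.
def square(x: Int) = x * x

// A recursive function has to declare its result type explicitly,
// otherwise the compiler reports an error.
def factorial(n: Int): BigInt =
  if (n <= 1) BigInt(1) else BigInt(n) * factorial(n - 1)
```

Even when inference works, I find that writing the result type on public functions makes the code easier to read.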

while and if constructs

scala> var i = 0
i: Int = 0

scala> while (i < 20) {
     |   if (i > 0) println(i)
     |   i += 1
     | }
1
2
...
19

while (condition) statement

The while loop executes the statement, or the block of statements enclosed in "{}", for as long as the condition is true.

if (condition) statement1 else statement2

The if expression executes statement1 if the condition is true. If there is an else branch, statement2 is executed when the condition is false.

Arrays

scala> val big = new java.math.BigInteger("3728")
big: java.math.BigInteger = 3728

scala> val g = new Array[String](5)
g: Array[String] = Array(null, null, null, null, null)

scala> g(0) = "Hi"

scala> g(1) = " "

scala> g(2) = " Scala"

scala> g(3) = " Says Functional"

scala> g(4) = " is the way to go"

You instantiate objects using the "new" keyword, and you can configure the instance when you create it. In the code above, big is instantiated as a BigInteger with the value 3728, and g is created as an array of strings with 5 elements.

Since functions are first-class constructs in Scala, you can pass a function literal that takes a parameter to another function. If the function literal consists of a single statement that takes a single argument, you need not name or specify the argument explicitly.

scala> g.foreach(print)
Hi  Scala Says Functional is the way to go
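The shorthand is easier to see next to the longer forms. This is only a sketch: words and printThreeWays are names I made up for the example.

```scala
val words = Array("Hi", " ", "Scala")

def printThreeWays(): Unit = {
  words.foreach((w: String) => print(w)) // explicit function literal
  words.foreach(w => print(w))           // parameter type inferred
  words.foreach(print)                   // argument not named at all
}

// The same idea works with methods that return a value.
val lengths = words.map(w => w.length).toList
val upper = words.map(_.toUpperCase).toList
```

All three foreach calls print the same text. The underscore in the last line is another shorthand for a single-parameter function literal.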

apply & update methods

In Scala, when you apply parentheses surrounding one or more values to a variable, Scala transforms the code into an invocation of a method named apply on that variable. So g(0) is transformed into g.apply(0). Any application of an object to some arguments in parentheses is transformed into an apply method call, and it compiles only if the type of the object defines an apply method. Similarly, when an assignment is made to a variable to which parentheses and one or more arguments have been applied, Scala transforms it into an invocation of an update method that takes the arguments in parentheses plus the value to the right of the equals sign. So g(0) = "Hello" is transformed into g.update(0, "Hello").
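The rule is not specific to arrays. Here is a minimal sketch with a hypothetical Scoreboard class of my own that defines both methods.

```scala
// Hypothetical class, used only to illustrate apply and update.
class Scoreboard {
  private val scores = scala.collection.mutable.Map[String, Int]()

  // board("ann") is rewritten to board.apply("ann")
  def apply(name: String): Int = scores.getOrElse(name, 0)

  // board("ann") = 3 is rewritten to board.update("ann", 3)
  def update(name: String, score: Int): Unit = {
    scores(name) = score
  }
}

def scoreboardDemo(): (Int, Int) = {
  val board = new Scoreboard
  board("ann") = 3        // short form
  board.update("bob", 5)  // explicit form, same effect
  (board("ann"), board.apply("bob"))
}
```

Calling scoreboardDemo() returns the two scores that were stored, one read with each form.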

Lists

A Scala array is a mutable sequence of objects that all share the same type; an Array[String] contains only strings. Although you cannot change the length of an array after it is instantiated, you can change its element values. For an immutable sequence of objects that share the same type, you can use Scala's List class. A List[String] can contain only strings.

scala> val a = List(1,2,3)

a: List[Int] = List(1, 2, 3)

scala> val b = List(7,8,3)

b: List[Int] = List(7, 8, 3)

scala> val ab = a ::: b

ab: List[Int] = List(1, 2, 3, 7, 8, 3)

scala> val s = 1::b

s: List[Int] = List(1, 7, 8, 3)

The code above establishes a new val named a, initialized with a new List[Int] holding the values 1, 2, and 3. Note that List.apply() is defined as a factory method in Scala. When you call a method on a List that might seem to mutate it, the method creates and returns a new list with the new value instead. For example, the ::: method concatenates two lists, and the :: method prepends an element to the front of a list.

scala> val k = 1 :: 2 :: 8 :: 9 :: Nil

k: List[Int] = List(1, 2, 8, 9)

Note that :: is a method of its right operand: in 1 :: b, the method is invoked on the list b with 1 as the argument. This is discussed in more detail later.
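As a short preview, the sketch below writes the same operations in operator notation and as explicit method calls. The value names are mine.

```scala
val b = List(7, 8, 3)

// Operator notation: the method belongs to b, the right operand.
val viaOperator = 1 :: b

// The equivalent explicit method call on b.
val viaMethodCall = b.::(1)

// 1 :: 2 :: Nil groups from the right, as 1 :: (2 :: Nil).
val chained = Nil.::(2).::(1)
```

This is the general rule in Scala: a method whose name ends in a colon is invoked on its right operand when used in operator notation.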

Tuples

Tuples are a useful container object. Like lists, tuples are immutable, but unlike lists they can contain elements of different types. Tuples are similar to the struct construct in C, though you do not have to define the tuple's structure in advance. Typically you use tuples when you need to return multiple objects from a method.

scala> val complexTuple = (78, "Rony", "Alicante", "Spain")
complexTuple: (Int, String, String, String) = (78,Rony,Alicante,Spain)

scala> println(complexTuple._1)
78

scala> println(complexTuple._3)
Alicante

Once you have a tuple instantiated, you can access its elements individually with a dot, underscore, and the one-based index of the element, as shown in the example above.
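Here is a minimal sketch of returning two values from one method. The minMax helper is hypothetical and only serves the example.

```scala
// Hypothetical helper: returns the smallest and largest element together.
def minMax(values: List[Int]): (Int, Int) = (values.min, values.max)

val range = minMax(List(78, 3, 42))

// Access by one-based position...
val smallest = range._1

// ...or unpack both elements at once with a pattern.
val (low, high) = minMax(List(78, 3, 42))
```

Unpacking with a pattern usually reads better than _1 and _2, because each element gets a meaningful name.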

Sets and Maps

Scala provides mutable and immutable alternatives for sets and maps. Both use the same simple names, but they live in different packages. The default is the immutable set, as shown in the listing below. On an immutable set, 'set2 += x' is shorthand for 'set2 = set2 + x': the + operation returns a new set, which is then assigned back to the variable. That is why the operation succeeds with a var and fails with a val.

scala> var set1 = Set("Alicante", "Granada")
set1: scala.collection.immutable.Set[String] = Set(Alicante, Granada)

scala> var set2 = set1
set2: scala.collection.immutable.Set[String] = Set(Alicante, Granada)

scala> set2 += "Barcelona"

scala> set1
res20: scala.collection.immutable.Set[String] = Set(Alicante, Granada)

scala> set2
res21: scala.collection.immutable.Set[String] = Set(Alicante, Granada, Barcelona)

scala> val set3 = set1
set3: scala.collection.immutable.Set[String] = Set(Alicante, Granada)

scala> set3 += "Barcelona"
<console>:13: error: value += is not a member of scala.collection.immutable.Set[String]
  Expression does not convert to assignment because receiver is not assignable.
       set3 += "Barcelona"
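The reassignment that '+=' performs on an immutable set can be written out by hand. This is a small sketch; addCity is a name I made up.

```scala
def addCity(): (Set[String], Set[String]) = {
  val original = Set("Alicante", "Granada")
  var cities = original
  // This is what `cities += "Barcelona"` stands for on an immutable set:
  cities = cities + "Barcelona"
  (original, cities)
}
```

The first set in the result is untouched and the second one holds the extra city, which matches what the REPL session shows for set1 and set2.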

To use the mutable version, you import the scala.collection.mutable package, as shown in the listing below.

scala> import scala.collection.mutable
import scala.collection.mutable

scala> val mutableset = mutable.Set("Alicante", "Granada")
mutableset: scala.collection.mutable.Set[String] = Set(Alicante, Granada)

scala> mutableset += "Barceolona"
res24: mutableset.type = Set(Alicante, Barceolona, Granada)

Now the '+=' operator works with a val, because the set itself is modified in place rather than reassigned. The listing below shows similar operations for Map. To create a mutable Map, use the Map class from the scala.collection.mutable package; the default Map is the immutable implementation.

scala> val areaCode = mutable.Map[Int, String]()
areaCode: scala.collection.mutable.Map[Int,String] = Map()

scala> areaCode += (416 -> "Toronto")
res25: areaCode.type = Map(416 -> Toronto)

scala> areaCode += (905 -> "GTA")
res26: areaCode.type = Map(905 -> GTA, 416 -> Toronto)

scala> areaCode += (613 -> "Ottawa")
res27: areaCode.type = Map(905 -> GTA, 613 -> Ottawa, 416 -> Toronto)

scala> val stars = Map(1 -> "*", 2 -> "**", 3 -> "***", 4 -> "****", 5 -> "*****")
stars: scala.collection.immutable.Map[Int,String] = Map(5 -> *****, 1 -> *, 2 -> **, 3 -> ***, 4 -> ****)

scala> println(stars(3))
***
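With the default immutable Map, adding an entry gives you a new map, and looking up a missing key with parentheses throws an exception. The sketch below shows both points, using a shorter stars map of my own.

```scala
val stars = Map(1 -> "*", 2 -> "**", 3 -> "***")

// + returns a new map; stars itself is unchanged.
val moreStars = stars + (4 -> "****")

// stars(9) would throw NoSuchElementException.
// get returns an Option instead, and getOrElse supplies a fallback.
val missing = stars.get(9)
val fallback = stars.getOrElse(9, "?")
```

I tend to use get or getOrElse whenever the key comes from user input.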

Semicolon Inference

In Scala, a semicolon at the end of a statement is usually optional. You can type one if you want, but you do not have to if the statement appears by itself on a single line. On the other hand, a semicolon is required if you write multiple statements on a single line:

scala> val s = "Moustafa"; println(s)

A single statement can also span several lines without semicolons, as in the if expression below (assuming x is an Int defined earlier):

scala> if (x < 2)
     |   println("too small") else
     |   println("Ok")

Control Structures

Scala has the following control structures:

  1. if: works as in Java and C#. It tests a condition and then executes one of two code branches depending on whether the condition holds true. In Scala, if is also an expression that results in a value.

scala> var args: Array[String] = Array()
args: Array[String] = Array()

scala> val fileName = if (!args.isEmpty) args(0) else "default.txt"
fileName: String = default.txt

  2. while: just as in C# and Java, while has a condition and a body. The body is executed over and over again as long as the condition holds true.

scala> var a = 20
a: Int = 20

scala> var b = 30
b: Int = 30

scala> while (a != 0) {
     |   val temp = a
     |   a = b % a
     |   b = temp
     | }

scala> b
res1: Int = 10

Another form of the while loop is the do-while, where the code block is executed at least once and keeps on executing as long as the while condition is true.

scala> var line = ""
line: String = ""

scala> do {
     |   line = scala.io.StdIn.readLine()
     |   println(" Read: " + line)
     | } while (line != "")

  3. for: Scala's for expression lets you combine a few simple ingredients in different ways to express a wide variety of iterations.
    1. Iteration through collections

val filesHere = (new java.io.File(".")).listFiles

for (file <- filesHere) println(file)

for (i <- 1 to 4) println("Iteration " + i)

    2. Filtering

for (
  file <- filesHere
  if file.isFile
  if file.getName.endsWith(".scala")
) println(file)

    3. Nested iteration

def fileLines(file: java.io.File) =
  scala.io.Source.fromFile(file).getLines().toList

def grep(pattern: String) =
  for (
    file <- filesHere
    if file.getName.endsWith(".scala");
    line <- fileLines(file)
    if line.trim.matches(pattern)
  ) println(file + ": " + line.trim)

    4. Mid-stream variable bindings: you can bind an intermediate result to a name with name = expression between the other clauses, as trimmed = line.trim does in the next example.
    5. Producing a new collection

val forLineLengths =
  for {
    file <- filesHere
    if file.getName.endsWith(".scala")
    line <- fileLines(file)
    trimmed = line.trim
    if trimmed.matches(".*for.*")
  } yield trimmed.length

To generate a new value for each iteration of a for expression, prefix the body with the keyword 'yield'. In the example above, each time the body executes it produces one value, in this case the length of the trimmed line, and the for expression as a whole results in a collection of those values.
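The file-based example needs .scala files in the current directory, so here is a self-contained sketch of the same for/yield shape. The lines list is made-up data standing in for the contents of a file.

```scala
// Hypothetical in-memory lines standing in for the contents of a file.
val lines = List("  for (i <- 1 to 3)  ", "val x = 1", "  // a for loop ")

val matchingLengths =
  for {
    line <- lines
    trimmed = line.trim
    if trimmed.matches(".*for.*")
  } yield trimmed.length

// Roughly the same computation written with explicit method calls.
val sameResult =
  lines.map(_.trim).filter(_.matches(".*for.*")).map(_.length)
```

Both values hold the lengths of the two trimmed lines that contain "for". The result is a List because the first generator iterates over a List.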

  4. try: this is used for handling exceptions.

def throws1(): Unit = {
  throw new IllegalArgumentException
}

def throws3(): Unit = {
  import java.io.FileReader
  import java.io.FileNotFoundException
  import java.io.IOException

  try {
    val f = new FileReader("input.txt")
    try {
      // Use the file
      println("f [" + f + "]")
    } finally {
      f.close() // Be sure to close the file
    }
  } catch {
    case ex: FileNotFoundException => // Handle missing file
      println("ex [" + ex + "]")
    case ex: IOException => // Handle other I/O error
      println("ex [" + ex + "]")
  }
}

  5. match: Scala's match expression is similar to the switch statement in C# and Java. The default case is specified with an underscore (_), and there is no fall-through from one case to the next, so no break statement is needed.

def match2(args: Array[String]): Unit = {
  val firstArg = if (!args.isEmpty) args(0) else ""

  val friend =
    firstArg match {
      case "salt" => "pepper"
      case "chips" => "salsa"
      case "eggs" => "bacon"
      case _ => "huh?"
    }

  println(friend)
}
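A match expression can also test types and extra conditions, not only constants. The sketch below uses a hypothetical describe function of my own to show that.

```scala
// Hypothetical example: classify a value of any type.
def describe(x: Any): String = x match {
  case 0               => "zero"
  case n: Int if n < 0 => "negative int"
  case n: Int          => "positive int"
  case s: String       => "string of length " + s.length
  case _               => "something else"
}
```

Cases are tried from top to bottom and only the first matching one runs, which is why the plain Int case can safely be described as positive.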
  6. function calls.

Almost all of Scala's control structures result in some value, even try expressions:

import java.net.{MalformedURLException, URL}

def urlFor(path: String) =
  try {
    new URL(path)
  } catch {
    case e: MalformedURLException =>
      new URL("http://www.scala-lang.org")
  }

Scala does not have break and continue because they do not mesh well with function literals.
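Loops that would use break or continue in Java can usually be rewritten with a collection method or with recursion. Below is a minimal sketch of both; the task and the function names are made up for the example.

```scala
// Hypothetical task: find the first argument that ends with ".scala"
// and does not start with "-", without using break or continue.
def firstScalaFile(args: List[String]): Option[String] =
  args.find(arg => !arg.startsWith("-") && arg.endsWith(".scala"))

// The same search written as a recursive function.
def searchFrom(args: List[String]): Option[String] = args match {
  case Nil => None
  case head :: tail =>
    if (head.startsWith("-")) searchFrom(tail)    // plays the role of continue
    else if (head.endsWith(".scala")) Some(head)  // plays the role of break
    else searchFrom(tail)
}
```

Both versions return an Option, so the caller has to handle the case where nothing was found.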


Kafka Administration App

I have been working with Kafka lately, building a microservices solution that uses Kafka for various tasks. I wanted a simple administration tool to help me manage the Kafka setup on my development environments. Basically, I wanted the tool to do the following:

  1. Cluster
    1. Describe the Kafka cluster
    2. Describe the configuration
    3. Change the configuration of the cluster
    4. Manage ACLs
      1. Create
      2. Delete
      3. Describe
  2. Manage Delegation Tokens
    1. Create a delegation token
    2. Describe delegation token
    3. Renew delegation token
    4. Expire delegation token
  3. Manage Topics
    1. List topics
    2. Describe topics
    3. Create topics
    4. Delete topics
    5. Increase the number of topic partitions
  4. Manage Consumer Groups
    1. List consumers and the topics they are subscribed to
    2. List consumer offsets

The list could go on and on. To keep the code simple, I started this as a console application in Scala, using IntelliJ Community Edition. I have not finished the tool yet, but I got it to a usable level where it helps me with most of my day-to-day tasks.

I thought it might be helpful to share the code; maybe it helps others. Please do not hesitate to contact me with your comments or thoughts about this tool.

Figure 1 shows the project structure.


Figure 1: Project Structure

When you run the tool, you get the following main menu:

********************************************************************************
*****    Kafka Admin Tool                                                  *****
*****    (c) Genetic Thought Software Inc 2018                             *****
********************************************************************************
*****    Note: set Kafka environment in environment.properties             *****
*****      Connecting to server: localhost:9092                            *****
*****      Connected to server: localhost:9092                             *****
********************************************************************************
*****    Commands:                                                         *****
*****      1- Describe Cluster                                             *****
*****      2- Topics                                                       *****
*****      3- Consumers                                                    *****
*****      4- Exit                                                         *****
********************************************************************************
*****        Enter selection:                                              *****

 

If, for example, you select the Topics menu and then enter 1, you get the following:

********************************************************************************
*****    Topics Commands:                                                  *****
*****      1- List Topics                                                  *****
*****      2- Describe Topics                                              *****
*****      3- Create Topics                                                *****
*****      4- Delete Topic                                                 *****
*****      5- Increase Topic Partitions                                    *****
*****      6- Return                                                       *****
********************************************************************************
*****        Enter selection:                                              *****
1
*****      Topics in this cluster are:                                     *****
*****        Topic: US                                                     *****
*****        Topic: China                                                  *****
*****        Topic: Mexico                                                 *****
*****        Topic: Korai                                                  *****
*****        Topic: Canada                                                 *****
*****        Topic: India                                                  *****
*****        Topic: Italy                                                  *****
*****        Topic: Germany                                                *****
********************************************************************************

mainApp is the object that extends the App trait. Here I just set up the logger and load the properties from the environment.properties file, where you set bootstrap.servers to the Kafka node. It then initializes the KafkaAdmin object admin and starts the controller's mainMenu, which controls the rest of the lifecycle until the user selects exit.

/*   *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/

import com.geneticthought.Kafka.Administration.KafkaAdmin
import com.geneticthought.Utilities.{Helpers, Printer}
import org.slf4j.LoggerFactory

object mainApp extends App {

  val logger = LoggerFactory.getLogger("MainApp")
  logger.info("Starting")
  val props = Helpers.getProperties("environment.properties")
  Printer.printHeader
  Printer.printBoxedString("Connecting to server: " + props.getProperty("bootstrap.servers"), 1)
  val admin = new KafkaAdmin(props)
  Printer.printBoxedString("Connected to server: " + props.getProperty("bootstrap.servers"), 1)
  Controller.mainMenu(admin)
  Printer.printBoxedString("Disconnecting from server: " + props.getProperty("bootstrap.servers"), 1)
  admin.close
}

 

 

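Below is the Controller object listing. It is a very simple implementation that takes advantage of some of Scala's functional capabilities: every menu is driven by the same menu function, which receives the code that prints the menu and the code that handles a command as function arguments.

/*   *********************************************************************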
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/

import com.geneticthought.Kafka.Administration.KafkaAdmin
import com.geneticthought.Utilities.{Helpers, Printer}

import scala.io.StdIn

object Controller {

  def mainMenu(admin: KafkaAdmin): Unit = menu(4, admin, Printer.printCommandsMenu)(mainMenuCommands)

  // Generic menu loop: prints the menu with mn, reads a selection and
  // passes it to cmd until the exit command exCommand is entered.
  def menu(exCommand: Int, admin: KafkaAdmin, mn: () => Unit)(cmd: (Int, KafkaAdmin) => Unit): Unit = {
    var command: Int = 0
    while (command != exCommand) {
      mn()
      Printer.printBoxedString("Enter selection: ", 2)
      val str = StdIn.readLine()
      command = Helpers.toInt(str)
      cmd(command, admin)
    }
  }

  def mainMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => describeCluster(admin)
      case 2 => TopicsCommand(admin)
      case 3 => ConsumerCommands(admin)
      case 4 => Printer.printBoxedString("Exiting ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def describeCluster(admin: KafkaAdmin): Unit = {
    val cluster = admin.descibeCluster
    println(cluster.toString)
  }

  def TopicsCommand(admin: KafkaAdmin): Unit = menu(6, admin, Printer.printTopicsMenu)(topicsMenuCommands)

  def topicsMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => listTopics(admin)
      case 2 => describeTopics(admin)
      case 3 => createTopics(admin)
      case 4 => deleteTopics(admin)
      case 5 => increaseTopicPartations(admin)
      case 6 => Printer.printBoxedString("Return to main Menu ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def listTopics(admin: KafkaAdmin): Unit = {
    val topics = admin.getTopics
    if (topics.size > 0) Printer.printBoxedString("Topics in this cluster are:", 1)
    else Printer.printBoxedString("No Topics in this cluster defined yet", 1)
    for (x <- topics) Printer.printBoxedString(s"Topic: $x", 2)
  }

  def describeTopics(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Describe:$topics", 2)
      return
    }
    Printer.printBoxedString(s"Describe Topics:$topics", 2)

    val tpdes = admin.describeTopics(topics)
    for (tpd <- tpdes)
      Printer.printBoxedString(tpd.toString, 4)
    Printer.printBoxedString(s"Describe Topics:$topics  Done", 2)
  }

  def createTopics(admin: KafkaAdmin): Unit = {
    // Each topic is created with 1 partition and a replication factor of 1
    val topics = getTopics.map(x => (x, 1, 1.toShort)).toList
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Create:$topics", 2)
      return
    }
    admin.createTopics(topics)

    Printer.printBoxedString("Created Topics: ", 2)
    for (t <- topics) Printer.printBoxedString(s"(topicName,Partitions,Replication):$t", 4)
  }

  def deleteTopics(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Delete:$topics", 2)
      return
    }
    admin.deleteTopics(topics)
    Printer.printBoxedString(s"Deleted Topics:$topics", 2)
  }

  def increaseTopicPartations(admin: KafkaAdmin): Unit = {
    val topics = getTopics
    if (topics.size <= 0) {
      Printer.printBoxedString(s"No Topics to Increase Partitions:$topics", 2)
      return
    }
    Printer.printBoxedString("Enter Total Partitions No: ", 2)
    val str = StdIn.readLine()
    val partNo = Helpers.toInt(str)
    if (partNo <= 1) {
      Printer.printBoxedString(s"Partitions Number has to be > 1, you entered: $partNo", 2)
      return
    }
    val topicsIncrease: Map[String, Int] = topics.map(x => x -> partNo).toMap
    admin.increasePartitions(topicsIncrease)
    Printer.printBoxedString(s"Increased Topics $topics partitions to: $partNo", 2)
  }

  // Prompts for a comma-separated list of topic names until one is read successfully
  def getTopics: List[String] = {
    var cont = true
    var result: List[String] = null
    do {
      try {
        Printer.printBoxedString("Enter Topics names separated by comma", 1)
        val input = StdIn.readLine() + ","
        result = input.split(",").toList
        cont = false
      }
      catch {
        case _: Exception => Printer.printBoxedString("Invalid Topics List", 2)
      }
    } while (cont)

    result
  }

  def ConsumerCommands(admin: KafkaAdmin): Unit = menu(2, admin, Printer.printConsumersMenu)(consumersMenuCommands)

  def consumersMenuCommands(cmd: Int, admin: KafkaAdmin): Unit = {
    cmd match {
      case 1 => listConsumers(admin)
      case 2 => Printer.printBoxedString("Return to main Menu ", 2)
      case _ => Printer.printBoxedString("Invalid selection: ", 2)
    }
  }

  def listConsumers(admin: KafkaAdmin): Unit = {
    val consumers = admin.listConsumers
    Printer.printBoxedString("Consumers List: ", 2)
    consumers.foreach(x => Printer.printBoxedString(x, 4))
    Printer.printBoxedString("end of Consumers List ", 2)
  }

}
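The part of the Controller that leans on functional features is the menu method, which takes the command handler as a function in a second parameter list. The sketch below isolates that pattern so it can run without Kafka or a console. It is not the tool's code: toIntOrInvalid is a guess at what a helper like Helpers.toInt might do (its source is not shown here), and the iterator of inputs replaces StdIn.readLine.

```scala
import scala.util.Try

// Hypothetical stand-in for Helpers.toInt: returns -1 when the
// input is not a number. The real helper may behave differently.
def toIntOrInvalid(s: String): Int = Try(s.trim.toInt).getOrElse(-1)

// Same shape as Controller.menu, but reading from an iterator
// instead of the console so it can run without user input.
def menu(exitCommand: Int, inputs: Iterator[String])(handler: Int => Unit): Unit = {
  var command = 0
  while (command != exitCommand && inputs.hasNext) {
    command = toIntOrInvalid(inputs.next())
    handler(command)
  }
}

// The first parameter list is fixed once; callers supply only the handler.
def runScript(handler: Int => Unit): Unit =
  menu(4, Iterator("1", "x", "4", "2"))(handler)

// Collects every command the handler receives.
def recordedCommands(): List[Int] = {
  val seen = scala.collection.mutable.ListBuffer[Int]()
  runScript(cmd => { seen += cmd; () })
  seen.toList
}
```

In this sketch the loop stops as soon as the exit command 4 is read, so the trailing "2" is never handled, and the invalid input "x" reaches the handler as -1 where a catch-all case can report it.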

 

Below is the KafkaAdmin class listing, which implements the main functionality of performing actions and queries against the Kafka cluster.

/*   *********************************************************************
(c) Genetic Thought Software Inc.
*********************************************************************
Author: Moustafa Refaat
email: MRefaat@GeneticThought.com
**********************************************************************/

package com.geneticthought.Kafka.Administration

import java.util.Properties

import org.apache.kafka.clients.admin._

import scala.collection.JavaConverters

class KafkaAdmin(val props: Properties) {

  private val client: AdminClient = AdminClient.create(props)

  def close: Unit = this.client.close()

  /* Cluster operations */
  def descibeCluster: ClusterInfo = new ClusterInfo(this.client.describeCluster())

  /* Topics region */
  def getTopics: Set[String] = JavaConverters.asScalaSet(this.client.listTopics.names.get()).toSet

  def createTopics(topics: List[(String, Int, Short)]): Unit = {
    val kafkaTopics = scala.collection.mutable.ArrayBuffer[NewTopic]()
    for (topic <- topics) kafkaTopics += new NewTopic(topic._1, topic._2, topic._3)
    val opresult = this.client.createTopics(JavaConverters.asJavaCollection(kafkaTopics))
    opresult.all() // returns a future; this method does not wait for it to complete
  }

  def deleteTopics(topics: List[String]): Unit = {
    val result = this.client.deleteTopics(JavaConverters.asJavaCollection(topics))
    result.all() // returns a future; this method does not wait for it to complete
  }

  def describeTopics(topics: List[String]): Iterable[TopicDescription] = {
    val describeResult = this.client.describeTopics(JavaConverters.asJavaCollection(topics))
    val topicsdesc = describeResult.all().get() // blocks until the descriptions arrive
    JavaConverters.collectionAsScalaIterable(topicsdesc.values())
  }

  def increasePartitions(partitions: Map[String, Int]): Unit = {
    val partionsRequest: scala.collection.mutable.ListMap[String, NewPartitions] =
      new scala.collection.mutable.ListMap[String, NewPartitions]()
    for ((k, v) <- partitions) {
      partionsRequest += (k -> NewPartitions.increaseTo(v))
    }
    val requestReslut = this.client.createPartitions(JavaConverters.mutableMapAsJavaMap(partionsRequest))
    requestReslut.all() // returns a future; this method does not wait for it to complete
  }

  /* Configuration region */
  /* Records region */
  /* consumers region */
  def listConsumers: List[String] = {
    val consumers = this.client.listConsumerGroups().all().get().toArray()
    consumers.map(x => x.toString).toList
  }

}
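Most of the class is about moving collections between Scala and the Java-based Kafka client. The sketch below shows the same kind of conversion using only the standard library, with made-up topic names in place of real client results. It uses the decorator style (asScala, asJavaCollection) from scala.collection.JavaConverters, which is deprecated from Scala 2.13 onwards in favour of scala.jdk.CollectionConverters.

```scala
import scala.collection.JavaConverters._

// Hypothetical data standing in for what the Kafka client would return.
val javaNames: java.util.Set[String] =
  new java.util.HashSet[String](java.util.Arrays.asList("US", "Canada"))

// Java to Scala: wrap with asScala, then copy into an immutable Set.
val scalaNames: Set[String] = javaNames.asScala.toSet

// Scala to Java: the kind of collection a Java API expects for a list of names.
val backToJava: java.util.Collection[String] =
  List("US", "Mexico").asJavaCollection
```

The wrappers returned by asScala and asJavaCollection are views over the original collection, which is why the first conversion ends with toSet to get an independent immutable copy.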

 

The rest of the code is available for download.

 

Hope this helps